Code source de pypnusershub.utils

# coding: utf8

from __future__ import unicode_literals, print_function, absolute_import, division

    Collections of utilities unrelated to the main module purpose.

import os
import io
from types import ModuleType
from typing import Optional
from urllib.parse import urlsplit

from flask import current_app, Response
from sqlalchemy import select

from pypnusershub.env import db

[docs] class RessourceError(EnvironmentError): def __init__(self, msg, errors): super(RessourceError, self).__init__(msg) self.errors = errors
[docs] def binary_resource_stream(resource, locations): """Return a resource from this path or package""" # convert errors = [] # If location is a string or a module, we wrap it in a tuple if isinstance(locations, (str, ModuleType)): locations = (locations,) for location in locations: # Assume location is a module and try to load it using pkg_resource try: import pkg_resources module_name = getattr(location, "__name__", location) return pkg_resources.resource_stream(module_name, resource) except (ImportError, EnvironmentError) as e: errors.append(e) # Falling back to load it from path. try: # location is a module module_path = __import__(location).__file__ parent_dir = os.path_dirname(module_path) except (AttributeError, ImportError): # location is a dir path parent_dir = os.path.realpath(location) # Now we got a resource full path. Just open it as a regular file. canonical_path = os.path.join(parent_dir, resource) try: return open(os.path.join(canonical_path), mode="rb") except EnvironmentError as e: errors.append(e) msg = ( 'Unable to find resource "%s" in "%s". ' "Inspect RessourceError.errors for list of encountered errors." ) raise RessourceError(msg % (resource, locations), errors)
[docs] def text_resource_stream( path, locations, encoding="utf8", errors=None, newline=None, line_buffering=False ): """Return a resource from this path or package. Transparently decode the stream.""" stream = binary_resource_stream(path, locations) return io.TextIOWrapper(stream, encoding, errors, newline, line_buffering)
[docs] def get_current_app_id(): if "ID_APP" in current_app.config: return current_app.config["ID_APP"] elif "CODE_APPLICATION" in current_app.config: from pypnusershub.db.models import Application return db.session.execute( select(Application.id_application).filter_by( code_application=current_app.config["CODE_APPLICATION"], ) ).scalar_one() else: return None