Code source de pypnusershub.db.tools

# coding: utf8

from __future__ import unicode_literals, print_function, absolute_import, division

"""
    DB tools not related to any model in particular.
"""
import logging
from datetime import datetime, timedelta

from flask import current_app

from sqlalchemy.orm.exc import NoResultFound
import sqlalchemy as sa
from authlib.jose import JsonWebToken
from authlib.jose.errors import ExpiredTokenError, JoseError

from pypnusershub.db import models
from pypnusershub.utils import text_resource_stream, get_current_app_id

from pypnusershub.env import db

[docs] log = logging.getLogger(__name__)
[docs] class AccessRightsError(Exception): pass
[docs] class InsufficientRightsError(AccessRightsError): pass
[docs] class AccessRightsExpiredError(AccessRightsError): pass
[docs] class UnreadableAccessRightsError(AccessRightsError): pass
[docs] class NoPasswordError(Exception): pass
[docs] class DifferentPasswordError(Exception): pass
# def init_schema(con_uri): # with text_resource_stream('schema.sql', 'pypnusershub.db') as sql_file: # sql = sql_file.read() # engine = sa.create_engine(con_uri) # with engine.connect(): # engine.execute(sql) # engine.execute("COMMIT") # def delete_schema(con_uri): # engine = sa.create_engine(con_uri) # with engine.connect(): # engine.execute("DROP SCHEMA IF EXISTS utilisateurs CASCADE") # engine.execute("COMMIT") # def reset_schema(con_uri): # delete_schema(con_uri) # init_schema(con_uri)
[docs] def load_fixtures(con_uri): with text_resource_stream("fixtures.sql", "pypnusershub.db") as sql_file: engine = sa.create_engine(con_uri) with engine.connect(): for line in sql_file: if line.strip(): engine.execute(line) engine.execute("COMMIT")
[docs] def encode_token(payload): expire = datetime.now() + timedelta(seconds=current_app.config["COOKIE_EXPIRATION"]) header = { "alg": "HS256", "exp": int(datetime.timestamp(expire)), } jwt = JsonWebToken(["HS256"]) key = current_app.config["SECRET_KEY"].encode("UTF-8") return jwt.encode(header, payload, key)
[docs] def decode_token(payload): jwt = JsonWebToken(["HS256"]) key = current_app.config["SECRET_KEY"].encode("UTF-8") claims = jwt.decode(payload, key) claims.validate() return dict(claims)
[docs] def user_to_token(user): return encode_token(user.as_dict())
[docs] def user_from_token(token, secret_key=None): """Given a, authentification token, return the matching AppUser instance""" secret_key = secret_key or current_app.config["SECRET_KEY"] try: data = decode_token(token) id_role = data["id_role"] id_app = data["id_application"] id_app_from_config = get_current_app_id() # check that the id_app from the token well corespond to the current_app id_application # for prevent conflit of token between applications on the same domain # if no ID_APP is passed to the app config, we don't check the conformiity of the token # for retro-compatibility reasons if id_app_from_config: if id_app != id_app_from_config: log.info("Invalid token: the token not corespoding to the current app") raise UnreadableAccessRightsError("Token BadSignature", 403) return db.session.execute( sa.select(models.AppUser) .where(models.AppUser.id_role == id_role) .where(models.AppUser.id_application == id_app) ).scalar_one() except NoResultFound: raise UnreadableAccessRightsError( 'No user withd id "{}" for app "{}"'.format(id_role, id_app) ) except ExpiredTokenError: raise AccessRightsExpiredError("Token expired") except JoseError: raise UnreadableAccessRightsError("Invalid token", 403)