Code source de pypnusershub.routes

# coding: utf8

from __future__ import unicode_literals, print_function, absolute_import, division


"""
routes relatives aux application, utilisateurs et à l'authentification
"""

import json
import logging

import datetime
from flask_login import login_user, logout_user, current_user
from flask import (
    Blueprint,
    request,
    Response,
    current_app,
    redirect,
    g,
    make_response,
    jsonify,
)
from markupsafe import escape

from sqlalchemy.orm import exc
import sqlalchemy as sa
from werkzeug.exceptions import BadRequest, Forbidden

from pypnusershub.utils import get_current_app_id
from pypnusershub.db import models, db
from pypnusershub.db.tools import (
    encode_token,
)
from pypnusershub.schemas import OrganismeSchema, UserSchema


# Module-level logger, named after this module's import path.
[docs] log = logging.getLogger(__name__)
# This module was originally designed to be a submodule of
# https://github.com/PnX-SI/TaxHub/
# The original behavior of the lib was to rely on the side effects of
# a file called "server.py" in TaxHub, specifically a function "init_app()"
# that is called globally to initialise the current application object.
# To avoid coupling, we replaced most calls to init_app() with
# flask.current_app, which does the same job in the context of a request.
# However, there are still 4 use cases not covered by this:
# - TaxHub app initialization: we provide it by having a routes.py at the
#   root of this project where init_app() is imported and called. Because
#   it will be imported automatically by TaxHub, but only by TaxHub, it
#   should not cause problems.
# - The cookie expiration is managed in a callback registered in init_app().
#   If we want this behavior to be preserved, we need to register the
#   callback as well, but we can't use the current_app object because the
#   registration happens outside of the request/response cycle. Hence we
#   create a custom Blueprint object, whose register() method is called once
#   the root app object is created. We can then register the callback from
#   there. To prevent TaxHub from registering this callback twice, the
#   registration happens only if it is requested through a
#   'COOKIE_AUTORENEW' setting.
# - The DB needs to be registered on the app. We use the same trick, but
#   the param is called 'INIT_APP_WITH_DB' and defaults to True.
# - The 'login' url must be configurable. We provide this with the
#   'LOGIN_ROUTE' param, but we still default to '/login' and POST.
# NOTE(review): the register() method below does not read 'COOKIE_AUTORENEW',
# 'INIT_APP_WITH_DB' or 'LOGIN_ROUTE'; parts of this description may be
# outdated - confirm against the project history.
class ConfigurableBlueprint(Blueprint):
    """Blueprint that applies configuration defaults when it is registered."""

    def register(self, app, *args, **kwargs):
        """Register the blueprint on ``app`` and install its request hook.

        Before delegating to ``Blueprint.register``, the settings
        ``PASS_METHOD`` and ``REMEMBER_COOKIE_NAME`` are given the defaults
        ``"hash"`` and ``"token"`` when the application has not set them.
        After registration, a ``before_request`` handler is added to ``app``
        that copies flask-login's ``current_user`` to ``g.current_user``.

        :param app: the application the blueprint is being registered on
        :param args: forwarded unchanged to ``Blueprint.register``
        :param kwargs: forwarded unchanged to ``Blueprint.register``
        """
        # Only fill in values the application has not already configured.
        app.config.setdefault("PASS_METHOD", "hash")
        app.config.setdefault("REMEMBER_COOKIE_NAME", "token")

        super().register(app, *args, **kwargs)

        @app.before_request
        def load_current_user():
            g.current_user = current_user
# Blueprint named "auth"; the view functions below attach to it via @routes.route.
[docs] routes = ConfigurableBlueprint("auth", __name__)
# Backward compatibility with versions before 2.0
from pypnusershub.decorators import check_auth


def _bad_login_response(payload):
    """Log a rejected login attempt and build the matching error response.

    :param payload: JSON-serialisable description of the failure
    :return: a ``Response`` whose body is the JSON-encoded payload and whose
        status is the ``BAD_LOGIN_STATUS_CODE`` setting (490 when unset)
    """
    msg = json.dumps(payload)
    log.info(msg)
    status_code = current_app.config.get("BAD_LOGIN_STATUS_CODE", 490)
    return Response(msg, status=status_code)


@routes.route("/login", methods=["POST"])
def login():
    """Authenticate a user from a JSON body and return a token.

    The JSON body is expected to be an object with the keys ``login``,
    ``password`` and, optionally, ``id_application`` (which falls back to
    ``get_current_app_id()``).

    :return:
        - 400 when one of the three values is missing, or when the body is
          not a JSON object;
        - the ``BAD_LOGIN_STATUS_CODE`` status (490 by default) when no user
          matches the login for the application, or the password is wrong;
        - otherwise a JSON document with the keys ``user``, ``expires`` and
          ``token``.
    :raises BadRequest: when no application exists for ``id_application``
    """
    user_data = request.json
    # A JSON body that is not an object (null, list, string, ...) has no
    # .get(); treat it as an empty object so that it yields the 400 below
    # instead of an AttributeError.
    if not isinstance(user_data, dict):
        user_data = {}

    username = user_data.get("login")
    password = user_data.get("password")
    id_app = user_data.get("id_application", get_current_app_id())

    if id_app is None or username is None or password is None:
        msg = json.dumps(
            "One of the following parameter is required ['id_application', 'login', 'password']"
        )
        return Response(msg, status=400)

    app = db.session.get(models.Application, id_app)
    if not app:
        raise BadRequest(f"No app for id {id_app}")

    # Keep the try body limited to the lookup that can raise NoResultFound.
    try:
        user = db.session.execute(
            sa.select(models.User)
            .where(models.User.identifiant == username)
            .where(models.User.filter_by_app())
        ).scalar_one()
    except exc.NoResultFound:
        return _bad_login_response(
            {
                "type": "login",
                "msg": (
                    'No user found with the username "{login}" for '
                    'the application with id "{id_app}"'
                ).format(login=escape(username), id_app=id_app),
            }
        )

    if not user.check_password(password):
        return _bad_login_response({"type": "password", "msg": "Mot de passe invalide"})

    # Serialise the user only once the credentials have been verified.
    user_dict = UserSchema(exclude=["remarques"], only=["+max_level_profil"]).dump(
        user
    )
    login_user(user)

    # Token generation
    token = encode_token(user_dict)
    token_exp = datetime.datetime.now(datetime.timezone.utc)
    token_exp += datetime.timedelta(seconds=current_app.config["COOKIE_EXPIRATION"])

    return jsonify(
        {"user": user_dict, "expires": token_exp.isoformat(), "token": token.decode()}
    )
@routes.route("/public_login", methods=["POST"])
def public_login():
    """Log in the account named by the ``PUBLIC_ACCESS_USERNAME`` setting.

    The user is looked up by that identifier, restricted with
    ``filter_by_app(code_app="GN")``. The lookup uses ``scalar_one()`` and
    no exception from it is handled here.

    :return: a JSON document with the keys ``user``, ``expires`` and ``token``
    :raises Forbidden: when ``PUBLIC_ACCESS_USERNAME`` is unset or falsy
    """
    public_username = current_app.config.get("PUBLIC_ACCESS_USERNAME")
    if not public_username:
        raise Forbidden

    query = (
        sa.select(models.User)
        .where(models.User.identifiant == public_username)
        .where(models.User.filter_by_app(code_app="GN"))
    )
    user = db.session.execute(query).scalar_one()
    user_dict = user.as_dict()
    login_user(user)

    # Token generation
    token = encode_token(user_dict)
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    lifetime = datetime.timedelta(seconds=current_app.config["COOKIE_EXPIRATION"])
    token_exp = issued_at + lifetime

    payload = {
        "user": user_dict,
        "expires": token_exp.isoformat(),
        "token": token.decode(),
    }
    return jsonify(payload)
@routes.route("/logout", methods=["GET", "POST"])
def logout():
    """Log the current user out.

    :return: a 302 redirect to the ``redirect`` query parameter when it is
        present, otherwise an empty response
    """
    query = request.args
    # NOTE(review): the redirect target comes straight from the query string
    # without validation, which allows redirecting to arbitrary URLs.
    # Consider restricting it to trusted hosts.
    response = (
        redirect(query["redirect"], code=302)
        if "redirect" in query
        else make_response()
    )
    logout_user()
    return response
def insert_or_update_organism(organism):
    """Load an organism with ``OrganismeSchema`` and add it to the session.

    No commit or flush is issued in this function.

    :param organism: data passed to ``OrganismeSchema.load``
    :return: the loaded organism, dumped back through the same schema
    """
    schema = OrganismeSchema()
    loaded_organism = schema.load(organism)
    db.session.add(loaded_organism)
    return schema.dump(loaded_organism)
def insert_or_update_role(data):
    """Load a role with ``UserSchema(only=["groups"])`` and add it to the session.

    No commit or flush is issued in this function.

    :param data: data passed to ``UserSchema.load``
    :return: the loaded role, dumped back through the same schema
    """
    schema = UserSchema(only=["groups"])
    role = schema.load(data)
    db.session.add(role)
    return schema.dump(role)