Code source de geonature.core.auth.routes

"""
    Module d'identificiation provisoire pour test du CAS INPN
"""

import datetime
import xmltodict
import logging
from copy import copy


from flask import (
    Blueprint,
    request,
    make_response,
    redirect,
    current_app,
    jsonify,
    render_template,
    session,
    Response,
)
from sqlalchemy import select
from utils_flask_sqla.response import json_resp

from pypnusershub.db.models import User, Organisme, Application
from pypnusershub.db.tools import encode_token
from pypnusershub.routes import insert_or_update_organism, insert_or_update_role
from geonature.utils import utilsrequests
from geonature.utils.errors import CasAuthentificationError
from geonature.utils.env import db


[docs] routes = Blueprint("gn_auth", __name__, template_folder="templates")
[docs] log = logging.getLogger()
@routes.route("/login_cas", methods=["GET", "POST"])
[docs] def loginCas(): """ Login route with the INPN CAS .. :quickref: User; """ config_cas = current_app.config["CAS"] params = request.args if "ticket" in params: base_url = current_app.config["API_ENDPOINT"] + "/gn_auth/login_cas" url_validate = "{url}?ticket={ticket}&service={service}".format( url=config_cas["CAS_URL_VALIDATION"], ticket=params["ticket"], service=base_url, ) response = utilsrequests.get(url_validate) data = None xml_dict = xmltodict.parse(response.content) resp = xml_dict["cas:serviceResponse"] if "cas:authenticationSuccess" in resp: data = resp["cas:authenticationSuccess"]["cas:user"] if data: ws_user_url = "{url}/{user}/?verify=false".format( url=config_cas["CAS_USER_WS"]["URL"], user=data ) try: response = utilsrequests.get( ws_user_url, ( config_cas["CAS_USER_WS"]["ID"], config_cas["CAS_USER_WS"]["PASSWORD"], ), ) assert response.status_code == 200 except AssertionError: log.error("Error with the inpn authentification service") raise CasAuthentificationError( "Error with the inpn authentification service", status_code=500 ) info_user = response.json() data = insert_user_and_org(info_user, update_user_organism=False) db.session.commit() # creation de la Response response = make_response(redirect(current_app.config["URL_APPLICATION"])) cookie_exp = datetime.datetime.utcnow() expiration = current_app.config["COOKIE_EXPIRATION"] cookie_exp += datetime.timedelta(seconds=expiration) data["id_application"] = ( db.session.execute( select(Application).filter_by( code_application=current_app.config["CODE_APPLICATION"] ) ) .scalar_one() .id_application ) token = encode_token(data) response.set_cookie("token", token, expires=cookie_exp) # User cookie organism_id = info_user["codeOrganisme"] if not organism_id: organism_id = ( db.session.execute( select(Organisme).filter_by(nom_organisme="Autre"), ) .scalar_one() .id_organisme, ) current_user = { "user_login": data["identifiant"], "id_role": data["id_role"], "id_organisme": organism_id, } response.set_cookie("current_user", str(current_user), expires=cookie_exp) return response else: log.info("Erreur d'authentification lié au CAS, voir log du CAS") log.error("Erreur d'authentification lié au CAS, voir log du CAS") return render_template( "cas_login_error.html", cas_logout=current_app.config["CAS_PUBLIC"]["CAS_URL_LOGOUT"], url_geonature=current_app.config["URL_APPLICATION"], ) return jsonify({"message": "Authentification error"}, 500)
@routes.route("/logout_cruved", methods=["GET"]) @json_resp
[docs] def logout_cruved(): """ Route to logout with cruved To avoid multiples server call, we store the cruved in the session when the user logout we need clear the session to get the new cruved session .. :quickref: User; """ copy_session_key = copy(session) for key in copy_session_key: session.pop(key) return "Logout", 200
[docs] def get_user_from_id_inpn_ws(id_user): URL = f"https://inpn.mnhn.fr/authentication/rechercheParId/{id_user}" config_cas = current_app.config["CAS"] try: response = utilsrequests.get( URL, ( config_cas["CAS_USER_WS"]["ID"], config_cas["CAS_USER_WS"]["PASSWORD"], ), ) assert response.status_code == 200 return response.json() except AssertionError: log.error("Error with the inpn authentification service")
[docs] def insert_user_and_org(info_user, update_user_organism: bool = True): organism_id = info_user["codeOrganisme"] organism_name = info_user.get("libelleLongOrganisme", "Autre") user_login = info_user["login"] user_id = info_user["id"] try: assert user_id is not None and user_login is not None except AssertionError: log.error("'CAS ERROR: no ID or LOGIN provided'") raise CasAuthentificationError("CAS ERROR: no ID or LOGIN provided", status_code=500) # Reconciliation avec base GeoNature if organism_id: organism = {"id_organisme": organism_id, "nom_organisme": organism_name} insert_or_update_organism(organism) # Retrieve user information from `info_user` user_info = { "id_role": user_id, "identifiant": user_login, "nom_role": info_user["nom"], "prenom_role": info_user["prenom"], "id_organisme": organism_id, "email": info_user["email"], "active": True, } # If not updating user organism and user already exists, retrieve existing user organism information rather than information from `info_user` existing_user = User.query.get(user_id) if not update_user_organism and existing_user: user_info["id_organisme"] = existing_user.id_organisme # Insert or update user user_info = insert_or_update_role(user_info) # Associate user to a default group if the user is not associated to any group user = existing_user or db.session.get(User, user_id) if not user.groups: if current_app.config["CAS"]["USERS_CAN_SEE_ORGANISM_DATA"] and organism_id: # group socle 2 - for a user associated to an organism if users can see data from their organism group_id = current_app.config["BDD"]["ID_USER_SOCLE_2"] else: # group socle 1 group_id = current_app.config["BDD"]["ID_USER_SOCLE_1"] group = db.session.get(User, group_id) user.groups.append(group) return user_info