Code source de geonature.core.users.routes

import json
import logging
from functools import wraps

import requests
from flask import (
    Blueprint,
    Response,
    current_app,
    g,
    render_template,
    request,
)
from geonature.core.gn_meta.models import CorDatasetActor, TDatasets
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.users.models import VUserslistForallMenu
from geonature.core.users.register_post_actions import (
    execute_actions_after_validation,
    send_email_for_recovery,
    validate_temp_user,
    send_email_for_mail_change,
    send_email_to_old_mail,
)
from geonature.utils.env import DB, db
from pypnusershub.db.models import Application, Organisme, User, UserList
from pypnusershub.organisms_manager import (
    insert_or_update_organism,
    delete_organism as delete_organism_db,
)
from pypnusershub.auth import user_manager

from sqlalchemy import and_, select, func
from utils_flask_sqla.response import json_resp
from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError, NotFound
from werkzeug.datastructures import MultiDict

[docs] routes = Blueprint("users", __name__, template_folder="templates")
[docs] log = logging.getLogger()
[docs] s = requests.Session()
[docs] user_fields = { "id_role", "identifiant", "nom_role", "prenom_role", "nom_complet", "id_organisme", "groupe", "active", "remarques", }
[docs] organism_fields = { "id_organisme", "uuid_organisme", "nom_organisme", }
@routes.route("/menu/<int:id_menu>", methods=["GET"]) @routes.route("/menu/", methods=["GET"]) @json_resp
[docs] def get_roles_by_menu_id(id_menu=None): """ Returns the list of roles associated with a menu Parameters ---------- id_menu : int The id of user list (utilisateurs.bib_list) nom_complet : str, optional Beginning of complete name of the role GET parameters -------------- :param id_menu: the id of user list (utilisateurs.bib_list) :type id_menu: int :query str nom_complet: beginning of complet name of the role """ query = select(VUserslistForallMenu).distinct(VUserslistForallMenu.nom_complet) if id_menu: query = query.filter_by(id_menu=id_menu) if nom_complet := request.args.get("nom_complet"): query = query.where(VUserslistForallMenu.nom_complet.ilike(f"{nom_complet}%")) data = DB.session.scalars(query.order_by(VUserslistForallMenu.nom_complet.asc())).all() return [n.as_dict() for n in data]
@routes.route("/menu_from_code/<string:code_liste>", methods=["GET"]) @json_resp
[docs] def get_roles_by_menu_code(code_liste): """ Returns the list of roles associated with a user list (identified by its code) Parameters ---------- code_liste : str The code of user list (utilisateurs.t_lists) nom_complet : str, optional Beginning of complete name of the role, default None Returns ------- list A list of roles associated with the user list """ query = select(VUserslistForallMenu).join( UserList, and_( UserList.id_liste == VUserslistForallMenu.id_menu, UserList.code_liste == code_liste, ), ) parameters = request.args if parameters.get("nom_complet"): query = query.where( VUserslistForallMenu.nom_complet.ilike("{}%".format(parameters.get("nom_complet"))) ) data = DB.session.scalars(query.order_by(VUserslistForallMenu.nom_complet.asc())).all() return [n.as_dict() for n in data]
@routes.route("/listes", methods=["GET"]) @json_resp
[docs] def get_listes(): query = select(UserList) lists = DB.session.scalars(query).all() return [l.as_dict() for l in lists]
@routes.route("/role/<int:id_role>", methods=["GET"]) @permissions.login_required @json_resp
[docs] def get_role(id_role): """ Get role detail Parameters ---------- id_role : int the id user Returns ------- dict A dictionary containing the role detail """ user = DB.get_or_404(User, id_role) fields = user_fields.copy() if g.current_user == user: fields.add("email") return user.as_dict(fields=fields)
@routes.route("/roles", methods=["GET"]) @permissions.login_required @json_resp
[docs] def get_roles(): """ Get all roles .. :quickref: User; """ params = request.args.to_dict() query = select(User) if "group" in params: query = query.where(User.groupe == params["group"]) if "orderby" in params: try: order_col = getattr(User.__table__.columns, params.pop("orderby")) query = query.order_by(order_col) except AttributeError: raise BadRequest("the attribute to order on does not exist") return [user.as_dict(fields=user_fields) for user in DB.session.scalars(query).all()]
@routes.route("/organisms", methods=["GET"]) @permissions.login_required @json_resp
[docs] def get_organismes(): """ Get all organisms .. :quickref: User; """ params = request.args.to_dict() query = select(Organisme) order_by_cols = [] if "search" in params: search = params.pop("search") query = query.where(func.word_similarity(Organisme.nom_organisme, search) > 0.7) order_by_cols = [func.word_similarity(Organisme.nom_organisme, search).desc()] if "orderby" in params: order_params = params["orderby"].split(":") try: order_col = getattr(Organisme.__table__.columns, order_params[0]) if len(order_params) > 1: order_col = order_col.asc() if order_params[1] == "asc" else order_col.desc() order_by_cols.append(order_col) except AttributeError: raise BadRequest("the attribute to order on does not exist") if order_by_cols: query = query.order_by(*order_by_cols) return [ organism.as_dict(fields=organism_fields) for organism in DB.session.scalars(query).all() ]
@routes.route("/organisms_dataset_actor", methods=["GET"]) @permissions.login_required @json_resp
[docs] def get_organismes_jdd(): """ Get all organisms and the JDD where there are actor and where the current user hase autorization with its cruved .. :quickref: User; """ params = request.args.to_dict() datasets = DB.session.scalars(TDatasets.filter_by_readable()).unique().all() datasets = [d.id_dataset for d in datasets] query = ( select(Organisme) .join(CorDatasetActor, Organisme.id_organisme == CorDatasetActor.id_organism) .where(CorDatasetActor.id_dataset.in_(datasets)) .distinct() ) if "orderby" in params: try: order_col = getattr(Organisme.__table__.columns, params.pop("orderby")) query = query.order_by(order_col) except AttributeError: raise BadRequest("the attribute to order on does not exist") return [ organism.as_dict(fields=organism_fields) for organism in DB.session.scalars(query).unique().all() ]
@routes.route("/organism/<int:id_organisme>", methods=["GET"]) @permissions.login_required @json_resp
[docs] def get_organism(id_organisme): """ Get complete organism details by ID .. :quickref: User; Returns: dict: Complete organism information including all fields """ organism = DB.session.get(Organisme, id_organisme) if not organism: raise NotFound("Organism not found") return organism.as_dict()
@routes.route("/organism", methods=["POST"]) @permissions.check_cruved_scope("C", module_code="GEONATURE", object_code="ORGANISM") @json_resp
[docs] def create_organism() -> dict: """ Create a new organism .. :quickref: User; Request body should contain: - nom_organisme (required): organism name - adresse_organisme (optional): address - cp_organisme (optional): postal code - ville_organisme (optional): city - tel_organisme (optional): telephone - fax_organisme (optional): fax - email_organisme (optional): email - url_organisme (optional): website URL - url_logo (optional): logo URL Returns: dict: The created organism with its ID """ data = request.get_json() if not data.get("nom_organisme"): raise BadRequest("Organism name is required") new_organism = { "nom_organisme": data.get("nom_organisme"), "adresse_organisme": data.get("adresse_organisme"), "cp_organisme": data.get("cp_organisme"), "ville_organisme": data.get("ville_organisme"), "tel_organisme": data.get("tel_organisme"), "fax_organisme": data.get("fax_organisme"), "email_organisme": data.get("email_organisme"), "url_organisme": data.get("url_organisme"), "url_logo": data.get("url_logo"), } try: new_organism_add = insert_or_update_organism(new_organism) return new_organism_add except Exception as e: log.error(f"Error creating organism: {str(e)}") raise InternalServerError(f"Error creating organism: {str(e)}")
################################### ### ACCOUNT_MANAGEMENT ROUTES ##### ###################################
[docs] def check_sign_up_enabled(key): """ Decorator to check if a user management feature is enabled. Parameters ---------- key : str The key of the feature. Must be one of: ENABLE_SIGN_UP, ENABLE_ACCOUNT_MANAGEMENT, AUTO_ACCOUNT_CREATION """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): valid_keys = ["ENABLE_SIGN_UP", "ENABLE_USER_MANAGEMENT", "AUTO_ACCOUNT_CREATION"] if key not in valid_keys: raise KeyError(f"{key} is not a valid feature key. Must be one of {valid_keys}") if not current_app.config["ACCOUNT_MANAGEMENT"].get(key, False): raise NotFound("Page not found") return f(*args, **kwargs) return decorated_function return decorator
@routes.route("/inscription", methods=["POST"]) @check_sign_up_enabled("ENABLE_SIGN_UP")
[docs] def inscription(): """ Add a user to temporary users table from the GeoNature interface Works according to the 'ENABLE_SIGN_UP' authorization in the config. """ data = request.get_json() # ajout des valeurs non présentes dans le form data["id_application"] = ( DB.session.execute( select(Application).filter_by(code_application=current_app.config["CODE_APPLICATION"]) ) .scalar_one() .id_application ) data["groupe"] = False try: token_data = user_manager.create_temp_user(data) validate_temp_user(token_data) except Exception as e: raise BadRequest(str(e)) return {"message": "Subscription created"}, 200
@routes.route("/login/recovery", methods=["POST"]) @check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs] def login_recovery(): """ Send an email with the user login and a link to reset its password Only works if 'ENABLE_SIGN_UP' is enabled """ if not current_app.config.get("ACCOUNT_MANAGEMENT").get("ENABLE_USER_MANAGEMENT", False): raise NotFound("Page not found") data = request.get_json() try: user_manager.create_cor_role_token(data["email"]) user = db.session.execute( select(User).where(User.email == data["email"]), ).scalar_one() send_email_for_recovery(user) except Exception as e: raise BadRequest(str(e)) return {"message": "Token created"}, 200
@routes.route("/confirmation", methods=["GET"]) @check_sign_up_enabled("ENABLE_SIGN_UP")
[docs] def confirmation(): """ Validate a user account after a request (this action is triggered by the link in the email) Create a personal JDD as post_action if the parameter AUTO_DATASET_CREATION is set to True """ token = request.args.get("token", None) if token is None: raise BadRequest("Token not found") data = { "token": token, "id_application": DB.session.execute( select(Application).filter_by(code_application=current_app.config["CODE_APPLICATION"]) ) .scalar_one() .id_application, } try: user_data = user_manager.valid_temp_user(data) execute_actions_after_validation(user_data) except Exception as e: raise BadRequest(str(e)) return {"message": "Account validated"}, 200
@routes.route("/role", methods=["PUT"]) @permissions.login_required @json_resp @check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs] def update_role(): """ Modify the role of the user associated with the current token. """ data = dict(request.get_json()) user = g.current_user if user.is_public: raise Forbidden attliste = [k for k in data] for att in attliste: if not getattr(User, att, False): data.pop(att) # liste des attributs qui ne doivent pas être modifiable par l'user black_list_att_update = [ "active", "date_insert", "date_update", "groupe", "id_organisme", "id_role", "pass_plus", "pn", "uuid_role", ] for key, value in data.items(): if key not in black_list_att_update: setattr(user, key, value) DB.session.merge(user) DB.session.commit() DB.session.flush() return user.as_dict()
@routes.route("/password/change", methods=["PUT"]) @permissions.login_required @json_resp @check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs] def change_password_route(): """ Change the password of the connected user """ user = g.current_user data = request.get_json() init_password = data.get("init_password", None) if not init_password: if not data.get("token", None): raise BadRequest("No Token was found") else: if not user.check_password(data.get("init_password", None)): raise BadRequest("Initial password is incorrect") try: new_token = user_manager.create_cor_role_token(user.email)["token"] except Exception as e: raise InternalServerError(f"Error when creating a new token: {str(e)}") data["token"] = new_token required_fields = ["password", "password_confirmation", "token"] if not all(field in data for field in required_fields): raise InternalServerError("Missing required fields for password change") try: user_manager.change_password( data.get("token"), data.get("password"), data.get("password_confirmation") ) except user_manager.PrintableException as printable_exception: raise BadRequest(str(printable_exception)) return {"message": "Password changed with success"}, 200
@routes.route("/password/new", methods=["PUT"]) @json_resp @check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs] def new_password(): """ Changes the password of a user after they requested a password recovery Requires a token sent by mail to the user """ data = dict(request.get_json()) if not data.get("token", None): raise BadRequest("No token provided") try: user_manager.change_password( data.get("token"), data.get("password"), data.get("password_confirmation") ) except user_manager.PrintableException as printable_exception: raise BadRequest(str(printable_exception)) return {"message": "Password changed with success"}, 200
@routes.route("/mail/change", methods=["PUT"]) @json_resp @check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs] def new_mail(): """ Send a mail to the user with a link to confirm his new mail address """ user = g.current_user data = request.get_json() new_mail = data.get("new_mail", None) if not new_mail: raise BadRequest("No new mail provided") send_email_for_mail_change(new_mail, user) return {"message": f"Confirmation mail sent to {new_mail}"}, 200
@routes.route("/mail/new", methods=["PUT"]) @json_resp @check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs] def confirm_new_mail(): """ Change the email address of the user. Notes ----- Not required but this route is meant to be called after the `new mail` route. """ user = g.current_user data = MultiDict(request.get_json()) new_mail = data.get("new_mail", default=None) user_id = data.get("user", default=None, type=int) if not new_mail: raise BadRequest("No new mail provided") if not user.id_role == user_id: raise BadRequest(f"User id does not match user connected {user.id_role} != {user_id}") send_email_to_old_mail(user) user_manager.change_mail(user.id_role, new_mail) return {"message": f"Mail successfully changed"}, 200