import json
import logging
from functools import wraps
import requests
from flask import (
Blueprint,
Response,
current_app,
g,
render_template,
request,
)
from geonature.core.gn_meta.models import CorDatasetActor, TDatasets
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.users.models import CorRole, VUserslistForallMenu
from geonature.core.users.register_post_actions import (
execute_actions_after_validation,
send_email_for_recovery,
validate_temp_user,
)
from geonature.utils.config import config
from geonature.utils.env import DB, db
from pypnusershub.auth.subscribe import (
change_password,
create_cor_role_token,
create_temp_user,
valid_temp_user,
)
from pypnusershub.db.models import Application, Organisme, User, UserList
from sqlalchemy import and_, select
from sqlalchemy.sql import and_
from utils_flask_sqla.response import json_resp
from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError, NotFound
[docs]
routes = Blueprint("users", __name__, template_folder="templates")
[docs]
log = logging.getLogger()
[docs]
user_fields = {
"id_role",
"identifiant",
"nom_role",
"prenom_role",
"nom_complet",
"id_organisme",
"groupe",
"active",
"remarques",
}
[docs]
organism_fields = {
"id_organisme",
"uuid_organisme",
"nom_organisme",
}
@routes.route("/menu/<int:id_menu>", methods=["GET"])
@json_resp
@routes.route("/menu_from_code/<string:code_liste>", methods=["GET"])
@json_resp
@routes.route("/listes", methods=["GET"])
@json_resp
[docs]
def get_listes():
query = select(UserList)
lists = DB.session.scalars(query).all()
return [l.as_dict() for l in lists]
@routes.route("/role/<int:id_role>", methods=["GET"])
@permissions.login_required
@json_resp
[docs]
def get_role(id_role):
"""
Get role detail
Parameters
----------
id_role : int
the id user
Returns
-------
dict
A dictionary containing the role detail
"""
user = DB.get_or_404(User, id_role)
fields = user_fields.copy()
if g.current_user == user:
fields.add("email")
return user.as_dict(fields=fields)
@routes.route("/roles", methods=["GET"])
@permissions.login_required
@json_resp
[docs]
def get_roles():
"""
Get all roles
.. :quickref: User;
"""
params = request.args.to_dict()
query = select(User)
if "group" in params:
query = query.where(User.groupe == params["group"])
if "orderby" in params:
try:
order_col = getattr(User.__table__.columns, params.pop("orderby"))
query = query.order_by(order_col)
except AttributeError:
raise BadRequest("the attribute to order on does not exist")
return [user.as_dict(fields=user_fields) for user in DB.session.scalars(query).all()]
@routes.route("/organisms", methods=["GET"])
@permissions.login_required
@json_resp
[docs]
def get_organismes():
"""
Get all organisms
.. :quickref: User;
"""
params = request.args.to_dict()
q = select(Organisme)
if "orderby" in params:
try:
order_col = getattr(Organisme.__table__.columns, params.pop("orderby"))
q = q.order_by(order_col)
except AttributeError:
raise BadRequest("the attribute to order on does not exist")
return [organism.as_dict(fields=organism_fields) for organism in DB.session.scalars(q).all()]
@routes.route("/organisms_dataset_actor", methods=["GET"])
@permissions.login_required
@json_resp
[docs]
def get_organismes_jdd():
"""
Get all organisms and the JDD where there are actor and where
the current user hase autorization with its cruved
.. :quickref: User;
"""
params = request.args.to_dict()
datasets = DB.session.scalars(TDatasets.filter_by_readable()).unique().all()
datasets = [d.id_dataset for d in datasets]
query = (
select(Organisme)
.join(CorDatasetActor, Organisme.id_organisme == CorDatasetActor.id_organism)
.where(CorDatasetActor.id_dataset.in_(datasets))
.distinct()
)
if "orderby" in params:
try:
order_col = getattr(Organisme.__table__.columns, params.pop("orderby"))
query = query.order_by(order_col)
except AttributeError:
raise BadRequest("the attribute to order on does not exist")
return [
organism.as_dict(fields=organism_fields)
for organism in DB.session.scalars(query).unique().all()
]
###################################
### ACCOUNT_MANAGEMENT ROUTES #####
###################################
[docs]
def check_sign_up_enabled(key):
"""
Decorator to check if a user management feature is enabled.
Parameters
----------
key : str
The key of the feature. Must be one of:
ENABLE_SIGN_UP, ENABLE_ACCOUNT_MANAGEMENT, AUTO_ACCOUNT_CREATION
"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
valid_keys = ["ENABLE_SIGN_UP", "ENABLE_USER_MANAGEMENT", "AUTO_ACCOUNT_CREATION"]
if key not in valid_keys:
raise KeyError(f"{key} is not a valid feature key. Must be one of {valid_keys}")
if not current_app.config["ACCOUNT_MANAGEMENT"].get(key, False):
raise NotFound("Page not found")
return f(*args, **kwargs)
return decorated_function
return decorator
@routes.route("/inscription", methods=["POST"])
@check_sign_up_enabled("ENABLE_SIGN_UP")
[docs]
def inscription():
"""
Add a user to temporary users table from the GeoNature interface
Works according to the 'ENABLE_SIGN_UP' authorization in the config.
"""
data = request.get_json()
# ajout des valeurs non présentes dans le form
data["id_application"] = (
DB.session.execute(
select(Application).filter_by(code_application=current_app.config["CODE_APPLICATION"])
)
.scalar_one()
.id_application
)
data["groupe"] = False
try:
token_data = create_temp_user(data)
validate_temp_user(token_data)
except Exception as e:
raise BadRequest(str(e))
return {"message": "Subscription created"}, 200
@routes.route("/login/recovery", methods=["POST"])
@check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs]
def login_recovery():
"""
Send an email with the user login and a link to reset its password
Only works if 'ENABLE_SIGN_UP' is enabled
"""
if not current_app.config.get("ACCOUNT_MANAGEMENT").get("ENABLE_USER_MANAGEMENT", False):
raise NotFound("Page not found")
data = request.get_json()
try:
create_cor_role_token(data["email"])
user = db.session.execute(
select(User).where(User.email == data["email"]),
).scalar_one()
send_email_for_recovery(user)
except Exception as e:
raise BadRequest(str(e))
return {"message": "Token created"}, 200
@routes.route("/confirmation", methods=["GET"])
@check_sign_up_enabled("ENABLE_SIGN_UP")
[docs]
def confirmation():
"""
Validate a user account after a request (this action is triggered by the link in the email)
Create a personal JDD as post_action if the parameter AUTO_DATASET_CREATION is set to True
"""
token = request.args.get("token", None)
if token is None:
raise BadRequest("Token not found")
data = {
"token": token,
"id_application": DB.session.execute(
select(Application).filter_by(code_application=current_app.config["CODE_APPLICATION"])
)
.scalar_one()
.id_application,
}
try:
user_data = valid_temp_user(data)
execute_actions_after_validation(user_data)
except Exception as e:
raise BadRequest(str(e))
return {"message": "Account validated"}, 200
@routes.route("/role", methods=["PUT"])
@permissions.login_required
@json_resp
@check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs]
def update_role():
"""
Modify the role of the user associated with the current token.
"""
data = dict(request.get_json())
user = g.current_user
if user.is_public:
raise Forbidden
attliste = [k for k in data]
for att in attliste:
if not getattr(User, att, False):
data.pop(att)
# liste des attributs qui ne doivent pas être modifiable par l'user
black_list_att_update = [
"active",
"date_insert",
"date_update",
"groupe",
"id_organisme",
"id_role",
"pass_plus",
"pn",
"uuid_role",
]
for key, value in data.items():
if key not in black_list_att_update:
setattr(user, key, value)
DB.session.merge(user)
DB.session.commit()
DB.session.flush()
return user.as_dict()
@routes.route("/password/change", methods=["PUT"])
@permissions.login_required
@json_resp
@check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs]
def change_password_route():
"""
Change the password of the connected user
"""
user = g.current_user
data = request.get_json()
init_password = data.get("init_password", None)
if not init_password:
if not data.get("token", None):
raise BadRequest("No Token was found")
else:
if not user.check_password(data.get("init_password", None)):
raise BadRequest("Initial password is incorrect")
try:
new_token = create_cor_role_token(user.email)["token"]
except Exception as e:
raise InternalServerError(f"Error when creating a new token: {str(e)}")
data["token"] = new_token
required_fields = ["password", "password_confirmation", "token"]
if not all(field in data for field in required_fields):
raise InternalServerError("Missing required fields for password change")
try:
change_password(data.get("token"), data.get("password"), data.get("password_confirmation"))
except Exception as e:
raise BadRequest("An error occurred while changing the password")
return {"message": "Password changed with success"}, 200
@routes.route("/password/new", methods=["PUT"])
@json_resp
@check_sign_up_enabled("ENABLE_USER_MANAGEMENT")
[docs]
def new_password():
"""
Changes the password of a user after they requested a password recovery
Requires a token sent by mail to the user
"""
data = dict(request.get_json())
if not data.get("token", None):
raise BadRequest("No token provided")
try:
change_password(data.get("token"), data.get("password"), data.get("password_confirmation"))
except Exception as e:
raise BadRequest(str(e))
return {"message": "Password changed with success"}, 200