# coding: utf8
# Fichier contenant les routes pour faire des appels à UsersHub
#
# - en se connectant en tant que qu'administrateur de l'application courante
# -- qui est utilisateur de USERSHUB avec des droits suffisants pour creer des utilisateurs
# -- pour cela il faut renseigner dans la configuration
# --
#
# - pour creer des nouveaux utilisateurs
#
# - pour changer les mots de passe
#
# - pour éditer les droits des utilisateurs
#
# - etc..
import json
import requests
from flask import (
Blueprint,
request,
session,
)
from functools import wraps
from .db.models import Application, UserApplicationRight, AppUser, db
from .env import REGISTER_POST_ACTION_FCT
from flask import current_app
from sqlalchemy import select
from pypnusershub.utils import get_current_app_id
# from .utils_register import connect_usershub, req_json_or_text
# from .routes import check_auth
[docs]
bp = Blueprint("register", __name__)
[docs]
def get_json_request(r):
"""
r : retour de la requete requests
fonction pour recuperer la reponse json sans lever d'erreur
"""
try:
r_json = r.json()
except Exception:
r_json = None
return r_json
[docs]
def req_json_or_text(r, msg_pypn=""):
"""
r : retour de la requete requests
msg_pypn : message supplementaire rajouté a la reponse
revoie un tuple avec la réponse de la requete en json r.json si possible
{'msg': r.text} sinon
et status_code
"""
r_json = get_json_request(r)
if not r_json and r.text:
r_json = {"msg": r.text}
if not r_json and not r.text:
r_json = {"msg": "empty message"}
if msg_pypn:
r_json["msg_pypn"] = msg_pypn
return json.dumps(r_json), r.status_code
[docs]
def connect_admin():
"""
decorateur pour la connexion de l'admin a une appli
ici url config['URL_USERSHUB'] sans / à la fin
"""
def _connect_admin(f):
@wraps(f)
def __connect_admin(*args, **kwargs):
# connexion à usershub
id_app_usershub = db.session.scalar(
select(Application.id_application)
.where(Application.code_application == "UH")
.limit(1)
)
if not id_app_usershub:
return json.dumps({"msg": "Pas d'id app USERSHUB"}), 500
# test si on est déjà connecté
try:
r = s.post(
current_app.config["URL_USERSHUB"] + "/api_register/test_connexion",
headers={"Content-Type": "application/json"},
)
b_connexion = r.status_code == 200
except requests.ConnectionError:
return (
json.dumps(
{
"msg": "Erreur de connexion a l'application USERSHUB (causes possibles : url erronee, application USERSHUB ne fonctionne pas, ..;)"
}
),
500,
)
# si on est pas connecté on se connecte
if not b_connexion:
# connexion à usershub
r = s.post(
current_app.config["URL_USERSHUB"] + "/" + "pypn/auth/login",
json={
"login": current_app.config["ADMIN_APPLICATION_LOGIN"],
"password": current_app.config["ADMIN_APPLICATION_PASSWORD"],
"id_application": id_app_usershub,
},
)
# si echec de connexion
if r.status_code != 200:
return req_json_or_text(r, "Problème de connexion à UsersHub")
return f(*args, **kwargs)
return __connect_admin
return _connect_admin
@bp.route("test_uh", methods=["GET"])
@connect_admin()
[docs]
def test():
"""
route pour tester le décorateur connect_admin
ainsi que les paramètres de connexion à USERSHUB:
- config['ADMIN_APPLICATION_LOGIN']
- config['ADMIN_APPLICATION_PASSWORD']
"""
r = s.post(current_app.config["URL_USERSHUB"] + "/api_register/test_connexion")
return req_json_or_text(r, "Test pypn")
@bp.route("post_usershub/<string:type_action>", methods=["POST"])
@connect_admin()
[docs]
def post_usershub(type_action):
"""
route generique pour appeler les routes UsersHub en tant qu'administrateur de l'appli en cours
ex : post_usershub/test_connexion appelle la route URL_USERSHUB/api_register/test_connexion
"""
# attribution des droits pour les actions
dict_type_action_droit = {
"test_connexion": 0,
"valid_temp_user": 0,
"create_temp_user": 0,
"change_password": 0,
"create_cor_role_token": 0,
"add_application_right_to_role": 0,
"login_recovery": 0,
"password_recovery": 0,
"update_user": 1,
"change_application_right": 4,
}
params = request.args
id_droit = 0
if session.get("current_user", None):
id_role = session["current_user"]["id_role"]
q = (
db.select(AppUser.id_droit_max)
.filter(AppUser.id_role == id_role)
.filter(AppUser.id_application == get_current_app_id())
)
id_droit = db.session.execute(q).scalar_one()[0]
# si pas de droit definis pour cet action, alors les droits requis sont à 7 => action impossible
if id_droit < dict_type_action_droit.get(type_action, 7):
return (
json.dumps(
{"msg": "Droits insuffisant pour la requête UsersHub : " + type_action}
),
403,
)
# les test de paramètres seront faits dans UsersHub
data = request.get_json()
url = current_app.config["URL_USERSHUB"] + "/" + "api_register/" + type_action
r_usershub = s.post(url, json=data)
# after request definir route dans app
# par ex. pour l'envoi de mails
# lancer uniquement si enable_post_action n'est pas = False dans le body de la requête
if r_usershub.status_code == 200 and data.get("enable_post_action", True):
out_after = after_request(type_action, get_json_request(r_usershub))
# 0 = pas d'action definie dans current_app.config['after_USERSHUB_request'][type_action]
if out_after != 0:
if out_after["msg"] != "ok":
return (
json.dumps(
{
"msg": "Problème after request pour post_usershub "
+ type_action
+ ":"
+ out_after["msg"]
}
),
500,
)
return req_json_or_text(r_usershub)
[docs]
def after_request(type_action, data, *args, **kwargs):
"""
lorsqu'une fonction est definie dans REGISTER_POST_ACTION_FCT[type_action]
elle est executée avec les données fournies en retour de la requete USERSHUB
"""
if not REGISTER_POST_ACTION_FCT:
return 0
f = REGISTER_POST_ACTION_FCT.get(type_action, None)
if not f:
return 0
return f(data)