Code source de app.models

import hashlib

from flask import current_app, jsonify
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import ForeignKey, distinct, or_, desc
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.sql import select, func
from sqlalchemy.orm import synonym, relationship, backref
from pypnusershub.db.models import check_and_encrypt_password

from app.env import db
from app.utils.utilssqlalchemy import serializable
from app.genericRepository import GenericRepository


"""
    Fichier contenant les models de la base de données
"""


@serializable
[docs] class Bib_Organismes(GenericRepository): """ Model de la table Bib_Organismes """
[docs] __tablename__ = "bib_organismes"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_organisme = db.Column(db.Integer, primary_key=True)
[docs] uuid_organisme = db.Column( UUID(as_uuid=True), default=select([func.uuid_generate_v4()]) )
[docs] nom_organisme = db.Column(db.Unicode)
[docs] adresse_organisme = db.Column(db.Unicode)
[docs] cp_organisme = db.Column(db.Unicode)
[docs] ville_organisme = db.Column(db.Unicode)
[docs] tel_organisme = db.Column(db.Unicode)
[docs] fax_organisme = db.Column(db.Unicode)
[docs] email_organisme = db.Column(db.Unicode)
[docs] url_organisme = db.Column(db.Unicode)
[docs] id_parent = db.Column(db.Integer)
@serializable
[docs] class TRoles(GenericRepository): """ Model de la table t_roles """
[docs] __tablename__ = "t_roles"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_role = db.Column(db.Integer, primary_key=True)
[docs] groupe = db.Column(db.Boolean)
[docs] uuid_role = db.Column(UUID(as_uuid=True), default=select([func.uuid_generate_v4()]))
[docs] identifiant = db.Column(db.Unicode)
[docs] nom_role = db.Column(db.Unicode)
[docs] prenom_role = db.Column(db.Unicode)
[docs] desc_role = db.Column(db.Unicode)
[docs] pass_md5 = db.Column("pass", db.Unicode)
[docs] pass_plus = db.Column(db.Unicode)
[docs] email = db.Column(db.Unicode)
[docs] id_organisme = db.Column( db.Integer, ForeignKey("utilisateurs.bib_organismes.id_organisme") )
[docs] organisme_rel = relationship("Bib_Organismes")
[docs] remarques = db.Column(db.Unicode)
[docs] active = db.Column(db.Boolean)
[docs] pass5 = db.Column("pass", db.Unicode)
pass_plus = db.Column(db.Unicode)
[docs] champs_addi = db.Column(JSONB)
# Add synonym for column pass # Hack because pass is an python reserved word pass_md5 = synonym("pass5")
[docs] def set_password(self, password, password_confirmation): (self.pass_plus, self.pass_md5) = check_and_encrypt_password( password, password_confirmation, current_app.config["PASS_METHOD"] == "md5" or current_app.config["FILL_MD5_PASS"], )
@classmethod
[docs] def choixSelect(cls, id="id_role", nom="full_name", aucun=None): """ Methode qui retourne une tableau de tuples d'id de roles et de nom de roles ACTIF Avec pour paramètres un id de role et un nom de role Le paramètre aucun si il a une valeur permet de rajouter le tuple (-1,Aucun) au tableau """ # recupère tous les role actif roles = cls.get_all(as_model=True, params=[{"col": "active", "filter": True}]) choices = [] for role in roles: role_as_dict = role.as_dict_full_name() choices.append((role_as_dict[id], role_as_dict[nom])) if aucun != None: choices.append((-1, "Aucun")) return choices
@classmethod
[docs] def choix_group(cls, id, nom, aucun=None): """ Methode qui retourne une tableau de tuples d'id de groupes et de nom de goupes Avec pour paramètres un id de groupe et un nom de groupe Le paramètre aucun si il a une valeur permet de rajouter le tuple (-1,Aucun) au tableau """ q = db.session.query(cls).order_by(cls.identifiant) q = q.filter(cls.groupe == True) data = [data.as_dict(True) for data in q.all()] choices = [] for d in data: choices.append((d[id], d[nom])) if aucun != None: choices.append((-1, "Aucun")) return choices
@classmethod
[docs] def get_user_groups(cls, id_role): """ Get all groups of a user Parameters: id_role (int): the user's id Return: Array<TRoles> """ cor_role_query = db.session.query(CorRoles.id_role_groupe).filter( CorRoles.id_role_utilisateur == id_role ) return db.session.query(TRoles).filter(TRoles.id_role.in_(cor_role_query)).all()
@classmethod
[docs] def get_user_lists(cls, id_role): """ Get all lists of a user Parameters: id_role (int): the user's id Return: Array<TListes> """ # get ids_role list from user and user groups ids_role = [id_role] ids_group = [group.id_role for group in cls.get_user_groups(id_role)] for id in ids_group: ids_role.append(id) cor_role_list_query = db.session.query(CorRoleListe.id_liste).filter( CorRoleListe.id_role.in_(ids_role) ) return ( db.session.query(TListes) .filter(TListes.id_liste.in_(cor_role_list_query)) .all() )
@classmethod
[docs] def get_user_app_profils(cls, id_role, id_application=None): """ Get all listapp profils of a user Parameters: id_role (int): the user's id id_application optional (int): application id Return: Array<CorRoleAppProfil> """ # get ids_role list from user and user groups ids_role = [id_role] ids_group = [group.id_role for group in cls.get_user_groups(id_role)] for id in ids_group: ids_role.append(id) # get right rows in cor_role_app_profil q = ( db.session.query(CorRoleAppProfil) .distinct(CorRoleAppProfil.id_application) .filter(CorRoleAppProfil.id_role.in_(ids_role)) ) if id_application: q = q.filter(CorRoleAppProfil.id_application == id_application) q = q.order_by(CorRoleAppProfil.id_application) rights = q.all() return rights
[docs] def get_full_name(self): """ Methode qui concatène le nom et prénom du role retourne un nom complet """ return " ".join([(self.nom_role or ""), (self.prenom_role or "")])
[docs] def as_dict_full_name(self): """ Methode qui ajout le nom complet d'un role au dictionnaire qui le défini retourne un dictionnaire d'un utilisateur avec une nouvelle 'full_name' """ full_name = self.get_full_name() user_as_dict = self.as_dict() user_as_dict["full_name"] = full_name return user_as_dict
@classmethod
[docs] def test_group(cls, tab): """ Methode qui test si le tableau contient un élement groupe = False, Si c'est le cas alors on remplace le boolean par un string du même nom retourne un tableau avec le groupe sous forme de string """ table = [] for d in tab: if d["groupe"] is False: d["groupe"] = "False" else: d["groupe"] = "True" table.append(d) return table
@classmethod
[docs] def get_user_in_list(cls, id_liste): """ Methode qui retourne un dictionnaire des roles d'une liste Avec pour paramètre un id_liste """ q = db.session.query(cls).filter(cls.active == True) q = q.order_by(desc(cls.nom_role)) q = q.join(CorRoleListe) q = q.filter(id_liste == CorRoleListe.id_liste) data = [data.as_dict_full_name() for data in q.all()] return data
@classmethod
[docs] def get_user_out_list(cls, id_liste): """ Methode qui retourne un dictionnaire de roles n'appartenant pas à une liste Avec pour paramètre un id_liste """ q = db.session.query(cls) q = q.order_by(desc(cls.nom_role)) subquery = ( ~db.session.query(CorRoleListe) .filter(CorRoleListe.id_liste == id_liste) .filter(CorRoleListe.id_role == cls.id_role) .exists() ) q = q.filter(subquery).filter(cls.active == True) data = [data.as_dict_full_name() for data in q.all()] return data
@classmethod
[docs] def get_user_in_group(cls, id_groupe): """ Methode qui retourne un dictionnaire de role appartenant à un groupe Avec pour paramètres un id de role """ q = db.session.query(cls).filter(cls.active == True) q = q.order_by(desc(cls.groupe)) q = q.join(CorRoles) q = q.filter(id_groupe == CorRoles.id_role_groupe) data = [data.as_dict_full_name() for data in q.all()] return data
@classmethod
[docs] def get_user_out_group(cls, id_groupe): """ Methode qui retourne un dictionnaire de role n'appartenant pas à un groupe donné Avec pour paramètre un id de role """ q = db.session.query(cls).filter(cls.id_role != id_groupe) q = q.order_by(desc(cls.groupe)) subquery = db.session.query(CorRoles.id_role_utilisateur).filter( CorRoles.id_role_groupe == id_groupe ) subquery2 = db.session.query(CorRoles.id_role_groupe).filter( CorRoles.id_role_utilisateur == id_groupe ) # TODO a vérifier (problème de récursivité) q = q.filter(cls.id_role.notin_(subquery)) q = q.filter(cls.id_role.notin_(subquery2)) # TODO filtrer les roles actifs data = [data.as_dict_full_name() for data in q.all()] return data
@classmethod
[docs] def get_user_profil_in_app(cls, id_application): """ Methode qui retourne un dictionnaire de roles avec leur profil sur une application Avec pour paramètre un id d'application Ne retourne que les utilisateurs actifs """ # get the user data = ( db.session.query(cls, TProfils) .join(CorRoleAppProfil, cls.id_role == CorRoleAppProfil.id_role) .join(TProfils, TProfils.id_profil == CorRoleAppProfil.id_profil) .filter(cls.active == True) .filter(CorRoleAppProfil.id_application == id_application) .order_by(desc(cls.groupe)) .all() ) user_with_profil = [] for d in data: user = d[0].as_dict_full_name() user["id_profil"] = d[1].id_profil user["profil"] = d[1].nom_profil user_with_profil.append(user) return user_with_profil
@classmethod
[docs] def get_user_profil_out_app(cls, id_application): """ Methode qui retourne un dictionnaire de roles n'ayant pas de droits sur une application Avec pour paramètre un id d'application """ q = db.session.query(cls) q = q.order_by(desc(cls.groupe)) subquery = db.session.query(distinct(CorRoleAppProfil.id_role)).filter( CorRoleAppProfil.id_application == id_application ) q = q.filter(cls.id_role.notin_(subquery)) return [data.as_dict_full_name() for data in q.all()]
@classmethod
[docs] def get_groups(cls): """ Methode qui retourne une liste des roles de type groupe """ q = db.session.query(cls).filter(TRoles.groupe == True) return q.all()
@serializable
[docs] class CorRoles(GenericRepository): """ Classe de correspondance entre un utilisateur et un groupe """
[docs] __tablename__ = "cor_roles"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_role_groupe = db.Column(db.Integer, primary_key=True)
[docs] id_role_utilisateur = db.Column( db.Integer, ForeignKey("utilisateurs.t_roles.id_role"), primary_key=True )
[docs] t_roles = db.relationship("TRoles")
@classmethod
[docs] def test_role_in_grp(cls, id_role, id_group): """ Methode qui retourne vrai si le role appartient au groupe """ in_grp = ( db.session.query(CorRoles) .filter(CorRoles.id_role_utilisateur == id_role) .filter(CorRoles.id_role_groupe == id_group) .all() ) if in_grp: return True return False
@classmethod
[docs] def add_cor(cls, id_group, ids_role): """ Methode qui ajoute des relations roles <-> groupe Avec pour paramètres un id de groupe(id_role) et un tableau d'id de roles """ dict_add = dict() dict_add["id_role_groupe"] = id_group for d in ids_role: dict_add["id_role_utilisateur"] = d cls.post(dict_add)
@classmethod
[docs] def del_cor(cls, id_group, ids_role): """ Methode qui supprime des relations roles <-> groupe Avec pour paramètres un id de groupe(id_role) et un tableau d'id de roles """ for d in ids_role: cls.query.filter(cls.id_role_groupe == id_group).filter( cls.id_role_utilisateur == d ).delete() db.session.commit()
@serializable
[docs] class TListes(GenericRepository): """ Model de la table t_listes """
[docs] __tablename__ = "t_listes"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_liste = db.Column(db.Integer, primary_key=True)
[docs] code_liste = db.Column(db.Unicode)
[docs] nom_liste = db.Column(db.Unicode)
[docs] desc_liste = db.Column(db.Unicode)
@serializable
[docs] class CorRoleListe(GenericRepository): """Classe de correspondance entre la table t_roles et la table t_listes"""
[docs] __tablename__ = "cor_role_liste"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_role = db.Column( db.Integer, ForeignKey("utilisateurs.t_roles.id_role"), primary_key=True )
[docs] id_liste = db.Column( db.Integer, ForeignKey("utilisateurs.t_listes.id_liste"), primary_key=True )
[docs] role_rel = relationship("TRoles")
@classmethod
[docs] def add_cor(cls, id_liste, ids_role): """ Methode qui ajoute des relations roles <-> liste Avec pour paramètres un id de liste et un tableau d'id de roles """ dict_add = dict() dict_add["id_liste"] = id_liste for d in ids_role: dict_add["id_role"] = d cls.post(dict_add)
@classmethod
[docs] def del_cor(cls, id_liste, ids_role): """ Methode qui supprime des relations roles <-> liste Avec pour paramètres un id de liste et un tableau d'id de roles """ for d in ids_role: cls.query.filter(cls.id_liste == id_liste).filter(cls.id_role == d).delete() db.session.commit()
@serializable
[docs] class TApplications(GenericRepository): """ Model de la table t_applications """
[docs] __tablename__ = "t_applications"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_application = db.Column(db.Integer, primary_key=True)
[docs] code_application = db.Column(db.Unicode)
[docs] nom_application = db.Column(db.Unicode)
[docs] desc_application = db.Column(db.Unicode)
[docs] id_parent = db.Column(db.Unicode)
@serializable
[docs] class TProfils(GenericRepository): """ Model de la classe t_profils """
[docs] __tablename__ = "t_profils"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_profil = db.Column(db.Integer, primary_key=True)
[docs] code_profil = db.Column(db.Unicode)
[docs] nom_profil = db.Column(db.Unicode)
[docs] desc_profil = db.Column(db.Unicode)
@classmethod
[docs] def get_profils_in_app(cls, id_application): """ Methode qui retourne tous les profils autorisés dans une app Parameters: id_app (int): l'id de l'application Returns: Array<TProfils> """ return ( db.session.query(TProfils) .join(CorProfilForApp, CorProfilForApp.id_profil == TProfils.id_profil) .filter(CorProfilForApp.id_application == id_application) .order_by(TProfils.code_profil) .all() )
@classmethod
[docs] def get_profil_in_app_with_code(cls, id_application, code_profil): """ Methode qui retourne un profil à partir de son code """ return ( db.session.query(TProfils) .join(CorProfilForApp, CorProfilForApp.id_profil == TProfils.id_profil) .filter(CorProfilForApp.id_application == id_application) .filter(TProfils.code_profil == str(code_profil)) .first() )
@classmethod
[docs] def get_profils_out_app(cls, id_application): """ Methode qui retourne un dictionnaire des profils non utilisés pour une application Avec pour paramètre un id application """ q = db.session.query(cls) subquery = db.session.query(CorProfilForApp.id_profil).filter( CorProfilForApp.id_application == id_application ) q = q.filter(cls.id_profil.notin_(subquery)) return [data.as_dict() for data in q.all()]
@classmethod
[docs] def choixSelect( cls, key="id_profil", label="nom_profil", id_application=None ): # noqa """ Methode qui retourne un tableau de tuples d'id profil et de nom de profil Ce que l'on met en key et label sont paramétrable """ if id_application: profils = cls.get_profils_in_app(id_application) return [(getattr(d, key), getattr(d, label)) for d in profils] return [(getattr(d, key), getattr(d, label)) for d in cls.get_all()]
@serializable
[docs] class CorProfilForApp(GenericRepository): """Classe de correspondance entre la table t_applications et la table t_profils"""
[docs] __tablename__ = "cor_profil_for_app"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_application = db.Column( db.Integer, ForeignKey("utilisateurs.t_applications.id_application"), primary_key=True, )
[docs] id_profil = db.Column( db.Integer, ForeignKey("utilisateurs.t_profils.id_profil"), primary_key=True )
[docs] profil_rel = relationship("TProfils")
@classmethod
[docs] def add_cor(cls, id_application, ids_profil): """ Methode qui ajoute des relations applications <-> profil Avec pour paramètres un id profil et un tableau d'id d'applications """ dict_add = dict() dict_add["id_application"] = id_application for d in ids_profil: dict_add["id_profil"] = d cls.post(dict_add)
@classmethod
[docs] def del_cor(cls, id_application, ids_profil): """ Methode qui supprime des relations applications <-> profil Avec pour paramètres un id profil et un tableau d'id d'applications """ for d in ids_profil: cls.query.filter(cls.id_application == id_application).filter( cls.id_profil == d ).delete() db.session.commit()
@serializable
[docs] class CorRoleAppProfil(GenericRepository): """ Classe de correspondance entre la table t_roles, t_profils et t_applications """
[docs] __tablename__ = "cor_role_app_profil"
[docs] __table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs] id_role = db.Column( db.Integer, ForeignKey("utilisateurs.t_roles.id_role"), primary_key=True )
[docs] id_profil = db.Column( db.Integer, ForeignKey("utilisateurs.t_profils.id_profil"), primary_key=True )
[docs] id_application = db.Column( db.Integer, ForeignKey("utilisateurs.t_applications.id_application"), primary_key=True, )
[docs] is_default_group_for_app = db.Column(db.Boolean, default=False)
[docs] role_rel = relationship("TRoles")
[docs] application_rel = relationship("TApplications")
[docs] profil_rel = relationship("TProfils")
# surchage de la méthode get_one # car il n'y a pas de clé primaire unique sur une cor @classmethod
[docs] def get_one(cls, id_role, id_application): return ( db.session.query(cls) .filter_by(id_role=id_role, id_application=id_application) .first() )
@classmethod
[docs] def get_default_for_app(cls, id_application): return ( db.session.query(cls) .filter_by(id_application=id_application) .filter_by(is_default_group_for_app=True) .first() )
# surchage de la méthode delete car # il n'y a pas de clé primaire unique sur une cor # TODO cette méthode supprime tous les profils # pour une application et un role # faire une méthode qui supprime seulement # un enregistrement grace à une PK unique # necessite ne pas utiliser le template # table_database.html qui est trop génériqe @classmethod
[docs] def delete(cls, id_role, id_application): cors = ( db.session.query(cls) .filter_by(id_role=id_role, id_application=id_application) .all() ) for cor in cors: db.session.delete(cor) try: db.session.commit() except Exception: db.session.rollback() raise
@classmethod
[docs] def add_cor(cls, id_app, tab_profil): dict_add = {} for d in tab_profil: dict_add = { "id_role": d["id_role"], "id_profil": d["id_profil"], "id_application": id_app, } cls.post(dict_add)
@classmethod
[docs] def del_cor(cls, id_app, tab_profil): for t in tab_profil: cls.query.filter(cls.id_role == t["id_role"]).filter( cls.id_profil == t["id_profil"] ).filter(cls.id_application == id_app).delete() db.session.commit()