# coding: utf8
from __future__ import unicode_literals, print_function, absolute_import, division
from utils_flask_sqla.models import qfilter
"""
mappings applications et utilisateurs
"""
import hashlib
import bcrypt
from bcrypt import checkpw
from os import environ
from importlib import import_module
from packaging import version
from flask_sqlalchemy import SQLAlchemy
import flask_sqlalchemy
if version.parse(flask_sqlalchemy.__version__) >= version.parse("3"):
from flask_sqlalchemy.query import Query
else:
from flask_sqlalchemy import BaseQuery as Query
from flask import current_app
from flask_login import UserMixin
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, backref
from sqlalchemy.orm.session import object_session
from sqlalchemy import Sequence, func, ForeignKey, or_
from sqlalchemy.sql import select, func
from sqlalchemy.dialects.postgresql import UUID, JSONB, array
from pypnusershub.db.tools import NoPasswordError, DifferentPasswordError
from pypnusershub.env import db
from pypnusershub.utils import get_current_app_id
from utils_flask_sqla.serializers import serializable
[docs]
def check_and_encrypt_password(password, password_confirmation, md5=False):
if not password:
raise NoPasswordError
if password != password_confirmation:
raise DifferentPasswordError
pass_plus = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
pass_md5 = None
if md5:
pass_md5 = hashlib.md5(password.encode("utf-8")).hexdigest()
return pass_plus.decode("utf-8"), pass_md5
[docs]
def fn_check_password(self, pwd):
if current_app.config["PASS_METHOD"] == "md5":
if not self._password:
raise ValueError("User %s has no password" % (self.identifiant))
return self._password == hashlib.md5(pwd.encode("utf8")).hexdigest()
elif current_app.config["PASS_METHOD"] == "hash":
if not self._password_plus:
raise ValueError("User %s has no password" % (self.identifiant))
return checkpw(pwd.encode("utf8"), self._password_plus.encode("utf8"))
else:
raise ValueError("Undefine crypt method (PASS_METHOD)")
[docs]
cor_roles = db.Table(
"cor_roles",
db.Column(
"id_role_utilisateur",
db.Integer,
db.ForeignKey("utilisateurs.t_roles.id_role"),
primary_key=True,
),
db.Column(
"id_role_groupe",
db.Integer,
db.ForeignKey("utilisateurs.t_roles.id_role"),
primary_key=True,
),
schema="utilisateurs",
extend_existing=True,
)
[docs]
class UserQuery(Query):
[docs]
def filter_by_app(self, code_app=None):
if code_app is None:
code_app = current_app.config["CODE_APPLICATION"]
return (
self.outerjoin(cor_roles, User.id_role == cor_roles.c.id_role_utilisateur)
.outerjoin(
UserApplicationRight,
or_(
UserApplicationRight.id_role == cor_roles.c.id_role_groupe,
UserApplicationRight.id_role == User.id_role,
),
)
.join(
Application,
Application.id_application == UserApplicationRight.id_application,
)
.where(Application.code_application == code_app)
)
@serializable(exclude=["_password", "password", "_password_plus"])
[docs]
class User(db.Model, UserMixin):
[docs]
__tablename__ = "t_roles"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
query_class = UserQuery
[docs]
groupe = db.Column(db.Boolean, default=False)
[docs]
id_role = db.Column(
db.Integer,
primary_key=True,
)
# TODO: make that unique ?
[docs]
identifiant = db.Column(db.Unicode)
[docs]
nom_role = db.Column(db.Unicode)
[docs]
prenom_role = db.Column(db.Unicode)
[docs]
desc_role = db.Column(db.Unicode)
[docs]
_password = db.Column("pass", db.Unicode)
[docs]
_password_plus = db.Column("pass_plus", db.Unicode)
[docs]
email = db.Column(db.Unicode)
[docs]
id_organisme = db.Column(
db.Integer, ForeignKey("utilisateurs.bib_organismes.id_organisme")
)
[docs]
remarques = db.Column(db.Unicode)
[docs]
champs_addi = db.Column(JSONB)
[docs]
date_insert = db.Column(db.DateTime)
[docs]
date_update = db.Column(db.DateTime)
[docs]
active = db.Column(db.Boolean)
[docs]
groups = db.relationship(
"User",
secondary=cor_roles,
primaryjoin="User.id_role == utilisateurs.cor_roles.c.id_role_utilisateur",
secondaryjoin="User.id_role == utilisateurs.cor_roles.c.id_role_groupe",
backref=backref("members", cascade_backrefs=False),
)
@property
[docs]
def max_level_profil(self):
q = (
object_session(self)
.query(func.max(Profils.code_profil))
.select_from(User)
.join(
UserApplicationRight,
or_(
UserApplicationRight.id_role == self.id_role,
UserApplicationRight.id_role.in_(
[role.id_role for role in self.groups]
),
),
)
.join(Profils, UserApplicationRight.id_profil == Profils.id_profil)
.where(UserApplicationRight.id_application == get_current_app_id())
)
return q.scalar() or 0
@hybrid_property
[docs]
def nom_complet(self):
return " ".join([i for i in [self.nom_role, self.prenom_role] if i])
@nom_complet.expression
def nom_complet(cls):
return db.func.array_to_string(array([cls.nom_role, cls.prenom_role]), " ")
# applications_droits = db.relationship('AppUser', lazy='joined')
# for Flask-Admin
[docs]
def get_id(self):
return str(self.id_role)
@property
[docs]
def password(self):
if current_app.config["PASS_METHOD"] == "md5":
return self._password
elif current_app.config["PASS_METHOD"] == "hash":
return self._password_plus
else:
raise Exception
# TODO: change password digest algorithm for something stronger such
# as bcrypt. This need to be done at usershub level first.
@password.setter
def password(self, pwd):
pwd = pwd.encode("utf-8")
if current_app.config["PASS_METHOD"] == "md5":
self._password = hashlib.md5(pwd).hexdigest()
elif current_app.config["PASS_METHOD"] == "hash":
self._password_plus = bcrypt.hashpw(pwd, bcrypt.gensalt()).decode("utf-8")
else:
raise Exception("Unknown pass method")
[docs]
check_password = fn_check_password
@property
[docs]
def is_public(self):
return (
current_app.config.get("PUBLIC_ACCESS_USERNAME")
and current_app.config.get("PUBLIC_ACCESS_USERNAME") == self.identifiant
)
[docs]
def __repr__(self):
return "<User '{!r}' id='{}'>".format(self.identifiant, self.id_role)
[docs]
def __str__(self):
return self.identifiant or self.nom_complet
@qfilter
[docs]
def filter_by_app(cls, code_app=None, **kwargs):
if code_app is None:
code_app = current_app.config["CODE_APPLICATION"]
query = kwargs["query"]
return (
query.outerjoin(cor_roles, User.id_role == cor_roles.c.id_role_utilisateur)
.outerjoin(
UserApplicationRight,
or_(
UserApplicationRight.id_role == cor_roles.c.id_role_groupe,
UserApplicationRight.id_role == User.id_role,
),
)
.join(
Application,
Application.id_application == UserApplicationRight.id_application,
)
.where(Application.code_application == code_app)
).whereclause
@serializable
[docs]
class Organisme(db.Model):
[docs]
__tablename__ = "bib_organismes"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
id_organisme = db.Column(db.Integer, primary_key=True)
[docs]
uuid_organisme = db.Column(
UUID(as_uuid=True), default=select(func.uuid_generate_v4())
)
[docs]
nom_organisme = db.Column(db.Unicode)
[docs]
adresse_organisme = db.Column(db.Unicode)
[docs]
cp_organisme = db.Column(db.Unicode)
[docs]
ville_organisme = db.Column(db.Unicode)
[docs]
tel_organisme = db.Column(db.Unicode)
[docs]
fax_organisme = db.Column(db.Unicode)
[docs]
email_organisme = db.Column(db.Unicode)
[docs]
url_organisme = db.Column(db.Unicode)
[docs]
url_logo = db.Column(db.Unicode)
[docs]
id_parent = db.Column(
db.Integer, db.ForeignKey("utilisateurs.bib_organismes.id_organisme")
)
[docs]
additional_data = db.Column(JSONB, nullable=True, server_default="{}")
[docs]
members = db.relationship(User, backref="organisme")
[docs]
def __str__(self):
return self.nom_organisme
[docs]
profils_for_app = db.Table(
"cor_profil_for_app",
db.Column(
"id_profil",
db.Integer,
ForeignKey("utilisateurs.t_profils.id_profil"),
primary_key=True,
),
db.Column(
"id_application",
db.Integer,
ForeignKey("utilisateurs.t_applications.id_application"),
primary_key=True,
),
schema="utilisateurs",
)
[docs]
class Profils(db.Model):
"""
Model de la classe t_profils
"""
[docs]
__tablename__ = "t_profils"
[docs]
__table_args__ = {"schema": "utilisateurs", "extend_existing": True}
[docs]
id_profil = db.Column(db.Integer, primary_key=True)
[docs]
code_profil = db.Column(db.Unicode)
[docs]
nom_profil = db.Column(db.Unicode)
[docs]
desc_profil = db.Column(db.Unicode)
[docs]
applications = relationship(
"Application", secondary=profils_for_app, back_populates="profils"
)
@serializable
[docs]
class Application(db.Model):
"""
Représente une application ou un module
"""
[docs]
__tablename__ = "t_applications"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
id_application = db.Column(db.Integer, primary_key=True)
[docs]
code_application = db.Column(db.Integer)
[docs]
nom_application = db.Column(db.Unicode)
[docs]
desc_application = db.Column(db.Unicode)
[docs]
id_parent = db.Column(db.Integer)
[docs]
profils = relationship(
Profils, secondary=profils_for_app, back_populates="applications"
)
[docs]
def __repr__(self):
return "<Application {!r}>".format(self.nom_application)
[docs]
def __str__(self):
return self.nom_application
@staticmethod
[docs]
def get_application(nom_application):
return db.session.execute(
select(Application).where(Application.nom_application == nom_application)
).scalar_one()
[docs]
class ApplicationRight(db.Model):
"""
Droit d'acces a une application
"""
[docs]
__tablename__ = "bib_droits"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
id_droit = db.Column(db.Integer, primary_key=True)
[docs]
nom_droit = db.Column(db.Unicode)
[docs]
desc_droit = db.Column(db.UnicodeText)
[docs]
def __repr__(self):
return "<ApplicationRight {!r}>".format(self.desc_droit)
[docs]
def __str__(self):
return self.nom_droit
[docs]
class UserApplicationRight(db.Model):
"""
Droit d'acces d'un user particulier a une application particuliere
"""
[docs]
__tablename__ = "cor_role_app_profil"
[docs]
__table_args__ = {"schema": "utilisateurs"} # , 'extend_existing': True}
[docs]
id_role = db.Column(
db.Integer, ForeignKey("utilisateurs.t_roles.id_role"), primary_key=True
)
[docs]
id_profil = db.Column(
db.Integer, ForeignKey("utilisateurs.t_profils.id_profil"), primary_key=True
)
[docs]
id_application = db.Column(
db.Integer,
ForeignKey("utilisateurs.t_applications.id_application"),
primary_key=True,
)
[docs]
role = relationship("User")
[docs]
profil = relationship("Profils")
[docs]
application = relationship("Application")
[docs]
def __repr__(self):
return "<UserApplicationRight role='{}' profil='{}' app='{}'>".format(
self.id_role, self.id_profil, self.id_application
)
@serializable(exclude=["password", "_password_plus"])
[docs]
class AppUser(db.Model):
"""
Relations entre applications et utilisateurs
"""
[docs]
__tablename__ = "v_userslist_forall_applications"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
id_role = db.Column(
db.Integer, db.ForeignKey("utilisateurs.t_roles.id_role"), primary_key=True
)
[docs]
role = relationship("User", backref="app_users")
[docs]
nom_role = db.Column(db.Unicode)
[docs]
prenom_role = db.Column(db.Unicode)
[docs]
id_application = db.Column(
db.Integer,
db.ForeignKey("utilisateurs.t_applications.id_application"),
primary_key=True,
)
[docs]
id_organisme = db.Column(db.Integer)
[docs]
application = relationship("Application", backref="app_users")
[docs]
identifiant = db.Column(db.Unicode)
[docs]
_password = db.Column("pass", db.Unicode)
[docs]
_password_plus = db.Column("pass_plus", db.Unicode)
[docs]
id_droit_max = db.Column(db.Integer, primary_key=True)
# user = db.relationship('User', backref='relations', lazy='joined')
# application = db.relationship('Application',
# backref='relations', lazy='joined')
@property
[docs]
def password(self):
return self._password
[docs]
check_password = fn_check_password
[docs]
def __repr__(self):
return "<AppUser role='{}' app='{}'>".format(self.id_role, self.id_application)
[docs]
class AppRole(db.Model):
"""
Relations entre applications et role
"""
[docs]
__tablename__ = "v_roleslist_forall_applications"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
id_role = db.Column(
db.Integer, db.ForeignKey("utilisateurs.t_roles.id_role"), primary_key=True
)
[docs]
groupe = db.Column(db.Boolean)
[docs]
nom_role = db.Column(db.Unicode)
[docs]
prenom_role = db.Column(db.Unicode)
[docs]
id_application = db.Column(
db.Integer,
db.ForeignKey("utilisateurs.t_applications.id_application"),
primary_key=True,
)
[docs]
id_organisme = db.Column(db.Integer)
[docs]
identifiant = db.Column(db.Unicode)
[docs]
application = db.relationship(Application)
[docs]
def as_dict(self):
cols = (c for c in self.__table__.columns)
return {c.name: getattr(self, c.name) for c in cols}
[docs]
cor_role_liste = db.Table(
"cor_role_liste",
db.Column(
"id_role",
db.Integer,
ForeignKey("utilisateurs.t_roles.id_role"),
primary_key=True,
),
db.Column(
"id_liste",
db.Integer,
ForeignKey("utilisateurs.t_listes.id_liste"),
primary_key=True,
),
schema="utilisateurs",
)
@serializable
[docs]
class UserList(db.Model):
[docs]
__tablename__ = "t_listes"
[docs]
__table_args__ = {"schema": "utilisateurs"}
[docs]
id_liste = db.Column(db.Integer, primary_key=True)
[docs]
code_liste = db.Column(db.Unicode(length=20))
[docs]
nom_liste = db.Column(db.Unicode(length=50))
[docs]
desc_liste = db.Column(db.Unicode)
[docs]
users = db.relationship(User, secondary=cor_role_liste)