"""
Models of gn_permissions schema
"""
from flask import current_app
from packaging import version
from datetime import datetime
from ref_geo.models import LAreas
import sqlalchemy as sa
from sqlalchemy import ForeignKey, ForeignKeyConstraint
from sqlalchemy.sql import select
from sqlalchemy.orm import foreign, joinedload, contains_eager
import flask_sqlalchemy
from utils_flask_sqla.models import qfilter
from utils_flask_sqla.serializers import serializable
from pypnusershub.db.models import User
from apptax.taxonomie.models import Taxref
from geonature.utils.env import db
from geonature.core.gn_commons.models.base import TModules
@serializable
[docs]
class PermFilterType(db.Model):
[docs]
__tablename__ = "bib_filters_type"
[docs]
__table_args__ = {"schema": "gn_permissions"}
[docs]
id_filter_type = db.Column(db.Integer, primary_key=True)
[docs]
code_filter_type = db.Column(db.Unicode)
[docs]
label_filter_type = db.Column(db.Unicode)
[docs]
description_filter_type = db.Column(db.Unicode)
@serializable
[docs]
class PermScope(db.Model):
[docs]
__tablename__ = "bib_filters_scope"
[docs]
__table_args__ = {"schema": "gn_permissions"}
[docs]
value = db.Column(db.Integer, primary_key=True)
[docs]
label = db.Column(db.Unicode)
[docs]
description = db.Column(db.Unicode)
[docs]
def __str__(self):
return self.description
@serializable
[docs]
class PermAction(db.Model):
[docs]
__tablename__ = "bib_actions"
[docs]
__table_args__ = {"schema": "gn_permissions"}
[docs]
id_action = db.Column(db.Integer, primary_key=True)
[docs]
code_action = db.Column(db.Unicode)
[docs]
description_action = db.Column(db.Unicode)
[docs]
def __str__(self):
return self.description_action
[docs]
cor_object_module = db.Table(
"cor_object_module",
db.Column(
"id_cor_object_module",
db.Integer,
primary_key=True,
),
db.Column(
"id_object",
db.Integer,
ForeignKey("gn_permissions.t_objects.id_object"),
),
db.Column(
"id_module",
db.Integer,
ForeignKey("gn_commons.t_modules.id_module"),
),
schema="gn_permissions",
)
@serializable
[docs]
class PermObject(db.Model):
[docs]
__tablename__ = "t_objects"
[docs]
__table_args__ = {"schema": "gn_permissions"}
[docs]
id_object = db.Column(db.Integer, primary_key=True)
[docs]
code_object = db.Column(db.Unicode)
[docs]
description_object = db.Column(db.Unicode)
[docs]
def __str__(self):
return f"{self.code_object} ({self.description_object})"
# compat.
[docs]
def _nice_order(model, qs):
from geonature.core.gn_commons.models import TModules
return (
qs.join(model.module)
.join(model.object)
.join(model.action)
.options(
contains_eager(model.module),
contains_eager(model.object),
contains_eager(model.action),
)
.order_by(
TModules.module_code,
# ensure ALL at first:
sa.case([(PermObject.code_object == "ALL", "1")], else_=PermObject.code_object),
model.id_action,
)
)
[docs]
class PermissionAvailable(db.Model):
[docs]
__tablename__ = "t_permissions_available"
[docs]
__table_args__ = {"schema": "gn_permissions"}
[docs]
id_module = db.Column(
db.Integer, ForeignKey("gn_commons.t_modules.id_module"), primary_key=True
)
[docs]
id_object = db.Column(
db.Integer,
ForeignKey(PermObject.id_object),
default=select(PermObject.id_object).where(PermObject.code_object == "ALL"),
primary_key=True,
)
[docs]
id_action = db.Column(db.Integer, ForeignKey(PermAction.id_action), primary_key=True)
[docs]
label = db.Column(db.Unicode)
[docs]
module = db.relationship("TModules")
[docs]
object = db.relationship(PermObject)
[docs]
action = db.relationship(PermAction)
[docs]
scope_filter = db.Column(db.Boolean, server_default=sa.false())
[docs]
sensitivity_filter = db.Column(db.Boolean, server_default=sa.false(), nullable=False)
[docs]
areas_filter = db.Column(db.Boolean, server_default=sa.false(), nullable=False)
[docs]
taxons_filter = db.Column(db.Boolean, server_default=sa.false(), nullable=False)
[docs]
filters_fields = {
"SCOPE": "scope_filter",
"SENSITIVITY": "sensitivity_filter",
"GEOGRAPHIC": "areas_filter",
"TAXONOMIC": "taxons_filter",
}
@property
[docs]
def filters(self):
return [k for k, v in self.filters_fields.items() if getattr(self, v)]
[docs]
def __str__(self):
s = self.module.module_label
if self.object.code_object != "ALL":
object_label = self.object.code_object.title().replace("_", " ")
s += f" | {object_label}"
s += f" | {self.label}"
return s
@staticmethod
[docs]
def nice_order(**kwargs):
# TODO fix when flask admin is compatible with
# sqlalchemy2.0 query style
query = PermissionAvailable.query
return _nice_order(PermissionAvailable, query)
[docs]
class PermFilter:
def __init__(self, name, value):
[docs]
def __str__(self):
if self.name == "SCOPE":
if self.value is None:
return """<i class="fa fa-users" aria-hidden="true"></i> de tout le monde"""
elif self.value == 1:
return """<i class="fa fa-user" aria-hidden="true"></i> à moi"""
elif self.value == 2:
return """<i class="fa fa-user-circle" aria-hidden="true"></i> de mon organisme"""
elif self.name == "SENSITIVITY":
if self.value:
statut = (
"floutées"
if current_app.config["SYNTHESE"]["BLUR_SENSITIVE_OBSERVATIONS"]
else "exclues"
)
return f"""<i class="fa fa-low-vision" aria-hidden="true"></i> sensibles {statut}"""
else:
return """<i class="fa fa-eye" aria-hidden="true"></i> sensible et non sensible"""
elif self.name == "GEOGRAPHIC":
if self.value:
areas_names = ", ".join([a.area_name for a in self.value])
return f"""<i class="fa fa-map-marker" aria-hidden="true"></i> {areas_names}"""
else:
return (
"""<i class="fa fa-globe" aria-hidden="true"></i> Aucune limite géographique"""
)
elif self.name == "TAXONOMIC":
if self.value:
taxons_names = ", ".join([t.nom_vern_or_lb_nom for t in self.value])
return f"""<i class="fa fa-tree" aria-hidden="true"></i> {taxons_names}"""
else:
return """<i class="fa fa-tree" aria-hidden="true"></i> Tous les taxons"""
[docs]
cor_permission_area = db.Table(
"cor_permission_area",
sa.Column(
"id_permission",
sa.Integer,
sa.ForeignKey("gn_permissions.t_permissions.id_permission"),
primary_key=True,
),
sa.Column("id_area", sa.Integer, sa.ForeignKey("ref_geo.l_areas.id_area"), primary_key=True),
schema="gn_permissions",
)
[docs]
cor_permission_taxref = db.Table(
"cor_permission_taxref",
sa.Column(
"id_permission",
sa.Integer,
sa.ForeignKey("gn_permissions.t_permissions.id_permission"),
primary_key=True,
),
sa.Column("cd_nom", sa.Integer, sa.ForeignKey("taxonomie.taxref.cd_nom"), primary_key=True),
schema="gn_permissions",
)
@serializable
[docs]
class Permission(db.Model):
[docs]
__tablename__ = "t_permissions"
[docs]
__table_args__ = (
ForeignKeyConstraint(
["id_module", "id_object", "id_action"],
[
"gn_permissions.t_permissions_available.id_module",
"gn_permissions.t_permissions_available.id_object",
"gn_permissions.t_permissions_available.id_action",
],
),
{"schema": "gn_permissions"},
)
[docs]
id_permission = db.Column(db.Integer, primary_key=True)
[docs]
id_role = db.Column(db.Integer, ForeignKey("utilisateurs.t_roles.id_role"), nullable=False)
[docs]
id_action = db.Column(db.Integer, ForeignKey(PermAction.id_action), nullable=False)
[docs]
id_module = db.Column(db.Integer, ForeignKey("gn_commons.t_modules.id_module"), nullable=False)
[docs]
id_object = db.Column(
db.Integer,
ForeignKey(PermObject.id_object),
default=select(PermObject.id_object).where(PermObject.code_object == "ALL"),
nullable=False,
)
[docs]
created_on = db.Column(sa.DateTime, server_default=sa.func.now())
[docs]
expire_on = db.Column(db.DateTime)
[docs]
validated = db.Column(sa.Boolean, server_default=sa.true())
[docs]
role = db.relationship(User, backref=db.backref("permissions", cascade_backrefs=False))
[docs]
action = db.relationship(PermAction)
[docs]
module = db.relationship(TModules)
[docs]
object = db.relationship(PermObject)
[docs]
scope_value = db.Column(db.Integer, ForeignKey(PermScope.value), nullable=True)
[docs]
scope = db.relationship(PermScope)
[docs]
sensitivity_filter = db.Column(db.Boolean, server_default=sa.false(), nullable=False)
[docs]
areas_filter = db.relationship(LAreas, secondary=cor_permission_area)
[docs]
taxons_filter = db.relationship(Taxref, secondary=cor_permission_taxref)
[docs]
availability = db.relationship(
PermissionAvailable,
backref=db.backref("permissions", overlaps="action, object, module"), # overlaps expected
overlaps="action, object, module", # overlaps expected
)
[docs]
filters_fields = {
"SCOPE": "scope_value",
"SENSITIVITY": "sensitivity_filter",
"GEOGRAPHIC": "areas_filter",
"TAXONOMIC": "taxons_filter",
}
[docs]
def __repr__(self):
return f"""Permission {self.id_permission}
- Role: {self.role.nom_complet or self.role.identifiant}
- Module: {self.module.module_label}
- Action : {self.action.code_action}
- Scope : {self.scope_value}
- Taxons Filter : {self.taxons_filter}
- Areas Filter : {self.areas_filter}
- Object: {self.object}
- Floutage : {"Oui" if self.sensitivity_filter else "Non"}
- Expire le : {self.expire_on}\n"""
@staticmethod
[docs]
def __SCOPE_le__(a, b):
return b is None or (a is not None and a <= b)
@staticmethod
[docs]
def __SENSITIVITY_le__(a, b):
# False only if: A is False and b is True
return (not a) <= (not b)
@staticmethod
[docs]
def __GEOGRAPHIC_le__(a, b):
return (a and set(a).issubset(b)) or not b
@staticmethod
[docs]
def __TAXONOMIC_le__(a, b):
# True if *all* taxons of a is included in *any* taxons of b
return (a and any(all((_a <= _b for _a in a)) for _b in b)) or not b
@staticmethod
[docs]
def __default_le__(a, b):
return a == b or b is None
[docs]
def __le__(self, other):
"""
Return True if this permission is supersed by 'other' permission.
This requires all filters to be supersed by 'other' filters.
"""
for name, field in self.filters_fields.items():
# Get filter comparison function or use default comparison function
__le_fct__ = getattr(self, f"__{name}_le__", Permission.__default_le__)
self_value, other_value = getattr(self, field), getattr(other, field)
if not __le_fct__(self_value, other_value):
return False
return True
@property
[docs]
def filters(self):
filters = []
for name, field in self.filters_fields.items():
value = getattr(self, field)
mapper = self.__mapper__
if field in mapper.columns:
column = mapper.columns[field]
if column.nullable:
if value is None:
continue
if column.type.python_type is bool:
if not value:
continue
elif field in mapper.relationships:
if value == []:
continue
filters.append(PermFilter(name, value))
return filters
[docs]
def has_other_filters_than(self, *expected_filters):
for flt in self.filters:
if flt.name not in expected_filters:
return True
return False
@qfilter(query=True)
[docs]
def nice_order(cls, **kwargs):
return _nice_order(cls, kwargs["query"])
@property
[docs]
def is_active(self):
return (
self.expire_on is None or self.expire_on > datetime.now()
) and self.validated is True
@classmethod
[docs]
def active_filter(cls):
return sa.and_(
sa.or_(cls.expire_on.is_(sa.null()), cls.expire_on > datetime.now()),
cls.validated.is_(True),
)