import datetime
import sqlalchemy as sa
import re
from flask import g
from geonature.core.gn_permissions.tools import get_scopes_by_action
from geonature.utils.env import DB, db
from pypnnomenclature.models import TNomenclatures
from pypnusershub.db.models import User
from sqlalchemy import ForeignKey, or_, func
from sqlalchemy.dialects.postgresql import UUID as UUIDType
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from sqlalchemy import func, select, exists
from utils_flask_sqla.models import qfilter
from utils_flask_sqla.serializers import serializable
from .commons import *
from .datasets import TDatasets
@serializable(exclude=["user_actors", "organism_actors"])
[docs]
class TAcquisitionFramework(db.Model):
[docs]
__tablename__ = "t_acquisition_frameworks"
[docs]
__table_args__ = {"schema": "gn_meta"}
[docs]
id_acquisition_framework = DB.Column(DB.Integer, primary_key=True)
[docs]
unique_acquisition_framework_id = DB.Column(
UUIDType(as_uuid=True), default=select(func.uuid_generate_v4())
)
[docs]
acquisition_framework_name = DB.Column(DB.Unicode(255))
[docs]
acquisition_framework_desc = DB.Column(DB.Unicode)
[docs]
id_nomenclature_territorial_level = DB.Column(
DB.Integer,
ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
default=lambda: TNomenclatures.get_default_nomenclature("NIVEAU_TERRITORIAL"),
)
[docs]
territory_desc = DB.Column(DB.Unicode)
[docs]
keywords = DB.Column(DB.Unicode)
[docs]
id_nomenclature_financing_type = DB.Column(
DB.Integer,
ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
default=lambda: TNomenclatures.get_default_nomenclature("TYPE_FINANCEMENT"),
)
[docs]
target_description = DB.Column(DB.Unicode)
[docs]
ecologic_or_geologic_target = DB.Column(DB.Unicode)
[docs]
acquisition_framework_parent_id = DB.Column(DB.Integer)
[docs]
is_parent = DB.Column(DB.Boolean)
[docs]
opened = DB.Column(DB.Boolean, default=True)
[docs]
id_digitizer = DB.Column(DB.Integer, ForeignKey(User.id_role))
[docs]
acquisition_framework_start_date = DB.Column(DB.Date, default=datetime.datetime.utcnow)
[docs]
acquisition_framework_end_date = DB.Column(DB.Date)
[docs]
initial_closing_date = DB.Column(DB.DateTime)
[docs]
creator = DB.relationship(User, lazy="joined") # = digitizer
[docs]
nomenclature_territorial_level = DB.relationship(
TNomenclatures,
foreign_keys=[id_nomenclature_territorial_level],
)
[docs]
nomenclature_financing_type = DB.relationship(
TNomenclatures,
foreign_keys=[id_nomenclature_financing_type],
)
[docs]
cor_af_actor = relationship(
CorAcquisitionFrameworkActor,
lazy="joined",
# cascade="save-update, merge, delete, delete-orphan",
cascade="all,delete-orphan",
uselist=True,
backref=DB.backref("actor_af"),
)
[docs]
cor_objectifs = DB.relationship(
TNomenclatures,
secondary=cor_acquisition_framework_objectif,
backref=DB.backref("objectif_af"),
)
[docs]
cor_volets_sinp = DB.relationship(
TNomenclatures,
secondary=cor_acquisition_framework_voletsinp,
backref=DB.backref("volet_sinp_af"),
)
[docs]
cor_territories = DB.relationship(
TNomenclatures,
secondary=cor_acquisition_framework_territory,
backref=DB.backref("territory_af"),
)
[docs]
bibliographical_references = DB.relationship(
"TBibliographicReference",
cascade="all,delete-orphan",
uselist=True,
backref=DB.backref("acquisition_framework"),
)
# FIXME: remove and use datasets instead
[docs]
t_datasets = DB.relationship(
"TDatasets",
lazy="joined", # DS required for permissions checks
cascade="all,delete-orphan",
uselist=True,
back_populates="acquisition_framework",
)
[docs]
datasets = DB.relationship(
"TDatasets",
cascade="all,delete-orphan",
uselist=True,
overlaps="t_datasets", # overlaps expected
)
@hybrid_property
[docs]
def user_actors(self):
return [actor.role for actor in self.cor_af_actor if actor.role]
@hybrid_property
[docs]
def organism_actors(self):
return [actor.organism for actor in self.cor_af_actor if actor.organism]
[docs]
def has_datasets(self):
return db.session.scalar(
exists(TDatasets)
.where(TDatasets.id_acquisition_framework == self.id_acquisition_framework)
.select()
)
[docs]
def has_child_acquisition_framework(self):
return db.session.scalar(
exists(TAcquisitionFramework)
.where(
TAcquisitionFramework.acquisition_framework_parent_id
== self.id_acquisition_framework
)
.select()
)
[docs]
def has_instance_permission(self, scope, _through_ds=True):
if scope == 0:
return False
elif scope in (1, 2):
if g.current_user.id_role == self.id_digitizer or g.current_user in self.user_actors:
return True
if scope == 2 and g.current_user.organisme in self.organism_actors:
return True
# rights on DS give rights on AF!
return _through_ds and any(
map(
lambda ds: ds.has_instance_permission(scope, _through_af=False),
self.datasets,
)
)
elif scope == 3:
return True
@staticmethod
[docs]
def get_id(uuid_af):
"""
return the acquisition framework's id
from its UUID if exist or None
"""
return DB.session.scalars(
select(TAcquisitionFramework.id_acquisition_framework)
.where(TAcquisitionFramework.unique_acquisition_framework_id == uuid_af)
.limit(1)
).first()
@staticmethod
[docs]
def get_user_af(user, only_query=False, only_user=False):
"""get the af(s) where the user is actor (himself or with its organism - only himelsemf id only_use=True) or digitizer
param:
- user from TRole model
- only_query: boolean (return the query not the id_datasets allowed if true)
- only_user: boolean: return only the dataset where user himself is actor (not with its organoism)
return: a list of id_dataset or a query"""
query = select(TAcquisitionFramework.id_acquisition_framework).outerjoin(
CorAcquisitionFrameworkActor,
CorAcquisitionFrameworkActor.id_acquisition_framework
== TAcquisitionFramework.id_acquisition_framework,
)
if user.id_organisme is None or only_user:
query = query.where(
or_(
CorAcquisitionFrameworkActor.id_role == user.id_role,
TAcquisitionFramework.id_digitizer == user.id_role,
)
)
else:
query = query.where(
or_(
CorAcquisitionFrameworkActor.id_organism == user.id_organisme,
CorAcquisitionFrameworkActor.id_role == user.id_role,
TAcquisitionFramework.id_digitizer == user.id_role,
)
)
if only_query:
return query
query = query.distinct()
data = db.session.scalars(query).all()
return data
@classmethod
[docs]
def _get_read_scope(cls, user=None):
if user is None:
user = g.current_user
cruved = get_scopes_by_action(id_role=user.id_role, module_code="METADATA")
return cruved["R"]
@qfilter(query=True)
[docs]
def filter_by_scope(cls, scope, *, query, user=None):
if user is None:
user = g.current_user
if scope == 0:
query = query.where(sa.false())
elif scope in (1, 2):
ors = [
TAcquisitionFramework.id_digitizer == user.id_role,
TAcquisitionFramework.cor_af_actor.any(id_role=user.id_role),
TAcquisitionFramework.datasets.any(id_digitizer=user.id_role),
TAcquisitionFramework.datasets.any(
TDatasets.cor_dataset_actor.any(id_role=user.id_role)
), # TODO test coverage
]
# if organism is None => do not filter on id_organism even if level = 2
if scope == 2 and user.id_organisme is not None:
ors += [
TAcquisitionFramework.cor_af_actor.any(id_organism=user.id_organisme),
TAcquisitionFramework.datasets.any(
TDatasets.cor_dataset_actor.any(id_organism=user.id_organisme)
), # TODO test coverage
]
query = query.where(or_(*ors))
return query
@qfilter(query=True)
[docs]
def filter_by_readable(cls, *, query, user=None):
"""
Return the afs where the user has autorization via its CRUVED
"""
return cls.filter_by_scope(TDatasets._get_read_scope(user=user), user=user, query=query)
@qfilter(query=True)
[docs]
def filter_by_areas(cls, areas, *, query):
"""
Filter meta by areas
"""
return query.where(
TAcquisitionFramework.datasets.any(
TDatasets.filter_by_areas(areas).whereclause,
),
)
@qfilter(query=True)
[docs]
def filter_by_params(cls, params={}, *, _ds_search=True, query=None):
# XXX frontend retro-compatibility
if params.get("selector") == "ds":
ds_params = params
params = {"datasets": ds_params}
if "search" in ds_params:
params["search"] = ds_params.pop("search")
ds_params = params.get("datasets")
if ds_params:
ds_filter = TDatasets.filter_by_params(ds_params).whereclause
if ds_filter is not None: # do not exclude AF without any DS
query = query.where(TAcquisitionFramework.datasets.any(ds_filter))
params = MetadataFilterSchema().load(params)
uuid = params.get("uuid")
name = params.get("name")
date = params.get("date")
query = (
query.where(
TAcquisitionFramework.unique_acquisition_framework_id == uuid if uuid else True
)
.where(
TAcquisitionFramework.acquisition_framework_name.ilike(f"%{name}%")
if name
else True
)
.where(TAcquisitionFramework.acquisition_framework_start_date == date if date else True)
)
actors = []
person = params.get("person")
organism = params.get("organism")
if person:
actors.append(
TAcquisitionFramework.cor_af_actor.any(
CorAcquisitionFrameworkActor.id_role == person
)
)
if organism:
actors.append(
TAcquisitionFramework.cor_af_actor.any(
CorAcquisitionFrameworkActor.id_organism == organism
)
)
if actors:
query = query.where(sa.or_(*actors))
areas = params.get("areas")
if areas:
query = TAcquisitionFramework.filter_by_areas(areas, query=query)
search = params.get("search")
if search:
search = search.strip()
# Where clauses to include other matching possibilities (id, uuid, date)
where_clauses = []
if search.isdigit(): # ID AF match
where_clauses.append(TAcquisitionFramework.id_acquisition_framework == int(search))
if len(search) >= MIN_LENGTH_UUID_OR_DATE_SEARCH_STRING: # UUID and date match
where_clauses.append(
sa.cast(TAcquisitionFramework.unique_acquisition_framework_id, sa.String).like(
f"{search}%"
)
)
try:
date = datetime.datetime.strptime(search, "%d/%m/%Y").date()
where_clauses.append(
TAcquisitionFramework.acquisition_framework_start_date == date
)
except ValueError:
pass
# If name search includes dataset
if _ds_search:
where_clauses.append(
TAcquisitionFramework.datasets.any(
TDatasets.filter_by_params({"search": search}, _af_search=False).whereclause
),
)
# Acquisition Framework name matching
search_words_af_cte = select(
func.unnest(func.string_to_array(search, " ")).label("word")
).cte("search_words_cte")
matched_words_af_cte = (
select(
TAcquisitionFramework.id_acquisition_framework,
func.count().label("match_count"),
)
.join(
search_words_af_cte,
sa.or_(
TAcquisitionFramework.acquisition_framework_name.ilike(
func.concat("%", search_words_af_cte.c.word, "%")
),
*where_clauses,
),
)
.group_by(
TAcquisitionFramework.id_acquisition_framework,
)
).cte("matched_words_af_cte")
query = query.where(
matched_words_af_cte.c.id_acquisition_framework
== TAcquisitionFramework.id_acquisition_framework
).order_by(matched_words_af_cte.c.match_count.desc())
return query