import datetime
from flask import g
import sqlalchemy as sa
from sqlalchemy import ForeignKey, or_, func
from sqlalchemy.sql import select, func
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID as UUIDType
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.schema import FetchedValue
from utils_flask_sqla.models import qfilter
import marshmallow as ma
from pypnnomenclature.models import TNomenclatures
from pypnusershub.db.models import User
from utils_flask_sqla.serializers import serializable
from geonature.utils.env import DB, db
from geonature.core.gn_permissions.tools import get_scopes_by_action
from geonature.core.gn_commons.models import cor_field_dataset, cor_module_dataset
from ref_geo.models import LAreas
from .commons import *
@serializable(exclude=["user_actors", "organism_actors"])
class TDatasets(db.Model):
    """Dataset metadata record mapped to the ``gn_meta.t_datasets`` table.

    Besides the column and relationship mapping, the class carries the
    instance-level permission check (``has_instance_permission``) and the
    query filters (``filter_by_*``) used to list and search datasets.

    NOTE(review): ``synthese_records`` and ``meta_create_date`` are used by
    methods below but are not declared in this class body; they are
    presumably provided elsewhere (backref / other declaration) -- confirm.
    """

    __tablename__ = "t_datasets"
    __table_args__ = {"schema": "gn_meta"}

    # --- Identity and parent acquisition framework ---
    id_dataset = DB.Column(DB.Integer, primary_key=True)
    unique_dataset_id = DB.Column(UUIDType(as_uuid=True), default=select(func.uuid_generate_v4()))
    id_acquisition_framework = DB.Column(
        DB.Integer,
        ForeignKey("gn_meta.t_acquisition_frameworks.id_acquisition_framework"),
    )
    acquisition_framework = DB.relationship(
        "TAcquisitionFramework", back_populates="datasets", lazy="joined"
    )  # join AF as required for permissions checks

    # --- Descriptive fields ---
    dataset_name = DB.Column(DB.Unicode)
    dataset_shortname = DB.Column(DB.Unicode)
    dataset_desc = DB.Column(DB.Unicode)
    id_nomenclature_data_type = DB.Column(
        DB.Integer,
        ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
        default=lambda: TNomenclatures.get_default_nomenclature("DATA_TYP"),
    )
    keywords = DB.Column(DB.Unicode)
    marine_domain = DB.Column(DB.Boolean)
    terrestrial_domain = DB.Column(DB.Boolean)
    id_nomenclature_dataset_objectif = DB.Column(
        DB.Integer,
        ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
        default=lambda: TNomenclatures.get_default_nomenclature("JDD_OBJECTIFS"),
    )

    # --- Bounding box ---
    bbox_west = DB.Column(DB.Float)
    bbox_east = DB.Column(DB.Float)
    bbox_south = DB.Column(DB.Float)
    bbox_north = DB.Column(DB.Float)

    # --- Nomenclature foreign keys (defaults resolved lazily at insert time) ---
    id_nomenclature_collecting_method = DB.Column(
        DB.Integer,
        ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
        default=lambda: TNomenclatures.get_default_nomenclature("METHO_RECUEIL"),
    )
    id_nomenclature_data_origin = DB.Column(
        DB.Integer,
        ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
        default=lambda: TNomenclatures.get_default_nomenclature("DS_PUBLIQUE"),
    )
    id_nomenclature_source_status = DB.Column(
        DB.Integer,
        ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
        default=lambda: TNomenclatures.get_default_nomenclature("STATUT_SOURCE"),
    )
    id_nomenclature_resource_type = DB.Column(
        DB.Integer,
        ForeignKey("ref_nomenclatures.t_nomenclatures.id_nomenclature"),
        default=lambda: TNomenclatures.get_default_nomenclature("RESOURCE_TYP"),
    )

    # --- Status flags and ownership ---
    active = DB.Column(DB.Boolean, default=True)
    validable = DB.Column(DB.Boolean, server_default=FetchedValue())
    id_digitizer = DB.Column(DB.Integer, ForeignKey(User.id_role))
    digitizer = DB.relationship(User, lazy="joined")  # joined for permission check
    creator = DB.relationship(
        User, lazy="joined", overlaps="digitizer"
    )  # overlaps as alias of digitizer
    id_taxa_list = DB.Column(DB.Integer)

    # --- Relationships ---
    modules = DB.relationship("TModules", secondary=cor_module_dataset, backref="datasets")
    nomenclature_data_type = DB.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_data_type],
    )
    nomenclature_dataset_objectif = DB.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_dataset_objectif],
    )
    nomenclature_collecting_method = DB.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_collecting_method],
    )
    nomenclature_data_origin = DB.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_data_origin],
    )
    nomenclature_source_status = DB.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_source_status],
    )
    nomenclature_resource_type = DB.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_resource_type],
    )
    cor_territories = DB.relationship(
        TNomenclatures,
        secondary=cor_dataset_territory,
        backref=DB.backref("territory_dataset"),
    )
    # because CorDatasetActor could be an User or an Organisme object...
    cor_dataset_actor = relationship(
        CorDatasetActor,
        lazy="joined",
        cascade="save-update, merge, delete, delete-orphan",
        backref=DB.backref("actor_dataset"),
    )
    additional_fields = DB.relationship(
        "TAdditionalFields", secondary=cor_field_dataset, back_populates="datasets"
    )

    @hybrid_property
    def user_actors(self):
        """Return the ``role`` of every dataset actor that has one."""
        return [actor.role for actor in self.cor_dataset_actor if actor.role is not None]

    @hybrid_property
    def organism_actors(self):
        """Return the ``organism`` of every dataset actor that has one."""
        return [actor.organism for actor in self.cor_dataset_actor if actor.organism is not None]

    def is_deletable(self):
        """Return True when the ``synthese_records`` query of this dataset matches no row."""
        return not DB.session.execute(self.synthese_records.exists().select()).scalar()

    def has_instance_permission(self, scope, _through_af=True):
        """Tell whether ``g.current_user`` may act on this dataset for ``scope``.

        - scope 0: always False.
        - scope 1 or 2: True when the current user is the digitizer or one of
          the user actors; for scope 2 also when the user's organism is one of
          the organism actors; otherwise the check is delegated to the
          acquisition framework.
        - scope 3: always True.
        - any other value: falls through and returns None.

        ``_through_af`` prevents infinite recursion: the acquisition framework
        is only consulted when it is True, and it is called with
        ``_through_ds=False``.
        """
        if scope == 0:
            return False
        elif scope in (1, 2):
            if g.current_user.id_role == self.id_digitizer or g.current_user in self.user_actors:
                return True
            if scope == 2 and g.current_user.organisme in self.organism_actors:
                return True
            return _through_af and self.acquisition_framework.has_instance_permission(
                scope, _through_ds=False
            )
        elif scope == 3:
            return True

    def __str__(self):
        """Return the dataset name."""
        return self.dataset_name

    @staticmethod
    def get_id(uuid_dataset):
        """Return the ``id_dataset`` whose UUID is ``uuid_dataset``, or None if there is none."""
        return DB.session.execute(
            select(TDatasets.id_dataset).where(TDatasets.unique_dataset_id == uuid_dataset)
        ).scalar_one_or_none()

    @staticmethod
    def get_uuid(id_dataset):
        """Return the ``unique_dataset_id`` of dataset ``id_dataset``, or None if there is none."""
        return DB.session.execute(
            select(TDatasets.unique_dataset_id).where(TDatasets.id_dataset == id_dataset)
        ).scalar_one_or_none()

    @classmethod
    def _get_read_scope(cls, user=None):
        """Return the "R" scope of ``user`` on the METADATA module.

        ``user`` defaults to ``g.current_user``.
        """
        if user is None:
            user = g.current_user
        cruved = get_scopes_by_action(id_role=user.id_role, module_code="METADATA")
        return cruved["R"]

    @classmethod
    def _get_create_scope(cls, module_code, user=None, object_code=None):
        """Return the "C" scope of ``user`` on ``module_code`` / ``object_code``.

        ``user`` defaults to ``g.current_user``.
        """
        if user is None:
            user = g.current_user
        cruved = get_scopes_by_action(
            id_role=user.id_role, module_code=module_code, object_code=object_code
        )
        return cruved["C"]

    @qfilter(query=True)
    def filter_by_scope(cls, scope, *, query, user=None):
        """Restrict ``query`` to the datasets reachable by ``user`` at ``scope``.

        - scope 0: nothing matches.
        - scope 1 or 2: datasets (or their acquisition framework) where the
          user is digitizer or actor; scope 2 additionally matches on the
          user's organism when the user has one.
        - any other value: no restriction.

        ``user`` defaults to ``g.current_user``.
        """
        from .aframework import TAcquisitionFramework

        whereclause = sa.true()
        if user is None:
            user = g.current_user
        if scope == 0:
            whereclause = sa.false()
        elif scope in (1, 2):
            ors = [
                cls.id_digitizer == user.id_role,
                cls.cor_dataset_actor.any(id_role=user.id_role),
                cls.acquisition_framework.has(id_digitizer=user.id_role),
                cls.acquisition_framework.has(
                    TAcquisitionFramework.cor_af_actor.any(id_role=user.id_role),
                ),
            ]
            # if organism is None => do not filter on id_organism even if level = 2
            if scope == 2 and user.id_organisme is not None:
                ors += [
                    cls.cor_dataset_actor.any(id_organism=user.id_organisme),
                    cls.acquisition_framework.has(
                        TAcquisitionFramework.cor_af_actor.any(id_organism=user.id_organisme),
                    ),
                ]
            whereclause = or_(*ors)
        return query.where(whereclause)

    @qfilter(query=True)
    def filter_by_params(cls, params=None, *, _af_search=True, query=None):
        """Apply the search/filter parameters in ``params`` to ``query``.

        ``params`` is validated with a marshmallow schema extending
        ``MetadataFilterSchema``. The keys read here are ``active``,
        ``module_code``, ``id_acquisition_frameworks``, ``uuid``, ``name``,
        ``date``, ``person``, ``organism``, ``areas`` and ``search``.
        ``None`` (the default) means "no filter".

        ``_af_search`` controls whether the free-text ``search`` is also run
        against the acquisition framework; the acquisition framework side is
        called with ``_ds_search=False``.
        """
        # query is always set by qfilter, but it needs a default value because
        # it follows ``params``, which has one.
        from .aframework import TAcquisitionFramework

        # A ``None`` default avoids sharing one mutable dict between calls.
        if params is None:
            params = {}

        class DatasetFilterSchema(MetadataFilterSchema):
            active = ma.fields.Boolean()
            orderby = ma.fields.String()
            module_code = ma.fields.String()
            id_acquisition_frameworks = ma.fields.List(ma.fields.Integer(), allow_none=True)

        params = DatasetFilterSchema().load(params)

        active = params.get("active")
        if active is not None:
            query = query.where(cls.active == active)

        module_code = params.get("module_code")
        if module_code:
            query = query.where(cls.modules.any(module_code=module_code))

        af_ids = params.get("id_acquisition_frameworks")
        if af_ids:
            query = query.where(
                sa.or_(*[cls.id_acquisition_framework == af_id for af_id in af_ids])
            )

        uuid = params.get("uuid")
        if uuid:
            query = query.where(cls.unique_dataset_id == uuid)

        name = params.get("name")
        if name:
            query = query.where(cls.dataset_name.ilike(f"%{name}%"))

        date = params.get("date")
        if date:
            query = query.where(sa.cast(cls.meta_create_date, sa.DATE) == date)

        # ``person`` and ``organism`` are combined with OR, not AND.
        actors = []
        person = params.get("person")
        if person:
            actors.append(cls.cor_dataset_actor.any(CorDatasetActor.id_role == person))
        organism = params.get("organism")
        if organism:
            actors.append(cls.cor_dataset_actor.any(CorDatasetActor.id_organism == organism))
        if actors:
            query = query.where(sa.or_(*actors))

        areas = params.get("areas")
        if areas:
            query = TDatasets.filter_by_areas(areas, query=query)

        search = params.get("search")
        if search:
            search = search.strip()
            # Where clauses to include other matching possibilities (id, uuid)
            where_clauses = []
            # Dataset ID match. ``isdecimal`` rather than ``isdigit``: the
            # latter also accepts characters such as "²" on which ``int()``
            # raises ValueError.
            if search.isdecimal():
                where_clauses.append(cls.id_dataset == int(search))
            if len(search) >= MIN_LENGTH_UUID_OR_DATE_SEARCH_STRING:  # UUID match
                where_clauses.append(sa.cast(cls.unique_dataset_id, sa.String).ilike(f"{search}%"))
            # if name search include acquisition framework
            if _af_search:
                where_clauses.append(
                    cls.acquisition_framework.has(
                        TAcquisitionFramework.filter_by_params(
                            {"search": search},
                            _ds_search=False,
                        ).whereclause
                    ),
                )
            # Dataset name matching: one CTE row per space-separated word ...
            search_words_dataset_cte = select(
                func.unnest(func.string_to_array(search, " ")).label("word")
            ).cte("search_words_dataset_cte")
            # ... joined to the datasets whose name contains the word (or that
            # satisfy one of the extra clauses), counting the matches per dataset.
            matched_words_dataset_cte = (
                select(
                    cls.id_dataset,
                    func.count().label("match_count"),
                )
                .join(
                    search_words_dataset_cte,
                    sa.or_(
                        cls.dataset_name.ilike(
                            func.concat("%", search_words_dataset_cte.c.word, "%")
                        ),
                        *where_clauses,
                    ),
                )
                .group_by(
                    cls.id_dataset,
                )
            ).cte("matched_words_dataset_cte")
            # Keep only matched datasets, best match count first.
            query = query.where(cls.id_dataset == matched_words_dataset_cte.c.id_dataset).order_by(
                matched_words_dataset_cte.c.match_count.desc()
            )
        return query

    @qfilter(query=True)
    def filter_by_readable(cls, query, user=None):
        """
        Return the datasets the user is authorized to read according to its CRUVED
        """
        return cls.filter_by_scope(cls._get_read_scope(user), user=user, query=query)

    @qfilter(query=True)
    def filter_by_creatable(cls, module_code, *, query, user=None, object_code=None):
        """
        Return the datasets associated with ``module_code`` that the user can
        read, minus those on which the user does not have the create right:
        the lower of the read scope and the create scope is applied.
        """
        query = query.where(cls.modules.any(module_code=module_code))
        scope = cls._get_read_scope(user)
        create_scope = cls._get_create_scope(module_code, user=user, object_code=object_code)
        if create_scope < scope:
            scope = create_scope
        return cls.filter_by_scope(scope, user=user, query=query)

    @qfilter(query=True)
    def filter_by_areas(cls, areas, *, query):
        """Restrict ``query`` to datasets having a synthese record in one of ``areas``.

        ``areas`` is an iterable of values compared to ``LAreas.id_area``.
        """
        from geonature.core.gn_synthese.models import Synthese

        area_clauses = [LAreas.id_area == id_area for id_area in areas]
        return query.where(cls.synthese_records.any(Synthese.areas.any(sa.or_(*area_clauses))))