from collections import OrderedDict
from packaging import version
from typing import List
import sqlalchemy as sa
import datetime
from sqlalchemy import ForeignKey, Unicode, and_, DateTime, or_
from sqlalchemy.orm import (
relationship,
column_property,
foreign,
joinedload,
contains_eager,
deferred,
query_expression,
)
from sqlalchemy.sql import select, func, exists
from sqlalchemy.schema import FetchedValue
from sqlalchemy.dialects.postgresql import UUID, JSONB
from geoalchemy2 import Geometry
from geoalchemy2.shape import to_shape
from geojson import Feature
from flask import g, current_app
import flask_sqlalchemy
from utils_flask_sqla.models import qfilter
if version.parse(flask_sqlalchemy.__version__) >= version.parse("3"):
from flask_sqlalchemy.query import Query
else:
from flask_sqlalchemy import BaseQuery as Query
from werkzeug.exceptions import NotFound
from werkzeug.datastructures import MultiDict
from pypnnomenclature.models import TNomenclatures
from pypnusershub.db.models import User
from utils_flask_sqla.serializers import serializable, SERIALIZERS
from utils_flask_sqla_geo.serializers import geoserializable, shapeserializable
from utils_flask_sqla_geo.mixins import GeoFeatureCollectionMixin
from pypn_habref_api.models import Habref
from apptax.taxonomie.models import Taxref
from geonature.core.imports.models import TImports as Import
from ref_geo.models import LAreas
from geonature.core.gn_meta.models import TDatasets, TAcquisitionFramework
from geonature.core.gn_commons.models import (
THistoryActions,
TValidations,
last_validation,
TMedias,
TModules,
)
from geonature.utils.env import DB, db
# Module-level whitelists read by the ``SyntheseLogEntry`` query helpers
# (``filter_by_params`` and ``sort``) defined further down in this module.
sortable_columns = ["meta_last_action_date"]
filterable_columns = ["id_synthese", "last_action", "meta_last_action_date"]
@serializable(exclude=["module_url"])
class TSources(DB.Model):
    """ORM model mapped to the ``gn_synthese.t_sources`` table.

    ``module_url`` is excluded from the default serialization.
    """

    __tablename__ = "t_sources"
    __table_args__ = {"schema": "gn_synthese"}

    id_source = DB.Column(DB.Integer, primary_key=True)
    name_source = DB.Column(DB.Unicode)
    desc_source = DB.Column(DB.Unicode)
    entity_source_pk_field = DB.Column(DB.Unicode)
    url_source = DB.Column(DB.Unicode)
    id_module = DB.Column(DB.Integer, ForeignKey(TModules.id_module))
    # Also exposes the reverse collection as ``TModules.sources``.
    module = DB.relationship(TModules, backref=DB.backref("sources", cascade_backrefs=False))

    @property
    def module_url(self):
        """Return the URL generated by the related module for this source.

        Returns ``None`` when the source has no module, or when the module
        object does not provide ``generate_module_url_for_source``.
        """
        module = self.module
        if module is None or not hasattr(module, "generate_module_url_for_source"):
            return None
        return module.generate_module_url_for_source(self)
# Association table ``gn_synthese.cor_observer_synthese`` linking synthese rows
# to user roles; used as ``secondary`` by ``Synthese.cor_observers``.
cor_observer_synthese = DB.Table(
    "cor_observer_synthese",
    DB.Column(
        "id_synthese", DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True
    ),
    DB.Column("id_role", DB.Integer, ForeignKey(User.id_role), primary_key=True),
    schema="gn_synthese",
)
@serializable
class CorObserverSynthese(DB.Model):
    """ORM model mapped to ``gn_synthese.cor_observer_synthese``.

    The same table is also declared as the plain ``cor_observer_synthese``
    ``Table`` in this module, hence ``extend_existing``.
    """

    __tablename__ = "cor_observer_synthese"
    __table_args__ = {"schema": "gn_synthese", "extend_existing": True}

    id_synthese = DB.Column(
        DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True
    )
    id_role = DB.Column(DB.Integer, ForeignKey(User.id_role), primary_key=True)
# Association table ``gn_synthese.cor_area_synthese`` linking synthese rows to
# areas; used as ``secondary`` by ``Synthese.areas``.
corAreaSynthese = DB.Table(
    "cor_area_synthese",
    DB.Column(
        "id_synthese", DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True
    ),
    DB.Column("id_area", DB.Integer, ForeignKey(LAreas.id_area), primary_key=True),
    schema="gn_synthese",
)
class SyntheseLogEntryQuery(Query):
    """Query class adding filtering and sorting helpers for ``SyntheseLogEntry``."""

    # Columns accepted by ``sort`` and ``filter_by_params`` respectively.
    sortable_columns = ["meta_last_action_date"]
    filterable_columns = ["id_synthese", "last_action", "meta_last_action_date"]

    def filter_by_params(self, params):
        """Apply one filter per value found in ``params`` for each filterable column.

        Parameters
        ----------
        params :
            Multi-valued mapping; must support ``in`` and ``getlist(key)``.

        Returns
        -------
        Query
            The query restricted by every matching parameter value.
        """
        query = self
        for name in self.filterable_columns:
            if name not in params:
                continue
            column = getattr(SyntheseLogEntry, name)
            for value in params.getlist(name):
                if isinstance(column.type, DateTime):
                    query = query.filter_by_datetime(column, value)
                elif isinstance(column.type, Unicode):
                    query = query.where(column.ilike(f"%{value}%"))
                else:
                    query = query.where(column == value)
        return query

    def filter_by_datetime(self, col, dt: str = None):
        """Filter ``col`` against an ISO datetime, optionally prefixed by an operator.

        Parameters
        ----------
        col :
            Column to compare.
        dt : str
            ``"<operator>:<iso datetime>"`` with operator among ``gt``, ``gte``,
            ``lt``, ``lte``, ``eq``; or a bare ISO datetime (equality).
            ``None`` leaves the query unchanged.

        Returns
        -------
        Query

        Raises
        ------
        ValueError
            If the operator is unknown or the datetime is not ISO formatted.
        """
        if dt is None:
            return self
        # Only an alphabetic prefix is an operator: an ISO datetime with a time
        # part ("2024-01-01T10:30:00") contains ":" but carries no operator.
        prefix, separator, remainder = dt.partition(":")
        if separator and prefix.isalpha():
            operator, dt = prefix, remainder
        else:
            operator = "eq"
        dt = datetime.datetime.fromisoformat(dt)
        if operator == "gt":
            f = col > dt
        elif operator == "gte":
            f = col >= dt
        elif operator == "lt":
            f = col < dt
        elif operator == "lte":
            f = col <= dt
        elif operator == "eq":
            # FIXME: if datetime is at midnight (looks like date), allows all the day?
            f = col == dt
        else:
            raise ValueError(f"Invalid comparison operator: {operator}")
        return self.where(f)

    def sort(self, columns: List[str]):
        """Order the query by the given columns.

        Parameters
        ----------
        columns : List[str]
            Items of the form ``"<column>"`` or ``"<column>:<asc|desc>"``.
            Falls back to ``["meta_last_action_date"]`` when empty.

        Returns
        -------
        Query

        Raises
        ------
        ValueError
            If a direction is neither ``asc`` nor ``desc`` or a column is not
            listed in ``sortable_columns``.
        """
        if not columns:
            columns = ["meta_last_action_date"]
        query = self
        for spec in columns:
            if ":" in spec:
                # maxsplit=1 so the unpacking cannot fail on extra colons.
                name, direction = spec.rsplit(":", 1)
                if direction == "asc":
                    direction = sa.asc
                elif direction == "desc":
                    direction = sa.desc
                else:
                    raise ValueError(f"Invalid sort direction: {direction}")
            else:
                name, direction = spec, sa.asc
            if name not in self.sortable_columns:
                raise ValueError(f"Invalid sort column: {name}")
            query = query.order_by(direction(getattr(SyntheseLogEntry, name)))
        return query
class SyntheseQuery(GeoFeatureCollectionMixin, Query):
    """Query class used by ``Synthese`` (see ``Synthese.query_class``)."""

    def join_nomenclatures(self):
        """Eager-load (``joinedload``) every relationship named in ``Synthese.nomenclature_fields``."""
        return self.options(*[joinedload(n) for n in Synthese.nomenclature_fields])

    def lateraljoin_last_validation(self):
        """Outer-join one ``TValidations`` row per synthese through a LATERAL subquery.

        The joined row populates ``Synthese.last_validation`` via ``contains_eager``.
        """
        # NOTE(review): LIMIT 1 is applied without an ORDER BY, so this query
        # alone does not determine which validation is kept - confirm intent.
        subquery = (
            select(TValidations)
            .where(TValidations.uuid_attached_row == Synthese.unique_id_sinp)
            .limit(1)
            .subquery()
            .lateral("last_validation")
        )
        return self.outerjoin(subquery, sa.true()).options(
            contains_eager(Synthese.last_validation, alias=subquery)
        )

    def filter_by_scope(self, scope, user=None):
        """Restrict the query according to a permission scope.

        Parameters
        ----------
        scope : int
            ``0`` returns no row; ``1`` or ``2`` keep rows digitised by the
            user, observed by the user, or belonging to a dataset readable by
            the user; any other value leaves the query unchanged.
        user :
            Defaults to ``g.current_user``.

        Returns
        -------
        Query
        """
        if user is None:
            user = g.current_user
        query = self
        if scope == 0:
            query = query.where(sa.false())
        elif scope in (1, 2):
            # NOTE(review): ``scalars()`` on a single-column selection would
            # yield plain ids, while the comprehension below reads
            # ``ds.id_dataset`` - verify against ``TDatasets.filter_by_readable``.
            datasets = db.session.scalars(
                TDatasets.filter_by_readable(user).with_entities(TDatasets.id_dataset)
            ).all()
            query = query.where(
                or_(
                    # The mapped column is ``id_digitiser`` (British spelling).
                    Synthese.id_digitiser == user.id_role,
                    Synthese.cor_observers.any(id_role=user.id_role),
                    Synthese.id_dataset.in_([ds.id_dataset for ds in datasets]),
                )
            )
        return query
@serializable
class CorAreaSynthese(DB.Model):
    """ORM model mapped to ``gn_synthese.cor_area_synthese``.

    The same table is also declared as the plain ``corAreaSynthese`` ``Table``
    in this module, hence ``extend_existing``.
    """

    __tablename__ = "cor_area_synthese"
    __table_args__ = {"schema": "gn_synthese", "extend_existing": True}

    id_synthese = DB.Column(
        DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True
    )
    id_area = DB.Column(DB.Integer, ForeignKey("ref_geo.l_areas.id_area"), primary_key=True)
@serializable
@geoserializable(geoCol="the_geom_4326", idCol="id_synthese")
@shapeserializable
class Synthese(DB.Model):
    """ORM model mapped to the ``gn_synthese.synthese`` table."""

    __tablename__ = "synthese"
    __table_args__ = {"schema": "gn_synthese"}
    query_class = SyntheseQuery

    # Names of the nomenclature relationships declared below; consumed by
    # ``join_nomenclatures`` to eager-load them all at once.
    nomenclature_fields = [
        "nomenclature_geo_object_nature",
        "nomenclature_grp_typ",
        "nomenclature_obs_technique",
        "nomenclature_bio_status",
        "nomenclature_bio_condition",
        "nomenclature_naturalness",
        "nomenclature_exist_proof",
        "nomenclature_valid_status",
        "nomenclature_diffusion_level",
        "nomenclature_life_stage",
        "nomenclature_sex",
        "nomenclature_obj_count",
        "nomenclature_type_count",
        "nomenclature_sensitivity",
        "nomenclature_observation_status",
        "nomenclature_blurring",
        "nomenclature_source_status",
        "nomenclature_info_geo_type",
        "nomenclature_behaviour",
        "nomenclature_biogeo_status",
        "nomenclature_determination_method",
    ]

    id_synthese = DB.Column(DB.Integer, primary_key=True)
    unique_id_sinp = DB.Column(UUID(as_uuid=True))
    unique_id_sinp_grp = DB.Column(UUID(as_uuid=True))
    id_source = DB.Column(DB.Integer, ForeignKey(TSources.id_source), nullable=False)
    source = relationship(TSources)
    id_module = DB.Column(DB.Integer, ForeignKey(TModules.id_module))
    id_import = db.Column(db.Integer, ForeignKey(Import.id_import), nullable=True)
    module = DB.relationship(TModules)
    entity_source_pk_value = DB.Column(DB.Unicode)
    id_dataset = DB.Column(DB.Integer, ForeignKey(TDatasets.id_dataset))
    # Also exposes ``TDatasets.synthese_records`` as a dynamic (query) collection.
    dataset = DB.relationship(
        TDatasets, backref=DB.backref("synthese_records", lazy="dynamic", cascade_backrefs=False)
    )
    grp_method = DB.Column(DB.Unicode(length=255))

    # --- Nomenclature foreign keys and their relationships -----------------
    # Columns carrying ``server_default=FetchedValue()`` have their default
    # produced by the database rather than by the ORM.
    id_nomenclature_geo_object_nature = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature)
    )
    nomenclature_geo_object_nature = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_geo_object_nature]
    )
    id_nomenclature_grp_typ = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_grp_typ = db.relationship(TNomenclatures, foreign_keys=[id_nomenclature_grp_typ])
    id_nomenclature_obs_technique = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_obs_technique = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_obs_technique]
    )
    id_nomenclature_bio_status = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_bio_status = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_bio_status]
    )
    id_nomenclature_bio_condition = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_bio_condition = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_bio_condition]
    )
    id_nomenclature_naturalness = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_naturalness = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_naturalness]
    )
    id_nomenclature_valid_status = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_valid_status = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_valid_status]
    )
    id_nomenclature_exist_proof = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_exist_proof = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_exist_proof]
    )
    id_nomenclature_diffusion_level = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature)
    )
    nomenclature_diffusion_level = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_diffusion_level]
    )
    id_nomenclature_life_stage = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_life_stage = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_life_stage]
    )
    id_nomenclature_sex = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_sex = db.relationship(TNomenclatures, foreign_keys=[id_nomenclature_sex])
    id_nomenclature_obj_count = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_obj_count = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_obj_count]
    )
    id_nomenclature_type_count = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_type_count = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_type_count]
    )
    id_nomenclature_sensitivity = db.Column(db.Integer, ForeignKey(TNomenclatures.id_nomenclature))
    nomenclature_sensitivity = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_sensitivity]
    )
    id_nomenclature_observation_status = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_observation_status = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_observation_status]
    )
    id_nomenclature_blurring = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_blurring = db.relationship(TNomenclatures, foreign_keys=[id_nomenclature_blurring])
    id_nomenclature_source_status = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_source_status = db.relationship(
        TNomenclatures,
        foreign_keys=[id_nomenclature_source_status],
    )
    id_nomenclature_info_geo_type = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_info_geo_type = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_info_geo_type]
    )
    id_nomenclature_behaviour = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_behaviour = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_behaviour]
    )
    id_nomenclature_biogeo_status = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_biogeo_status = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_biogeo_status]
    )
    id_nomenclature_determination_method = db.Column(
        db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue()
    )
    nomenclature_determination_method = db.relationship(
        TNomenclatures, foreign_keys=[id_nomenclature_determination_method]
    )

    # --- Plain data columns -------------------------------------------------
    reference_biblio = DB.Column(DB.Unicode(length=5000))
    count_min = DB.Column(DB.Integer)
    count_max = DB.Column(DB.Integer)
    cd_nom = DB.Column(DB.Integer, ForeignKey(Taxref.cd_nom))
    taxref = relationship(Taxref)
    cd_hab = DB.Column(DB.Integer, ForeignKey(Habref.cd_hab))
    habitat = relationship(Habref)
    nom_cite = DB.Column(DB.Unicode(length=1000), nullable=False)
    sample_number_proof = DB.Column(DB.UnicodeText)
    digital_proof = DB.Column(DB.UnicodeText)
    non_digital_proof = DB.Column(DB.UnicodeText)
    altitude_min = DB.Column(DB.Integer)
    altitude_max = DB.Column(DB.Integer)
    depth_min = DB.Column(DB.Integer)
    depth_max = DB.Column(DB.Integer)
    place_name = DB.Column(DB.Unicode(length=500))

    # --- Geometry columns ---------------------------------------------------
    the_geom_4326 = DB.Column(Geometry("GEOMETRY", 4326))
    # GeoJSON rendering computed in SQL (ST_AsGeoJSON); loaded only on access.
    the_geom_4326_geojson = column_property(func.ST_AsGeoJSON(the_geom_4326), deferred=True)
    the_geom_point = deferred(DB.Column(Geometry("GEOMETRY", 4326)))
    the_geom_local = deferred(DB.Column(Geometry("GEOMETRY")))
    # Placeholder filled per query through ``with_expression``.
    the_geom_authorized = query_expression()

    precision = DB.Column(DB.Integer)
    id_area_attachment = DB.Column(DB.Integer, ForeignKey(LAreas.id_area))
    date_min = DB.Column(DB.DateTime, nullable=False)
    date_max = DB.Column(DB.DateTime, nullable=False)
    validator = DB.Column(DB.Unicode(length=1000))
    observers = DB.Column(DB.Unicode(length=1000))
    determiner = DB.Column(DB.Unicode(length=1000))
    id_digitiser = DB.Column(DB.Integer, ForeignKey(User.id_role))
    digitiser = db.relationship(User, foreign_keys=[id_digitiser])
    additional_data = DB.Column(JSONB)
    last_action = DB.Column(DB.Unicode)

    # --- Relationships ------------------------------------------------------
    areas = relationship(LAreas, secondary=corAreaSynthese, backref="synthese_obs")
    area_attachment = relationship(LAreas, foreign_keys=[id_area_attachment])
    validations = relationship(TValidations, backref="attached_row")
    last_validation = relationship(last_validation, uselist=False, viewonly=True)
    cor_observers = DB.relationship(User, secondary=cor_observer_synthese)

    def _has_scope_grant(self, scope):
        """Tell whether ``g.current_user`` may access this row under ``scope``.

        ``0`` denies, ``3`` grants, ``1``/``2`` grant when the current user is
        the digitiser or one of the observers, otherwise defer to the dataset.
        Any other value denies.
        """
        if scope == 0:
            return False
        if scope in (1, 2):
            if g.current_user == self.digitiser:
                return True
            if g.current_user in self.cor_observers:
                return True
            return self.dataset.has_instance_permission(scope)
        if scope == 3:
            return True
        # Unknown scope: deny explicitly instead of implicitly returning None.
        return False

    def _has_permissions_grant(self, permissions):
        """Tell whether at least one permission of ``permissions`` grants access.

        Permissions carrying filters other than ``SCOPE`` and ``SENSITIVITY``
        are skipped. Returns ``False`` when ``permissions`` is empty or when
        no permission grants access.
        """
        blur_sensitive_observations = current_app.config["SYNTHESE"]["BLUR_SENSITIVE_OBSERVATIONS"]
        if not permissions:
            return False
        for perm in permissions:
            if perm.has_other_filters_than("SCOPE", "SENSITIVITY"):
                continue  # unsupported filters
            if perm.sensitivity_filter:
                # NOTE(review): ``id_nomenclature_sensitivity`` is nullable, so
                # ``nomenclature_sensitivity`` may be None here and raise
                # AttributeError - confirm how such rows should be treated.
                sensitivity_code = self.nomenclature_sensitivity.cd_nomenclature
                if blur_sensitive_observations and sensitivity_code == "4":
                    continue
                if not blur_sensitive_observations and sensitivity_code != "0":
                    continue
            if perm.scope_value:
                if g.current_user == self.digitiser:
                    return True
                if g.current_user in self.cor_observers:
                    return True
                if self.dataset.has_instance_permission(perm.scope_value):
                    return True
                continue  # scope filter denied access, check next permission
            return True  # no filter exclude this permission
        return False

    def has_instance_permission(self, permissions):
        """Check access from either an integer scope or a list of permissions."""
        if isinstance(permissions, int):
            return self._has_scope_grant(permissions)
        return self._has_permissions_grant(permissions)

    @qfilter(query=True)
    def join_nomenclatures(cls, **kwargs):
        """Eager-load (``joinedload``) every relationship named in ``nomenclature_fields``."""
        return kwargs["query"].options(*[joinedload(n) for n in Synthese.nomenclature_fields])

    @qfilter(query=True)
    def lateraljoin_last_validation(cls, **kwargs):
        """Outer-join one ``TValidations`` row per synthese through a LATERAL subquery.

        The joined row populates ``Synthese.last_validation`` via ``contains_eager``.
        """
        # NOTE(review): LIMIT 1 is applied without an ORDER BY, so this query
        # alone does not determine which validation is kept - confirm intent.
        subquery = (
            select(TValidations)
            .where(TValidations.uuid_attached_row == Synthese.unique_id_sinp)
            .limit(1)
            .subquery()
            .lateral("last_validation")
        )
        return (
            kwargs["query"]
            .outerjoin(subquery, sa.true())
            .options(contains_eager(Synthese.last_validation, alias=subquery))
        )

    @qfilter(query=True)
    def filter_by_scope(cls, scope, user=None, **kwargs):
        """Restrict ``kwargs["query"]`` according to a permission scope.

        ``0`` returns no row; ``1`` or ``2`` keep rows digitised by the user,
        observed by the user, or belonging to a dataset readable by the user;
        any other value leaves the query unchanged. ``user`` defaults to
        ``g.current_user``.
        """
        query = kwargs["query"]
        if user is None:
            user = g.current_user
        if scope == 0:
            query = query.where(sa.false())
        elif scope in (1, 2):
            # NOTE(review): ``scalars()`` on a single-column selection would
            # yield plain ids, while the comprehension below reads
            # ``ds.id_dataset`` - verify against ``TDatasets.filter_by_readable``.
            datasets = db.session.scalars(
                TDatasets.filter_by_readable(user).with_entities(TDatasets.id_dataset)
            ).all()
            query = query.where(
                or_(
                    # The mapped column is ``id_digitiser`` (British spelling).
                    Synthese.id_digitiser == user.id_role,
                    Synthese.cor_observers.any(id_role=user.id_role),
                    Synthese.id_dataset.in_([ds.id_dataset for ds in datasets]),
                )
            )
        return query
@serializable
class DefaultsNomenclaturesValue(DB.Model):
    """ORM model mapped to ``gn_synthese.defaults_nomenclatures_value``.

    The primary key is composite: ``mnemonique_type``, ``id_organism``,
    ``regne`` and ``group2_inpn``.
    """

    __tablename__ = "defaults_nomenclatures_value"
    __table_args__ = {"schema": "gn_synthese"}

    # NOTE(review): declared as Integer although the name suggests a textual
    # mnemonic - confirm against the table definition.
    mnemonique_type = DB.Column(DB.Integer, primary_key=True)
    id_organism = DB.Column(DB.Integer, primary_key=True)
    regne = DB.Column(DB.Unicode, primary_key=True)
    group2_inpn = DB.Column(DB.Unicode, primary_key=True)
    id_nomenclature = DB.Column(DB.Integer)
# Type library listing every report type
@serializable
class BibReportsTypes(DB.Model):
    """ORM model mapped to ``gn_synthese.bib_reports_types``."""

    __tablename__ = "bib_reports_types"
    __table_args__ = {"schema": "gn_synthese"}

    id_type = DB.Column(DB.Integer(), primary_key=True)
    type = DB.Column(DB.Text())
# Report model related to User and BibReportsTypes so that all the
# information about a report can be reached from it
@serializable
class TReport(DB.Model):
    """ORM model mapped to ``gn_synthese.t_reports``."""

    __tablename__ = "t_reports"
    __table_args__ = {"schema": "gn_synthese"}

    id_report = DB.Column(DB.Integer(), primary_key=True)
    id_synthese = DB.Column(DB.Integer(), ForeignKey("gn_synthese.synthese.id_synthese"))
    id_role = DB.Column(DB.Integer(), ForeignKey(User.id_role))
    id_type = DB.Column(DB.Integer(), ForeignKey(BibReportsTypes.id_type))
    content = DB.Column(DB.Text())
    # Python-side default: naive UTC timestamp taken when the row is inserted.
    creation_date = DB.Column(DB.DateTime(), default=datetime.datetime.utcnow)
    deleted = DB.Column(DB.Boolean(), default=False)

    # Also exposes ``Synthese.reports``, ordered by ``creation_date``.
    synthese = relationship(Synthese, backref=db.backref("reports", order_by=creation_date))
    report_type = relationship(BibReportsTypes)
    user = DB.relationship(User)
@serializable
@geoserializable(geoCol="the_geom_4326", idCol="id_synthese")
class VSyntheseForWebApp(DB.Model):
    """ORM model mapped to ``gn_synthese.v_synthese_for_web_app``."""

    __tablename__ = "v_synthese_for_web_app"
    __table_args__ = {"schema": "gn_synthese"}

    id_synthese = DB.Column(
        DB.Integer,
        ForeignKey("gn_synthese.synthese.id_synthese"),
        primary_key=True,
    )
    unique_id_sinp = DB.Column(UUID(as_uuid=True))
    unique_id_sinp_grp = DB.Column(UUID(as_uuid=True))
    id_source = DB.Column(DB.Integer, nullable=False)
    id_import = DB.Column(DB.Integer, nullable=True)
    id_module = DB.Column(DB.Integer)
    # NOTE(review): Integer here while ``Synthese.entity_source_pk_value`` is
    # Unicode - confirm against the view definition.
    entity_source_pk_value = DB.Column(DB.Integer)
    id_dataset = DB.Column(DB.Integer)
    dataset_name = DB.Column(DB.String)
    id_acquisition_framework = DB.Column(DB.Integer)
    count_min = DB.Column(DB.Integer)
    count_max = DB.Column(DB.Integer)
    cd_nom = DB.Column(DB.Integer)
    # NOTE(review): Unicode here while ``cd_nom`` is Integer - confirm against
    # the view definition.
    cd_ref = DB.Column(DB.Unicode)
    nom_cite = DB.Column(DB.Unicode)
    nom_valide = DB.Column(DB.Unicode)
    nom_vern = DB.Column(DB.Unicode)
    lb_nom = DB.Column(DB.Unicode)
    group1_inpn = DB.Column(DB.Unicode)
    group2_inpn = DB.Column(DB.Unicode)
    group3_inpn = DB.Column(DB.Unicode)
    sample_number_proof = DB.Column(DB.Unicode)
    digital_proof = DB.Column(DB.Unicode)
    non_digital_proof = DB.Column(DB.Unicode)
    altitude_min = DB.Column(DB.Integer)
    altitude_max = DB.Column(DB.Integer)
    depth_min = DB.Column(DB.Integer)
    depth_max = DB.Column(DB.Integer)
    place_name = DB.Column(DB.Unicode)
    precision = DB.Column(DB.Integer)
    the_geom_4326 = DB.Column(Geometry("GEOMETRY", 4326))
    date_min = DB.Column(DB.DateTime)
    date_max = DB.Column(DB.DateTime)
    validator = DB.Column(DB.Unicode)
    observers = DB.Column(DB.Unicode)
    determiner = DB.Column(DB.Unicode)
    id_digitiser = DB.Column(DB.Integer)
    last_action = DB.Column(DB.Unicode)
    id_nomenclature_geo_object_nature = DB.Column(DB.Integer)
    id_nomenclature_info_geo_type = DB.Column(DB.Integer)
    id_nomenclature_grp_typ = DB.Column(DB.Integer)
    grp_method = DB.Column(DB.Unicode)
    id_nomenclature_obs_technique = DB.Column(DB.Integer)
    id_nomenclature_bio_status = DB.Column(DB.Integer)
    id_nomenclature_bio_condition = DB.Column(DB.Integer)
    id_nomenclature_naturalness = DB.Column(DB.Integer)
    id_nomenclature_exist_proof = DB.Column(DB.Integer)
    id_nomenclature_valid_status = DB.Column(DB.Integer)
    id_nomenclature_diffusion_level = DB.Column(DB.Integer)
    id_nomenclature_life_stage = DB.Column(DB.Integer)
    id_nomenclature_sex = DB.Column(DB.Integer)
    id_nomenclature_obj_count = DB.Column(DB.Integer)
    id_nomenclature_type_count = DB.Column(DB.Integer)
    id_nomenclature_sensitivity = DB.Column(DB.Integer)
    id_nomenclature_observation_status = DB.Column(DB.Integer)
    id_nomenclature_blurring = DB.Column(DB.Integer)
    id_nomenclature_source_status = DB.Column(DB.Integer)
    id_nomenclature_determination_method = DB.Column(DB.Integer)
    id_nomenclature_behaviour = DB.Column(DB.Integer)
    reference_biblio = DB.Column(DB.Unicode)
    name_source = DB.Column(DB.Unicode)
    url_source = DB.Column(DB.Unicode)
    st_asgeojson = DB.Column(DB.Unicode)

    # Explicit join condition: ``id_synthese`` of this model is annotated as
    # the foreign side of the comparison with ``TReport.id_synthese``.
    reports = relationship(
        TReport, primaryjoin=(TReport.id_synthese == foreign(id_synthese)), uselist=True
    )
# Unused - kept as an example of an ordered serialization
def synthese_export_serialization(cls):
    """
    Class decorator defining a dedicated serialization for the
    v_synthese_for_export view.

    It adds ``as_dict_ordered``, which keeps the attributes in the order given
    by the ``EXPORT_COLUMNS`` configuration and renames each column to its
    configured name, and ``as_geofeature_ordered``, which wraps that dict in a
    GeoJSON ``Feature``. It also sets ``cls.db_cols`` to the class attributes
    found in the export configuration.

    The configuration is read from ``current_app.config``, so the decorator
    must be applied while a Flask application context is active.
    """
    EXPORT_COLUMNS = current_app.config["SYNTHESE"]["EXPORT_COLUMNS"]
    # names of the class attributes listed in EXPORT_COLUMNS
    formated_default_columns = list(EXPORT_COLUMNS)
    # list of tuples (class attribute key, serializer)
    cls_db_cols_and_serializer = []
    # class attributes present in the synthese export config
    # (used to generate shapefiles)
    cls.db_cols = []
    for key in formated_default_columns:
        try:
            # get the class attribute named in the synthese export config
            cls_attri = getattr(cls, key)
            type_name = cls_attri.type.__class__.__name__
            # geometry columns are not part of the ordered dict
            if type_name != "Geometry":
                cls_db_cols_and_serializer.append(
                    (
                        cls_attri.key,
                        SERIALIZERS.get(type_name.lower(), lambda x: x),
                    )
                )
            cls.db_cols.append(cls_attri)
        except AttributeError:
            # the configured attribute does not exist on the class: ignore it
            pass

    def serialize_order_fn(self):
        """Return an OrderedDict of configured column name -> serialized value."""
        order_dict = OrderedDict()
        for item, _serializer in cls_db_cols_and_serializer:
            order_dict.update({EXPORT_COLUMNS.get(item): _serializer(getattr(self, item))})
        return order_dict

    def serialize_geofn(self, geoCol, idCol):
        """Return a GeoJSON ``Feature`` whose properties are ``as_dict_ordered()``."""
        if getattr(self, geoCol) is not None:
            geometry = to_shape(getattr(self, geoCol))
        else:
            # no geometry stored: fall back to a point at the origin
            geometry = {"type": "Point", "coordinates": [0, 0]}
        feature = Feature(
            id=str(getattr(self, idCol)),
            geometry=geometry,
            properties=self.as_dict_ordered(),
        )
        return feature

    cls.as_dict_ordered = serialize_order_fn
    cls.as_geofeature_ordered = serialize_geofn
    return cls
@serializable
class VColorAreaTaxon(DB.Model):
    """ORM model mapped to ``gn_synthese.v_color_taxon_area``.

    The primary key is the pair (``cd_nom``, ``id_area``).
    """

    __tablename__ = "v_color_taxon_area"
    __table_args__ = {"schema": "gn_synthese"}

    cd_nom = DB.Column(DB.Integer(), ForeignKey(Taxref.cd_nom), primary_key=True)
    id_area = DB.Column(DB.Integer(), ForeignKey(LAreas.id_area), primary_key=True)
    nb_obs = DB.Column(DB.Integer())
    last_date = DB.Column(DB.DateTime())
    color = DB.Column(DB.Unicode())
@serializable
class SyntheseLogEntry(DB.Model):
    """Log synthese table, populated with Delete Triggers on gn_synthese.synthese

    Parameters
    ----------
    DB:
        Flask SQLAlchemy controller
    """

    __tablename__ = "t_log_synthese"
    __table_args__ = {"schema": "gn_synthese"}
    query_class = SyntheseLogEntryQuery

    id_synthese = DB.Column(DB.Integer(), primary_key=True)
    last_action = DB.Column(DB.String(length=1))
    meta_last_action_date = DB.Column(DB.DateTime)

    @qfilter(query=True)
    def filter_by_params(cls, params, **kwargs):
        """Apply one filter per value found in ``params`` for each filterable column.

        Parameters
        ----------
        params :
            Multi-valued mapping; must support ``in`` and ``getlist(key)``.

        Returns
        -------
        Query
            ``kwargs["query"]`` restricted by every matching parameter value.
        """
        query = kwargs["query"]
        for name in filterable_columns:
            if name not in params:
                continue
            column = getattr(cls, name)
            for value in params.getlist(name):
                if isinstance(column.type, DateTime):
                    # Forward the query built so far, otherwise the filters
                    # already accumulated would be discarded.
                    # Assumes the qfilter wrapper accepts an explicit ``query``
                    # keyword, as the signatures in this class suggest.
                    query = cls.filter_by_datetime(column, value, query=query)
                elif isinstance(column.type, Unicode):
                    query = query.where(column.ilike(f"%{value}%"))
                else:
                    query = query.where(column == value)
        return query

    @qfilter(query=True)
    def filter_by_datetime(cls, col, dt: str = None, **kwargs):
        """Filter ``col`` against an ISO datetime, optionally prefixed by an operator.

        Parameters
        ----------
        col :
            Column to compare.
        dt : str
            ``"<operator>:<iso datetime>"`` with operator among ``gt``, ``gte``,
            ``lt``, ``lte``, ``eq``; or a bare ISO datetime (equality).
            ``None`` leaves the query unchanged.

        Returns
        -------
        Query

        Raises
        ------
        ValueError
            If the operator is unknown or the datetime is not ISO formatted.
        """
        query = kwargs["query"]
        if dt is None:
            return query
        # Only an alphabetic prefix is an operator: an ISO datetime with a time
        # part ("2024-01-01T10:30:00") contains ":" but carries no operator.
        prefix, separator, remainder = dt.partition(":")
        if separator and prefix.isalpha():
            operator, dt = prefix, remainder
        else:
            operator = "eq"
        dt = datetime.datetime.fromisoformat(dt)
        if operator == "gt":
            f = col > dt
        elif operator == "gte":
            f = col >= dt
        elif operator == "lt":
            f = col < dt
        elif operator == "lte":
            f = col <= dt
        elif operator == "eq":
            # FIXME: if datetime is at midnight (looks like date), allows all the day?
            f = col == dt
        else:
            raise ValueError(f"Invalid comparison operator: {operator}")
        return query.where(f)

    @qfilter(query=True)
    def sort(cls, columns: List[str], *, query):
        """Order ``query`` by the given columns.

        Parameters
        ----------
        columns : List[str]
            Items of the form ``"<column>"`` or ``"<column>:<asc|desc>"``.
            Falls back to ``["meta_last_action_date"]`` when empty.

        Returns
        -------
        Query

        Raises
        ------
        ValueError
            If a direction is neither ``asc`` nor ``desc`` or a column is not
            listed in the module-level ``sortable_columns``.
        """
        if not columns:
            columns = ["meta_last_action_date"]
        for spec in columns:
            if ":" in spec:
                # maxsplit=1 so the unpacking cannot fail on extra colons.
                name, direction = spec.rsplit(":", 1)
                if direction == "asc":
                    direction = sa.asc
                elif direction == "desc":
                    direction = sa.desc
                else:
                    raise ValueError(f"Invalid sort direction: {direction}")
            else:
                name, direction = spec, sa.asc
            if name not in sortable_columns:
                raise ValueError(f"Invalid sort column: {name}")
            query = query.order_by(direction(getattr(cls, name)))
        return query
# defined here to avoid circular dependencies
# Distinct (id_source, id_dataset) pairs found in the synthese; used below as
# the ``secondary`` of ``TDatasets.sources``.
source_subquery = (
    select(TSources.id_source, Synthese.id_dataset)
    .where(TSources.id_source == Synthese.id_source)
    .distinct()
    .alias()
)

# Read-only relationship attached to TDatasets after the fact: the sources
# paired with the dataset in ``source_subquery``.
TDatasets.sources = db.relationship(
    TSources,
    primaryjoin=TDatasets.id_dataset == source_subquery.c.id_dataset,
    secondaryjoin=source_subquery.c.id_source == TSources.id_source,
    secondary=source_subquery,
    viewonly=True,
)

# Deferred column: count of synthese rows whose id_dataset matches the
# dataset, computed by a correlated scalar subquery when first accessed.
TDatasets.synthese_records_count = column_property(
    select(func.count(Synthese.id_synthese))
    .where(Synthese.id_dataset == TDatasets.id_dataset)
    .scalar_subquery()
    .label("synthese_records_count"),
    deferred=True,
)