Code source de geonature.core.gn_synthese.models

from collections import OrderedDict
from packaging import version
from typing import List

import sqlalchemy as sa
import datetime
from sqlalchemy import ForeignKey, Unicode, and_, DateTime, or_
from sqlalchemy.orm import (
    relationship,
    column_property,
    foreign,
    joinedload,
    contains_eager,
    deferred,
    query_expression,
)
from sqlalchemy.sql import select, func, exists
from sqlalchemy.schema import FetchedValue
from sqlalchemy.dialects.postgresql import UUID, JSONB
from geoalchemy2 import Geometry
from geoalchemy2.shape import to_shape

from geojson import Feature
from flask import g, current_app
import flask_sqlalchemy
from utils_flask_sqla.models import qfilter

if version.parse(flask_sqlalchemy.__version__) >= version.parse("3"):
    from flask_sqlalchemy.query import Query
else:
    from flask_sqlalchemy import BaseQuery as Query

from werkzeug.exceptions import NotFound
from werkzeug.datastructures import MultiDict

from pypnnomenclature.models import TNomenclatures
from pypnusershub.db.models import User
from utils_flask_sqla.serializers import serializable, SERIALIZERS
from utils_flask_sqla_geo.serializers import geoserializable, shapeserializable
from utils_flask_sqla_geo.mixins import GeoFeatureCollectionMixin
from pypn_habref_api.models import Habref
from apptax.taxonomie.models import Taxref
from geonature.core.imports.models import TImports as Import
from ref_geo.models import LAreas

from geonature.core.gn_meta.models import TDatasets, TAcquisitionFramework
from geonature.core.gn_commons.models import (
    THistoryActions,
    TValidations,
    last_validation,
    TMedias,
    TModules,
)
from geonature.utils.env import DB, db


[docs] sortable_columns = ["meta_last_action_date"]
[docs] filterable_columns = ["id_synthese", "last_action", "meta_last_action_date"]
@serializable(exclude=["module_url"])
[docs] class TSources(DB.Model):
[docs] __tablename__ = "t_sources"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] id_source = DB.Column(DB.Integer, primary_key=True)
[docs] name_source = DB.Column(DB.Unicode)
[docs] desc_source = DB.Column(DB.Unicode)
[docs] entity_source_pk_field = DB.Column(DB.Unicode)
[docs] url_source = DB.Column(DB.Unicode)
[docs] meta_create_date = DB.Column(DB.DateTime)
[docs] meta_update_date = DB.Column(DB.DateTime)
[docs] id_module = DB.Column(DB.Integer, ForeignKey(TModules.id_module))
[docs] module = DB.relationship(TModules, backref=DB.backref("sources", cascade_backrefs=False))
@property
[docs] def module_url(self): if self.module is not None and hasattr(self.module, "generate_module_url_for_source"): return self.module.generate_module_url_for_source(self) else: return None
[docs] cor_observer_synthese = DB.Table( "cor_observer_synthese", DB.Column( "id_synthese", DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True ), DB.Column("id_role", DB.Integer, ForeignKey(User.id_role), primary_key=True), schema="gn_synthese", )
@serializable
[docs] class CorObserverSynthese(DB.Model):
[docs] __tablename__ = "cor_observer_synthese"
[docs] __table_args__ = {"schema": "gn_synthese", "extend_existing": True}
[docs] id_synthese = DB.Column( DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True )
[docs] id_role = DB.Column(DB.Integer, ForeignKey(User.id_role), primary_key=True)
[docs] corAreaSynthese = DB.Table( "cor_area_synthese", DB.Column( "id_synthese", DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True ), DB.Column("id_area", DB.Integer, ForeignKey(LAreas.id_area), primary_key=True), schema="gn_synthese", )
[docs] class SyntheseLogEntryQuery(Query):
[docs] sortable_columns = ["meta_last_action_date"]
[docs] filterable_columns = ["id_synthese", "last_action", "meta_last_action_date"]
[docs] def filter_by_params(self, params): for col in self.filterable_columns: if col not in params: continue column = getattr(SyntheseLogEntry, col) for value in params.getlist(col): if isinstance(column.type, DateTime): self = self.filter_by_datetime(column, value) elif isinstance(column.type, Unicode): self = self.where(column.ilike(f"%{value}%")) else: self = self.where(column == value) return self
[docs] def filter_by_datetime(self, col, dt: str = None): """Filter on date only with operator among "<,>,=<,>=" Parameters ---------- filters_with_operator : dict params filters from url only Returns ------- Query """ if ":" in dt: operator, dt = dt.split(":", 1) else: operator = "eq" dt = datetime.datetime.fromisoformat(dt) if operator == "gt": f = col > dt elif operator == "gte": f = col >= dt elif operator == "lt": f = col < dt elif operator == "lte": f = col <= dt elif operator == "eq": # FIXME: if datetime is at midnight (looks like date), allows all the day? f = col == dt else: raise ValueError(f"Invalid comparison operator: {operator}") return self.where(f)
[docs] def sort(self, columns: List[str]): if not columns: columns = ["meta_last_action_date"] for col in columns: if ":" in col: col, direction = col.rsplit(":") if direction == "asc": direction = sa.asc elif direction == "desc": direction = sa.desc else: raise ValueError(f"Invalid sort direction: {direction}") else: direction = sa.asc if col not in self.sortable_columns: raise ValueError(f"Invalid sort column: {col}") self = self.order_by(direction(getattr(SyntheseLogEntry, col))) return self
[docs] class SyntheseQuery(GeoFeatureCollectionMixin, Query):
[docs] def join_nomenclatures(self): return self.options(*[joinedload(n) for n in Synthese.nomenclature_fields])
[docs] def lateraljoin_last_validation(self): subquery = ( select(TValidations) .where(TValidations.uuid_attached_row == Synthese.unique_id_sinp) .limit(1) .subquery() .lateral("last_validation") ) return self.outerjoin(subquery, sa.true()).options( contains_eager(Synthese.last_validation, alias=subquery) )
[docs] def filter_by_scope(self, scope, user=None): if user is None: user = g.current_user if scope == 0: self = self.where(sa.false()) elif scope in (1, 2): ors = [] datasets = db.session.scalars( TDatasets.filter_by_readable(user).with_entities(TDatasets.id_dataset) ).all() self = self.where( or_( Synthese.id_digitizer == user.id_role, Synthese.cor_observers.any(id_role=user.id_role), Synthese.id_dataset.in_([ds.id_dataset for ds in datasets]), ) ) return self
@serializable
[docs] class CorAreaSynthese(DB.Model):
[docs] __tablename__ = "cor_area_synthese"
[docs] __table_args__ = {"schema": "gn_synthese", "extend_existing": True}
[docs] id_synthese = DB.Column( DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True )
[docs] id_area = DB.Column(DB.Integer, ForeignKey("ref_geo.l_areas.id_area"), primary_key=True)
@serializable @geoserializable(geoCol="the_geom_4326", idCol="id_synthese") @shapeserializable
[docs] class Synthese(DB.Model):
[docs] __tablename__ = "synthese"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] query_class = SyntheseQuery
[docs] nomenclature_fields = [ "nomenclature_geo_object_nature", "nomenclature_grp_typ", "nomenclature_obs_technique", "nomenclature_bio_status", "nomenclature_bio_condition", "nomenclature_naturalness", "nomenclature_exist_proof", "nomenclature_valid_status", "nomenclature_diffusion_level", "nomenclature_life_stage", "nomenclature_sex", "nomenclature_obj_count", "nomenclature_type_count", "nomenclature_sensitivity", "nomenclature_observation_status", "nomenclature_blurring", "nomenclature_source_status", "nomenclature_info_geo_type", "nomenclature_behaviour", "nomenclature_biogeo_status", "nomenclature_determination_method", ]
[docs] id_synthese = DB.Column(DB.Integer, primary_key=True)
[docs] unique_id_sinp = DB.Column(UUID(as_uuid=True))
[docs] unique_id_sinp_grp = DB.Column(UUID(as_uuid=True))
[docs] id_source = DB.Column(DB.Integer, ForeignKey(TSources.id_source), nullable=False)
[docs] source = relationship(TSources)
[docs] id_module = DB.Column(DB.Integer, ForeignKey(TModules.id_module))
[docs] id_import = db.Column(db.Integer, ForeignKey(Import.id_import), nullable=True)
[docs] module = DB.relationship(TModules)
[docs] entity_source_pk_value = DB.Column(DB.Unicode)
[docs] id_dataset = DB.Column(DB.Integer, ForeignKey(TDatasets.id_dataset))
[docs] dataset = DB.relationship( TDatasets, backref=DB.backref("synthese_records", lazy="dynamic", cascade_backrefs=False) )
[docs] grp_method = DB.Column(DB.Unicode(length=255))
[docs] id_nomenclature_geo_object_nature = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature) )
[docs] nomenclature_geo_object_nature = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_geo_object_nature] )
[docs] id_nomenclature_grp_typ = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_grp_typ = db.relationship(TNomenclatures, foreign_keys=[id_nomenclature_grp_typ])
[docs] id_nomenclature_obs_technique = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_obs_technique = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_obs_technique] )
[docs] id_nomenclature_bio_status = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_bio_status = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_bio_status] )
[docs] id_nomenclature_bio_condition = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_bio_condition = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_bio_condition] )
[docs] id_nomenclature_naturalness = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_naturalness = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_naturalness] )
[docs] id_nomenclature_valid_status = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_valid_status = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_valid_status] )
[docs] id_nomenclature_exist_proof = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_exist_proof = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_exist_proof] )
[docs] id_nomenclature_diffusion_level = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature) )
[docs] nomenclature_diffusion_level = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_diffusion_level] )
[docs] id_nomenclature_life_stage = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_life_stage = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_life_stage] )
[docs] id_nomenclature_sex = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_sex = db.relationship(TNomenclatures, foreign_keys=[id_nomenclature_sex])
[docs] id_nomenclature_obj_count = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_obj_count = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_obj_count] )
[docs] id_nomenclature_type_count = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_type_count = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_type_count] )
[docs] id_nomenclature_sensitivity = db.Column(db.Integer, ForeignKey(TNomenclatures.id_nomenclature))
[docs] nomenclature_sensitivity = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_sensitivity] )
[docs] id_nomenclature_observation_status = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_observation_status = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_observation_status] )
[docs] id_nomenclature_blurring = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_blurring = db.relationship(TNomenclatures, foreign_keys=[id_nomenclature_blurring])
[docs] id_nomenclature_source_status = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_source_status = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_source_status], )
[docs] id_nomenclature_info_geo_type = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_info_geo_type = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_info_geo_type] )
[docs] id_nomenclature_behaviour = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_behaviour = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_behaviour] )
[docs] id_nomenclature_biogeo_status = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_biogeo_status = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_biogeo_status] )
[docs] id_nomenclature_determination_method = db.Column( db.Integer, ForeignKey(TNomenclatures.id_nomenclature), server_default=FetchedValue() )
[docs] nomenclature_determination_method = db.relationship( TNomenclatures, foreign_keys=[id_nomenclature_determination_method] )
[docs] reference_biblio = DB.Column(DB.Unicode(length=5000))
[docs] count_min = DB.Column(DB.Integer)
[docs] count_max = DB.Column(DB.Integer)
[docs] cd_nom = DB.Column(DB.Integer, ForeignKey(Taxref.cd_nom))
[docs] taxref = relationship(Taxref)
[docs] cd_hab = DB.Column(DB.Integer, ForeignKey(Habref.cd_hab))
[docs] habitat = relationship(Habref)
[docs] nom_cite = DB.Column(DB.Unicode(length=1000), nullable=False)
[docs] meta_v_taxref = DB.Column(DB.Unicode(length=50))
[docs] sample_number_proof = DB.Column(DB.UnicodeText)
[docs] digital_proof = DB.Column(DB.UnicodeText)
[docs] non_digital_proof = DB.Column(DB.UnicodeText)
[docs] altitude_min = DB.Column(DB.Integer)
[docs] altitude_max = DB.Column(DB.Integer)
[docs] depth_min = DB.Column(DB.Integer)
[docs] depth_max = DB.Column(DB.Integer)
[docs] place_name = DB.Column(DB.Unicode(length=500))
[docs] the_geom_4326 = DB.Column(Geometry("GEOMETRY", 4326))
[docs] the_geom_4326_geojson = column_property(func.ST_AsGeoJSON(the_geom_4326), deferred=True)
[docs] the_geom_point = deferred(DB.Column(Geometry("GEOMETRY", 4326)))
[docs] the_geom_local = deferred(DB.Column(Geometry("GEOMETRY")))
[docs] the_geom_authorized = query_expression()
[docs] precision = DB.Column(DB.Integer)
[docs] id_area_attachment = DB.Column(DB.Integer, ForeignKey(LAreas.id_area))
[docs] date_min = DB.Column(DB.DateTime, nullable=False)
[docs] date_max = DB.Column(DB.DateTime, nullable=False)
[docs] validator = DB.Column(DB.Unicode(length=1000))
[docs] validation_comment = DB.Column(DB.Unicode)
[docs] observers = DB.Column(DB.Unicode(length=1000))
[docs] determiner = DB.Column(DB.Unicode(length=1000))
[docs] id_digitiser = DB.Column(DB.Integer, ForeignKey(User.id_role))
[docs] digitiser = db.relationship(User, foreign_keys=[id_digitiser])
[docs] comment_context = DB.Column(DB.UnicodeText)
[docs] comment_description = DB.Column(DB.UnicodeText)
[docs] additional_data = DB.Column(JSONB)
[docs] meta_validation_date = DB.Column(DB.DateTime)
[docs] meta_create_date = DB.Column(DB.DateTime, server_default=FetchedValue())
[docs] meta_update_date = DB.Column(DB.DateTime, server_default=FetchedValue())
[docs] last_action = DB.Column(DB.Unicode)
[docs] areas = relationship(LAreas, secondary=corAreaSynthese, backref="synthese_obs")
[docs] area_attachment = relationship(LAreas, foreign_keys=[id_area_attachment])
[docs] validations = relationship(TValidations, backref="attached_row")
[docs] last_validation = relationship(last_validation, uselist=False, viewonly=True)
[docs] medias = relationship( TMedias, primaryjoin=(TMedias.uuid_attached_row == foreign(unique_id_sinp)), uselist=True )
[docs] cor_observers = DB.relationship(User, secondary=cor_observer_synthese)
[docs] def _has_scope_grant(self, scope): if scope == 0: return False elif scope in (1, 2): if g.current_user == self.digitiser: return True if g.current_user in self.cor_observers: return True return self.dataset.has_instance_permission(scope) elif scope == 3: return True
[docs] def _has_permissions_grant(self, permissions): blur_sensitive_observations = current_app.config["SYNTHESE"]["BLUR_SENSITIVE_OBSERVATIONS"] if not permissions: return False for perm in permissions: if perm.has_other_filters_than("SCOPE", "SENSITIVITY"): continue # unsupported filters if perm.sensitivity_filter: if ( blur_sensitive_observations and self.nomenclature_sensitivity.cd_nomenclature == "4" ): continue if ( not blur_sensitive_observations and self.nomenclature_sensitivity.cd_nomenclature != "0" ): continue if perm.scope_value: if g.current_user == self.digitiser: return True if g.current_user in self.cor_observers: return True if self.dataset.has_instance_permission(perm.scope_value): return True continue # scope filter denied access, check next permission return True # no filter exclude this permission return False
[docs] def has_instance_permission(self, permissions): if type(permissions) == int: return self._has_scope_grant(permissions) else: return self._has_permissions_grant(permissions)
@qfilter(query=True)
[docs] def join_nomenclatures(cls, **kwargs): return kwargs["query"].options(*[joinedload(n) for n in Synthese.nomenclature_fields])
@qfilter(query=True)
[docs] def lateraljoin_last_validation(cls, **kwargs): subquery = ( select(TValidations) .where(TValidations.uuid_attached_row == Synthese.unique_id_sinp) .limit(1) .subquery() .lateral("last_validation") ) return ( kwargs["query"] .outerjoin(subquery, sa.true()) .options(contains_eager(Synthese.last_validation, alias=subquery)) )
@qfilter(query=True)
[docs] def filter_by_scope(cls, scope, user=None, **kwargs): query = kwargs["query"] if user is None: user = g.current_user if scope == 0: query = query.where(sa.false()) elif scope in (1, 2): ors = [] datasets = db.session.scalars( TDatasets.filter_by_readable(user).with_entities(TDatasets.id_dataset) ).all() query = query.where( or_( Synthese.id_digitizer == user.id_role, Synthese.cor_observers.any(id_role=user.id_role), Synthese.id_dataset.in_([ds.id_dataset for ds in datasets]), ) ) return query
@serializable
[docs] class DefaultsNomenclaturesValue(DB.Model):
[docs] __tablename__ = "defaults_nomenclatures_value"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] mnemonique_type = DB.Column(DB.Integer, primary_key=True)
[docs] id_organism = DB.Column(DB.Integer, primary_key=True)
[docs] regne = DB.Column(DB.Unicode, primary_key=True)
[docs] group2_inpn = DB.Column(DB.Unicode, primary_key=True)
[docs] id_nomenclature = DB.Column(DB.Integer)
# Type library to list every report types @serializable
[docs] class BibReportsTypes(DB.Model):
[docs] __tablename__ = "bib_reports_types"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] id_type = DB.Column(DB.Integer(), primary_key=True)
[docs] type = DB.Column(DB.Text())
# Relation report model with User and BibReportsTypes to get every infos about a report @serializable
[docs] class TReport(DB.Model):
[docs] __tablename__ = "t_reports"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] id_report = DB.Column(DB.Integer(), primary_key=True)
[docs] id_synthese = DB.Column(DB.Integer(), ForeignKey("gn_synthese.synthese.id_synthese"))
[docs] id_role = DB.Column(DB.Integer(), ForeignKey(User.id_role))
[docs] id_type = DB.Column(DB.Integer(), ForeignKey(BibReportsTypes.id_type))
[docs] content = DB.Column(DB.Text())
[docs] creation_date = DB.Column(DB.DateTime(), default=datetime.datetime.utcnow)
[docs] deleted = DB.Column(DB.Boolean(), default=False)
[docs] synthese = relationship(Synthese, backref=db.backref("reports", order_by=creation_date))
[docs] report_type = relationship(BibReportsTypes)
[docs] user = DB.relationship(User)
@serializable @geoserializable(geoCol="the_geom_4326", idCol="id_synthese")
[docs] class VSyntheseForWebApp(DB.Model):
[docs] __tablename__ = "v_synthese_for_web_app"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] id_synthese = DB.Column( DB.Integer, ForeignKey("gn_synthese.synthese.id_synthese"), primary_key=True, )
[docs] unique_id_sinp = DB.Column(UUID(as_uuid=True))
[docs] unique_id_sinp_grp = DB.Column(UUID(as_uuid=True))
[docs] id_source = DB.Column(DB.Integer, nullable=False)
[docs] id_import = DB.Column(DB.Integer, nullable=True)
[docs] id_module = DB.Column(DB.Integer)
[docs] entity_source_pk_value = DB.Column(DB.Integer)
[docs] id_dataset = DB.Column(DB.Integer)
[docs] dataset_name = DB.Column(DB.String)
[docs] id_acquisition_framework = DB.Column(DB.Integer)
[docs] count_min = DB.Column(DB.Integer)
[docs] count_max = DB.Column(DB.Integer)
[docs] cd_nom = DB.Column(DB.Integer)
[docs] cd_ref = DB.Column(DB.Unicode)
[docs] nom_cite = DB.Column(DB.Unicode)
[docs] nom_valide = DB.Column(DB.Unicode)
[docs] nom_vern = DB.Column(DB.Unicode)
[docs] lb_nom = DB.Column(DB.Unicode)
[docs] meta_v_taxref = DB.Column(DB.Unicode)
[docs] group1_inpn = DB.Column(DB.Unicode)
[docs] group2_inpn = DB.Column(DB.Unicode)
[docs] group3_inpn = DB.Column(DB.Unicode)
[docs] sample_number_proof = DB.Column(DB.Unicode)
[docs] digital_proof = DB.Column(DB.Unicode)
[docs] non_digital_proof = DB.Column(DB.Unicode)
[docs] altitude_min = DB.Column(DB.Integer)
[docs] altitude_max = DB.Column(DB.Integer)
[docs] depth_min = DB.Column(DB.Integer)
[docs] depth_max = DB.Column(DB.Integer)
[docs] place_name = DB.Column(DB.Unicode)
[docs] precision = DB.Column(DB.Integer)
[docs] the_geom_4326 = DB.Column(Geometry("GEOMETRY", 4326))
[docs] date_min = DB.Column(DB.DateTime)
[docs] date_max = DB.Column(DB.DateTime)
[docs] validator = DB.Column(DB.Unicode)
[docs] validation_comment = DB.Column(DB.Unicode)
[docs] observers = DB.Column(DB.Unicode)
[docs] determiner = DB.Column(DB.Unicode)
[docs] id_digitiser = DB.Column(DB.Integer)
[docs] comment_context = DB.Column(DB.Unicode)
[docs] comment_description = DB.Column(DB.Unicode)
[docs] meta_validation_date = DB.Column(DB.DateTime)
[docs] meta_create_date = DB.Column(DB.DateTime)
[docs] meta_update_date = DB.Column(DB.DateTime)
[docs] last_action = DB.Column(DB.Unicode)
[docs] id_nomenclature_geo_object_nature = DB.Column(DB.Integer)
[docs] id_nomenclature_info_geo_type = DB.Column(DB.Integer)
[docs] id_nomenclature_grp_typ = DB.Column(DB.Integer)
[docs] grp_method = DB.Column(DB.Unicode)
[docs] id_nomenclature_obs_technique = DB.Column(DB.Integer)
[docs] id_nomenclature_bio_status = DB.Column(DB.Integer)
[docs] id_nomenclature_bio_condition = DB.Column(DB.Integer)
[docs] id_nomenclature_naturalness = DB.Column(DB.Integer)
[docs] id_nomenclature_exist_proof = DB.Column(DB.Integer)
[docs] id_nomenclature_valid_status = DB.Column(DB.Integer)
[docs] id_nomenclature_diffusion_level = DB.Column(DB.Integer)
[docs] id_nomenclature_life_stage = DB.Column(DB.Integer)
[docs] id_nomenclature_sex = DB.Column(DB.Integer)
[docs] id_nomenclature_obj_count = DB.Column(DB.Integer)
[docs] id_nomenclature_type_count = DB.Column(DB.Integer)
[docs] id_nomenclature_sensitivity = DB.Column(DB.Integer)
[docs] id_nomenclature_observation_status = DB.Column(DB.Integer)
[docs] id_nomenclature_blurring = DB.Column(DB.Integer)
[docs] id_nomenclature_source_status = DB.Column(DB.Integer)
[docs] id_nomenclature_determination_method = DB.Column(DB.Integer)
[docs] id_nomenclature_behaviour = DB.Column(DB.Integer)
[docs] reference_biblio = DB.Column(DB.Unicode)
[docs] name_source = DB.Column(DB.Unicode)
[docs] url_source = DB.Column(DB.Unicode)
[docs] st_asgeojson = DB.Column(DB.Unicode)
[docs] medias = relationship( TMedias, primaryjoin=(TMedias.uuid_attached_row == foreign(unique_id_sinp)), uselist=True )
[docs] reports = relationship( TReport, primaryjoin=(TReport.id_synthese == foreign(id_synthese)), uselist=True )
# Non utilisé - laissé pour exemple d'une sérialisation ordonnée
[docs] def synthese_export_serialization(cls): """ Décorateur qui definit une serialisation particuliere pour la vue v_synthese_for_export Il rajoute la fonction as_dict_ordered qui conserve l'ordre des attributs tel que definit dans le model (fonctions utilisees pour les exports) et qui redefinit le nom des colonnes tel qu'ils sont nommes en configuration """ EXPORT_COLUMNS = config["SYNTHESE"]["EXPORT_COLUMNS"] # tab of cls attributes from EXPORT COLUMNS formated_default_columns = [key for key, value in EXPORT_COLUMNS.items()] # list of tuple (class attribute, serializer) cls_db_cols_and_serializer = [] # list of attributes of the class which are in the synthese export cnfig # use for generate shapefiles cls.db_cols = [] for key in formated_default_columns: # get the cls attribut: try: # get the class atribut from the syntese export config cls_attri = getattr(cls, key) # add in serialiser list if not cls_attri.type.__class__.__name__ == "Geometry": cls_db_cols_and_serializer.append( ( cls_attri.key, SERIALIZERS.get(cls_attri.type.__class__.__name__.lower(), lambda x: x), ) ) # add in cls.db_cols cls.db_cols.append(cls_attri) # execpt if the attribute does not exist except AttributeError: pass def serialize_order_fn(self): order_dict = OrderedDict() for item, _serializer in cls_db_cols_and_serializer: order_dict.update({EXPORT_COLUMNS.get(item): _serializer(getattr(self, item))}) return order_dict def serialize_geofn(self, geoCol, idCol): if not getattr(self, geoCol) is None: geometry = to_shape(getattr(self, geoCol)) else: geometry = {"type": "Point", "coordinates": [0, 0]} feature = Feature( id=str(getattr(self, idCol)), geometry=geometry, properties=self.as_dict_ordered(), ) return feature cls.as_dict_ordered = serialize_order_fn cls.as_geofeature_ordered = serialize_geofn return cls
@serializable
[docs] class VColorAreaTaxon(DB.Model):
[docs] __tablename__ = "v_color_taxon_area"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] cd_nom = DB.Column(DB.Integer(), ForeignKey(Taxref.cd_nom), primary_key=True)
[docs] id_area = DB.Column(DB.Integer(), ForeignKey(LAreas.id_area), primary_key=True)
[docs] nb_obs = DB.Column(DB.Integer())
[docs] last_date = DB.Column(DB.DateTime())
[docs] color = DB.Column(DB.Unicode())
@serializable
[docs] class SyntheseLogEntry(DB.Model): """Log synthese table, populated with Delete Triggers on gn_synthes.synthese Parameters ---------- DB: Flask SQLAlchemy controller """
[docs] __tablename__ = "t_log_synthese"
[docs] __table_args__ = {"schema": "gn_synthese"}
[docs] query_class = SyntheseLogEntryQuery
[docs] id_synthese = DB.Column(DB.Integer(), primary_key=True)
[docs] last_action = DB.Column(DB.String(length=1))
[docs] meta_last_action_date = DB.Column(DB.DateTime)
@qfilter(query=True)
[docs] def filter_by_params(cls, params, **kwargs): query = kwargs["query"] for col in filterable_columns: if col not in params: continue column = getattr(cls, col) for value in params.getlist(col): if isinstance(column.type, DateTime): query = cls.filter_by_datetime(column, value) elif isinstance(column.type, Unicode): query = query.where(column.ilike(f"%{value}%")) else: query = query.where(column == value) return query
@qfilter(query=True)
[docs] def filter_by_datetime(cls, col, dt: str = None, **kwargs): """Filter on date only with operator among "<,>,=<,>=" Parameters ---------- filters_with_operator : dict params filters from url only Returns ------- Query """ query = kwargs["query"] if ":" in dt: operator, dt = dt.split(":", 1) else: operator = "eq" dt = datetime.datetime.fromisoformat(dt) if operator == "gt": f = col > dt elif operator == "gte": f = col >= dt elif operator == "lt": f = col < dt elif operator == "lte": f = col <= dt elif operator == "eq": # FIXME: if datetime is at midnight (looks like date), allows all the day? f = col == dt else: raise ValueError(f"Invalid comparison operator: {operator}") return query.where(f)
@qfilter(query=True)
[docs] def sort(cls, columns: List[str], *, query): if not columns: columns = ["meta_last_action_date"] for col in columns: if ":" in col: col, direction = col.rsplit(":") if direction == "asc": direction = sa.asc elif direction == "desc": direction = sa.desc else: raise ValueError(f"Invalid sort direction: {direction}") else: direction = sa.asc if col not in sortable_columns: raise ValueError(f"Invalid sort column: {col}") query = query.order_by(direction(getattr(cls, col))) return query
# defined here to avoid circular dependencies
[docs] source_subquery = ( select(TSources.id_source, Synthese.id_dataset) .where(TSources.id_source == Synthese.id_source) .distinct() .alias() )
TDatasets.sources = db.relationship( TSources, primaryjoin=TDatasets.id_dataset == source_subquery.c.id_dataset, secondaryjoin=source_subquery.c.id_source == TSources.id_source, secondary=source_subquery, viewonly=True, ) TDatasets.synthese_records_count = column_property( select(func.count(Synthese.id_synthese)) .where(Synthese.id_dataset == TDatasets.id_dataset) .scalar_subquery() .label("synthese_records_count"), deferred=True, )