Code source de geonature.core.gn_synthese.utils.query_select_sqla

"""
Utility function to manage permissions and all filter of Synthese
Use these functions rather than query.py
Filter the query of synthese using SQLA expression language and 'select' object
https://docs.sqlalchemy.org/en/latest/core/tutorial.html#selecting
much more efficient
"""

import datetime
import unicodedata
import uuid

from flask import current_app

import sqlalchemy as sa
from sqlalchemy import func, or_, and_, select, distinct
from sqlalchemy.sql import text
from sqlalchemy.orm import aliased
from werkzeug.exceptions import BadRequest
from shapely.geometry import shape
from geoalchemy2.shape import from_shape

from geonature.utils.env import DB

from geonature.core.gn_commons.models import TModules
from geonature.core.gn_synthese.models import (
    CorObserverSynthese,
    CorAreaSynthese,
    BibReportsTypes,
    TReport,
    TSources,
)
from geonature.core.gn_meta.models import (
    CorDatasetActor,
    TDatasets,
)
from geonature.utils.errors import GeonatureApiError
from apptax.taxonomie.models import (
    Taxref,
    CorTaxonAttribut,
    TaxrefBdcStatutTaxon,
    bdc_statut_cor_text_area,
    TaxrefBdcStatutCorTextValues,
    TaxrefBdcStatutText,
    TaxrefBdcStatutValues,
)
from ref_geo.models import LAreas, BibAreasTypes
from utils_flask_sqla_geo.schema import FeatureSchema, FeatureCollectionSchema
from pypnnomenclature.models import TNomenclatures, BibNomenclaturesTypes


[docs] class SyntheseQuery: """ class for building synthese query and manage join Attributes: query: SQLA select object filters: dict of query string filters model: a SQLA model _already_joined_table: (private) a list of already joined table. Auto build with 'add_join' method query_joins = SQLA Join object """ def __init__( self, model, query, filters, id_synthese_column="id_synthese", id_dataset_column="id_dataset", observers_column="observers", id_digitiser_column="id_digitiser", with_generic_table=False, query_joins=None, geom_column=None, ): self.query = query self.filters = filters self.first = query_joins is None self.model = model self._already_joined_table = [] self.query_joins = query_joins self.geom_column = geom_column if geom_column is not None else model.the_geom_4326 if with_generic_table: model_temp = model.columns else: model_temp = model # get the mandatory column try: self.model_id_syn_col = getattr(model_temp, id_synthese_column) self.model_id_dataset_column = getattr(model_temp, id_dataset_column) self.model_observers_column = getattr(model_temp, observers_column) self.model_id_digitiser_column = getattr(model_temp, id_digitiser_column) except AttributeError as e: raise GeonatureApiError( """the {model} table does not have a column {e} If you change the {model} table, please edit your synthese config (cf EXPORT_***_COL) """.format( e=e, model=model ) )
[docs] def add_join(self, right_table, right_column, left_column, join_type="right"): if self.first: if join_type == "right": self.query_joins = self.model.__table__.join( right_table, left_column == right_column ) else: self.query_joins = self.model.__table__.outerjoin( right_table, left_column == right_column ) self.first = False self._already_joined_table.append(right_table) else: # check if the table not already joined if right_table not in self._already_joined_table: self.query_joins = self.query_joins.join(right_table, left_column == right_column) # push the joined table in _already_joined_table list self._already_joined_table.append(right_table)
[docs] def add_join_multiple_cond(self, right_table, conditions): if self.first: self.query_joins = self.model.__table__.join(right_table, and_(*conditions)) self.first = False else: # check if the table not already joined if right_table not in self._already_joined_table: self.query_joins = self.query_joins.join(right_table, and_(*conditions)) # push the joined table in _already_joined_table list self._already_joined_table.append(right_table)
[docs] def build_permissions_filter(self, user, permissions): """ Return a where clause for the given permissions set """ blur_sensitive_observations = current_app.config["SYNTHESE"]["BLUR_SENSITIVE_OBSERVATIONS"] subquery_observers = ( select(CorObserverSynthese.id_synthese) .select_from(CorObserverSynthese) .where(CorObserverSynthese.id_role == user.id_role) ) datasets_by_scope = {} # to avoid fetching datasets several time for same scope permissions_filters = [] excluded_sensitivity = None for perm in permissions: if perm.has_other_filters_than("SCOPE", "SENSITIVITY"): continue perm_filters = [] if perm.sensitivity_filter: if excluded_sensitivity is None: excluded_sensitivity = TNomenclatures.query.where( TNomenclatures.nomenclature_type.has( BibNomenclaturesTypes.mnemonique == "SENSIBILITE" ) ) if not blur_sensitive_observations: excluded_sensitivity = excluded_sensitivity.where( TNomenclatures.cd_nomenclature > "0" ) else: excluded_sensitivity = excluded_sensitivity.where( TNomenclatures.cd_nomenclature == "4" ) excluded_sensitivity = excluded_sensitivity.all() perm_filters.append( self.model.id_nomenclature_sensitivity.notin_( (nomenclature.id_nomenclature for nomenclature in excluded_sensitivity) ) ) if perm.scope_value: if perm.scope_value not in datasets_by_scope: datasets_t = ( DB.session.scalars(TDatasets.filter_by_scope(perm.scope_value)) .unique() .all() ) datasets_by_scope[perm.scope_value] = [d.id_dataset for d in datasets_t] datasets = datasets_by_scope[perm.scope_value] scope_filters = [ self.model_id_syn_col.in_(subquery_observers), # user is observer self.model_id_digitiser_column == user.id_role, # user id digitizer self.model_id_dataset_column.in_( datasets ), # user is dataset (or parent af) actor ] perm_filters.append(or_(*scope_filters)) if perm_filters: permissions_filters.append(and_(*perm_filters)) else: permissions_filters.append(sa.true()) if permissions_filters: return or_(*permissions_filters) else: return sa.false()
[docs] def filter_query_with_permissions(self, user, permissions): """ Filter the query with the permissions of a user """ where_clause = self.build_permissions_filter(user=user, permissions=permissions) self.query = self.query.where(where_clause)
[docs] def filter_query_with_cruved(self, user, scope): """ Filter the query with the cruved authorization of a user """ if scope in (1, 2): # get id synthese where user is observer subquery_observers = ( select(CorObserverSynthese.id_synthese) .select_from(CorObserverSynthese) .where(CorObserverSynthese.id_role == user.id_role) ) ors_filters = [ self.model_id_syn_col.in_(subquery_observers), self.model_id_digitiser_column == user.id_role, ] datasets = DB.session.scalars(TDatasets.filter_by_scope(scope)).unique().all() allowed_datasets = [dataset.id_dataset for dataset in datasets] ors_filters.append(self.model_id_dataset_column.in_(allowed_datasets)) self.query = self.query.where(or_(*ors_filters))
[docs] def filter_taxonomy(self): """ Filters the query with taxonomic attributes Parameters: - q (SQLAchemyQuery): an SQLAchemy query - filters (dict): a dict of filter Returns: -Tuple: the SQLAlchemy query and the filter dictionnary """ cd_ref_childs = [] if "cd_ref_parent" in self.filters: # find all taxon child from cd_ref parent cd_ref_parent_int = list(map(lambda x: int(x), self.filters.pop("cd_ref_parent"))) sql = text( """SELECT DISTINCT cd_ref FROM taxonomie.find_all_taxons_children(:id_parent)""" ) result = DB.engine.execute(sql, id_parent=cd_ref_parent_int) if result: cd_ref_childs = [r[0] for r in result] cd_ref_selected = [] if "cd_ref" in self.filters: cd_ref_selected = self.filters.pop("cd_ref") # concat cd_ref child and just selected cd_ref cd_ref_childs.extend(cd_ref_selected) if len(cd_ref_childs) > 0: self.add_join(Taxref, Taxref.cd_nom, self.model.cd_nom) self.query = self.query.where(Taxref.cd_ref.in_(cd_ref_childs)) if "taxonomy_id_hab" in self.filters: self.add_join(Taxref, Taxref.cd_nom, self.model.cd_nom) self.query = self.query.where( Taxref.id_habitat.in_(self.filters.pop("taxonomy_id_hab")) ) aliased_cor_taxon_attr = {} protection_status_value = [] red_list_filters = {} for colname, value in self.filters.items(): if colname.startswith("taxonomy_group"): # colname = group type (group2 or group3 inpn) # value = list of group values colname = colname.split("taxonomy_")[-1] self.add_join(Taxref, Taxref.cd_nom, self.model.cd_nom) self.query = self.query.where(getattr(Taxref, colname).in_(value)) if colname.startswith("taxhub_attribut"): self.add_join(Taxref, Taxref.cd_nom, self.model.cd_nom) taxhub_id_attr = colname[16:] aliased_cor_taxon_attr[taxhub_id_attr] = aliased(CorTaxonAttribut) self.add_join_multiple_cond( aliased_cor_taxon_attr[taxhub_id_attr], [ aliased_cor_taxon_attr[taxhub_id_attr].id_attribut == taxhub_id_attr, aliased_cor_taxon_attr[taxhub_id_attr].cd_ref == func.taxonomie.find_cdref(self.model.cd_nom), ], ) self.query = self.query.where( aliased_cor_taxon_attr[taxhub_id_attr].valeur_attribut.in_(value) ) elif colname.endswith("_red_lists"): red_list_id = colname.replace("_red_lists", "") all_red_lists_cfg = current_app.config["SYNTHESE"]["RED_LISTS_FILTERS"] red_list_cfg = next( (item for item in all_red_lists_cfg if item["id"] == red_list_id), None ) red_list_filters[red_list_cfg["status_type"]] = value elif colname.endswith("_protection_status"): status_id = colname.replace("_protection_status", "") all_status_cfg = current_app.config["SYNTHESE"]["STATUS_FILTERS"] status_cfg = next( (item for item in all_status_cfg if item["id"] == status_id), None ) # Check if a checkbox was used. if ( isinstance(value, bool) and value == True and len(status_cfg["status_types"]) == 1 ): value = status_cfg["status_types"] protection_status_value += value if protection_status_value or red_list_filters: self.build_bdc_status_pr_nb_lateral_join(protection_status_value, red_list_filters) # remove attributes taxhub from filters self.filters = { colname: value for colname, value in self.filters.items() if not colname.startswith("taxhub_attribut") }
[docs] def filter_other_filters(self, user): """ Other filters """ if "has_medias" in self.filters: media_filter = self.model.medias.any() if self.filters["has_medias"] is False: media_filter = ~media_filter self.query = self.query.where(media_filter) if "has_alert" in self.filters: alert_filter = self.model.reports.any( TReport.report_type.has(BibReportsTypes.type == "alert") ) if self.filters["has_alert"] is False: alert_filter = ~alert_filter self.query = self.query.where(alert_filter) if "has_pin" in self.filters: pin_filter = self.model.reports.any( and_( TReport.report_type.has(BibReportsTypes.type == "pin"), TReport.id_role == user.id_role, ) ) if self.filters["has_pin"] is False: pin_filter = ~pin_filter self.query = self.query.where(pin_filter) if "has_comment" in self.filters: comment_filter = self.model.reports.any( TReport.report_type.has(BibReportsTypes.type == "discussion") ) if self.filters["has_comment"] is False: comment_filter = ~comment_filter self.query = self.query.where(comment_filter) if "id_dataset" in self.filters: self.query = self.query.where(self.model.id_dataset.in_(self.filters.pop("id_dataset"))) if "observers" in self.filters: # découpe des éléments saisies par des "," observers = self.filters.pop("observers").split(",") self.query = self.query.where( or_( *[ func.unaccent(self.model.observers).ilike( "%" + remove_accents(observer) + "%" ) for observer in observers ] ) ) if "observers_list" in self.filters: self.query = self.query.where( and_( *[ self.model.observers.ilike("%" + observer.get("nom_complet") + "%") for observer in self.filters.pop("observers_list") ] ) ) if "id_organism" in self.filters: datasets = DB.session.scalars( select(CorDatasetActor.id_dataset).where( CorDatasetActor.id_organism.in_(self.filters.pop("id_organism")) ) ).all() formated_datasets = [d for d in datasets] self.query = self.query.where(self.model.id_dataset.in_(formated_datasets)) if "date_min" in self.filters: self.query = self.query.where(self.model.date_min >= self.filters.pop("date_min")) if "date_max" in self.filters: # set the date_max at 23h59 because a hour can be set in timestamp date_max = datetime.datetime.strptime(self.filters.pop("date_max"), "%Y-%m-%d") date_max = date_max.replace(hour=23, minute=59, second=59) self.query = self.query.where(self.model.date_max <= date_max) if "id_source" in self.filters: self.add_join(TSources, self.model.id_source, TSources.id_source) self.query = self.query.where(self.model.id_source.in_(self.filters.pop("id_source"))) if "id_module" in self.filters: self.query = self.query.where(self.model.id_module.in_(self.filters.pop("id_module"))) if "id_acquisition_framework" in self.filters: if hasattr(self.model, "id_acquisition_framework"): self.query = self.query.where( self.model.id_acquisition_framework.in_( self.filters.pop("id_acquisition_framework") ) ) else: self.add_join(TDatasets, self.model.id_dataset, TDatasets.id_dataset) self.query = self.query.where( TDatasets.id_acquisition_framework.in_( self.filters.pop("id_acquisition_framework") ) ) if "geoIntersection" in self.filters: # Insersect with the geom send from the map geojson = self.filters["geoIntersection"] if type(geojson) is not dict or "type" not in geojson: raise BadRequest("geoIntersection is missing type") if geojson["type"] == "Feature": features = [FeatureSchema().load(geojson)] elif geojson["type"] == "FeatureCollection": features = FeatureCollectionSchema().load(geojson)["features"] else: raise BadRequest("Unsupported geoIntersection type") geo_filters = [] for feature in features: geom_wkb = from_shape(shape(feature["geometry"]), srid=4326) # if the geom is a circle if "radius" in feature["properties"]: radius = feature["properties"]["radius"] geo_filter = func.ST_DWithin( func.ST_GeogFromWKB(self.geom_column), func.ST_GeogFromWKB(geom_wkb), radius, ) else: geo_filter = self.geom_column.ST_Intersects(geom_wkb) geo_filters.append(geo_filter) self.query = self.query.where(or_(*geo_filters)) self.filters.pop("geoIntersection") if "period_start" in self.filters and "period_end" in self.filters: period_start = self.filters.pop("period_start") period_end = self.filters.pop("period_end") self.query = self.query.where( or_( func.gn_commons.is_in_period( func.date(self.model.date_min), func.to_date(period_start, "DD-MM"), func.to_date(period_end, "DD-MM"), ), func.gn_commons.is_in_period( func.date(self.model.date_max), func.to_date(period_start, "DD-MM"), func.to_date(period_end, "DD-MM"), ), ) ) if "unique_id_sinp" in self.filters: try: uuid_filter = uuid.UUID(self.filters.pop("unique_id_sinp")) except ValueError as e: raise BadRequest(str(e)) self.query = self.query.where(self.model.unique_id_sinp == uuid_filter) # generic filters for colname, value in self.filters.items(): if colname.startswith("area"): if self.geom_column.class_ != self.model: l_areas_cte = LAreas.query.filter(LAreas.id_area.in_(value)).cte("area_filter") self.query = self.query.where( func.ST_Intersects(self.geom_column, l_areas_cte.c.geom) ) else: cor_area_synthese_alias = aliased(CorAreaSynthese) self.add_join( cor_area_synthese_alias, cor_area_synthese_alias.id_synthese, self.model.id_synthese, ) self.query = self.query.where(cor_area_synthese_alias.id_area.in_(value)) elif colname.startswith("id_"): col = getattr(self.model.__table__.columns, colname) if isinstance(value, list): self.query = self.query.where(col.in_(value)) else: self.query = self.query.where(col == value) elif hasattr(self.model.__table__.columns, colname): col = getattr(self.model.__table__.columns, colname) if str(col.type) == "INTEGER": if colname in ["precision"]: self.query = self.query.where(col <= value) else: self.query = self.query.where(col == value) else: self.query = self.query.where(col.ilike("%{}%".format(value)))
[docs] def apply_all_filters(self, user, permissions): if type(permissions) == int: # scope self.filter_query_with_cruved(user, scope=permissions) else: self.filter_query_with_permissions(user, permissions) self.filter_taxonomy() self.filter_other_filters(user)
[docs] def build_query(self): if self.query_joins is not None: self.query = self.query.select_from(self.query_joins) return self.query
[docs] def filter_query_all_filters(self, user, permissions): """High level function to manage query with all filters. Apply CRUVED, taxonomy and other filters. Parameters ---------- user: str User filtered by CRUVED. Returns ------- sqlalchemy.orm.query.Query.filter Combined filter to apply. """ self.apply_all_filters(user, permissions) return self.build_query()
[docs] def build_bdc_status_pr_nb_lateral_join(self, protection_status_value, red_list_filters): """ Create subquery for bdc_status filters Objectif : filtrer les données ayant : - les statuts du type demandé par l'utilisateur - les status s'appliquent bien sur la zone géographique de la donnée (c-a-d le département) Idée de façon à limiter le nombre de sous reqêtes, la liste des status selectionnés par l'utilisateur s'appliquant à l'observation est aggrégée de façon à tester le nombre puis jointé sur le département de la donnée """ # Ajout de la table taxref si non ajouté self.add_join(Taxref, Taxref.cd_nom, self.model.cd_nom) # Ajout jointure permettant d'avoir le département pour chaque donnée cas_dep = aliased(CorAreaSynthese) lareas_dep = aliased(LAreas) bib_area_dep = aliased(BibAreasTypes) self.add_join(cas_dep, cas_dep.id_synthese, self.model.id_synthese) self.add_join(lareas_dep, lareas_dep.id_area, cas_dep.id_area) self.add_join_multiple_cond( bib_area_dep, [bib_area_dep.id_type == lareas_dep.id_type, bib_area_dep.type_code == "DEP"], ) # Creation requête CTE : taxon, zone d'application départementale des textes # pour les taxons répondant aux critères de selection bdc_status_cte = ( select( TaxrefBdcStatutTaxon.cd_ref, func.array_agg(bdc_statut_cor_text_area.c.id_area).label("ids_area"), ) .select_from( TaxrefBdcStatutTaxon.__table__.join( TaxrefBdcStatutCorTextValues, TaxrefBdcStatutCorTextValues.id_value_text == TaxrefBdcStatutTaxon.id_value_text, ) .join( TaxrefBdcStatutText, TaxrefBdcStatutText.id_text == TaxrefBdcStatutCorTextValues.id_text, ) .join( TaxrefBdcStatutValues, TaxrefBdcStatutValues.id_value == TaxrefBdcStatutCorTextValues.id_value, ) .join( bdc_statut_cor_text_area, bdc_statut_cor_text_area.c.id_text == TaxrefBdcStatutText.id_text, ) ) .where(TaxrefBdcStatutText.enable == True) ) # ajout des filtres de selection des textes bdc_status_filters = [] if red_list_filters: bdc_status_filters = [ and_( TaxrefBdcStatutValues.code_statut.in_(v), TaxrefBdcStatutText.cd_type_statut == k, ) for k, v in red_list_filters.items() ] if protection_status_value: bdc_status_filters.append( TaxrefBdcStatutText.cd_type_statut.in_(protection_status_value) ) bdc_status_cte = bdc_status_cte.where(or_(*bdc_status_filters)) # group by de façon à ne selectionner que les taxons # qui ont les textes selectionnés par l'utilisateurs bdc_status_cte = bdc_status_cte.group_by(TaxrefBdcStatutTaxon.cd_ref).having( func.count(distinct(TaxrefBdcStatutText.cd_type_statut)) == (len(protection_status_value) + len(red_list_filters)) ) bdc_status_cte = bdc_status_cte.cte(name="status") # Jointure sur le taxon # et vérification que l'ensemble des textes # soit sur bien sur le département de l'observation self.add_join_multiple_cond( bdc_status_cte, [ bdc_status_cte.c.cd_ref == Taxref.cd_ref, func.array_length( func.array_positions(bdc_status_cte.c.ids_area, cas_dep.id_area), 1 ) == (len(protection_status_value) + len(red_list_filters)), ], )
[docs] def remove_accents(input_str): nfkd_form = unicodedata.normalize("NFKD", input_str) return "".join([c for c in nfkd_form if not unicodedata.combining(c)])