Code source de geonature.core.gn_synthese.blueprints.synthese

import json

from flask import (
    Blueprint,
    request,
    current_app,
    jsonify,
    g,
)

from werkzeug.exceptions import Forbidden, NotFound, BadRequest
from sqlalchemy import func, select, case, join, and_
from sqlalchemy.orm import joinedload, lazyload, selectinload, contains_eager
from geojson import FeatureCollection, Feature

from sqlalchemy.orm import aliased, with_expression
from sqlalchemy.exc import NoResultFound

import geonature.core.gn_synthese.module  # Don't delete !
from geonature.utils.env import db, DB
from geonature.core.gn_synthese.schemas import SyntheseSchema
from geonature.core.gn_synthese.synthese_config import MANDATORY_COLUMNS
from geonature.core.gn_synthese.synthese_config import MANDATORY_COLUMNS
from geonature.core.gn_synthese.models import (
    CorAreaSynthese,
    Synthese,
    VSyntheseForWebApp,
    TReport,
)
from geonature.core.gn_synthese.utils.blurring import (
    build_allowed_geom_cte,
    build_blurred_precise_geom_queries,
    build_synthese_obs_query,
    split_blurring_precise_permissions,
)
from geonature.core.gn_synthese.utils.query_select_sqla import SyntheseQuery
from geonature.core.gn_permissions.decorators import permissions_required
from geonature.core.sensitivity.models import cor_sensitivity_area_type

from ref_geo.models import LAreas, BibAreasTypes


# Blueprint on which the Synthese HTTP routes of this module are registered.
synthese_routes = Blueprint("synthese", __name__)
@synthese_routes.route("/for_web", methods=["GET", "POST"])
@permissions_required("R", module_code="SYNTHESE")
def get_observations_for_web(permissions):
    """Return the filtered observations as a GeoJSON ``FeatureCollection``.

    .. :quickref: Synthese; Get filtered observations

    Filters are read from the JSON body of the request (no filter is applied
    when the request is not JSON). They are forwarded to ``SyntheseQuery``
    or, when the user holds blurring permissions, to
    ``build_blurred_precise_geom_queries``.

    The properties of each observation are produced by ``json_build_object``
    from the ``VSyntheseForWebApp`` model: every column of
    ``MANDATORY_COLUMNS`` plus the columns named by the ``prop`` key of the
    ``LIST_COLUMNS_FRONTEND`` and ``ADDITIONAL_COLUMNS_FRONTEND`` settings.
    Two of them are computed rather than read directly:

    * ``count_min_max``: ``"<min> - <max>"`` when both counts differ,
      ``"<min>"`` when they are equal and not null, ``""`` otherwise;
    * ``nom_vern_or_lb_nom``: ``nom_vern`` unless it is null or empty, in
      which case ``lb_nom`` is used.

    :qparam int limit: Maximum number of observations. Defaults to the
        ``NB_MAX_OBS_MAP`` setting; ``-1`` removes the limit.
    :qparam str format: ``ungrouped_geom`` (default, one feature per
        observation), ``grouped_geom`` (one feature per distinct geometry) or
        ``grouped_geom_by_areas`` (one feature per area whose type is the
        ``AREA_AGGREGATION_TYPE`` setting).

    The filter keys below are the ones historically documented for this
    route; they are interpreted outside of this function, so check
    ``SyntheseQuery`` for the authoritative list.

    :<json str cd_ref_parent: Filter on all the child taxa of a TAXREF cd_ref.
    :<json str cd_ref: Filter by TAXREF cd_ref attribute
    :<json str taxonomy_group2_inpn: Filter by TAXREF group2_inpn attribute
    :<json str taxonomy_id_hab: Filter by TAXREF id_habitat attribute
    :<json str taxhub_attribut*: Generic TAXREF filter, by attribute and value.
    :<json str *_red_lists: Generic red lists filter, on values. See config.
    :<json str *_protection_status: Generic status filter (BdC Statuts), on
        types. See config.
    :<json str observers: Filter on observer
    :<json str id_organism: Filter on organism
    :<json str date_min: Start date
    :<json str date_max: End date
    :<json str id_acquisition_framework: *tbd*
    :<json str geoIntersection: Intersect with the geom sent from the map
    :<json str period_start: *tbd*
    :<json str period_end: *tbd*
    :<json str area*: Generic filter on area
    :<json str *: Generic filter, given by colname & value

    :param permissions: Permissions injected by ``permissions_required``.
    :raises BadRequest: If the JSON body is not an object, if ``limit`` is
        negative (other than ``-1``) or if ``format`` is unknown.
    :returns: A JSON response holding a GeoJSON ``FeatureCollection``. With
        ``ungrouped_geom`` the properties of a feature are the observation
        itself; with the grouped formats they are
        ``{"observations": [...]}``. The geometry is ``null`` when the query
        yields no geometry for a row.
    """
    filters = request.json if request.is_json else {}
    if not isinstance(filters, dict):
        raise BadRequest("Bad filters")

    result_limit = request.args.get(
        "limit", current_app.config["SYNTHESE"]["NB_MAX_OBS_MAP"], type=int
    )
    # -1 is the conventional "no limit" value
    result_limit = None if result_limit == -1 else result_limit
    # Any other negative value would be sent as-is to the LIMIT clause and
    # rejected by the database: answer with a client error instead.
    if result_limit is not None and result_limit < 0:
        raise BadRequest(f"Bad limit '{result_limit}'")

    output_format = request.args.get("format", "ungrouped_geom")
    if output_format not in ("ungrouped_geom", "grouped_geom", "grouped_geom_by_areas"):
        raise BadRequest(f"Bad format '{output_format}'")

    # Get Column Frontend parameter to return only the needed columns
    param_column_list = {
        col["prop"]
        for col in current_app.config["SYNTHESE"]["LIST_COLUMNS_FRONTEND"]
        + current_app.config["SYNTHESE"]["ADDITIONAL_COLUMNS_FRONTEND"]
    }
    # Init with compulsory columns.
    # `columns` is a flat [key, value, key, value, ...] list, as expected by
    # json_build_object.
    columns = []
    for col in MANDATORY_COLUMNS:
        columns.extend([col, getattr(VSyntheseForWebApp, col)])

    if "count_min_max" in param_column_list:
        count_min_max = case(
            (
                VSyntheseForWebApp.count_min != VSyntheseForWebApp.count_max,
                func.concat(VSyntheseForWebApp.count_min, " - ", VSyntheseForWebApp.count_max),
            ),
            (
                VSyntheseForWebApp.count_min.isnot(None),
                func.concat(VSyntheseForWebApp.count_min),
            ),
            else_="",
        )
        columns += ["count_min_max", count_min_max]
        param_column_list.remove("count_min_max")

    if "nom_vern_or_lb_nom" in param_column_list:
        nom_vern_or_lb_nom = func.coalesce(
            func.nullif(VSyntheseForWebApp.nom_vern, ""), VSyntheseForWebApp.lb_nom
        )
        columns += ["nom_vern_or_lb_nom", nom_vern_or_lb_nom]
        param_column_list.remove("nom_vern_or_lb_nom")

    # Mandatory columns are already selected: adding them again would only
    # produce duplicated keys in the JSON object.
    for column in param_column_list.difference(MANDATORY_COLUMNS):
        columns += [column, getattr(VSyntheseForWebApp, column)]

    observations_columns = func.json_build_object(*columns).label("obs_as_json")

    # Need to check if there are blurring permissions so that the blurring process
    # does not affect the performance if there is no blurring permissions
    blurring_permissions, precise_permissions = split_blurring_precise_permissions(permissions)
    if not blurring_permissions:
        # No need to apply blurring => same path as before blurring feature
        obs_query = (
            select(observations_columns)
            .where(VSyntheseForWebApp.the_geom_4326.isnot(None))
            .order_by(VSyntheseForWebApp.date_min.desc(), VSyntheseForWebApp.id_synthese.desc())
            .distinct(VSyntheseForWebApp.id_synthese, VSyntheseForWebApp.date_min)
            .limit(result_limit)
        )

        # Add filters to observations CTE query
        # (a copy of the filters is given to SyntheseQuery)
        synthese_query_class = SyntheseQuery(
            VSyntheseForWebApp,
            obs_query,
            dict(filters),
        )
        synthese_query_class.apply_all_filters(g.current_user, permissions)
        obs_query = synthese_query_class.build_query()
        geojson_column = VSyntheseForWebApp.st_asgeojson
    else:
        # Build 2 queries that will be UNIONed
        # Select size hierarchy if mesh mode is selected
        select_size_hierarchy = output_format == "grouped_geom_by_areas"
        blurred_geom_query, precise_geom_query = build_blurred_precise_geom_queries(
            filters, select_size_hierarchy=select_size_hierarchy
        )

        allowed_geom_cte = build_allowed_geom_cte(
            blurring_permissions=blurring_permissions,
            precise_permissions=precise_permissions,
            blurred_geom_query=blurred_geom_query,
            precise_geom_query=precise_geom_query,
            limit=result_limit,
        )

        obs_query = build_synthese_obs_query(
            observations_columns=observations_columns,
            allowed_geom_cte=allowed_geom_cte,
            limit=result_limit,
        )
        geojson_column = func.st_asgeojson(allowed_geom_cte.c.geom)

    if output_format == "grouped_geom_by_areas":
        obs_query = obs_query.add_columns(VSyntheseForWebApp.id_synthese)
        # Need to select the size_hierarchy to use it afterwards
        # (only if blurring permissions are found)
        if blurring_permissions:
            obs_query = obs_query.add_columns(
                allowed_geom_cte.c.size_hierarchy.label("size_hierarchy")
            )
        obs_query = obs_query.cte("OBS")

        agg_areas = (
            select(CorAreaSynthese.id_synthese, LAreas.id_area)
            .join(CorAreaSynthese, CorAreaSynthese.id_area == LAreas.id_area)
            .join(BibAreasTypes, BibAreasTypes.id_type == LAreas.id_type)
            .where(
                CorAreaSynthese.id_synthese == obs_query.c.id_synthese,
                BibAreasTypes.type_code == current_app.config["SYNTHESE"]["AREA_AGGREGATION_TYPE"],
            )
        )

        if blurring_permissions:
            # Do not select cells which size_hierarchy is bigger than AREA_AGGREGATION_TYPE
            # It means that we do not aggregate obs that have a blurring geometry greater in
            # size than the aggregation area
            agg_areas = agg_areas.where(obs_query.c.size_hierarchy <= BibAreasTypes.size_hierarchy)
        agg_areas = agg_areas.lateral("agg_areas")

        # Outer joins: an observation without any aggregation area is kept,
        # with a null geometry.
        obs_query = (
            select(func.ST_AsGeoJSON(LAreas.geom_4326).label("geojson"), obs_query.c.obs_as_json)
            .select_from(
                obs_query.outerjoin(
                    agg_areas, agg_areas.c.id_synthese == obs_query.c.id_synthese
                ).outerjoin(LAreas, LAreas.id_area == agg_areas.c.id_area)
            )
            .cte("OBSERVATIONS")
        )
    else:
        obs_query = obs_query.add_columns(geojson_column.label("geojson")).cte("OBSERVATIONS")

    if output_format == "ungrouped_geom":
        query = select(obs_query.c.geojson, obs_query.c.obs_as_json)
    else:
        # Group geometries with main query
        grouped_properties = func.json_build_object(
            "observations", func.json_agg(obs_query.c.obs_as_json).label("observations")
        )
        query = select(obs_query.c.geojson, grouped_properties).group_by(obs_query.c.geojson)

    results = DB.session.execute(query)

    # Build final GeoJson
    geojson_features = [
        Feature(
            geometry=json.loads(geom_as_geojson) if geom_as_geojson else None,
            properties=properties,
        )
        for geom_as_geojson, properties in results
    ]
    return jsonify(FeatureCollection(geojson_features))
@synthese_routes.route("/vsynthese/<id_synthese>", methods=["GET"])
@permissions_required("R", module_code="SYNTHESE")
def get_one_synthese(permissions, id_synthese):
    """Return one synthese record for the web app, with its nomenclatures.

    The record is fetched a first time to check the permissions, then a
    second time with the geometry the user is allowed to see exposed as
    ``the_geom_authorized``. It is dumped through ``SyntheseSchema``
    configured with ``as_geojson=True``.

    :param permissions: Permissions injected by ``permissions_required``.
    :param id_synthese: Identifier of the record, taken from the URL.
    :raises NotFound: If no record matches ``id_synthese``.
    :raises Forbidden: If ``has_instance_permission`` rejects the record for
        the given permissions.
    :returns: The result of ``SyntheseSchema.dump`` for the record.
    """
    # Relationship paths dumped in addition to Synthese.nomenclature_fields
    dumped_fields = [
        "dataset",
        "dataset.acquisition_framework",
        "dataset.acquisition_framework.bibliographical_references",
        "dataset.acquisition_framework.cor_af_actor",
        "dataset.acquisition_framework.cor_objectifs",
        "dataset.acquisition_framework.cor_territories",
        "dataset.acquisition_framework.cor_volets_sinp",
        "dataset.acquisition_framework.creator",
        "dataset.acquisition_framework.nomenclature_territorial_level",
        "dataset.acquisition_framework.nomenclature_financing_type",
        "dataset.cor_dataset_actor",
        "dataset.cor_dataset_actor.role",
        "dataset.cor_dataset_actor.organism",
        "dataset.cor_territories",
        "dataset.nomenclature_source_status",
        "dataset.nomenclature_resource_type",
        "dataset.nomenclature_dataset_objectif",
        "dataset.nomenclature_data_type",
        "dataset.nomenclature_data_origin",
        "dataset.nomenclature_collecting_method",
        "dataset.creator",
        "dataset.modules",
        "validations",
        "validations.validation_label",
        "validations.validator_role",
        "cor_observers",
        "cor_observers.organisme",
        "source",
        "habitat",
        "medias",
        "areas",
        "areas.area_type",
    ]

    # Eager loading of the dataset and of its acquisition framework
    acquisition_framework_loading = selectinload("acquisition_framework").options(
        joinedload("creator"),
        joinedload("nomenclature_territorial_level"),
        joinedload("nomenclature_financing_type"),
    )
    query = Synthese.join_nomenclatures().options(
        joinedload("dataset").options(acquisition_framework_loading),
        # The sensitivity nomenclature is read further down to decide
        # whether the observation has to be blurred
        joinedload("nomenclature_sensitivity"),
    )

    # Reports are only returned when the admin config enables them
    if "SYNTHESE" in current_app.config["SYNTHESE"]["ALERT_MODULES"]:
        dumped_fields.append("reports.report_type.type")
        query = query.options(lazyload(Synthese.reports).joinedload(TReport.report_type))

    # First fetch: only used to check existence, permissions and sensitivity
    try:
        first_lookup = db.session.execute(query.filter_by(id_synthese=id_synthese))
        record = first_lookup.unique().scalar_one()
    except NoResultFound:
        raise NotFound()

    if not record.has_instance_permission(permissions=permissions):
        raise Forbidden()

    _, precise_permissions = split_blurring_precise_permissions(permissions)

    # Blurring applies when the user has no precise permission on this
    # record and the observation is sensitive (sensitivity code other than "0")
    must_blur = (
        not record.has_instance_permission(precise_permissions)
        and record.nomenclature_sensitivity.cd_nomenclature != "0"
    )

    if not must_blur:
        # Precise access: real geometry, areas loaded lazily with their type
        query = query.options(
            lazyload("areas").options(joinedload("area_type")),
            with_expression(Synthese.the_geom_authorized, Synthese.the_geom_4326),
        )
    else:
        # CTE holding the areas associated with the current id_synthese
        obs_areas = select(CorAreaSynthese).where(CorAreaSynthese.id_synthese == id_synthese).cte()

        # Blurred area of the observation
        BlurredObsArea = aliased(LAreas)
        # Blurred area type of the observation
        BlurredObsAreaType = aliased(BibAreasTypes)
        # Types "larger" or equal in area hierarchy size than the blurred area type
        BlurredAreaTypes = aliased(BibAreasTypes)
        # Areas associated with the BlurredAreaTypes
        BlurredAreas = aliased(LAreas)

        # Inner joins retrieving the blurred area of the obs and the bigger
        # areas used for "Zonages" in Synthese.
        # Needs size_hierarchy from ref_geo.
        blurred_areas_join = (
            join(CorAreaSynthese, BlurredObsArea)
            .join(BlurredObsAreaType)
            .join(
                cor_sensitivity_area_type,
                cor_sensitivity_area_type.c.id_area_type == BlurredObsAreaType.id_type,
            )
            .join(
                BlurredAreaTypes,
                BlurredAreaTypes.size_hierarchy >= BlurredObsAreaType.size_hierarchy,
            )
            .join(BlurredAreas, BlurredAreaTypes.id_type == BlurredAreas.id_type)
            .join(obs_areas, obs_areas.c.id_area == BlurredAreas.id_area)
        )
        # ON clause of the outer join: CorAreaSynthese is matched taking the
        # sensitivity of the observation into account
        sensitivity_aware_onclause = and_(
            Synthese.id_synthese == CorAreaSynthese.id_synthese,
            Synthese.id_nomenclature_sensitivity
            == cor_sensitivity_area_type.c.id_nomenclature_sensitivity,
        )

        query = (
            query.outerjoin(blurred_areas_join, sensitivity_aware_onclause)
            .options(
                # contains_eager: populate Synthese.areas directly from the join
                contains_eager(Synthese.areas.of_type(BlurredAreas)),
                # Blurred geometry when there is one, real geometry otherwise
                with_expression(
                    Synthese.the_geom_authorized,
                    func.coalesce(BlurredObsArea.geom_4326, Synthese.the_geom_4326),
                ),
            )
            .order_by(BlurredAreaTypes.size_hierarchy)
        )

    # Second fetch: the record as it will be serialized
    final_lookup = db.session.execute(query.filter(Synthese.id_synthese == id_synthese))
    record = final_lookup.unique().scalar_one()

    schema = SyntheseSchema(
        only=Synthese.nomenclature_fields + dumped_fields,
        exclude=["areas.geom"],
        as_geojson=True,
        feature_geometry="the_geom_authorized",
    )
    return schema.dump(record)