import json
import datetime
import re
from collections import OrderedDict
from warnings import warn
from flask import (
Blueprint,
abort,
request,
Response,
current_app,
send_from_directory,
render_template,
jsonify,
g,
)
from geonature.core.gn_synthese.schemas import SyntheseSchema
from geonature.core.gn_synthese.synthese_config import MANDATORY_COLUMNS
from pypnusershub.db.models import User
from pypnnomenclature.models import BibNomenclaturesTypes, TNomenclatures
from werkzeug.exceptions import Forbidden, NotFound, BadRequest, Conflict
from werkzeug.datastructures import MultiDict
from sqlalchemy import distinct, func, desc, asc, select, case
from sqlalchemy.orm import joinedload, lazyload, selectinload, contains_eager
from geojson import FeatureCollection, Feature
import sqlalchemy as sa
from sqlalchemy.orm import load_only, aliased, Load, with_expression
from sqlalchemy.exc import NoResultFound
from utils_flask_sqla.generic import serializeQuery, GenericTable
from utils_flask_sqla.response import to_csv_resp, to_json_resp, json_resp
from utils_flask_sqla_geo.generic import GenericTableGeo
from geonature.utils import filemanager
from geonature.utils.env import db, DB
from geonature.utils.errors import GeonatureApiError
from geonature.utils.utilsgeometrytools import export_as_geo_file
from geonature.core.gn_meta.models import TDatasets
from geonature.core.notifications.utils import dispatch_notifications
from geonature.core.gn_synthese.models import (
BibReportsTypes,
CorAreaSynthese,
DefaultsNomenclaturesValue,
Synthese,
TSources,
VSyntheseForWebApp,
VColorAreaTaxon,
TReport,
SyntheseLogEntry,
)
from geonature.core.gn_synthese.utils.blurring import (
build_allowed_geom_cte,
build_blurred_precise_geom_queries,
build_synthese_obs_query,
split_blurring_precise_permissions,
)
from geonature.core.gn_synthese.utils.query_select_sqla import SyntheseQuery
from geonature.core.gn_synthese.utils.orm import is_already_joined
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.gn_permissions.decorators import login_required, permissions_required
from geonature.core.gn_permissions.tools import get_scopes_by_action, get_permissions
from geonature.core.sensitivity.models import cor_sensitivity_area_type
from ref_geo.models import LAreas, BibAreasTypes
from apptax.taxonomie.models import (
Taxref,
bdc_statut_cor_text_area,
TaxrefBdcStatutCorTextValues,
TaxrefBdcStatutTaxon,
TaxrefBdcStatutText,
TaxrefBdcStatutType,
TaxrefBdcStatutValues,
VMTaxrefListForautocomplete,
)
routes = Blueprint("gn_synthese", __name__)
############################################
########### GET OBSERVATIONS ##############
############################################
@routes.route("/for_web", methods=["GET", "POST"])
@permissions_required("R", module_code="SYNTHESE")
def get_observations_for_web(permissions):
"""Optimized route to serve data for the frontend with all filters.
.. :quickref: Synthese; Get filtered observations
Query filtered by any filter, returning all the fields of the
view v_synthese_for_export::
properties = {
"id": r["id_synthese"],
"date_min": str(r["date_min"]),
"cd_nom": r["cd_nom"],
"nom_vern_or_lb_nom": r["nom_vern"] if r["nom_vern"] else r["lb_nom"],
"lb_nom": r["lb_nom"],
"dataset_name": r["dataset_name"],
"observers": r["observers"],
"url_source": r["url_source"],
"unique_id_sinp": r["unique_id_sinp"],
"entity_source_pk_value": r["entity_source_pk_value"],
}
geojson = json.loads(r["st_asgeojson"])
geojson["properties"] = properties
:qparam str limit: Limit number of synthese returned. Defaults to NB_MAX_OBS_MAP.
:qparam str cd_ref_parent: Filter on all child taxa of a TAXREF cd_ref.
:qparam str cd_ref: Filter by TAXREF cd_ref attribute
:qparam str taxonomy_group2_inpn: Filter by TAXREF group2_inpn attribute
:qparam str taxonomy_id_hab: Filter by TAXREF id_habitat attribute
:qparam str taxhub_attribut*: Generic TAXREF filter based on the attribute and its value.
:qparam str *_red_lists: Generic red list filter. Filters on values. See config.
:qparam str *_protection_status: Generic protection status filter (BdC Statuts). Filters on types. See config.
:qparam str observers: Filter on observer
:qparam str id_organism: Filter on organism
:qparam str date_min: Start date
:qparam str date_max: End date
:qparam str id_acquisition_framework: *tbd*
:qparam str geoIntersection: Intersect with the geom sent from the map
:qparam str period_start: *tbd*
:qparam str period_end: *tbd*
:qparam str area*: Generic filter on area
:qparam str *: Generic filter, given by colname & value
:>jsonarr array data: Array of synthese with geojson key, see above
:>jsonarr int nb_total: Number of observations
:>jsonarr bool nb_obs_limited: Is number of observations capped
"""
filters = request.json if request.is_json else {}
if not isinstance(filters, dict):
raise BadRequest("Bad filters")
result_limit = request.args.get(
"limit", current_app.config["SYNTHESE"]["NB_MAX_OBS_MAP"], type=int
)
output_format = request.args.get("format", "ungrouped_geom")
if output_format not in ["ungrouped_geom", "grouped_geom", "grouped_geom_by_areas"]:
raise BadRequest(f"Bad format '{output_format}'")
# Get Column Frontend parameter to return only the needed columns
param_column_list = {
col["prop"]
for col in current_app.config["SYNTHESE"]["LIST_COLUMNS_FRONTEND"]
+ current_app.config["SYNTHESE"]["ADDITIONAL_COLUMNS_FRONTEND"]
}
# Init with compulsory columns
columns = []
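# Build a flat list of alternating label/column pairs, as expected by
# PostgreSQL json_build_object(key, value, key, value, ...) used below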
for col in MANDATORY_COLUMNS:
columns.extend([col, getattr(VSyntheseForWebApp, col)])
if "count_min_max" in param_column_list:
count_min_max = case(
(
VSyntheseForWebApp.count_min != VSyntheseForWebApp.count_max,
func.concat(VSyntheseForWebApp.count_min, " - ", VSyntheseForWebApp.count_max),
),
(
VSyntheseForWebApp.count_min != None,
func.concat(VSyntheseForWebApp.count_min),
),
else_="",
)
columns += ["count_min_max", count_min_max]
param_column_list.remove("count_min_max")
if "nom_vern_or_lb_nom" in param_column_list:
nom_vern_or_lb_nom = func.coalesce(
func.nullif(VSyntheseForWebApp.nom_vern, ""), VSyntheseForWebApp.lb_nom
)
columns += ["nom_vern_or_lb_nom", nom_vern_or_lb_nom]
param_column_list.remove("nom_vern_or_lb_nom")
for column in param_column_list:
columns += [column, getattr(VSyntheseForWebApp, column)]
observations = func.json_build_object(*columns).label("obs_as_json")
# Need to check if there are blurring permissions so that the blurring process
# does not affect performance when there are no blurring permissions
blurring_permissions, precise_permissions = split_blurring_precise_permissions(permissions)
if not blurring_permissions:
# No need to apply blurring => same path as before blurring feature
obs_query = (
select(observations)
.where(VSyntheseForWebApp.the_geom_4326.isnot(None))
.order_by(VSyntheseForWebApp.date_min.desc())
.limit(result_limit)
)
# Add filters to observations CTE query
synthese_query_class = SyntheseQuery(
VSyntheseForWebApp,
obs_query,
dict(filters),
)
synthese_query_class.apply_all_filters(g.current_user, permissions)
obs_query = synthese_query_class.build_query()
geojson_column = VSyntheseForWebApp.st_asgeojson
else:
# Build 2 queries that will be UNIONed
# Select size hierarchy if mesh mode is selected
select_size_hierarchy = output_format == "grouped_geom_by_areas"
blurred_geom_query, precise_geom_query = build_blurred_precise_geom_queries(
filters, select_size_hierarchy=select_size_hierarchy
)
allowed_geom_cte = build_allowed_geom_cte(
blurring_permissions=blurring_permissions,
precise_permissions=precise_permissions,
blurred_geom_query=blurred_geom_query,
precise_geom_query=precise_geom_query,
limit=result_limit,
)
obs_query = build_synthese_obs_query(
observations=observations,
allowed_geom_cte=allowed_geom_cte,
limit=result_limit,
)
geojson_column = func.st_asgeojson(allowed_geom_cte.c.geom)
if output_format == "grouped_geom_by_areas":
obs_query = obs_query.add_columns(VSyntheseForWebApp.id_synthese)
# Need to select the size_hierarchy to use it afterwards (only if blurring permissions are found)
if blurring_permissions:
obs_query = obs_query.add_columns(
allowed_geom_cte.c.size_hierarchy.label("size_hierarchy")
)
obs_query = obs_query.cte("OBS")
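# Lateral subquery: for each observation of the OBS CTE, the intersected
# area(s) of the configured aggregation type (AREA_AGGREGATION_TYPE)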
agg_areas = (
select(CorAreaSynthese.id_synthese, LAreas.id_area)
.join(CorAreaSynthese, CorAreaSynthese.id_area == LAreas.id_area)
.join(BibAreasTypes, BibAreasTypes.id_type == LAreas.id_type)
.where(CorAreaSynthese.id_synthese == obs_query.c.id_synthese)
.where(
BibAreasTypes.type_code == current_app.config["SYNTHESE"]["AREA_AGGREGATION_TYPE"]
)
)
if blurring_permissions:
# Do not select cells whose size_hierarchy is bigger than AREA_AGGREGATION_TYPE.
# It means that we do not aggregate obs whose blurred geometry is greater in
# size than the aggregation area
agg_areas = agg_areas.where(obs_query.c.size_hierarchy <= BibAreasTypes.size_hierarchy)
agg_areas = agg_areas.lateral("agg_areas")
obs_query = (
select(func.ST_AsGeoJSON(LAreas.geom_4326).label("geojson"), obs_query.c.obs_as_json)
.select_from(
obs_query.outerjoin(
agg_areas, agg_areas.c.id_synthese == obs_query.c.id_synthese
).outerjoin(LAreas, LAreas.id_area == agg_areas.c.id_area)
)
.cte("OBSERVATIONS")
)
else:
obs_query = obs_query.add_columns(geojson_column.label("geojson")).cte("OBSERVATIONS")
if output_format == "ungrouped_geom":
query = select(obs_query.c.geojson, obs_query.c.obs_as_json)
else:
# Group geometries with main query
grouped_properties = func.json_build_object(
"observations", func.json_agg(obs_query.c.obs_as_json).label("observations")
)
query = select(obs_query.c.geojson, grouped_properties).group_by(obs_query.c.geojson)
results = DB.session.execute(query)
# Build final GeoJson
geojson_features = []
for geom_as_geojson, properties in results:
geojson_features.append(
Feature(
geometry=json.loads(geom_as_geojson) if geom_as_geojson else None,
properties=properties,
)
)
return jsonify(FeatureCollection(geojson_features))
@routes.route("/vsynthese/<id_synthese>", methods=["GET"])
@permissions_required("R", module_code="SYNTHESE")
def get_one_synthese(permissions, id_synthese):
"""Get one synthese record for web app with all decoded nomenclature"""
synthese_query = Synthese.join_nomenclatures().options(
joinedload("dataset").options(
selectinload("acquisition_framework").options(
joinedload("creator"),
joinedload("nomenclature_territorial_level"),
joinedload("nomenclature_financing_type"),
),
),
# Used to check the sensitivity afterwards
joinedload("nomenclature_sensitivity"),
)
##################
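# Relationships and attributes serialized by SyntheseSchema (see `only=` below)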
fields = [
"dataset",
"dataset.acquisition_framework",
"dataset.acquisition_framework.bibliographical_references",
"dataset.acquisition_framework.cor_af_actor",
"dataset.acquisition_framework.cor_objectifs",
"dataset.acquisition_framework.cor_territories",
"dataset.acquisition_framework.cor_volets_sinp",
"dataset.acquisition_framework.creator",
"dataset.acquisition_framework.nomenclature_territorial_level",
"dataset.acquisition_framework.nomenclature_financing_type",
"dataset.cor_dataset_actor",
"dataset.cor_dataset_actor.role",
"dataset.cor_dataset_actor.organism",
"dataset.cor_territories",
"dataset.nomenclature_source_status",
"dataset.nomenclature_resource_type",
"dataset.nomenclature_dataset_objectif",
"dataset.nomenclature_data_type",
"dataset.nomenclature_data_origin",
"dataset.nomenclature_collecting_method",
"dataset.creator",
"dataset.modules",
"validations",
"validations.validation_label",
"validations.validator_role",
"cor_observers",
"cor_observers.organisme",
"source",
"habitat",
"medias",
"areas",
"areas.area_type",
]
# get reports info only if activated by admin config
if "SYNTHESE" in current_app.config["SYNTHESE"]["ALERT_MODULES"]:
fields.append("reports.report_type.type")
synthese_query = synthese_query.options(
lazyload(Synthese.reports).joinedload(TReport.report_type)
)
try:
synthese = (
db.session.execute(synthese_query.filter_by(id_synthese=id_synthese))
.unique()
.scalar_one()
)
except NoResultFound:
raise NotFound()
if not synthese.has_instance_permission(permissions=permissions):
raise Forbidden()
_, precise_permissions = split_blurring_precise_permissions(permissions)
# If the user has no precise permission on this obs and the obs is sensitive
if (
not synthese.has_instance_permission(precise_permissions)
and synthese.nomenclature_sensitivity.cd_nomenclature != "0"
):
# Use a cte to have the areas associated with the current id_synthese
cte = select(CorAreaSynthese).where(CorAreaSynthese.id_synthese == id_synthese).cte()
# Blurred area of the observation
BlurredObsArea = aliased(LAreas)
# Blurred area type of the observation
BlurredObsAreaType = aliased(BibAreasTypes)
# Types "larger" or equal in area hierarchy size that the blurred area type
BlurredAreaTypes = aliased(BibAreasTypes)
# Areas associates with the BlurredAreaTypes
BlurredAreas = aliased(LAreas)
# Inner join that retrieve the blurred area of the obs and the bigger areas
# used for "Zonages" in Synthese. Need to have size_hierarchy from ref_geo
inner = (
sa.join(CorAreaSynthese, BlurredObsArea)
.join(BlurredObsAreaType)
.join(
cor_sensitivity_area_type,
cor_sensitivity_area_type.c.id_area_type == BlurredObsAreaType.id_type,
)
.join(
BlurredAreaTypes,
BlurredAreaTypes.size_hierarchy >= BlurredObsAreaType.size_hierarchy,
)
.join(BlurredAreas, BlurredAreaTypes.id_type == BlurredAreas.id_type)
.join(cte, cte.c.id_area == BlurredAreas.id_area)
)
# Outer join to join CorAreaSynthese taking into account the sensitivity
outer = (
inner,
sa.and_(
Synthese.id_synthese == CorAreaSynthese.id_synthese,
Synthese.id_nomenclature_sensitivity
== cor_sensitivity_area_type.c.id_nomenclature_sensitivity,
),
)
synthese_query = (
synthese_query.outerjoin(*outer)
# contains_eager: to populate Synthese.areas directly
.options(contains_eager(Synthese.areas.of_type(BlurredAreas)))
.options(
with_expression(
Synthese.the_geom_authorized,
func.coalesce(BlurredObsArea.geom_4326, Synthese.the_geom_4326),
)
)
.order_by(BlurredAreaTypes.size_hierarchy)
)
else:
synthese_query = synthese_query.options(
lazyload("areas").options(
joinedload("area_type"),
),
with_expression(Synthese.the_geom_authorized, Synthese.the_geom_4326),
)
synthese = (
db.session.execute(synthese_query.filter(Synthese.id_synthese == id_synthese))
.unique()
.scalar_one()
)
synthese_schema = SyntheseSchema(
only=Synthese.nomenclature_fields + fields,
exclude=["areas.geom"],
as_geojson=True,
feature_geometry="the_geom_authorized",
)
return synthese_schema.dump(synthese)
################################
########### EXPORTS ############
################################
@routes.route("/export_taxons", methods=["POST"])
@permissions_required("E", module_code="SYNTHESE")
def export_taxon_web(permissions):
"""Optimized route for taxon web export.
.. :quickref: Synthese;
This view is customisable by the administrator
Some columns are mandatory: cd_ref
POST parameters: Use a list of id_synthese (in POST parameters)
to filter the v_synthese_taxon_for_export_view
:query str export_format: str<'csv'>
"""
taxon_view = GenericTable(
tableName="v_synthese_taxon_for_export_view",
schemaName="gn_synthese",
engine=DB.engine,
)
columns = taxon_view.tableDef.columns
# Check that the view v_synthese_taxon_for_export_view has the required columns
try:
assert hasattr(taxon_view.tableDef.columns, "cd_ref")
except AssertionError as e:
return (
{
"msg": """
View v_synthese_taxon_for_export_view
must have a cd_ref column \n
trace: {}
""".format(
str(e)
)
},
500,
)
id_list = request.get_json()
sub_query = (
select(
VSyntheseForWebApp.cd_ref,
func.count(distinct(VSyntheseForWebApp.id_synthese)).label("nb_obs"),
func.min(VSyntheseForWebApp.date_min).label("date_min"),
func.max(VSyntheseForWebApp.date_max).label("date_max"),
)
.where(VSyntheseForWebApp.id_synthese.in_(id_list))
.group_by(VSyntheseForWebApp.cd_ref)
)
synthese_query_class = SyntheseQuery(
VSyntheseForWebApp,
sub_query,
{},
)
synthese_query_class.filter_query_all_filters(g.current_user, permissions)
subq = synthese_query_class.query.alias("subq")
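# Join the per-taxon aggregates (nb_obs, date_min, date_max) back onto the
# export view columns through cd_ref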
query = select(*columns, subq.c.nb_obs, subq.c.date_min, subq.c.date_max).join(
subq, subq.c.cd_ref == columns.cd_ref
)
return to_csv_resp(
datetime.datetime.now().strftime("%Y_%m_%d_%Hh%Mm%S"),
data=serializeQuery(db.session.execute(query).all(), query.column_descriptions),
separator=";",
columns=[db_col.key for db_col in columns] + ["nb_obs", "date_min", "date_max"],
)
@routes.route("/export_observations", methods=["POST"])
@permissions_required("E", module_code="SYNTHESE")
def export_observations_web(permissions):
"""Optimized route for observations web export.
.. :quickref: Synthese;
This view is customisable by the administrator
Some columns are mandatory: id_synthese, geojson and geojson_local to generate the exported files
POST parameters: Use a list of id_synthese (in POST parameters) to filter the v_synthese_for_export_view
:query str export_format: str<'csv', 'geojson', 'shapefiles', 'gpkg'>
"""
params = request.args
# set default to csv
export_format = params.get("export_format", "csv")
view_name_param = params.get("view_name", "gn_synthese.v_synthese_for_export")
# Test export_format
if export_format not in current_app.config["SYNTHESE"]["EXPORT_FORMAT"]:
raise BadRequest("Unsupported format")
config_view = {
"view_name": "gn_synthese.v_synthese_for_web_app",
"geojson_4326_field": "geojson_4326",
"geojson_local_field": "geojson_local",
}
# Check that the export view name is among the configured views, for security reasons
if view_name_param != "gn_synthese.v_synthese_for_export":
try:
config_view = next(
_view
for _view in current_app.config["SYNTHESE"]["EXPORT_OBSERVATIONS_CUSTOM_VIEWS"]
if _view["view_name"] == view_name_param
)
except StopIteration:
raise Forbidden("This view is not available for export")
geojson_4326_field = config_view["geojson_4326_field"]
geojson_local_field = config_view["geojson_local_field"]
try:
schema_name, view_name = view_name_param.split(".")
except ValueError:
raise BadRequest("view_name parameter must be a string with schema dot view_name")
# get list of id synthese from POST
id_list = request.get_json()
# Get the SRID for the export
local_srid = DB.session.execute(
func.Find_SRID("gn_synthese", "synthese", "the_geom_local")
).scalar()
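# Split permissions between those requiring blurring and those giving access
# to precise geometries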
blurring_permissions, precise_permissions = split_blurring_precise_permissions(permissions)
# Get the view for export
# Useful to have geom columns so that they can be replaced by blurred geoms
# (only if the user has sensitive permissions)
export_view = GenericTableGeo(
tableName=view_name,
schemaName=schema_name,
engine=DB.engine,
geometry_field=None,
srid=local_srid,
)
mandatory_columns = {"id_synthese", geojson_4326_field, geojson_local_field}
if not mandatory_columns.issubset({col.name for col in export_view.db_cols}):
raise BadRequest(
f"The view {view_name} is missing one of the required columns {str(mandatory_columns)}"
)
# If there are no blurring permissions => same path as before the blurring implementation
if not blurring_permissions:
# Get the CTE for synthese filtered by user permissions
synthese_query_class = SyntheseQuery(
Synthese,
select(Synthese.id_synthese),
{},
)
synthese_query_class.filter_query_all_filters(g.current_user, permissions)
cte_synthese_filtered = synthese_query_class.build_query().cte("cte_synthese_filtered")
selectable_columns = [export_view.tableDef]
else:
# Use roughly the same process as get_observations_for_web()
# Add a where_clause to filter on the id_synthese provided to reduce the
# UNION queries
where_clauses = [Synthese.id_synthese.in_(id_list)]
blurred_geom_query, precise_geom_query = build_blurred_precise_geom_queries(
filters={}, where_clauses=where_clauses
)
cte_synthese_filtered = build_allowed_geom_cte(
blurring_permissions=blurring_permissions,
precise_permissions=precise_permissions,
blurred_geom_query=blurred_geom_query,
precise_geom_query=precise_geom_query,
limit=current_app.config["SYNTHESE"]["NB_MAX_OBS_EXPORT"],
)
# Overwrite geometry columns to compute the blurred geometry from the blurring cte
columns_with_geom_excluded = [
col
for col in export_view.tableDef.columns
if col.name
not in [
"geometrie_wkt_4326", # FIXME: hardcoded column names?
"x_centroid_4326",
"y_centroid_4326",
geojson_4326_field,
geojson_local_field,
]
]
# Recompute the blurred geometries
blurred_geom_columns = [
func.st_astext(cte_synthese_filtered.c.geom).label("geometrie_wkt_4326"),
func.st_x(func.st_centroid(cte_synthese_filtered.c.geom)).label("x_centroid_4326"),
func.st_y(func.st_centroid(cte_synthese_filtered.c.geom)).label("y_centroid_4326"),
func.st_asgeojson(cte_synthese_filtered.c.geom).label(geojson_4326_field),
func.st_asgeojson(func.st_transform(cte_synthese_filtered.c.geom, local_srid)).label(
geojson_local_field
),
]
# Finally provide all the columns to be selected in the export query
selectable_columns = columns_with_geom_excluded + blurred_geom_columns
# Get the query for export
export_query = (
select(*selectable_columns)
.select_from(
export_view.tableDef.join(
cte_synthese_filtered,
cte_synthese_filtered.c.id_synthese == export_view.tableDef.columns["id_synthese"],
)
)
.where(export_view.tableDef.columns["id_synthese"].in_(id_list))
)
# Get the results for export
results = DB.session.execute(
export_query.limit(current_app.config["SYNTHESE"]["NB_MAX_OBS_EXPORT"])
)
db_cols_for_shape = []
columns_to_serialize = []
# Loop over the synthese config to exclude columns when it is the default export
for db_col in export_view.db_cols:
if view_name_param == "gn_synthese.v_synthese_for_export":
if db_col.key in current_app.config["SYNTHESE"]["EXPORT_COLUMNS"]:
db_cols_for_shape.append(db_col)
columns_to_serialize.append(db_col.key)
else:
# remove geojson fields from serialization
if db_col.key not in [geojson_4326_field, geojson_local_field]:
db_cols_for_shape.append(db_col)
columns_to_serialize.append(db_col.key)
file_name = datetime.datetime.now().strftime("%Y_%m_%d_%Hh%Mm%S")
file_name = filemanager.removeDisallowedFilenameChars(file_name)
if export_format == "csv":
formated_data = [export_view.as_dict(d, fields=columns_to_serialize) for d in results]
return to_csv_resp(file_name, formated_data, separator=";", columns=columns_to_serialize)
elif export_format == "geojson":
features = []
for r in results:
geometry = json.loads(getattr(r, geojson_4326_field))
feature = Feature(
geometry=geometry,
properties=export_view.as_dict(r, fields=columns_to_serialize),
)
features.append(feature)
results = FeatureCollection(features)
return to_json_resp(results, as_file=True, filename=file_name, indent=4)
else:
try:
dir_name, file_name = export_as_geo_file(
export_format=export_format,
export_view=export_view,
db_cols=db_cols_for_shape,
geojson_col=geojson_local_field,
data=results,
file_name=file_name,
)
return send_from_directory(dir_name, file_name, as_attachment=True)
except GeonatureApiError as e:
message = str(e)
return render_template(
"error.html",
error=message,
redirect=current_app.config["URL_APPLICATION"] + "/#/synthese",
)
# TODO: Change the following line to set method as "POST" only ?
@routes.route("/export_metadata", methods=["GET", "POST"])
@permissions_required("E", module_code="SYNTHESE")
@routes.route("/export_statuts", methods=["POST"])
@permissions_required("E", module_code="SYNTHESE")
def export_status(permissions):
"""Route to get all the protection status of a synthese search
.. :quickref: Synthese;
Get the CRUVED from 'R' action because we don't give observations X/Y but only statuts
and to be consistent with the data displayed in the web interface.
Parameters:
- HTTP-GET: the same that the /synthese endpoint (all the filter in web app)
"""
filters = request.json if request.is_json else {}
# Initialize the select object
query = select(
distinct(VSyntheseForWebApp.cd_nom).label("cd_nom"),
Taxref.cd_ref,
Taxref.nom_complet,
Taxref.nom_vern,
TaxrefBdcStatutTaxon.rq_statut,
TaxrefBdcStatutType.regroupement_type,
TaxrefBdcStatutType.lb_type_statut,
TaxrefBdcStatutText.cd_sig,
TaxrefBdcStatutText.full_citation,
TaxrefBdcStatutText.doc_url,
TaxrefBdcStatutValues.code_statut,
TaxrefBdcStatutValues.label_statut,
)
# Initialize SyntheseQuery class
synthese_query = SyntheseQuery(VSyntheseForWebApp, query, filters)
# Filter query with permissions
synthese_query.filter_query_all_filters(g.current_user, permissions)
# Add join
synthese_query.add_join(Taxref, Taxref.cd_nom, VSyntheseForWebApp.cd_nom)
synthese_query.add_join(
CorAreaSynthese,
CorAreaSynthese.id_synthese,
VSyntheseForWebApp.id_synthese,
)
synthese_query.add_join(
bdc_statut_cor_text_area, bdc_statut_cor_text_area.c.id_area, CorAreaSynthese.id_area
)
synthese_query.add_join(TaxrefBdcStatutTaxon, TaxrefBdcStatutTaxon.cd_ref, Taxref.cd_ref)
synthese_query.add_join(
TaxrefBdcStatutCorTextValues,
TaxrefBdcStatutCorTextValues.id_value_text,
TaxrefBdcStatutTaxon.id_value_text,
)
synthese_query.add_join_multiple_cond(
TaxrefBdcStatutText,
[
TaxrefBdcStatutText.id_text == TaxrefBdcStatutCorTextValues.id_text,
TaxrefBdcStatutText.id_text == bdc_statut_cor_text_area.c.id_text,
],
)
synthese_query.add_join(
TaxrefBdcStatutType,
TaxrefBdcStatutType.cd_type_statut,
TaxrefBdcStatutText.cd_type_statut,
)
synthese_query.add_join(
TaxrefBdcStatutValues,
TaxrefBdcStatutValues.id_value,
TaxrefBdcStatutCorTextValues.id_value,
)
# Build query
query = synthese_query.build_query()
# Keep only enabled status texts
query = query.where(TaxrefBdcStatutText.enable == True)
protection_status = []
data = DB.session.execute(query)
for d in data:
d = d._mapping
row = OrderedDict(
[
("cd_nom", d["cd_nom"]),
("cd_ref", d["cd_ref"]),
("nom_complet", d["nom_complet"]),
("nom_vern", d["nom_vern"]),
("type_regroupement", d["regroupement_type"]),
("type", d["lb_type_statut"]),
("territoire_application", d["cd_sig"]),
("intitule_doc", re.sub("<[^<]+?>", "", d["full_citation"])),
("code_statut", d["code_statut"]),
("intitule_statut", d["label_statut"]),
("remarque", d["rq_statut"]),
("url_doc", d["doc_url"]),
]
)
protection_status.append(row)
export_columns = [
"nom_complet",
"nom_vern",
"cd_nom",
"cd_ref",
"type_regroupement",
"type",
"territoire_application",
"intitule_doc",
"code_statut",
"intitule_statut",
"remarque",
"url_doc",
]
return to_csv_resp(
datetime.datetime.now().strftime("%Y_%m_%d_%Hh%Mm%S"),
protection_status,
separator=";",
columns=export_columns,
)
######################################
########### OTHER ROUTES ############
######################################
@routes.route("/general_stats", methods=["GET"])
@permissions_required("R", module_code="SYNTHESE")
@json_resp
def general_stats(permissions):
"""Return stats about synthese.
.. :quickref: Synthese;
- nb of observations
- nb of distinct species
- nb of distinct observers
- nb of datasets
"""
nb_allowed_datasets = db.session.scalar(
select(func.count("*"))
.select_from(TDatasets)
.where(TDatasets.filter_by_readable().whereclause)
)
query = select(
func.count(Synthese.id_synthese),
func.count(func.distinct(Synthese.cd_nom)),
func.count(func.distinct(Synthese.observers)),
)
synthese_query_obj = SyntheseQuery(Synthese, query, {})
synthese_query_obj.filter_query_with_cruved(g.current_user, permissions)
result = DB.session.execute(synthese_query_obj.query)
synthese_counts = result.fetchone()
data = {
"nb_data": synthese_counts[0],
"nb_species": synthese_counts[1],
"nb_observers": synthese_counts[2],
"nb_dataset": nb_allowed_datasets,
}
return data
@routes.route("/taxons_tree", methods=["GET"])
@login_required
@json_resp
def get_taxon_tree():
"""Get taxon tree.
.. :quickref: Synthese;
"""
taxon_tree_table = GenericTable(
tableName="v_tree_taxons_synthese", schemaName="gn_synthese", engine=DB.engine
)
data = db.session.execute(select(taxon_tree_table.tableDef)).all()
return [taxon_tree_table.as_dict(datum) for datum in data]
@routes.route("/taxons_autocomplete", methods=["GET"])
@login_required
@json_resp
def get_autocomplete_taxons_synthese():
"""Autocomplete taxon for web search (based on all taxon in Synthese).
.. :quickref: Synthese;
The request use trigram algorithm to get relevent results
:query str search_name: the search name (use sql ilike statement and puts "%" for spaces)
:query str regne: filter with kingdom
:query str group2_inpn : filter with INPN group 2
"""
search_name = request.args.get("search_name", "")
query = (
select(
VMTaxrefListForautocomplete,
func.similarity(VMTaxrefListForautocomplete.unaccent_search_name, search_name).label(
"idx_trgm"
),
)
.distinct()
.join(Synthese, Synthese.cd_nom == VMTaxrefListForautocomplete.cd_nom)
)
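# Replace spaces with '%' so the ILIKE pattern also matches multi-word names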
search_name = search_name.replace(" ", "%")
query = query.where(
VMTaxrefListForautocomplete.unaccent_search_name.ilike(
func.unaccent("%" + search_name + "%")
)
)
regne = request.args.get("regne")
if regne:
query = query.where(VMTaxrefListForautocomplete.regne == regne)
group2_inpn = request.args.get("group2_inpn")
if group2_inpn:
query = query.where(VMTaxrefListForautocomplete.group2_inpn == group2_inpn)
# FIXME : won't work now
# query = query.order_by(
# desc(VMTaxrefListForautocomplete.cd_nom == VMTaxrefListForautocomplete.cd_ref)
# )
limit = request.args.get("limit", 20)
data = db.session.execute(
query.order_by(
desc("idx_trgm"),
).limit(limit)
).all()
return [d[0].as_dict() for d in data]
@routes.route("/sources", methods=["GET"])
@login_required
@json_resp
def get_sources():
"""Get all sources.
.. :quickref: Synthese;
"""
q = select(TSources)
data = db.session.scalars(q).all()
return [n.as_dict() for n in data]
@routes.route("/defaultsNomenclatures", methods=["GET"])
@login_required
def getDefaultsNomenclatures():
"""Get default nomenclatures
.. :quickref: Synthese;
:query str group2_inpn:
:query str regne:
:query int organism:
"""
params = request.args
group2_inpn = "0"
regne = "0"
organism = 0
if "group2_inpn" in params:
group2_inpn = params["group2_inpn"]
if "regne" in params:
regne = params["regne"]
if "organism" in params:
organism = params["organism"]
types = request.args.getlist("mnemonique_type")
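# The query below calls the PostgreSQL function
# gn_synthese.get_default_nomenclature_value() for each nomenclature type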
query = select(
distinct(DefaultsNomenclaturesValue.mnemonique_type),
func.gn_synthese.get_default_nomenclature_value(
DefaultsNomenclaturesValue.mnemonique_type, organism, regne, group2_inpn
),
)
if len(types) > 0:
query = query.where(DefaultsNomenclaturesValue.mnemonique_type.in_(tuple(types)))
data = db.session.execute(query).all()
if not data:
raise NotFound
return jsonify(dict(data))
@routes.route("/color_taxon", methods=["GET"])
@login_required
def get_color_taxon():
"""Get color of taxon in areas (vue synthese.v_color_taxon_area).
.. :quickref: Synthese;
:query str code_area_type: Type area code (ref_geo.bib_areas_types.type_code)
:query int id_area: Id of area (ref_geo.l_areas.id_area)
:query int cd_nom: taxon code (taxonomie.taxref.cd_nom)
Each of these three parameters can be passed multiple times
:returns: Array<dict<VColorAreaTaxon>>
"""
params = request.args
limit = int(params.get("limit", 100))
page = params.get("page", 1, int)
if "offset" in request.args:
warn(
"offset is deprecated, please use page for pagination (start at 1)", DeprecationWarning
)
page = (int(request.args["offset"]) // limit) + 1
id_areas_type = params.getlist("code_area_type")
cd_noms = params.getlist("cd_nom")
id_areas = params.getlist("id_area")
query = select(VColorAreaTaxon)
if len(id_areas_type) > 0:
query = query.join(LAreas, LAreas.id_area == VColorAreaTaxon.id_area).join(
BibAreasTypes, BibAreasTypes.id_type == LAreas.id_type
)
query = query.where(BibAreasTypes.type_code.in_(tuple(id_areas_type)))
if len(id_areas) > 0:
# check if the join on l_areas has already been done
if not is_already_joined(LAreas, query):
query = query.join(LAreas, LAreas.id_area == VColorAreaTaxon.id_area)
query = query.where(LAreas.id_area.in_(tuple(id_areas)))
query = query.order_by(VColorAreaTaxon.cd_nom).order_by(VColorAreaTaxon.id_area)
if len(cd_noms) > 0:
query = query.where(VColorAreaTaxon.cd_nom.in_(tuple(cd_noms)))
results = db.paginate(query, page=page, per_page=limit, error_out=False)
return jsonify([d.as_dict() for d in results.items])
@routes.route("/taxa_count", methods=["GET"])
@login_required
@json_resp
def get_taxa_count():
"""
Get taxa count in synthese filtering with generic parameters
.. :quickref: Synthese;
Parameters
----------
id_dataset: `int` (query parameter)
Returns
-------
count: `int`:
the number of taxa
"""
params = request.args
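# When no id_dataset is given, the WHERE clause below degrades to a constant true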
query = (
select(func.count(distinct(Synthese.cd_nom)))
.select_from(Synthese)
.where(Synthese.id_dataset == params["id_dataset"] if "id_dataset" in params else True)
)
return db.session.scalar(query)
@routes.route("/observation_count", methods=["GET"])
@login_required
@json_resp
def get_observation_count():
"""
Get the number of observations found in a given dataset
.. :quickref: Synthese;
Parameters
----------
id_dataset: `int` (query parameter)
Returns
-------
count: `int`:
the number of observations
"""
params = request.args
query = select(func.count(Synthese.id_synthese)).select_from(Synthese)
if "id_dataset" in params:
query = query.where(Synthese.id_dataset == params["id_dataset"])
return DB.session.execute(query).scalar_one()
@routes.route("/observations_bbox", methods=["GET"])
@login_required
def get_bbox():
"""
Get bbox of observations
.. :quickref: Synthese;
Parameters
-----------
id_dataset: int: (query parameter)
Returns
-------
bbox: `geojson`:
the bounding box in geojson
"""
params = request.args
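# ST_Extent aggregates the selected geometries into their bounding box,
# rendered as GeoJSON by ST_AsGeoJSON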
query = select(func.ST_AsGeoJSON(func.ST_Extent(Synthese.the_geom_4326)))
if "id_dataset" in params:
query = query.where(Synthese.id_dataset == params["id_dataset"])
if "id_source" in params:
query = query.where(Synthese.id_source == params["id_source"])
data = db.session.execute(query).one()
if data and data[0]:
return Response(data[0], mimetype="application/json")
return "", 204
@routes.route("/observation_count_per_column/<column>", methods=["GET"])
@login_required
def observation_count_per_column(column):
"""
Get the observation count grouped by a given column
This function was used to count observations per dataset,
but this usage has been replaced by
TDatasets.synthese_records_count.
Remove this function as it is very inefficient?
"""
if column not in sa.inspect(Synthese).column_attrs:
raise BadRequest(f"No column name {column} in Synthese")
synthese_column = getattr(Synthese, column)
stmt = (
select(
func.count(Synthese.id_synthese).label("count"),
synthese_column.label(column),
)
.select_from(Synthese)
.group_by(synthese_column)
)
return jsonify(DB.session.execute(stmt).fetchall())
@routes.route("/taxa_distribution", methods=["GET"])
@login_required
def get_taxa_distribution():
"""
Get taxa distribution for a given dataset or acquisition framework
and grouped by a certain taxa rank
"""
id_dataset = request.args.get("id_dataset")
id_af = request.args.get("id_af")
id_source = request.args.get("id_source")
rank = request.args.get("taxa_rank")
if not rank:
rank = "regne"
try:
rank = getattr(Taxref.__table__.columns, rank)
except AttributeError:
raise BadRequest("Rank does not exist")
query = (
select(func.count(distinct(Synthese.cd_nom)), rank)
.select_from(Synthese)
.outerjoin(Taxref, Taxref.cd_nom == Synthese.cd_nom)
)
if id_dataset:
query = query.where(Synthese.id_dataset == id_dataset)
elif id_af:
query = query.outerjoin(TDatasets, TDatasets.id_dataset == Synthese.id_dataset).where(
TDatasets.id_acquisition_framework == id_af
)
# User can add id_source filter along with id_dataset or id_af
if id_source is not None:
query = query.where(Synthese.id_source == id_source)
data = db.session.execute(query.group_by(rank)).all()
return jsonify([{"count": d[0], "group": d[1]} for d in data])
@routes.route("/reports", methods=["POST"])
@permissions_required("R", module_code="SYNTHESE")
@json_resp
def create_report(permissions):
"""
Create a report (e.g. alert, pin, discussion) for a given synthese id
Returns
-------
report: `json`:
Every occurrence's report
"""
session = DB.session
data = request.get_json()
if data is None:
raise BadRequest("Empty request data")
try:
type_name = data["type"]
id_synthese = data["item"]
content = data["content"]
except KeyError:
raise BadRequest("Empty request data")
if not id_synthese:
raise BadRequest("id_synthese is missing from the request")
if not type_name:
raise BadRequest("Report type is missing from the request")
if not content and type_name == "discussion":
raise BadRequest("Discussion content is required")
type_exists = BibReportsTypes.query.filter_by(type=type_name).first()
if not type_exists:
raise BadRequest("This report type does not exist")
synthese = db.session.scalars(
select(Synthese)
.options(
Load(Synthese).raiseload("*"),
joinedload("nomenclature_sensitivity"),
joinedload("cor_observers"),
joinedload("digitiser"),
joinedload("dataset"),
)
.filter_by(id_synthese=id_synthese)
.limit(1),
).first()
if not synthese:
abort(404)
if not synthese.has_instance_permission(permissions):
raise Forbidden
report_query = TReport.query.where(
TReport.id_synthese == id_synthese,
TReport.report_type.has(BibReportsTypes.type == type_name),
)
user_pin = TReport.query.where(
TReport.id_synthese == id_synthese,
TReport.report_type.has(BibReportsTypes.type == "pin"),
TReport.id_role == g.current_user.id_role,
)
# only allow one alert per id_synthese
if type_name in ["alert"]:
alert_exists = report_query.one_or_none()
if alert_exists is not None:
raise Conflict("This type already exists for this id")
if type_name in ["pin"]:
pin_exist = user_pin.one_or_none()
if pin_exist is not None:
raise Conflict("This type already exists for this id")
new_entry = TReport(
id_synthese=id_synthese,
id_role=g.current_user.id_role,
content=content,
creation_date=datetime.datetime.now(),
id_type=type_exists.id_type,
)
session.add(new_entry)
if type_name == "discussion":
# Get the observers of the observation
observers = {observer.id_role for observer in synthese.cor_observers}
# Get the users that commented on the observation
commenters = {
report.id_role
for report in report_query.where(
TReport.id_role.notin_({synthese.id_digitiser} | observers)
).distinct(TReport.id_role)
}
# The notified id_roles are the union of observers, commenters and the digitiser
id_roles = observers | commenters | {synthese.id_digitiser}
# Remove the user who just commented on the obs so as not to notify them
id_roles.discard(g.current_user.id_role)
notify_new_report_change(
synthese=synthese, user=g.current_user, id_roles=id_roles, content=content
)
session.commit()
def notify_new_report_change(synthese, user, id_roles, content):
if not synthese.id_digitiser:
return
dispatch_notifications(
code_categories=["OBSERVATION-COMMENT"],
id_roles=id_roles,
title="Nouveau commentaire sur une observation",
url=(
current_app.config["URL_APPLICATION"]
+ "/#/synthese/occurrence/"
+ str(synthese.id_synthese)
),
context={"synthese": synthese, "user": user, "content": content},
)
@routes.route("/reports/<int:id_report>", methods=["PUT"])
@login_required
@json_resp
def update_content_report(id_report):
"""
Modify a report (e.g. a discussion comment) for a given report id
Returns
-------
report: `json`:
Every occurrence's report
"""
data = request.json
session = DB.session
idReport = data["idReport"]
row = TReport.query.get_or_404(idReport)
if row.user != g.current_user:
raise Forbidden
row.content = data["content"]
session.commit()
@routes.route("/reports", methods=["GET"])
@permissions_required("R", module_code="SYNTHESE")
def list_reports(permissions):
type_name = request.args.get("type")
id_synthese = request.args.get("idSynthese")
sort = request.args.get("sort")
# VERIFY ID SYNTHESE
synthese = db.get_or_404(Synthese, id_synthese)
if not synthese.has_instance_permission(permissions):
raise Forbidden
# START REQUEST
req = TReport.query.where(TReport.id_synthese == id_synthese)
# SORT
if sort == "asc":
req = req.order_by(asc(TReport.creation_date))
if sort == "desc":
req = req.order_by(desc(TReport.creation_date))
# VERIFY AND SET TYPE
type_exists = BibReportsTypes.query.filter_by(type=type_name).one_or_none()
# type param is not required to get all
if type_name and not type_exists:
raise BadRequest("This report type does not exist")
if type_name:
req = req.where(TReport.report_type.has(BibReportsTypes.type == type_name))
# filter by id_role for pin type only
if type_name and type_name == "pin":
req = req.where(TReport.id_role == g.current_user.id_role)
req = req.options(
joinedload(TReport.user).load_only(User.nom_role, User.prenom_role),
joinedload(TReport.report_type),
)
result = [
report.as_dict(
fields=[
"id_report",
"id_synthese",
"id_role",
"report_type.type",
"report_type.id_type",
"content",
"deleted",
"creation_date",
"user.nom_role",
"user.prenom_role",
]
)
for report in req.all()
]
return jsonify(result)
@routes.route("/reports/<int:id_report>", methods=["DELETE"])
@login_required
@json_resp
def delete_report(id_report):
reportItem = TReport.query.get_or_404(id_report)
# alert control to check cruved - allow validators only
if reportItem.report_type.type in ["alert"]:
permissions = get_permissions(module_code="SYNTHESE", action_code="R")
if not reportItem.synthese.has_instance_permission(permissions):
raise Forbidden("Permission required to delete this report !")
# only the owner can delete a pin or discussion report
if reportItem.id_role != g.current_user.id_role and reportItem.report_type.type in [
"discussion",
"pin",
]:
raise Forbidden
# for discussions, do not delete the row but only tag the report as deleted
if reportItem.report_type.type == "discussion":
reportItem.content = ""
reportItem.deleted = True
else:
DB.session.delete(reportItem)
DB.session.commit()
@routes.route("/log", methods=["get"])
@login_required
def list_synthese_log_entries() -> dict:
"""Get log history from synthese
Parameters
----------
Returns
-------
dict
log action list
"""
# FIXME SQLA 2
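# Deletions come from the dedicated log table (SyntheseLogEntry); inserts and
# updates are derived below from Synthese meta_create_date/meta_update_date
# and UNIONed with them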
deletion_entries = SyntheseLogEntry.query.options(
load_only(
SyntheseLogEntry.id_synthese,
SyntheseLogEntry.last_action,
SyntheseLogEntry.meta_last_action_date,
)
)
create_update_entries = Synthese.query.with_entities(
Synthese.id_synthese,
db.case(
(Synthese.meta_create_date < Synthese.meta_update_date, "U"),
else_="I",
).label("last_action"),
func.coalesce(Synthese.meta_update_date, Synthese.meta_create_date).label(
"meta_last_action_date"
),
)
query = deletion_entries.union(create_update_entries)
# Filter
try:
query = query.filter_by_params(request.args)
except ValueError as exc:
raise BadRequest(*exc.args) from exc
# Sort
try:
query = query.sort(request.args.getlist("sort"))
except ValueError as exc:
raise BadRequest(*exc.args) from exc
# Paginate
limit = request.args.get("limit", type=int, default=50)
page = request.args.get("page", type=int, default=1)
results = query.paginate(page=page, per_page=limit, error_out=False)
return jsonify(
{
"items": [item.as_dict() for item in results.items],
"total": results.total,
"limit": limit,
"page": page,
}
)