import json
from operator import or_
from pathlib import Path
from flask import Blueprint, request, current_app, g, url_for
from flask.json import jsonify
from werkzeug.exceptions import Forbidden, Conflict
import requests
from sqlalchemy.orm import joinedload
from sqlalchemy import select, func
from utils_flask_sqla.response import json_resp
from utils_flask_sqla_geo.utilsgeometry import remove_third_dimension
from geonature.core.gn_commons.models import (
TModules,
TParameters,
TMobileApps,
TPlaces,
TAdditionalFields,
)
from geonature.core.gn_commons.repositories import TMediaRepository
from geonature.core.gn_commons.repositories import get_table_location_id
from geonature.utils.env import DB, db, BACKEND_DIR
from geonature.utils.config import config_frontend, config
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.gn_permissions.decorators import login_required
from geonature.core.gn_permissions.tools import get_scope
from geonature.core.gn_commons.schemas import TAdditionalFieldsSchema
import geonature.core.gn_commons.tasks # noqa: F401
from shapely.geometry import shape
from geoalchemy2.shape import from_shape
from geonature.utils.errors import (
GeonatureApiError,
)
[docs]
routes = Blueprint("gn_commons", __name__)
# import routes sub folder
from .validation.routes import *
from .medias.routes import *
@routes.route("/config", methods=["GET"])
[docs]
def config_route():
"""
Returns geonature configuration
"""
return config_frontend
@routes.route("/modules", methods=["GET"])
@login_required
[docs]
def list_modules():
"""
Return the allowed modules of user from its cruved
.. :quickref: Commons;
"""
params = request.args
exclude = current_app.config["DISABLED_MODULES"].copy()
if "exclude" in params:
exclude.extend(params.getlist("exclude"))
query = (
select(TModules)
.options(joinedload(TModules.objects))
.options(joinedload(TModules.destination))
.where(TModules.module_code.notin_(exclude))
.order_by(TModules.module_order.asc())
.order_by(TModules.module_label.asc())
)
modules = db.session.scalars(query).unique().all()
allowed_modules = []
for module in modules:
module_allowed = False
# HACK : on a besoin d'avoir le module GeoNature en front pour l'URL de la doc
if module.module_code == "GEONATURE":
module_allowed = True
module_dict = module.as_dict(fields=["objects", "destination.code"])
# TODO : use has_any_permissions instead - must refactor the front
module_dict["cruved"] = {
action: get_scope(action, module_code=module.module_code, bypass_warning=True)
for action in "CRUVED"
}
if any(module_dict["cruved"].values()):
module_allowed = True
module_dict["module_external_url"] = (
"" if module.active_frontend else module.module_external_url
)
module_dict["module_url"] = module.module_path if module.active_frontend else ""
module_dict["module_objects"] = {}
# get cruved for each object
for obj_dict in module_dict["objects"]:
obj_code = obj_dict["code_object"]
obj_dict["cruved"] = {
action: get_scope(
action,
module_code=module.module_code,
object_code=obj_code,
bypass_warning=True,
)
for action in "CRUVED"
}
if any(obj_dict["cruved"].values()):
module_allowed = True
module_dict["module_objects"][obj_code] = obj_dict
if module_allowed:
allowed_modules.append(module_dict)
return jsonify(allowed_modules)
@routes.route("/module/<module_code>", methods=["GET"])
[docs]
def get_module(module_code):
module = db.one_or_404(select(TModules).filter_by(module_code=module_code))
return jsonify(module.as_dict())
@routes.route("/list/parameters", methods=["GET"])
@json_resp
[docs]
def get_parameters_list():
"""
Get all parameters from gn_commons.t_parameters
.. :quickref: Commons;
"""
return [d.as_dict() for d in db.session.scalars(select(TParameters)).all()]
@routes.route("/parameters/<param_name>", methods=["GET"])
@routes.route("/parameters/<param_name>/<int:id_org>", methods=["GET"])
@json_resp
[docs]
def get_one_parameter(param_name, id_org=None):
data = DB.session.scalars(
select(TParameters)
.where(TParameters.parameter_name == param_name)
.where(TParameters.id_organism == id_org if id_org else True)
).one()
return [data.as_dict()]
@routes.route("/additional_fields", methods=["GET"])
[docs]
def get_additional_fields():
params = request.args
query = select(TAdditionalFields).order_by(TAdditionalFields.field_order)
parse_param_value = lambda param: param.split(",") if len(param.split(",")) > 1 else param
params = {
param_key: parse_param_value(param_values) for param_key, param_values in params.items()
}
if "id_dataset" in params:
id_dataset = params["id_dataset"]
if id_dataset == "null":
# ~ operator means NOT EXISTS
query = query.where(~TAdditionalFields.datasets.any())
elif isinstance(id_dataset, list) and len(id_dataset) > 1:
query = query.where(
or_(
*[
TAdditionalFields.datasets.any(id_dataset=id_dastaset_i)
for id_dastaset_i in id_dataset
]
)
)
else:
query = query.where(TAdditionalFields.datasets.any(id_dataset=id_dataset))
if "module_code" in params:
module_code = params["module_code"]
if isinstance(module_code, list) and len(module_code) > 1:
query = query.where(
*[
TAdditionalFields.modules.any(module_code=module_code_i)
for module_code_i in module_code
]
)
else:
query = query.where(TAdditionalFields.modules.any(module_code=module_code))
if "object_code" in params:
object_code = params["object_code"]
if isinstance(object_code, list) and len(object_code) > 1:
query = query.where(
*[
TAdditionalFields.objects.any(code_object=object_code_i)
for object_code_i in object_code
]
)
else:
query = query.where(TAdditionalFields.objects.any(code_object=object_code))
#
schema = TAdditionalFieldsSchema(
only=["bib_nomenclature_type", "modules", "objects", "datasets", "type_widget"], many=True
)
return jsonify(schema.dump(db.session.scalars(query).all()))
@routes.route("/t_mobile_apps", methods=["GET"])
@json_resp
[docs]
def get_t_mobile_apps():
"""
Get all mobile applications
.. :quickref: Commons;
:query str app_code: the app code
:returns: Array<dict<TMobileApps>>
"""
query = select(TMobileApps)
if "app_code" in request.args:
query = query.where(TMobileApps.app_code.ilike(request.args["app_code"]))
data = db.session.scalars(query).all()
mobile_apps = []
for app in data:
app_dict = app.as_dict(exclude=["relative_path_apk"])
app_dict["settings"] = {}
# if local
if app.relative_path_apk:
relative_apk_path = Path("mobile", app.relative_path_apk)
app_dict["url_apk"] = url_for("media", filename=str(relative_apk_path), _external=True)
relative_settings_path = Path(f"mobile/{app.app_code.lower()}/settings.json")
app_dict["url_settings"] = url_for("media", filename=relative_settings_path, _external=True)
settings_file = Path(current_app.config["MEDIA_FOLDER"]) / relative_settings_path
with settings_file.open() as f:
app_dict["settings"] = json.load(f)
mobile_apps.append(app_dict)
return mobile_apps
# Table Location
@routes.route("/get_id_table_location/<string:schema_dot_table>", methods=["GET"])
@json_resp
[docs]
# schema_dot_table gn_commons.t_modules
def api_get_id_table_location(schema_dot_table):
schema_name = schema_dot_table.split(".")[0]
table_name = schema_dot_table.split(".")[1]
return get_table_location_id(schema_name, table_name)
##############################
# Gestion des lieux (places) #
##############################
@routes.route("/places", methods=["GET"])
@login_required
[docs]
def list_places():
places = db.session.scalars(
db.select(TPlaces)
.filter_by(id_role=g.current_user.id_role)
.order_by(TPlaces.place_name.asc())
).all()
return jsonify([p.as_geofeature() for p in places])
@routes.route("/place", methods=["POST"]) # XXX best practices recommend plural nouns
@routes.route("/places", methods=["POST"])
@login_required
[docs]
def add_place():
data = request.get_json()
# FIXME check data validity!
place_name = data["properties"]["place_name"]
place_exists = (
select(func.count("*"))
.select_from(TPlaces)
.where(TPlaces.place_name == place_name, TPlaces.id_role == g.current_user.id_role)
)
if db.session.execute(place_exists).scalar_one() > 0:
raise Conflict("Nom du lieu déjà existant")
new_shape = shape(data["geometry"])
two_dimension_geom = remove_third_dimension(new_shape)
place_geom = from_shape(two_dimension_geom, srid=4326)
place = TPlaces(id_role=g.current_user.id_role, place_name=place_name, place_geom=place_geom)
db.session.add(place)
db.session.commit()
return jsonify(place.as_geofeature())
@routes.route("/place/<int:id_place>", methods=["DELETE"])
@routes.route("/places/<int:id_place>", methods=["DELETE"])
@login_required
[docs]
def delete_place(id_place):
place = db.get_or_404(TPlaces, id_place)
if g.current_user.id_role != place.id_role:
raise Forbidden("Vous n'êtes pas l'utilisateur propriétaire de ce lieu")
db.session.delete(place)
db.session.commit()
return "", 204
##############################