from itertools import groupby
import json
from flask import Blueprint, request, current_app
from flask.json import jsonify
import sqlalchemy as sa
from sqlalchemy import func, select, asc, desc
from sqlalchemy.sql import text
from sqlalchemy.orm import joinedload, undefer
from werkzeug.exceptions import BadRequest
from ref_geo.env import db
from ref_geo.models import BibAreasTypes, LiMunicipalities, LAreas
from ref_geo.schemas import AreaTypeSchema, MunicipalitySchema, AreaSchema
[docs]
routes = Blueprint("ref_geo", __name__)
[docs]
altitude_stmt = sa.select(sa.column("altitude_min"), sa.column("altitude_max")).select_from(
func.ref_geo.fct_get_altitude_intersection(
func.ST_SetSRID(
func.ST_GeomFromGeoJSON(sa.bindparam("geojson")),
4326,
),
)
)
[docs]
geojson_intersect_filter = func.ST_Intersects(
LAreas.geom_4326,
func.ST_SetSRID(func.ST_GeomFromGeoJSON(sa.bindparam("geojson")), 4326),
)
[docs]
area_size_func = func.ST_Area(
func.ST_Transform(
func.ST_SetSrid(
func.ST_GeomFromGeoJSON(sa.bindparam("geojson")),
4326,
),
func.Find_SRID("ref_geo", "l_areas", "geom"),
)
)
@routes.route("/info", methods=["POST"])
[docs]
def getGeoInfo():
"""
From a posted geojson, the route return the municipalities intersected
and the altitude min/max
.. :quickref: Ref Geo;
"""
if not request.is_json or request.json is None:
raise BadRequest("Missing request payload")
try:
geojson = request.json["geometry"]
except KeyError:
raise BadRequest("Missing 'geometry' in request payload")
geojson = json.dumps(geojson)
areas = (
select(LAreas)
.filter_by(enable=True)
.filter(geojson_intersect_filter.params(geojson=geojson))
)
if "area_type" in request.json:
areas = areas.join(BibAreasTypes).filter_by(type_code=request.json["area_type"])
elif "id_type" in request.json:
try:
id_type = int(request.json["id_type"])
except ValueError:
raise BadRequest("Parameter 'id_type' must be an integer")
areas = areas.filter_by(id_type=id_type)
altitude = dict(db.session.execute(altitude_stmt, params={"geojson": geojson}).fetchone())
return jsonify(
{
"areas": AreaSchema(only=["id_area", "id_type", "area_code", "area_name"]).dump(
db.session.scalars(areas).unique().all(), many=True
),
"altitude": altitude,
}
)
@routes.route("/altitude", methods=["POST"])
[docs]
def getAltitude():
"""
From a posted geojson get the altitude min/max
.. :quickref: Ref Geo;
"""
if not request.is_json:
raise BadRequest("Missing request payload")
try:
geojson = request.json["geometry"]
except KeyError:
raise BadRequest("Missing 'geometry' in request payload")
geojson = json.dumps(geojson)
altitude = db.session.execute(altitude_stmt, params={"geojson": geojson}).fetchone()
return jsonify(altitude)
@routes.route("/areas", methods=["POST"])
[docs]
def getAreasIntersection():
"""
From a posted geojson, the route return all the area intersected
from l_areas
.. :quickref: Ref Geo;
"""
if not request.is_json or request.json is None:
raise BadRequest("Missing request payload")
try:
geojson = request.json["geometry"]
except KeyError:
raise BadRequest("Missing 'geometry' in request payload")
geojson = json.dumps(geojson)
areas = (
select(LAreas)
.filter_by(enable=True)
.filter(geojson_intersect_filter.params(geojson=geojson))
)
if "area_type" in request.json:
areas = areas.join(BibAreasTypes).filter_by(type_code=request.json["area_type"])
elif "id_type" in request.json:
try:
id_type = int(request.json["id_type"])
except ValueError:
raise BadRequest("Parameter 'id_type' must be an integer")
areas = areas.filter_by(id_type=id_type)
areas = areas.order_by(LAreas.id_type)
response = {}
for id_type, _areas in groupby(
db.session.scalars(areas).unique().all(), key=lambda area: area.id_type
):
_areas = list(_areas)
response[id_type] = _areas[0].area_type.as_dict(fields=["type_code", "type_name"])
response[id_type].update(
{
"areas": AreaSchema(
only=[
"area_code",
"area_name",
"id_area",
"id_type",
]
).dump(_areas, many=True)
}
)
return jsonify(response)
@routes.route("/municipalities", methods=["GET"])
[docs]
def get_municipalities():
"""
Return the municipalities
.. :quickref: Ref Geo;
"""
parameters = request.args
q = select(LiMunicipalities).order_by(LiMunicipalities.nom_com.asc())
if "nom_com" in parameters:
q = q.where(LiMunicipalities.nom_com.ilike("{}%".format(parameters.get("nom_com"))))
limit = int(parameters.get("limit")) if parameters.get("limit") else 100
municipalities = db.session.scalars(q.limit(limit)).all()
return jsonify(MunicipalitySchema().dump(municipalities, many=True))
@routes.route("/areas", methods=["GET"])
[docs]
def get_areas():
"""
Return the areas of ref_geo.l_areas
.. :quickref: Ref Geo;
"""
# change all args in a list of value
params = {key: request.args.getlist(key) for key, value in request.args.items()}
query = (
select(LAreas)
.options(joinedload("area_type").load_only("type_code"))
.order_by(LAreas.area_name.asc())
)
if "enable" in params:
enable_param = params["enable"][0].lower()
accepted_enable_values = ["true", "false", "all"]
if enable_param not in accepted_enable_values:
response = {
"message": f"Le paramètre 'enable' accepte seulement les valeurs: {', '.join(accepted_enable_values)}.",
"status": "warning",
}
return response, 400
if enable_param == "true":
query = query.where(LAreas.enable == True)
elif enable_param == "false":
query = query.where(LAreas.enable == False)
else:
query = query.where(LAreas.enable == True)
if "id_type" in params:
query = query.where(LAreas.id_type.in_(params["id_type"]))
if "type_code" in params:
query = query.where(LAreas.area_type.has(BibAreasTypes.type_code.in_(params["type_code"])))
if "area_name" in params:
query = query.where(LAreas.area_name.ilike("%{}%".format(params.get("area_name")[0])))
limit = int(params.get("limit")[0]) if params.get("limit") else 100
# allow to format response
format = request.args.get("format", default="", type=str)
fields = {"area_type.type_code"}
if format == "geojson":
fields |= {"+geom_4326"}
query = query.options(undefer("geom_4326"))
areas = db.session.scalars(query.limit(limit)).unique().all()
response = AreaSchema(only=fields, as_geojson=format == "geojson").dump(areas, many=True)
if format == "geojson":
# retro-compat: return a list of Features instead of the FeatureCollection
response = response["features"]
return response
@routes.route("/area_size", methods=["Post"])
[docs]
def get_area_size():
"""
Return the area size from a given geojson
.. :quickref: Ref Geo;
:returns: An area size (int)
"""
if not request.is_json or request.json is None:
raise BadRequest("Missing request payload")
try:
geojson = request.json["geometry"]
except KeyError:
raise BadRequest("Missing 'geometry' in request payload")
geojson = json.dumps(geojson)
query = select(area_size_func.params(geojson=geojson))
return jsonify(db.session.execute(query).scalar())
@routes.route("/types", methods=["Get"])
[docs]
def get_area_types():
"""
Get areas types list
.. :quickref: Areas;
:query str code: Type area code (ref_geo.bib_areas_types.type_code)
:query str name: Type area name (ref_geo.bib_areas_types.type_name)
:query str sort: sort value as ASC - DESC
"""
type_code = request.args.get("code")
type_name = request.args.get("name")
sort = request.args.get("sort")
query = select(BibAreasTypes)
# GET ONLY INFO FOR A SPECIFIC CODE
if type_code:
code_exists = db.session.execute(
select(BibAreasTypes).where(BibAreasTypes.type_code == type_code)
).scalar_one_or_none()
if not code_exists:
raise BadRequest("This area type code does not exist")
query = query.where(BibAreasTypes.type_code == type_code)
# FILTER BY NAME
if type_name:
query = query.where(BibAreasTypes.type_name.ilike("%{}%".format(type_name)))
# SORT
if sort == "asc":
query = query.order_by(asc("type_name"))
if sort == "desc":
query = query.order_by(desc("type_name"))
# FIELDS
fields = ["type_name", "type_code", "id_type"]
return jsonify(AreaTypeSchema(only=fields).dump(db.session.scalars(query).all(), many=True))