Code source de geonature.core.sensitivity.utils

import csv
from functools import lru_cache

import sqlalchemy as sa

from geonature.utils.env import db

from pypnnomenclature.models import (
    TNomenclatures as Nomenclature,
    BibNomenclaturesTypes as NomenclatureType,
)

from .models import CorSensitivityCriteria, SensitivityRule


@lru_cache(maxsize=64)
[docs] def get_nomenclature(type_mnemonique, code): # Retro-compatibility with freezed nomenclatures if type_mnemonique == "STATUT_BIO" and code in ["6", "7", "8", "10", "11", "12"]: type_mnemonique = "OCC_COMPORTEMENT" return db.session.execute( sa.select(Nomenclature).where( Nomenclature.active == True, # noqa: E712 Nomenclature.nomenclature_type.has(NomenclatureType.mnemonique == type_mnemonique), Nomenclature.cd_nomenclature == code, ) ).scalar_one()
[docs] def insert_sensitivity_referential(source, csvfile): statut_biologique_nomenclature_type = db.session.execute( sa.select(NomenclatureType).filter_by(mnemonique="STATUT_BIO") ).scalar_one() behaviour_nomenclature_type = db.session.execute( sa.select(NomenclatureType).filter_by(mnemonique="OCC_COMPORTEMENT") ).scalar_one() defaults_nomenclatures = { statut_biologique_nomenclature_type: set( db.session.scalars( sa.select(Nomenclature).where( Nomenclature.nomenclature_type == statut_biologique_nomenclature_type, Nomenclature.mnemonique.in_(["Inconnu", "Non renseigné", "Non Déterminé"]), ) ).all() ), behaviour_nomenclature_type: set( db.session.scalars( sa.select(Nomenclature).where( Nomenclature.nomenclature_type == behaviour_nomenclature_type, Nomenclature.mnemonique.in_(["NSP", "1"]), ) ).all() ), } rules = [] criterias = set() reader = csv.DictReader(csvfile, delimiter=";") dep_col = next( fieldname for fieldname in reader.fieldnames if fieldname in ["CD_DEP", "CD_DEPT"] ) for row in reader: sensi_nomenclature = get_nomenclature("SENSIBILITE", code=row["CD_SENSIBILITE"]) if row[dep_col] == "D3": cd_dep = "973" elif row[dep_col] == "D4": cd_dep = "974" else: cd_dep = row[dep_col] if row["DUREE"]: duration = int(row["DUREE"]) else: duration = 10000 rule = { "cd_nom": int(row["CD_NOM"]), "nom_cite": row["NOM_CITE"], "id_nomenclature_sensitivity": sensi_nomenclature.id_nomenclature, "sensitivity_duration": duration, "sensitivity_territory": "Département", "id_territory": cd_dep, "source": f"{source}", "comments": row["AUTRE"], "active": True, } _criterias = set() if row["STATUT_BIOLOGIQUE"]: criteria = get_nomenclature("STATUT_BIO", code=row["STATUT_BIOLOGIQUE"]) _criterias |= {criteria} | defaults_nomenclatures[criteria.nomenclature_type] if row["COMPORTEMENT"]: criteria = get_nomenclature("OCC_COMPORTEMENT", code=row["COMPORTEMENT"]) _criterias |= {criteria} | defaults_nomenclatures[criteria.nomenclature_type] for criteria in _criterias: criterias.add((len(rules), criteria)) rules.append(rule) results = db.session.execute( sa.insert(SensitivityRule).values(rules).returning(SensitivityRule.id) ) rules_indexes = [rule_index for rule_index, in results] # flattening singleton results db.session.execute( sa.insert(CorSensitivityCriteria).values( [ { "id_sensitivity": rules_indexes[rule_idx], "id_type_nomenclature": nomenclature.id_type, "id_criteria": nomenclature.id_nomenclature, } for rule_idx, nomenclature in criterias ] ) ) # Populate cor_sensitivity_area db.session.connection().execute( sa.text( """ INSERT INTO gn_sensitivity.cor_sensitivity_area SELECT DISTINCT id_sensitivity, id_area FROM gn_sensitivity.t_sensitivity_rules s JOIN ref_geo.l_areas a ON s.sensitivity_territory = 'Département' AND a.id_type = (SELECT id_type FROM ref_geo.bib_areas_types WHERE type_code ='DEP') AND regexp_replace(s.id_territory, '^([0-9])$', '0\\1') = a.area_code WHERE s.source = :source """ ), source=source, ) return len(rules)
[docs] def remove_sensitivity_referential(source): whereclause = SensitivityRule.source == source return db.session.execute(sa.delete(SensitivityRule).where(whereclause)).rowcount