Code source de apptax.taxonomie.commands.utils

import importlib.resources
from csv import DictReader
from io import TextIOWrapper
from zipfile import ZipFile

from alembic import op
import sqlalchemy as sa
from sqlalchemy.schema import Table, MetaData

from utils_flask_sqla.migrations.utils import open_remote_file

from apptax.database import db
from apptax.taxonomie.models import TMetaTaxref


def import_bdc_statuts(logger, base_url, zipfile, status_types_file, status_file):
    """
    Load the BDC statuts CSV files of a remote zip archive into the database.

    The archive ``zipfile`` is opened from ``base_url`` through
    ``open_remote_file``. ``status_types_file`` is copied into the
    ``bdc_statut_type`` table and ``status_file`` into ``bdc_statut``; the
    ``taxonomie_bdc_statuts.sql`` script packaged in
    ``apptax.migrations.data`` is then executed and the text/area links are
    rebuilt with ``populate_bdc_statut_cor_text_area``.

    :param logger: logger used to report progress
    :param base_url: base URL forwarded to ``open_remote_file``
    :param zipfile: name of the archive forwarded to ``open_remote_file``
    :param status_types_file: archive member holding the status types CSV
    :param status_file: archive member holding the statuses CSV
    """
    # Destination columns of taxonomie.bdc_statut, in CSV column order.
    bdc_statut_columns = (
        "cd_nom",
        "cd_ref",
        "cd_sup",
        "cd_type_statut",
        "lb_type_statut",
        "regroupement_type",
        "code_statut",
        "label_statut",
        "rq_statut",
        "cd_sig",
        "cd_doc",
        "lb_nom",
        "lb_auteur",
        "nom_complet_html",
        "nom_valide_html",
        "regne",
        "phylum",
        "classe",
        "ordre",
        "famille",
        "group1_inpn",
        "group2_inpn",
        "lb_adm_tr",
        "niveau_admin",
        "cd_iso3166_1",
        "cd_iso3166_2",
        "full_citation",
        "doc_url",
        "thematique",
        "type_value",
    )

    with open_remote_file(base_url, zipfile, open_fct=ZipFile) as archive:
        with archive.open(status_types_file) as types_csv:
            logger.info("Insert BDC statuts types…")
            copy_from_csv(types_csv, "bdc_statut_type")
        with archive.open(status_file) as statuses_csv:
            logger.info("Insert BDC statuts…")
            copy_from_csv(statuses_csv, "bdc_statut", dest_cols=bdc_statut_columns)

    logger.info("Populate BDC statuts…")
    populate_script = importlib.resources.read_text(
        "apptax.migrations.data", "taxonomie_bdc_statuts.sql"
    )
    db.session.execute(populate_script)

    populate_bdc_statut_cor_text_area(logger)

    # FIXME: why is this index created if it is only to be dropped afterwards?
    # db.session.execute("DROP INDEX taxonomie.bdc_statut_id_idx")
def populate_bdc_statut_cor_text_area(logger):
    """
    Rebuild the ``taxonomie.bdc_statut_cor_text_area`` link table.

    The table is truncated, then refilled by a single SQL statement that:

    * declares, as an inline JSON array, the old ("old_r") and new ("new_r")
      regions with their codes and the departement codes they contain;
    * joins those departement codes with ``ref_geo.l_areas`` rows of area
      type ``DEP``;
    * associates each ``taxonomie.bdc_statut_text`` row with areas according
      to its ``cd_sig`` value (``ETATFRA``, ``TERFXFR``, or the ``INSEED``,
      ``INSEENR``, ``INSEER``, ``INSEET`` and ``TER`` prefixes);
    * inserts the resulting ``(id_text, id_area)`` pairs whose ``id_area`` is
      not NULL.

    :param logger: logger used to report progress
    """
    # Clean table before populate
    logger.info("Populate Link BDC statuts with Areas…")
    db.session.execute(
        """
        TRUNCATE TABLE taxonomie.bdc_statut_cor_text_area;
        """
    )
    # Populate table
    db.session.execute(
        """
        -- Champ terxfr = true = territoire intra-métropole. False = les DOM-TOM
        WITH regions AS (
            SELECT jsonb_array_elements('[
                { "type": "old_r", "TERXFR": true, "code" : "11", "name" :"Île-de-France", "deps": ["75","77","78","91","92","93","94","95"] },
                { "type": "old_r", "TERXFR": true, "code" : "21", "name" :"Champagne-Ardenne", "deps": ["08","10","51","52"] },
                { "type": "old_r", "TERXFR": true, "code" : "22", "name" :"Picardie", "deps": ["02","60","80"] },
                { "type": "old_r", "TERXFR": true, "code" : "23", "name" :"Haute-Normandie", "deps": ["27", "76"] },
                { "type": "old_r", "TERXFR": true, "code" : "24", "name" :"Centre", "deps": ["18","28","36","37","41","45"] },
                { "type": "old_r", "TERXFR": true, "code" : "25", "name" :"Basse-Normandie", "deps": ["14","50","61"] },
                { "type": "old_r", "TERXFR": true, "code" : "26", "name" :"Bourgogne", "deps": ["21","58","71","89"] },
                { "type": "old_r", "TERXFR": true, "code" : "31", "name" :"Nord-Pas-de-Calais", "deps": ["59", "62"] },
                { "type": "old_r", "TERXFR": true, "code" : "41", "name" :"Lorraine", "deps": ["54","55","57","88"] },
                { "type": "old_r", "TERXFR": true, "code" : "42", "name" :"Alsace", "deps": ["67", "68"] },
                { "type": "old_r", "TERXFR": true, "code" : "43", "name" :"Franche-Comté", "deps": ["25","39","70","90"] },
                { "type": "old_r", "TERXFR": true, "code" : "52", "name" :"Pays de la Loire", "deps": ["44","49","53","72","85"] },
                { "type": "old_r", "TERXFR": true, "code" : "53", "name" :"Bretagne", "deps": ["22","29","35","56"] },
                { "type": "old_r", "TERXFR": true, "code" : "54", "name" :"Poitou-Charentes", "deps": ["16","17","79","86"] },
                { "type": "old_r", "TERXFR": true, "code" : "72", "name" :"Aquitaine", "deps": ["24","33","40","47","64"] },
                { "type": "old_r", "TERXFR": true, "code" : "73", "name" :"Midi-Pyrénées", "deps": ["09","12","31","32","46","65","81","82"] },
                { "type": "old_r", "TERXFR": true, "code" : "74", "name" :"Limousin", "deps": ["19","23","87"] },
                { "type": "old_r", "TERXFR": true, "code" : "82", "name" :"Rhône-Alpes", "deps": ["01","07","26","38","42","69","73","74"] },
                { "type": "old_r", "TERXFR": true, "code" : "83", "name" :"Auvergne", "deps": ["03", "15", "43", "63"] },
                { "type": "old_r", "TERXFR": true, "code" : "91", "name" :"Languedoc-Roussillon", "deps": ["11","30","34","48","66"] },
                { "type": "old_r", "TERXFR": true, "code" : "93", "name" :"Provence-Alpes-Côte d’Azur", "deps": ["04", "05", "06", "13", "83", "84"] },
                { "type": "old_r", "TERXFR": true, "code" : "94", "name" :"Corse", "deps": ["2A", "2B"] },
                { "type": "new_r", "TERXFR": true, "code" : "11", "name" :"Île-de-France", "deps": ["75","77","78","91","92","93","94","95"] },
                { "type": "new_r", "TERXFR": true, "code" : "24", "name" :"Centre-Val de Loire", "deps": ["18","28","36","37","41","45"] },
                { "type": "new_r", "TERXFR": true, "code" : "27", "name" :"Bourgogne-Franche-Comté", "deps": ["21","25","39","58","70","71","89","90"] },
                { "type": "new_r", "TERXFR": true, "code" : "28", "name" :"Normandie", "deps": ["14","27","50","61","76"] },
                { "type": "new_r", "TERXFR": true, "code" : "32", "name" :"Hauts-de-France", "deps": ["02", "59", "60", "62", "80"] },
                { "type": "new_r", "TERXFR": true, "code" : "44", "name" :"Grand Est", "deps": ["08","10","51","52","54","55","57","67","68","88"] },
                { "type": "new_r", "TERXFR": true, "code" : "52", "name" :"Pays de la Loire", "deps": ["44","49","53","72","85"] },
                { "type": "new_r", "TERXFR": true, "code" : "53", "name" :"Bretagne", "deps": ["22","29","35","56"] },
                { "type": "new_r", "TERXFR": true, "code" : "75", "name" :"Nouvelle-Aquitaine", "deps": ["16","17","19","23","24","33","40","47","64","79","86","87"] },
                { "type": "new_r", "TERXFR": true, "code" : "76", "name" :"Occitanie", "deps": ["09", "11", "12", "30", "31", "32", "34", "46", "48", "65", "66", "81", "82"] },
                { "type": "new_r", "TERXFR": true, "code" : "84", "name" :"Auvergne-Rhône-Alpes", "deps": ["01", "03", "07", "15", "26", "38", "42", "43", "63", "69", "73", "74"] },
                { "type": "new_r", "TERXFR": true, "code" : "93", "name" :"Provence-Alpes-Côte d’Azur", "deps": ["04", "05", "06", "13", "83", "84"] },
                { "type": "new_r", "TERXFR": true, "code" : "94", "name" :"Corse", "deps": ["2A", "2B"] },
                { "type": "new_r", "TERXFR": false, "code" : "971", "name" :"Guadeloupe", "deps": ["971"] },
                { "type": "new_r", "TERXFR": false, "code" : "972", "name" :"Martinique", "deps": ["972"] },
                { "type": "new_r", "TERXFR": false, "code" : "973", "name" :"Guyane", "deps": ["973"] },
                { "type": "new_r", "TERXFR": false, "code" : "974", "name" :"La Réunion", "deps": ["974"] },
                { "type": "new_r", "TERXFR": false, "code" : "975", "name" :"Saint-Pierre-et-Miquelon", "deps": ["975"] },
                { "type": "new_r", "TERXFR": false, "code" : "976", "name" :"Mayotte", "deps": ["976"] },
                { "type": "new_r", "TERXFR": false, "code" : "977", "name" :"Saint-Barthélemy", "deps": ["977"] },
                { "type": "new_r", "TERXFR": false, "code" : "978", "name" :"Saint-Martin", "deps": ["978"] },
                { "type": "new_r", "TERXFR": false, "code" : "984A", "name" :"TAAF", "deps": ["984"] },
                { "type": "new_r", "TERXFR": false, "code" : "984B", "name" :"TAAF", "deps": ["984"] },
                { "type": "new_r", "TERXFR": false, "code" : "984C", "name" :"TAAF", "deps": ["984"] },
                { "type": "new_r", "TERXFR": false, "code" : "986", "name" :"Wallis-et-Futuna", "deps": ["986"] },
                { "type": "new_r", "TERXFR": false, "code" : "987", "name" :"Polynésie française", "deps": ["987"] },
                { "type": "new_r", "TERXFR": false, "code" : "988", "name" :"Nouvelle-Calédonie", "deps": ["9881", "9882"] },
                { "type": "new_r", "TERXFR": false, "code" : "989", "name" :"Île de Clipperton", "deps": ["989"] }
            ]'::jsonb)AS d
        ), regions_dep AS (
            SELECT jsonb_array_elements_text(d->'deps') AS dep,
                d->>'code' AS code,
                d->>'type' AS type,
                (d->>'TERXFR')::boolean as terxfr
            FROM regions
        ) , regions_dep_areas AS (
            SELECT la.id_area, d.code, d.TYPE, d.terxfr
            FROM ref_geo.l_areas la
            JOIN regions_dep d
            ON d.dep = la.area_code
            WHERE id_type = ref_geo.get_id_area_type('DEP')
        ) , texts AS (
            SELECT
                -- Si 'ETATFRA' insertion de tous les départements
                bst.id_text,
                la.id_area
            FROM taxonomie.bdc_statut_text AS bst
            JOIN regions_dep_areas AS la
            ON bst.cd_sig = 'ETATFRA'
            UNION
            SELECT
                -- Si 'TERFXFR' insertion de tous les départements métropolitains
                bst.id_text,
                la.id_area
            FROM taxonomie.bdc_statut_text AS bst
            JOIN regions_dep_areas AS la
            ON la.terxfr = true AND bst.cd_sig = 'TERFXFR' AND length(la.code) = 2
            UNION
            SELECT DISTINCT
                -- Si département
                bst.id_text,
                (
                    SELECT id_area
                    FROM ref_geo.l_areas
                    WHERE area_code = REPLACE(cd_sig, 'INSEED', '')
                    AND id_type = ref_geo.get_id_area_type('DEP')
                )
            FROM taxonomie.bdc_statut_text AS bst
            WHERE cd_sig ILIKE 'INSEED%'
            UNION
            SELECT DISTINCT
                -- Si nouvelle région
                bst.id_text,
                nrs.id_area
            FROM taxonomie.bdc_statut_text AS bst
            JOIN regions_dep_areas AS nrs
            ON (REPLACE(cd_sig, 'INSEENR', '') = nrs.code) AND nrs.TYPE= 'new_r'
            WHERE cd_sig ILIKE 'INSEENR%'
            UNION
            SELECT DISTINCT
                -- Si ancienne région
                bst.id_text,
                ors.id_area
            FROM taxonomie.bdc_statut_text AS bst
            JOIN regions_dep_areas AS ors
            ON (REPLACE(cd_sig, 'INSEER', '') = ors.code) AND ors.TYPE = 'old_r'
            WHERE cd_sig ILIKE 'INSEER%'
            UNION
            SELECT DISTINCT
                -- Si territoire outre mer
                bst.id_text,
                ors.id_area
            FROM taxonomie.bdc_statut_text AS bst
            JOIN regions_dep_areas AS ors
            ON ors.terxfr = false AND (
                REPLACE(cd_sig, 'TER', '') = ors.code
                OR REPLACE(cd_sig, 'INSEET', '') = ors.code
            )
            WHERE cd_sig ILIKE 'INSEET%' OR cd_sig ILIKE 'TER%'
        )
        INSERT INTO taxonomie.bdc_statut_cor_text_area (id_text, id_area)
        SELECT id_text, id_area
        FROM texts AS t
        WHERE t.id_area IS NOT NULL
        ORDER BY t.id_text, t.id_area ASC;
        """
    )
def truncate_bdc_statuts():
    """
    Empty all BDC statuts tables of the ``taxonomie`` schema.

    The seven tables are listed in a single ``TRUNCATE`` statement executed
    on the current database session.
    """
    truncate_statement = """
        TRUNCATE taxonomie.bdc_statut,
            taxonomie.bdc_statut_type,
            taxonomie.bdc_statut_text,
            taxonomie.bdc_statut_values,
            taxonomie.bdc_statut_taxons,
            taxonomie.bdc_statut_cor_text_values,
            taxonomie.bdc_statut_cor_text_area
        """
    db.session.execute(truncate_statement)
def refresh_taxref_vm():
    """
    Refresh the materialized views of the ``taxonomie`` schema.

    One ``REFRESH MATERIALIZED VIEW`` statement is executed per view, in the
    order listed below, on the current database session.
    """
    materialized_views = (
        "vm_classe",
        "vm_famille",
        "vm_group1_inpn",
        "vm_group2_inpn",
        "vm_ordre",
        "vm_phylum",
        "vm_regne",
        "vm_taxref_list_forautocomplete",
    )
    for view_name in materialized_views:
        db.session.execute(f"REFRESH MATERIALIZED VIEW taxonomie.{view_name}")
def get_csv_field_names(f, encoding, delimiter):
    """
    Return the column names found in the header line of a CSV file.

    The binary file object ``f`` is read through a temporary text wrapper and
    rewound to its beginning afterwards, so it can be read again by the
    caller.

    :param f: seekable binary file object containing the CSV data
    :param encoding: encoding of the file; the PostgreSQL name ``WIN1252`` is
        translated to its Python equivalent ``cp1252``; ``None`` lets
        ``TextIOWrapper`` pick its default encoding
    :param delimiter: one-character field delimiter; ``None`` falls back to
        the default delimiter of the ``csv`` module (a comma)
    :return: list of field names, or ``None`` if the file is empty
    """
    if encoding == "WIN1252":  # postgresql encoding
        encoding = "cp1252"  # python encoding

    # csv rejects delimiter=None with a TypeError, so only forward the
    # delimiter when one was actually given.
    reader_options = {}
    if delimiter is not None:
        reader_options["delimiter"] = delimiter

    wrapper = TextIOWrapper(f, encoding=encoding)
    try:
        field_names = DictReader(wrapper, **reader_options).fieldnames
    finally:
        # Detach even on error: otherwise f would be closed when the wrapper
        # is garbage collected.
        wrapper.detach()
    f.seek(0)
    return field_names
def populate_enable_bdc_statut_text(logger, clean, departements):
    """
    Enable the BDC statuts texts linked to the given departements.

    :param logger: logger used to report progress
    :param clean: when true, every currently enabled text is disabled first
    :param departements: sequence of departement codes (``area_code`` values
        of ``ref_geo.l_areas`` rows of type ``DEP``); an empty or ``None``
        value enables nothing
    """
    logger.info("Enable or disable texts of BDC statuts with Areas…")
    if clean:
        # Clean table before populate
        db.session.execute(
            sa.text(
                """
                UPDATE taxonomie.bdc_statut_text AS bst SET "enable" = FALSE
                WHERE "enable" IS TRUE
                """
            )
        )

    # An expanding bind parameter renders one placeholder per value, so the
    # IN clause is valid for lists as well as tuples and for an empty
    # sequence, instead of depending on how the driver adapts the value.
    enable_query = sa.text(
        """
        UPDATE taxonomie.bdc_statut_text s SET "enable" = TRUE
        FROM taxonomie.bdc_statut_cor_text_area AS ct
        JOIN ref_geo.l_areas AS la
        ON ct.id_area = la.id_area
        WHERE ct.id_text = s.id_text
        AND id_type = ref_geo.get_id_area_type('DEP')
        AND area_code IN :areas;
        """
    ).bindparams(sa.bindparam("areas", expanding=True))

    # enable text with departements
    db.session.execute(enable_query, {"areas": list(departements or ())})
""" Insert CSV file into specified table. If source columns are specified, CSV file in copied in a temporary table, then data restricted to specified source columns are copied in final table. """
def copy_from_csv(
    f,
    table_name,
    dest_cols="",
    source_cols=None,
    schema="taxonomie",
    header=True,
    encoding=None,
    delimiter=None,
):
    """
    Insert a CSV file into the specified table with PostgreSQL ``COPY``.

    If source columns are specified, the CSV file is first copied into a
    staging table named ``import_<table_name>`` whose columns are the
    lower-cased CSV field names (all of string type); the expressions listed
    in ``source_cols`` are then selected from it and inserted into the final
    table, and the staging table is dropped.

    :param f: binary file object containing the CSV data
    :param table_name: name of the destination table
    :param dest_cols: iterable of destination column names; when empty, no
        column list is given in the generated statement
    :param source_cols: iterable of SQL expressions selected from the staging
        table; enables the staging-table mode described above
    :param schema: schema holding the destination (and staging) table
    :param header: whether the CSV file starts with a header line
    :param encoding: PostgreSQL encoding name of the file, if any
    :param delimiter: field delimiter of the file, if any
    """
    if dest_cols:
        dest_cols = " (" + ", ".join(dest_cols) + ")"
    if source_cols:
        final_table_name = table_name
        final_table_cols = dest_cols
        table_name = f"import_{table_name}"
        dest_cols = ""
        field_names = get_csv_field_names(f, encoding=encoding, delimiter=delimiter)
        # The bind is given explicitly to create()/drop(), so the metadata
        # does not need the deprecated ``bind`` argument.
        table = Table(
            table_name,
            MetaData(),
            *[sa.Column(c, sa.String) for c in map(str.lower, field_names)],
            schema=schema,
        )
        table.create(bind=db.session.connection())

    options = ["FORMAT CSV"]
    if header:
        options.append("HEADER")
    if encoding:
        options.append(f"ENCODING '{encoding}'")
    if delimiter:
        options.append(f"DELIMITER E'{delimiter}'")
    options = ", ".join(options)

    # COPY ... FROM STDIN needs the raw DBAPI cursor; close it once done.
    cursor = db.session.connection().connection.cursor()
    try:
        cursor.copy_expert(
            f"""
            COPY {schema}.{table_name}{dest_cols}
            FROM STDIN WITH ({options})
            """,
            f,
        )
    finally:
        cursor.close()

    if source_cols:
        source_cols = ", ".join(source_cols)
        db.session.execute(
            f"""
            INSERT INTO {schema}.{final_table_name}{final_table_cols}
            SELECT {source_cols}
            FROM {schema}.{table_name};
            """
        )
        table.drop(bind=db.session.connection())
def insert_taxref_numversion(num_version):
    """
    Add a ``TMetaTaxref`` row for the given Taxref version to the session.

    The row is created with ``referencial_name="Taxref"``; it is only added
    to the session here, not committed.

    :param num_version: value stored in the ``version`` attribute
    """
    db.session.add(TMetaTaxref(referencial_name="Taxref", version=num_version))