Code source de geonature.core.imports.checks.sql.user

import sqlalchemy as sa
from flask import jsonify
from geonature.core.imports.checks.errors import ImportCodeError
from geonature.core.imports.checks.sql.utils import report_erroneous_rows
from geonature.core.imports.models import BibFields, Entity, TImports
from geonature.utils.env import db
from marshmallow import Schema, fields
from pypnusershub.db.models import User
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
from geonature.utils.config import config


[docs] class UserMatchingSchema(Schema):
[docs] user_to_match = fields.String()
[docs] id_role = fields.Integer()
[docs] identifiant = fields.String()
[docs] nom_complet = fields.String()
[docs] def user_matching(imprt: TImports, field: BibFields): """ Find matching user for a given transient table and csv column. Parameters ---------- imprt : TImports The import object which contains the transient table. field : BibFields field use to fetch user name strings Returns ------- dict A dictionary of users name (as it appears in the source file) as key and a dictionary of matching information as value. The matching information contains id_role, identifiant, nom_complet. Notes ----- The matching is done by computing the similarity between the source file usernames and the nom_complet of the users in the `utilisateurs.t_roles` table. """ transient_table = imprt.destination.get_transient_table() column_transient = transient_table.c[field.source_column] if isinstance(column_transient.type, JSONB): column_transient = sa.cast(column_transient, db.Unicode) separators = config["IMPORT"]["OBSERVER_FIELD_SEPARATORS"] field_separators_as_regexp = rf"[{''.join(separators)}]+" cte_user_to_match = ( sa.select( sa.func.distinct( sa.func.trim( sa.func.regexp_split_to_table(column_transient, field_separators_as_regexp) ) ).label("user_to_match") ) .where(sa.and_(column_transient != None, sa.func.trim(column_transient) != "")) .where(transient_table.c.id_import == imprt.id_import) .cte("cte_user_to_match") ) cte_user_nom_complet = sa.select( User.id_role, User.identifiant, User.nom_complet, ).cte("cte_user_nom_complet") matches = sa.select( cte_user_to_match.c.user_to_match, cte_user_nom_complet.c.id_role, cte_user_nom_complet.c.identifiant, cte_user_nom_complet.c.nom_complet, sa.func.similarity( cte_user_to_match.c.user_to_match, cte_user_nom_complet.c.nom_complet ).label("similarity"), sa.func.row_number() .over( partition_by=cte_user_to_match.c.user_to_match, order_by=sa.desc( sa.func.similarity( cte_user_to_match.c.user_to_match, cte_user_nom_complet.c.nom_complet ) ), ) .label("rang"), ).cte() query = sa.select(matches).where(matches.c.rang == 1, matches.c.similarity > 0.7) result = UserMatchingSchema( many=True, unknown="exclude", ).dump(db.session.execute(query).all()) sim_match = {res["user_to_match"]: res for res in result} non_match = db.session.execute( sa.select(cte_user_to_match.c.user_to_match) .where(cte_user_to_match.c.user_to_match.notin_(sim_match.keys())) .where(cte_user_to_match.c.user_to_match != None) ).all() sim_match.update({res["user_to_match"]: {} for res in non_match}) return sim_match
[docs] def map_observer_matching(imprt: TImports, entity: Entity, observer_field: BibFields): user_matching = imprt.observermapping if not user_matching: return transient_table = imprt.destination.get_transient_table() observers_jsonb = ( sa.func.jsonb_each(TImports.observermapping.cast(JSONB)) .table_valued("key", "value") .alias("observer_jsonb") ) observer_string_id_role = ( sa.select( sa.literal_column("observer_jsonb.key").label("observer_string"), sa.cast(sa.literal_column("(observer_jsonb.value->>'id_role')"), sa.Integer).label( "id_role" ), ) .select_from( TImports, observers_jsonb, ) .where(TImports.id_import == imprt.id_import) .cte("observer_string_id_role") ) query = ( sa.update(transient_table) .where( transient_table.c.id_import == imprt.id_import, transient_table.c[observer_field.dest_column] == None, transient_table.c[observer_field.source_column] == observer_string_id_role.c.observer_string, ) .values({observer_field.dest_column: observer_string_id_role.c.id_role}) ) db.session.execute(query)