# Source code of geonature.core.imports.checks.dataframe.utils

from functools import wraps
from inspect import signature

from sqlalchemy import func
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.dialects.postgresql import insert as pg_insert

from geonature.utils.env import db

from geonature.core.imports.models import ImportUserError, ImportUserErrorType
from geonature.core.imports.utils import generated_fields


def dataframe_check(check_function):
    """
    Decorator turning a generator-based check into a reporting check.

    The decorated function must be a generator: every error it yields is
    passed to ``report_error``, and the value it returns (the set of columns
    it updated, or None when no column was modified) is merged into the
    result.

    The wrapper is always called as ``wrapper(imprt, entity, df, *args,
    **kwargs)``. ``imprt`` and ``entity`` are forwarded to the check only if
    its signature declares parameters with those exact names; ``df`` and the
    remaining arguments are always forwarded.

    Returns
    -------
    callable
        The wrapper, which returns the set of updated columns: those reported
        by ``report_error`` plus those returned by the check itself.
    """
    accepted = signature(check_function).parameters
    wants_import = "imprt" in accepted
    wants_entity = "entity" in accepted

    @wraps(check_function)
    def wrapper(imprt, entity, df, *args, **kwargs):
        # Only hand over the context arguments the check asked for by name.
        leading = ([imprt] if wants_import else []) + ([entity] if wants_entity else [])
        errors = check_function(*leading, df, *args, **kwargs)

        updated_cols = set()
        while True:
            try:
                error = next(errors)
            except StopIteration as stop:
                # The generator's return value travels on StopIteration.value.
                updated_cols |= stop.value or set()
                break
            updated_cols |= report_error(imprt, entity, df, error) or set()
        return updated_cols

    return wrapper
def error_replace(old_code, old_columns, new_code, new_column=None):
    """
    Decorator factory merging per-column errors into a single error.

    Errors yielded by the decorated check whose code is ``old_code`` and whose
    column belongs to ``old_columns`` are held back until the check is
    exhausted; every other error is passed through immediately. Rows present
    in all held-back errors are then reported once as ``new_code`` on
    ``new_column``, and each held-back error is re-emitted for its remaining
    rows only.

    Usage example:

        @dataframe_check
        @error_replace(ImportCodeError.MISSING_VALUE, {"WKT","latitude","longitude"}, ImportCodeError.NO_GEOM, "Champs géométriques")
        def check_required_values:

    => rows with MISSING_VALUE on WKT, latitude and longitude get a single
    NO_GEOM error on "Champs géométriques" instead.

    If ``new_code`` is None, the shared rows are dropped from the held-back
    errors without any replacement error being emitted.

    The value returned by the decorated generator is returned unchanged.
    """

    def decorator(check_function):
        @wraps(check_function)
        def replacing_check(*args, **kwargs):
            inner = check_function(*args, **kwargs)
            held_back = []
            while True:
                try:
                    error = next(inner)
                except StopIteration as stop:
                    # Keep the wrapped generator's return value for our own return.
                    result = stop.value
                    break
                if error["error_code"] == old_code and error["column"] in old_columns:
                    held_back.append(error)
                else:
                    yield error

            if held_back:
                index_sets = [set(err["invalid_rows"].index) for err in held_back]
                shared = set.intersection(*index_sets)
                if shared and new_code is not None:
                    # Emit the replacing error for rows common to every held-back error.
                    yield {
                        "error_code": new_code,
                        "column": new_column,
                        "invalid_rows": held_back[0]["invalid_rows"].loc[list(shared)],
                    }
                for err, row_ids in zip(held_back, index_sets):
                    leftover = row_ids - shared
                    if leftover:
                        # Re-emit the original error, minus the rows already covered above.
                        yield {
                            "error_code": err["error_code"],
                            "column": err["column"],
                            "invalid_rows": err["invalid_rows"].loc[list(leftover)],
                        }
            return result

        return replacing_check

    return decorator
def report_error(imprt, entity, df, error):
    """
    Record one check error: mark the offending rows as invalid in the
    dataframe and upsert the error into the `t_user_errors` table.

    Parameters
    ----------
    imprt : Import
        The import being checked; provides ``id_import`` and ``fieldmapping``.
    entity : Entity
        The entity being checked; provides ``id_entity`` and
        ``validity_column``.
    df : pandas.DataFrame
        The dataframe containing the data. Its validity column is set to
        False, in place, for every invalid row.
    error : dict
        The error to report, with the following keys:

        - invalid_rows : DataFrame
            The rows with errors (must expose a ``line_no`` column).
        - error_code : str
            The name of the error code.
        - column : str
            The column with errors.
        - comment : str, optional
            A comment to add to the error.

    Returns
    -------
    set or None
        A set containing the name of the entity validity column, or None when
        ``invalid_rows`` is empty (nothing is reported in that case).

    Raises
    ------
    Exception
        If the error code is not found.
    """
    invalid_rows = error["invalid_rows"]
    if invalid_rows.empty:
        return

    error_code = error["error_code"]
    try:
        error_type = ImportUserErrorType.query.filter_by(name=error_code).one()
    except NoResultFound:
        raise Exception(f"Error code '{error_code}' not found.")

    validity_column = entity.validity_column
    df.loc[invalid_rows.index, validity_column] = False
    # df['gn_invalid_reason'][invalid_rows.index.intersection(df['gn_invalid_reason'].isnull())] = \
    #        f'{error_type.name}'  # FIXME comment

    line_numbers = sorted(invalid_rows["line_no"])

    # Resolve the reported column: generated fields first, then the import's field mapping.
    raw_column = error["column"]
    column = generated_fields.get(raw_column, raw_column)
    column = imprt.fieldmapping.get(column, column)

    insert_stmt = pg_insert(ImportUserError).values(
        {
            "id_import": imprt.id_import,
            "id_error": error_type.pk,
            "id_entity": entity.id_entity,
            "column_error": column,
            "id_rows": line_numbers,
            "comment": error.get("comment"),
        }
    )
    # When an error of the same type already exists for this import, entity and
    # column, append the new row numbers to the stored ones instead of inserting
    # a second record.
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=("id_import", "id_entity", "id_error", "column_error"),
        index_where=ImportUserError.id_entity.isnot(None),
        set_={
            "id_rows": func.array_cat(ImportUserError.rows, insert_stmt.excluded["id_rows"]),
        },
    )
    db.session.execute(upsert_stmt)

    return {validity_column}