Code source de geonature.core.imports.checks.sql.core
from geonature.core.imports.checks.errors import ImportCodeError
from geonature.core.imports.checks.sql.utils import report_erroneous_rows
import sqlalchemy as sa
from geonature.utils.env import db
from geonature.core.imports.models import (
Entity,
EntityField,
BibFields,
TImports,
)
__all__ = ["init_rows_validity", "check_orphan_rows"]
[docs]
def init_rows_validity(imprt: TImports, dataset_name_field: str = "id_dataset"):
"""
Validity columns are three-states:
- None: the row does not contains data for the given entity
- False: the row contains data for the given entity, but data are erroneous
- True: the row contains data for the given entity, and data are valid
"""
transient_table = imprt.destination.get_transient_table()
entities = (
Entity.query.filter_by(destination=imprt.destination).order_by(sa.desc(Entity.order)).all()
)
# Set validity=NULL (not parcicipating in the entity) for all rows
db.session.execute(
sa.update(transient_table)
.where(transient_table.c.id_import == imprt.id_import)
.values({entity.validity_column: None for entity in entities})
)
# Multi-entity fields are ignored for entity identification, but this is not an issue
# as rows with multi-entity field only will raise an ORPHAN_ROW error
selected_fields_names = []
for field_name, source_field in imprt.fieldmapping.items():
if type(source_field) == list:
selected_fields_names.extend(set(source_field) & set(imprt.columns))
elif source_field in imprt.columns:
selected_fields_names.append(field_name)
for entity in entities:
# Select fields associated to this entity *and only to this entity*
fields = (
db.session.query(BibFields)
.where(BibFields.name_field.in_(selected_fields_names))
.where(BibFields.entities.any(EntityField.entity == entity))
.where(~BibFields.entities.any(EntityField.entity != entity))
.where(BibFields.name_field != dataset_name_field)
.all()
)
if fields:
db.session.execute(
sa.update(transient_table)
.where(transient_table.c.id_import == imprt.id_import)
.where(
sa.or_(
*[transient_table.c[field.source_column].isnot(None) for field in fields]
)
)
.values({entity.validity_column: True})
)
[docs]
def check_orphan_rows(imprt):
transient_table = imprt.destination.get_transient_table()
# TODO: handle multi-source fields
# This is actually not a big issue as multi-source fields are unlikely to also be multi-entity fields.
selected_fields_names = []
for field_name, source_field in imprt.fieldmapping.items():
if type(source_field) == list:
selected_fields_names.extend(set(source_field) & set(imprt.columns))
elif source_field in imprt.columns:
selected_fields_names.append(field_name)
# Select fields associated to multiple entities
AllEntityField = sa.orm.aliased(EntityField)
fields = (
db.session.query(BibFields)
.join(EntityField)
.join(Entity)
.order_by(Entity.order) # errors are associated to the first Entity
.filter(BibFields.name_field.in_(selected_fields_names))
.join(AllEntityField, AllEntityField.id_field == BibFields.id_field)
.group_by(BibFields.id_field, EntityField.id_field, Entity.id_entity)
.having(sa.func.count(AllEntityField.id_entity) > 1)
.all()
)
for field in fields:
report_erroneous_rows(
imprt,
entity=None, # OK because ORPHAN_ROW has only WARNING level
error_type=ImportCodeError.ORPHAN_ROW,
error_column=field.name_field,
whereclause=sa.and_(
transient_table.c[field.source_field].isnot(None),
*[transient_table.c[col].is_(None) for col in imprt.destination.validity_columns],
),
)
def check_mandatory_field(imprt, entity, field):
transient_table = imprt.destination.get_transient_table()
source_field = transient_table.c[field.source_column]
whereclause = sa.and_(
transient_table.c[entity.validity_column].isnot(None),
source_field.is_(None),
)
report_erroneous_rows(
imprt,
entity=entity,
error_type=ImportCodeError.MISSING_VALUE,
error_column=field.name_field,
whereclause=whereclause,
)
# Currently not used as done during dataframe checks
def check_mandatory_fields(imprt, entity, fields):
for field in fields.values():
if not field.mandatory or not field.dest_field:
continue
check_mandatory_field(imprt, entity, field)