from typing import Mapping, Optional
from geonature.core.imports.checks.errors import ImportCodeError
from sqlalchemy.sql.expression import select, update
from sqlalchemy.sql import column
import sqlalchemy as sa
from geonature.core.gn_meta.models import TDatasets
from geonature.utils.env import db
from geonature.core.imports.models import BibFields, Entity, TImports
from geonature.core.imports.checks.sql.utils import report_erroneous_rows
from pypnnomenclature.models import TNomenclatures, BibNomenclaturesTypes
# Names exported by ``from <this module> import *``: the content-mapping step
# followed by the nomenclature consistency checks defined below.
__all__ = [
"do_nomenclatures_mapping",
"check_nomenclature_exist_proof",
"check_nomenclature_blurring",
"check_nomenclature_source_status",
"check_nomenclature_technique_collect",
]
[docs]
def do_nomenclatures_mapping(
    imprt: TImports,
    entity: Entity,
    fields: Mapping[str, BibFields],
    fill_with_defaults: bool = False,
) -> None:
    """
    Translate user-supplied nomenclature values into nomenclature ids.

    For each field bound to a nomenclature type (``mnemonique`` is set), the
    content mapping of the import is used to fill the destination column of
    the transient table with the matching ``id_nomenclature``. Rows whose
    destination column is still NULL afterwards are reported with the
    ``INVALID_NOMENCLATURE`` error.

    Parameters
    ----------
    imprt : TImports
        The import being processed.
    entity : Entity
        The entity being processed.
    fields : Mapping[str, BibFields]
        Mapping of field names to BibFields objects; fields without a
        ``mnemonique`` are skipped.
    fill_with_defaults : bool, optional
        If True, rows with an empty source value that are still unresolved
        after the mapping receive the default nomenclature, and empty source
        values are never reported as invalid.

    Notes
    -----
    Empty fields and default nomenclature handling are discussed in
    https://github.com/PnX-SI/gn_module_import/issues/68#issuecomment-1384267087
    """
    transient_table = imprt.destination.get_transient_table()

    for field in fields.values():
        # Only fields attached to a nomenclature type are concerned.
        if field.mnemonique is None:
            continue

        source_col = transient_table.c[field.source_field]
        dest_col = transient_table.c[field.dest_field]

        # One row per (source value, cd_nomenclature) pair found in the content
        # mapping for this nomenclature type; an empty-string key becomes NULL.
        mapping_entries = sa.func.JSON_EACH_TEXT(TImports.contentmapping[field.mnemonique])
        mapping_cte = (
            select(
                sa.func.nullif(column("key"), "").label("value"),
                column("value").label("cd_nomenclature"),
            )
            .select_from(mapping_entries)
            .where(TImports.id_import == imprt.id_import)
            .cte("cte")
        )

        # Join the mapping rows with the nomenclatures of the expected type and
        # write the resulting id_nomenclature into the destination column.
        mapping_update = (
            update(transient_table)
            .where(
                transient_table.c.id_import == imprt.id_import,
                # NULL-safe comparison, so that a NULL source matches a NULL key
                source_col.isnot_distinct_from(mapping_cte.c.value),
                TNomenclatures.cd_nomenclature == mapping_cte.c.cd_nomenclature,
                BibNomenclaturesTypes.mnemonique == field.mnemonique,
                TNomenclatures.id_type == BibNomenclaturesTypes.id_type,
            )
            .values({field.dest_field: TNomenclatures.id_nomenclature})
        )
        db.session.execute(mapping_update)

        # A row is invalid when no nomenclature could be resolved for it.
        invalid_conds = [dest_col.is_(None)]

        if fill_with_defaults:
            default_nomenclature = getattr(
                sa.func, entity.destination_table_schema
            ).get_default_nomenclature_value(
                field.mnemonique,
            )
            # Only rows with an empty source value that the mapping did not
            # already complete receive the default nomenclature.
            defaults_update = (
                update(transient_table)
                .where(
                    transient_table.c.id_import == imprt.id_import,
                    source_col.is_(None),
                    dest_col.is_(None),
                )
                .values({field.dest_field: default_nomenclature})
            )
            db.session.execute(defaults_update)
            # An empty source value whose destination is still NULL only means
            # that no default nomenclature exists, exactly like a server default
            # resolving to NULL: it must not be reported as invalid.
            invalid_conds.append(source_col.isnot(None))

        report_erroneous_rows(
            imprt,
            entity,
            error_type=ImportCodeError.INVALID_NOMENCLATURE,
            error_column=field.name_field,
            whereclause=sa.and_(*invalid_conds),
        )
[docs]
def check_nomenclature_exist_proof(
    imprt: TImports,
    entity: Entity,
    nomenclature_field: BibFields,
    digital_proof_field: Optional[BibFields],
    non_digital_proof_field: Optional[BibFields],
) -> None:
    """
    Check that the "proof exists" nomenclature agrees with the proof columns.

    A row is reported with ``INVALID_EXISTING_PROOF_VALUE`` when either:

    - the nomenclature is "Oui" but none of the provided proof columns is set;
    - the nomenclature is not "Oui" but at least one proof column is set.

    Nothing is checked when both proof fields are None.

    Parameters
    ----------
    imprt : TImports
        The import to check.
    entity : Entity
        The entity to check.
    nomenclature_field : BibFields
        The nomenclature field telling whether a proof exists.
    digital_proof_field : Optional[BibFields]
        The field for digital proof, if any.
    non_digital_proof_field : Optional[BibFields]
        The field for non-digital proof, if any.
    """
    transient_table = imprt.destination.get_transient_table()
    if digital_proof_field is None and non_digital_proof_field is None:
        return

    # The "Oui" nomenclature of the type attached to the checked field;
    # scalar_one() raises unless exactly one such nomenclature exists.
    oui_query = sa.select(TNomenclatures).where(
        TNomenclatures.mnemonique == "Oui",
        TNomenclatures.nomenclature_type.has(
            BibNomenclaturesTypes.mnemonique == nomenclature_field.mnemonique
        ),
    )
    oui_nomenclature = db.session.execute(oui_query).scalar_one()
    oui_filter = (
        transient_table.c[nomenclature_field.dest_field] == oui_nomenclature.id_nomenclature
    )

    # One "column is set" condition per proof field actually provided.
    proof_set_filters = [
        transient_table.c[proof_field.dest_field].isnot(None)
        for proof_field in (digital_proof_field, non_digital_proof_field)
        if proof_field is not None
    ]
    if proof_set_filters:
        proof_set_filter = sa.or_(*proof_set_filters)
    else:
        proof_set_filter = sa.false()

    proof_announced_but_missing = sa.and_(oui_filter, ~proof_set_filter)
    proof_given_but_not_announced = sa.and_(~oui_filter, proof_set_filter)
    report_erroneous_rows(
        imprt,
        entity,
        error_type=ImportCodeError.INVALID_EXISTING_PROOF_VALUE,
        error_column=nomenclature_field.name_field,
        whereclause=sa.or_(proof_announced_but_missing, proof_given_but_not_announced),
    )
[docs]
def check_nomenclature_blurring(
    imprt: TImports,
    entity: Entity,
    blurring_field: BibFields,
    id_dataset_field: BibFields,
    uuid_dataset_field: BibFields,
) -> None:
    """
    Report rows of a private dataset for which blurring is not set.

    Blurring is required when the dataset of the row is private, i.e. when
    its data origin is the "Privée" nomenclature of type ``DS_PUBLIQUE``.
    Offending rows are reported with ``CONDITIONAL_MANDATORY_FIELD_ERROR`` on
    the blurring column; no exception is raised for them.

    Parameters
    ----------
    imprt : TImports
        The import to check.
    entity : Entity
        The entity to check.
    blurring_field : BibFields
        The field holding the blurring nomenclature.
    id_dataset_field : BibFields
        The field holding the dataset id of the row.
    uuid_dataset_field : BibFields
        The field holding the dataset UUID of the row.
    """
    transient_table = imprt.destination.get_transient_table()

    # NOTE(review): scalar() yields None when no "Privée" nomenclature exists,
    # whereas the sibling checks use scalar_one() and fail loudly. With None the
    # comparison below turns into an IS NULL test -- confirm this is intended.
    id_nomenclature_private = db.session.scalar(
        select(TNomenclatures.id_nomenclature).where(
            TNomenclatures.nomenclature_type.has(BibNomenclaturesTypes.mnemonique == "DS_PUBLIQUE"),
            TNomenclatures.mnemonique == "Privée",
        )
    )

    # The row designates its dataset either by id or by UUID.
    row_belongs_to_dataset = sa.or_(
        transient_table.c[id_dataset_field.name_field] == TDatasets.id_dataset,
        transient_table.c[uuid_dataset_field.name_field] == TDatasets.unique_dataset_id,
    )
    report_erroneous_rows(
        imprt,
        entity,
        error_type=ImportCodeError.CONDITIONAL_MANDATORY_FIELD_ERROR,
        error_column=blurring_field.name_field,
        whereclause=sa.and_(
            row_belongs_to_dataset,
            TDatasets.id_nomenclature_data_origin == id_nomenclature_private,
            transient_table.c[blurring_field.dest_field] == None,
        ),
    )
[docs]
def check_nomenclature_source_status(
    imprt: TImports, entity: Entity, source_status_field: BibFields, ref_biblio_field: BibFields
) -> None:
    """
    Check that a bibliographic reference is given for literature sources.

    Rows whose source status is the ``STATUT_SOURCE`` nomenclature with code
    "Li" (literature) while the bibliographic reference is empty are reported
    on the source status column.

    Parameters
    ----------
    imprt : TImports
        The import to check.
    entity : Entity
        The entity to check.
    source_status_field : BibFields
        The field representing the source status.
    ref_biblio_field : BibFields
        The field representing the bibliographic reference.

    Notes
    -----
    The error codes are:
    - CONDITIONAL_MANDATORY_FIELD_ERROR: the field is mandatory and not set.
    """
    transient_table = imprt.destination.get_transient_table()

    # scalar_one() raises unless exactly one matching nomenclature exists.
    literature_query = sa.select(TNomenclatures).where(
        TNomenclatures.nomenclature_type.has(
            BibNomenclaturesTypes.mnemonique == "STATUT_SOURCE"
        ),
        TNomenclatures.cd_nomenclature == "Li",
    )
    literature = db.session.execute(literature_query).scalar_one()

    status_is_literature = (
        transient_table.c[source_status_field.dest_field] == literature.id_nomenclature
    )
    reference_is_missing = transient_table.c[ref_biblio_field.dest_field].is_(None)
    report_erroneous_rows(
        imprt,
        entity,
        error_type=ImportCodeError.CONDITIONAL_MANDATORY_FIELD_ERROR,
        error_column=source_status_field.name_field,
        whereclause=sa.and_(status_is_literature, reference_is_missing),
    )
[docs]
def check_nomenclature_technique_collect(
    imprt: TImports,
    entity: Entity,
    source_status_field: BibFields,
    technical_precision_field: BibFields,
) -> None:
    """
    Check that a technical precision is given when the technique requires one.

    Rows whose value is the ``TECHNIQUE_COLLECT_HAB`` nomenclature with code
    "10" (described as "Autre, préciser" by the previous documentation of this
    check) while the technical precision is empty are reported on the checked
    nomenclature column.

    Parameters
    ----------
    imprt : TImports
        The import to check.
    entity : Entity
        The entity to check.
    source_status_field : BibFields
        The field compared with the ``TECHNIQUE_COLLECT_HAB`` nomenclature
        (despite the parameter name, it is not matched against a source status).
    technical_precision_field : BibFields
        The field representing the technical precision.

    Notes
    -----
    The error codes are:
    - CONDITIONAL_MANDATORY_FIELD_ERROR: the field is mandatory and not set.
    """
    transient_table = imprt.destination.get_transient_table()

    # scalar_one() raises unless exactly one matching nomenclature exists.
    other_query = sa.select(TNomenclatures).where(
        TNomenclatures.nomenclature_type.has(
            BibNomenclaturesTypes.mnemonique == "TECHNIQUE_COLLECT_HAB"
        ),
        TNomenclatures.cd_nomenclature == "10",
    )
    other = db.session.execute(other_query).scalar_one()

    technique_is_other = transient_table.c[source_status_field.dest_field] == other.id_nomenclature
    precision_is_missing = transient_table.c[technical_precision_field.dest_field].is_(None)
    report_erroneous_rows(
        imprt,
        entity,
        error_type=ImportCodeError.CONDITIONAL_MANDATORY_FIELD_ERROR,
        error_column=source_status_field.name_field,
        whereclause=sa.and_(technique_is_other, precision_is_missing),
    )