def init_rows_validity(imprt: TImports, dataset_name_field: str = "id_dataset"):
    """
    Initialise the per-entity validity columns of the transient rows of an import.

    Validity columns are three-states:
      - None: the row does not contain data for the given entity
      - False: the row contains data for the given entity, but data are erroneous
      - True: the row contains data for the given entity, and data are valid

    This function only writes None and True: every validity column is first
    reset to NULL for all rows of the import, then, entity by entity, rows
    holding a non-NULL value in at least one mapped field belonging to that
    entity *only* are flagged True.

    :param imprt: import whose rows in the transient table are initialised
    :param dataset_name_field: name of a field that is excluded when looking
        for the fields identifying an entity
    """
    transient_table = imprt.destination.get_transient_table()
    entities = (
        Entity.query.filter_by(destination=imprt.destination)
        .order_by(sa.desc(Entity.order))
        .all()
    )

    # Set validity=NULL (not participating in the entity) for all rows
    db.session.execute(
        sa.update(transient_table)
        .where(transient_table.c.id_import == imprt.id_import)
        .values({entity.validity_column: None for entity in entities})
    )

    # Multi-entity fields are ignored for entity identification, but this is not an issue
    # as rows with multi-entity field only will raise an ORPHAN_ROW error
    available_columns = set(imprt.columns)  # loop-invariant, built once
    selected_fields_names = []
    for field_name, source_field in imprt.fieldmapping.items():
        if isinstance(source_field, list):
            # NOTE(review): for a multi-source mapping the matching *source column*
            # names are collected, whereas the single-source branch below collects
            # the *field* name -- confirm this asymmetry is intended.
            selected_fields_names.extend(set(source_field) & available_columns)
        elif source_field in imprt.columns:
            selected_fields_names.append(field_name)

    for entity in entities:
        # Select fields associated to this entity *and only to this entity*
        fields = (
            db.session.query(BibFields)
            .where(BibFields.name_field.in_(selected_fields_names))
            .where(BibFields.entities.any(EntityField.entity == entity))
            .where(~BibFields.entities.any(EntityField.entity != entity))
            .where(BibFields.name_field != dataset_name_field)
            .all()
        )
        # Without any field there is no condition to OR together: skip the update
        if fields:
            db.session.execute(
                sa.update(transient_table)
                .where(transient_table.c.id_import == imprt.id_import)
                .where(
                    sa.or_(
                        *[
                            transient_table.c[field.source_column].isnot(None)
                            for field in fields
                        ]
                    )
                )
                .values({entity.validity_column: True})
            )
def check_orphan_rows(imprt):
    """
    Report an ORPHAN_ROW error on rows that are not attached to any entity.

    For each mapped field associated to more than one entity, rows that hold a
    non-NULL value for this field while every validity column of the
    destination is NULL are reported through ``report_erroneous_rows``.

    :param imprt: import whose rows in the transient table are checked
    """
    transient_table = imprt.destination.get_transient_table()

    # TODO: handle multi-source fields
    # This is actually not a big issue as multi-source fields are unlikely to also be
    # multi-entity fields.
    available_columns = set(imprt.columns)  # loop-invariant, built once
    selected_fields_names = []
    for field_name, source_field in imprt.fieldmapping.items():
        if isinstance(source_field, list):
            selected_fields_names.extend(set(source_field) & available_columns)
        elif source_field in imprt.columns:
            selected_fields_names.append(field_name)

    # Select fields associated to multiple entities
    AllEntityField = sa.orm.aliased(EntityField)
    fields = (
        db.session.query(BibFields)
        .join(EntityField)
        .join(Entity)
        .order_by(Entity.order)  # errors are associated to the first Entity
        .filter(BibFields.name_field.in_(selected_fields_names))
        .join(AllEntityField, AllEntityField.id_field == BibFields.id_field)
        .group_by(BibFields.id_field, EntityField.id_field, Entity.id_entity)
        .having(sa.func.count(AllEntityField.id_entity) > 1)
        .all()
    )

    for field in fields:
        report_erroneous_rows(
            imprt,
            entity=None,  # OK because ORPHAN_ROW has only WARNING level
            error_type=ImportCodeError.ORPHAN_ROW,
            error_column=field.name_field,
            whereclause=sa.and_(
                # NOTE(review): the other checks of this module read
                # ``field.source_column`` here -- confirm ``source_field`` is intended.
                transient_table.c[field.source_field].isnot(None),
                # The row belongs to no entity at all: every validity column is NULL
                *[
                    transient_table.c[col].is_(None)
                    for col in imprt.destination.validity_columns
                ],
            ),
        )
def check_mandatory_field(imprt, entity, field):
    """
    Report a MISSING_VALUE error on rows lacking a value for the given field.

    Only rows whose validity column for ``entity`` is not NULL are considered;
    among them, those with a NULL value in the source column of ``field`` are
    reported through ``report_erroneous_rows``.

    :param imprt: import whose rows in the transient table are checked
    :param entity: entity providing the validity column and the error target
    :param field: field whose source column must hold a value
    """
    columns = imprt.destination.get_transient_table().c
    value_column = columns[field.source_column]
    row_has_entity_data = columns[entity.validity_column].isnot(None)
    value_is_missing = value_column.is_(None)
    report_erroneous_rows(
        imprt,
        entity=entity,
        error_type=ImportCodeError.MISSING_VALUE,
        error_column=field.name_field,
        whereclause=sa.and_(row_has_entity_data, value_is_missing),
    )


# Currently not used as done during dataframe checks
def check_mandatory_fields(imprt, entity, fields):
    """
    Run ``check_mandatory_field`` on every mandatory field having a destination.

    :param imprt: import whose rows in the transient table are checked
    :param entity: entity the fields are checked for
    :param fields: mapping whose values are the candidate fields; entries that
        are not mandatory or have no ``dest_field`` are skipped
    """
    for candidate in fields.values():
        if candidate.mandatory and candidate.dest_field:
            check_mandatory_field(imprt, entity, candidate)