[docs]defdo_import_checks(self,import_id):""" Verify the import data. Parameters ---------- import_id : int The ID of the import to verify. """logger.info(f"Starting verification of import {import_id}.")imprt=db.session.get(TImports,import_id)ifimprtisNoneorimprt.task_id!=self.request.id:logger.warning("Task cancelled, doing nothing.")returnimprt.destination.actions.check_transient_data(self,logger,imprt)self.update_state(state="PROGRESS",meta={"progress":1})imprt=db.session.get(TImports,import_id,with_for_update={"of":TImports})ifimprtisNoneorimprt.task_id!=self.request.id:logger.warning("Task cancelled, rollback changes.")db.session.rollback()else:logger.info("All done, committing…")transient_table=imprt.destination.get_transient_table()imprt.processed=Trueimprt.task_id=Nonestmt=(select(array_agg(aggregate_order_by(transient_table.c.line_no,transient_table.c.line_no))).where(sa.or_(*[transient_table.c[v]==Falseforvinimprt.destination.validity_columns])).where(transient_table.c.id_import==imprt.id_import))imprt.erroneous_rows=db.session.execute(stmt).scalar()db.session.commit()
@celery_app.task(bind=True)
[docs]defdo_import_in_destination(self,import_id):""" Insert valid transient data into the destination of an import. Parameters ---------- import_id : int The ID of the import to insert data into the destination. """logger.info(f"Starting insertion in destination of import {import_id}.")imprt=db.session.get(TImports,import_id)ifimprtisNoneorimprt.task_id!=self.request.id:logger.warning("Task cancelled, doing nothing.")returntransient_table=imprt.destination.get_transient_table()# Copy valid transient data to destinationimprt.destination.actions.import_data_to_destination(imprt)count_entities=0entities=db.session.scalars(sa.select(Entity).filter_by(destination=imprt.destination).order_by(Entity.order)).all()forentityinentities:fields=db.session.scalars(sa.select(BibFields).where(BibFields.entities.any(EntityField.entity==entity),BibFields.dest_field!=None,BibFields.name_field.in_(imprt.fieldmapping.keys()),)).all()columns_to_count_unique_entities=[transient_table.c[field.dest_column]forfieldinfields]n_valid_data=db.session.execute(select(func.count(func.distinct(*columns_to_count_unique_entities))).select_from(transient_table).where(transient_table.c.id_import==imprt.id_import).where(transient_table.c[entity.validity_column]==True)).scalar()count_entities+=n_valid_dataimprt.statistics["import_count"]=count_entities# COUNT VALID ROW in SOURCE FILEimprt.statistics["nb_line_valid"]=db.session.execute(select(func.count("*")).select_from(transient_table).where(transient_table.c.id_import==imprt.id_import).where(sa.or_(*[transient_table.c[entity.validity_column]!=Falseforentityinentities]))).scalar()# Clear transient datadb.session.execute(delete(transient_table).where(transient_table.c.id_import==imprt.id_import))imprt.loaded=Falseimprt=db.session.get(TImports,import_id,with_for_update={"of":TImports})ifimprtisNoneorimprt.task_id!=self.request.id:logger.warning("Task cancelled, rollback changes.")db.session.rollback()returnlogger.info("All done, committing…")imprt.task_id=Noneimprt.date_end_import=datetime.now()# Send element to notification systemnotify_import_done(imprt)db.session.commit()
# Send notification
[docs]defnotify_import_done(imprt:TImports):""" Notify the authors of an import that it has finished. Parameters ---------- imprt : TImports The import that has finished. """id_authors=[author.id_roleforauthorinimprt.authors]dispatch_notifications(code_categories=["IMPORT-DONE%"],id_roles=id_authors,title="Import terminé",url=(current_app.config["URL_APPLICATION"]+f"/#/import/{imprt.destination.code}/{imprt.id_import}/report"),context={"import":imprt,"destination":imprt.destination,"url_notification_rules":current_app.config["URL_APPLICATION"]+"/#/notification/rules",},)