# Source code of geonature.core.imports.tasks
from datetime import datetime
from flask import current_app
import sqlalchemy as sa
from sqlalchemy import func, select, delete
from sqlalchemy.dialects.postgresql import array_agg, aggregate_order_by
from celery.utils.log import get_task_logger
from geonature.utils.env import db
from geonature.utils.celery import celery_app
from geonature.core.notifications.utils import dispatch_notifications
from geonature.core.imports.models import BibFields, Entity, EntityField, TImports
from geonature.core.imports.checks.sql import init_rows_validity, check_orphan_rows
# Module-level logger obtained from Celery's task logging helper.
logger = get_task_logger(__name__)
@celery_app.task(bind=True)
def do_import_checks(self, import_id):
    """
    Verify the import data.

    Runs the destination's ``check_transient_data`` action on the import,
    then flags the import as processed, releases its task id and stores the
    ordered line numbers of the transient rows for which at least one of the
    destination's validity columns is False. Nothing is committed when the
    import no longer exists or is no longer attached to this task.

    Parameters
    ----------
    import_id : int
        The ID of the import to verify.
    """
    logger.info(f"Starting verification of import {import_id}.")
    imprt = db.session.get(TImports, import_id)
    # A missing import, or one whose task id differs from ours, is treated
    # as a cancellation of this task.
    if imprt is None or imprt.task_id != self.request.id:
        logger.warning("Task cancelled, doing nothing.")
        return

    imprt.destination.actions.check_transient_data(self, logger, imprt)

    self.update_state(state="PROGRESS", meta={"progress": 1})

    # Fetch the import again, this time with FOR UPDATE on the import row,
    # and re-check that the task still owns it before committing anything.
    imprt = db.session.get(TImports, import_id, with_for_update={"of": TImports})
    if imprt is None or imprt.task_id != self.request.id:
        logger.warning("Task cancelled, rollback changes.")
        db.session.rollback()
        return

    logger.info("All done, committing…")
    transient_table = imprt.destination.get_transient_table()
    imprt.processed = True
    imprt.task_id = None

    # ``== False`` is intentional: it builds a SQL comparison, not a Python one.
    # ``sa.false()`` seeds the disjunction so that ``or_()`` is never called
    # without arguments (deprecated by SQLAlchemy) when the destination has
    # no validity column; it is simplified away otherwise.
    any_check_failed = sa.or_(
        sa.false(),
        *[
            transient_table.c[column] == False  # noqa: E712
            for column in imprt.destination.validity_columns
        ],
    )
    stmt = (
        select(
            array_agg(aggregate_order_by(transient_table.c.line_no, transient_table.c.line_no))
        )
        .where(any_check_failed)
        .where(transient_table.c.id_import == imprt.id_import)
    )
    imprt.erroneous_rows = db.session.execute(stmt).scalar()
    db.session.commit()
@celery_app.task(bind=True)
def do_import_in_destination(self, import_id):
    """
    Insert valid transient data into the destination of an import.

    After calling the destination's ``import_data_to_destination`` action,
    two statistics are stored on the import:

    - ``import_count``: sum, over the destination's entities, of the number
      of distinct combinations of mapped destination columns among the rows
      valid for that entity;
    - ``nb_line_valid``: number of transient rows for which at least one
      entity validity column compares as different from False.

    The transient rows of the import are then deleted, and the import is
    finalised (task id released, end date set, authors notified) unless the
    import no longer exists or is no longer attached to this task, in which
    case everything is rolled back.

    Parameters
    ----------
    import_id : int
        The ID of the import to insert data into the destination.
    """
    logger.info(f"Starting insertion in destination of import {import_id}.")
    imprt = db.session.get(TImports, import_id)
    # A missing import, or one whose task id differs from ours, is treated
    # as a cancellation of this task.
    if imprt is None or imprt.task_id != self.request.id:
        logger.warning("Task cancelled, doing nothing.")
        return
    transient_table = imprt.destination.get_transient_table()

    # Copy valid transient data to destination
    imprt.destination.actions.import_data_to_destination(imprt)

    count_entities = 0
    entities = db.session.scalars(
        sa.select(Entity).filter_by(destination=imprt.destination).order_by(Entity.order)
    ).all()
    for entity in entities:
        # Fields of this entity that have a destination field and are part
        # of the import's field mapping.
        fields = db.session.scalars(
            sa.select(BibFields).where(
                BibFields.entities.any(EntityField.entity == entity),
                BibFields.dest_field != None,  # noqa: E711 - renders as SQL "IS NOT NULL"
                BibFields.name_field.in_(imprt.fieldmapping.keys()),
            )
        ).all()
        # NOTE(review): if ``fields`` is empty, the DISTINCT below is built
        # without any column — confirm this cannot happen for a real entity.
        columns_to_count_unique_entities = [
            transient_table.c[field.dest_column] for field in fields
        ]
        n_valid_data = db.session.execute(
            select(func.count(func.distinct(*columns_to_count_unique_entities)))
            .select_from(transient_table)
            .where(transient_table.c.id_import == imprt.id_import)
            .where(transient_table.c[entity.validity_column] == True)  # noqa: E712
        ).scalar()
        count_entities += n_valid_data

    imprt.statistics["import_count"] = count_entities

    # Count the valid rows of the source file.
    # ``sa.false()`` seeds the disjunction so that ``or_()`` is never called
    # without arguments (deprecated by SQLAlchemy) when the destination has
    # no entity; it is simplified away otherwise.
    valid_for_any_entity = sa.or_(
        sa.false(),
        *[
            transient_table.c[entity.validity_column] != False  # noqa: E712
            for entity in entities
        ],
    )
    imprt.statistics["nb_line_valid"] = db.session.execute(
        select(func.count("*"))
        .select_from(transient_table)
        .where(transient_table.c.id_import == imprt.id_import)
        .where(valid_for_any_entity)
    ).scalar()

    # Clear transient data
    db.session.execute(
        delete(transient_table).where(transient_table.c.id_import == imprt.id_import)
    )
    imprt.loaded = False

    # Fetch the import again, this time with FOR UPDATE on the import row,
    # and re-check that the task still owns it before committing anything.
    imprt = db.session.get(TImports, import_id, with_for_update={"of": TImports})
    if imprt is None or imprt.task_id != self.request.id:
        logger.warning("Task cancelled, rollback changes.")
        db.session.rollback()
        return

    logger.info("All done, committing…")
    imprt.task_id = None
    imprt.date_end_import = datetime.now()

    # Send element to notification system
    notify_import_done(imprt)

    db.session.commit()
# Send notification
def notify_import_done(imprt: TImports):
    """
    Notify the authors of an import that it has finished.

    Calls ``dispatch_notifications`` for the ``IMPORT-DONE%`` category code
    with the role ids of the import's authors, a link to the import report
    page and a context holding the import, its destination and the URL of
    the notification rules page. Both URLs are built from the
    ``URL_APPLICATION`` configuration entry.

    Parameters
    ----------
    imprt : TImports
        The import that has finished.
    """
    id_authors = [author.id_role for author in imprt.authors]
    dispatch_notifications(
        code_categories=["IMPORT-DONE%"],
        id_roles=id_authors,
        title="Import terminé",
        url=(
            current_app.config["URL_APPLICATION"]
            + f"/#/import/{imprt.destination.code}/{imprt.id_import}/report"
        ),
        context={
            "import": imprt,
            "destination": imprt.destination,
            "url_notification_rules": current_app.config["URL_APPLICATION"]
            + "/#/notification/rules",
        },
    )