Code source de geonature.core.gn_profiles.tasks

from sqlalchemy.sql import func
from celery.utils.log import get_task_logger
from celery.schedules import crontab

from geonature.utils.env import db
from geonature.utils.config import config
from geonature.utils.celery import celery_app


[docs] logger = get_task_logger(__name__)
@celery_app.on_after_finalize.connect
[docs] def setup_periodic_tasks(sender, **kwargs): ct = config["PROFILES_REFRESH_CRONTAB"] if ct: minute, hour, day_of_month, month_of_year, day_of_week = ct.split(" ") sender.add_periodic_task( crontab( minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month, month_of_year=month_of_year, ), refresh_profiles.s(), name="refresh profiles", )
@celery_app.task(bind=True)
[docs] def refresh_profiles(self): logger.info("Refresh profiles...") db.session.execute(func.gn_profiles.refresh_profiles()) db.session.commit() logger.info("Profiles refreshed.")