Code source de geonature.utils.celery

from celery import Celery
import flask


[docs] class FlaskCelery(Celery): def __init__(self, *args, **kwargs): super(FlaskCelery, self).__init__(*args, **kwargs) if "app" in kwargs: self.init_app(kwargs["app"])
[docs] def patch_task(self): TaskBase = self.Task _celery = self class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): if hasattr(_celery, "app"): with _celery.app.app_context(): # No need for db.session.remove() since it is automatically closed # by flask-sqlalchemy when exit the app context created return TaskBase.__call__(self, *args, **kwargs) else: return TaskBase.__call__(self, *args, **kwargs) self.Task = ContextTask
[docs] def init_app(self, app): self.app = app self.config_from_object(app.config["CELERY"]) if not self.conf.task_always_eager: self.patch_task()
[docs] celery_app = FlaskCelery("geonature")