Code source de geonature.core.health.routes
from celery import Celery
from flask import Blueprint
from geonature.utils.env import db
import redis
from flask import current_app
[docs]
routes = Blueprint("health", __name__)
[docs]
def check_all_dependencies():
"""
Check all dependencies are available.
This function checks that the database and the redis server are available.
Returns
-------
dict
The value for each key is a boolean indicating if the connection is available or not.
"""
check = {
"database_connection": {"status": True, "message": "The database is available"},
"redis_connection": {"status": True, "message": "The redis server is available"},
"celery_worker": {"status": True, "message": "The celery worker is available"},
}
try:
db.session.execute("SELECT 1")
except Exception as e:
check["database_connection"] = {
"status": False,
"message": f"The database is not available",
}
try:
if "CELERY" in current_app.config:
r = redis.from_url(current_app.config["CELERY"]["broker_url"])
r.ping()
except redis.ConnectionError:
check["redis_connection"] = {
"status": False,
"message": "The redis server is not available",
}
if "CELERY" in current_app.config:
if check["redis_connection"]["status"]:
from geonature.utils.celery import celery_app
is_pytest = current_app.config["CELERY"].get("task_always_eager", False)
if not is_pytest and not celery_app.control.inspect().active():
check["celery_worker"] = {
"status": False,
"message": "The celery worker is not running",
}
else:
check["celery_worker"] = {
"status": False,
"message": "The celery worker is not accessible because the redis server is not available",
}
return check
@routes.route("/healthz", methods=["GET"])
[docs]
def health_check():
services_status = [status_data["status"] for _, status_data in check_all_dependencies().items()]
if all(services_status):
return "OK", 200
return "Service Unavailable", 500
@routes.route("/services_status", methods=["GET"])
[docs]
def services_status():
return {
"services": [
{
"name": service_name,
"status": "ONLINE" if service_status["status"] else "OFFLINE",
"message": service_status["message"],
}
for service_name, service_status in check_all_dependencies().items()
]
}