import json
import logging
from flask import (
Blueprint,
request,
jsonify,
g,
)
from werkzeug.exceptions import Forbidden, BadRequest
import sqlalchemy as sa
from sqlalchemy.orm import joinedload
from sqlalchemy import select, func, delete, exists
from geonature.utils.env import db
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.notifications.models import (
Notification,
NotificationMethod,
NotificationRule,
NotificationTemplate,
NotificationCategory,
)
[docs]
routes = Blueprint("notifications", __name__)
[docs]
log = logging.getLogger()
# Get all database notification for current user
@routes.route("/notifications", methods=["GET"])
@permissions.login_required
[docs]
def list_database_notification():
notifications = select(Notification).where(Notification.id_role == g.current_user.id_role)
notifications = notifications.order_by(
Notification.code_status.desc(), Notification.creation_date.desc()
)
result = [
notificationsResult.as_dict(
fields=[
"id_notification",
"id_role",
"title",
"content",
"url",
"code_status",
"creation_date",
]
)
for notificationsResult in db.session.scalars(notifications).all()
]
return jsonify(result)
# count database unread notification for current user
@routes.route("/count", methods=["GET"])
@permissions.login_required
[docs]
def count_notification():
notificationNumber = db.session.execute(
select(func.count("*"))
.select_from(Notification)
.where(Notification.id_role == g.current_user.id_role, Notification.code_status == "UNREAD")
).scalar_one()
return jsonify(notificationNumber)
# Update status ( for the moment only UNREAD/READ)
@routes.route("/notifications/<int:id_notification>", methods=["POST"])
@permissions.login_required
[docs]
def update_notification(id_notification):
notification = db.get_or_404(Notification, id_notification)
if notification.id_role != g.current_user.id_role:
raise Forbidden
notification.code_status = "READ"
db.session.commit()
return jsonify(notification.as_dict())
# Get all database notification for current user
@routes.route("/rules", methods=["GET"])
@permissions.login_required
[docs]
def list_notification_rules():
rules = NotificationRule.filter_by_role_with_defaults().options(
joinedload(NotificationRule.method),
joinedload(NotificationRule.category),
)
result = [
rule.as_dict(
fields=[
"code_method",
"code_category",
"method.label",
"method.description",
"category.label",
"category.description",
"subscribed",
]
)
for rule in db.session.scalars(rules).all()
]
return jsonify(result)
# Delete all rules for current user
@routes.route("/notifications", methods=["DELETE"])
@permissions.login_required
[docs]
def delete_all_notifications():
nbNotificationsDeleted = delete(Notification).where(
Notification.id_role == g.current_user.id_role
)
nbNotificationsDeleted = db.session.execute(nbNotificationsDeleted).rowcount
db.session.commit()
return jsonify(nbNotificationsDeleted)
# add rule for user
@routes.route(
"/rules/category/<code_category>/method/<code_method>/subscribe",
methods=["POST"],
defaults={"subscribe": True},
)
@routes.route(
"/rules/category/<code_category>/method/<code_method>/unsubscribe",
methods=["POST"],
defaults={"subscribe": False},
)
@permissions.login_required
[docs]
def update_rule(code_category, code_method, subscribe):
if not db.session.scalar(
exists().where(NotificationCategory.code == str(code_category)).select()
):
raise BadRequest("Invalid category")
if not db.session.scalar(exists().where(NotificationMethod.code == str(code_method)).select()):
raise BadRequest("Invalid method")
# Create new rule for current user
rule = db.session.scalars(
select(NotificationRule).filter_by(
id_role=g.current_user.id_role,
code_method=code_method,
code_category=code_category,
)
).one_or_none()
if rule:
rule.subscribed = subscribe
else:
rule = NotificationRule(
id_role=g.current_user.id_role,
code_method=code_method,
code_category=code_category,
subscribed=subscribe,
)
db.session.add(rule)
db.session.commit()
return jsonify(rule.as_dict(fields=["code_method", "code_category", "subscribed"]))
# Delete all rules for current user
@routes.route("/rules", methods=["DELETE"])
@permissions.login_required
[docs]
def delete_all_rules():
nb_rules_deleted = delete(NotificationRule).where(
NotificationRule.id_role == g.current_user.id_role
)
nb_rules_deleted = db.session.execute(nb_rules_deleted).rowcount
db.session.commit()
return jsonify(nb_rules_deleted)
# Get all availabe method for notification
@routes.route("/methods", methods=["GET"])
@permissions.login_required
[docs]
def list_notification_methods():
notificationMethods = db.session.scalars(
select(NotificationMethod).order_by(NotificationMethod.code.asc())
).all()
result = [
notificationsMethod.as_dict(
fields=[
"code",
"label",
"description",
]
)
for notificationsMethod in notificationMethods
]
return jsonify(result)
# Get all availabe category for notification
@routes.route("/categories", methods=["GET"])
@permissions.login_required
[docs]
def list_notification_categories():
notificationCategories = db.session.scalars(
select(NotificationCategory).order_by(NotificationCategory.code.asc())
).all()
result = [
notificationsCategory.as_dict(
fields=[
"code",
"label",
"description",
]
)
for notificationsCategory in notificationCategories
]
return jsonify(result)