Code source de geonature.core.gn_synthese.blueprints.reports

import datetime

import sqlalchemy as sa
from flask import Blueprint, abort, current_app, g, jsonify, request
from geonature.core.gn_permissions.decorators import (
    login_required,
    permissions_required,
)
from geonature.core.gn_permissions.tools import get_permissions
from geonature.core.gn_synthese.models import BibReportsTypes, Synthese, TReport
from geonature.core.gn_synthese.schemas import ReportSchema
from geonature.core.gn_synthese.utils.query_select_sqla import SyntheseQuery
from geonature.core.notifications.utils import dispatch_notifications
from geonature.utils.env import db
from pypnusershub.db.models import User
from sqlalchemy import asc, desc, or_, select
from sqlalchemy.orm import Load, joinedload
from utils_flask_sqla.response import json_resp
from werkzeug.exceptions import BadRequest, Conflict, Forbidden

# Flask blueprint on which every report view of this module is registered
# (see the `@reports_blueprint.route(...)` decorators below).
[docs] reports_blueprint = Blueprint("reports", __name__)
@reports_blueprint.route("", methods=["POST"])
@permissions_required("R", module_code="SYNTHESE")
@json_resp
def create_report(permissions):
    """
    Create a report attached to a synthese observation.

    The JSON body must provide the keys ``type`` (report type name),
    ``item`` (the ``id_synthese`` of the observation) and ``content``.

    Parameters
    ----------
    permissions
        Permissions injected by ``permissions_required``; they are checked
        against the targeted observation.

    Raises
    ------
    BadRequest
        Missing body, missing key, empty ``item``/``type``, empty content
        for a ``discussion``, or unknown report type.
    NotFound
        No observation matches ``item`` (raised through ``abort(404)``).
    Forbidden
        The caller has no permission on the observation.
    Conflict
        An ``alert`` already exists for the observation, or the current
        user already has a ``pin`` on it.

    Notes
    -----
    The view itself returns ``None``; the HTTP response is produced by the
    ``json_resp`` decorator.
    """
    payload = request.get_json()
    if payload is None:
        raise BadRequest("Empty request data")
    try:
        type_name, id_synthese, content = (
            payload["type"],
            payload["item"],
            payload["content"],
        )
    except KeyError:
        raise BadRequest("Empty request data")

    # Validate the payload values before touching the database
    if not id_synthese:
        raise BadRequest("id_synthese is missing from the request")
    if not type_name:
        raise BadRequest("Report type is missing from the request")
    if type_name == "discussion" and not content:
        raise BadRequest("Discussion content is required")

    type_lookup = sa.select(BibReportsTypes).filter_by(type=type_name)
    type_exists = db.session.execute(type_lookup).scalar_one_or_none()
    if not type_exists:
        raise BadRequest("This report type does not exist")

    # Load the observation with only the relationships listed here;
    # any other relationship access is forbidden by raiseload("*").
    synthese_lookup = (
        select(Synthese)
        .options(
            Load(Synthese).raiseload("*"),
            joinedload("nomenclature_sensitivity"),
            joinedload("cor_observers"),
            joinedload("digitiser"),
            joinedload("dataset"),
            joinedload("areas"),
            joinedload("taxref_tree"),
        )
        .filter_by(id_synthese=id_synthese)
        .limit(1)
    )
    synthese = db.session.scalars(synthese_lookup).first()
    if not synthese:
        abort(404)
    if not synthese.has_instance_permission(permissions):
        raise Forbidden

    # Reports of the requested type on this observation (any author)
    report_query = sa.select(TReport).where(
        TReport.id_synthese == id_synthese,
        TReport.report_type.has(BibReportsTypes.type == type_name),
    )
    # Pin of the current user on this observation
    user_pin = sa.select(TReport).where(
        TReport.id_synthese == id_synthese,
        TReport.report_type.has(BibReportsTypes.type == "pin"),
        TReport.id_role == g.current_user.id_role,
    )

    # Only one alert per observation, and only one pin per user and observation
    if type_name == "alert":
        uniqueness_query = report_query
    elif type_name == "pin":
        uniqueness_query = user_pin
    else:
        uniqueness_query = None
    if uniqueness_query is not None:
        already_there = db.session.execute(uniqueness_query).scalar_one_or_none()
        if already_there is not None:
            raise Conflict("This type already exists for this id")

    db.session.add(
        TReport(
            id_synthese=id_synthese,
            id_role=g.current_user.id_role,
            content=content,
            creation_date=datetime.datetime.now(),
            id_type=type_exists.id_type,
        )
    )

    if type_name == "discussion":
        # Observers of the observation
        observers = {observer.id_role for observer in synthese.cor_observers}
        # Users who already commented and are neither observer nor digitiser
        already_known = {synthese.id_digitiser} | observers
        commenter_query = report_query.where(
            TReport.id_role.notin_(already_known)
        ).distinct(TReport.id_role)
        commenters = {
            report.id_role for report in db.session.scalars(commenter_query).all()
        }
        # Everybody involved, except the author of the new comment
        id_roles = observers | commenters | {synthese.id_digitiser}
        id_roles.discard(g.current_user.id_role)
        notify_new_report_change(
            synthese=synthese,
            user=g.current_user,
            id_roles=id_roles,
            content=content,
        )

    db.session.commit()
def notify_new_report_change(synthese, user, id_roles, content):
    """
    Dispatch an ``OBSERVATION-COMMENT`` notification about a new comment.

    Nothing is sent when the observation has no digitiser
    (``synthese.id_digitiser`` is falsy).

    Parameters
    ----------
    synthese
        Observation that was commented; its ``id_synthese`` is used to
        build the link carried by the notification.
    user
        Author of the comment, passed to the notification context.
    id_roles
        Identifiers of the roles to notify.
    content
        Text of the comment, passed to the notification context.
    """
    if synthese.id_digitiser:
        # Link to the observation in the application front-end
        occurrence_url = "".join(
            (
                current_app.config["URL_APPLICATION"],
                "/#/synthese/occurrence/",
                str(synthese.id_synthese),
            )
        )
        dispatch_notifications(
            code_categories=["OBSERVATION-COMMENT"],
            id_roles=id_roles,
            title="Nouveau commentaire sur une observation",
            url=occurrence_url,
            context={"synthese": synthese, "user": user, "content": content},
        )
@reports_blueprint.route("/<int:id_report>", methods=["PUT"])
@login_required
@json_resp
def update_content_report(id_report):
    """
    Replace the content of an existing report.

    The JSON body must contain ``content``. It may also contain
    ``idReport``: when present (and truthy) it identifies the report to
    update, as before; otherwise the ``id_report`` URL parameter is used.

    Parameters
    ----------
    id_report : int
        Report identifier taken from the URL.

    Raises
    ------
    BadRequest
        The body is not a JSON object or has no ``content`` key.
    NotFound
        No report matches the identifier.
    Forbidden
        The current user is not the author of the report.

    Notes
    -----
    The view itself returns ``None``; the HTTP response is produced by the
    ``json_resp`` decorator.
    """
    data = request.json
    if not isinstance(data, dict):
        raise BadRequest("Empty request data")
    if "content" not in data:
        raise BadRequest("Report content is missing from the request")

    # Body identifier kept for backward compatibility, URL identifier otherwise
    report = db.get_or_404(TReport, data.get("idReport") or id_report)

    # Only the author may edit a report. Compare role identifiers, as
    # delete_report does (the previous `g.current.user` lookup always failed).
    if report.id_role != g.current_user.id_role:
        raise Forbidden

    report.content = data["content"]
    db.session.commit()
@reports_blueprint.route("", methods=["GET"])
@permissions_required("R", module_code="SYNTHESE")
def list_all_reports(permissions):
    """
    Return a paginated list of reports on observations the user can read.

    Query string parameters
    -----------------------
    type
        Optional report type name; must exist in ``BibReportsTypes``.
    orderby
        One of ``user.nom_complet``, ``content`` or ``creation_date``
        (default ``creation_date``).
    sort
        ``desc`` for descending order; anything else sorts ascending.
    page, per_page
        Pagination, defaulting to ``1`` and ``10``.
    my_reports
        ``true`` (case-insensitive) to keep only the reports that concern
        the current user.

    Raises
    ------
    BadRequest
        Unknown report type or unsupported ``orderby`` value.

    Returns
    -------
    A JSON response with the keys ``total``, ``page``, ``per_page`` and
    ``items``.
    """
    # Parameters
    args = request.args
    type_name = args.get("type")
    orderby = args.get("orderby", "creation_date")
    sort = args.get("sort")
    page = args.get("page", 1, int)
    per_page = args.get("per_page", 10, int)
    my_reports = args.get("my_reports", "false").lower() == "true"

    # Base query: reports joined with their author, with trimmed eager loads
    eager_loads = (
        joinedload(TReport.report_type).load_only(
            BibReportsTypes.type, BibReportsTypes.id_type
        ),
        joinedload(TReport.synthese).load_only(
            Synthese.cd_nom,
            Synthese.nom_cite,
            Synthese.observers,
            Synthese.date_min,
            Synthese.date_max,
        ),
        joinedload(TReport.user).load_only(User.nom_role, User.prenom_role),
    )
    query = (
        sa.select(TReport, User.nom_complet)
        .join(User, TReport.id_role == User.id_role)
        .options(*eager_loads)
    )

    # Verify and filter by type
    if type_name:
        known_type = db.session.scalar(
            sa.exists(BibReportsTypes).where(BibReportsTypes.type == type_name).select()
        )
        if not known_type:
            raise BadRequest("This report type does not exist")
        query = query.where(TReport.report_type.has(BibReportsTypes.type == type_name))

    # For pins, or when my_reports is set, keep only what concerns the user
    if my_reports or type_name == "pin":
        authored_by_user = TReport.id_role == g.current_user.id_role
        on_synthese_reported_by_user = TReport.id_synthese.in_(
            select(TReport.id_synthese).where(TReport.id_role == g.current_user.id_role)
        )
        digitised_by_user = TReport.synthese.has(
            Synthese.id_digitiser == g.current_user.id_role
        )
        observed_by_user = TReport.synthese.has(
            Synthese.cor_observers.any(User.id_role == g.current_user.id_role)
        )
        query = query.where(
            or_(
                authored_by_user,
                on_synthese_reported_by_user,
                digitised_by_user,
                observed_by_user,
            )
        )

    # Check read permissions on the synthese
    readable_synthese = SyntheseQuery(
        Synthese, select(Synthese.id_synthese).select_from(Synthese), {}
    )
    readable_synthese.filter_query_with_permissions(g.current_user, permissions)
    cte_synthese = readable_synthese.query.cte("cte_synthese")
    query = query.where(TReport.id_synthese == cte_synthese.c.id_synthese)

    # Determine the sorting
    sortable_columns = {
        "user.nom_complet": User.nom_complet,
        "content": TReport.content,
        "creation_date": TReport.creation_date,
    }
    if orderby not in sortable_columns:
        raise BadRequest("Bad orderby")
    direction = desc if sort == "desc" else asc
    query = query.order_by(direction(sortable_columns[orderby]))

    # Pagination
    pagination = db.paginate(query, page=page, per_page=per_page)

    items = [
        {
            "id_report": report.id_report,
            "id_synthese": report.id_synthese,
            "id_role": report.id_role,
            "report_type": {
                "type": report.report_type.type,
                "id_type": report.report_type.id_type,
            },
            "content": report.content,
            "deleted": report.deleted,
            "creation_date": report.creation_date,
            "user": {"nom_complet": report.user.nom_complet},
            "synthese": {
                "cd_nom": report.synthese.cd_nom,
                "nom_cite": report.synthese.nom_cite,
                "observers": report.synthese.observers,
                "date_min": report.synthese.date_min,
                "date_max": report.synthese.date_max,
            },
        }
        for report in pagination.items
    ]

    return jsonify(
        {
            "total": pagination.total,
            "page": pagination.page,
            "per_page": pagination.per_page,
            "items": items,
        }
    )
@reports_blueprint.route("/<int:id_synthese>", methods=["GET"])
@permissions_required("R", module_code="SYNTHESE")
def list_reports(permissions, id_synthese):
    """
    Return the reports attached to one synthese observation.

    Parameters
    ----------
    permissions
        Permissions injected by ``permissions_required``; they are checked
        against the observation.
    id_synthese : int
        Identifier of the observation, taken from the URL.

    Query string parameters
    -----------------------
    type
        Optional report type name. With ``pin``, only the pins of the
        current user are returned.

    Raises
    ------
    NotFound
        No observation matches ``id_synthese``.
    Forbidden
        The caller has no permission on the observation.
    BadRequest
        The requested report type does not exist.

    Returns
    -------
    The matching reports dumped with ``ReportSchema``, including the
    author's ``nom_role`` and ``prenom_role``.
    """
    type_name = request.args.get("type")

    synthese = db.get_or_404(Synthese, id_synthese)
    if not synthese.has_instance_permission(permissions):
        raise Forbidden

    filters = [TReport.id_synthese == id_synthese]

    # Verify and filter by type
    if type_name:
        known_type = db.session.scalar(
            sa.exists(BibReportsTypes).where(BibReportsTypes.type == type_name).select()
        )
        if not known_type:
            raise BadRequest("This report type does not exist")
        filters.append(TReport.report_type.has(BibReportsTypes.type == type_name))
        # Pins are personal: only show those of the current user
        if type_name == "pin":
            filters.append(TReport.id_role == g.current_user.id_role)

    # Eager-load the author and the report type along with the reports
    query = (
        sa.select(TReport)
        .where(*filters)
        .options(
            joinedload(TReport.user).load_only(User.nom_role, User.prenom_role),
            joinedload(TReport.report_type),
        )
    )
    reports = db.session.scalars(query).all()

    schema = ReportSchema(many=True, only=["+user.nom_role", "+user.prenom_role"])
    return schema.dump(reports)
@reports_blueprint.route("/<int:id_report>", methods=["DELETE"])
@login_required
@json_resp
def delete_report(id_report):
    """
    Delete a report, or flag it as deleted when it is a discussion.

    Rules applied, in order:

    * ``alert``: the caller needs the ``R`` permission of the ``SYNTHESE``
      module on the related observation;
    * ``discussion`` and ``pin``: only the author may delete the report;
    * ``discussion``: the row is kept, its content is emptied and
      ``deleted`` is set to ``True``; any other type is removed.

    Parameters
    ----------
    id_report : int
        Identifier of the report, taken from the URL.

    Raises
    ------
    NotFound
        No report matches ``id_report``.
    Forbidden
        One of the rules above is not satisfied.

    Notes
    -----
    The view itself returns ``None``; the HTTP response is produced by the
    ``json_resp`` decorator.
    """
    # Same lookup helper as the other views of this module
    # (instead of the legacy `TReport.query` interface).
    report = db.get_or_404(TReport, id_report)
    report_type = report.report_type.type

    # Alerts: check the caller's permissions on the observation
    if report_type == "alert":
        permissions = get_permissions(module_code="SYNTHESE", action_code="R")
        if not report.synthese.has_instance_permission(permissions):
            raise Forbidden("Permission required to delete this report !")

    # Pins and discussions can only be deleted by their author
    if report.id_role != g.current_user.id_role and report_type in (
        "discussion",
        "pin",
    ):
        raise Forbidden

    # Discussions are soft-deleted, not removed
    if report_type == "discussion":
        report.content = ""
        report.deleted = True
    else:
        db.session.delete(report)

    db.session.commit()