Code source de geonature.core.imports.routes.imports

import codecs
import csv
import json
import unicodedata
from io import BytesIO, StringIO, TextIOWrapper

# url_quote was deprecated in werkzeug 3.0 https://stackoverflow.com/a/77222063/5807438
from urllib.parse import quote as url_quote

from flask import current_app, g, jsonify, request, send_file, stream_with_context
from flask_login import current_user, login_required
from geonature.core.gn_commons.models import TModules
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.imports.blueprint import blueprint
from geonature.core.imports.checks.sql.user import user_matching
from geonature.core.imports.models import (
    BibFields,
    ContentMapping,
    Destination,
    Entity,
    EntityField,
    FieldMapping,
    ImportUserError,
    TImports,
)
from geonature.core.imports.schemas import ImportSchema
from geonature.core.imports.tasks import do_import_checks, do_import_in_destination
from geonature.core.imports.utils import (
    ImportStep,
    clean_import,
    detect_encoding,
    detect_separator,
    generate_pdf_from_template,
    get_file_size,
    insert_import_data_in_transient_table,
)
from geonature.utils.config import config
from geonature.utils.env import db
from geonature.utils.sentry import start_sentry_child
from marshmallow import EXCLUDE
from pypnnomenclature.models import TNomenclatures
from pypnusershub.db.models import User
from sqlalchemy import delete, desc, func, or_, select
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import Load, contains_eager, joinedload, load_only, undefer
from sqlalchemy.orm.attributes import set_committed_value
from sqlalchemy.sql.expression import collate, exists
from werkzeug.exceptions import BadRequest, Conflict, Forbidden, Gone, NotFound

# Default value of the ``limit`` query parameter of the import list route.
IMPORTS_PER_PAGE: int = 15
@blueprint.url_value_preprocessor
def resolve_import(endpoint, values):
    """
    Swap the ``import_id`` URL value for the matching import, stored as ``imprt``.

    Endpoints whose rule does not expect ``import_id`` are left untouched.
    When ``import_id`` is ``None``, ``imprt`` is set to ``None`` and the
    ``destination`` value stays in ``values``.

    :raises NotFound: if the import does not exist (``get_or_404``) or does
        not belong to the destination given in the URL.
    """
    if not current_app.url_map.is_endpoint_expecting(endpoint, "import_id"):
        return

    import_id = values.pop("import_id")
    imprt = None
    if import_id is not None:
        eager_destination = joinedload(TImports.destination).joinedload(Destination.module)
        imprt = TImports.query.options(eager_destination).get_or_404(import_id)
        # ``destination`` is consumed from the URL values only on this path.
        if imprt.destination != values.pop("destination"):
            raise NotFound
    values["imprt"] = imprt
@blueprint.route("/imports/", methods=["GET"])
@blueprint.route("/<destination>/imports/", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_list(scope, destination=None):
    """
    .. :quickref: Import; Get all imports.

    Get all imports to which logged-in user has access.

    :query int page: page number, defaults to 1
    :query int limit: defaults to ``IMPORTS_PER_PAGE``; handed to the
        paginator as ``max_per_page``
    :query str search: case-insensitive text matched against the file name
        and the authors' first and last names
    :query str sort: import attribute to sort on; a dotted path follows
        relationships (defaults to ``date_create_import``)
    :query str sort_dir: ``desc`` for descending order, anything else keeps
        the ascending order
    :raises BadRequest: if ``sort`` does not resolve to an import attribute.
    """
    page = request.args.get("page", default=1, type=int)
    limit = request.args.get("limit", default=IMPORTS_PER_PAGE, type=int)
    search = request.args.get("search", default=None, type=str)
    sort = request.args.get("sort", default="date_create_import", type=str)
    sort_dir = request.args.get("sort_dir", default="desc", type=str)

    filters = []
    if search:
        filters.append(TImports.full_file_name.ilike(f"%{search}%"))
        filters.append(
            TImports.authors.any(
                or_(
                    User.prenom_role.ilike(f"%{search}%"),
                    User.nom_role.ilike(f"%{search}%"),
                ),
            )
        )
        filters.append(
            TImports.authors.any(
                func.lower(User.nom_role).contains(func.lower(search)),
            )
        )

    try:
        order_by = get_foreign_key_attr(TImports, sort)
        # get_foreign_key_attr() falls back to "" when the last path element is
        # not an attribute, and raises KeyError when an intermediate element is
        # not a relationship; both mean the requested field does not exist.
        if isinstance(order_by, str):
            raise AttributeError(sort)
        order_by = order_by() if callable(order_by) else order_by
    except (AttributeError, KeyError):
        raise BadRequest(f"Import field '{sort}' does not exist.")
    if sort_dir == "desc":
        order_by = desc(order_by)

    query = (
        select(TImports)
        .options(
            contains_eager(TImports.authors),
            contains_eager(TImports.destination).contains_eager(Destination.module),
        )
        .join(TImports.authors, isouter=True)
        .join(Destination)
        .join(TModules)
        .where(TImports.filter_by_scope(scope=scope))
        .order_by(order_by)
    )
    if filters:
        query = query.where(or_(*filters))
    if destination:
        query = query.where(TImports.destination == destination)

    # NOTE(review): ``limit`` is only given as ``max_per_page``; the effective
    # page size is whatever the paginator picks for ``per_page`` - confirm this
    # is intended.
    imports = db.paginate(query, page=page, error_out=False, max_per_page=limit)

    data = {
        "imports": [imprt.as_dict() for imprt in imports.items],
        "count": imports.total,
        "limit": limit,
        "offset": page - 1,
    }
    return jsonify(data)
@blueprint.route("/<destination>/imports/<int:import_id>/", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_one_import(scope, imprt):
    """
    .. :quickref: Import; Get an import.

    Return the given import serialized as JSON.

    :raises Forbidden: if the user has no "R" permission on this import.
    """
    # The route-level scope is not enough: check this particular instance too.
    can_read = imprt.has_instance_permission(scope, action_code="R")
    if not can_read:
        raise Forbidden
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/upload", defaults={"import_id": None}, methods=["POST"])
@blueprint.route("/<destination>/imports/<int:import_id>/upload", methods=["PUT"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def upload_file(scope, imprt, destination=None):  # destination is set when imprt is None
    """
    .. :quickref: Import; Add an import or update an existing import.

    Add an import or update an existing import.

    :form file: file to import (mandatory when creating an import)
    :form fieldmapping: optional JSON object merged into the import's
        field mapping
    :raises Forbidden: if the user has no "C" permission on the existing import.
    :raises BadRequest: if the file is missing for a new import, is empty or
        exceeds the configured maximum size, or if ``fieldmapping`` is not a
        JSON object.
    """
    if imprt:
        if not imprt.has_instance_permission(scope, action_code="C"):
            raise Forbidden
        destination = imprt.destination
    else:
        assert destination

    input_file = request.files.get("file", None)
    input_fieldmapping_preset = request.form.get("fieldmapping", None)

    # The file is mandatory only for a new import; on an existing import the
    # request may just update the field mapping preset.
    if imprt is None and input_file is None:
        raise BadRequest("File parameter is missing")

    # Process destination
    if imprt is None:
        imprt = TImports(destination=destination)
        imprt.authors.append(g.current_user)
        db.session.add(imprt)
    else:
        # If imprt exists, remove observer mapping
        imprt.observermapping = {}

    # Process field mapping
    if input_fieldmapping_preset:
        try:
            fieldmapping_preset = json.loads(input_fieldmapping_preset)
        except json.JSONDecodeError as exc:
            raise BadRequest(description="fieldmapping must be valid JSON.") from exc
        if not isinstance(fieldmapping_preset, dict):
            raise BadRequest(description="fieldmapping must be a JSON object.")
        # NOTE: FieldMapping.validate_values() cannot be used on the preset here.
        # Reassign instead of mutating in place so the change is registered
        # even if the attribute does not track in-place mutations.
        imprt.fieldmapping = {**(imprt.fieldmapping or {}), **fieldmapping_preset}

    # Process file
    if input_file:
        size = get_file_size(input_file)
        # value in config file is in Mo
        max_file_size = current_app.config["IMPORT"]["MAX_FILE_SIZE"] * 1024 * 1024
        if size > max_file_size:
            raise BadRequest(
                description=f"File too big ({size} > {max_file_size})."
            )  # FIXME better error signaling?
        if size == 0:
            raise BadRequest(description="Impossible to upload empty files")

        clean_import(imprt, ImportStep.UPLOAD)
        with start_sentry_child(op="task", description="detect encoding"):
            imprt.detected_encoding = detect_encoding(input_file)
        with start_sentry_child(op="task", description="detect separator"):
            imprt.detected_separator = detect_separator(
                input_file,
                encoding=imprt.encoding or imprt.detected_encoding,
            )
        imprt.source_file = input_file.read()
        imprt.full_file_name = input_file.filename

    # commit changes
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/update", methods=["PUT"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def update_import(scope, imprt, destination=None):
    """
    .. :quickref: Import; Update an existing import.

    Validate the JSON body with ``ImportSchema`` (unknown keys are excluded),
    commit the session and return the import as JSON.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises BadRequest: if the import is missing, the body is empty, or the
        schema rejects the body.
    """
    if not imprt:
        raise BadRequest("Import does not exist.")
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    data = request.get_json()
    if not data:
        raise BadRequest(description="No data provided")

    try:
        # NOTE(review): the loaded object is not used here; presumably the
        # schema applies the payload to the session-bound import - confirm.
        ImportSchema().load(data, unknown=EXCLUDE)
    except Exception as e:
        raise BadRequest(description=f"{e}") from e

    # commit changes
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/decode", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def decode_file(scope, imprt):
    """
    Store the decoding parameters of an import and read its column names.

    The JSON body must provide ``encoding``, ``format``, ``srid`` and
    ``separator``. Unless the ``decode`` query parameter is ``0``, the whole
    source file is then read with these parameters to detect encoding errors
    and to extract the header row.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises BadRequest: if no file was uploaded, a parameter is missing or
        invalid, the file cannot be read with the given parameters, or the
        header contains duplicated column names.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if imprt.source_file is None:
        raise BadRequest(description="A file must be first uploaded.")

    if "encoding" not in request.json:
        raise BadRequest(description="Missing encoding.")
    encoding = request.json["encoding"]
    try:
        codecs.lookup(encoding)
    except LookupError:
        raise BadRequest(description="Unknown encoding.")
    imprt.encoding = encoding

    if "format" not in request.json:
        raise BadRequest(description="Missing format.")
    if request.json["format"] not in TImports.AVAILABLE_FORMATS:
        raise BadRequest(description="Unknown format.")
    imprt.format_source_file = request.json["format"]

    if "srid" not in request.json:
        raise BadRequest(description="Missing srid.")
    try:
        imprt.srid = int(request.json["srid"])
    except (TypeError, ValueError):
        # TypeError covers JSON null, lists and objects.
        raise BadRequest(description="SRID must be an integer.")

    if "separator" not in request.json:
        raise BadRequest(description="Missing separator")
    if request.json["separator"] not in TImports.AVAILABLE_SEPARATORS:
        raise BadRequest(description="Unknown separator")
    imprt.separator = request.json["separator"]

    clean_import(imprt, ImportStep.DECODE)
    db.session.commit()  # commit parameters

    decode = request.args.get("decode", 1)
    try:
        decode = int(decode)
    except ValueError:
        raise BadRequest(description="decode parameter must be an int")

    if decode:
        csvfile = TextIOWrapper(BytesIO(imprt.source_file), encoding=imprt.encoding)
        csvreader = csv.reader(csvfile, delimiter=imprt.separator)
        try:
            columns = next(csvreader, None)
            # read full file to ensure that no encoding errors occur
            for _ in csvreader:
                pass
        except UnicodeError:
            raise BadRequest(
                description="Erreur d’encodage lors de la lecture du fichier source. "
                "Avez-vous sélectionné le bon encodage de votre fichier ?"
            )
        except csv.Error as exc:
            raise BadRequest(description=f"Invalid CSV file: {exc}")
        if columns is None:
            raise BadRequest(description="Source file has no header row.")

        # Single pass: a name seen for the second time is a duplicate.
        seen = set()
        duplicates = set()
        for col in columns:
            if col in seen:
                duplicates.add(col)
            else:
                seen.add(col)
        if duplicates:
            raise BadRequest(f"Duplicates column names: {duplicates}")

        imprt.columns = columns
        db.session.commit()

    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/fieldmapping", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def set_import_field_mapping(scope, imprt):
    """
    Replace the field mapping of an import with the JSON body.

    The import is cleaned from the LOAD step before the change is committed.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises BadRequest: if ``FieldMapping.validate_values`` rejects the body.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    new_mapping = request.json
    try:
        FieldMapping.validate_values(new_mapping)
    except ValueError as e:
        raise BadRequest(*e.args)

    imprt.fieldmapping = new_mapping
    clean_import(imprt, ImportStep.LOAD)
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/load", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def load_import(scope, imprt):
    """
    Insert the rows of the uploaded file into the transient table.

    On success the import's ``source_count`` is set to the value returned by
    the insertion helper and ``loaded`` is set to True.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises BadRequest: if no file was uploaded, no field mapping is set, or
        the insertion helper reports no line.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    # Preconditions: both the file and its field mapping must be available.
    if imprt.source_file is None:
        raise BadRequest(description="A file must be first uploaded.")
    if imprt.fieldmapping is None:
        raise BadRequest(description="File fields must be first mapped.")

    clean_import(imprt, ImportStep.LOAD)
    with start_sentry_child(op="task", description="insert data in db"):
        inserted_lines = insert_import_data_in_transient_table(imprt)
    if not inserted_lines:
        raise BadRequest("File with 0 lines.")

    imprt.source_count = inserted_lines
    imprt.loaded = True
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/columns", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_columns_name(scope, imprt):
    """
    .. :quickref: Import; Return all the columns of the file of an import

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises Conflict: if the import has no decoded columns yet.
    """
    # NOTE(review): the route requires "R" but the instance check uses "C" -
    # confirm this is intended.
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    column_names = imprt.columns
    if not column_names:
        raise Conflict(description="Data have not been decoded.")
    return jsonify(column_names)
@blueprint.route("/<destination>/imports/<int:import_id>/values", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_values(scope, imprt):
    """
    .. :quickref: Import; Return all values present in imported file for nomenclated fields

    For each mapped field of the destination that has a ``mnemonique``, return
    its nomenclature type, the nomenclatures of that type and the distinct
    values found in the transient table for this import.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises Conflict: if the data have not been loaded yet.
    """
    # Instance-level permission check (the action checked here is "C").
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.loaded:
        raise Conflict(description="Data have not been loaded")

    fields_stmt = (
        select(BibFields)
        .where(BibFields.mnemonique != None, BibFields.destination == imprt.destination)  # noqa: E711
        .options(joinedload(BibFields.nomenclature_type))
        .order_by(BibFields.id_field)
    )
    nomenclated_fields = db.session.scalars(fields_stmt).all()

    # Note: response format is validated with jsonschema in tests
    transient_table = imprt.destination.get_transient_table()
    response = {}
    for field in nomenclated_fields:
        if field.name_field not in imprt.fieldmapping:
            # this nomenclated field is not mapped
            continue
        source = imprt.fieldmapping[field.name_field]
        column_in_file = source.get("column_src", None) in imprt.columns
        has_constant = source.get("constant_value", None) is not None
        if not (column_in_file or has_constant):
            # the file does not contain the column expected by the mapping
            # and no constant value is provided
            continue

        # TODO: check that there are not too many distinct values?
        transient_column = transient_table.c[field.source_column]
        values_stmt = (
            select(transient_column)
            .where(transient_table.c.id_import == imprt.id_import)
            .distinct(transient_column)
        )
        values = [row[0] for row in db.session.execute(values_stmt).fetchall()]

        # Attach the nomenclatures of this type, ordered by code, without
        # flagging the relationship as modified.
        ordered_nomenclatures = TNomenclatures.query.filter_by(
            nomenclature_type=field.nomenclature_type
        ).order_by(collate(TNomenclatures.cd_nomenclature, "fr_numeric"))
        set_committed_value(field.nomenclature_type, "nomenclatures", ordered_nomenclatures)

        response[field.name_field] = {
            "nomenclature_type": field.nomenclature_type.as_dict(),
            "nomenclatures": [n.as_dict() for n in field.nomenclature_type.nomenclatures],
            "values": values,
        }
    return jsonify(response)
@blueprint.route("/<destination>/imports/<int:import_id>/contentmapping", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def set_import_content_mapping(scope, imprt):
    """
    Replace the content mapping of an import with the JSON body.

    The import is cleaned from the PREPARE step before the change is committed.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises BadRequest: if ``ContentMapping.validate_values`` rejects the body.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    new_mapping = request.json
    try:
        ContentMapping.validate_values(new_mapping)
    except ValueError as e:
        raise BadRequest(*e.args)

    imprt.contentmapping = new_mapping
    clean_import(imprt, ImportStep.PREPARE)
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/prepare", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def prepare_import(scope, imprt):
    """
    Prepare data to be imported: apply all checks and transformations.

    The checks run in the ``do_import_checks`` task; its id is stored on the
    import and committed before the task is sent.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises Conflict: if the data have not been loaded yet.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    # Precondition: the transient table must already hold the data.
    if not imprt.loaded:
        raise Conflict("Field data must have been loaded before executing this action.")

    # Drop the results of any previous preparation.
    clean_import(imprt, ImportStep.PREPARE)

    # Freeze the signature to know the task id before the task is sent.
    signature = do_import_checks.s(imprt.id_import)
    imprt.task_id = signature.freeze().task_id
    db.session.commit()
    signature.delay()

    return jsonify(imprt.as_dict())
def _count_import_rows(transient_table, imprt, data_columns, validity_column, is_valid, distinct_on):
    """Count the transient rows of ``imprt`` whose ``validity_column`` equals ``is_valid``.

    When ``distinct_on`` is a column name, distinct values of that column are
    counted; otherwise every matching row is counted.
    """
    rows_cte = (
        select(*data_columns)
        .where(
            transient_table.c.id_import == imprt.id_import,
            transient_table.c[validity_column] == is_valid,
        )
        .cte()
    )
    counted = func.distinct(rows_cte.c[distinct_on]) if distinct_on else "*"
    return db.session.scalar(select(func.count(counted)).select_from(rows_cte))


@blueprint.route("/<destination>/imports/<int:import_id>/preview_valid_data", methods=["GET"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def preview_valid_data(scope, imprt):
    """
    Preview valid data for a given import.

    Parameters
    ----------
    scope : int
        The scope of the (C, "IMPORT", "IMPORT") permission for the current user.
    imprt : geonature.core.imports.models.TImports
        The import object.

    Returns
    -------
    flask.wrappers.Response
        A JSON response containing the valid bounding box and, per entity,
        its columns, up to 100 valid rows and the valid/invalid row counts.

    Raises
    ------
    Forbidden
        If the current user has no sufficient permission given the scope and the import object.
    Conflict
        If the import is not processed, i.e. it has not been prepared yet.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.processed:
        raise Conflict("Import must have been prepared before executing this action.")

    data = {
        "valid_bbox": imprt.destination.actions.compute_bounding_box(imprt),
        "entities": [],
    }

    # Retrieve data for each entity from entries in the transient table which are related to the import
    transient_table = imprt.destination.get_transient_table()
    entities: list[Entity] = db.session.scalars(
        select(Entity).filter_by(destination=imprt.destination).order_by(Entity.order)
    ).all()
    several_entities = len(entities) > 1

    for entity in entities:
        fields = (
            db.session.scalars(
                select(BibFields).where(
                    BibFields.entities.any(EntityField.entity == entity),
                    BibFields.dest_field != None,  # noqa: E711
                    BibFields.name_field.in_(imprt.fieldmapping.keys()),
                )
            )
            .unique()
            .all()
        )
        columns = [{"prop": field.dest_column, "name": field.fr_label} for field in fields]
        data_columns = [transient_table.c[field.dest_field] for field in fields]

        # If there are several entities and this one has a unique column that
        # is part of the selected columns, counts are based on that column.
        # The membership test is done on column names: comparing the name
        # with the field objects themselves never matched.
        selected_names = {field.dest_field for field in fields}
        unique_column = entity.unique_column
        distinct_on = None
        if (
            several_entities
            and unique_column is not None
            and unique_column.dest_field in selected_names
        ):
            distinct_on = unique_column.dest_field

        valid_rows_query = select(*data_columns).where(
            transient_table.c.id_import == imprt.id_import,
            transient_table.c[entity.validity_column] == True,  # noqa: E712
        )
        valid_data = db.session.execute(valid_rows_query.limit(100)).all()

        n_valid_data = _count_import_rows(
            transient_table, imprt, data_columns, entity.validity_column, True, distinct_on
        )
        n_invalid_data = _count_import_rows(
            transient_table, imprt, data_columns, entity.validity_column, False, distinct_on
        )

        data["entities"].append(
            {
                "entity": entity.as_dict(),
                "columns": columns,
                "valid_data": valid_data,
                "n_valid_data": n_valid_data,
                "n_invalid_data": n_invalid_data,  # NOTE: Not used in the frontend ...
            }
        )
    return jsonify(data)
@blueprint.route("/<destination>/imports/<int:import_id>/errors", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_errors(scope, imprt):
    """
    .. :quickref: Import; Get errors of an import.

    Return the errors of an import, each serialized with its ``type`` and
    ``entity`` fields.

    :raises Forbidden: if the user has no "R" permission on this import.
    """
    if not imprt.has_instance_permission(scope, action_code="R"):
        raise Forbidden

    serialized_errors = []
    for error in imprt.errors:
        serialized_errors.append(error.as_dict(fields=["type", "entity"]))
    return jsonify(serialized_errors)
@blueprint.route("/<destination>/imports/<int:import_id>/source_file", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_source_file(scope, imprt):
    """
    Send the stored source file of an import as an attachment.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises Gone: if the import has no source file.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    content = imprt.source_file
    if content is None:
        raise Gone

    csv_mimetype = f"text/csv; charset={imprt.encoding}; header=present"
    return send_file(
        BytesIO(content),
        download_name=imprt.full_file_name,
        as_attachment=True,
        mimetype=csv_mimetype,
    )
@blueprint.route("/<destination>/imports/<int:import_id>/invalid_rows", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_invalid_rows_as_csv(scope, imprt):
    """
    .. :quickref: Import; Get invalid rows of an import as CSV.

    Export invalid data in CSV.

    The response streams the header row of the source file followed by every
    row whose 1-based line number is listed in ``imprt.erroneous_rows``,
    using the import's own separator and encoding. The download name is the
    source file name with its extension replaced by ``_errors.csv``.

    :raises Forbidden: if the user has no "C" permission on this import.
    :raises Conflict: if the import has not been prepared yet.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.processed:
        raise Conflict("Import must have been prepared before executing this action.")

    filename = imprt.full_file_name.rsplit(".", 1)[0]  # remove extension
    filename = f"{filename}_errors.csv"

    @stream_with_context
    def generate_invalid_rows_csv():
        encoding = imprt.encoding
        sourcefile = TextIOWrapper(BytesIO(imprt.source_file), encoding=encoding)
        destfile = StringIO()
        csvreader = csv.reader(sourcefile, delimiter=imprt.separator)
        csvwriter = csv.writer(destfile, dialect=csvreader.dialect, lineterminator="\n")
        # Looked up once, as a set: the membership test runs for every row.
        erroneous_rows = set(imprt.erroneous_rows or ())
        for line_no, row in enumerate(csvreader, start=1):
            # line_no == 1 → csv header
            if line_no == 1 or line_no in erroneous_rows:
                csvwriter.writerow(row)
                yield destfile.getvalue().encode(encoding)
                # Reset the buffer so the next chunk holds a single row.
                destfile.seek(0)
                destfile.truncate()

    response = current_app.response_class(
        generate_invalid_rows_csv(),
        mimetype=f"text/csv; charset={imprt.encoding}; header=present",
    )

    # Non-ASCII file names get an ASCII fallback plus an encoded ``filename*``.
    try:
        filename.encode("ascii")
    except UnicodeEncodeError:
        simple = unicodedata.normalize("NFKD", filename)
        simple = simple.encode("ascii", "ignore").decode("ascii")
        quoted = url_quote(filename, safe="")
        names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
    else:
        names = {"filename": filename}
    response.headers.set("Content-Disposition", "attachment", **names)
    return response
@blueprint.route("/<destination>/imports/<int:import_id>/import", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def import_valid_data(scope, imprt):
    """
    .. :quickref: Import; Import the valid data.

    Import valid data in destination table.

    The import runs in the ``do_import_in_destination`` task; its id is
    stored on the import and committed before the task is sent.

    :raises Forbidden: if the user has no "C" permission on this import, or
        if the import has not been processed.
    :raises BadRequest: if no transient row of this import is flagged valid
        by any of the destination's validity columns.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.processed:
        raise Forbidden("L’import n’a pas été préalablement vérifié.")

    transient_table = imprt.destination.get_transient_table()
    any_validity_flag = or_(
        *[transient_table.c[v] == True for v in imprt.destination.validity_columns]  # noqa: E712
    )
    valid_row_exists = select(
        exists().where(transient_table.c.id_import == imprt.id_import).where(any_validity_flag)
    )
    if not db.session.execute(valid_row_exists).scalar():
        raise BadRequest("Not valid data to import")

    clean_import(imprt, ImportStep.IMPORT)

    # Freeze the signature to know the task id before the task is sent.
    signature = do_import_in_destination.s(imprt.id_import)
    imprt.task_id = signature.freeze().task_id
    db.session.commit()
    signature.delay()

    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/", methods=["DELETE"])
@permissions.check_cruved_scope("D", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def delete_import(scope, imprt):
    """
    .. :quickref: Import; Delete an import.

    Delete an import together with its user errors, its rows in the transient
    table and the data removed by the destination's
    ``remove_data_from_destination`` action.

    :raises Forbidden: if the user has no "C" permission on this import.
    """
    # NOTE(review): the route requires "D" but the instance check uses "C" -
    # confirm this is intended.
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    ImportUserError.query.filter_by(imprt=imprt).delete()

    transient_table = imprt.destination.get_transient_table()
    purge_transient_rows = delete(transient_table).where(
        transient_table.c.id_import == imprt.id_import
    )
    db.session.execute(purge_transient_rows)

    imprt.destination.actions.remove_data_from_destination(imprt)
    db.session.delete(imprt)
    db.session.commit()
    return jsonify()
@blueprint.route("/<destination>/export_pdf/<int:import_id>", methods=["POST"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def export_pdf(scope, imprt):
    """
    Downloads the report in pdf format

    The template context is the serialized import, completed with the ``map``
    and ``chart`` form values, the report URL and the statistics keyed by the
    ``value`` of the destination's statistics labels.

    :raises Forbidden: if the user has no "R" permission on this import.
    """
    if not imprt.has_instance_permission(scope, action_code="R"):
        raise Forbidden

    ctx = imprt.as_dict()

    # The literal string "undefined" is treated as "no map".
    map_value = request.form.get("map")
    ctx["map"] = None if map_value == "undefined" else map_value
    ctx["chart"] = request.form.get("chart")

    module_path = current_app.config["IMPORT"].get("MODULE_URL", "").replace("/", "")
    ctx["url"] = "/".join(
        [
            current_app.config["URL_APPLICATION"],
            "#",
            module_path,
            str(ctx["id_import"]),
            "report",
        ]
    )

    formatted_statistics = {}
    ctx["statistics_formated"] = formatted_statistics
    for label_dict in ctx["destination"]["statistics_labels"]:
        display_name = label_dict["value"]
        statistic_key = label_dict["key"]
        if statistic_key in ctx["statistics"]:
            formatted_statistics[display_name] = ctx["statistics"][statistic_key]

    pdf_file = generate_pdf_from_template("import_template_pdf.html", ctx)
    return send_file(
        BytesIO(pdf_file),
        mimetype="application/pdf",
        as_attachment=True,
        download_name="rapport.pdf",
    )
def get_foreign_key_attr(obj, field: str):
    """
    Resolve a dotted attribute path starting from a mapped class.

    Every element but the last must name a relationship of the current
    class; the walk continues on the class targeted by that relationship.
    The last element is read with ``getattr`` and an empty string is
    returned when it does not exist.

    :raises KeyError: if an intermediate element is not a relationship.
    """
    current = obj
    remaining = field
    while True:
        relationships = dict(inspect(current).relationships.items())
        head, separator, remaining = remaining.partition(".")
        if not separator:
            # Last element of the path: plain attribute lookup.
            return getattr(current, head, "")
        current = relationships[head].mapper.class_
@blueprint.route("/<destination>/report_plot/<int:import_id>", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def report_plot(scope, imprt: TImports):
    """
    Return the destination's report plot for an import as a JSON string.

    :raises Forbidden: if the user has no "R" permission on this import.
    """
    if not imprt.has_instance_permission(scope, action_code="R"):
        raise Forbidden

    plot = imprt.destination.actions.report_plot(imprt)
    return json.dumps(plot)
@blueprint.route("/<destination>/generate_user_matching/<int:import_id>", methods=["GET"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def generate_matching(scope, imprt: TImports):
    """
    Compute and store the observer mapping of an import.

    When the destination enables observer mapping, the result of
    ``user_matching`` is merged into the import's observer mapping for every
    mapped field of type ``observers`` that has a source column; with no
    such field at all the mapping is reset to an empty dict. When observer
    mapping is disabled, the mapping is reset to ``None``.

    :returns: the import's observer mapping as JSON.
    :raises Forbidden: if the user has no "C" permission on this import.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden

    if imprt.destination.actions.is_observer_mapping_enabled():
        fields = db.session.scalars(
            select(BibFields).where(
                BibFields.name_field.in_(imprt.fieldmapping.keys()),
                BibFields.type_field == "observers",
                BibFields.id_destination == imprt.destination.id_destination,
            )
        ).all()
        if not fields:
            imprt.observermapping = {}
        else:
            # Work on a copy: the stored mapping may be None, and reassigning
            # registers the change even if the attribute does not track
            # in-place mutations.
            matching = dict(imprt.observermapping or {})
            updated = False
            for field in fields:
                mapping = imprt.fieldmapping[field.name_field]
                if mapping.get("column_src", None) is not None:
                    matching.update(user_matching(imprt, field))
                    updated = True
            if updated:
                imprt.observermapping = matching
        db.session.commit()
    elif imprt.observermapping is not None:
        imprt.observermapping = None
        db.session.commit()

    return jsonify(imprt.observermapping)
@blueprint.route("/<destination>/is_observer_mapping_enabled", methods=["GET"])
@login_required
def is_observer_mapping_enabled(destination):
    """
    Tell whether the destination's actions enable observer mapping.

    :returns: JSON object with a single ``allowed`` key.
    """
    allowed = destination.actions.is_observer_mapping_enabled()
    return jsonify({"allowed": allowed})