import codecs
import csv
import json
import unicodedata
from io import BytesIO, StringIO, TextIOWrapper
# url_quote was deprecated in werkzeug 3.0 https://stackoverflow.com/a/77222063/5807438
from urllib.parse import quote as url_quote
from flask import current_app, g, jsonify, request, send_file, stream_with_context
from flask_login import current_user, login_required
from geonature.core.gn_commons.models import TModules
from geonature.core.gn_permissions import decorators as permissions
from geonature.core.imports.blueprint import blueprint
from geonature.core.imports.checks.sql.user import user_matching
from geonature.core.imports.models import (
BibFields,
ContentMapping,
Destination,
Entity,
EntityField,
FieldMapping,
ImportUserError,
TImports,
)
from geonature.core.imports.schemas import ImportSchema
from geonature.core.imports.tasks import do_import_checks, do_import_in_destination
from geonature.core.imports.utils import (
ImportStep,
clean_import,
detect_encoding,
detect_separator,
generate_pdf_from_template,
get_file_size,
insert_import_data_in_transient_table,
)
from geonature.utils.config import config
from geonature.utils.env import db
from geonature.utils.sentry import start_sentry_child
from marshmallow import EXCLUDE
from pypnnomenclature.models import TNomenclatures
from pypnusershub.db.models import User
from sqlalchemy import delete, desc, func, or_, select
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import Load, contains_eager, joinedload, load_only, undefer
from sqlalchemy.orm.attributes import set_committed_value
from sqlalchemy.sql.expression import collate, exists
from werkzeug.exceptions import BadRequest, Conflict, Forbidden, Gone, NotFound
[docs]
# Default number of imports returned per page by the paginated list endpoint.
IMPORTS_PER_PAGE = 15
@blueprint.url_value_preprocessor
def resolve_import(endpoint, values):
    """Replace the ``import_id`` URL value with the matching TImports instance.

    Runs before every request on this blueprint: when the endpoint expects an
    ``import_id``, the import is loaded (404 if unknown), checked against the
    ``destination`` URL value, and exposed to the view as ``imprt``.
    """
    if not current_app.url_map.is_endpoint_expecting(endpoint, "import_id"):
        return
    import_id = values.pop("import_id")
    imprt = None
    if import_id is not None:
        imprt = TImports.query.options(
            joinedload(TImports.destination).joinedload(Destination.module)
        ).get_or_404(import_id)
        # The import must belong to the destination given in the URL.
        if imprt.destination != values.pop("destination"):
            raise NotFound
    values["imprt"] = imprt
@blueprint.route("/imports/", methods=["GET"])
@blueprint.route("/<destination>/imports/", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_list(scope, destination=None):
    """
    .. :quickref: Import; Get all imports.

    Get all imports to which logged-in user has access.

    Query string:
      - page / limit: pagination parameters
      - search: case-insensitive match on the file name or author name
      - sort / sort_dir: ordering (``sort`` may be a dotted relationship path)
    """
    page = request.args.get("page", default=1, type=int)
    limit = request.args.get("limit", default=IMPORTS_PER_PAGE, type=int)
    search = request.args.get("search", default=None, type=str)
    sort = request.args.get("sort", default="date_create_import", type=str)
    sort_dir = request.args.get("sort_dir", default="desc", type=str)
    filters = []
    if search:
        filters.append(TImports.full_file_name.ilike(f"%{search}%"))
        # ilike is already a case-insensitive "contains", so the single OR on
        # both name parts subsumes the former extra lower()/contains() filter
        # on nom_role (which generated redundant SQL).
        filters.append(
            TImports.authors.any(
                or_(
                    User.prenom_role.ilike(f"%{search}%"),
                    User.nom_role.ilike(f"%{search}%"),
                ),
            )
        )
    try:
        order_by = get_foreign_key_attr(TImports, sort)
        # Hybrid properties may be exposed as callables.
        order_by = order_by() if callable(order_by) else order_by
    except AttributeError:
        raise BadRequest(f"Import field '{sort}' does not exist.")
    if sort_dir == "desc":
        order_by = desc(order_by)
    query = (
        select(TImports)
        .options(
            contains_eager(TImports.authors),
            contains_eager(TImports.destination).contains_eager(Destination.module),
        )
        .join(TImports.authors, isouter=True)
        .join(Destination)
        .join(TModules)
        .where(TImports.filter_by_scope(scope=scope))
        .where(or_(*filters) if filters else True)
        .order_by(order_by)
    )
    if destination:
        query = query.where(TImports.destination == destination)
    imports = db.paginate(query, page=page, error_out=False, max_per_page=limit)
    data = {
        "imports": [imprt.as_dict() for imprt in imports.items],
        "count": imports.total,
        "limit": limit,
        "offset": page - 1,
    }
    return jsonify(data)
@blueprint.route("/<destination>/imports/<int:import_id>/", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_one_import(scope, imprt):
    """
    .. :quickref: Import; Get an import.

    Get an import.
    """
    # The user needs read permission on this specific import instance.
    if imprt.has_instance_permission(scope, action_code="R"):
        return jsonify(imprt.as_dict())
    raise Forbidden
@blueprint.route("/<destination>/imports/upload", defaults={"import_id": None}, methods=["POST"])
@blueprint.route("/<destination>/imports/<int:import_id>/upload", methods=["PUT"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def upload_file(scope, imprt, destination=None):  # destination is set when imprt is None
    """
    .. :quickref: Import; Add an import or update an existing import.

    Add an import or update an existing import.

    :form file: file to import (mandatory on creation, optional on update)
    :form fieldmapping: optional JSON field-mapping preset merged into the import
    """
    if imprt:
        if not imprt.has_instance_permission(scope, action_code="C"):
            raise Forbidden
        destination = imprt.destination
    else:
        assert destination
    input_file = request.files.get("file", None)
    input_fieldmapping_preset = request.form.get("fieldmapping", None)
    # file must be set only when new import. otherwise, it's optional. it can also be a request to update the fieldmapping preset
    if imprt is None and input_file is None:
        # fixed typo in error message: "missig" → "missing"
        raise BadRequest("File parameter is missing")
    # Process destination
    if imprt is None:
        imprt = TImports(destination=destination)
        imprt.authors.append(g.current_user)
        db.session.add(imprt)
    else:
        # If imprt exists, remove observer mapping
        imprt.observermapping = {}
    # Process field mapping preset
    if input_fieldmapping_preset:
        fieldmapping_preset = json.loads(input_fieldmapping_preset)
        # NOTE: FieldMapping.validate_values cannot be used here (the preset
        # may be partial and would be rejected by the full-schema validation).
        if imprt.fieldmapping is None:
            imprt.fieldmapping = {}
        imprt.fieldmapping.update(fieldmapping_preset)
    # Process file
    if input_file:
        size = get_file_size(input_file)
        # value in config file is in Mo
        max_file_size = current_app.config["IMPORT"]["MAX_FILE_SIZE"] * 1024 * 1024
        if size > max_file_size:
            raise BadRequest(
                description=f"File too big ({size} > {max_file_size})."
            )  # FIXME better error signaling?
        if size == 0:
            raise BadRequest(description="Impossible to upload empty files")
        # Replacing the file invalidates everything from the UPLOAD step onwards.
        clean_import(imprt, ImportStep.UPLOAD)
        with start_sentry_child(op="task", description="detect encoding"):
            imprt.detected_encoding = detect_encoding(input_file)
        with start_sentry_child(op="task", description="detect separator"):
            imprt.detected_separator = detect_separator(
                input_file,
                encoding=imprt.encoding or imprt.detected_encoding,
            )
        imprt.source_file = input_file.read()
        imprt.full_file_name = input_file.filename
    # commit changes
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/update", methods=["PUT"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def update_import(scope, imprt, destination=None):  # destination is set when imprt is None
    """
    .. :quickref: Import; Update an existing import.

    Update an existing import from the JSON body, validated through ImportSchema.
    """
    if imprt is None:
        # fixed: the message previously contained an unfilled "{}" placeholder
        raise BadRequest("Import does not exist")
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    destination = imprt.destination
    data = request.get_json()
    if not data:
        raise BadRequest(description="No data provided")
    try:
        # NOTE(review): the deserialized result is discarded — presumably
        # ImportSchema is configured to update the session-bound instance;
        # confirm against the schema definition.
        ImportSchema().load(data, unknown=EXCLUDE)
    except Exception as e:
        raise BadRequest(description=f"{e}")
    # commit changes
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/decode", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def decode_file(scope, imprt):
    """Validate the decoding parameters of an import and read its columns.

    Expects a JSON body with ``encoding``, ``format``, ``srid`` and
    ``separator``. Unless ``?decode=0`` is passed, the whole source file is
    then read to surface encoding errors and the header columns are stored.

    Raises
    ------
    Forbidden
        If the user cannot modify this import.
    BadRequest
        On missing/invalid parameters, encoding errors or duplicated columns.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if imprt.source_file is None:
        raise BadRequest(description="A file must be first uploaded.")
    if "encoding" not in request.json:
        raise BadRequest(description="Missing encoding.")
    encoding = request.json["encoding"]
    try:
        codecs.lookup(encoding)
    except LookupError:
        raise BadRequest(description="Unknown encoding.")
    imprt.encoding = encoding
    if "format" not in request.json:
        raise BadRequest(description="Missing format.")
    if request.json["format"] not in TImports.AVAILABLE_FORMATS:
        raise BadRequest(description="Unknown format.")
    imprt.format_source_file = request.json["format"]
    if "srid" not in request.json:
        raise BadRequest(description="Missing srid.")
    try:
        imprt.srid = int(request.json["srid"])
    except ValueError:
        raise BadRequest(description="SRID must be an integer.")
    if "separator" not in request.json:
        raise BadRequest(description="Missing separator")
    if request.json["separator"] not in TImports.AVAILABLE_SEPARATORS:
        raise BadRequest(description="Unknown separator")
    imprt.separator = request.json["separator"]
    clean_import(imprt, ImportStep.DECODE)
    db.session.commit()  # commit parameters
    decode = request.args.get("decode", 1)
    try:
        decode = int(decode)
    except ValueError:
        # fixed typo in error message: "must but an int" → "must be an int"
        raise BadRequest(description="decode parameter must be an int")
    if decode:
        csvfile = TextIOWrapper(BytesIO(imprt.source_file), encoding=imprt.encoding)
        csvreader = csv.reader(csvfile, delimiter=imprt.separator)
        columns = []  # fixed: avoid NameError when the reader yields no rows
        try:
            columns = next(csvreader)
            while True:  # read full file to ensure that no encoding errors occur
                next(csvreader)
        except UnicodeError:
            raise BadRequest(
                description="Erreur d’encodage lors de la lecture du fichier source. "
                "Avez-vous sélectionné le bon encodage de votre fichier ?"
            )
        except StopIteration:
            pass
        duplicates = {col for col in columns if columns.count(col) > 1}
        if duplicates:
            raise BadRequest(f"Duplicates column names: {duplicates}")
        imprt.columns = columns
        db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/fieldmapping", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def set_import_field_mapping(scope, imprt):
    """Store the field mapping (file columns → destination fields) of an import.

    The posted JSON is validated against the FieldMapping schema; previously
    loaded data is invalidated (LOAD step and onwards).
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    mapping = request.json
    try:
        FieldMapping.validate_values(mapping)
    except ValueError as err:
        raise BadRequest(*err.args)
    imprt.fieldmapping = mapping
    clean_import(imprt, ImportStep.LOAD)
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/load", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def load_import(scope, imprt):
    """Copy the decoded source file into the transient table.

    Requires an uploaded file and a field mapping; resets everything from the
    LOAD step onwards, then records the number of source rows.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if imprt.source_file is None:
        raise BadRequest(description="A file must be first uploaded.")
    if imprt.fieldmapping is None:
        raise BadRequest(description="File fields must be first mapped.")
    clean_import(imprt, ImportStep.LOAD)
    with start_sentry_child(op="task", description="insert data in db"):
        row_count = insert_import_data_in_transient_table(imprt)
    if not row_count:
        raise BadRequest("File with 0 lines.")
    imprt.source_count = row_count
    imprt.loaded = True
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/columns", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_columns_name(scope, imprt):
    """
    .. :quickref: Import;

    Return all the columns of the file of an import
    """
    # NOTE(review): the route requires "R" scope but the instance check uses
    # action_code="C" — looks intentional across this module, but confirm.
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    columns = imprt.columns
    if not columns:
        raise Conflict(description="Data have not been decoded.")
    return jsonify(columns)
@blueprint.route("/<destination>/imports/<int:import_id>/values", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
[docs]
def get_import_values(scope, imprt):
    """
    .. :quickref: Import;

    Return all values present in imported file for nomenclated fields
    """
    # check that the user has read permission to this particular import instance:
    # NOTE(review): route scope is "R" but the instance check uses "C" — confirm intended.
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.loaded:
        raise Conflict(description="Data have not been loaded")
    # All fields of this destination that are backed by a nomenclature type.
    nomenclated_fields = db.session.scalars(
        select(BibFields)
        .where(BibFields.mnemonique != None, BibFields.destination == imprt.destination)
        .options(joinedload(BibFields.nomenclature_type))
        .order_by(BibFields.id_field)
    ).all()
    # Note: response format is validated with jsonschema in tests
    transient_table = imprt.destination.get_transient_table()
    response = {}
    for field in nomenclated_fields:
        if field.name_field not in imprt.fieldmapping:
            # this nomenclated field is not mapped
            continue
        source = imprt.fieldmapping[field.name_field]
        if (
            source.get("column_src", None) not in imprt.columns
            and source.get("constant_value", None) is None
        ):
            # the file do not contain this field expected by the mapping and there is no constant value
            continue
        # TODO: check that there are not too many distinct values?
        column = field.source_column
        # Distinct raw values found in the transient table for this field.
        values = [
            value
            for value, in db.session.execute(
                select(transient_table.c[column])
                .where(transient_table.c.id_import == imprt.id_import)
                .distinct(transient_table.c[column])
            ).fetchall()
        ]
        # Pre-populate the relationship with an ordered query so the iteration
        # below follows the "fr_numeric" collation.
        # NOTE(review): a Query (not a list) is set as the committed value; it
        # is only iterated once below — confirm this is intentional.
        set_committed_value(
            field.nomenclature_type,
            "nomenclatures",
            TNomenclatures.query.filter_by(nomenclature_type=field.nomenclature_type).order_by(
                collate(TNomenclatures.cd_nomenclature, "fr_numeric")
            ),
        )
        response[field.name_field] = {
            "nomenclature_type": field.nomenclature_type.as_dict(),
            "nomenclatures": [n.as_dict() for n in field.nomenclature_type.nomenclatures],
            "values": values,
        }
    return jsonify(response)
@blueprint.route("/<destination>/imports/<int:import_id>/contentmapping", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def set_import_content_mapping(scope, imprt):
    """Store the content mapping (source values → nomenclatures) of an import.

    The posted JSON is validated against the ContentMapping schema; previously
    prepared data is invalidated (PREPARE step and onwards).
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    mapping = request.json
    try:
        ContentMapping.validate_values(mapping)
    except ValueError as err:
        raise BadRequest(*err.args)
    imprt.contentmapping = mapping
    clean_import(imprt, ImportStep.PREPARE)
    db.session.commit()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/prepare", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def prepare_import(scope, imprt):
    """
    Prepare data to be imported: apply all checks and transformations.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.loaded:
        # Data must already be present in the transient table.
        raise Conflict("Field data must have been loaded before executing this action.")
    # Remove previous errors, then run the background checks.
    clean_import(imprt, ImportStep.PREPARE)
    # Freeze the celery signature first so the task id can be persisted
    # before the task is actually queued.
    signature = do_import_checks.s(imprt.id_import)
    frozen = signature.freeze()
    imprt.task_id = frozen.task_id
    db.session.commit()
    signature.delay()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/preview_valid_data", methods=["GET"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
[docs]
def preview_valid_data(scope, imprt):
    """
    Preview valid data for a given import.

    Parameters
    ----------
    scope : int
        The scope of the (C, "IMPORT", "IMPORT") permission for the current user.
    imprt : geonature.core.imports.models.TImports
        The import object.

    Returns
    -------
    flask.wrappers.Response
        A JSON response containing valid data, entities, columns, and data statistics.

    Raises
    ------
    Forbidden
        If the current user has no sufficient permission given the scope and the import object.
    Conflict
        If the import is not processed, i.e. it has not been prepared yet.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.processed:
        raise Conflict("Import must have been prepared before executing this action.")
    data = {
        "valid_bbox": imprt.destination.actions.compute_bounding_box(imprt),
        "entities": [],
    }
    # Retrieve data for each entity from entries in the transient table which are related to the import
    transient_table = imprt.destination.get_transient_table()
    entities: list[Entity] = db.session.scalars(
        select(Entity).filter_by(destination=imprt.destination).order_by(Entity.order)
    ).all()
    for entity in entities:
        # Fields of this entity that are mapped and have a destination column.
        fields = (
            db.session.scalars(
                select(BibFields).where(
                    BibFields.entities.any(EntityField.entity == entity),
                    BibFields.dest_field != None,
                    BibFields.name_field.in_(imprt.fieldmapping.keys()),
                )
            )
            .unique()
            .all()
        )
        columns = [{"prop": field.dest_column, "name": field.fr_label} for field in fields]
        # NOTE(review): `dest_field` looks like a string while `fields` holds
        # BibFields instances — this membership test may always be False; confirm.
        id_field = (
            entity.unique_column.dest_field if entity.unique_column.dest_field in fields else None
        )
        data_fields_query = [transient_table.c[field.dest_field] for field in fields]
        # Rows of this import flagged valid for this entity (preview capped at 100).
        query = select(*data_fields_query).where(
            transient_table.c.id_import == imprt.id_import,
            transient_table.c[entity.validity_column] == True,
        )
        valid_data = db.session.execute(query.limit(100)).all()

        def count_select(query_cte):
            # Choose what to count on the given CTE (closes over entity/id_field).
            count_ = "*"
            # if multiple entities and the entity has a unique column we base the count on the unique column
            if entity.unique_column and len(entities) > 1 and id_field:
                count_ = func.distinct(query_cte.c[id_field])
            return count_

        valid_data_cte = query.cte()
        n_valid_data = db.session.scalar(
            select(func.count(count_select(valid_data_cte))).select_from(valid_data_cte)
        )
        # NOTE(review): list form `select(data_fields_query)` is legacy style —
        # the valid-data query above uses `select(*...)`; confirm SQLAlchemy version.
        invalid_data_cte = (
            select(data_fields_query)
            .where(
                transient_table.c.id_import == imprt.id_import,
                transient_table.c[entity.validity_column] == False,
            )
            .cte()
        )
        n_invalid_data = db.session.scalar(
            select(func.count(count_select(invalid_data_cte))).select_from(invalid_data_cte)
        )
        data["entities"].append(
            {
                "entity": entity.as_dict(),
                "columns": columns,
                "valid_data": valid_data,
                "n_valid_data": n_valid_data,
                "n_invalid_data": n_invalid_data,  # NOTE: Not used in the frontend ...
            }
        )
    return jsonify(data)
@blueprint.route("/<destination>/imports/<int:import_id>/errors", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_errors(scope, imprt):
    """
    .. :quickref: Import; Get errors of an import.

    Get errors of an import.
    """
    if not imprt.has_instance_permission(scope, action_code="R"):
        raise Forbidden
    errors = [err.as_dict(fields=["type", "entity"]) for err in imprt.errors]
    return jsonify(errors)
@blueprint.route("/<destination>/imports/<int:import_id>/source_file", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_source_file(scope, imprt):
    """Send the originally uploaded file of an import back as an attachment."""
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    source = imprt.source_file
    if source is None:
        # The file has been removed (e.g. after a successful import).
        raise Gone
    return send_file(
        BytesIO(source),
        download_name=imprt.full_file_name,
        as_attachment=True,
        mimetype=f"text/csv; charset={imprt.encoding}; header=present",
    )
@blueprint.route("/<destination>/imports/<int:import_id>/invalid_rows", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def get_import_invalid_rows_as_csv(scope, imprt):
    """
    .. :quickref: Import; Get invalid rows of an import as CSV.

    Export invalid data in CSV.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.processed:
        raise Conflict("Import must have been prepared before executing this action.")
    basename = imprt.full_file_name.rsplit(".", 1)[0]  # remove extension
    # fixed: the computed basename was immediately overwritten (dead store),
    # so every download was named identically instead of after the source file
    filename = f"{basename}_errors.csv"

    @stream_with_context
    def generate_invalid_rows_csv():
        # Re-read the source file, forwarding only the header and erroneous rows,
        # one encoded chunk per kept row.
        sourcefile = TextIOWrapper(BytesIO(imprt.source_file), encoding=imprt.encoding)
        destfile = StringIO()
        csvreader = csv.reader(sourcefile, delimiter=imprt.separator)
        csvwriter = csv.writer(destfile, dialect=csvreader.dialect, lineterminator="\n")
        line_no = 1
        for row in csvreader:
            # line_no == 1 → csv header
            if line_no == 1 or line_no in imprt.erroneous_rows:
                csvwriter.writerow(row)
                destfile.seek(0)
                yield destfile.read().encode(imprt.encoding)
                destfile.seek(0)
                destfile.truncate()
            line_no += 1

    response = current_app.response_class(
        generate_invalid_rows_csv(),
        mimetype=f"text/csv; charset={imprt.encoding}; header=present",
    )
    try:
        # Plain ASCII names can go in Content-Disposition directly.
        filename.encode("ascii")
    except UnicodeEncodeError:
        # RFC 5987 fallback: ASCII-stripped "filename" plus UTF-8 "filename*".
        simple = unicodedata.normalize("NFKD", filename)
        simple = simple.encode("ascii", "ignore").decode("ascii")
        quoted = url_quote(filename, safe="")
        names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
    else:
        names = {"filename": filename}
    response.headers.set("Content-Disposition", "attachment", **names)
    return response
@blueprint.route("/<destination>/imports/<int:import_id>/import", methods=["POST"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def import_valid_data(scope, imprt):
    """
    .. :quickref: Import; Import the valid data.

    Import valid data in destination table.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if not imprt.processed:
        raise Forbidden("L’import n’a pas été préalablement vérifié.")
    transient = imprt.destination.get_transient_table()
    # At least one row of this import must be valid for some entity.
    validity_clauses = [
        transient.c[col] == True for col in imprt.destination.validity_columns
    ]
    has_valid_rows = db.session.execute(
        select(
            exists()
            .where(transient.c.id_import == imprt.id_import)
            .where(or_(*validity_clauses))
        )
    ).scalar()
    if not has_valid_rows:
        raise BadRequest("Not valid data to import")
    clean_import(imprt, ImportStep.IMPORT)
    # Persist the task id before queuing the background import.
    signature = do_import_in_destination.s(imprt.id_import)
    imprt.task_id = signature.freeze().task_id
    db.session.commit()
    signature.delay()
    return jsonify(imprt.as_dict())
@blueprint.route("/<destination>/imports/<int:import_id>/", methods=["DELETE"])
@permissions.check_cruved_scope("D", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def delete_import(scope, imprt):
    """
    .. :quickref: Import; Delete an import.

    Delete an import.
    """
    # NOTE(review): route scope is "D" but the instance check uses "C" — confirm intended.
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    # Drop the import's errors, its rows in the transient table, any data
    # already pushed to the destination, then the import record itself.
    ImportUserError.query.filter_by(imprt=imprt).delete()
    transient = imprt.destination.get_transient_table()
    db.session.execute(delete(transient).where(transient.c.id_import == imprt.id_import))
    imprt.destination.actions.remove_data_from_destination(imprt)
    db.session.delete(imprt)
    db.session.commit()
    return jsonify()
@blueprint.route("/<destination>/export_pdf/<int:import_id>", methods=["POST"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def export_pdf(scope, imprt):
    """
    Downloads the report in pdf format
    """
    if not imprt.has_instance_permission(scope, action_code="R"):
        raise Forbidden
    ctx = imprt.as_dict()
    # The frontend may send the literal string "undefined" when no map is drawn.
    map_value = request.form.get("map")
    ctx["map"] = None if map_value == "undefined" else map_value
    ctx["chart"] = request.form.get("chart")
    module_url = current_app.config["IMPORT"].get("MODULE_URL", "").replace("/", "")
    ctx["url"] = "/".join(
        [
            current_app.config["URL_APPLICATION"],
            "#",
            module_url,
            str(ctx["id_import"]),
            "report",
        ]
    )
    # Re-key the statistics using the destination's display labels.
    formatted = {}
    for label in ctx["destination"]["statistics_labels"]:
        if label["key"] in ctx["statistics"]:
            formatted[label["value"]] = ctx["statistics"][label["key"]]
    ctx["statistics_formated"] = formatted
    pdf_file = generate_pdf_from_template("import_template_pdf.html", ctx)
    return send_file(
        BytesIO(pdf_file),
        mimetype="application/pdf",
        as_attachment=True,
        download_name="rapport.pdf",
    )
def get_foreign_key_attr(obj, field: str):
    """
    Go through an object path to find the attribute/class to order on.

    ``field`` may be a dotted path (e.g. ``"destination.code"``): each
    intermediate component must be a relationship of the current model, and
    the final component is looked up with ``getattr``.

    Raises
    ------
    AttributeError
        If any path component does not exist. Previously a missing final
        attribute silently returned ``""`` (so callers catching
        AttributeError never saw an error) and a missing relationship raised
        an uncaught KeyError; both now raise AttributeError as callers expect.
    """
    first_field, _, remaining_fields = field.partition(".")
    if not remaining_fields:
        # Final component: no default, let a missing attribute raise.
        return getattr(obj, first_field)
    # Only inspect relationships when the path actually recurses.
    relationships = dict(inspect(obj).relationships.items())
    if first_field not in relationships:
        raise AttributeError(f"'{first_field}' is not a relationship of {obj!r}")
    return get_foreign_key_attr(relationships[first_field].mapper.class_, remaining_fields)
@blueprint.route("/<destination>/report_plot/<int:import_id>", methods=["GET"])
@permissions.check_cruved_scope("R", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def report_plot(scope, imprt: TImports):
    """Return the destination-specific report plot of an import as a JSON string."""
    if imprt.has_instance_permission(scope, action_code="R"):
        plot = imprt.destination.actions.report_plot(imprt)
        return json.dumps(plot)
    raise Forbidden
@blueprint.route("/<destination>/generate_user_matching/<int:import_id>", methods=["GET"])
@permissions.check_cruved_scope("C", get_scope=True, module_code="IMPORT", object_code="IMPORT")
def generate_matching(scope, imprt: TImports):
    """Build (or reset) the observer → user matching of an import.

    When observer mapping is enabled for the destination, each mapped
    "observers" field whose source is a file column is run through
    ``user_matching`` and the results are merged into ``imprt.observermapping``.
    Otherwise any existing mapping is reset to None.
    """
    if not imprt.has_instance_permission(scope, action_code="C"):
        raise Forbidden
    if imprt.destination.actions.is_observer_mapping_enabled():
        # Mapped fields of type "observers" for this destination.
        fields = db.session.scalars(
            select(BibFields).where(
                BibFields.name_field.in_(imprt.fieldmapping.keys()),
                BibFields.type_field == "observers",
                BibFields.id_destination == imprt.destination.id_destination,
            )
        ).all()
        if len(fields) == 0:
            imprt.observermapping = {}
        for field in fields:
            mapping = imprt.fieldmapping[field.name_field]
            # Only file columns can be matched against users; constant values are skipped.
            if mapping.get("column_src", None) is not None:
                imprt.observermapping.update(user_matching(imprt, field))
        db.session.commit()
    elif imprt.observermapping is not None:  # fixed: idiomatic None check (was `!= None`)
        imprt.observermapping = None
        db.session.commit()
    return jsonify(imprt.observermapping)
@blueprint.route("/<destination>/is_observer_mapping_enabled", methods=["GET"])
@login_required
def is_observer_mapping_enabled(destination):
    """Tell the client whether observer mapping is enabled for this destination."""
    enabled = destination.actions.is_observer_mapping_enabled()
    return jsonify({"allowed": enabled})