# Source code for geonature.core.imports.models

from datetime import datetime
from collections.abc import Mapping
import re
from typing import Any, Iterable, List, Optional
from packaging import version

from flask import g
import sqlalchemy as sa
from sqlalchemy import func, ForeignKey, Table
from sqlalchemy.orm import relationship, deferred, joinedload
from sqlalchemy.types import ARRAY
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import column_property
from jsonschema.exceptions import ValidationError as JSONValidationError
from jsonschema import validate as validate_json
from celery.result import AsyncResult
import flask_sqlalchemy

if version.parse(flask_sqlalchemy.__version__) >= version.parse("3"):
    from flask_sqlalchemy.query import Query
else:  # retro-compatibility Flask-SQLAlchemy 2
    from flask_sqlalchemy import BaseQuery as Query

from utils_flask_sqla.models import qfilter
from utils_flask_sqla.serializers import serializable

from geonature.utils.env import db
from geonature.utils.celery import celery_app
from geonature.core.gn_permissions.tools import get_scopes_by_action
from geonature.core.gn_commons.models import TModules
from geonature.core.gn_meta.models import TDatasets
from pypnnomenclature.models import BibNomenclaturesTypes
from pypnusershub.db.models import User


class ImportModule(TModules):
    """``TModules`` subclass mapped to the ``"import"`` polymorphic identity."""

    __mapper_args__ = {"polymorphic_identity": "import"}

    def generate_module_url_for_source(self, source):
        """Build the report URL of the import that ``source`` refers to.

        The import id is parsed out of ``source.name_source``, which is
        expected to look like ``Import(id=<digits>)``; the code of the
        destination attached to that import is then fetched from the database.

        Parameters
        ----------
        source
            Object exposing a ``name_source`` string attribute.

        Returns
        -------
        str
            A path of the form ``/import/<destination code>/<id>/report``.
        """
        source_pattern = r"^Import\(id=(?P<id>\d+)\)$"
        id_import = re.search(source_pattern, source.name_source).group("id")
        destination_code_query = (
            db.select(Destination.code)
            .where(Destination.id_destination == TImports.id_destination)
            .where(TImports.id_import == id_import)
        )
        destination = db.session.scalars(destination_code_query).one_or_none()
        return f"/import/{destination}/{id_import}/report"
"""
Errors
======

ImportErrorType = a type of error, with its description
ImportErrorType.category = the category this type of error belongs to
  e.g. the error type "invalid date" belongs to the category "format error"
  note: this is a free-text field, there is no ImportErrorCategory model
ImportError = an occurrence of a kind of error, associated with one or more rows of a specific import
"""


@serializable
class ImportUserErrorType(db.Model):
    """A type of import error, stored in ``gn_imports.bib_errors_types``."""

    __tablename__ = "bib_errors_types"
    __table_args__ = {"schema": "gn_imports"}

    # Several attributes are exposed under a Python name that differs from the
    # database column name given as first argument.
    pk = db.Column("id_error", db.Integer, primary_key=True)
    category = db.Column("error_type", db.Unicode, nullable=False)
    name = db.Column(db.Unicode, nullable=False, unique=True)
    description = db.Column(db.Unicode)
    level = db.Column("error_level", db.Unicode)

    def __str__(self):
        """Return a short tag identifying this error type by its name."""
        return f"<ImportErrorType {self.name}>"
@serializable
class ImportUserError(db.Model):
    """An error raised for an import, stored in ``gn_imports.t_user_errors``.

    Each record links an import, an error type and an entity, and carries the
    offending column name, the affected row ids and a free-text comment.
    """

    __tablename__ = "t_user_errors"
    __table_args__ = {"schema": "gn_imports"}

    pk = db.Column("id_user_error", db.Integer, primary_key=True)

    # Import this error belongs to.
    id_import = db.Column(
        db.Integer,
        db.ForeignKey(
            "gn_imports.t_imports.id_import",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
    )
    imprt = db.relationship("TImports", back_populates="errors")

    # Type of the error (the column is named ``id_error`` in the table).
    id_type = db.Column(
        "id_error",
        db.Integer,
        db.ForeignKey(ImportUserErrorType.pk, onupdate="CASCADE", ondelete="CASCADE"),
    )
    type = db.relationship("ImportUserErrorType")

    column = db.Column("column_error", db.Unicode)
    rows = db.Column("id_rows", db.ARRAY(db.Integer))
    comment = db.Column(db.UnicodeText)

    # Entity the error relates to.
    id_entity = db.Column(
        db.Integer,
        db.ForeignKey(
            "gn_imports.bib_entities.id_entity",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
    )
    entity = db.relationship("Entity")

    def __str__(self):
        """Return a short tag with the import id, the error type name and the rows."""
        return f"<ImportError import={self.id_import},type={self.type.name},rows={self.rows}>"
@serializable
class Destination(db.Model):
    """An import destination, stored in ``gn_imports.bib_destinations``.

    A destination is attached to a module (``module``) and owns a list of
    entities (``entities``).
    """

    __tablename__ = "bib_destinations"
    __table_args__ = {"schema": "gn_imports"}

    id_destination = db.Column(db.Integer, primary_key=True, autoincrement=True)
    id_module = db.Column(db.Integer, ForeignKey(TModules.id_module), nullable=True)
    code = db.Column(db.String(64), unique=True)
    label = db.Column(db.String(128))
    table_name = db.Column(db.String(64))

    module = relationship(TModules, backref="destination")
    entities = relationship("Entity", back_populates="destination")

    def get_transient_table(self):
        """Reflect and return the table named ``table_name`` in schema ``gn_imports``.

        Returns
        -------
        sqlalchemy.Table
            The table reflected through the current session connection.
        """
        # ``autoload_with`` is enough to trigger reflection; the former
        # ``autoload=True`` flag is deprecated in SQLAlchemy 1.4 and no longer
        # accepted by SQLAlchemy 2.0, so it is not passed any more.
        return Table(
            self.table_name,
            db.metadata,
            autoload_with=db.session.connection(),
            schema="gn_imports",
        )

    @property
    def validity_columns(self):
        """List the ``validity_column`` of every entity of this destination."""
        return [entity.validity_column for entity in self.entities]

    @property
    def statistics_labels(self):
        """Return ``statistics_labels()`` as provided by the destination actions."""
        return self.actions.statistics_labels()

    @property
    def actions(self):
        """Return the ``__import_actions__`` attribute of the attached module.

        Raises
        ------
        AttributeError
            If the module does not expose ``__import_actions__``.
        """
        try:
            return self.module.__import_actions__
        except AttributeError as exc:
            # This error is likely to occur when there are imports to a
            # destination whose module is missing from the venv. SQLAlchemy then
            # fails to find the proper polymorphic identity and falls back on
            # TModules, which has no __import_actions__ property.
            raise AttributeError(f"Is your module of type '{self.module.type}' installed?") from exc

    @staticmethod
    def allowed_destinations(
        user: Optional[User] = None, action_code: str = "C"
    ) -> List["Destination"]:
        """
        Return the destinations on which a user may perform an action.

        Parameters
        ----------
        user : User, optional
            The user to filter destinations for. If not provided, the current_user is used.
        action_code : str
            The action to filter destinations for. Possible values are 'C', 'R', 'U', 'V', 'E', 'D'.

        Returns
        -------
        allowed_destination : List of Destination
            List of allowed destinations for the given user.
        """
        # If no user is provided, use the current user
        if not user:
            user = g.current_user

        # Every destination is loaded, then filtered in Python.
        all_destination = db.session.scalars(sa.select(Destination)).all()
        return [dest for dest in all_destination if dest.has_instance_permission(user, action_code)]

    @qfilter
    def filter_by_role(cls, user: Optional[User] = None, action_code: str = "C", **kwargs):
        """
        Filter Destination by role.

        Parameters
        ----------
        user : User, optional
            The user to filter destinations for. If not provided, the current_user is used.
        action_code : str
            The action the user must be allowed to perform on the destination.

        Returns
        -------
        sqlalchemy.sql.elements.BinaryExpression
            A filter criterion for the ``id_destination`` column of the ``Destination`` table.
        """
        allowed_destination = Destination.allowed_destinations(user=user, action_code=action_code)
        # Hand a concrete list to ``in_`` rather than a one-shot ``map`` iterator.
        allowed_ids = [dest.id_destination for dest in allowed_destination]
        return Destination.id_destination.in_(allowed_ids)

    def has_instance_permission(self, user: Optional[User] = None, action_code: str = "C"):
        """
        Check if a user has the permissions to do an action on this destination.

        Parameters
        ----------
        user : User, optional
            The user to check the permission for. If not provided, the current_user is used.
        action_code : str
            The action to check the permission for. Possible values are 'C', 'R', 'U', 'V', 'E', 'D'.

        Returns
        -------
        bool
            True if the user has the right to do the action on this destination, False otherwise.
        """
        if not user:
            user = g.current_user

        scopes_by_action = get_scopes_by_action(
            id_role=user.id_role, module_code=self.module.module_code
        )
        max_scope = scopes_by_action[action_code]
        return max_scope > 0

    def __repr__(self):
        """Represent the destination by its label."""
        return self.label
@serializable
class BibThemes(db.Model):
    """A theme, stored in ``gn_imports.bib_themes``.

    Themes are referenced by ``EntityField.id_theme``.
    """

    __tablename__ = "bib_themes"
    __table_args__ = {"schema": "gn_imports"}

    id_theme = db.Column(db.Integer, primary_key=True)
    name_theme = db.Column(db.Unicode, nullable=False)

    # Labels and description.
    fr_label_theme = db.Column(db.Unicode, nullable=False)
    eng_label_theme = db.Column(db.Unicode, nullable=True)
    desc_theme = db.Column(db.Unicode, nullable=True)

    order_theme = db.Column(db.Integer, nullable=False)
@serializable
class EntityField(db.Model):
    """Association between an entity and a field (``gn_imports.cor_entity_field``).

    The primary key is the pair (``id_entity``, ``id_field``); each association
    also carries a theme, an order, a description and a comment.
    """

    __tablename__ = "cor_entity_field"
    __table_args__ = {"schema": "gn_imports"}

    # Entity side of the association.
    id_entity = db.Column(
        db.Integer,
        db.ForeignKey("gn_imports.bib_entities.id_entity"),
        primary_key=True,
    )
    entity = relationship("Entity", back_populates="fields")

    # Field side of the association.
    id_field = db.Column(
        db.Integer,
        db.ForeignKey("gn_imports.bib_fields.id_field"),
        primary_key=True,
    )
    field = relationship("BibFields", back_populates="entities")

    desc_field = db.Column(db.Unicode, nullable=True)

    id_theme = db.Column(db.Integer, db.ForeignKey(BibThemes.id_theme), nullable=False)
    theme = relationship(BibThemes)

    order_field = db.Column(db.Integer, nullable=False)
    comment = db.Column(db.Unicode)
@serializable
class Entity(db.Model):
    """An entity of a destination, stored in ``gn_imports.bib_entities``.

    An entity belongs to a destination, may have a parent entity and children,
    points to a destination table (schema + name) and designates one of the
    ``BibFields`` as its unique column.
    """

    __tablename__ = "bib_entities"
    __table_args__ = {"schema": "gn_imports"}

    id_entity = db.Column(db.Integer, primary_key=True, autoincrement=True)

    id_destination = db.Column(db.Integer, ForeignKey(Destination.id_destination))
    destination = relationship(Destination, back_populates="entities")

    code = db.Column(db.String(16))
    label = db.Column(db.String(64))
    order = db.Column(db.Integer)
    validity_column = db.Column(db.String(64))
    destination_table_schema = db.Column(db.String(63))
    destination_table_name = db.Column(db.String(63))

    # NOTE(review): this column is flagged ``primary_key=True`` in addition to
    # ``id_entity`` — confirm that a composite primary key is intended.
    id_unique_column = db.Column(
        db.Integer, db.ForeignKey("gn_imports.bib_fields.id_field"), primary_key=True
    )

    # Self-referential hierarchy: ``parent`` is the "one" side (remote_side is
    # the primary key), ``childs`` the "many" side.
    id_parent = db.Column(db.Integer, ForeignKey("gn_imports.bib_entities.id_entity"))
    parent = relationship("Entity", back_populates="childs", remote_side=[id_entity])
    childs = relationship("Entity", back_populates="parent")

    fields = relationship("EntityField", back_populates="entity")
    unique_column = relationship("BibFields")

    def get_destination_table(self):
        """Reflect and return the destination table of this entity.

        Returns
        -------
        sqlalchemy.Table
            The table ``destination_table_schema.destination_table_name``,
            reflected through the current session connection.
        """
        # ``autoload_with`` is enough to trigger reflection; the former
        # ``autoload=True`` flag is deprecated in SQLAlchemy 1.4 and no longer
        # accepted by SQLAlchemy 2.0, so it is not passed any more.
        return Table(
            self.destination_table_name,
            db.metadata,
            autoload_with=db.session.connection(),
            schema=self.destination_table_schema,
        )
class InstancePermissionMixin:
    """Mixin evaluating ``has_instance_permission`` over a collection of scopes.

    The host class must provide ``has_instance_permission(scope, user=...)``.
    """

    def get_instance_permissions(self, scopes, user=None):
        """Evaluate the instance permission for each given scope.

        Parameters
        ----------
        scopes : Mapping or iterable
            Either a mapping of keys to scopes, or a plain iterable of scopes.
        user : optional
            User to check; defaults to ``g.current_user``.

        Returns
        -------
        dict or list
            For a mapping, a dict with the same keys and the permission result
            as values; otherwise a list of results in iteration order.
        """
        if user is None:
            user = g.current_user

        def is_allowed(scope):
            return self.has_instance_permission(scope, user=user)

        if not isinstance(scopes, Mapping):
            return [is_allowed(scope) for scope in scopes]
        return {key: is_allowed(scope) for key, scope in scopes.items()}
# Association table ``gn_imports.cor_role_import`` linking roles (users) to
# imports; it is used as ``secondary`` for the import authors relationship.
cor_role_import = db.Table(
    "cor_role_import",
    db.Column(
        "id_role",
        db.Integer,
        db.ForeignKey(User.id_role),
        primary_key=True,
    ),
    db.Column(
        "id_import",
        db.Integer,
        db.ForeignKey("gn_imports.t_imports.id_import"),
        primary_key=True,
    ),
    schema="gn_imports",
)
@serializable(
    fields=[
        "authors.nom_complet",
        "dataset.dataset_name",
        "dataset.active",
        "destination.code",
        "destination.label",
        "destination.statistics_labels",
        "destination.module",
    ]
)
class TImports(InstancePermissionMixin, db.Model):
    """An import and its state, stored in ``gn_imports.t_imports``.

    Besides the source file and its parsing parameters, an import keeps its
    authors, its destination, its mappings, the id of the asynchronous task
    processing it and the errors raised.
    """

    __tablename__ = "t_imports"
    __table_args__ = {"schema": "gn_imports"}

    # https://docs.python.org/3/library/codecs.html
    # https://chardet.readthedocs.io/en/latest/supported-encodings.html
    # TODO: move in configuration file
    AVAILABLE_ENCODINGS = {
        "utf-8",
        "iso-8859-1",
        "iso-8859-15",
    }
    AVAILABLE_FORMATS = ["csv", "geojson"]
    AVAILABLE_SEPARATORS = [",", ";"]

    id_import = db.Column(db.Integer, primary_key=True, autoincrement=True)
    id_destination = db.Column(db.Integer, ForeignKey(Destination.id_destination))
    destination = relationship(Destination)
    format_source_file = db.Column(db.Unicode, nullable=True)
    srid = db.Column(db.Integer, nullable=True)
    separator = db.Column(db.Unicode, nullable=True)
    detected_separator = db.Column(db.Unicode, nullable=True)
    encoding = db.Column(db.Unicode, nullable=True)
    detected_encoding = db.Column(db.Unicode, nullable=True)
    # import_table = db.Column(db.Unicode, nullable=True)
    full_file_name = db.Column(db.Unicode, nullable=True)
    id_dataset = db.Column(db.Integer, ForeignKey("gn_meta.t_datasets.id_dataset"), nullable=True)
    date_create_import = db.Column(db.DateTime, default=datetime.now)
    date_update_import = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
    date_end_import = db.Column(db.DateTime, nullable=True)
    source_count = db.Column(db.Integer, nullable=True)
    # Deferred: only loaded when accessed.
    erroneous_rows = deferred(db.Column(ARRAY(db.Integer), nullable=True))
    # NOTE(review): a plain-string ``server_default`` is presumably rendered as
    # a quoted literal in generated DDL; confirm whether ``sa.text`` is wanted.
    statistics = db.Column(
        MutableDict.as_mutable(JSON), nullable=False, server_default="'{}'::jsonb"
    )
    date_min_data = db.Column(db.DateTime, nullable=True)
    date_max_data = db.Column(db.DateTime, nullable=True)
    uuid_autogenerated = db.Column(db.Boolean)
    altitude_autogenerated = db.Column(db.Boolean)
    authors = db.relationship(
        User,
        lazy="joined",
        secondary=cor_role_import,
    )
    loaded = db.Column(db.Boolean, nullable=False, default=False)
    processed = db.Column(db.Boolean, nullable=False, default=False)
    dataset = db.relationship(TDatasets, lazy="joined")
    # Deferred: only loaded when accessed.
    source_file = deferred(db.Column(db.LargeBinary))
    columns = db.Column(ARRAY(db.Unicode))
    # keys are target names, values are source names
    fieldmapping = db.Column(MutableDict.as_mutable(JSON))
    contentmapping = db.Column(MutableDict.as_mutable(JSON))
    task_id = db.Column(sa.String(155))
    errors = db.relationship(
        "ImportUserError",
        back_populates="imprt",
        order_by="ImportUserError.id_type",  # TODO order by type.category
        cascade="all, delete-orphan",
    )

    @property
    def cruved(self):
        """Map each action code to the instance permission of the current user.

        Returns
        -------
        dict
            ``{action: bool}`` for every action returned by
            ``get_scopes_by_action`` for module ``IMPORT`` / object ``IMPORT``.
        """
        scopes_by_action = get_scopes_by_action(module_code="IMPORT", object_code="IMPORT")
        return {
            # Forward the action being evaluated: without it every action was
            # checked against the destination with the default "C", so the
            # "R" exemption of ``has_instance_permission`` could never apply.
            action: self.has_instance_permission(scope, action_code=action)
            for action, scope in scopes_by_action.items()
        }

    # Length of the first dimension of ``erroneous_rows``, computed in SQL.
    errors_count = column_property(func.array_length(erroneous_rows, 1))

    @property
    def task_progress(self):
        """Report the progress of the asynchronous task attached to this import.

        Returns
        -------
        None
            When there is no ``task_id`` or the task state is ``SUCCESS``.
        0
            When the task state is ``PENDING`` or ``STARTED``.
        object
            ``result.result["progress"]`` when the task state is ``PROGRESS``.
        -1
            For any other task state.
        """
        if self.task_id is None:
            return None
        result = AsyncResult(self.task_id, app=celery_app)
        if result.state in ["PENDING", "STARTED"]:
            return 0
        elif result.state == "PROGRESS":
            return result.result["progress"]
        elif result.state == "SUCCESS":
            return None
        else:
            return -1

    def has_instance_permission(self, scope, user=None, action_code="C"):
        """Tell whether ``user`` may perform ``action_code`` on this import.

        Parameters
        ----------
        scope : int
            0 (none), 1 (self), 2 (organism) or 3 (all).
        user : optional
            User to check; defaults to ``g.current_user``.
        action_code : str
            Action checked against the destination. The destination check
            is not blocking when the action is ``"R"``.

        Returns
        -------
        bool or None
            The permission; ``None`` when ``scope`` is none of 0, 1, 2, 3.
        """
        if user is None:
            user = g.current_user
        if not self.destination.has_instance_permission(user, action_code) and action_code != "R":
            return False
        if scope == 0:  # pragma: no cover (should not happen as already checked by the decorator)
            return False
        elif scope == 1:  # self
            return user.id_role in [author.id_role for author in self.authors]
        elif scope == 2:  # organism
            return user.id_role in [author.id_role for author in self.authors] or (
                user.id_organisme is not None
                and user.id_organisme in [author.id_organisme for author in self.authors]
            )
        elif scope == 3:  # all
            return True

    @staticmethod
    def filter_by_scope(scope, user=None, **kwargs):
        """Build the SQL criterion restricting imports to those visible at ``scope``.

        Parameters
        ----------
        scope : int
            0 (nothing), 1 (own imports), 2 (own or same organism), 3 (everything).
        user : optional
            User to filter for; defaults to ``g.current_user``.

        Returns
        -------
        A SQLAlchemy boolean expression.

        Raises
        ------
        Exception
            If ``scope`` is not one of 0, 1, 2, 3.
        """
        if user is None:
            user = g.current_user
        if scope == 0:
            return sa.false()
        elif scope in (1, 2):
            filters = [User.id_role == user.id_role]
            if scope == 2 and user.id_organisme is not None:
                filters += [User.id_organisme == user.id_organisme]
            return TImports.authors.any(sa.or_(*filters))
        elif scope == 3:
            return sa.true()
        else:
            raise Exception(f"Unexpected scope {scope}")

    def as_dict(self, import_as_dict):
        """Enrich a serialized import with derived, display-oriented keys.

        Adds ``authors_name``, ``available_encodings`` (including the detected
        encoding, if any), ``available_formats``, ``available_separators``
        and, when the file extension is a known format, ``detected_format``.

        Parameters
        ----------
        import_as_dict : dict
            The dictionary to complete; it is modified in place.

        Returns
        -------
        dict
            The same dictionary.
        """
        import_as_dict["authors_name"] = "; ".join([author.nom_complet for author in self.authors])
        if self.detected_encoding:
            import_as_dict["available_encodings"] = sorted(
                TImports.AVAILABLE_ENCODINGS
                | {
                    self.detected_encoding,
                }
            )
        else:
            import_as_dict["available_encodings"] = sorted(TImports.AVAILABLE_ENCODINGS)
        import_as_dict["available_formats"] = TImports.AVAILABLE_FORMATS
        import_as_dict["available_separators"] = TImports.AVAILABLE_SEPARATORS
        if self.full_file_name and "." in self.full_file_name:
            extension = self.full_file_name.rsplit(".", 1)[-1]
            if extension in TImports.AVAILABLE_FORMATS:
                import_as_dict["detected_format"] = extension
        return import_as_dict
@serializable
class BibFields(db.Model):
    """A field of a destination, stored in ``gn_imports.bib_fields``."""

    __tablename__ = "bib_fields"
    __table_args__ = {"schema": "gn_imports"}

    id_field = db.Column(db.Integer, primary_key=True)

    id_destination = db.Column(db.Integer, ForeignKey(Destination.id_destination))
    destination = relationship(Destination)

    name_field = db.Column(db.Unicode, nullable=False, unique=True)
    source_field = db.Column(db.Unicode, unique=True)
    dest_field = db.Column(db.Unicode, unique=True)
    fr_label = db.Column(db.Unicode, nullable=False)
    eng_label = db.Column(db.Unicode, nullable=True)
    type_field = db.Column(db.Unicode, nullable=True)
    mandatory = db.Column(db.Boolean, nullable=False)
    autogenerated = db.Column(db.Boolean, nullable=False)

    # Optional link to a nomenclature type, through its mnemonic.
    mnemonique = db.Column(db.Unicode, db.ForeignKey(BibNomenclaturesTypes.mnemonique))
    nomenclature_type = relationship("BibNomenclaturesTypes")

    display = db.Column(db.Boolean, nullable=False)
    multi = db.Column(db.Boolean)
    optional_conditions = db.Column(db.ARRAY(db.Unicode), nullable=True)
    mandatory_conditions = db.Column(db.ARRAY(db.Unicode), nullable=True)

    entities = relationship("EntityField", back_populates="field")

    @property
    def source_column(self):
        """Return ``source_field``, or ``dest_field`` when the former is empty."""
        return self.source_field or self.dest_field

    @property
    def dest_column(self):
        """Return ``dest_field``, or ``source_field`` when the former is empty."""
        return self.dest_field or self.source_field

    def __str__(self):
        """Represent the field by its French label."""
        return self.fr_label
# Association table ``gn_imports.cor_role_mapping`` linking roles (users) to
# mappings; it is used as ``secondary`` for the mapping owners relationship.
cor_role_mapping = db.Table(
    "cor_role_mapping",
    db.Column(
        "id_role",
        db.Integer,
        db.ForeignKey(User.id_role),
        primary_key=True,
    ),
    db.Column(
        "id_mapping",
        db.Integer,
        db.ForeignKey("gn_imports.t_mappings.id"),
        primary_key=True,
    ),
    schema="gn_imports",
)
class MappingTemplate(db.Model):
    """Base class of mappings, stored in ``gn_imports.t_mappings``.

    The ``type`` column is the polymorphic discriminator; subclasses declare
    their own ``polymorphic_identity``.
    """

    __tablename__ = "t_mappings"
    __table_args__ = {"schema": "gn_imports"}

    id = db.Column(db.Integer, primary_key=True)

    id_destination = db.Column(db.Integer, ForeignKey(Destination.id_destination), nullable=False)
    destination = relationship(Destination)

    label = db.Column(db.Unicode(255), nullable=False)
    type = db.Column(db.Unicode(10), nullable=False)
    active = db.Column(db.Boolean, nullable=False, default=True, server_default="true")
    public = db.Column(db.Boolean, nullable=False, default=False, server_default="false")

    @property
    def cruved(self):
        """Map each action code to the instance permission of the current user.

        The scopes come from ``get_scopes_by_action`` for module ``IMPORT`` and
        object ``MAPPING``.
        """
        scopes_by_action = get_scopes_by_action(module_code="IMPORT", object_code="MAPPING")
        permissions = {}
        for action, scope in scopes_by_action.items():
            permissions[action] = self.has_instance_permission(scope)
        return permissions

    __mapper_args__ = {"polymorphic_on": type}

    owners = relationship(
        User,
        lazy="joined",
        secondary=cor_role_mapping,
    )

    def has_instance_permission(self, scope: int, user=None):
        """Tell whether ``user`` has access to this mapping at the given scope.

        Parameters
        ----------
        scope : int
            0 (none), 1 (owner), 2 (owner or same organism as an owner), 3 (all).
        user : optional
            User to check; defaults to ``g.current_user``.

        Returns
        -------
        bool or None
            The permission; ``None`` when ``scope`` is none of 0, 1, 2, 3.
        """
        if user is None:
            user = g.current_user

        if scope == 0:
            return False
        if scope in (1, 2):
            if user in self.owners:
                return True
            if scope != 2 or user.id_organisme is None:
                return False
            owner_organisms = [owner.id_organisme for owner in self.owners]
            return user.id_organisme in owner_organisms
        if scope == 3:
            return True
        # Any other scope falls through and yields None.

    @staticmethod
    def filter_by_scope(scope, user=None):
        """Build the SQL criterion restricting mappings to those visible at ``scope``.

        At scopes 1 and 2, public mappings are always included, together with
        mappings owned by the user (and, at scope 2, by a member of the user's
        organism when the user has one).

        Raises
        ------
        Exception
            If ``scope`` is not one of 0, 1, 2, 3.
        """
        if user is None:
            user = g.current_user

        if scope == 0:
            return sa.false()
        if scope == 3:
            return sa.true()
        if scope not in (1, 2):
            raise Exception(f"Unexpected scope {scope}")

        criteria = [
            MappingTemplate.public == True,  # noqa: E712 (SQL expression, not a Python test)
            MappingTemplate.owners.any(id_role=user.id_role),
        ]
        if scope == 2 and user.id_organisme is not None:
            criteria.append(MappingTemplate.owners.any(id_organisme=user.id_organisme))
        return sa.or_(*criteria)
def optional_conditions_to_jsonschema(name_field: str, optional_conditions: Iterable[str]) -> dict:
    """
    Build the JSON-schema fragment tying a field to its optional conditions.

    Parameters
    ----------
    name_field : str
        The name of the field that becomes required.
    optional_conditions : Iterable[str]
        The names of the fields acting as optional conditions. It must be a
        non-empty ``list`` (enforced by assertions).

    Returns
    -------
    dict
        A schema of the form ``{"anyOf": [{"if": {"not": {"properties": ...}},
        "then": {"required": [name_field]}}]}`` where ``properties`` declares
        each optional-condition field as a string.
    """
    assert isinstance(optional_conditions, list)
    assert len(optional_conditions) > 0

    string_properties = {}
    for field_opt in optional_conditions:
        string_properties[field_opt] = {"type": "string"}

    conditional_rule = {
        "if": {"not": {"properties": string_properties}},
        "then": {"required": [name_field]},
    }
    return {"anyOf": [conditional_rule]}
# TODO move to utils lib
def get_fields_of_an_entity(
    entity: "Entity",
    columns: Optional[List[str]] = None,
    optional_where_clause: Optional[Any] = None,
) -> List["BibFields"]:
    """
    Get all BibFields associated with a given entity.

    Parameters
    ----------
    entity : Entity
        The entity to get the fields for.
    columns : Optional[List[str]], optional
        Names of the ``BibFields`` attributes to load eagerly. If None or
        empty, all columns are loaded.
    optional_where_clause : Optional[Any], optional
        An optional where clause to apply to the query.

    Returns
    -------
    List[BibFields]
        The BibFields associated with the given entity.
    """
    # Function-scope import: ``load_only`` is not among the module-level imports.
    from sqlalchemy.orm import load_only

    query = sa.select(BibFields).where(
        BibFields.entities.any(EntityField.entity == entity),
    )
    if columns:
        # The previous ``query.with_only_columns(...)`` call discarded its
        # result, so ``columns`` had no effect. ``load_only`` restricts the
        # loaded columns while still returning ``BibFields`` instances, which
        # is what the return type promises.
        requested = [getattr(BibFields, col) for col in columns]
        query = query.options(load_only(*requested))
    if optional_where_clause is not None:
        query = query.where(optional_where_clause)
    return db.session.scalars(query).all()
@serializable
class FieldMapping(MappingTemplate):
    """Field mapping (polymorphic identity ``FIELD``), stored in ``gn_imports.t_fieldmappings``."""

    __tablename__ = "t_fieldmappings"
    __table_args__ = {"schema": "gn_imports"}

    id = db.Column(db.Integer, ForeignKey(MappingTemplate.id), primary_key=True)
    values = db.Column(MutableDict.as_mutable(JSON))

    __mapper_args__ = {
        "polymorphic_identity": "FIELD",
    }

    @staticmethod
    def validate_values(field_mapping_json, destination=None):
        """
        Validate the field mapping values returned by the client form.

        A JSON schema is built from the fields of every entity of the
        destination, then ``field_mapping_json`` is validated against it.

        Parameters
        ----------
        field_mapping_json : dict
            The field mapping values.
        destination : Destination, optional
            The destination to validate against; defaults to ``g.destination``.

        Raises
        ------
        ValueError
            If the field mapping values are invalid.
        """
        bib_fields_col = [
            "name_field",
            "autogenerated",
            "mandatory",
            "multi",
            "optional_conditions",
            "mandatory_conditions",
        ]
        # Resolve the destination once instead of repeating the fallback at
        # every use (a stray no-op copy of that expression was also removed).
        if destination is None:
            destination = g.destination

        entities_for_destination: List[Entity] = (
            Entity.query.filter_by(destination=destination).order_by(sa.desc(Entity.order)).all()
        )
        mapped_field_names = list(field_mapping_json.keys())

        fields = []
        for entity in entities_for_destination:
            # Get fields associated to this entity and exists in the given field mapping
            fields_of_ent = get_fields_of_an_entity(
                entity,
                columns=bib_fields_col,
                optional_where_clause=sa.and_(
                    sa.or_(
                        ~BibFields.entities.any(EntityField.entity != entity),
                        BibFields.name_field == entity.unique_column.name_field,
                    ),
                    BibFields.name_field.in_(mapped_field_names),
                ),
            )

            # if the only column corresponds to id_columns, we only do the validation on the latter
            if [entity.unique_column.name_field] == [f.name_field for f in fields_of_ent]:
                fields.extend(fields_of_ent)
            else:
                # if other columns than the id_columns are used, we need to check every fields of this entity
                fields.extend(
                    get_fields_of_an_entity(
                        entity,
                        columns=bib_fields_col,
                        optional_where_clause=sa.and_(
                            BibFields.destination == destination,
                            BibFields.display == True,  # noqa: E712 (SQL expression)
                        ),
                    )
                )

        schema = {
            "type": "object",
            "properties": {
                field.name_field: {
                    "type": (
                        "boolean" if field.autogenerated else ("array" if field.multi else "string")
                    ),
                }
                for field in fields
            },
            "required": [
                field.name_field
                for field in fields
                if field.mandatory and not field.optional_conditions
            ],
            "dependentRequired": {
                field.name_field: field.mandatory_conditions
                for field in fields
                if field.mandatory_conditions
            },
            "additionalProperties": False,
        }
        optional_conditions = [
            optional_conditions_to_jsonschema(field.name_field, field.optional_conditions)
            for field in fields
            if field.optional_conditions
        ]
        if optional_conditions:
            schema["allOf"] = optional_conditions

        try:
            validate_json(field_mapping_json, schema)
        except JSONValidationError as e:
            # Keep the original validation error attached as the cause.
            raise ValueError(e.message) from e
@serializable
class ContentMapping(MappingTemplate):
    """Content mapping (polymorphic identity ``CONTENT``), stored in ``gn_imports.t_contentmappings``."""

    __tablename__ = "t_contentmappings"
    __table_args__ = {"schema": "gn_imports"}

    id = db.Column(db.Integer, ForeignKey(MappingTemplate.id), primary_key=True)
    values = db.Column(MutableDict.as_mutable(JSON))

    __mapper_args__ = {
        "polymorphic_identity": "CONTENT",
    }

    @staticmethod
    def validate_values(values, destination=None):
        """
        Validate content mapping values against the nomenclatures of a destination.

        For every field of the destination that has a nomenclature type, the
        schema accepts a property named after the field's ``mnemonique`` whose
        value is an object mapping arbitrary keys to one of the
        ``cd_nomenclature`` codes of that type. No other property is accepted.

        Parameters
        ----------
        values : dict
            The content mapping values.
        destination : Destination, optional
            The destination to validate against; defaults to ``g.destination``.

        Raises
        ------
        ValueError
            If the content mapping values are invalid.
        """
        if destination is None:
            destination = g.destination

        nomenclature_fields = (
            BibFields.query.filter(
                BibFields.destination == destination,
                BibFields.nomenclature_type != None,  # noqa: E711 (SQL expression)
            )
            .options(
                # Load the nomenclature type and its nomenclatures in the same query.
                joinedload(BibFields.nomenclature_type).joinedload(
                    BibNomenclaturesTypes.nomenclatures
                ),
            )
            .all()
        )

        properties = {}
        for nomenclature_field in nomenclature_fields:
            cd_nomenclatures = [
                nomenclature.cd_nomenclature
                for nomenclature in nomenclature_field.nomenclature_type.nomenclatures
            ]
            properties[nomenclature_field.mnemonique] = {
                "type": "object",
                "patternProperties": {
                    "^.*$": {
                        "type": "string",
                        "enum": cd_nomenclatures,
                    },
                },
            }
        schema = {
            "type": "object",
            "properties": properties,
            "additionalProperties": False,
        }

        try:
            validate_json(values, schema)
        except JSONValidationError as e:
            # Keep the original validation error attached as the cause.
            raise ValueError(e.message) from e