"""
Serialize function for SQLAlchemy models
"""
from inspect import signature, getattr_static
from warnings import warn
from collections import defaultdict, ChainMap
from itertools import chain
from functools import lru_cache
from uuid import UUID
from sqlalchemy.orm import ColumnProperty
from sqlalchemy import inspect
from sqlalchemy.ext.hybrid import hybrid_property, HYBRID_PROPERTY
from sqlalchemy.types import DateTime, Date, Time
from sqlalchemy.dialects.postgresql.base import UUID
"""
List of data type who need a particular serialization
@TODO MISSING FLOAT
"""
[docs]
def _str_or_none(value):
    """Return ``str(value)``, or ``None`` when *value* is ``None``.

    Only ``None`` maps to ``None``: falsy but meaningful values (a numeric
    zero, for instance) must still be serialized as their string form.
    """
    return None if value is None else str(value)


# Serializers keyed by the lower-cased class name of the column type.
SERIALIZERS = {
    "date": _str_or_none,
    "datetime": _str_or_none,
    "time": _str_or_none,
    "timestamp": _str_or_none,
    "uuid": _str_or_none,
    "numeric": _str_or_none,
}
[docs]
def get_serializer(col):
    """
    Return the callable used to stringify the values of *col*, or ``None``
    when the values need no conversion.

    Only ``ColumnProperty`` objects are inspected; the type of their first
    column decides the result: UUID, Date, Time and DateTime columns are
    converted with ``str``.
    """
    if not isinstance(col, ColumnProperty):
        # TODO Does some hybrid property need conversion?
        # TODO May be support CompositeProperty, …
        return None
    col_type = col.columns[0].type
    if isinstance(col_type, (UUID, Date, Time, DateTime)):
        return str
    return None
[docs]
def get_serializable_decorator(fields=None, exclude=None, stringify=True):
    """
    Build a class decorator which adds ``as_dict`` and ``from_dict`` to a
    SQLAlchemy model.

    Parameters
    ----------
    fields : iterable of str, optional
        Default fields used by ``as_dict`` when it is called without
        ``fields``. Dotted names (``'child.column'``) target a relationship.
    exclude : iterable of str, optional
        Default fields excluded by ``as_dict`` when it is called without
        ``exclude``.
    stringify : bool
        Default value of the ``stringify`` argument of ``as_dict``.

    Returns
    -------
    callable
        The class decorator; it returns the class it receives.
    """
    # Private copies: no mutable default argument, and a one-shot iterable
    # given by the caller is consumed exactly once.
    default_fields = list(fields) if fields is not None else []
    default_exclude = list(exclude) if exclude is not None else []
    default_stringify = stringify
    firstlevel_default_fields = {field.split(".")[0] for field in default_fields}

    def _serializable(cls):
        """
        Class decorator for DB models.

        Adds the ``as_dict`` method, which is based on the SQLAlchemy
        mapping, and the ``from_dict`` method.
        """
        mapper = inspect(cls)

        def get_cls_db_columns():
            """
            List the serializable column properties of the class as
            ``(name, serializer)`` pairs, the serializer being chosen from
            ``SERIALIZERS`` according to the column type.
            """
            cls_db_columns = []
            for prop in cls.__mapper__.column_attrs:
                if not isinstance(prop, ColumnProperty):  # and len(prop.columns) == 1:
                    continue
                # -1: with inheritance, take the last element of prop.columns,
                # which is the latest redefinition of the column
                db_col = prop.columns[-1]
                # HACK: get the attribute name without the class name
                name = str(prop).split(".", 1)[1]
                type_name = db_col.type.__class__.__name__
                if type_name == "Geometry":
                    continue
                if name in default_exclude:
                    continue
                cls_db_columns.append((name, SERIALIZERS.get(type_name.lower(), lambda x: x)))

            # Synonym properties, associated with their serializer as well
            for syn in cls.__mapper__.synonyms:
                col = cls.__mapper__.c[syn.name]
                type_name = col.type.__class__.__name__
                # geometry columns are skipped (the original 'pass' did not skip them)
                if type_name == "Geometry":
                    continue
                cls_db_columns.append(
                    (syn.key, SERIALIZERS.get(type_name.lower(), lambda x: x))
                )
            return cls_db_columns

        def get_cls_db_relationships():
            """
            List the relationship properties as ``(key, uselist, class)``.

            ``uselist`` tells whether the relationship is a collection of
            sub-objects; its value comes from the relationship itself.
            """
            return [
                (db_rel.key, db_rel.uselist, getattr(cls, db_rel.key).mapper.class_)
                for db_rel in cls.__mapper__.relationships
            ]

        @lru_cache(maxsize=None)
        def get_columns_and_relationships(fields=None, exclude=None):
            """
            Resolve which columns and relationships must be serialized.

            Arguments must be hashable (``None`` or a frozenset) because the
            result is cached.

            Returns
            -------
            tuple
                ``(fields, exclude, columns, relationships)`` where ``columns``
                maps a key to ``(attribute, serializer)`` and
                ``relationships`` maps a key to its relationship property.

            Raises
            ------
            Exception
                When a requested field or relationship does not exist.
            """
            deferred_columns = {
                key for key, props in mapper.column_attrs.items() if props.deferred
            }
            _default_exclude = set(default_exclude) | deferred_columns
            additional_fields = set()
            if fields is None:
                fields = default_fields
            elif fields:
                base_fields = set()
                relationship_fields = set()
                for field in fields:
                    if field.split(".")[0] in mapper.relationships:
                        relationship_fields.add(field)
                    elif field.startswith("+"):
                        field = field.lstrip("+")
                        additional_fields.add(field)
                    else:
                        base_fields.add(field)
                    # We remove given fields from default_exclude!
                    _default_exclude.discard(field)
                if base_fields:
                    fields = base_fields | additional_fields | relationship_fields
                # given fields are only relationships or additional fields, but no columns
                elif firstlevel_default_fields - set(mapper.relationships.keys()):
                    # some columns are in default fields: add additional fields to them
                    fields = set(default_fields) | additional_fields | relationship_fields
                else:
                    # no additional fields here: we want ALL columns (default
                    # behaviour), not only the additional ones
                    fields = set(default_fields) | relationship_fields
            if exclude is None:
                exclude = _default_exclude

            # take 'a' instead of 'a.b'
            firstlevel_fields = [rel.split(".")[0] for rel in fields]

            properties = {
                key: None for key in dir(cls) if isinstance(getattr_static(cls, key), property)
            }
            hybrid_properties = {
                key: attr
                for key, attr in mapper.all_orm_descriptors.items()
                if attr.extension_type == HYBRID_PROPERTY
            }

            unknown_fields = (
                ({f for f in fields if "." not in f} | additional_fields)
                - set(mapper.attrs.keys())
                - set(properties.keys())
                - set(hybrid_properties.keys())
            )
            for field in unknown_fields:
                raise Exception(f"Field '{field}' does not exist on {cls}.")
            unknown_relationships = {f.split(".")[0] for f in fields if "." in f} - set(
                mapper.relationships.keys()
            )
            for field in unknown_relationships:
                raise Exception(f"Relationship '{field}' does not exist on {cls}.")

            _columns = {
                key: col
                for key, col in ChainMap(
                    dict(mapper.column_attrs), properties, hybrid_properties
                ).items()
                if key in fields
            }
            _relationships = {
                key: rel for key, rel in mapper.relationships.items() if key in firstlevel_fields
            }
            if not _columns:
                # no column explicitly selected: serialize all of them
                _columns = ChainMap(dict(mapper.column_attrs), properties, hybrid_properties)
            if exclude:
                _columns = {key: col for key, col in _columns.items() if key not in exclude}
                _relationships = {
                    key: rel for key, rel in _relationships.items() if key not in exclude
                }
            _columns = {key: (col, get_serializer(col)) for key, col in _columns.items()}
            return fields, exclude, _columns, _relationships

        def serializefn(
            self,
            recursif=False,
            columns=None,
            relationships=None,
            fields=None,
            exclude=None,
            stringify=None,
            unloaded=None,
            depth=None,
            _excluded_mappers=None,
        ):
            """
            Return the data of the object as a dict.

            Parameters
            ----------
            fields: list
                Fields (native columns or relationships) to serialize, e.g.:
                    fields=['column1', 'column2', 'child1', 'child2']
                When fields is not given, all columns are selected.
                Relationships must be listed explicitly in fields to be
                serialized in addition to the object's own columns, e.g.:
                    fields=['child']
                Fields of a relationship can be selected as well, without
                depth limit, using a '.':
                    fields=['child.column1', 'child.otherchild.column2']
            exclude: list
                Fields to exclude.
                Exclusions are applied after the selection made with fields.
                The '.' notation is supported too, e.g.:
                    fields=['child'],exclude=['child.column2']
            stringify: boolean
                Whether values are converted with the serializer of their
                column; defaults to the value given to the decorator.
            unloaded: str
                'raise' raises an Exception and 'warn' emits a warning when a
                serialized relationship is not loaded.

            The following arguments are deprecated in favour of fields and exclude.

            recursif: boolean
                Whether sub-objects (relationships) are serialized.
            depth: int
                Recursion level:
                    0 the object only
                    1 the object and its sub-objects
                    2 ...
                A truthy depth sets recursif to True.
                When depth is not given and recursif is True, the recursion
                is not limited.
            columns: list
                Columns to serialize.
            relationships: list
                Relationships to serialize.
            """
            if stringify is None:
                stringify = default_stringify
            # 'fields' is kept as a real list: the membership test done below
            # would silently consume a lazy iterator and lose its content.
            if fields is not None:
                fields = list(fields)
            _excluded_mappers = list(_excluded_mappers) if _excluded_mappers else []
            if columns:
                warn(
                    "'columns' argument is deprecated. Please add columns to serialize "
                    "directly in 'fields' argument.",
                    DeprecationWarning,
                )
                fields = (fields or []) + list(columns)
            if relationships:
                warn(
                    "'relationships' argument is deprecated. Please add relationships to serialize "
                    "directly in 'fields' argument.",
                    DeprecationWarning,
                )
                fields = (fields or []) + list(relationships)
            if depth:
                recursif = True
            if recursif:
                warn(
                    "'recursif' argument is deprecated. Please add relationships to serialize "
                    "directly in 'fields' argument.",
                    DeprecationWarning,
                )
                # new list: the one received from the caller is not modified
                _excluded_mappers = _excluded_mappers + [mapper]
                if depth is None or depth > 0:
                    if fields is None:
                        fields = []
                    fields = fields + [
                        rel.key
                        for rel in mapper.relationships
                        if rel.key not in fields and rel.mapper not in _excluded_mappers
                    ]
                    if depth:
                        depth -= 1
            # frozensets are hashable, as required by the cached resolver
            if fields is not None:
                fields = frozenset(fields)
            if exclude is not None:
                exclude = frozenset(exclude)
            fields, exclude, _columns, _relationships = get_columns_and_relationships(
                fields, exclude
            )

            # NOTE(review): 'stringify' is not forwarded to nested as_dict
            # calls; confirm this is intended.
            serialize_kwargs = {
                "recursif": recursif,
                "depth": depth,
                "unloaded": unloaded,
                "_excluded_mappers": _excluded_mappers,
            }

            data = {}
            for key, (_col, serializer) in _columns.items():
                data[key] = getattr(self, key)
                if stringify and serializer is not None and data[key] is not None:
                    data[key] = serializer(data[key])

            for key, rel in _relationships.items():
                if unloaded is not None:
                    m = inspect(self)
                    if key in m.unloaded:
                        err = f"Relationship '{key}' on '{self}' is not loaded"
                        if unloaded == "raise":
                            raise Exception(err)
                        elif unloaded == "warn":
                            warn(err)
                kwargs = serialize_kwargs.copy()
                # keep the part after '<key>.' for the nested call
                _fields = [
                    field.split(".", 1)[1] for field in fields if field.startswith(f"{key}.")
                ]
                kwargs["fields"] = _fields or None
                _exclude = [
                    field.split(".", 1)[1] for field in exclude if field.startswith(f"{key}.")
                ]
                kwargs["exclude"] = _exclude or None
                if rel.uselist:
                    data[key] = [o.as_dict(**kwargs) for o in getattr(self, key)]
                else:
                    rel_object = getattr(self, rel.key)
                    if rel_object:
                        data[rel.key] = rel_object.as_dict(**kwargs)
                    else:  # relationship may be null
                        data[rel.key] = None
            return data

        # marker checked below when the decorator is applied a second time
        serializefn.__original_decorator = True

        def populatefn(self, dict_in, recursif=False):
            """
            Set the values of the object from a dictionary.

            Parameters
            ----------
            dict_in : dictionary holding the values to give to the object
            recursif: whether relationships are populated as well

            Returns
            -------
            The object itself.
            """
            cls_db_columns_key = {name for name, _serializer in get_cls_db_columns()}

            # populate cls_db_columns
            for key in dict_in:
                if key in cls_db_columns_key:
                    setattr(self, key, dict_in[key])

            # when not recursive, relationships are not handled
            if not recursif:
                return self

            # relationships
            for rel, uselist, Model in get_cls_db_relationships():
                if rel not in dict_in:
                    continue
                values = dict_in.get(rel)
                if not values:
                    # None, {} or []
                    setattr(self, rel, [] if uselist else None)
                    continue

                # so that uselist and non-uselist cases are handled the same way
                if not uselist:
                    values = [values]

                id_field_name = inspect(Model).primary_key[0].name

                # When the first element is not a dict, the list is assumed to
                # be a list of ids and is turned into
                # [ ... { <id_field_name>: id_value } ... ]
                if not isinstance(values[0], dict):
                    values = [{id_field_name: id_value} for id_value in values]

                # preload the objects having an id, with a single query
                ids = [
                    id_value
                    for id_value in (value.get(id_field_name) for value in values)
                    if id_value
                ]
                preload_res_with_ids = Model.query.where(
                    getattr(Model, id_field_name).in_(ids)
                ).all()

                v_obj = []
                for data in values:
                    # the id is removed from the given dictionary
                    id_value = data.pop(id_field_name, None)
                    res = None
                    if id_value:
                        # with an id: take the matching preloaded object
                        res = next(
                            (
                                obj
                                for obj in preload_res_with_ids
                                if getattr(obj, id_field_name) == id_value
                            ),
                            None,
                        )
                    if res is None:
                        # no id, or id not found: create a new instance
                        res = Model()
                    if hasattr(res, "from_dict"):
                        res.from_dict(data, recursif)
                    v_obj.append(res)

                # set the relationship; when uselist is false, take the first element
                setattr(self, rel, v_obj if uselist else v_obj[0])
            return self

        if hasattr(cls, "as_dict"):
            # the Model has a as_dict(self, data) method, which expects serialized data as argument
            if len(signature(cls.as_dict).parameters) == 2:
                userfn = cls.as_dict

                def chainedserializefn(self, *args, **kwargs):
                    return userfn(self, serializefn(self, *args, **kwargs))

                cls.as_dict = chainedserializefn
            # the Model has its own as_dict method
            elif "as_dict" in vars(cls):
                # the serialize decorator is applied a second time with new default arguments
                if hasattr(cls.as_dict, "__original_decorator") and (
                    default_fields or default_exclude
                ):
                    cls.as_dict = serializefn
                # otherwise the method is kept: it will call super().as_dict() itself
            # the Model has a as_dict method inherited, we replace it with the
            # serializer which will take child fields into account
            else:
                cls.as_dict = serializefn
        else:
            cls.as_dict = serializefn

        cls.from_dict = populatefn
        return cls

    return _serializable
[docs]
def serializable(*args, **kwargs):
    """
    Decorator usable both bare (``@serializable``) and with arguments
    (``@serializable(exclude=['field'])``).

    When called with a single positional argument which is a class and no
    keyword argument, the class is decorated with the default settings.
    Otherwise the arguments are forwarded to ``get_serializable_decorator``
    and the resulting decorator is returned.
    """
    used_bare = len(args) == 1 and not kwargs and isinstance(args[0], type)
    if used_bare:  # e.g. @serializable
        (model,) = args
        return get_serializable_decorator()(model)
    # e.g. @serializable(exclude=['field'])
    return get_serializable_decorator(*args, **kwargs)