Code source de app.utils.utilssqlalchemy

"""
Fonctions utilitaires
"""

import json
from functools import wraps

from dateutil import parser
from flask import Response, current_app
from werkzeug.datastructures import Headers

from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import ColumnProperty


# def testDataType(value, sqlType, paramName):
#     if sqlType == DB.Integer or isinstance(sqlType, (DB.Integer)):
#         try:
#             int(value)
#         except Exception as e:
#             return '{0} must be an integer'.format(paramName)
#     if sqlType == DB.Numeric or isinstance(sqlType, (DB.Numeric)):
#         try:
#             float(value)
#         except Exception as e:
#             return '{0} must be an float (decimal separator .)'\
#                 .format(paramName)
#     elif sqlType == DB.DateTime or isinstance(sqlType, (DB.Date, DB.DateTime)):
#         try:
#             dt = parser.parse(value)
#         except Exception as e:
#             return '{0} must be an date (yyyy-mm-dd)'.format(paramName)
#     return None

"""
    Liste des types de donnees sql qui
    necessite une sérialisation particulière en
     MANQUE FLOAT
"""
[docs] SERIALIZERS = { "date": lambda x: str(x) if x else None, "datetime": lambda x: str(x) if x else None, "time": lambda x: str(x) if x else None, "timestamp": lambda x: str(x) if x else None, "uuid": lambda x: str(x) if x else None, }
[docs] class GenericTable: """ Classe permettant de créer à la volée un mapping d'une vue avec la base de données par rétroingénierie """ def __init__(self, tableName, schemaName, geometry_field): meta = MetaData(schema=schemaName, bind=DB.engine) meta.reflect(views=True) try: self.tableDef = meta.tables["{}.{}".format(schemaName, tableName)] except KeyError: raise KeyError("table doesn't exists") self.geometry_field = geometry_field # Mise en place d'un mapping des colonnes en vue d'une sérialisation self.serialize_columns = [ (name, SERIALIZERS.get(db_col.type.__class__.__name__.lower(), lambda x: x)) for name, db_col in self.tableDef.columns.items() if not db_col.type.__class__.__name__ == "Geometry" ] self.columns = [column.name for column in self.tableDef.columns]
[docs] def as_dict(self, data): return { item: _serializer(getattr(data, item)) for item, _serializer in self.serialize_columns }
[docs] def serializeQuery(data, columnDef): rows = [ { c["name"]: getattr(row, c["name"]) for c in columnDef if getattr(row, c["name"]) is not None } for row in data ] return rows
[docs] def serializeQueryTest(data, columnDef): rows = list() for row in data: inter = {} for c in columnDef: if getattr(row, c["name"]) is not None: if isinstance(c["type"], (DB.Date, DB.DateTime, UUID)): inter[c["name"]] = str(getattr(row, c["name"])) elif isinstance(c["type"], DB.Numeric): inter[c["name"]] = float(getattr(row, c["name"])) elif not isinstance(c["type"], Geometry): inter[c["name"]] = getattr(row, c["name"]) rows.append(inter) return rows
[docs] def serializeQueryOneResult(row, columnDef): row = { c["name"]: getattr(row, c["name"]) for c in columnDef if getattr(row, c["name"]) is not None } return row
[docs] def serializable(cls): """ Décorateur de classe pour les DB.Models Permet de rajouter la fonction as_dict qui est basée sur le mapping SQLAlchemy """ """ Liste des propriétés sérialisables de la classe associées à leur sérializer en fonction de leur type """ cls_db_columns = [] for prop in cls.__mapper__.column_attrs: if isinstance(prop, ColumnProperty) and len(prop.columns) == 1: db_col = prop.columns[0] # HACK # -> Récupération du nom de l'attribut sans la classe name = str(prop).split(".", 1)[1] if not db_col.type.__class__.__name__ == "Geometry": cls_db_columns.append( ( name, SERIALIZERS.get( db_col.type.__class__.__name__.lower(), lambda x: x ), ) ) """ Liste des propriétés synonymes sérialisables de la classe associées à leur sérializer en fonction de leur type """ for syn in cls.__mapper__.synonyms: col = cls.__mapper__.c[syn.name] # if column type is geometry pass if col.type.__class__.__name__ == "Geometry": pass # else add synonyms in columns properties cls_db_columns.append( (syn.key, SERIALIZERS.get(col.type.__class__.__name__.lower(), lambda x: x)) ) """ Liste des propriétés de type relationship uselist permet de savoir si c'est une collection de sous objet sa valeur est déduite du type de relation (OneToMany, ManyToOne ou ManyToMany) """ cls_db_relationships = [ (db_rel.key, db_rel.uselist) for db_rel in cls.__mapper__.relationships ] def serializefn(self, recursif=False, columns=()): """ Méthode qui renvoie les données de l'objet sous la forme d'un dict Parameters ---------- recursif: boolean Spécifie si on veut que les sous objet (relationship) soit également sérialisé columns: liste liste des colonnes qui doivent être prises en compte """ if columns: fprops = list(filter(lambda d: d[0] in columns, cls_db_columns)) else: fprops = cls_db_columns out = {item: _serializer(getattr(self, item)) for item, _serializer in fprops} if recursif is False: return out for rel, uselist in cls_db_relationships: if getattr(self, rel) is None: break if uselist is True: out[rel] = [x.as_dict(recursif) for x in getattr(self, rel)] else: out[rel] = getattr(self, rel).as_dict(recursif) return out cls.as_dict = serializefn return cls
[docs] def json_resp(fn): """ Décorateur transformant le résultat renvoyé par une vue en objet JSON """ @wraps(fn) def _json_resp(*args, **kwargs): res = fn(*args, **kwargs) if isinstance(res, tuple): res, status = res else: status = 200 if not res: status = 404 res = {"message": "not found"} return Response(json.dumps(res), status=status, mimetype="application/json") return _json_resp
[docs] def csv_resp(fn): """ Décorateur transformant le résultat renvoyé en un fichier csv """ @wraps(fn) def _csv_resp(*args, **kwargs): res = fn(*args, **kwargs) filename, data, columns, separator = res outdata = [separator.join(columns)] headers = Headers() headers.add("Content-Type", "text/plain") headers.add( "Content-Disposition", "attachment", filename="export_%s.csv" % filename ) for o in data: outdata.append( separator.join( '"%s"' % (o.get(i), "")[o.get(i) is None] for i in columns ) ) out = "\r\n".join(outdata) return Response(out, headers=headers) return _csv_resp