# coding: utf8
import os
import logging
from flask import Blueprint, request, current_app
from sqlalchemy import func, or_
from sqlalchemy.orm import joinedload
from pypnusershub import routes as fnauth
from . import filemanager
from . import db
from ..log import logmanager
from ..utils.utilssqlalchemy import json_resp, csv_resp
from ..utils.genericfunctions import calculate_offset_page
from .models import BibListes, CorNomListe, Taxref, BibNoms
[docs]
adresses = Blueprint("bib_listes", __name__)
[docs]
logger = logging.getLogger()
@adresses.route("/", methods=["GET"])
@json_resp
[docs]
def get_biblistes(id=None):
"""
retourne les contenu de bib_listes dans "data"
et le nombre d'enregistrements dans "count"
"""
data = (
db.session.query(BibListes, func.count(CorNomListe.id_nom).label("c"))
.outerjoin(CorNomListe)
.group_by(BibListes)
.order_by(BibListes.nom_liste)
.all()
)
maliste = {"data": [], "count": 0}
maliste["count"] = len(data)
for l in data:
d = l.BibListes.as_dict()
d["nb_taxons"] = l.c
maliste["data"].append(d)
return maliste
@adresses.route("/<regne>", methods=["GET"])
@adresses.route("/<regne>/<group2_inpn>", methods=["GET"])
@json_resp
[docs]
def get_biblistesbyTaxref(regne, group2_inpn=None):
q = db.session.query(BibListes)
if regne:
q = q.filter(or_(BibListes.regne == regne, BibListes.regne == None))
if group2_inpn:
q = q.filter(or_(BibListes.group2_inpn == group2_inpn, BibListes.group2_inpn == None))
results = q.all()
return [liste.as_dict() for liste in results]
@adresses.route("/exportcsv/<int:idliste>", methods=["GET"])
@csv_resp
[docs]
def getExporter_biblistesCSV(idliste=None):
"""
Exporter les taxons d'une liste dans un fichier csv
"""
liste = db.session.query(BibListes).get(idliste)
cleanNomliste = filemanager.removeDisallowedFilenameChars(liste.nom_liste)
data = (
db.session.query(Taxref)
.filter(BibNoms.cd_nom == Taxref.cd_nom)
.filter(BibNoms.id_nom == CorNomListe.id_nom)
.filter(CorNomListe.id_liste == idliste)
.all()
)
return (
cleanNomliste,
[nom.as_dict() for nom in data],
Taxref.__table__.columns.keys(),
",",
)
# ######## Route pour module edit and create biblistes ############
# Get data of list by id
@adresses.route("/<int:idliste>", methods=["GET"])
@json_resp
[docs]
def getOne_biblistes(idliste=None):
data = db.session.query(BibListes).filter_by(id_liste=idliste).first()
return data.as_dict()
# Get list of picto in repertory ./static/images/pictos
@adresses.route("/pictosprojet", methods=["GET"])
@json_resp
[docs]
def getPictos_files():
pictos = os.listdir(os.path.join(current_app.static_folder, "images/pictos"))
pictos.sort()
return pictos
# ######## PUT CREER/MODIFIER BIBLISTES ######################
@adresses.route("/", methods=["POST", "PUT"])
@adresses.route("/<int:id_liste>", methods=["POST", "PUT"])
@json_resp
@fnauth.check_auth(4)
[docs]
def insertUpdate_biblistes(id_liste=None):
res = request.get_json(silent=True)
data = {k: v or None for (k, v) in res.items()}
action = "INSERT"
message = "Liste créée"
if id_liste:
action = "UPDATE"
message = "Liste mise à jour"
bib_liste = BibListes(**data)
db.session.merge(bib_liste)
db.session.commit()
logmanager.log_action("bib_liste", bib_liste.id_liste, repr(bib_liste), action, message)
return bib_liste.as_dict()
# ####### Route pour module ajouter noms à la liste ##########################
# Get Taxons + taxref in a liste with id_liste
@adresses.route("/taxons/<int:idliste>", methods=["GET"])
@json_resp
[docs]
def getNoms_bibtaxons(idliste):
# Traitement des parametres
parameters = request.args
limit = parameters.get("limit", 100, int)
page = parameters.get("page", 1, int)
offset = parameters.get("offset", 0, int)
(limit, offset, page) = calculate_offset_page(limit, offset, page)
# Récupération du groupe de la liste
(regne, group2_inpn) = (
db.session.query(BibListes.regne, BibListes.group2_inpn)
.filter(BibListes.id_liste == idliste)
.one()
)
q = db.session.query(
BibNoms.id_nom,
BibNoms.cd_nom,
BibNoms.cd_ref,
BibNoms.nom_francais,
Taxref.nom_complet,
Taxref.regne,
Taxref.group2_inpn,
Taxref.id_rang,
).filter(BibNoms.cd_nom == Taxref.cd_nom)
if regne:
q = q.filter(or_(Taxref.regne == regne))
if group2_inpn:
q = q.filter(or_(Taxref.group2_inpn == group2_inpn))
subq = db.session.query(CorNomListe.id_nom).filter(CorNomListe.id_liste == idliste).subquery()
if parameters.get("existing"):
q = q.join(subq, subq.c.id_nom == BibNoms.id_nom)
else:
q = q.outerjoin(subq, subq.c.id_nom == BibNoms.id_nom).filter(subq.c.id_nom == None)
nbResultsWithoutFilter = q.count()
if parameters.get("cd_nom"):
try:
q = q.filter(BibNoms.cd_nom == int(parameters.get("cd_nom")))
except Exception:
pass
if parameters.get("nom_francais"):
q = q.filter(BibNoms.nom_francais.ilike(parameters.get("nom_francais") + "%"))
if parameters.get("nom_complet"):
q = q.filter(Taxref.nom_complet.ilike(parameters.get("nom_complet") + "%"))
if parameters.get("id_rang"):
q = q.filter(Taxref.id_rang.ilike(parameters.get("id_rang") + "%"))
# Order by
bibTaxonColumns = BibNoms.__table__.columns
taxrefColumns = Taxref.__table__.columns
if "orderby" in parameters:
if parameters["orderby"] in taxrefColumns:
orderCol = getattr(taxrefColumns, parameters["orderby"])
elif parameters["orderby"] in bibTaxonColumns:
orderCol = getattr(bibTaxonColumns, parameters["orderby"])
else:
orderCol = None
if "order" in parameters:
if parameters["order"] == "desc":
orderCol = orderCol.desc()
q = q.order_by(orderCol)
nbResults = q.count()
data = q.limit(limit).offset(offset).all()
results = []
for row in data:
data_as_dict = {}
data_as_dict["id_nom"] = row.id_nom
data_as_dict["cd_nom"] = row.cd_nom
data_as_dict["cd_ref"] = row.cd_ref
data_as_dict["nom_francais"] = row.nom_francais
data_as_dict["nom_complet"] = row.nom_complet
data_as_dict["regne"] = row.regne
data_as_dict["group2_inpn"] = row.group2_inpn
data_as_dict["id_rang"] = row.id_rang
results.append(data_as_dict)
return {
"items": results,
"total": nbResultsWithoutFilter,
"total_filtered": nbResults,
"limit": limit,
"page": page,
}
# POST - Ajouter les noms à une liste
@adresses.route("/addnoms/<int:idliste>", methods=["POST"])
@json_resp
@fnauth.check_auth(4)
[docs]
def add_cornomliste(idliste=None):
ids_nom = request.get_json(silent=True)
# TODO add test if idlist exists
for id in ids_nom:
cornom = {"id_nom": id, "id_liste": idliste}
add_nom = CorNomListe(**cornom)
db.session.add(add_nom)
db.session.commit()
logmanager.log_action("cor_nom_liste", idliste, "", "AJOUT NOM", "Noms ajouté à la liste")
return ids_nom
# POST - Enlever les nom dans une liste
@adresses.route("/deletenoms/<int:idliste>", methods=["POST"])
@json_resp
@fnauth.check_auth(4)
[docs]
def delete_cornomliste(idliste=None):
ids_nom = request.get_json(silent=True)
for id in ids_nom:
del_nom = (
db.session.query(CorNomListe)
.filter(CorNomListe.id_liste == idliste)
.filter(CorNomListe.id_nom == id)
.first()
)
db.session.delete(del_nom)
db.session.commit()
logmanager.log_action(
"cor_nom_liste",
idliste,
"",
"SUPPRESSION NOM",
"Noms supprimés de la liste",
)
return ids_nom