import re
import os
import math
import boto3
import botocore
import logging
import unicodedata
from abc import ABC
from shutil import rmtree
from PIL import Image, ImageOps
from werkzeug.utils import secure_filename
from flask import current_app
import urllib.request
[docs]
logger = logging.getLogger()
[docs]
def remove_dir(dirpath):
"""
Fonction de suppression d'un répertoire
"""
if dirpath == "/":
raise Exception("rm / is not possible")
if not os.path.exists(dirpath):
raise FileNotFoundError("not exists {}".format(dirpath))
if not os.path.isdir(dirpath):
raise FileNotFoundError("not isdir {}".format(dirpath))
try:
rmtree(dirpath)
except (OSError, IOError) as e:
raise e
[docs]
def removeDisallowedFilenameChars(uncleanString):
cleanedString = secure_filename(uncleanString)
cleanedString = unicodedata.normalize("NFKD", uncleanString)
cleanedString = re.sub("[ ]+", "_", cleanedString)
cleanedString = re.sub("[^0-9a-zA-Z_-]", "", cleanedString)
return cleanedString
[docs]
class FileManagerServiceInterface(ABC):
"""
Abstract class to media file manipulation functions
Class who inherite of this class must implement the following abstract methods:
- remove_file
- rename_file
- upload_file
"""
def __init__(self):
self.dir_thumb_base = os.path.join(
current_app.static_folder, current_app.config["UPLOAD_FOLDER"], "thumb"
)
self.dir_file_base = os.path.join(
current_app.static_folder, current_app.config["UPLOAD_FOLDER"]
)
[docs]
def _get_new_chemin(self, old_chemin, old_title, new_title):
return old_chemin.replace(
removeDisallowedFilenameChars(old_title),
removeDisallowedFilenameChars(new_title),
)
[docs]
def _generate_file_name(self, file, id_media, cd_ref, titre):
return "{cd_ref}_{id_media}_{file_name}.{ext}".format(
cd_ref=str(cd_ref),
id_media=str(id_media),
file_name=removeDisallowedFilenameChars(titre),
ext=file.filename.rsplit(".", 1)[1],
)
[docs]
def _get_image_object(self, media):
if media.chemin:
img = Image.open(self._get_media_path_from_db(media.chemin))
else:
img = url_to_image(media.url)
return img
[docs]
def remove_file(self, filepath):
try:
os.remove(self._get_media_path_from_db(filepath))
except Exception:
pass
[docs]
def rename_file(self, old_chemin, old_title, new_title):
new_chemin = self._get_new_chemin(old_chemin, old_title, new_title)
print(old_chemin, self._get_media_path_from_db(old_chemin))
os.rename(
os.path.join(self._get_media_path_from_db(old_chemin)),
os.path.join(self._get_media_path_from_db(new_chemin)),
)
return new_chemin
[docs]
def upload_file(self, file, id_media, cd_ref, titre):
filename = self._generate_file_name(file, id_media, cd_ref, titre)
filepath = os.path.join(self.dir_file_base, filename)
file.save(filepath)
return ("/").join(["static", current_app.config["UPLOAD_FOLDER"], filename])
[docs]
def remove_thumb(self, id_media):
# suppression des thumbnails
try:
remove_dir(os.path.join(self.dir_thumb_base, str(id_media)))
except (FileNotFoundError, IOError, OSError) as e:
logger.error(e)
pass
[docs]
def create_thumb(self, media, size, regenerate=False):
id_media = media.id_media
thumbdir = os.path.join(self.dir_thumb_base, str(id_media))
thumbpath = os.path.join(thumbdir, "{}x{}.jpg".format(size[0], size[1]))
if regenerate:
self.remove_file(thumbpath)
# Test if media exists
if os.path.exists(thumbpath):
return thumbpath
# Get Image
img = self._get_image_object(media)
# Création du thumbnail
resizeImg = resizeAndPad(img, size)
# Sauvegarde de l'image
if not os.path.exists(thumbdir):
os.makedirs(thumbdir)
resizeImg.save(thumbpath)
return thumbpath
[docs]
class LocalFileManagerService(FileManagerServiceInterface):
pass
[docs]
class S3FileManagerService(FileManagerServiceInterface):
"""
Class permettant de manipuler des fichiers stockés
dans un cloud S3 (AWS, OVH, etc..)
"""
def __init__(self):
super().__init__()
self.s3 = boto3.client(
"s3",
aws_access_key_id=current_app.config["S3_KEY"],
aws_secret_access_key=current_app.config["S3_SECRET"],
endpoint_url=current_app.config["S3_ENDPOINT"],
region_name=current_app.config["S3_REGION_NAME"],
)
[docs]
def _get_image_object(self, media):
if media.chemin:
img = url_to_image(os.path.join(current_app.config["S3_PUBLIC_URL"], media.chemin))
else:
img = url_to_image(media.url)
return img
[docs]
def remove_file(self, filepath):
try:
# TODO prévoir un message d'erreur si echec suppression du bucket ?
self.s3.delete_object(Bucket=current_app.config["S3_BUCKET_NAME"], Key=filepath)
except botocore.exceptions.ParamValidationError: # filepath is None (upload)
pass
[docs]
def rename_file(self, old_chemin, old_title, new_title):
new_chemin = self._get_new_chemin(old_chemin, old_title, new_title)
self.s3.copy_object(
Bucket=current_app.config["S3_BUCKET_NAME"],
CopySource=os.path.join(current_app.config["S3_BUCKET_NAME"], old_chemin),
Key=new_chemin,
)
self.s3.delete_object(Bucket=current_app.config["S3_BUCKET_NAME"], Key=old_chemin)
return new_chemin
[docs]
def upload_file(self, file, id_media, cd_ref, titre):
filename = self._generate_file_name(file, id_media, cd_ref, titre)
self.s3.upload_fileobj(
file,
current_app.config["S3_BUCKET_NAME"],
os.path.join(current_app.config["S3_FOLDER"], filename),
ExtraArgs={
"ACL": "public-read",
"ContentType": file.content_type, # sans ça le content type est par défaut binary/octet-stream
},
)
return os.path.join(current_app.config["S3_FOLDER"], filename)
if current_app.config.get("S3_BUCKET_NAME"): # Use S3 upload
[docs]
FILEMANAGER = S3FileManagerService()
else:
FILEMANAGER = LocalFileManagerService()
# METHOD #2: PIL
[docs]
def url_to_image(url):
"""
Récupération d'une image à partir d'une url
"""
local_filename, headers = urllib.request.urlretrieve(url)
try:
img = Image.open(local_filename)
urllib.request.urlcleanup()
except IOError:
raise Exception("Media is not an image")
return img
[docs]
def add_border(img, border, color=0):
"""
Ajout d'une bordure à une image
"""
if isinstance(border, int) or isinstance(border, tuple):
bimg = ImageOps.expand(img, border=border, fill=color)
else:
raise RuntimeError("Border is not an integer or tuple!")
return bimg
[docs]
def calculate_border(initial_size, new_size, aspect):
"""
Calcul de la taille de l'image et de ces bordures
"""
i_h, i_w = initial_size
n_h, n_w = new_size
if aspect > 1:
# horizontal image
f_w = n_w
f_h = round(i_h * (n_w / i_w))
pad_vert = abs((f_h - n_h) / 2)
pad_top, pad_bot = math.floor(pad_vert), math.ceil(pad_vert)
pad_left, pad_right = 0, 0
elif aspect < 1:
# vertical image
f_h = n_h
f_w = round(i_w * (f_h / i_h))
pad_horz = abs((f_w - n_w) / 2)
pad_left, pad_right = math.floor(pad_horz), math.ceil(pad_horz)
pad_top, pad_bot = 0, 0
else:
# square image
f_h, f_w = new_size
pad_left, pad_right, pad_top, pad_bot = 0, 0, 0, 0
return ((f_h, f_w), (pad_left, pad_right, pad_top, pad_bot))
[docs]
def resizeAndPad(img, new_size, pad=True, padColor=0):
inital_w, inital_h = img.size
final_h = final_w = None
pad_left, pad_top, pad_right, pad_bot = (0, 0, 0, 0)
# aspect ratio of image
aspect = inital_w / inital_h
if new_size[1] == -1: # Si largeur non spécifé
final_h = new_size[0]
final_w = round(inital_w * (final_h / inital_h))
pad = False
elif new_size[0] == -1: # Si hauteur non spécifé
final_w = new_size[1]
final_h = round(inital_h * (final_w / inital_w))
pad = False
else:
final_h, final_w = new_size
if pad:
final, border = calculate_border((inital_h, inital_w), new_size, aspect)
final_h, final_w = final
pad_left, pad_right, pad_top, pad_bot = border
# scale and pad
scaled_img = img.resize((final_w, final_h))
if pad:
scaled_img = add_border(scaled_img, (pad_left, pad_top, pad_right, pad_bot), color=0)
return scaled_img