Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0264.py @ 4232:0fbe5c605eb6
tests (unit/webrtc,XEP-0176, XEP-0234): Fix tests and add webrtc file transfer tests:
fix 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:59:50 +0200 |
parents | 4b842c1fb686 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3 # SàT plugin for managing xep-0264 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # Copyright (C) 2014 Emmanuel Gil Peyrot (linkmauve@linkmauve.fr) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger log = getLogger(__name__) from twisted.internet import threads from twisted.python.failure import Failure from zope.interface import implementer from wokkel import disco, iwokkel from libervia.backend.core import exceptions import hashlib try: from PIL import Image, ImageOps except: raise exceptions.MissingModule( "Missing module pillow, please download/install it from https://python-pillow.github.io" ) # cf. https://stackoverflow.com/a/23575424 from PIL import ImageFile ImageFile.LOAD_TRUNCATED_IMAGES = True try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler MIME_TYPE = "image/jpeg" SAVE_FORMAT = "JPEG" # (cf. Pillow documentation) NS_THUMBS = "urn:xmpp:thumbs:1" PLUGIN_INFO = { C.PI_NAME: "XEP-0264", C.PI_IMPORT_NAME: "XEP-0264", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0264"], C.PI_DEPENDENCIES: ["XEP-0234"], C.PI_MAIN: "XEP_0264", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Thumbnails handling"""), } class XEP_0264(object): SIZE_SMALL = (320, 320) SIZE_MEDIUM = (640, 640) SIZE_BIG = (1280, 1280) SIZE_FULL_SCREEN = (2560, 2560) # FIXME: SIZE_FULL_SCREEN is currently discarded as the resulting files are too big # for BoB # TODO: use an other mechanism than BoB for bigger files SIZES = (SIZE_SMALL, SIZE_MEDIUM, SIZE_BIG) def __init__(self, host): log.info(_("Plugin XEP_0264 initialization")) self.host = host host.trigger.add("XEP-0234_buildFileElement", self._add_file_thumbnails) host.trigger.add("XEP-0234_parseFileElement", self._get_file_thumbnails) def get_handler(self, client): return XEP_0264_handler() ## triggers ## def _add_file_thumbnails(self, client, file_elt, extra_args): try: thumbnails = extra_args["extra"][C.KEY_THUMBNAILS] except KeyError: return for thumbnail in thumbnails: thumbnail_elt = file_elt.addElement((NS_THUMBS, "thumbnail")) thumbnail_elt["uri"] = "cid:" + thumbnail["id"] thumbnail_elt["media-type"] = MIME_TYPE width, height = thumbnail["size"] thumbnail_elt["width"] = str(width) thumbnail_elt["height"] = str(height) return True def _get_file_thumbnails(self, client, file_elt, file_data): thumbnails = [] for thumbnail_elt in file_elt.elements(NS_THUMBS, "thumbnail"): uri = thumbnail_elt["uri"] if uri.startswith("cid:"): thumbnail = {"id": uri[4:]} width = thumbnail_elt.getAttribute("width") height = thumbnail_elt.getAttribute("height") if width and height: try: thumbnail["size"] = (int(width), int(height)) except ValueError: pass try: thumbnail["mime_type"] = thumbnail_elt["media-type"] except KeyError: pass thumbnails.append(thumbnail) if thumbnails: # we want thumbnails ordered from smallest to biggest thumbnails.sort(key=lambda t: t.get('size', (0, 0))) file_data.setdefault("extra", {})[C.KEY_THUMBNAILS] = thumbnails return True ## thumbnails generation ## def get_thumb_id(self, image_uid, size): """return an ID unique for image/size combination @param image_uid(unicode): unique id of the image can be a hash @param size(tuple(int)): requested size of thumbnail @return (unicode): unique id for this image/size """ return hashlib.sha256(repr((image_uid, size)).encode()).hexdigest() def _blocking_gen_thumb( self, source_path, size=None, max_age=None, image_uid=None, fix_orientation=True): """Generate a thumbnail for image This is a blocking method and must be executed in a thread params are the same as for [generate_thumbnail] """ if size is None: size = self.SIZE_SMALL try: img = Image.open(source_path) except IOError: return Failure(exceptions.DataError("Can't open image")) img.thumbnail(size) if fix_orientation: img = ImageOps.exif_transpose(img) uid = self.get_thumb_id(image_uid or source_path, size) with self.host.common_cache.cache_data( PLUGIN_INFO[C.PI_IMPORT_NAME], uid, MIME_TYPE, max_age ) as f: img.save(f, SAVE_FORMAT) if fix_orientation: log.debug(f"fixed orientation for {f.name}") return img.size, uid def generate_thumbnail( self, source_path, size=None, max_age=None, image_uid=None, fix_orientation=True): """Generate a thumbnail of image @param source_path(unicode): absolute path to source image @param size(int, None): max size of the thumbnail can be one of self.SIZE_* None to use default value (i.e. self.SIZE_SMALL) @param max_age(int, None): same as for [memory.cache.Cache.cache_data]) @param image_uid(unicode, None): unique ID to identify the image use hash whenever possible if None, source_path will be used @param fix_orientation(bool): if True, fix orientation using EXIF data @return D(tuple[tuple[int,int], unicode]): tuple with: - size of the thumbnail - unique Id of the thumbnail """ d = threads.deferToThread( self._blocking_gen_thumb, source_path, size, max_age, image_uid=image_uid, fix_orientation=fix_orientation ) d.addErrback( lambda failure_: log.error("thumbnail generation error: {}".format(failure_)) ) return d @implementer(iwokkel.IDisco) class XEP_0264_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_THUMBS)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []