Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0096.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0096.py@524856bd7b19 |
children | e11b13418ba6 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0096.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 + + +# SAT plugin for managing xep-0096 +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import error +from twisted.internet import defer +from libervia.backend.core.i18n import _, D_ +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger +from libervia.backend.core import exceptions +from libervia.backend.tools import xml_tools +from libervia.backend.tools import stream + +log = getLogger(__name__) + + +NS_SI_FT = "http://jabber.org/protocol/si/profile/file-transfer" +IQ_SET = '/iq[@type="set"]' +SI_PROFILE_NAME = "file-transfer" +SI_PROFILE = "http://jabber.org/protocol/si/profile/" + SI_PROFILE_NAME + +PLUGIN_INFO = { + C.PI_NAME: "XEP-0096 Plugin", + C.PI_IMPORT_NAME: "XEP-0096", + C.PI_TYPE: "XEP", + C.PI_PROTOCOLS: ["XEP-0096"], + C.PI_DEPENDENCIES: ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047", "FILE"], + C.PI_MAIN: "XEP_0096", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Implementation of SI File Transfer"""), +} + + +class XEP_0096(object): + # TODO: call self._f.unregister when unloading order will be managing (i.e. when depenencies will be unloaded at the end) + name = PLUGIN_INFO[C.PI_NAME] + human_name = D_("Stream Initiation") + + def __init__(self, host): + log.info(_("Plugin XEP_0096 initialization")) + self.host = host + self.managed_stream_m = [ + self.host.plugins["XEP-0065"].NAMESPACE, + self.host.plugins["XEP-0047"].NAMESPACE, + ] # Stream methods managed + self._f = self.host.plugins["FILE"] + self._f.register(self) + self._si = self.host.plugins["XEP-0095"] + self._si.register_si_profile(SI_PROFILE_NAME, self._transfer_request) + host.bridge.add_method( + "si_file_send", ".plugin", in_sign="sssss", out_sign="s", method=self._file_send + ) + + async def can_handle_file_send(self, client, peer_jid, filepath): + return await self.host.hasFeature(client, NS_SI_FT, peer_jid) + + def unload(self): + self._si.unregister_si_profile(SI_PROFILE_NAME) + + def _bad_request(self, client, iq_elt, message=None): + """Send a bad-request error + + @param iq_elt(domish.Element): initial <IQ> element of the SI request + @param message(None, unicode): informational message to display in the logs + """ + if message is not None: + log.warning(message) + self._si.sendError(client, iq_elt, "bad-request") + + def _parse_range(self, parent_elt, file_size): + """find and parse <range/> element + + @param parent_elt(domish.Element): direct parent of the <range/> element + @return (tuple[bool, int, int]): a tuple with + - True if range is required + - range_offset + - range_length + """ + try: + range_elt = next(parent_elt.elements(NS_SI_FT, "range")) + except StopIteration: + range_ = False + range_offset = None + range_length = None + else: + range_ = True + + try: + range_offset = int(range_elt["offset"]) + except KeyError: + range_offset = 0 + + try: + range_length = int(range_elt["length"]) + except KeyError: + range_length = file_size + + if range_offset != 0 or range_length != file_size: + raise NotImplementedError # FIXME + + return range_, range_offset, range_length + + def _transfer_request(self, client, iq_elt, si_id, si_mime_type, si_elt): + """Called when a file transfer is requested + + @param iq_elt(domish.Element): initial <IQ> element of the SI request + @param si_id(unicode): Stream Initiation session id + @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) + @param si_elt(domish.Element): request + """ + log.info(_("XEP-0096 file transfer requested")) + peer_jid = jid.JID(iq_elt["from"]) + + try: + file_elt = next(si_elt.elements(NS_SI_FT, "file")) + except StopIteration: + return self._bad_request( + client, iq_elt, "No <file/> element found in SI File Transfer request" + ) + + try: + feature_elt = self.host.plugins["XEP-0020"].get_feature_elt(si_elt) + except exceptions.NotFound: + return self._bad_request( + client, iq_elt, "No <feature/> element found in SI File Transfer request" + ) + + try: + filename = file_elt["name"] + file_size = int(file_elt["size"]) + except (KeyError, ValueError): + return self._bad_request(client, iq_elt, "Malformed SI File Transfer request") + + file_date = file_elt.getAttribute("date") + file_hash = file_elt.getAttribute("hash") + + log.info( + "File proposed: name=[{name}] size={size}".format( + name=filename, size=file_size + ) + ) + + try: + file_desc = str(next(file_elt.elements(NS_SI_FT, "desc"))) + except StopIteration: + file_desc = "" + + try: + range_, range_offset, range_length = self._parse_range(file_elt, file_size) + except ValueError: + return self._bad_request(client, iq_elt, "Malformed SI File Transfer request") + + try: + stream_method = self.host.plugins["XEP-0020"].negotiate( + feature_elt, "stream-method", self.managed_stream_m, namespace=None + ) + except KeyError: + return self._bad_request(client, iq_elt, "No stream method found") + + if stream_method: + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + plugin = self.host.plugins["XEP-0065"] + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + plugin = self.host.plugins["XEP-0047"] + else: + log.error( + "Unknown stream method, this should not happen at this stage, cancelling transfer" + ) + else: + log.warning("Can't find a valid stream method") + self._si.sendError(client, iq_elt, "not-acceptable") + return + + # if we are here, the transfer can start, we just need user's agreement + data = { + "name": filename, + "peer_jid": peer_jid, + "size": file_size, + "date": file_date, + "hash": file_hash, + "desc": file_desc, + "range": range_, + "range_offset": range_offset, + "range_length": range_length, + "si_id": si_id, + "progress_id": si_id, + "stream_method": stream_method, + "stream_plugin": plugin, + } + + d = defer.ensureDeferred( + self._f.get_dest_dir(client, peer_jid, data, data, stream_object=True) + ) + d.addCallback(self.confirmation_cb, client, iq_elt, data) + + def confirmation_cb(self, accepted, client, iq_elt, data): + """Called on confirmation answer + + @param accepted(bool): True if file transfer is accepted + @param iq_elt(domish.Element): initial SI request + @param data(dict): session data + """ + if not accepted: + log.info("File transfer declined") + self._si.sendError(client, iq_elt, "forbidden") + return + # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] + # can_range = data['can_range'] == "True" + # range_offset = 0 + # if timeout.active(): + # timeout.cancel() + # try: + # dest_path = frontend_data['dest_path'] + # except KeyError: + # log.error(_('dest path not found in frontend_data')) + # del client._xep_0096_waiting_for_approval[sid] + # return + # if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + # plugin = self.host.plugins["XEP-0065"] + # elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + # plugin = self.host.plugins["XEP-0047"] + # else: + # log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) + # del client._xep_0096_waiting_for_approval[sid] + # return + + # file_obj = self._getFileObject(dest_path, can_range) + # range_offset = file_obj.tell() + d = data["stream_plugin"].create_session( + client, data["stream_object"], client.jid, data["peer_jid"], data["si_id"] + ) + d.addCallback(self._transfer_cb, client, data) + d.addErrback(self._transfer_eb, client, data) + + # we can send the iq result + feature_elt = self.host.plugins["XEP-0020"].choose_option( + {"stream-method": data["stream_method"]}, namespace=None + ) + misc_elts = [] + misc_elts.append(domish.Element((SI_PROFILE, "file"))) + # if can_range: + # range_elt = domish.Element((None, "range")) + # range_elt['offset'] = str(range_offset) + # #TODO: manage range length + # misc_elts.append(range_elt) + self._si.accept_stream(client, iq_elt, feature_elt, misc_elts) + + def _transfer_cb(self, __, client, data): + """Called by the stream method when transfer successfuly finished + + @param data: session data + """ + # TODO: check hash + data["stream_object"].close() + log.info("Transfer {si_id} successfuly finished".format(**data)) + + def _transfer_eb(self, failure, client, data): + """Called when something went wrong with the transfer + + @param id: stream id + @param data: session data + """ + log.warning( + "Transfer {si_id} failed: {reason}".format( + reason=str(failure.value), **data + ) + ) + data["stream_object"].close() + + def _file_send(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): + client = self.host.get_client(profile) + return self.file_send( + client, jid.JID(peer_jid_s), filepath, name or None, desc or None + ) + + def file_send(self, client, peer_jid, filepath, name=None, desc=None, extra=None): + """Send a file using XEP-0096 + + @param peer_jid(jid.JID): recipient + @param filepath(str): absolute path to the file to send + @param name(unicode): name of the file to send + name must not contain "/" characters + @param desc: description of the file + @param extra: not used here + @return: an unique id to identify the transfer + """ + feature_elt = self.host.plugins["XEP-0020"].propose_features( + {"stream-method": self.managed_stream_m}, namespace=None + ) + + file_transfer_elts = [] + + statinfo = os.stat(filepath) + file_elt = domish.Element((SI_PROFILE, "file")) + file_elt["name"] = name or os.path.basename(filepath) + assert "/" not in file_elt["name"] + size = statinfo.st_size + file_elt["size"] = str(size) + if desc: + file_elt.addElement("desc", content=desc) + file_transfer_elts.append(file_elt) + + file_transfer_elts.append(domish.Element((None, "range"))) + + sid, offer_d = self._si.propose_stream( + client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts + ) + args = [filepath, sid, size, client] + offer_d.addCallbacks(self._file_cb, self._file_eb, args, None, args) + return sid + + def _file_cb(self, result_tuple, filepath, sid, size, client): + iq_elt, si_elt = result_tuple + + try: + feature_elt = self.host.plugins["XEP-0020"].get_feature_elt(si_elt) + except exceptions.NotFound: + log.warning("No <feature/> element found in result while expected") + return + + choosed_options = self.host.plugins["XEP-0020"].get_choosed_options( + feature_elt, namespace=None + ) + try: + stream_method = choosed_options["stream-method"] + except KeyError: + log.warning("No stream method choosed") + return + + try: + file_elt = next(si_elt.elements(NS_SI_FT, "file")) + except StopIteration: + pass + else: + range_, range_offset, range_length = self._parse_range(file_elt, size) + + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + plugin = self.host.plugins["XEP-0065"] + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + plugin = self.host.plugins["XEP-0047"] + else: + log.warning("Invalid stream method received") + return + + stream_object = stream.FileStreamObject( + self.host, client, filepath, uid=sid, size=size + ) + d = plugin.start_stream(client, stream_object, client.jid, + jid.JID(iq_elt["from"]), sid) + d.addCallback(self._send_cb, client, sid, stream_object) + d.addErrback(self._send_eb, client, sid, stream_object) + + def _file_eb(self, failure, filepath, sid, size, client): + if failure.check(error.StanzaError): + stanza_err = failure.value + if stanza_err.code == "403" and stanza_err.condition == "forbidden": + from_s = stanza_err.stanza["from"] + log.info("File transfer refused by {}".format(from_s)) + msg = D_("The contact {} has refused your file").format(from_s) + title = D_("File refused") + xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_INFO) + else: + log.warning(_("Error during file transfer")) + msg = D_( + "Something went wrong during the file transfer session initialisation: {reason}" + ).format(reason=str(stanza_err)) + title = D_("File transfer error") + xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_ERROR) + elif failure.check(exceptions.DataError): + log.warning("Invalid stanza received") + else: + log.error("Error while proposing stream: {}".format(failure)) + + def _send_cb(self, __, client, sid, stream_object): + log.info( + _("transfer {sid} successfuly finished [{profile}]").format( + sid=sid, profile=client.profile + ) + ) + stream_object.close() + + def _send_eb(self, failure, client, sid, stream_object): + log.warning( + _("transfer {sid} failed [{profile}]: {reason}").format( + sid=sid, profile=client.profile, reason=str(failure.value) + ) + ) + stream_object.close()