comparison libervia/backend/plugins/plugin_xep_0096.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0096.py@524856bd7b19
children e11b13418ba6
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for managing xep-0096
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 import os
21 from twisted.words.xish import domish
22 from twisted.words.protocols.jabber import jid
23 from twisted.words.protocols.jabber import error
24 from twisted.internet import defer
25 from libervia.backend.core.i18n import _, D_
26 from libervia.backend.core.constants import Const as C
27 from libervia.backend.core.log import getLogger
28 from libervia.backend.core import exceptions
29 from libervia.backend.tools import xml_tools
30 from libervia.backend.tools import stream
31
# Module-level logger, named after this module.
log = getLogger(__name__)


# Namespace of the SI File Transfer profile.
NS_SI_FT = "http://jabber.org/protocol/si/profile/file-transfer"
# XPath-style expression selecting <iq/> stanzas whose type is "set".
IQ_SET = '/iq[@type="set"]'
# Short profile name, used to (un)register the profile on the XEP-0095 plugin.
SI_PROFILE_NAME = "file-transfer"
# Full profile URI, built from the short name.
SI_PROFILE = "http://jabber.org/protocol/si/profile/{}".format(SI_PROFILE_NAME)

# Plugin metadata; C.PI_MAIN names the class defined in this module.
PLUGIN_INFO = {
    C.PI_NAME: "XEP-0096 Plugin",
    C.PI_IMPORT_NAME: "XEP-0096",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0096"],
    C.PI_DEPENDENCIES: [
        "XEP-0020",
        "XEP-0095",
        "XEP-0065",
        "XEP-0047",
        "FILE",
    ],
    C.PI_MAIN: "XEP_0096",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("Implementation of SI File Transfer"),
}
50
51
class XEP_0096(object):
    """SI File Transfer (XEP-0096) plugin.

    Registers the "file-transfer" profile on the XEP-0095 (Stream Initiation)
    plugin to receive files, and exposes ``file_send`` / the ``si_file_send``
    bridge method to send them. The byte stream itself is delegated to the
    XEP-0065 or XEP-0047 plugin, whichever is negotiated through XEP-0020.
    """

    # TODO: call self._f.unregister once unloading order is managed
    #   (i.e. once dependencies are unloaded last)
    name = PLUGIN_INFO[C.PI_NAME]
    human_name = D_("Stream Initiation")

    def __init__(self, host):
        """Register this plugin on the FILE and XEP-0095 plugins and on the bridge

        @param host: backend host; must expose ``plugins`` and ``bridge``
        """
        log.info(_("Plugin XEP_0096 initialization"))
        self.host = host
        self.managed_stream_m = [
            self.host.plugins["XEP-0065"].NAMESPACE,
            self.host.plugins["XEP-0047"].NAMESPACE,
        ]  # Stream methods managed
        self._f = self.host.plugins["FILE"]
        self._f.register(self)
        self._si = self.host.plugins["XEP-0095"]
        self._si.register_si_profile(SI_PROFILE_NAME, self._transfer_request)
        host.bridge.add_method(
            "si_file_send", ".plugin", in_sign="sssss", out_sign="s", method=self._file_send
        )

    async def can_handle_file_send(self, client, peer_jid, filepath):
        """Tell whether the peer advertises the SI File Transfer feature

        @param peer_jid(jid.JID): entity the file would be sent to
        @param filepath: not used here
        @return: result of the host feature check for NS_SI_FT
        """
        return await self.host.hasFeature(client, NS_SI_FT, peer_jid)

    def unload(self):
        """Unregister the SI profile registered in __init__"""
        self._si.unregister_si_profile(SI_PROFILE_NAME)

    def _bad_request(self, client, iq_elt, message=None):
        """Send a bad-request error

        @param iq_elt(domish.Element): initial <IQ> element of the SI request
        @param message(None, unicode): informational message to display in the logs
        """
        if message is not None:
            log.warning(message)
        self._si.sendError(client, iq_elt, "bad-request")

    def _get_stream_plugin(self, stream_method):
        """Map a negotiated stream method namespace to the plugin handling it

        @param stream_method(unicode): namespace of the stream method
        @return: the XEP-0065 or XEP-0047 plugin instance,
            or None if the namespace matches neither of them
        """
        # same lookup order as self.managed_stream_m
        for import_name in ("XEP-0065", "XEP-0047"):
            candidate = self.host.plugins[import_name]
            if stream_method == candidate.NAMESPACE:
                return candidate
        return None

    def _parse_range(self, parent_elt, file_size):
        """find and parse <range/> element

        @param parent_elt(domish.Element): direct parent of the <range/> element
        @param file_size(int): size of the file, used as default range length
        @return (tuple[bool, int, int]): a tuple with
            - True if range is required
            - range_offset
            - range_length
            offset and length are None when there is no <range/> element
        @raise ValueError: "offset" or "length" attribute is not an integer
        @raise NotImplementedError: the range doesn't cover the whole file
        """
        try:
            range_elt = next(parent_elt.elements(NS_SI_FT, "range"))
        except StopIteration:
            return False, None, None

        # a missing attribute falls back to "the whole file"
        try:
            range_offset = int(range_elt["offset"])
        except KeyError:
            range_offset = 0

        try:
            range_length = int(range_elt["length"])
        except KeyError:
            range_length = file_size

        if range_offset != 0 or range_length != file_size:
            raise NotImplementedError  # FIXME

        return True, range_offset, range_length

    def _transfer_request(self, client, iq_elt, si_id, si_mime_type, si_elt):
        """Called when a file transfer is requested

        The request is validated, a stream method is negotiated, then the FILE
        plugin is asked for a destination; the answer is handled in
        confirmation_cb.

        @param iq_elt(domish.Element): initial <IQ> element of the SI request
        @param si_id(unicode): Stream Initiation session id
        @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown)
        @param si_elt(domish.Element): request
        """
        log.info(_("XEP-0096 file transfer requested"))
        peer_jid = jid.JID(iq_elt["from"])

        try:
            file_elt = next(si_elt.elements(NS_SI_FT, "file"))
        except StopIteration:
            return self._bad_request(
                client, iq_elt, "No <file/> element found in SI File Transfer request"
            )

        try:
            feature_elt = self.host.plugins["XEP-0020"].get_feature_elt(si_elt)
        except exceptions.NotFound:
            return self._bad_request(
                client, iq_elt, "No <feature/> element found in SI File Transfer request"
            )

        try:
            filename = file_elt["name"]
            file_size = int(file_elt["size"])
        except (KeyError, ValueError):
            return self._bad_request(client, iq_elt, "Malformed SI File Transfer request")

        file_date = file_elt.getAttribute("date")
        file_hash = file_elt.getAttribute("hash")

        log.info(
            "File proposed: name=[{name}] size={size}".format(
                name=filename, size=file_size
            )
        )

        try:
            file_desc = str(next(file_elt.elements(NS_SI_FT, "desc")))
        except StopIteration:
            file_desc = ""

        # NotImplementedError (partial range) is deliberately not caught here,
        # see the FIXME in _parse_range
        try:
            range_, range_offset, range_length = self._parse_range(file_elt, file_size)
        except ValueError:
            return self._bad_request(client, iq_elt, "Malformed SI File Transfer request")

        try:
            stream_method = self.host.plugins["XEP-0020"].negotiate(
                feature_elt, "stream-method", self.managed_stream_m, namespace=None
            )
        except KeyError:
            return self._bad_request(client, iq_elt, "No stream method found")

        if not stream_method:
            log.warning("Can't find a valid stream method")
            self._si.sendError(client, iq_elt, "not-acceptable")
            return

        plugin = self._get_stream_plugin(stream_method)
        if plugin is None:
            log.error(
                "Unknown stream method, this should not happen at this stage, cancelling transfer"
            )
            # actually cancel: without a plugin the session data can't be built,
            # and the peer must not be left without an answer
            self._si.sendError(client, iq_elt, "not-acceptable")
            return

        # if we are here, the transfer can start, we just need user's agreement
        data = {
            "name": filename,
            "peer_jid": peer_jid,
            "size": file_size,
            "date": file_date,
            "hash": file_hash,
            "desc": file_desc,
            "range": range_,
            "range_offset": range_offset,
            "range_length": range_length,
            "si_id": si_id,
            "progress_id": si_id,
            "stream_method": stream_method,
            "stream_plugin": plugin,
        }

        # "data" is passed twice on purpose (same dict for both arguments);
        # presumably the FILE plugin stores the opened file in
        # data["stream_object"], which the callbacks below rely on
        d = defer.ensureDeferred(
            self._f.get_dest_dir(client, peer_jid, data, data, stream_object=True)
        )
        d.addCallback(self.confirmation_cb, client, iq_elt, data)

    def confirmation_cb(self, accepted, client, iq_elt, data):
        """Called on confirmation answer

        When the transfer is accepted, a session is created on the negotiated
        stream plugin and the SI request is accepted.

        @param accepted(bool): True if file transfer is accepted
        @param iq_elt(domish.Element): initial SI request
        @param data(dict): session data
        """
        if not accepted:
            log.info("File transfer declined")
            self._si.sendError(client, iq_elt, "forbidden")
            return

        # TODO: manage range (resuming at an offset, range length)
        d = data["stream_plugin"].create_session(
            client, data["stream_object"], client.jid, data["peer_jid"], data["si_id"]
        )
        d.addCallback(self._transfer_cb, client, data)
        d.addErrback(self._transfer_eb, client, data)

        # we can send the iq result
        feature_elt = self.host.plugins["XEP-0020"].choose_option(
            {"stream-method": data["stream_method"]}, namespace=None
        )
        misc_elts = [domish.Element((SI_PROFILE, "file"))]
        self._si.accept_stream(client, iq_elt, feature_elt, misc_elts)

    def _transfer_cb(self, __, client, data):
        """Called by the stream method when transfer successfuly finished

        @param data: session data
        """
        # TODO: check hash
        data["stream_object"].close()
        log.info("Transfer {si_id} successfuly finished".format(**data))

    def _transfer_eb(self, failure, client, data):
        """Called when something went wrong with the transfer

        @param failure(failure.Failure): reason of the failure
        @param data: session data
        """
        log.warning(
            "Transfer {si_id} failed: {reason}".format(
                reason=str(failure.value), **data
            )
        )
        data["stream_object"].close()

    def _file_send(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE):
        """Bridge wrapper around file_send

        Empty ``name`` and ``desc`` strings are converted to None.

        @param peer_jid_s(unicode): recipient jid, as a string
        @param profile: profile used to retrieve the client
        @return: the stream initiation session id
        """
        client = self.host.get_client(profile)
        return self.file_send(
            client, jid.JID(peer_jid_s), filepath, name or None, desc or None
        )

    def file_send(self, client, peer_jid, filepath, name=None, desc=None, extra=None):
        """Send a file using XEP-0096

        @param peer_jid(jid.JID): recipient
        @param filepath(str): absolute path to the file to send
        @param name(unicode): name of the file to send
            name must not contain "/" characters
            if not set, the base name of filepath is used
        @param desc: description of the file
        @param extra: not used here
        @return: an unique id to identify the transfer
        """
        feature_elt = self.host.plugins["XEP-0020"].propose_features(
            {"stream-method": self.managed_stream_m}, namespace=None
        )

        size = os.stat(filepath).st_size
        file_elt = domish.Element((SI_PROFILE, "file"))
        file_elt["name"] = name or os.path.basename(filepath)
        # contract check kept as an assert so that callers see the same
        # exception type as before
        assert "/" not in file_elt["name"]
        file_elt["size"] = str(size)
        if desc:
            file_elt.addElement("desc", content=desc)

        file_transfer_elts = [file_elt, domish.Element((None, "range"))]

        sid, offer_d = self._si.propose_stream(
            client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts
        )
        args = [filepath, sid, size, client]
        offer_d.addCallbacks(self._file_cb, self._file_eb, args, None, args)
        return sid

    def _file_cb(self, result_tuple, filepath, sid, size, client):
        """Called when the peer has accepted the stream initiation

        Starts the stream with the method chosen by the peer.

        @param result_tuple(tuple): (iq_elt, si_elt) of the peer's answer
        @param filepath(str): path of the file to send
        @param sid(unicode): stream initiation session id
        @param size(int): size of the file
        """
        iq_elt, si_elt = result_tuple

        try:
            feature_elt = self.host.plugins["XEP-0020"].get_feature_elt(si_elt)
        except exceptions.NotFound:
            log.warning("No <feature/> element found in result while expected")
            return

        choosed_options = self.host.plugins["XEP-0020"].get_choosed_options(
            feature_elt, namespace=None
        )
        try:
            stream_method = choosed_options["stream-method"]
        except KeyError:
            log.warning("No stream method choosed")
            return

        try:
            file_elt = next(si_elt.elements(NS_SI_FT, "file"))
        except StopIteration:
            pass
        else:
            # the parsed values are not used yet (range is not managed), the call
            # only validates the answer; NotImplementedError is deliberately not
            # caught, see the FIXME in _parse_range
            try:
                self._parse_range(file_elt, size)
            except ValueError:
                log.warning("Malformed <range/> element received in SI result")
                return

        plugin = self._get_stream_plugin(stream_method)
        if plugin is None:
            log.warning("Invalid stream method received")
            return

        stream_object = stream.FileStreamObject(
            self.host, client, filepath, uid=sid, size=size
        )
        d = plugin.start_stream(
            client, stream_object, client.jid, jid.JID(iq_elt["from"]), sid
        )
        d.addCallback(self._send_cb, client, sid, stream_object)
        d.addErrback(self._send_eb, client, sid, stream_object)

    def _file_eb(self, failure, filepath, sid, size, client):
        """Called when the stream initiation proposal failed

        A note is shown to the user for stanza errors; other failures are
        only logged.

        @param failure(failure.Failure): reason of the failure
        """
        if failure.check(error.StanzaError):
            stanza_err = failure.value
            if stanza_err.code == "403" and stanza_err.condition == "forbidden":
                from_s = stanza_err.stanza["from"]
                log.info("File transfer refused by {}".format(from_s))
                msg = D_("The contact {} has refused your file").format(from_s)
                title = D_("File refused")
                xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_INFO)
            else:
                log.warning(_("Error during file transfer"))
                msg = D_(
                    "Something went wrong during the file transfer session initialisation: {reason}"
                ).format(reason=str(stanza_err))
                title = D_("File transfer error")
                xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_ERROR)
        elif failure.check(exceptions.DataError):
            log.warning("Invalid stanza received")
        else:
            log.error("Error while proposing stream: {}".format(failure))

    def _send_cb(self, __, client, sid, stream_object):
        """Called when the file has been sent: log it and close the stream object"""
        log.info(
            _("transfer {sid} successfuly finished [{profile}]").format(
                sid=sid, profile=client.profile
            )
        )
        stream_object.close()

    def _send_eb(self, failure, client, sid, stream_object):
        """Called when sending failed: log the reason and close the stream object"""
        log.warning(
            _("transfer {sid} failed [{profile}]: {reason}").format(
                sid=sid, profile=client.profile, reason=str(failure.value)
            )
        )
        stream_object.close()