comparison libervia/backend/plugins/plugin_xep_0047.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0047.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # Libervia plugin for In-Band Bytestreams (XEP-0047)
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from libervia.backend.core.i18n import _
21 from libervia.backend.core.log import getLogger
22
23 log = getLogger(__name__)
24 from libervia.backend.core.constants import Const as C
25 from libervia.backend.core import exceptions
26 from twisted.words.protocols.jabber import jid
27 from twisted.words.protocols.jabber import xmlstream
28 from twisted.words.protocols.jabber import error
29 from twisted.internet import reactor
30 from twisted.internet import defer
31 from twisted.python import failure
32
33 from wokkel import disco, iwokkel
34
35 from zope.interface import implementer
36
37 import base64
38
39 try:
40 from twisted.words.protocols.xmlstream import XMPPHandler
41 except ImportError:
42 from wokkel.subprotocols import XMPPHandler
43
44 MESSAGE = "/message"  # observer expression matching <message/> stanzas
45 IQ_SET = '/iq[@type="set"]'  # observer expression matching <iq type="set"/> stanzas
46 NS_IBB = "http://jabber.org/protocol/ibb"  # namespace of the IBB <open/>, <data/> and <close/> elements
47 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]'  # <open/> request, whatever the sid
48 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="{}"]'  # template: "{}" is filled with the sid through str.format
49 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]'  # template: <data/> carried by an <iq/>, for one sid
50 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]'  # template: <data/> carried by a <message/>, for one sid
51 TIMEOUT = 120 # timeout for workflow (passed as the delay of reactor.callLater, i.e. seconds)
52 DEFER_KEY = "finished" # key of the deferred used to track session end
53
54 PLUGIN_INFO = {
55 C.PI_NAME: "In-Band Bytestream Plugin",
56 C.PI_IMPORT_NAME: "XEP-0047",
57 C.PI_TYPE: "XEP",
58 C.PI_MODES: C.PLUG_MODE_BOTH,
59 C.PI_PROTOCOLS: ["XEP-0047"],
60 C.PI_MAIN: "XEP_0047",  # name of the plugin class defined in this module
61 C.PI_HANDLER: "yes",
62 C.PI_DESCRIPTION: _("""Implementation of In-Band Bytestreams"""),
63 }
64
65
class XEP_0047(object):
    """In-Band Bytestreams (XEP-0047) plugin.

    Sessions are stored per client in ``client.xep_0047_current_stream``, a dict
    mapping the session id (sid) to a session dict. Each session dict holds a
    Deferred under ``DEFER_KEY`` which is fired when the session ends: callback
    with None on success, errback with ``exceptions.DataError`` on failure.
    """

    NAMESPACE = NS_IBB
    # default size (in bytes, before base64 encoding) of the blocks read and sent
    BLOCK_SIZE = 4096
    # biggest "block-size" value accepted in an <open/> element
    MAX_BLOCK_SIZE = 65535
    # "seq" is a 16-bit unsigned counter: after 65535 it must restart at 0
    SEQ_MODULO = 65536

    def __init__(self, host):
        log.info(_("In-Band Bytestreams plugin initialization"))
        self.host = host

    def get_handler(self, client):
        """Return the XMPP handler which observes incoming <open/> requests"""
        return XEP_0047_handler(self)

    def profile_connected(self, client):
        """Create the (empty) session registry of a freshly connected client"""
        client.xep_0047_current_stream = {}  # key: stream_id, value: data(dict)

    def _time_out(self, sid, client):
        """Kill a session whose timer has expired

        @param sid(unicode): session id of client.xep_0047_current_stream
        @param client: client owning the session
        """
        log.info(
            "In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format(
                sid=sid, profile=client.profile
            )
        )
        self._kill_session(sid, client, "TIMEOUT")

    def _kill_session(self, sid, client, failure_reason=None):
        """Delete a session, clean up associated observers, fire its deferred

        @param sid(unicode): session id
        @param client: client owning the session
        @param failure_reason(None, unicode): if None the session is successful
            (the session deferred is called back with None), else the session
            deferred is errbacked with a DataError holding this reason
        """
        try:
            session = client.xep_0047_current_stream[sid]
        except KeyError:
            log.warning("kill id called on a non existant id")
            return

        # observers only exist on the receiving side, once <open/> was accepted
        try:
            observer_cb = session["observer_cb"]
        except KeyError:
            pass
        else:
            client.xmlstream.removeObserver(session["event_data"], observer_cb)

        # the one-time <close/> observer would otherwise stay registered when
        # the session ends without a <close/> (timeout, error)
        event_close = session.get("event_close")
        if event_close is not None:
            client.xmlstream.removeObserver(event_close, self._on_ibb_close)

        if session["timer"].active():
            session["timer"].cancel()

        del client.xep_0047_current_stream[sid]

        stream_d = session[DEFER_KEY]
        if failure_reason is None:
            stream_d.callback(None)
        else:
            stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))

    def create_session(self, *args, **kwargs):
        """like [_create_session] but return the session deferred instead of the whole session

        session deferred is fired when transfer is finished
        """
        return self._create_session(*args, **kwargs)[DEFER_KEY]

    def _create_session(self, client, stream_object, local_jid, to_jid, sid):
        """Called when a bytestream is imminent

        The session is registered and its timeout timer is started.

        @param client: client owning the session
        @param stream_object(IConsumer): stream object where data will be written
        @param local_jid(jid.JID): same as [start_stream]
        @param to_jid(jid.JId): jid of the other peer
        @param sid(unicode): session id
        @return (dict): session data
        @raise exceptions.ConflictError: a session with this id already exists
        """
        if sid in client.xep_0047_current_stream:
            raise exceptions.ConflictError("A session with this id already exists !")
        session_data = client.xep_0047_current_stream[sid] = {
            "id": sid,
            DEFER_KEY: defer.Deferred(),
            "local_jid": local_jid,
            "to": to_jid,
            "stream_object": stream_object,
            # last sequence number handled, -1 so that the first block is 0
            "seq": -1,
            "timer": reactor.callLater(TIMEOUT, self._time_out, sid, client),
        }

        return session_data

    def _on_ibb_open(self, iq_elt, client):
        """Called when an IBB <open> element is received

        The request is only accepted for a session previously registered with
        [_create_session], and coming from the expected peer.

        @param iq_elt(domish.Element): the whole <iq> stanza
        @param client: client which received the stanza
        """
        log.debug(_("IBB stream opening"))
        iq_elt.handled = True
        open_elt = next(iq_elt.elements(NS_IBB, "open"))
        block_size = open_elt.getAttribute("block-size")
        sid = open_elt.getAttribute("sid")
        stanza = open_elt.getAttribute("stanza", "iq")

        # a non numeric block-size is refused instead of raising in the observer
        try:
            block_size_ok = bool(block_size) and int(block_size) <= self.MAX_BLOCK_SIZE
        except ValueError:
            block_size_ok = False
        if not sid or not block_size_ok:
            return self._sendError("not-acceptable", sid or None, iq_elt, client)

        if sid not in client.xep_0047_current_stream:
            # formatting is done after translation, so the msgid can be found
            log.warning(_("Ignoring unexpected IBB transfer: %s") % sid)
            # there is no session to destroy here, hence sid=None
            return self._sendError("not-acceptable", None, iq_elt, client)

        session_data = client.xep_0047_current_stream[sid]
        # NOTE(review): a mismatching sender makes _sendError destroy the
        #   legitimate session; confirm this is the wanted policy
        if session_data["to"] != jid.JID(iq_elt["from"]):
            log.warning(
                _("sended jid inconsistency (man in the middle attack attempt ?)")
            )
            return self._sendError("not-acceptable", sid, iq_elt, client)

        # at this stage, the session looks ok and will be accepted

        # we reset the timeout:
        session_data["timer"].reset(TIMEOUT)

        # we save the events and observer data to allow observer removal
        data_template = IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA
        session_data["event_data"] = event_data = data_template.format(sid)
        session_data["observer_cb"] = observer_cb = self._on_ibb_data
        session_data["event_close"] = event_close = IBB_CLOSE.format(sid)

        # we now set the stream observers to look after data and close packets,
        # they are removed in [_kill_session]
        client.xmlstream.addObserver(event_data, observer_cb, client=client)
        client.xmlstream.addOnetimeObserver(
            event_close, self._on_ibb_close, client=client
        )

        # finally, we send the accept stanza
        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        client.send(iq_result_elt)

    def _on_ibb_close(self, iq_elt, client):
        """Called when an IBB <close> element is received

        The request is acknowledged and the session ends successfully.

        @param iq_elt(domish.Element): the whole <iq> stanza
        @param client: client which received the stanza
        """
        iq_elt.handled = True
        log.debug(_("IBB stream closing"))
        close_elt = next(iq_elt.elements(NS_IBB, "close"))
        # XXX: this observer is only triggered on valid sid, so we don't need to check it
        # NOTE(review): the sender of the <close/> is not compared with the
        #   session peer, unlike what is done for <open/> and <data/>
        sid = close_elt["sid"]

        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        client.send(iq_result_elt)
        self._kill_session(sid, client)

    def _reject_data(self, element, session_data, client, reason):
        """Refuse a <data/> packet and terminate the session with a failure

        @param element(domish.Element): <iq> or <message> stanza holding the data
        @param session_data(dict): data of this streaming session
        @param client: client owning the session
        @param reason(unicode): stanza error condition, also used as failure reason
        """
        if element.name == "iq":
            # sid is None: the session is destroyed once, by terminate_stream
            self._sendError(reason, None, element, client)
        self.terminate_stream(session_data, client, reason)

    def _on_ibb_data(self, element, client):
        """Observer called on <iq> or <message> stanzas with data element

        Manage the data element (check validity and write to the stream_object)
        @param element(domish.Element): <iq> or <message> stanza
        @param client: client which received the stanza
        """
        element.handled = True
        data_elt = next(element.elements(NS_IBB, "data"))
        sid = data_elt["sid"]

        try:
            session_data = client.xep_0047_current_stream[sid]
        except KeyError:
            log.warning(_("Received data for an unknown session id"))
            return self._sendError("item-not-found", None, element, client)

        from_jid = session_data["to"]
        stream_object = session_data["stream_object"]

        if from_jid.full() != element["from"]:
            log.warning(
                _(
                    "sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}"
                ).format(initial=from_jid, given=element["from"])
            )
            if element.name == "iq":
                self._sendError("not-acceptable", sid, element, client)
            return

        session_data["seq"] = (session_data["seq"] + 1) % self.SEQ_MODULO
        try:
            received_seq = int(data_elt.getAttribute("seq", -1))
        except ValueError:
            # a non numeric value can never be the expected sequence number
            received_seq = -1
        if received_seq != session_data["seq"]:
            log.warning(_("Sequence error"))
            self._reject_data(element, session_data, client, "not-acceptable")
            return

        # we reset the timeout:
        session_data["timer"].reset(TIMEOUT)

        # we can now decode the data
        try:
            decoded = base64.b64decode(str(data_elt))
        except (TypeError, ValueError):
            # The base64 data is invalid (binascii.Error is a ValueError)
            log.warning(_("Invalid base64 data"))
            self._reject_data(element, session_data, client, "not-acceptable")
            return
        # writing is kept out of the "try" block, so that an error raised by
        # the consumer is not reported as invalid base64
        stream_object.write(decoded)

        # we can now ack success
        if element.name == "iq":
            iq_result_elt = xmlstream.toResponse(element, "result")
            client.send(iq_result_elt)

    def _sendError(self, error_condition, sid, iq_elt, client):
        """Send error stanza

        @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
        @param sid(unicode,None): session id to destroy (error_condition is then
            used as failure reason), or None if session must not be destroyed
        @param iq_elt(domish.Element): full stanza to which the error answers
        @param client: client used to send the error
        """
        iq_elt = error.StanzaError(error_condition).toResponse(iq_elt)
        log.warning(
            "Error while managing in-band bytestream session, cancelling: {}".format(
                error_condition
            )
        )
        if sid is not None:
            self._kill_session(sid, client, error_condition)
        client.send(iq_elt)

    def start_stream(self, client, stream_object, local_jid, to_jid, sid, block_size=None):
        """Launch the stream workflow

        @param client: client used to send the stream
        @param stream_object(ifaces.IStreamProducer): stream object to send
        @param local_jid(jid.JID): jid to use as local jid
            This is needed for client which can be addressed with a different jid than
            client.jid if a local part is used (e.g. piotr@file.example.net where
            client.jid would be file.example.net)
        @param to_jid(jid.JID): JID of the recipient
        @param sid(unicode): Stream session id
        @param block_size(int, None): size of the block (or None for default)
        @return (defer.Deferred): session deferred, fired when the transfer ends
        @raise ValueError: block_size is not between 1 and MAX_BLOCK_SIZE
        @raise exceptions.ConflictError: a session with this id already exists
        """
        if block_size is None:
            block_size = XEP_0047.BLOCK_SIZE
        # checked before the session is created, so that a bad value doesn't
        # leave a dangling session behind (and the check survives "python -O")
        if not 1 <= block_size <= self.MAX_BLOCK_SIZE:
            raise ValueError(
                "block_size must be between 1 and {}".format(self.MAX_BLOCK_SIZE)
            )

        session_data = self._create_session(client, stream_object, local_jid, to_jid, sid)
        session_data["block_size"] = block_size

        iq_elt = client.IQ()
        iq_elt["from"] = local_jid.full()
        iq_elt["to"] = to_jid.full()
        open_elt = iq_elt.addElement((NS_IBB, "open"))
        open_elt["block-size"] = str(block_size)
        open_elt["sid"] = sid
        open_elt["stanza"] = "iq"  # TODO: manage <message> stanza ?
        args = [session_data, client]
        d = iq_elt.send()
        d.addCallbacks(self._iq_data_stream_cb, self._iq_data_stream_eb, args, None, args)
        return session_data[DEFER_KEY]

    def _iq_data_stream_cb(self, iq_elt, session_data, client):
        """Called during the whole data streaming

        Each time the peer acknowledges an <iq/>, the next block is read and
        sent; the stream is terminated when there is nothing left to read.

        @param iq_elt(domish.Element): iq result
        @param session_data(dict): data of this streaming session
        @param client: client used to send the stream
        """
        current = client.xep_0047_current_stream.get(session_data["id"])
        if current is not session_data:
            # the session ended (timeout, <close/> from the peer) while the
            # <iq/> was pending: its timer can't be reset anymore
            log.debug("IBB result received for an already ended session")
            return

        session_data["timer"].reset(TIMEOUT)

        # FIXME: producer/consumer mechanism is not used properly here
        buffer_ = session_data["stream_object"].file_obj.read(session_data["block_size"])
        if buffer_:
            next_iq_elt = client.IQ()
            next_iq_elt["from"] = session_data["local_jid"].full()
            next_iq_elt["to"] = session_data["to"].full()
            data_elt = next_iq_elt.addElement((NS_IBB, "data"))
            seq = session_data["seq"] = (session_data["seq"] + 1) % self.SEQ_MODULO
            data_elt["seq"] = str(seq)
            data_elt["sid"] = session_data["id"]
            data_elt.addContent(base64.b64encode(buffer_).decode())
            args = [session_data, client]
            d = next_iq_elt.send()
            d.addCallbacks(self._iq_data_stream_cb, self._iq_data_stream_eb, args, None, args)
        else:
            self.terminate_stream(session_data, client)

    def _iq_data_stream_eb(self, failure, session_data, client):
        """Terminate the stream when an <iq/> of the transfer has failed

        @param failure(failure.Failure): reason of the failure
        @param session_data(dict): data of this streaming session
        @param client: client used to send the stream
        """
        if failure.check(error.StanzaError):
            log.warning("IBB transfer failed: {}".format(failure.value))
        else:
            log.error("IBB transfer failed: {}".format(failure.value))
        self.terminate_stream(session_data, client, "IQ_ERROR")

    def terminate_stream(self, session_data, client, failure_reason=None):
        """Terminate the stream session

        A <close/> request is sent to the peer, then the session is destroyed.

        @param session_data(dict): data of this streaming session
        @param client: client owning the session
        @param failure_reason(unicode, None): reason of the failure, or None if stream was successful
        """
        iq_elt = client.IQ()
        iq_elt["from"] = session_data["local_jid"].full()
        iq_elt["to"] = session_data["to"].full()
        close_elt = iq_elt.addElement((NS_IBB, "close"))
        close_elt["sid"] = session_data["id"]
        d = iq_elt.send()
        # the session is destroyed whatever the answer is, a failure is only
        # logged to avoid an unhandled error in the deferred
        d.addErrback(
            lambda fail: log.warning(
                "IBB <close> request failed: {}".format(fail.value)
            )
        )
        self._kill_session(session_data["id"], client, failure_reason)
368
369
@implementer(iwokkel.IDisco)
class XEP_0047_handler(XMPPHandler):
    """XMPP handler of the In-Band Bytestreams plugin

    It observes incoming <open/> requests and advertises the IBB namespace
    through service discovery.
    """

    def __init__(self, parent):
        """
        @param parent(XEP_0047): plugin instance handling the IBB requests
        """
        # the base initializer was skipped before this change
        super().__init__()
        self.plugin_parent = parent

    def connectionInitialized(self):
        """Register the observer of <open/> requests on the XML stream"""
        # self.parent (the handler parent) is given to the plugin as "client"
        self.xmlstream.addObserver(
            IBB_OPEN, self.plugin_parent._on_ibb_open, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the In-Band Bytestreams feature"""
        return [disco.DiscoFeature(NS_IBB)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this plugin"""
        return []