comparison sat/plugins/plugin_xep_0047.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommended naming convention - moved twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify the Python version needed - replaced glib2reactor, which uses deprecated code, by gtk3reactor. sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/plugins/plugin_xep_0047.py@67cc54b01a12
children 56f94936df1e
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT plugin for managing In-Band Bytestreams (XEP-0047)
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.log import getLogger
22 log = getLogger(__name__)
23 from sat.core.constants import Const as C
24 from sat.core import exceptions
25 from twisted.words.protocols.jabber import jid
26 from twisted.words.protocols.jabber import xmlstream
27 from twisted.words.protocols.jabber import error
28 from twisted.internet import reactor
29 from twisted.internet import defer
30 from twisted.python import failure
31
32 from wokkel import disco, iwokkel
33
34 from zope.interface import implements
35
36 import base64
37
38 try:
39 from twisted.words.protocols.xmlstream import XMPPHandler
40 except ImportError:
41 from wokkel.subprotocols import XMPPHandler
42
# Event patterns registered as xmlstream observers.
# The "{}" placeholder left in the session-specific patterns is filled with
# the session id through str.format() when the observer is registered.
MESSAGE = '/message'
IQ_SET = '/iq[@type="set"]'
NS_IBB = 'http://jabber.org/protocol/ibb'
IBB_OPEN = '%s/open[@xmlns="%s"]' % (IQ_SET, NS_IBB)
IBB_CLOSE = '%s/close[@xmlns="%s" and @sid="{}"]' % (IQ_SET, NS_IBB)
IBB_IQ_DATA = '%s/data[@xmlns="%s" and @sid="{}"]' % (IQ_SET, NS_IBB)
IBB_MESSAGE_DATA = '%s/data[@xmlns="%s" and @sid="{}"]' % (MESSAGE, NS_IBB)
# delay (passed to reactor.callLater) after which an idle session is killed
TIMEOUT = 120
# key, in the session data, of the Deferred fired when the session ends
DEFER_KEY = 'finished'
52
# Plugin metadata; the keys are constants taken from sat.core.constants.Const
PLUGIN_INFO = {
    C.PI_NAME: "In-Band Bytestream Plugin",
    C.PI_IMPORT_NAME: "XEP-0047",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0047"],
    # name of the class defined below which implements the plugin
    C.PI_MAIN: "XEP_0047",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Implementation of In-Band Bytestreams"),
}
63
64
class XEP_0047(object):
    """In-Band Bytestreams (XEP-0047) plugin

    Sessions are stored per client in client.xep_0047_current_stream, a dict
    mapping the session id to the session data. Session data is a dict with
    the following keys:
        - 'id': session id
        - DEFER_KEY: Deferred fired when the session ends
        - 'to': jid of the other peer
        - 'stream_object': object the data is read from or written to
        - 'seq': sequence number of the last data packet (-1 before the first)
        - 'timer': delayed call killing the session after TIMEOUT
        - 'block_size': only set by startStream (sending side)
        - 'event_data', 'observer_cb': only set by _onIBBOpen (receiving side)
    """
    NAMESPACE = NS_IBB
    BLOCK_SIZE = 4096
    # "block-size" and "seq" are 16-bit unsigned integers in XEP-0047:
    # seq goes from 0 to 65535 then wraps to 0, hence the 65536 modulus
    MAX_BLOCK_SIZE = 65535
    SEQ_MODULO = 65536

    def __init__(self, host):
        log.info(_("In-Band Bytestreams plugin initialization"))
        self.host = host

    def getHandler(self, client):
        """Return the XMPP handler observing IBB <open> requests"""
        return XEP_0047_handler(self)

    def profileConnected(self, client):
        """Initialise the sessions storage of the client"""
        client.xep_0047_current_stream = {}  # key: stream_id, value: data(dict)

    def _timeOut(self, sid, client):
        """Delete current_stream id, called after timeout

        @param sid(unicode): session id of client.xep_0047_current_stream
        @param client: %(doc_client)s
        """
        log.info(u"In-Band Bytestream: TimeOut reached for id {sid} [{profile}]"
                 .format(sid=sid, profile=client.profile))
        self._killSession(sid, client, "TIMEOUT")

    def _killSession(self, sid, client, failure_reason=None):
        """Delete a current_stream id, clean up associated observers

        The session Deferred is fired: callback(None) on success, else
        errback with an exceptions.DataError wrapping failure_reason.
        Nothing is done (except logging a warning) if the session is unknown.

        @param sid(unicode): session id
        @param client: %(doc_client)s
        @param failure_reason(None, unicode): if None the session is successful
            else, will be used to call failure_cb
        """
        try:
            session = client.xep_0047_current_stream[sid]
        except KeyError:
            log.warning(u"kill id called on a non existant id")
            return

        # the data observer only exists once an <open> has been accepted
        try:
            observer_cb = session['observer_cb']
        except KeyError:
            pass
        else:
            client.xmlstream.removeObserver(session["event_data"], observer_cb)

        if session['timer'].active():
            session['timer'].cancel()

        # the session is removed before firing the Deferred, so the callbacks
        # can't find a stale session
        del client.xep_0047_current_stream[sid]

        stream_d = session[DEFER_KEY]
        if failure_reason is None:
            stream_d.callback(None)
        else:
            stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))

    def createSession(self, *args, **kwargs):
        """like [_createSession] but return the session deferred instead of the whole session

        session deferred is fired when transfer is finished
        """
        return self._createSession(*args, **kwargs)[DEFER_KEY]

    def _createSession(self, client, stream_object, to_jid, sid):
        """Called when a bytestream is imminent

        @param client: %(doc_client)s
        @param stream_object(IConsumer): stream object where data will be written
        @param to_jid(jid.JID): jid of the other peer
        @param sid(unicode): session id
        @return (dict): session data
        @raise exceptions.ConflictError: a session with this id already exists
        """
        if sid in client.xep_0047_current_stream:
            raise exceptions.ConflictError(u'A session with this id already exists !')
        session_data = client.xep_0047_current_stream[sid] = {
            'id': sid,
            DEFER_KEY: defer.Deferred(),
            'to': to_jid,
            'stream_object': stream_object,
            'seq': -1,
            'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
        }

        return session_data

    def _onIBBOpen(self, iq_elt, client):
        """Called when an IBB <open> element is received

        The request is only accepted if a session with the same id has been
        created beforehand for the sending jid.
        @param iq_elt(domish.Element): the whole <iq> stanza
        @param client: %(doc_client)s
        """
        log.debug(_(u"IBB stream opening"))
        iq_elt.handled = True
        open_elt = next(iq_elt.elements(NS_IBB, 'open'))
        block_size = open_elt.getAttribute('block-size')
        sid = open_elt.getAttribute('sid')
        stanza = open_elt.getAttribute('stanza', 'iq')
        try:
            block_size_ok = 0 < int(block_size) <= self.MAX_BLOCK_SIZE
        except (TypeError, ValueError):
            # block-size is missing (None) or is not an integer
            block_size_ok = False
        if not sid or not block_size_ok:
            return self._sendError('not-acceptable', sid or None, iq_elt, client)
        if sid not in client.xep_0047_current_stream:
            log.warning(_(u"Ignoring unexpected IBB transfer: %s") % sid)
            # there is no session to kill here, so no sid is given
            return self._sendError('not-acceptable', None, iq_elt, client)
        session_data = client.xep_0047_current_stream[sid]
        if session_data["to"] != jid.JID(iq_elt['from']):
            log.warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
            return self._sendError('not-acceptable', sid, iq_elt, client)

        # at this stage, the session looks ok and will be accepted

        # we reset the timeout:
        session_data["timer"].reset(TIMEOUT)

        # we save the events and observer data to allow observer removal
        event_pattern = IBB_MESSAGE_DATA if stanza == 'message' else IBB_IQ_DATA
        session_data["event_data"] = event_data = event_pattern.format(sid)
        session_data["observer_cb"] = observer_cb = self._onIBBData
        event_close = IBB_CLOSE.format(sid)
        # we now set the stream observer to look after data packet
        # FIXME: if we never get the events, the observers stay.
        #        would be better to have generic observer and check id once triggered
        client.xmlstream.addObserver(event_data, observer_cb, client=client)
        client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client)
        # finally, we send the accept stanza
        iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
        client.send(iq_result_elt)

    def _onIBBClose(self, iq_elt, client):
        """Called when an IBB <close> element is received

        @param iq_elt(domish.Element): the whole <iq> stanza
        @param client: %(doc_client)s
        """
        iq_elt.handled = True
        log.debug(_("IBB stream closing"))
        close_elt = next(iq_elt.elements(NS_IBB, 'close'))
        # XXX: this observer is only triggered on valid sid, so we don't need to check it
        sid = close_elt['sid']

        iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
        client.send(iq_result_elt)
        self._killSession(sid, client)

    def _onIBBData(self, element, client):
        """Observer called on <iq> or <message> stanzas with data element

        Manage the data element (check validity and write to the stream_object)
        @param element(domish.Element): <iq> or <message> stanza
        @param client: %(doc_client)s
        """
        element.handled = True
        data_elt = next(element.elements(NS_IBB, 'data'))
        sid = data_elt['sid']

        try:
            session_data = client.xep_0047_current_stream[sid]
        except KeyError:
            log.warning(_(u"Received data for an unknown session id"))
            return self._sendError('item-not-found', None, element, client)

        from_jid = session_data["to"]
        stream_object = session_data["stream_object"]

        if from_jid.full() != element['from']:
            log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from']))
            if element.name == 'iq':
                self._sendError('not-acceptable', sid, element, client)
            return

        session_data["seq"] = (session_data["seq"] + 1) % self.SEQ_MODULO
        try:
            received_seq = int(data_elt.getAttribute("seq", -1))
        except ValueError:
            # a non numeric seq can't match the expected one
            received_seq = -1
        if received_seq != session_data["seq"]:
            log.warning(_(u"Sequence error"))
            self._abortTransfer(element, session_data, client, 'not-acceptable')
            return

        # we reset the timeout:
        session_data["timer"].reset(TIMEOUT)

        # we can now decode the data
        try:
            stream_object.write(base64.b64decode(str(data_elt)))
        except TypeError:
            # The base64 data is invalid
            log.warning(_(u"Invalid base64 data"))
            self._abortTransfer(element, session_data, client, 'not-acceptable')
            return

        # we can now ack success
        if element.name == 'iq':
            iq_result_elt = xmlstream.toResponse(element, 'result')
            client.send(iq_result_elt)

    def _abortTransfer(self, element, session_data, client, reason):
        """Refuse a data packet and terminate its session

        @param element(domish.Element): faulty <iq> or <message> stanza
        @param session_data(dict): data of this streaming session
        @param client: %(doc_client)s
        @param reason(unicode): stanza error condition, also used as failure reason
        """
        if element.name == 'iq':
            # no sid is given here: the session must be killed only once,
            # and terminateStream is doing it below
            self._sendError(reason, None, element, client)
        self.terminateStream(session_data, client, reason)

    def _sendError(self, error_condition, sid, iq_elt, client):
        """Send error stanza

        @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
        @param sid(unicode,None): IBB session id, or None, if session must not be destroyed
        @param iq_elt(domish.Element): full <iq> stanza
        @param client: %(doc_client)s
        """
        iq_elt = error.StanzaError(error_condition).toResponse(iq_elt)
        log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition))
        if sid is not None:
            self._killSession(sid, client, error_condition)
        client.send(iq_elt)

    def startStream(self, client, stream_object, to_jid, sid, block_size=None):
        """Launch the stream workflow

        @param client: %(doc_client)s
        @param stream_object(ifaces.IStreamProducer): stream object to send
        @param to_jid(jid.JID): JID of the recipient
        @param sid(unicode): Stream session id
        @param block_size(int, None): size of the block (or None for default)
        @return (D): Deferred fired when the session is finished
        @raise exceptions.ConflictError: a session with this id already exists
        """
        if block_size is None:
            block_size = XEP_0047.BLOCK_SIZE
        # checked before the session creation, so a bad value doesn't leave
        # a registered session (and its timer) behind
        assert block_size <= self.MAX_BLOCK_SIZE

        session_data = self._createSession(client, stream_object, to_jid, sid)
        session_data["block_size"] = block_size

        iq_elt = client.IQ()
        iq_elt['to'] = to_jid.full()
        open_elt = iq_elt.addElement((NS_IBB, 'open'))
        open_elt['block-size'] = str(block_size)
        open_elt['sid'] = sid
        open_elt['stanza'] = 'iq'  # TODO: manage <message> stanza ?
        args = [session_data, client]
        d = iq_elt.send()
        d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args)
        return session_data[DEFER_KEY]

    def _IQDataStreamCb(self, iq_elt, session_data, client):
        """Called during the whole data streaming

        Send the next block each time the previous one is acknowledged, and
        terminate the stream when there is nothing left to read.
        @param iq_elt(domish.Element): iq result
        @param session_data(dict): data of this streaming session
        @param client: %(doc_client)s
        """
        if client.xep_0047_current_stream.get(session_data['id']) is not session_data:
            # the session has been killed (e.g. by the timeout) while we were
            # waiting for this result: its timer can't be reset anymore
            log.debug(u"IBB session {sid} is over, ignoring late IQ result"
                      .format(sid=session_data['id']))
            return

        session_data["timer"].reset(TIMEOUT)

        buffer_ = session_data["stream_object"].read(session_data["block_size"])
        if buffer_:
            next_iq_elt = client.IQ()
            next_iq_elt['to'] = session_data["to"].full()
            data_elt = next_iq_elt.addElement((NS_IBB, 'data'))
            seq = session_data['seq'] = (session_data['seq'] + 1) % self.SEQ_MODULO
            data_elt['seq'] = unicode(seq)
            data_elt['sid'] = session_data['id']
            data_elt.addContent(base64.b64encode(buffer_))
            args = [session_data, client]
            d = next_iq_elt.send()
            d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args)
        else:
            self.terminateStream(session_data, client)

    def _IQDataStreamEb(self, failure, session_data, client):
        """Called when an <iq> of the streaming workflow has failed

        @param failure(failure.Failure): reason of the failure
        @param session_data(dict): data of this streaming session
        @param client: %(doc_client)s
        """
        if failure.check(error.StanzaError):
            log.warning(u"IBB transfer failed: {}".format(failure.value))
        else:
            log.error(u"IBB transfer failed: {}".format(failure.value))
        self.terminateStream(session_data, client, "IQ_ERROR")

    def terminateStream(self, session_data, client, failure_reason=None):
        """Terminate the stream session

        A <close> element is sent to the other peer, then the session is killed.
        @param session_data(dict): data of this streaming session
        @param client: %(doc_client)s
        @param failure_reason(unicode, None): reason of the failure, or None if stream was successful
        """
        iq_elt = client.IQ()
        iq_elt['to'] = session_data["to"].full()
        close_elt = iq_elt.addElement((NS_IBB, 'close'))
        close_elt['sid'] = session_data['id']
        iq_elt.send()
        self._killSession(session_data['id'], client, failure_reason)
342
343
class XEP_0047_handler(XMPPHandler):
    """XMPP handler registering the IBB <open> observer and the disco feature"""
    implements(iwokkel.IDisco)

    def __init__(self, parent):
        """
        @param parent(XEP_0047): plugin instance whose _onIBBOpen is used as observer
        """
        self.plugin_parent = parent

    def connectionInitialized(self):
        """Observe incoming IBB <open> requests on the xmlstream"""
        on_open = self.plugin_parent._onIBBOpen
        # self.parent is given to the observer as its "client" argument
        self.xmlstream.addObserver(IBB_OPEN, on_open, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Advertise the In-Band Bytestreams namespace as a disco feature"""
        ibb_feature = disco.DiscoFeature(NS_IBB)
        return [ibb_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """There is no disco item for this plugin"""
        return list()