comparison sat/plugins/plugin_xep_0047.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 003b8b4b56a7
comparison
equal deleted inserted replaced
2623:49533de4540b 2624:56f94936df1e
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.log import getLogger 21 from sat.core.log import getLogger
22
22 log = getLogger(__name__) 23 log = getLogger(__name__)
23 from sat.core.constants import Const as C 24 from sat.core.constants import Const as C
24 from sat.core import exceptions 25 from sat.core import exceptions
25 from twisted.words.protocols.jabber import jid 26 from twisted.words.protocols.jabber import jid
26 from twisted.words.protocols.jabber import xmlstream 27 from twisted.words.protocols.jabber import xmlstream
38 try: 39 try:
39 from twisted.words.protocols.xmlstream import XMPPHandler 40 from twisted.words.protocols.xmlstream import XMPPHandler
40 except ImportError: 41 except ImportError:
41 from wokkel.subprotocols import XMPPHandler 42 from wokkel.subprotocols import XMPPHandler
42 43
43 MESSAGE = '/message' 44 MESSAGE = "/message"
44 IQ_SET = '/iq[@type="set"]' 45 IQ_SET = '/iq[@type="set"]'
45 NS_IBB = 'http://jabber.org/protocol/ibb' 46 NS_IBB = "http://jabber.org/protocol/ibb"
46 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]' 47 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]'
47 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="{}"]' 48 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="{}"]'
48 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]' 49 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]'
49 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]' 50 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]'
50 TIMEOUT = 120 # timeout for workflow 51 TIMEOUT = 120 # timeout for workflow
51 DEFER_KEY = 'finished' # key of the deferred used to track session end 52 DEFER_KEY = "finished" # key of the deferred used to track session end
52 53
53 PLUGIN_INFO = { 54 PLUGIN_INFO = {
54 C.PI_NAME: "In-Band Bytestream Plugin", 55 C.PI_NAME: "In-Band Bytestream Plugin",
55 C.PI_IMPORT_NAME: "XEP-0047", 56 C.PI_IMPORT_NAME: "XEP-0047",
56 C.PI_TYPE: "XEP", 57 C.PI_TYPE: "XEP",
57 C.PI_MODES: C.PLUG_MODE_BOTH, 58 C.PI_MODES: C.PLUG_MODE_BOTH,
58 C.PI_PROTOCOLS: ["XEP-0047"], 59 C.PI_PROTOCOLS: ["XEP-0047"],
59 C.PI_MAIN: "XEP_0047", 60 C.PI_MAIN: "XEP_0047",
60 C.PI_HANDLER: "yes", 61 C.PI_HANDLER: "yes",
61 C.PI_DESCRIPTION: _("""Implementation of In-Band Bytestreams""") 62 C.PI_DESCRIPTION: _("""Implementation of In-Band Bytestreams"""),
62 } 63 }
63 64
64 65
65 class XEP_0047(object): 66 class XEP_0047(object):
66 NAMESPACE = NS_IBB 67 NAMESPACE = NS_IBB
80 """Delete current_stream id, called after timeout 81 """Delete current_stream id, called after timeout
81 82
82 @param sid(unicode): session id of client.xep_0047_current_stream 83 @param sid(unicode): session id of client.xep_0047_current_stream
83 @param client: %(doc_client)s 84 @param client: %(doc_client)s
84 """ 85 """
85 log.info(u"In-Band Bytestream: TimeOut reached for id {sid} [{profile}]" 86 log.info(
86 .format(sid=sid, profile=client.profile)) 87 u"In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format(
88 sid=sid, profile=client.profile
89 )
90 )
87 self._killSession(sid, client, "TIMEOUT") 91 self._killSession(sid, client, "TIMEOUT")
88 92
89 def _killSession(self, sid, client, failure_reason=None): 93 def _killSession(self, sid, client, failure_reason=None):
90 """Delete a current_stream id, clean up associated observers 94 """Delete a current_stream id, clean up associated observers
91 95
99 except KeyError: 103 except KeyError:
100 log.warning(u"kill id called on a non existant id") 104 log.warning(u"kill id called on a non existant id")
101 return 105 return
102 106
103 try: 107 try:
104 observer_cb = session['observer_cb'] 108 observer_cb = session["observer_cb"]
105 except KeyError: 109 except KeyError:
106 pass 110 pass
107 else: 111 else:
108 client.xmlstream.removeObserver(session["event_data"], observer_cb) 112 client.xmlstream.removeObserver(session["event_data"], observer_cb)
109 113
110 if session['timer'].active(): 114 if session["timer"].active():
111 session['timer'].cancel() 115 session["timer"].cancel()
112 116
113 del client.xep_0047_current_stream[sid] 117 del client.xep_0047_current_stream[sid]
114 118
115 success = failure_reason is None 119 success = failure_reason is None
116 stream_d = session[DEFER_KEY] 120 stream_d = session[DEFER_KEY]
134 @param to_jid(jid.JID): jid of the other peer 138 @param to_jid(jid.JID): jid of the other peer
135 @param sid(unicode): session id 139 @param sid(unicode): session id
136 @return (dict): session data 140 @return (dict): session data
137 """ 141 """
138 if sid in client.xep_0047_current_stream: 142 if sid in client.xep_0047_current_stream:
139 raise exceptions.ConflictError(u'A session with this id already exists !') 143 raise exceptions.ConflictError(u"A session with this id already exists !")
140 session_data = client.xep_0047_current_stream[sid] = \ 144 session_data = client.xep_0047_current_stream[sid] = {
141 {'id': sid, 145 "id": sid,
142 DEFER_KEY: defer.Deferred(), 146 DEFER_KEY: defer.Deferred(),
143 'to': to_jid, 147 "to": to_jid,
144 'stream_object': stream_object, 148 "stream_object": stream_object,
145 'seq': -1, 149 "seq": -1,
146 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), 150 "timer": reactor.callLater(TIMEOUT, self._timeOut, sid, client),
147 } 151 }
148 152
149 return session_data 153 return session_data
150 154
151 def _onIBBOpen(self, iq_elt, client): 155 def _onIBBOpen(self, iq_elt, client):
152 """"Called when an IBB <open> element is received 156 """"Called when an IBB <open> element is received
153 157
154 @param iq_elt(domish.Element): the whole <iq> stanza 158 @param iq_elt(domish.Element): the whole <iq> stanza
155 """ 159 """
156 log.debug(_(u"IBB stream opening")) 160 log.debug(_(u"IBB stream opening"))
157 iq_elt.handled = True 161 iq_elt.handled = True
158 open_elt = iq_elt.elements(NS_IBB, 'open').next() 162 open_elt = iq_elt.elements(NS_IBB, "open").next()
159 block_size = open_elt.getAttribute('block-size') 163 block_size = open_elt.getAttribute("block-size")
160 sid = open_elt.getAttribute('sid') 164 sid = open_elt.getAttribute("sid")
161 stanza = open_elt.getAttribute('stanza', 'iq') 165 stanza = open_elt.getAttribute("stanza", "iq")
162 if not sid or not block_size or int(block_size) > 65535: 166 if not sid or not block_size or int(block_size) > 65535:
163 return self._sendError('not-acceptable', sid or None, iq_elt, client) 167 return self._sendError("not-acceptable", sid or None, iq_elt, client)
164 if not sid in client.xep_0047_current_stream: 168 if not sid in client.xep_0047_current_stream:
165 log.warning(_(u"Ignoring unexpected IBB transfer: %s" % sid)) 169 log.warning(_(u"Ignoring unexpected IBB transfer: %s" % sid))
166 return self._sendError('not-acceptable', sid or None, iq_elt, client) 170 return self._sendError("not-acceptable", sid or None, iq_elt, client)
167 session_data = client.xep_0047_current_stream[sid] 171 session_data = client.xep_0047_current_stream[sid]
168 if session_data["to"] != jid.JID(iq_elt['from']): 172 if session_data["to"] != jid.JID(iq_elt["from"]):
169 log.warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) 173 log.warning(
170 return self._sendError('not-acceptable', sid, iq_elt, client) 174 _("sended jid inconsistency (man in the middle attack attempt ?)")
175 )
176 return self._sendError("not-acceptable", sid, iq_elt, client)
171 177
172 # at this stage, the session looks ok and will be accepted 178 # at this stage, the session looks ok and will be accepted
173 179
174 # we reset the timeout: 180 # we reset the timeout:
175 session_data["timer"].reset(TIMEOUT) 181 session_data["timer"].reset(TIMEOUT)
176 182
177 # we save the xmlstream, events and observer data to allow observer removal 183 # we save the xmlstream, events and observer data to allow observer removal
178 session_data["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza == 'message' else IBB_IQ_DATA).format(sid) 184 session_data["event_data"] = event_data = (
185 IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA
186 ).format(sid)
179 session_data["observer_cb"] = observer_cb = self._onIBBData 187 session_data["observer_cb"] = observer_cb = self._onIBBData
180 event_close = IBB_CLOSE.format(sid) 188 event_close = IBB_CLOSE.format(sid)
181 # we now set the stream observer to look after data packet 189 # we now set the stream observer to look after data packet
182 # FIXME: if we never get the events, the observers stay. 190 # FIXME: if we never get the events, the observers stay.
183 # would be better to have generic observer and check id once triggered 191 # would be better to have generic observer and check id once triggered
184 client.xmlstream.addObserver(event_data, observer_cb, client=client) 192 client.xmlstream.addObserver(event_data, observer_cb, client=client)
185 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) 193 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client)
186 # finally, we send the accept stanza 194 # finally, we send the accept stanza
187 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') 195 iq_result_elt = xmlstream.toResponse(iq_elt, "result")
188 client.send(iq_result_elt) 196 client.send(iq_result_elt)
189 197
190 def _onIBBClose(self, iq_elt, client): 198 def _onIBBClose(self, iq_elt, client):
191 """"Called when an IBB <close> element is received 199 """"Called when an IBB <close> element is received
192 200
193 @param iq_elt(domish.Element): the whole <iq> stanza 201 @param iq_elt(domish.Element): the whole <iq> stanza
194 """ 202 """
195 iq_elt.handled = True 203 iq_elt.handled = True
196 log.debug(_("IBB stream closing")) 204 log.debug(_("IBB stream closing"))
197 close_elt = iq_elt.elements(NS_IBB, 'close').next() 205 close_elt = iq_elt.elements(NS_IBB, "close").next()
198 # XXX: this observer is only triggered on valid sid, so we don't need to check it 206 # XXX: this observer is only triggered on valid sid, so we don't need to check it
199 sid = close_elt['sid'] 207 sid = close_elt["sid"]
200 208
201 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') 209 iq_result_elt = xmlstream.toResponse(iq_elt, "result")
202 client.send(iq_result_elt) 210 client.send(iq_result_elt)
203 self._killSession(sid, client) 211 self._killSession(sid, client)
204 212
205 def _onIBBData(self, element, client): 213 def _onIBBData(self, element, client):
206 """Observer called on <iq> or <message> stanzas with data element 214 """Observer called on <iq> or <message> stanzas with data element
207 215
208 Manage the data element (check validity and write to the stream_object) 216 Manage the data element (check validity and write to the stream_object)
209 @param element(domish.Element): <iq> or <message> stanza 217 @param element(domish.Element): <iq> or <message> stanza
210 """ 218 """
211 element.handled = True 219 element.handled = True
212 data_elt = element.elements(NS_IBB, 'data').next() 220 data_elt = element.elements(NS_IBB, "data").next()
213 sid = data_elt['sid'] 221 sid = data_elt["sid"]
214 222
215 try: 223 try:
216 session_data = client.xep_0047_current_stream[sid] 224 session_data = client.xep_0047_current_stream[sid]
217 except KeyError: 225 except KeyError:
218 log.warning(_(u"Received data for an unknown session id")) 226 log.warning(_(u"Received data for an unknown session id"))
219 return self._sendError('item-not-found', None, element, client) 227 return self._sendError("item-not-found", None, element, client)
220 228
221 from_jid = session_data["to"] 229 from_jid = session_data["to"]
222 stream_object = session_data["stream_object"] 230 stream_object = session_data["stream_object"]
223 231
224 if from_jid.full() != element['from']: 232 if from_jid.full() != element["from"]:
225 log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) 233 log.warning(
226 if element.name == 'iq': 234 _(
227 self._sendError('not-acceptable', sid, element, client) 235 u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}"
236 ).format(initial=from_jid, given=element["from"])
237 )
238 if element.name == "iq":
239 self._sendError("not-acceptable", sid, element, client)
228 return 240 return
229 241
230 session_data["seq"] = (session_data["seq"] + 1) % 65535 242 session_data["seq"] = (session_data["seq"] + 1) % 65535
231 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]: 243 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]:
232 log.warning(_(u"Sequence error")) 244 log.warning(_(u"Sequence error"))
233 if element.name == 'iq': 245 if element.name == "iq":
234 reason = 'not-acceptable' 246 reason = "not-acceptable"
235 self._sendError(reason, sid, element, client) 247 self._sendError(reason, sid, element, client)
236 self.terminateStream(session_data, client, reason) 248 self.terminateStream(session_data, client, reason)
237 return 249 return
238 250
239 # we reset the timeout: 251 # we reset the timeout:
243 try: 255 try:
244 stream_object.write(base64.b64decode(str(data_elt))) 256 stream_object.write(base64.b64decode(str(data_elt)))
245 except TypeError: 257 except TypeError:
246 # The base64 data is invalid 258 # The base64 data is invalid
247 log.warning(_(u"Invalid base64 data")) 259 log.warning(_(u"Invalid base64 data"))
248 if element.name == 'iq': 260 if element.name == "iq":
249 self._sendError('not-acceptable', sid, element, client) 261 self._sendError("not-acceptable", sid, element, client)
250 self.terminateStream(session_data, client, reason) 262 self.terminateStream(session_data, client, reason)
251 return 263 return
252 264
253 # we can now ack success 265 # we can now ack success
254 if element.name == 'iq': 266 if element.name == "iq":
255 iq_result_elt = xmlstream.toResponse(element, 'result') 267 iq_result_elt = xmlstream.toResponse(element, "result")
256 client.send(iq_result_elt) 268 client.send(iq_result_elt)
257 269
258 def _sendError(self, error_condition, sid, iq_elt, client): 270 def _sendError(self, error_condition, sid, iq_elt, client):
259 """Send error stanza 271 """Send error stanza
260 272
262 @param sid(unicode,None): IBB session id, or None, if session must not be destroyed 274 @param sid(unicode,None): IBB session id, or None, if session must not be destroyed
263 @param iq_elt(domish.Element): full <iq> stanza 275 @param iq_elt(domish.Element): full <iq> stanza
264 @param client: %(doc_client)s 276 @param client: %(doc_client)s
265 """ 277 """
266 iq_elt = error.StanzaError(error_condition).toResponse(iq_elt) 278 iq_elt = error.StanzaError(error_condition).toResponse(iq_elt)
267 log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition)) 279 log.warning(
280 u"Error while managing in-band bytestream session, cancelling: {}".format(
281 error_condition
282 )
283 )
268 if sid is not None: 284 if sid is not None:
269 self._killSession(sid, client, error_condition) 285 self._killSession(sid, client, error_condition)
270 client.send(iq_elt) 286 client.send(iq_elt)
271 287
272 def startStream(self, client, stream_object, to_jid, sid, block_size=None): 288 def startStream(self, client, stream_object, to_jid, sid, block_size=None):
283 block_size = XEP_0047.BLOCK_SIZE 299 block_size = XEP_0047.BLOCK_SIZE
284 assert block_size <= 65535 300 assert block_size <= 65535
285 session_data["block_size"] = block_size 301 session_data["block_size"] = block_size
286 302
287 iq_elt = client.IQ() 303 iq_elt = client.IQ()
288 iq_elt['to'] = to_jid.full() 304 iq_elt["to"] = to_jid.full()
289 open_elt = iq_elt.addElement((NS_IBB, 'open')) 305 open_elt = iq_elt.addElement((NS_IBB, "open"))
290 open_elt['block-size'] = str(block_size) 306 open_elt["block-size"] = str(block_size)
291 open_elt['sid'] = sid 307 open_elt["sid"] = sid
292 open_elt['stanza'] = 'iq' # TODO: manage <message> stanza ? 308 open_elt["stanza"] = "iq" # TODO: manage <message> stanza ?
293 args = [session_data, client] 309 args = [session_data, client]
294 d = iq_elt.send() 310 d = iq_elt.send()
295 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) 311 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args)
296 return session_data[DEFER_KEY] 312 return session_data[DEFER_KEY]
297 313
305 session_data["timer"].reset(TIMEOUT) 321 session_data["timer"].reset(TIMEOUT)
306 322
307 buffer_ = session_data["stream_object"].read(session_data["block_size"]) 323 buffer_ = session_data["stream_object"].read(session_data["block_size"])
308 if buffer_: 324 if buffer_:
309 next_iq_elt = client.IQ() 325 next_iq_elt = client.IQ()
310 next_iq_elt['to'] = session_data["to"].full() 326 next_iq_elt["to"] = session_data["to"].full()
311 data_elt = next_iq_elt.addElement((NS_IBB, 'data')) 327 data_elt = next_iq_elt.addElement((NS_IBB, "data"))
312 seq = session_data['seq'] = (session_data['seq'] + 1) % 65535 328 seq = session_data["seq"] = (session_data["seq"] + 1) % 65535
313 data_elt['seq'] = unicode(seq) 329 data_elt["seq"] = unicode(seq)
314 data_elt['sid'] = session_data['id'] 330 data_elt["sid"] = session_data["id"]
315 data_elt.addContent(base64.b64encode(buffer_)) 331 data_elt.addContent(base64.b64encode(buffer_))
316 args = [session_data, client] 332 args = [session_data, client]
317 d = next_iq_elt.send() 333 d = next_iq_elt.send()
318 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) 334 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args)
319 else: 335 else:
332 @param session_data(dict): data of this streaming session 348 @param session_data(dict): data of this streaming session
333 @param client: %(doc_client)s 349 @param client: %(doc_client)s
334 @param failure_reason(unicode, None): reason of the failure, or None if stream was successful 350 @param failure_reason(unicode, None): reason of the failure, or None if stream was successful
335 """ 351 """
336 iq_elt = client.IQ() 352 iq_elt = client.IQ()
337 iq_elt['to'] = session_data["to"].full() 353 iq_elt["to"] = session_data["to"].full()
338 close_elt = iq_elt.addElement((NS_IBB, 'close')) 354 close_elt = iq_elt.addElement((NS_IBB, "close"))
339 close_elt['sid'] = session_data['id'] 355 close_elt["sid"] = session_data["id"]
340 iq_elt.send() 356 iq_elt.send()
341 self._killSession(session_data['id'], client, failure_reason) 357 self._killSession(session_data["id"], client, failure_reason)
342 358
343 359
344 class XEP_0047_handler(XMPPHandler): 360 class XEP_0047_handler(XMPPHandler):
345 implements(iwokkel.IDisco) 361 implements(iwokkel.IDisco)
346 362
347 def __init__(self, parent): 363 def __init__(self, parent):
348 self.plugin_parent = parent 364 self.plugin_parent = parent
349 365
350 def connectionInitialized(self): 366 def connectionInitialized(self):
351 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent) 367 self.xmlstream.addObserver(
352 368 IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent
353 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 369 )
370
371 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
354 return [disco.DiscoFeature(NS_IBB)] 372 return [disco.DiscoFeature(NS_IBB)]
355 373
356 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 374 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
357 return [] 375 return []