comparison sat/plugins/plugin_xep_0047.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (SàT before) was using camelCase, as allowed by PEP8 for pre-PEP8 code, to use the same coding style as Twisted. However, snake_case is more readable and it's better to follow PEP8 best practices, so it has been decided to move to full snake_case. Because Libervia has a huge codebase, this ended up as an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including the bridge) that does not come from Twisted or Wokkel, to use snake_case fully. This is a massive change, and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents be6d91572633
children
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
69 69
70 def __init__(self, host): 70 def __init__(self, host):
71 log.info(_("In-Band Bytestreams plugin initialization")) 71 log.info(_("In-Band Bytestreams plugin initialization"))
72 self.host = host 72 self.host = host
73 73
74 def getHandler(self, client): 74 def get_handler(self, client):
75 return XEP_0047_handler(self) 75 return XEP_0047_handler(self)
76 76
77 def profileConnected(self, client): 77 def profile_connected(self, client):
78 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict) 78 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict)
79 79
80 def _timeOut(self, sid, client): 80 def _time_out(self, sid, client):
81 """Delete current_stream id, called after timeout 81 """Delete current_stream id, called after timeout
82 82
83 @param sid(unicode): session id of client.xep_0047_current_stream 83 @param sid(unicode): session id of client.xep_0047_current_stream
84 @param client: %(doc_client)s 84 @param client: %(doc_client)s
85 """ 85 """
86 log.info( 86 log.info(
87 "In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format( 87 "In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format(
88 sid=sid, profile=client.profile 88 sid=sid, profile=client.profile
89 ) 89 )
90 ) 90 )
91 self._killSession(sid, client, "TIMEOUT") 91 self._kill_session(sid, client, "TIMEOUT")
92 92
93 def _killSession(self, sid, client, failure_reason=None): 93 def _kill_session(self, sid, client, failure_reason=None):
94 """Delete a current_stream id, clean up associated observers 94 """Delete a current_stream id, clean up associated observers
95 95
96 @param sid(unicode): session id 96 @param sid(unicode): session id
97 @param client: %(doc_client)s 97 @param client: %(doc_client)s
98 @param failure_reason(None, unicode): if None the session is successful 98 @param failure_reason(None, unicode): if None the session is successful
122 if success: 122 if success:
123 stream_d.callback(None) 123 stream_d.callback(None)
124 else: 124 else:
125 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) 125 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))
126 126
127 def createSession(self, *args, **kwargs): 127 def create_session(self, *args, **kwargs):
128 """like [_createSession] but return the session deferred instead of the whole session 128 """like [_create_session] but return the session deferred instead of the whole session
129 129
130 session deferred is fired when transfer is finished 130 session deferred is fired when transfer is finished
131 """ 131 """
132 return self._createSession(*args, **kwargs)[DEFER_KEY] 132 return self._create_session(*args, **kwargs)[DEFER_KEY]
133 133
134 def _createSession(self, client, stream_object, local_jid, to_jid, sid): 134 def _create_session(self, client, stream_object, local_jid, to_jid, sid):
135 """Called when a bytestream is imminent 135 """Called when a bytestream is imminent
136 136
137 @param stream_object(IConsumer): stream object where data will be written 137 @param stream_object(IConsumer): stream object where data will be written
138 @param local_jid(jid.JID): same as [startStream] 138 @param local_jid(jid.JID): same as [start_stream]
139 @param to_jid(jid.JId): jid of the other peer 139 @param to_jid(jid.JId): jid of the other peer
140 @param sid(unicode): session id 140 @param sid(unicode): session id
141 @return (dict): session data 141 @return (dict): session data
142 """ 142 """
143 if sid in client.xep_0047_current_stream: 143 if sid in client.xep_0047_current_stream:
147 DEFER_KEY: defer.Deferred(), 147 DEFER_KEY: defer.Deferred(),
148 "local_jid": local_jid, 148 "local_jid": local_jid,
149 "to": to_jid, 149 "to": to_jid,
150 "stream_object": stream_object, 150 "stream_object": stream_object,
151 "seq": -1, 151 "seq": -1,
152 "timer": reactor.callLater(TIMEOUT, self._timeOut, sid, client), 152 "timer": reactor.callLater(TIMEOUT, self._time_out, sid, client),
153 } 153 }
154 154
155 return session_data 155 return session_data
156 156
157 def _onIBBOpen(self, iq_elt, client): 157 def _on_ibb_open(self, iq_elt, client):
158 """"Called when an IBB <open> element is received 158 """"Called when an IBB <open> element is received
159 159
160 @param iq_elt(domish.Element): the whole <iq> stanza 160 @param iq_elt(domish.Element): the whole <iq> stanza
161 """ 161 """
162 log.debug(_("IBB stream opening")) 162 log.debug(_("IBB stream opening"))
184 184
185 # we save the xmlstream, events and observer data to allow observer removal 185 # we save the xmlstream, events and observer data to allow observer removal
186 session_data["event_data"] = event_data = ( 186 session_data["event_data"] = event_data = (
187 IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA 187 IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA
188 ).format(sid) 188 ).format(sid)
189 session_data["observer_cb"] = observer_cb = self._onIBBData 189 session_data["observer_cb"] = observer_cb = self._on_ibb_data
190 event_close = IBB_CLOSE.format(sid) 190 event_close = IBB_CLOSE.format(sid)
191 # we now set the stream observer to look after data packet 191 # we now set the stream observer to look after data packet
192 # FIXME: if we never get the events, the observers stay. 192 # FIXME: if we never get the events, the observers stay.
193 # would be better to have generic observer and check id once triggered 193 # would be better to have generic observer and check id once triggered
194 client.xmlstream.addObserver(event_data, observer_cb, client=client) 194 client.xmlstream.addObserver(event_data, observer_cb, client=client)
195 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) 195 client.xmlstream.addOnetimeObserver(event_close, self._on_ibb_close, client=client)
196 # finally, we send the accept stanza 196 # finally, we send the accept stanza
197 iq_result_elt = xmlstream.toResponse(iq_elt, "result") 197 iq_result_elt = xmlstream.toResponse(iq_elt, "result")
198 client.send(iq_result_elt) 198 client.send(iq_result_elt)
199 199
200 def _onIBBClose(self, iq_elt, client): 200 def _on_ibb_close(self, iq_elt, client):
201 """"Called when an IBB <close> element is received 201 """"Called when an IBB <close> element is received
202 202
203 @param iq_elt(domish.Element): the whole <iq> stanza 203 @param iq_elt(domish.Element): the whole <iq> stanza
204 """ 204 """
205 iq_elt.handled = True 205 iq_elt.handled = True
208 # XXX: this observer is only triggered on valid sid, so we don't need to check it 208 # XXX: this observer is only triggered on valid sid, so we don't need to check it
209 sid = close_elt["sid"] 209 sid = close_elt["sid"]
210 210
211 iq_result_elt = xmlstream.toResponse(iq_elt, "result") 211 iq_result_elt = xmlstream.toResponse(iq_elt, "result")
212 client.send(iq_result_elt) 212 client.send(iq_result_elt)
213 self._killSession(sid, client) 213 self._kill_session(sid, client)
214 214
215 def _onIBBData(self, element, client): 215 def _on_ibb_data(self, element, client):
216 """Observer called on <iq> or <message> stanzas with data element 216 """Observer called on <iq> or <message> stanzas with data element
217 217
218 Manage the data elelement (check validity and write to the stream_object) 218 Manage the data elelement (check validity and write to the stream_object)
219 @param element(domish.Element): <iq> or <message> stanza 219 @param element(domish.Element): <iq> or <message> stanza
220 """ 220 """
245 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]: 245 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]:
246 log.warning(_("Sequence error")) 246 log.warning(_("Sequence error"))
247 if element.name == "iq": 247 if element.name == "iq":
248 reason = "not-acceptable" 248 reason = "not-acceptable"
249 self._sendError(reason, sid, element, client) 249 self._sendError(reason, sid, element, client)
250 self.terminateStream(session_data, client, reason) 250 self.terminate_stream(session_data, client, reason)
251 return 251 return
252 252
253 # we reset the timeout: 253 # we reset the timeout:
254 session_data["timer"].reset(TIMEOUT) 254 session_data["timer"].reset(TIMEOUT)
255 255
259 except TypeError: 259 except TypeError:
260 # The base64 data is invalid 260 # The base64 data is invalid
261 log.warning(_("Invalid base64 data")) 261 log.warning(_("Invalid base64 data"))
262 if element.name == "iq": 262 if element.name == "iq":
263 self._sendError("not-acceptable", sid, element, client) 263 self._sendError("not-acceptable", sid, element, client)
264 self.terminateStream(session_data, client, reason) 264 self.terminate_stream(session_data, client, reason)
265 return 265 return
266 266
267 # we can now ack success 267 # we can now ack success
268 if element.name == "iq": 268 if element.name == "iq":
269 iq_result_elt = xmlstream.toResponse(element, "result") 269 iq_result_elt = xmlstream.toResponse(element, "result")
282 "Error while managing in-band bytestream session, cancelling: {}".format( 282 "Error while managing in-band bytestream session, cancelling: {}".format(
283 error_condition 283 error_condition
284 ) 284 )
285 ) 285 )
286 if sid is not None: 286 if sid is not None:
287 self._killSession(sid, client, error_condition) 287 self._kill_session(sid, client, error_condition)
288 client.send(iq_elt) 288 client.send(iq_elt)
289 289
290 def startStream(self, client, stream_object, local_jid, to_jid, sid, block_size=None): 290 def start_stream(self, client, stream_object, local_jid, to_jid, sid, block_size=None):
291 """Launch the stream workflow 291 """Launch the stream workflow
292 292
293 @param stream_object(ifaces.IStreamProducer): stream object to send 293 @param stream_object(ifaces.IStreamProducer): stream object to send
294 @param local_jid(jid.JID): jid to use as local jid 294 @param local_jid(jid.JID): jid to use as local jid
295 This is needed for client which can be addressed with a different jid than 295 This is needed for client which can be addressed with a different jid than
297 client.jid would be file.example.net) 297 client.jid would be file.example.net)
298 @param to_jid(jid.JID): JID of the recipient 298 @param to_jid(jid.JID): JID of the recipient
299 @param sid(unicode): Stream session id 299 @param sid(unicode): Stream session id
300 @param block_size(int, None): size of the block (or None for default) 300 @param block_size(int, None): size of the block (or None for default)
301 """ 301 """
302 session_data = self._createSession(client, stream_object, local_jid, to_jid, sid) 302 session_data = self._create_session(client, stream_object, local_jid, to_jid, sid)
303 303
304 if block_size is None: 304 if block_size is None:
305 block_size = XEP_0047.BLOCK_SIZE 305 block_size = XEP_0047.BLOCK_SIZE
306 assert block_size <= 65535 306 assert block_size <= 65535
307 session_data["block_size"] = block_size 307 session_data["block_size"] = block_size
313 open_elt["block-size"] = str(block_size) 313 open_elt["block-size"] = str(block_size)
314 open_elt["sid"] = sid 314 open_elt["sid"] = sid
315 open_elt["stanza"] = "iq" # TODO: manage <message> stanza ? 315 open_elt["stanza"] = "iq" # TODO: manage <message> stanza ?
316 args = [session_data, client] 316 args = [session_data, client]
317 d = iq_elt.send() 317 d = iq_elt.send()
318 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) 318 d.addCallbacks(self._iq_data_stream_cb, self._iq_data_stream_eb, args, None, args)
319 return session_data[DEFER_KEY] 319 return session_data[DEFER_KEY]
320 320
321 def _IQDataStreamCb(self, iq_elt, session_data, client): 321 def _iq_data_stream_cb(self, iq_elt, session_data, client):
322 """Called during the whole data streaming 322 """Called during the whole data streaming
323 323
324 @param iq_elt(domish.Element): iq result 324 @param iq_elt(domish.Element): iq result
325 @param session_data(dict): data of this streaming session 325 @param session_data(dict): data of this streaming session
326 @param client: %(doc_client)s 326 @param client: %(doc_client)s
338 data_elt["seq"] = str(seq) 338 data_elt["seq"] = str(seq)
339 data_elt["sid"] = session_data["id"] 339 data_elt["sid"] = session_data["id"]
340 data_elt.addContent(base64.b64encode(buffer_).decode()) 340 data_elt.addContent(base64.b64encode(buffer_).decode())
341 args = [session_data, client] 341 args = [session_data, client]
342 d = next_iq_elt.send() 342 d = next_iq_elt.send()
343 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) 343 d.addCallbacks(self._iq_data_stream_cb, self._iq_data_stream_eb, args, None, args)
344 else: 344 else:
345 self.terminateStream(session_data, client) 345 self.terminate_stream(session_data, client)
346 346
347 def _IQDataStreamEb(self, failure, session_data, client): 347 def _iq_data_stream_eb(self, failure, session_data, client):
348 if failure.check(error.StanzaError): 348 if failure.check(error.StanzaError):
349 log.warning("IBB transfer failed: {}".format(failure.value)) 349 log.warning("IBB transfer failed: {}".format(failure.value))
350 else: 350 else:
351 log.error("IBB transfer failed: {}".format(failure.value)) 351 log.error("IBB transfer failed: {}".format(failure.value))
352 self.terminateStream(session_data, client, "IQ_ERROR") 352 self.terminate_stream(session_data, client, "IQ_ERROR")
353 353
354 def terminateStream(self, session_data, client, failure_reason=None): 354 def terminate_stream(self, session_data, client, failure_reason=None):
355 """Terminate the stream session 355 """Terminate the stream session
356 356
357 @param session_data(dict): data of this streaming session 357 @param session_data(dict): data of this streaming session
358 @param client: %(doc_client)s 358 @param client: %(doc_client)s
359 @param failure_reason(unicode, None): reason of the failure, or None if steam was successful 359 @param failure_reason(unicode, None): reason of the failure, or None if steam was successful
362 iq_elt["from"] = session_data["local_jid"].full() 362 iq_elt["from"] = session_data["local_jid"].full()
363 iq_elt["to"] = session_data["to"].full() 363 iq_elt["to"] = session_data["to"].full()
364 close_elt = iq_elt.addElement((NS_IBB, "close")) 364 close_elt = iq_elt.addElement((NS_IBB, "close"))
365 close_elt["sid"] = session_data["id"] 365 close_elt["sid"] = session_data["id"]
366 iq_elt.send() 366 iq_elt.send()
367 self._killSession(session_data["id"], client, failure_reason) 367 self._kill_session(session_data["id"], client, failure_reason)
368 368
369 369
370 @implementer(iwokkel.IDisco) 370 @implementer(iwokkel.IDisco)
371 class XEP_0047_handler(XMPPHandler): 371 class XEP_0047_handler(XMPPHandler):
372 372
373 def __init__(self, parent): 373 def __init__(self, parent):
374 self.plugin_parent = parent 374 self.plugin_parent = parent
375 375
376 def connectionInitialized(self): 376 def connectionInitialized(self):
377 self.xmlstream.addObserver( 377 self.xmlstream.addObserver(
378 IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent 378 IBB_OPEN, self.plugin_parent._on_ibb_open, client=self.parent
379 ) 379 )
380 380
381 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 381 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
382 return [disco.DiscoFeature(NS_IBB)] 382 return [disco.DiscoFeature(NS_IBB)]
383 383