Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0047.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as PEP8 allows for code that
predates it, in order to follow the same coding style as Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
was decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring, renaming every function and method
(including bridge ones) that does not come from Twisted or Wokkel to full snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | be6d91572633 |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
69 | 69 |
70 def __init__(self, host): | 70 def __init__(self, host): |
71 log.info(_("In-Band Bytestreams plugin initialization")) | 71 log.info(_("In-Band Bytestreams plugin initialization")) |
72 self.host = host | 72 self.host = host |
73 | 73 |
74 def getHandler(self, client): | 74 def get_handler(self, client): |
75 return XEP_0047_handler(self) | 75 return XEP_0047_handler(self) |
76 | 76 |
77 def profileConnected(self, client): | 77 def profile_connected(self, client): |
78 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict) | 78 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict) |
79 | 79 |
80 def _timeOut(self, sid, client): | 80 def _time_out(self, sid, client): |
81 """Delete current_stream id, called after timeout | 81 """Delete current_stream id, called after timeout |
82 | 82 |
83 @param sid(unicode): session id of client.xep_0047_current_stream | 83 @param sid(unicode): session id of client.xep_0047_current_stream |
84 @param client: %(doc_client)s | 84 @param client: %(doc_client)s |
85 """ | 85 """ |
86 log.info( | 86 log.info( |
87 "In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format( | 87 "In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format( |
88 sid=sid, profile=client.profile | 88 sid=sid, profile=client.profile |
89 ) | 89 ) |
90 ) | 90 ) |
91 self._killSession(sid, client, "TIMEOUT") | 91 self._kill_session(sid, client, "TIMEOUT") |
92 | 92 |
93 def _killSession(self, sid, client, failure_reason=None): | 93 def _kill_session(self, sid, client, failure_reason=None): |
94 """Delete a current_stream id, clean up associated observers | 94 """Delete a current_stream id, clean up associated observers |
95 | 95 |
96 @param sid(unicode): session id | 96 @param sid(unicode): session id |
97 @param client: %(doc_client)s | 97 @param client: %(doc_client)s |
98 @param failure_reason(None, unicode): if None the session is successful | 98 @param failure_reason(None, unicode): if None the session is successful |
122 if success: | 122 if success: |
123 stream_d.callback(None) | 123 stream_d.callback(None) |
124 else: | 124 else: |
125 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) | 125 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) |
126 | 126 |
127 def createSession(self, *args, **kwargs): | 127 def create_session(self, *args, **kwargs): |
128 """like [_createSession] but return the session deferred instead of the whole session | 128 """like [_create_session] but return the session deferred instead of the whole session |
129 | 129 |
130 session deferred is fired when transfer is finished | 130 session deferred is fired when transfer is finished |
131 """ | 131 """ |
132 return self._createSession(*args, **kwargs)[DEFER_KEY] | 132 return self._create_session(*args, **kwargs)[DEFER_KEY] |
133 | 133 |
134 def _createSession(self, client, stream_object, local_jid, to_jid, sid): | 134 def _create_session(self, client, stream_object, local_jid, to_jid, sid): |
135 """Called when a bytestream is imminent | 135 """Called when a bytestream is imminent |
136 | 136 |
137 @param stream_object(IConsumer): stream object where data will be written | 137 @param stream_object(IConsumer): stream object where data will be written |
138 @param local_jid(jid.JID): same as [startStream] | 138 @param local_jid(jid.JID): same as [start_stream] |
139 @param to_jid(jid.JId): jid of the other peer | 139 @param to_jid(jid.JId): jid of the other peer |
140 @param sid(unicode): session id | 140 @param sid(unicode): session id |
141 @return (dict): session data | 141 @return (dict): session data |
142 """ | 142 """ |
143 if sid in client.xep_0047_current_stream: | 143 if sid in client.xep_0047_current_stream: |
147 DEFER_KEY: defer.Deferred(), | 147 DEFER_KEY: defer.Deferred(), |
148 "local_jid": local_jid, | 148 "local_jid": local_jid, |
149 "to": to_jid, | 149 "to": to_jid, |
150 "stream_object": stream_object, | 150 "stream_object": stream_object, |
151 "seq": -1, | 151 "seq": -1, |
152 "timer": reactor.callLater(TIMEOUT, self._timeOut, sid, client), | 152 "timer": reactor.callLater(TIMEOUT, self._time_out, sid, client), |
153 } | 153 } |
154 | 154 |
155 return session_data | 155 return session_data |
156 | 156 |
157 def _onIBBOpen(self, iq_elt, client): | 157 def _on_ibb_open(self, iq_elt, client): |
158 """"Called when an IBB <open> element is received | 158 """"Called when an IBB <open> element is received |
159 | 159 |
160 @param iq_elt(domish.Element): the whole <iq> stanza | 160 @param iq_elt(domish.Element): the whole <iq> stanza |
161 """ | 161 """ |
162 log.debug(_("IBB stream opening")) | 162 log.debug(_("IBB stream opening")) |
184 | 184 |
185 # we save the xmlstream, events and observer data to allow observer removal | 185 # we save the xmlstream, events and observer data to allow observer removal |
186 session_data["event_data"] = event_data = ( | 186 session_data["event_data"] = event_data = ( |
187 IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA | 187 IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA |
188 ).format(sid) | 188 ).format(sid) |
189 session_data["observer_cb"] = observer_cb = self._onIBBData | 189 session_data["observer_cb"] = observer_cb = self._on_ibb_data |
190 event_close = IBB_CLOSE.format(sid) | 190 event_close = IBB_CLOSE.format(sid) |
191 # we now set the stream observer to look after data packet | 191 # we now set the stream observer to look after data packet |
192 # FIXME: if we never get the events, the observers stay. | 192 # FIXME: if we never get the events, the observers stay. |
193 # would be better to have generic observer and check id once triggered | 193 # would be better to have generic observer and check id once triggered |
194 client.xmlstream.addObserver(event_data, observer_cb, client=client) | 194 client.xmlstream.addObserver(event_data, observer_cb, client=client) |
195 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) | 195 client.xmlstream.addOnetimeObserver(event_close, self._on_ibb_close, client=client) |
196 # finally, we send the accept stanza | 196 # finally, we send the accept stanza |
197 iq_result_elt = xmlstream.toResponse(iq_elt, "result") | 197 iq_result_elt = xmlstream.toResponse(iq_elt, "result") |
198 client.send(iq_result_elt) | 198 client.send(iq_result_elt) |
199 | 199 |
200 def _onIBBClose(self, iq_elt, client): | 200 def _on_ibb_close(self, iq_elt, client): |
201 """"Called when an IBB <close> element is received | 201 """"Called when an IBB <close> element is received |
202 | 202 |
203 @param iq_elt(domish.Element): the whole <iq> stanza | 203 @param iq_elt(domish.Element): the whole <iq> stanza |
204 """ | 204 """ |
205 iq_elt.handled = True | 205 iq_elt.handled = True |
208 # XXX: this observer is only triggered on valid sid, so we don't need to check it | 208 # XXX: this observer is only triggered on valid sid, so we don't need to check it |
209 sid = close_elt["sid"] | 209 sid = close_elt["sid"] |
210 | 210 |
211 iq_result_elt = xmlstream.toResponse(iq_elt, "result") | 211 iq_result_elt = xmlstream.toResponse(iq_elt, "result") |
212 client.send(iq_result_elt) | 212 client.send(iq_result_elt) |
213 self._killSession(sid, client) | 213 self._kill_session(sid, client) |
214 | 214 |
215 def _onIBBData(self, element, client): | 215 def _on_ibb_data(self, element, client): |
216 """Observer called on <iq> or <message> stanzas with data element | 216 """Observer called on <iq> or <message> stanzas with data element |
217 | 217 |
218 Manage the data elelement (check validity and write to the stream_object) | 218 Manage the data elelement (check validity and write to the stream_object) |
219 @param element(domish.Element): <iq> or <message> stanza | 219 @param element(domish.Element): <iq> or <message> stanza |
220 """ | 220 """ |
245 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]: | 245 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]: |
246 log.warning(_("Sequence error")) | 246 log.warning(_("Sequence error")) |
247 if element.name == "iq": | 247 if element.name == "iq": |
248 reason = "not-acceptable" | 248 reason = "not-acceptable" |
249 self._sendError(reason, sid, element, client) | 249 self._sendError(reason, sid, element, client) |
250 self.terminateStream(session_data, client, reason) | 250 self.terminate_stream(session_data, client, reason) |
251 return | 251 return |
252 | 252 |
253 # we reset the timeout: | 253 # we reset the timeout: |
254 session_data["timer"].reset(TIMEOUT) | 254 session_data["timer"].reset(TIMEOUT) |
255 | 255 |
259 except TypeError: | 259 except TypeError: |
260 # The base64 data is invalid | 260 # The base64 data is invalid |
261 log.warning(_("Invalid base64 data")) | 261 log.warning(_("Invalid base64 data")) |
262 if element.name == "iq": | 262 if element.name == "iq": |
263 self._sendError("not-acceptable", sid, element, client) | 263 self._sendError("not-acceptable", sid, element, client) |
264 self.terminateStream(session_data, client, reason) | 264 self.terminate_stream(session_data, client, reason) |
265 return | 265 return |
266 | 266 |
267 # we can now ack success | 267 # we can now ack success |
268 if element.name == "iq": | 268 if element.name == "iq": |
269 iq_result_elt = xmlstream.toResponse(element, "result") | 269 iq_result_elt = xmlstream.toResponse(element, "result") |
282 "Error while managing in-band bytestream session, cancelling: {}".format( | 282 "Error while managing in-band bytestream session, cancelling: {}".format( |
283 error_condition | 283 error_condition |
284 ) | 284 ) |
285 ) | 285 ) |
286 if sid is not None: | 286 if sid is not None: |
287 self._killSession(sid, client, error_condition) | 287 self._kill_session(sid, client, error_condition) |
288 client.send(iq_elt) | 288 client.send(iq_elt) |
289 | 289 |
290 def startStream(self, client, stream_object, local_jid, to_jid, sid, block_size=None): | 290 def start_stream(self, client, stream_object, local_jid, to_jid, sid, block_size=None): |
291 """Launch the stream workflow | 291 """Launch the stream workflow |
292 | 292 |
293 @param stream_object(ifaces.IStreamProducer): stream object to send | 293 @param stream_object(ifaces.IStreamProducer): stream object to send |
294 @param local_jid(jid.JID): jid to use as local jid | 294 @param local_jid(jid.JID): jid to use as local jid |
295 This is needed for client which can be addressed with a different jid than | 295 This is needed for client which can be addressed with a different jid than |
297 client.jid would be file.example.net) | 297 client.jid would be file.example.net) |
298 @param to_jid(jid.JID): JID of the recipient | 298 @param to_jid(jid.JID): JID of the recipient |
299 @param sid(unicode): Stream session id | 299 @param sid(unicode): Stream session id |
300 @param block_size(int, None): size of the block (or None for default) | 300 @param block_size(int, None): size of the block (or None for default) |
301 """ | 301 """ |
302 session_data = self._createSession(client, stream_object, local_jid, to_jid, sid) | 302 session_data = self._create_session(client, stream_object, local_jid, to_jid, sid) |
303 | 303 |
304 if block_size is None: | 304 if block_size is None: |
305 block_size = XEP_0047.BLOCK_SIZE | 305 block_size = XEP_0047.BLOCK_SIZE |
306 assert block_size <= 65535 | 306 assert block_size <= 65535 |
307 session_data["block_size"] = block_size | 307 session_data["block_size"] = block_size |
313 open_elt["block-size"] = str(block_size) | 313 open_elt["block-size"] = str(block_size) |
314 open_elt["sid"] = sid | 314 open_elt["sid"] = sid |
315 open_elt["stanza"] = "iq" # TODO: manage <message> stanza ? | 315 open_elt["stanza"] = "iq" # TODO: manage <message> stanza ? |
316 args = [session_data, client] | 316 args = [session_data, client] |
317 d = iq_elt.send() | 317 d = iq_elt.send() |
318 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) | 318 d.addCallbacks(self._iq_data_stream_cb, self._iq_data_stream_eb, args, None, args) |
319 return session_data[DEFER_KEY] | 319 return session_data[DEFER_KEY] |
320 | 320 |
321 def _IQDataStreamCb(self, iq_elt, session_data, client): | 321 def _iq_data_stream_cb(self, iq_elt, session_data, client): |
322 """Called during the whole data streaming | 322 """Called during the whole data streaming |
323 | 323 |
324 @param iq_elt(domish.Element): iq result | 324 @param iq_elt(domish.Element): iq result |
325 @param session_data(dict): data of this streaming session | 325 @param session_data(dict): data of this streaming session |
326 @param client: %(doc_client)s | 326 @param client: %(doc_client)s |
338 data_elt["seq"] = str(seq) | 338 data_elt["seq"] = str(seq) |
339 data_elt["sid"] = session_data["id"] | 339 data_elt["sid"] = session_data["id"] |
340 data_elt.addContent(base64.b64encode(buffer_).decode()) | 340 data_elt.addContent(base64.b64encode(buffer_).decode()) |
341 args = [session_data, client] | 341 args = [session_data, client] |
342 d = next_iq_elt.send() | 342 d = next_iq_elt.send() |
343 d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) | 343 d.addCallbacks(self._iq_data_stream_cb, self._iq_data_stream_eb, args, None, args) |
344 else: | 344 else: |
345 self.terminateStream(session_data, client) | 345 self.terminate_stream(session_data, client) |
346 | 346 |
347 def _IQDataStreamEb(self, failure, session_data, client): | 347 def _iq_data_stream_eb(self, failure, session_data, client): |
348 if failure.check(error.StanzaError): | 348 if failure.check(error.StanzaError): |
349 log.warning("IBB transfer failed: {}".format(failure.value)) | 349 log.warning("IBB transfer failed: {}".format(failure.value)) |
350 else: | 350 else: |
351 log.error("IBB transfer failed: {}".format(failure.value)) | 351 log.error("IBB transfer failed: {}".format(failure.value)) |
352 self.terminateStream(session_data, client, "IQ_ERROR") | 352 self.terminate_stream(session_data, client, "IQ_ERROR") |
353 | 353 |
354 def terminateStream(self, session_data, client, failure_reason=None): | 354 def terminate_stream(self, session_data, client, failure_reason=None): |
355 """Terminate the stream session | 355 """Terminate the stream session |
356 | 356 |
357 @param session_data(dict): data of this streaming session | 357 @param session_data(dict): data of this streaming session |
358 @param client: %(doc_client)s | 358 @param client: %(doc_client)s |
359 @param failure_reason(unicode, None): reason of the failure, or None if steam was successful | 359 @param failure_reason(unicode, None): reason of the failure, or None if steam was successful |
362 iq_elt["from"] = session_data["local_jid"].full() | 362 iq_elt["from"] = session_data["local_jid"].full() |
363 iq_elt["to"] = session_data["to"].full() | 363 iq_elt["to"] = session_data["to"].full() |
364 close_elt = iq_elt.addElement((NS_IBB, "close")) | 364 close_elt = iq_elt.addElement((NS_IBB, "close")) |
365 close_elt["sid"] = session_data["id"] | 365 close_elt["sid"] = session_data["id"] |
366 iq_elt.send() | 366 iq_elt.send() |
367 self._killSession(session_data["id"], client, failure_reason) | 367 self._kill_session(session_data["id"], client, failure_reason) |
368 | 368 |
369 | 369 |
370 @implementer(iwokkel.IDisco) | 370 @implementer(iwokkel.IDisco) |
371 class XEP_0047_handler(XMPPHandler): | 371 class XEP_0047_handler(XMPPHandler): |
372 | 372 |
373 def __init__(self, parent): | 373 def __init__(self, parent): |
374 self.plugin_parent = parent | 374 self.plugin_parent = parent |
375 | 375 |
376 def connectionInitialized(self): | 376 def connectionInitialized(self): |
377 self.xmlstream.addObserver( | 377 self.xmlstream.addObserver( |
378 IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent | 378 IBB_OPEN, self.plugin_parent._on_ibb_open, client=self.parent |
379 ) | 379 ) |
380 | 380 |
381 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | 381 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): |
382 return [disco.DiscoFeature(NS_IBB)] | 382 return [disco.DiscoFeature(NS_IBB)] |
383 | 383 |