Mercurial > libervia-backend
comparison sat/plugins/plugin_exp_jingle_stream.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a
pre-PEP8 code, to use the same coding style as in Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move on full snake_case. Because Libervia has a huge codebase, this
ended with a ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | be6d91572633 |
children | 877145b4ba01 |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
58 | 58 |
59 class StreamProtocol(protocol.Protocol): | 59 class StreamProtocol(protocol.Protocol): |
60 def __init__(self): | 60 def __init__(self): |
61 self.pause = False | 61 self.pause = False |
62 | 62 |
63 def setPause(self, paused): | 63 def set_pause(self, paused): |
64 # in Python 2.x, Twisted classes are old style | 64 # in Python 2.x, Twisted classes are old style |
65 # so we can use property and setter | 65 # so we can use property and setter |
66 if paused: | 66 if paused: |
67 if not self.pause: | 67 if not self.pause: |
68 self.transport.pauseProducing() | 68 self.transport.pauseProducing() |
76 self.transport.loseConnection() | 76 self.transport.loseConnection() |
77 | 77 |
78 def connectionMade(self): | 78 def connectionMade(self): |
79 if self.factory.client_conn is not None: | 79 if self.factory.client_conn is not None: |
80 self.transport.loseConnection() | 80 self.transport.loseConnection() |
81 self.factory.setClientConn(self) | 81 self.factory.set_client_conn(self) |
82 | 82 |
83 def dataReceived(self, data): | 83 def dataReceived(self, data): |
84 self.factory.writeToConsumer(data) | 84 self.factory.write_to_consumer(data) |
85 | 85 |
86 def sendData(self, data): | 86 def sendData(self, data): |
87 self.transport.write(data) | 87 self.transport.write(data) |
88 | 88 |
89 def connectionLost(self, reason): | 89 def connectionLost(self, reason): |
90 if self.factory.client_conn != self: | 90 if self.factory.client_conn != self: |
91 # only the first connected client_conn is relevant | 91 # only the first connected client_conn is relevant |
92 return | 92 return |
93 | 93 |
94 if reason.type == error.ConnectionDone: | 94 if reason.type == error.ConnectionDone: |
95 self.factory.streamFinished() | 95 self.factory.stream_finished() |
96 else: | 96 else: |
97 self.factory.streamFailed(reason) | 97 self.factory.stream_failed(reason) |
98 | 98 |
99 | 99 |
100 @interface.implementer(stream.IStreamProducer) | 100 @interface.implementer(stream.IStreamProducer) |
101 @interface.implementer(interfaces.IPushProducer) | 101 @interface.implementer(interfaces.IPushProducer) |
102 @interface.implementer(interfaces.IConsumer) | 102 @interface.implementer(interfaces.IConsumer) |
107 deferred = None | 107 deferred = None |
108 | 108 |
109 def __init__(self): | 109 def __init__(self): |
110 self.client_conn = None | 110 self.client_conn = None |
111 | 111 |
112 def setClientConn(self, stream_protocol): | 112 def set_client_conn(self, stream_protocol): |
113 # in Python 2.x, Twisted classes are old style | 113 # in Python 2.x, Twisted classes are old style |
114 # so we can use property and setter | 114 # so we can use property and setter |
115 assert self.client_conn is None | 115 assert self.client_conn is None |
116 self.client_conn = stream_protocol | 116 self.client_conn = stream_protocol |
117 if self.consumer is None: | 117 if self.consumer is None: |
118 self.client_conn.setPause(True) | 118 self.client_conn.set_pause(True) |
119 | 119 |
120 def startStream(self, consumer): | 120 def start_stream(self, consumer): |
121 if self.consumer is not None: | 121 if self.consumer is not None: |
122 raise exceptions.InternalError( | 122 raise exceptions.InternalError( |
123 _("stream can't be used with multiple consumers") | 123 _("stream can't be used with multiple consumers") |
124 ) | 124 ) |
125 assert self.deferred is None | 125 assert self.deferred is None |
126 self.consumer = consumer | 126 self.consumer = consumer |
127 consumer.registerProducer(self, True) | 127 consumer.registerProducer(self, True) |
128 self.deferred = defer.Deferred() | 128 self.deferred = defer.Deferred() |
129 if self.client_conn is not None: | 129 if self.client_conn is not None: |
130 self.client_conn.setPause(False) | 130 self.client_conn.set_pause(False) |
131 return self.deferred | 131 return self.deferred |
132 | 132 |
133 def streamFinished(self): | 133 def stream_finished(self): |
134 self.client_conn = None | 134 self.client_conn = None |
135 if self.consumer: | 135 if self.consumer: |
136 self.consumer.unregisterProducer() | 136 self.consumer.unregisterProducer() |
137 self.port_listening.stopListening() | 137 self.port_listening.stopListening() |
138 self.deferred.callback(None) | 138 self.deferred.callback(None) |
139 | 139 |
140 def streamFailed(self, failure_): | 140 def stream_failed(self, failure_): |
141 self.client_conn = None | 141 self.client_conn = None |
142 if self.consumer: | 142 if self.consumer: |
143 self.consumer.unregisterProducer() | 143 self.consumer.unregisterProducer() |
144 self.port_listening.stopListening() | 144 self.port_listening.stopListening() |
145 self.deferred.errback(failure_) | 145 self.deferred.errback(failure_) |
146 elif self.producer: | 146 elif self.producer: |
147 self.producer.stopProducing() | 147 self.producer.stopProducing() |
148 | 148 |
149 def stopStream(self): | 149 def stop_stream(self): |
150 if self.client_conn is not None: | 150 if self.client_conn is not None: |
151 self.client_conn.disconnect() | 151 self.client_conn.disconnect() |
152 | 152 |
153 def registerProducer(self, producer, streaming): | 153 def registerProducer(self, producer, streaming): |
154 self.producer = producer | 154 self.producer = producer |
155 | 155 |
156 def pauseProducing(self): | 156 def pauseProducing(self): |
157 self.client_conn.setPause(True) | 157 self.client_conn.set_pause(True) |
158 | 158 |
159 def resumeProducing(self): | 159 def resumeProducing(self): |
160 self.client_conn.setPause(False) | 160 self.client_conn.set_pause(False) |
161 | 161 |
162 def stopProducing(self): | 162 def stopProducing(self): |
163 if self.client_conn: | 163 if self.client_conn: |
164 self.client_conn.disconnect() | 164 self.client_conn.disconnect() |
165 | 165 |
167 try: | 167 try: |
168 self.client_conn.sendData(data) | 168 self.client_conn.sendData(data) |
169 except AttributeError: | 169 except AttributeError: |
170 log.warning(_("No client connected, can't send data")) | 170 log.warning(_("No client connected, can't send data")) |
171 | 171 |
172 def writeToConsumer(self, data): | 172 def write_to_consumer(self, data): |
173 self.consumer.write(data) | 173 self.consumer.write(data) |
174 | 174 |
175 | 175 |
176 class JingleStream(object): | 176 class JingleStream(object): |
177 """This non standard jingle application send byte stream""" | 177 """This non standard jingle application send byte stream""" |
178 | 178 |
179 def __init__(self, host): | 179 def __init__(self, host): |
180 log.info(_("Plugin Stream initialization")) | 180 log.info(_("Plugin Stream initialization")) |
181 self.host = host | 181 self.host = host |
182 self._j = host.plugins["XEP-0166"] # shortcut to access jingle | 182 self._j = host.plugins["XEP-0166"] # shortcut to access jingle |
183 self._j.registerApplication(NS_STREAM, self) | 183 self._j.register_application(NS_STREAM, self) |
184 host.bridge.addMethod( | 184 host.bridge.add_method( |
185 "streamOut", | 185 "stream_out", |
186 ".plugin", | 186 ".plugin", |
187 in_sign="ss", | 187 in_sign="ss", |
188 out_sign="s", | 188 out_sign="s", |
189 method=self._streamOut, | 189 method=self._stream_out, |
190 async_=True, | 190 async_=True, |
191 ) | 191 ) |
192 | 192 |
193 # jingle callbacks | 193 # jingle callbacks |
194 | 194 |
195 def _streamOut(self, to_jid_s, profile_key): | 195 def _stream_out(self, to_jid_s, profile_key): |
196 client = self.host.getClient(profile_key) | 196 client = self.host.get_client(profile_key) |
197 return defer.ensureDeferred(self.streamOut(client, jid.JID(to_jid_s))) | 197 return defer.ensureDeferred(self.stream_out(client, jid.JID(to_jid_s))) |
198 | 198 |
199 async def streamOut(self, client, to_jid): | 199 async def stream_out(self, client, to_jid): |
200 """send a stream | 200 """send a stream |
201 | 201 |
202 @param peer_jid(jid.JID): recipient | 202 @param peer_jid(jid.JID): recipient |
203 @return: an unique id to identify the transfer | 203 @return: an unique id to identify the transfer |
204 """ | 204 """ |
228 } | 228 } |
229 ], | 229 ], |
230 )) | 230 )) |
231 return str(port) | 231 return str(port) |
232 | 232 |
233 def jingleSessionInit(self, client, session, content_name, stream_object): | 233 def jingle_session_init(self, client, session, content_name, stream_object): |
234 content_data = session["contents"][content_name] | 234 content_data = session["contents"][content_name] |
235 application_data = content_data["application_data"] | 235 application_data = content_data["application_data"] |
236 assert "stream_object" not in application_data | 236 assert "stream_object" not in application_data |
237 application_data["stream_object"] = stream_object | 237 application_data["stream_object"] = stream_object |
238 desc_elt = domish.Element((NS_STREAM, "description")) | 238 desc_elt = domish.Element((NS_STREAM, "description")) |
239 return desc_elt | 239 return desc_elt |
240 | 240 |
241 @defer.inlineCallbacks | 241 @defer.inlineCallbacks |
242 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): | 242 def jingle_request_confirmation(self, client, action, session, content_name, desc_elt): |
243 """This method request confirmation for a jingle session""" | 243 """This method request confirmation for a jingle session""" |
244 content_data = session["contents"][content_name] | 244 content_data = session["contents"][content_name] |
245 if content_data["senders"] not in ( | 245 if content_data["senders"] not in ( |
246 self._j.ROLE_INITIATOR, | 246 self._j.ROLE_INITIATOR, |
247 self._j.ROLE_RESPONDER, | 247 self._j.ROLE_RESPONDER, |
248 ): | 248 ): |
249 log.warning("Bad sender, assuming initiator") | 249 log.warning("Bad sender, assuming initiator") |
250 content_data["senders"] = self._j.ROLE_INITIATOR | 250 content_data["senders"] = self._j.ROLE_INITIATOR |
251 | 251 |
252 confirm_data = yield xml_tools.deferDialog( | 252 confirm_data = yield xml_tools.defer_dialog( |
253 self.host, | 253 self.host, |
254 _(CONFIRM).format(peer=session["peer_jid"].full()), | 254 _(CONFIRM).format(peer=session["peer_jid"].full()), |
255 _(CONFIRM_TITLE), | 255 _(CONFIRM_TITLE), |
256 type_=C.XMLUI_DIALOG_CONFIRM, | 256 type_=C.XMLUI_DIALOG_CONFIRM, |
257 action_extra={ | 257 action_extra={ |
272 factory = StreamFactory() | 272 factory = StreamFactory() |
273 yield endpoint.connect(factory) | 273 yield endpoint.connect(factory) |
274 content_data["stream_object"] = factory | 274 content_data["stream_object"] = factory |
275 finished_d = content_data["finished_d"] = defer.Deferred() | 275 finished_d = content_data["finished_d"] = defer.Deferred() |
276 args = [client, session, content_name, content_data] | 276 args = [client, session, content_name, content_data] |
277 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | 277 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) |
278 defer.returnValue(True) | 278 defer.returnValue(True) |
279 | 279 |
280 def jingleHandler(self, client, action, session, content_name, desc_elt): | 280 def jingle_handler(self, client, action, session, content_name, desc_elt): |
281 content_data = session["contents"][content_name] | 281 content_data = session["contents"][content_name] |
282 application_data = content_data["application_data"] | 282 application_data = content_data["application_data"] |
283 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): | 283 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): |
284 pass | 284 pass |
285 elif action == self._j.A_SESSION_ACCEPT: | 285 elif action == self._j.A_SESSION_ACCEPT: |
286 assert not "stream_object" in content_data | 286 assert not "stream_object" in content_data |
287 content_data["stream_object"] = application_data["stream_object"] | 287 content_data["stream_object"] = application_data["stream_object"] |
288 finished_d = content_data["finished_d"] = defer.Deferred() | 288 finished_d = content_data["finished_d"] = defer.Deferred() |
289 args = [client, session, content_name, content_data] | 289 args = [client, session, content_name, content_data] |
290 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | 290 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) |
291 else: | 291 else: |
292 log.warning("FIXME: unmanaged action {}".format(action)) | 292 log.warning("FIXME: unmanaged action {}".format(action)) |
293 return desc_elt | 293 return desc_elt |
294 | 294 |
295 def _finishedCb(self, __, client, session, content_name, content_data): | 295 def _finished_cb(self, __, client, session, content_name, content_data): |
296 log.info("Pipe transfer completed") | 296 log.info("Pipe transfer completed") |
297 self._j.contentTerminate(client, session, content_name) | 297 self._j.content_terminate(client, session, content_name) |
298 content_data["stream_object"].stopStream() | 298 content_data["stream_object"].stop_stream() |
299 | 299 |
300 def _finishedEb(self, failure, client, session, content_name, content_data): | 300 def _finished_eb(self, failure, client, session, content_name, content_data): |
301 log.warning("Error while streaming pipe: {}".format(failure)) | 301 log.warning("Error while streaming pipe: {}".format(failure)) |
302 self._j.contentTerminate( | 302 self._j.content_terminate( |
303 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT | 303 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT |
304 ) | 304 ) |
305 content_data["stream_object"].stopStream() | 305 content_data["stream_object"].stop_stream() |