comparison sat/plugins/plugin_exp_jingle_stream.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (SàT before) was using camelCase, as allowed by PEP8 for pre-PEP8 code, in order to use the same coding style as Twisted. However, snake_case is more readable and it's better to follow PEP8 best practices, so it has been decided to move to full snake_case. Because Libervia has a huge codebase, this ended up with an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including bridge) that does not come from Twisted or Wokkel, to fully use snake_case. This is a massive change, and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents be6d91572633
children 877145b4ba01
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
58 58
59 class StreamProtocol(protocol.Protocol): 59 class StreamProtocol(protocol.Protocol):
60 def __init__(self): 60 def __init__(self):
61 self.pause = False 61 self.pause = False
62 62
63 def setPause(self, paused): 63 def set_pause(self, paused):
64 # in Python 2.x, Twisted classes are old style 64 # in Python 2.x, Twisted classes are old style
65 # so we can use property and setter 65 # so we can use property and setter
66 if paused: 66 if paused:
67 if not self.pause: 67 if not self.pause:
68 self.transport.pauseProducing() 68 self.transport.pauseProducing()
76 self.transport.loseConnection() 76 self.transport.loseConnection()
77 77
78 def connectionMade(self): 78 def connectionMade(self):
79 if self.factory.client_conn is not None: 79 if self.factory.client_conn is not None:
80 self.transport.loseConnection() 80 self.transport.loseConnection()
81 self.factory.setClientConn(self) 81 self.factory.set_client_conn(self)
82 82
83 def dataReceived(self, data): 83 def dataReceived(self, data):
84 self.factory.writeToConsumer(data) 84 self.factory.write_to_consumer(data)
85 85
86 def sendData(self, data): 86 def sendData(self, data):
87 self.transport.write(data) 87 self.transport.write(data)
88 88
89 def connectionLost(self, reason): 89 def connectionLost(self, reason):
90 if self.factory.client_conn != self: 90 if self.factory.client_conn != self:
91 # only the first connected client_conn is relevant 91 # only the first connected client_conn is relevant
92 return 92 return
93 93
94 if reason.type == error.ConnectionDone: 94 if reason.type == error.ConnectionDone:
95 self.factory.streamFinished() 95 self.factory.stream_finished()
96 else: 96 else:
97 self.factory.streamFailed(reason) 97 self.factory.stream_failed(reason)
98 98
99 99
100 @interface.implementer(stream.IStreamProducer) 100 @interface.implementer(stream.IStreamProducer)
101 @interface.implementer(interfaces.IPushProducer) 101 @interface.implementer(interfaces.IPushProducer)
102 @interface.implementer(interfaces.IConsumer) 102 @interface.implementer(interfaces.IConsumer)
107 deferred = None 107 deferred = None
108 108
109 def __init__(self): 109 def __init__(self):
110 self.client_conn = None 110 self.client_conn = None
111 111
112 def setClientConn(self, stream_protocol): 112 def set_client_conn(self, stream_protocol):
113 # in Python 2.x, Twisted classes are old style 113 # in Python 2.x, Twisted classes are old style
114 # so we can use property and setter 114 # so we can use property and setter
115 assert self.client_conn is None 115 assert self.client_conn is None
116 self.client_conn = stream_protocol 116 self.client_conn = stream_protocol
117 if self.consumer is None: 117 if self.consumer is None:
118 self.client_conn.setPause(True) 118 self.client_conn.set_pause(True)
119 119
120 def startStream(self, consumer): 120 def start_stream(self, consumer):
121 if self.consumer is not None: 121 if self.consumer is not None:
122 raise exceptions.InternalError( 122 raise exceptions.InternalError(
123 _("stream can't be used with multiple consumers") 123 _("stream can't be used with multiple consumers")
124 ) 124 )
125 assert self.deferred is None 125 assert self.deferred is None
126 self.consumer = consumer 126 self.consumer = consumer
127 consumer.registerProducer(self, True) 127 consumer.registerProducer(self, True)
128 self.deferred = defer.Deferred() 128 self.deferred = defer.Deferred()
129 if self.client_conn is not None: 129 if self.client_conn is not None:
130 self.client_conn.setPause(False) 130 self.client_conn.set_pause(False)
131 return self.deferred 131 return self.deferred
132 132
133 def streamFinished(self): 133 def stream_finished(self):
134 self.client_conn = None 134 self.client_conn = None
135 if self.consumer: 135 if self.consumer:
136 self.consumer.unregisterProducer() 136 self.consumer.unregisterProducer()
137 self.port_listening.stopListening() 137 self.port_listening.stopListening()
138 self.deferred.callback(None) 138 self.deferred.callback(None)
139 139
140 def streamFailed(self, failure_): 140 def stream_failed(self, failure_):
141 self.client_conn = None 141 self.client_conn = None
142 if self.consumer: 142 if self.consumer:
143 self.consumer.unregisterProducer() 143 self.consumer.unregisterProducer()
144 self.port_listening.stopListening() 144 self.port_listening.stopListening()
145 self.deferred.errback(failure_) 145 self.deferred.errback(failure_)
146 elif self.producer: 146 elif self.producer:
147 self.producer.stopProducing() 147 self.producer.stopProducing()
148 148
149 def stopStream(self): 149 def stop_stream(self):
150 if self.client_conn is not None: 150 if self.client_conn is not None:
151 self.client_conn.disconnect() 151 self.client_conn.disconnect()
152 152
153 def registerProducer(self, producer, streaming): 153 def registerProducer(self, producer, streaming):
154 self.producer = producer 154 self.producer = producer
155 155
156 def pauseProducing(self): 156 def pauseProducing(self):
157 self.client_conn.setPause(True) 157 self.client_conn.set_pause(True)
158 158
159 def resumeProducing(self): 159 def resumeProducing(self):
160 self.client_conn.setPause(False) 160 self.client_conn.set_pause(False)
161 161
162 def stopProducing(self): 162 def stopProducing(self):
163 if self.client_conn: 163 if self.client_conn:
164 self.client_conn.disconnect() 164 self.client_conn.disconnect()
165 165
167 try: 167 try:
168 self.client_conn.sendData(data) 168 self.client_conn.sendData(data)
169 except AttributeError: 169 except AttributeError:
170 log.warning(_("No client connected, can't send data")) 170 log.warning(_("No client connected, can't send data"))
171 171
172 def writeToConsumer(self, data): 172 def write_to_consumer(self, data):
173 self.consumer.write(data) 173 self.consumer.write(data)
174 174
175 175
176 class JingleStream(object): 176 class JingleStream(object):
177 """This non standard jingle application send byte stream""" 177 """This non standard jingle application send byte stream"""
178 178
179 def __init__(self, host): 179 def __init__(self, host):
180 log.info(_("Plugin Stream initialization")) 180 log.info(_("Plugin Stream initialization"))
181 self.host = host 181 self.host = host
182 self._j = host.plugins["XEP-0166"] # shortcut to access jingle 182 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
183 self._j.registerApplication(NS_STREAM, self) 183 self._j.register_application(NS_STREAM, self)
184 host.bridge.addMethod( 184 host.bridge.add_method(
185 "streamOut", 185 "stream_out",
186 ".plugin", 186 ".plugin",
187 in_sign="ss", 187 in_sign="ss",
188 out_sign="s", 188 out_sign="s",
189 method=self._streamOut, 189 method=self._stream_out,
190 async_=True, 190 async_=True,
191 ) 191 )
192 192
193 # jingle callbacks 193 # jingle callbacks
194 194
195 def _streamOut(self, to_jid_s, profile_key): 195 def _stream_out(self, to_jid_s, profile_key):
196 client = self.host.getClient(profile_key) 196 client = self.host.get_client(profile_key)
197 return defer.ensureDeferred(self.streamOut(client, jid.JID(to_jid_s))) 197 return defer.ensureDeferred(self.stream_out(client, jid.JID(to_jid_s)))
198 198
199 async def streamOut(self, client, to_jid): 199 async def stream_out(self, client, to_jid):
200 """send a stream 200 """send a stream
201 201
202 @param peer_jid(jid.JID): recipient 202 @param peer_jid(jid.JID): recipient
203 @return: an unique id to identify the transfer 203 @return: an unique id to identify the transfer
204 """ 204 """
228 } 228 }
229 ], 229 ],
230 )) 230 ))
231 return str(port) 231 return str(port)
232 232
233 def jingleSessionInit(self, client, session, content_name, stream_object): 233 def jingle_session_init(self, client, session, content_name, stream_object):
234 content_data = session["contents"][content_name] 234 content_data = session["contents"][content_name]
235 application_data = content_data["application_data"] 235 application_data = content_data["application_data"]
236 assert "stream_object" not in application_data 236 assert "stream_object" not in application_data
237 application_data["stream_object"] = stream_object 237 application_data["stream_object"] = stream_object
238 desc_elt = domish.Element((NS_STREAM, "description")) 238 desc_elt = domish.Element((NS_STREAM, "description"))
239 return desc_elt 239 return desc_elt
240 240
241 @defer.inlineCallbacks 241 @defer.inlineCallbacks
242 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): 242 def jingle_request_confirmation(self, client, action, session, content_name, desc_elt):
243 """This method request confirmation for a jingle session""" 243 """This method request confirmation for a jingle session"""
244 content_data = session["contents"][content_name] 244 content_data = session["contents"][content_name]
245 if content_data["senders"] not in ( 245 if content_data["senders"] not in (
246 self._j.ROLE_INITIATOR, 246 self._j.ROLE_INITIATOR,
247 self._j.ROLE_RESPONDER, 247 self._j.ROLE_RESPONDER,
248 ): 248 ):
249 log.warning("Bad sender, assuming initiator") 249 log.warning("Bad sender, assuming initiator")
250 content_data["senders"] = self._j.ROLE_INITIATOR 250 content_data["senders"] = self._j.ROLE_INITIATOR
251 251
252 confirm_data = yield xml_tools.deferDialog( 252 confirm_data = yield xml_tools.defer_dialog(
253 self.host, 253 self.host,
254 _(CONFIRM).format(peer=session["peer_jid"].full()), 254 _(CONFIRM).format(peer=session["peer_jid"].full()),
255 _(CONFIRM_TITLE), 255 _(CONFIRM_TITLE),
256 type_=C.XMLUI_DIALOG_CONFIRM, 256 type_=C.XMLUI_DIALOG_CONFIRM,
257 action_extra={ 257 action_extra={
272 factory = StreamFactory() 272 factory = StreamFactory()
273 yield endpoint.connect(factory) 273 yield endpoint.connect(factory)
274 content_data["stream_object"] = factory 274 content_data["stream_object"] = factory
275 finished_d = content_data["finished_d"] = defer.Deferred() 275 finished_d = content_data["finished_d"] = defer.Deferred()
276 args = [client, session, content_name, content_data] 276 args = [client, session, content_name, content_data]
277 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) 277 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args)
278 defer.returnValue(True) 278 defer.returnValue(True)
279 279
280 def jingleHandler(self, client, action, session, content_name, desc_elt): 280 def jingle_handler(self, client, action, session, content_name, desc_elt):
281 content_data = session["contents"][content_name] 281 content_data = session["contents"][content_name]
282 application_data = content_data["application_data"] 282 application_data = content_data["application_data"]
283 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): 283 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
284 pass 284 pass
285 elif action == self._j.A_SESSION_ACCEPT: 285 elif action == self._j.A_SESSION_ACCEPT:
286 assert not "stream_object" in content_data 286 assert not "stream_object" in content_data
287 content_data["stream_object"] = application_data["stream_object"] 287 content_data["stream_object"] = application_data["stream_object"]
288 finished_d = content_data["finished_d"] = defer.Deferred() 288 finished_d = content_data["finished_d"] = defer.Deferred()
289 args = [client, session, content_name, content_data] 289 args = [client, session, content_name, content_data]
290 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) 290 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args)
291 else: 291 else:
292 log.warning("FIXME: unmanaged action {}".format(action)) 292 log.warning("FIXME: unmanaged action {}".format(action))
293 return desc_elt 293 return desc_elt
294 294
295 def _finishedCb(self, __, client, session, content_name, content_data): 295 def _finished_cb(self, __, client, session, content_name, content_data):
296 log.info("Pipe transfer completed") 296 log.info("Pipe transfer completed")
297 self._j.contentTerminate(client, session, content_name) 297 self._j.content_terminate(client, session, content_name)
298 content_data["stream_object"].stopStream() 298 content_data["stream_object"].stop_stream()
299 299
300 def _finishedEb(self, failure, client, session, content_name, content_data): 300 def _finished_eb(self, failure, client, session, content_name, content_data):
301 log.warning("Error while streaming pipe: {}".format(failure)) 301 log.warning("Error while streaming pipe: {}".format(failure))
302 self._j.contentTerminate( 302 self._j.content_terminate(
303 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT 303 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT
304 ) 304 )
305 content_data["stream_object"].stopStream() 305 content_data["stream_object"].stop_stream()