Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_exp_jingle_stream.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_exp_jingle_stream.py@877145b4ba01 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT plugin for managing pipes (experimental) | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 import errno | |
21 from zope import interface | |
22 from twisted.words.xish import domish | |
23 from twisted.words.protocols.jabber import jid | |
24 from twisted.internet import defer | |
25 from twisted.internet import protocol | |
26 from twisted.internet import endpoints | |
27 from twisted.internet import reactor | |
28 from twisted.internet import error | |
29 from twisted.internet import interfaces | |
30 from libervia.backend.core.i18n import _, D_ | |
31 from libervia.backend.core.constants import Const as C | |
32 from libervia.backend.core import exceptions | |
33 from libervia.backend.core.log import getLogger | |
34 from libervia.backend.tools import xml_tools | |
35 from libervia.backend.tools import stream | |
36 | |
37 | |
38 log = getLogger(__name__) | |
39 | |
40 NS_STREAM = "http://salut-a-toi.org/protocol/stream" | |
41 SECURITY_LIMIT = 30 | |
42 START_PORT = 8888 | |
43 | |
44 PLUGIN_INFO = { | |
45 C.PI_NAME: "Jingle Stream Plugin", | |
46 C.PI_IMPORT_NAME: "STREAM", | |
47 C.PI_TYPE: "EXP", | |
48 C.PI_PROTOCOLS: [], | |
49 C.PI_DEPENDENCIES: ["XEP-0166"], | |
50 C.PI_MAIN: "JingleStream", | |
51 C.PI_HANDLER: "no", | |
52 C.PI_DESCRIPTION: _("""Jingle Stream plugin"""), | |
53 } | |
54 | |
55 CONFIRM = D_("{peer} wants to send you a stream, do you accept ?") | |
56 CONFIRM_TITLE = D_("Stream Request") | |
57 | |
58 | |
59 class StreamProtocol(protocol.Protocol): | |
60 def __init__(self): | |
61 self.pause = False | |
62 | |
63 def set_pause(self, paused): | |
64 # in Python 2.x, Twisted classes are old style | |
65 # so we can use property and setter | |
66 if paused: | |
67 if not self.pause: | |
68 self.transport.pauseProducing() | |
69 self.pause = True | |
70 else: | |
71 if self.pause: | |
72 self.transport.resumeProducing() | |
73 self.pause = False | |
74 | |
75 def disconnect(self): | |
76 self.transport.loseConnection() | |
77 | |
78 def connectionMade(self): | |
79 if self.factory.client_conn is not None: | |
80 self.transport.loseConnection() | |
81 self.factory.set_client_conn(self) | |
82 | |
83 def dataReceived(self, data): | |
84 self.factory.write_to_consumer(data) | |
85 | |
86 def sendData(self, data): | |
87 self.transport.write(data) | |
88 | |
89 def connectionLost(self, reason): | |
90 if self.factory.client_conn != self: | |
91 # only the first connected client_conn is relevant | |
92 return | |
93 | |
94 if reason.type == error.ConnectionDone: | |
95 self.factory.stream_finished() | |
96 else: | |
97 self.factory.stream_failed(reason) | |
98 | |
99 | |
100 @interface.implementer(stream.IStreamProducer) | |
101 @interface.implementer(interfaces.IPushProducer) | |
102 @interface.implementer(interfaces.IConsumer) | |
103 class StreamFactory(protocol.Factory): | |
104 protocol = StreamProtocol | |
105 consumer = None | |
106 producer = None | |
107 deferred = None | |
108 | |
109 def __init__(self): | |
110 self.client_conn = None | |
111 | |
112 def set_client_conn(self, stream_protocol): | |
113 # in Python 2.x, Twisted classes are old style | |
114 # so we can use property and setter | |
115 assert self.client_conn is None | |
116 self.client_conn = stream_protocol | |
117 if self.consumer is None: | |
118 self.client_conn.set_pause(True) | |
119 | |
120 def start_stream(self, consumer): | |
121 if self.consumer is not None: | |
122 raise exceptions.InternalError( | |
123 _("stream can't be used with multiple consumers") | |
124 ) | |
125 assert self.deferred is None | |
126 self.consumer = consumer | |
127 consumer.registerProducer(self, True) | |
128 self.deferred = defer.Deferred() | |
129 if self.client_conn is not None: | |
130 self.client_conn.set_pause(False) | |
131 return self.deferred | |
132 | |
133 def stream_finished(self): | |
134 self.client_conn = None | |
135 if self.consumer: | |
136 self.consumer.unregisterProducer() | |
137 self.port_listening.stopListening() | |
138 self.deferred.callback(None) | |
139 | |
140 def stream_failed(self, failure_): | |
141 self.client_conn = None | |
142 if self.consumer: | |
143 self.consumer.unregisterProducer() | |
144 self.port_listening.stopListening() | |
145 self.deferred.errback(failure_) | |
146 elif self.producer: | |
147 self.producer.stopProducing() | |
148 | |
149 def stop_stream(self): | |
150 if self.client_conn is not None: | |
151 self.client_conn.disconnect() | |
152 | |
153 def registerProducer(self, producer, streaming): | |
154 self.producer = producer | |
155 | |
156 def pauseProducing(self): | |
157 self.client_conn.set_pause(True) | |
158 | |
159 def resumeProducing(self): | |
160 self.client_conn.set_pause(False) | |
161 | |
162 def stopProducing(self): | |
163 if self.client_conn: | |
164 self.client_conn.disconnect() | |
165 | |
166 def write(self, data): | |
167 try: | |
168 self.client_conn.sendData(data) | |
169 except AttributeError: | |
170 log.warning(_("No client connected, can't send data")) | |
171 | |
172 def write_to_consumer(self, data): | |
173 self.consumer.write(data) | |
174 | |
175 | |
176 class JingleStream(object): | |
177 """This non standard jingle application send byte stream""" | |
178 | |
179 def __init__(self, host): | |
180 log.info(_("Plugin Stream initialization")) | |
181 self.host = host | |
182 self._j = host.plugins["XEP-0166"] # shortcut to access jingle | |
183 self._j.register_application(NS_STREAM, self) | |
184 host.bridge.add_method( | |
185 "stream_out", | |
186 ".plugin", | |
187 in_sign="ss", | |
188 out_sign="s", | |
189 method=self._stream_out, | |
190 async_=True, | |
191 ) | |
192 | |
193 # jingle callbacks | |
194 | |
195 def _stream_out(self, to_jid_s, profile_key): | |
196 client = self.host.get_client(profile_key) | |
197 return defer.ensureDeferred(self.stream_out(client, jid.JID(to_jid_s))) | |
198 | |
199 async def stream_out(self, client, to_jid): | |
200 """send a stream | |
201 | |
202 @param peer_jid(jid.JID): recipient | |
203 @return: an unique id to identify the transfer | |
204 """ | |
205 port = START_PORT | |
206 factory = StreamFactory() | |
207 while True: | |
208 endpoint = endpoints.TCP4ServerEndpoint(reactor, port) | |
209 try: | |
210 port_listening = await endpoint.listen(factory) | |
211 except error.CannotListenError as e: | |
212 if e.socketError.errno == errno.EADDRINUSE: | |
213 port += 1 | |
214 else: | |
215 raise e | |
216 else: | |
217 factory.port_listening = port_listening | |
218 break | |
219 # we don't want to wait for IQ result of initiate | |
220 defer.ensureDeferred(self._j.initiate( | |
221 client, | |
222 to_jid, | |
223 [ | |
224 { | |
225 "app_ns": NS_STREAM, | |
226 "senders": self._j.ROLE_INITIATOR, | |
227 "app_kwargs": {"stream_object": factory}, | |
228 } | |
229 ], | |
230 )) | |
231 return str(port) | |
232 | |
233 def jingle_session_init(self, client, session, content_name, stream_object): | |
234 content_data = session["contents"][content_name] | |
235 application_data = content_data["application_data"] | |
236 assert "stream_object" not in application_data | |
237 application_data["stream_object"] = stream_object | |
238 desc_elt = domish.Element((NS_STREAM, "description")) | |
239 return desc_elt | |
240 | |
241 @defer.inlineCallbacks | |
242 def jingle_request_confirmation(self, client, action, session, content_name, desc_elt): | |
243 """This method request confirmation for a jingle session""" | |
244 content_data = session["contents"][content_name] | |
245 if content_data["senders"] not in ( | |
246 self._j.ROLE_INITIATOR, | |
247 self._j.ROLE_RESPONDER, | |
248 ): | |
249 log.warning("Bad sender, assuming initiator") | |
250 content_data["senders"] = self._j.ROLE_INITIATOR | |
251 | |
252 confirm_data = yield xml_tools.defer_dialog( | |
253 self.host, | |
254 _(CONFIRM).format(peer=session["peer_jid"].full()), | |
255 _(CONFIRM_TITLE), | |
256 type_=C.XMLUI_DIALOG_CONFIRM, | |
257 action_extra={ | |
258 "from_jid": session["peer_jid"].full(), | |
259 "type": "STREAM", | |
260 }, | |
261 security_limit=SECURITY_LIMIT, | |
262 profile=client.profile, | |
263 ) | |
264 | |
265 if not C.bool(confirm_data["answer"]): | |
266 defer.returnValue(False) | |
267 try: | |
268 port = int(confirm_data["port"]) | |
269 except (ValueError, KeyError): | |
270 raise exceptions.DataError(_("given port is invalid")) | |
271 endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", port) | |
272 factory = StreamFactory() | |
273 yield endpoint.connect(factory) | |
274 content_data["stream_object"] = factory | |
275 finished_d = content_data["finished_d"] = defer.Deferred() | |
276 args = [client, session, content_name, content_data] | |
277 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) | |
278 defer.returnValue(True) | |
279 | |
280 def jingle_handler(self, client, action, session, content_name, desc_elt): | |
281 content_data = session["contents"][content_name] | |
282 application_data = content_data["application_data"] | |
283 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): | |
284 pass | |
285 elif action == self._j.A_SESSION_ACCEPT: | |
286 assert not "stream_object" in content_data | |
287 content_data["stream_object"] = application_data["stream_object"] | |
288 finished_d = content_data["finished_d"] = defer.Deferred() | |
289 args = [client, session, content_name, content_data] | |
290 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) | |
291 else: | |
292 log.warning("FIXME: unmanaged action {}".format(action)) | |
293 return desc_elt | |
294 | |
295 def _finished_cb(self, __, client, session, content_name, content_data): | |
296 log.info("Pipe transfer completed") | |
297 self._j.content_terminate(client, session, content_name) | |
298 content_data["stream_object"].stop_stream() | |
299 | |
300 def _finished_eb(self, failure, client, session, content_name, content_data): | |
301 log.warning("Error while streaming pipe: {}".format(failure)) | |
302 self._j.content_terminate( | |
303 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT | |
304 ) | |
305 content_data["stream_object"].stop_stream() |