comparison libervia/backend/plugins/plugin_exp_jingle_stream.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_exp_jingle_stream.py@877145b4ba01
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for managing pipes (experimental)
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 import errno
21 from zope import interface
22 from twisted.words.xish import domish
23 from twisted.words.protocols.jabber import jid
24 from twisted.internet import defer
25 from twisted.internet import protocol
26 from twisted.internet import endpoints
27 from twisted.internet import reactor
28 from twisted.internet import error
29 from twisted.internet import interfaces
30 from libervia.backend.core.i18n import _, D_
31 from libervia.backend.core.constants import Const as C
32 from libervia.backend.core import exceptions
33 from libervia.backend.core.log import getLogger
34 from libervia.backend.tools import xml_tools
35 from libervia.backend.tools import stream
36
37
38 log = getLogger(__name__)
39
40 NS_STREAM = "http://salut-a-toi.org/protocol/stream"
41 SECURITY_LIMIT = 30
42 START_PORT = 8888
43
44 PLUGIN_INFO = {
45 C.PI_NAME: "Jingle Stream Plugin",
46 C.PI_IMPORT_NAME: "STREAM",
47 C.PI_TYPE: "EXP",
48 C.PI_PROTOCOLS: [],
49 C.PI_DEPENDENCIES: ["XEP-0166"],
50 C.PI_MAIN: "JingleStream",
51 C.PI_HANDLER: "no",
52 C.PI_DESCRIPTION: _("""Jingle Stream plugin"""),
53 }
54
55 CONFIRM = D_("{peer} wants to send you a stream, do you accept ?")
56 CONFIRM_TITLE = D_("Stream Request")
57
58
59 class StreamProtocol(protocol.Protocol):
60 def __init__(self):
61 self.pause = False
62
63 def set_pause(self, paused):
64 # in Python 2.x, Twisted classes are old style
65 # so we can use property and setter
66 if paused:
67 if not self.pause:
68 self.transport.pauseProducing()
69 self.pause = True
70 else:
71 if self.pause:
72 self.transport.resumeProducing()
73 self.pause = False
74
75 def disconnect(self):
76 self.transport.loseConnection()
77
78 def connectionMade(self):
79 if self.factory.client_conn is not None:
80 self.transport.loseConnection()
81 self.factory.set_client_conn(self)
82
83 def dataReceived(self, data):
84 self.factory.write_to_consumer(data)
85
86 def sendData(self, data):
87 self.transport.write(data)
88
89 def connectionLost(self, reason):
90 if self.factory.client_conn != self:
91 # only the first connected client_conn is relevant
92 return
93
94 if reason.type == error.ConnectionDone:
95 self.factory.stream_finished()
96 else:
97 self.factory.stream_failed(reason)
98
99
100 @interface.implementer(stream.IStreamProducer)
101 @interface.implementer(interfaces.IPushProducer)
102 @interface.implementer(interfaces.IConsumer)
103 class StreamFactory(protocol.Factory):
104 protocol = StreamProtocol
105 consumer = None
106 producer = None
107 deferred = None
108
109 def __init__(self):
110 self.client_conn = None
111
112 def set_client_conn(self, stream_protocol):
113 # in Python 2.x, Twisted classes are old style
114 # so we can use property and setter
115 assert self.client_conn is None
116 self.client_conn = stream_protocol
117 if self.consumer is None:
118 self.client_conn.set_pause(True)
119
120 def start_stream(self, consumer):
121 if self.consumer is not None:
122 raise exceptions.InternalError(
123 _("stream can't be used with multiple consumers")
124 )
125 assert self.deferred is None
126 self.consumer = consumer
127 consumer.registerProducer(self, True)
128 self.deferred = defer.Deferred()
129 if self.client_conn is not None:
130 self.client_conn.set_pause(False)
131 return self.deferred
132
133 def stream_finished(self):
134 self.client_conn = None
135 if self.consumer:
136 self.consumer.unregisterProducer()
137 self.port_listening.stopListening()
138 self.deferred.callback(None)
139
140 def stream_failed(self, failure_):
141 self.client_conn = None
142 if self.consumer:
143 self.consumer.unregisterProducer()
144 self.port_listening.stopListening()
145 self.deferred.errback(failure_)
146 elif self.producer:
147 self.producer.stopProducing()
148
149 def stop_stream(self):
150 if self.client_conn is not None:
151 self.client_conn.disconnect()
152
153 def registerProducer(self, producer, streaming):
154 self.producer = producer
155
156 def pauseProducing(self):
157 self.client_conn.set_pause(True)
158
159 def resumeProducing(self):
160 self.client_conn.set_pause(False)
161
162 def stopProducing(self):
163 if self.client_conn:
164 self.client_conn.disconnect()
165
166 def write(self, data):
167 try:
168 self.client_conn.sendData(data)
169 except AttributeError:
170 log.warning(_("No client connected, can't send data"))
171
172 def write_to_consumer(self, data):
173 self.consumer.write(data)
174
175
176 class JingleStream(object):
177 """This non standard jingle application send byte stream"""
178
179 def __init__(self, host):
180 log.info(_("Plugin Stream initialization"))
181 self.host = host
182 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
183 self._j.register_application(NS_STREAM, self)
184 host.bridge.add_method(
185 "stream_out",
186 ".plugin",
187 in_sign="ss",
188 out_sign="s",
189 method=self._stream_out,
190 async_=True,
191 )
192
193 # jingle callbacks
194
195 def _stream_out(self, to_jid_s, profile_key):
196 client = self.host.get_client(profile_key)
197 return defer.ensureDeferred(self.stream_out(client, jid.JID(to_jid_s)))
198
199 async def stream_out(self, client, to_jid):
200 """send a stream
201
202 @param peer_jid(jid.JID): recipient
203 @return: an unique id to identify the transfer
204 """
205 port = START_PORT
206 factory = StreamFactory()
207 while True:
208 endpoint = endpoints.TCP4ServerEndpoint(reactor, port)
209 try:
210 port_listening = await endpoint.listen(factory)
211 except error.CannotListenError as e:
212 if e.socketError.errno == errno.EADDRINUSE:
213 port += 1
214 else:
215 raise e
216 else:
217 factory.port_listening = port_listening
218 break
219 # we don't want to wait for IQ result of initiate
220 defer.ensureDeferred(self._j.initiate(
221 client,
222 to_jid,
223 [
224 {
225 "app_ns": NS_STREAM,
226 "senders": self._j.ROLE_INITIATOR,
227 "app_kwargs": {"stream_object": factory},
228 }
229 ],
230 ))
231 return str(port)
232
233 def jingle_session_init(self, client, session, content_name, stream_object):
234 content_data = session["contents"][content_name]
235 application_data = content_data["application_data"]
236 assert "stream_object" not in application_data
237 application_data["stream_object"] = stream_object
238 desc_elt = domish.Element((NS_STREAM, "description"))
239 return desc_elt
240
241 @defer.inlineCallbacks
242 def jingle_request_confirmation(self, client, action, session, content_name, desc_elt):
243 """This method request confirmation for a jingle session"""
244 content_data = session["contents"][content_name]
245 if content_data["senders"] not in (
246 self._j.ROLE_INITIATOR,
247 self._j.ROLE_RESPONDER,
248 ):
249 log.warning("Bad sender, assuming initiator")
250 content_data["senders"] = self._j.ROLE_INITIATOR
251
252 confirm_data = yield xml_tools.defer_dialog(
253 self.host,
254 _(CONFIRM).format(peer=session["peer_jid"].full()),
255 _(CONFIRM_TITLE),
256 type_=C.XMLUI_DIALOG_CONFIRM,
257 action_extra={
258 "from_jid": session["peer_jid"].full(),
259 "type": "STREAM",
260 },
261 security_limit=SECURITY_LIMIT,
262 profile=client.profile,
263 )
264
265 if not C.bool(confirm_data["answer"]):
266 defer.returnValue(False)
267 try:
268 port = int(confirm_data["port"])
269 except (ValueError, KeyError):
270 raise exceptions.DataError(_("given port is invalid"))
271 endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", port)
272 factory = StreamFactory()
273 yield endpoint.connect(factory)
274 content_data["stream_object"] = factory
275 finished_d = content_data["finished_d"] = defer.Deferred()
276 args = [client, session, content_name, content_data]
277 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args)
278 defer.returnValue(True)
279
280 def jingle_handler(self, client, action, session, content_name, desc_elt):
281 content_data = session["contents"][content_name]
282 application_data = content_data["application_data"]
283 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
284 pass
285 elif action == self._j.A_SESSION_ACCEPT:
286 assert not "stream_object" in content_data
287 content_data["stream_object"] = application_data["stream_object"]
288 finished_d = content_data["finished_d"] = defer.Deferred()
289 args = [client, session, content_name, content_data]
290 finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args)
291 else:
292 log.warning("FIXME: unmanaged action {}".format(action))
293 return desc_elt
294
295 def _finished_cb(self, __, client, session, content_name, content_data):
296 log.info("Pipe transfer completed")
297 self._j.content_terminate(client, session, content_name)
298 content_data["stream_object"].stop_stream()
299
300 def _finished_eb(self, failure, client, session, content_name, content_data):
301 log.warning("Error while streaming pipe: {}".format(failure))
302 self._j.content_terminate(
303 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT
304 )
305 content_data["stream_object"].stop_stream()