Mercurial > libervia-backend
comparison sat/plugins/plugin_exp_jingle_stream.py @ 3404:26a0af6e32c1
plugin XEP-0166: new trigger point + coroutines + helper methods:
- a new `XEP-0166_initiate` async trigger point is available
- `initiate` is now a coroutine
- `jingleSessionInit` in application and transport handlers is now called using
`utils.asDeferred`
- new `getApplication` helper method, to retrieve application from its namespace
- new `getContentData` helper method to retrieve application and its argument from content
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 12 Nov 2020 14:53:15 +0100 |
parents | 559a625a236b |
children | be6d91572633 |
comparison
equal
deleted
inserted
replaced
3403:404d4b29de52 | 3404:26a0af6e32c1 |
---|---|
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _, D_ | 20 import errno |
21 from sat.core.constants import Const as C | 21 from zope import interface |
22 from sat.core import exceptions | |
23 from sat.core.log import getLogger | |
24 | |
25 log = getLogger(__name__) | |
26 from sat.tools import xml_tools | |
27 from sat.tools import stream | |
28 from twisted.words.xish import domish | 22 from twisted.words.xish import domish |
29 from twisted.words.protocols.jabber import jid | 23 from twisted.words.protocols.jabber import jid |
30 from twisted.internet import defer | 24 from twisted.internet import defer |
31 from twisted.internet import protocol | 25 from twisted.internet import protocol |
32 from twisted.internet import endpoints | 26 from twisted.internet import endpoints |
33 from twisted.internet import reactor | 27 from twisted.internet import reactor |
34 from twisted.internet import error | 28 from twisted.internet import error |
35 from twisted.internet import interfaces | 29 from twisted.internet import interfaces |
36 from zope import interface | 30 from sat.core.i18n import _, D_ |
37 import errno | 31 from sat.core.constants import Const as C |
32 from sat.core import exceptions | |
33 from sat.core.log import getLogger | |
34 from sat.tools import xml_tools | |
35 from sat.tools import stream | |
36 | |
37 | |
38 log = getLogger(__name__) | |
38 | 39 |
39 NS_STREAM = "http://salut-a-toi.org/protocol/stream" | 40 NS_STREAM = "http://salut-a-toi.org/protocol/stream" |
40 SECURITY_LIMIT = 30 | 41 SECURITY_LIMIT = 30 |
41 START_PORT = 8888 | 42 START_PORT = 8888 |
42 | 43 |
191 | 192 |
192 # jingle callbacks | 193 # jingle callbacks |
193 | 194 |
194 def _streamOut(self, to_jid_s, profile_key): | 195 def _streamOut(self, to_jid_s, profile_key): |
195 client = self.host.getClient(profile_key) | 196 client = self.host.getClient(profile_key) |
196 return self.streamOut(client, jid.JID(to_jid_s)) | 197 return defer.ensureDeferred(self.streamOut(client, jid.JID(to_jid_s))) |
197 | 198 |
198 @defer.inlineCallbacks | 199 async def streamOut(self, client, to_jid): |
199 def streamOut(self, client, to_jid): | |
200 """send a stream | 200 """send a stream |
201 | 201 |
202 @param peer_jid(jid.JID): recipient | 202 @param peer_jid(jid.JID): recipient |
203 @return: an unique id to identify the transfer | 203 @return: an unique id to identify the transfer |
204 """ | 204 """ |
205 port = START_PORT | 205 port = START_PORT |
206 factory = StreamFactory() | 206 factory = StreamFactory() |
207 while True: | 207 while True: |
208 endpoint = endpoints.TCP4ServerEndpoint(reactor, port) | 208 endpoint = endpoints.TCP4ServerEndpoint(reactor, port) |
209 try: | 209 try: |
210 port_listening = yield endpoint.listen(factory) | 210 port_listening = await endpoint.listen(factory) |
211 except error.CannotListenError as e: | 211 except error.CannotListenError as e: |
212 if e.socketError.errno == errno.EADDRINUSE: | 212 if e.socketError.errno == errno.EADDRINUSE: |
213 port += 1 | 213 port += 1 |
214 else: | 214 else: |
215 raise e | 215 raise e |
216 else: | 216 else: |
217 factory.port_listening = port_listening | 217 factory.port_listening = port_listening |
218 break | 218 break |
219 self._j.initiate( | 219 # we don't want to wait for IQ result of initiate |
220 defer.ensureDeferred(self._j.initiate( | |
220 client, | 221 client, |
221 to_jid, | 222 to_jid, |
222 [ | 223 [ |
223 { | 224 { |
224 "app_ns": NS_STREAM, | 225 "app_ns": NS_STREAM, |
225 "senders": self._j.ROLE_INITIATOR, | 226 "senders": self._j.ROLE_INITIATOR, |
226 "app_kwargs": {"stream_object": factory}, | 227 "app_kwargs": {"stream_object": factory}, |
227 } | 228 } |
228 ], | 229 ], |
229 ) | 230 )) |
230 defer.returnValue(str(port)) | 231 return str(port) |
231 | 232 |
232 def jingleSessionInit(self, client, session, content_name, stream_object): | 233 def jingleSessionInit(self, client, session, content_name, stream_object): |
233 content_data = session["contents"][content_name] | 234 content_data = session["contents"][content_name] |
234 application_data = content_data["application_data"] | 235 application_data = content_data["application_data"] |
235 assert "stream_object" not in application_data | 236 assert "stream_object" not in application_data |