# HG changeset patch # User Goffi # Date 1605189195 -3600 # Node ID 26a0af6e32c197ca8b4838753c762e6fb04db9b0 # Parent 404d4b29de523b2545b9d3610e4906f6f1adf875 plugin XEP-0166: new trigger point + coroutines + helper methods: - a new `XEP-0166_initiate` async trigger point is available - `initate` is now a coroutine - `jingleSessionInit` in applications and transports handlers are now called using `utils.adDeferred` - new `getApplication` helper method, to retrieve application from its namespace - new `getContentData` helper method to retrieve application and its argument from content diff -r 404d4b29de52 -r 26a0af6e32c1 sat/plugins/plugin_exp_jingle_stream.py --- a/sat/plugins/plugin_exp_jingle_stream.py Thu Nov 12 14:53:15 2020 +0100 +++ b/sat/plugins/plugin_exp_jingle_stream.py Thu Nov 12 14:53:15 2020 +0100 @@ -17,14 +17,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from sat.core.i18n import _, D_ -from sat.core.constants import Const as C -from sat.core import exceptions -from sat.core.log import getLogger - -log = getLogger(__name__) -from sat.tools import xml_tools -from sat.tools import stream +import errno +from zope import interface from twisted.words.xish import domish from twisted.words.protocols.jabber import jid from twisted.internet import defer @@ -33,8 +27,15 @@ from twisted.internet import reactor from twisted.internet import error from twisted.internet import interfaces -from zope import interface -import errno +from sat.core.i18n import _, D_ +from sat.core.constants import Const as C +from sat.core import exceptions +from sat.core.log import getLogger +from sat.tools import xml_tools +from sat.tools import stream + + +log = getLogger(__name__) NS_STREAM = "http://salut-a-toi.org/protocol/stream" SECURITY_LIMIT = 30 @@ -193,10 +194,9 @@ def _streamOut(self, to_jid_s, profile_key): client = self.host.getClient(profile_key) - return self.streamOut(client, jid.JID(to_jid_s)) + return defer.ensureDeferred(self.streamOut(client, jid.JID(to_jid_s))) - @defer.inlineCallbacks - def streamOut(self, client, to_jid): + async def streamOut(self, client, to_jid): """send a stream @param peer_jid(jid.JID): recipient @@ -207,7 +207,7 @@ while True: endpoint = endpoints.TCP4ServerEndpoint(reactor, port) try: - port_listening = yield endpoint.listen(factory) + port_listening = await endpoint.listen(factory) except error.CannotListenError as e: if e.socketError.errno == errno.EADDRINUSE: port += 1 @@ -216,7 +216,8 @@ else: factory.port_listening = port_listening break - self._j.initiate( + # we don't want to wait for IQ result of initiate + defer.ensureDeferred(self._j.initiate( client, to_jid, [ @@ -226,8 +227,8 @@ "app_kwargs": {"stream_object": factory}, } ], - ) - defer.returnValue(str(port)) + )) + return str(port) def jingleSessionInit(self, client, session, content_name, stream_object): content_data = session["contents"][content_name] diff -r 404d4b29de52 -r 26a0af6e32c1 sat/plugins/plugin_xep_0166.py --- a/sat/plugins/plugin_xep_0166.py Thu Nov 12 14:53:15 2020 +0100 +++ b/sat/plugins/plugin_xep_0166.py Thu Nov 12 14:53:15 2020 +0100 @@ -19,6 +19,7 @@ import uuid import time +from typing import Tuple from collections import namedtuple from zope.interface import implementer from twisted.words.protocols.jabber import jid @@ -336,8 +337,34 @@ """ return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO) - @defer.inlineCallbacks - def initiate(self, client, peer_jid, contents): + def getApplication(self, namespace: str) -> object: + """Retreive application corresponding to a namespace + + @raise exceptions.NotFound if application can't be found + """ + try: + return self._applications[namespace] + except KeyError: + raise exceptions.NotFound( + f"No application registered for {namespace}" + ) + + def getContentData(self, content: dict) -> Tuple[object, list, dict, str]: + """"Retrieve application and its argument from content""" + app_ns = content["app_ns"] + try: + application = self.getApplication(app_ns) + except exceptions.NotFound as e: + raise exceptions.InternalError(str(e)) + app_args = content.get("app_args", []) + app_kwargs = content.get("app_kwargs", {}) + try: + content_name = content["name"] + except KeyError: + content_name = content["name"] = str(uuid.uuid4()) + return application, app_args, app_kwargs, content_name + + async def initiate(self, client, peer_jid, contents): """Send a session initiation request @param peer_jid(jid.JID): jid to establith session with @@ -352,7 +379,7 @@ default to BOTH (see XEP-0166 ยง7.3) - app_args(list): args to pass to the application plugin - app_kwargs(dict): keyword args to pass to the application plugin - @return D(unicode): jingle session id + @return (unicode): jingle session id """ assert contents # there must be at least one content if (peer_jid == client.jid @@ -371,6 +398,13 @@ "started": time.time(), "contents": {}, } + + if not await self.host.trigger.asyncPoint( + "XEP-0166_initiate", + client, session, contents + ): + return + iq_elt, jingle_elt = self._buildJingleElt( client, session, XEP_0166.A_SESSION_INITIATE ) @@ -380,13 +414,7 @@ for content in contents: # we get the application plugin - app_ns = content["app_ns"] - try: - application = self._applications[app_ns] - except KeyError: - raise exceptions.InternalError( - "No application registered for {}".format(app_ns) - ) + application, app_args, app_kwargs, content_name = self.getContentData(content) # and the transport plugin transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING) @@ -406,15 +434,10 @@ "creator": XEP_0166.ROLE_INITIATOR, "senders": content.get("senders", "both"), } - try: - content_name = content["name"] - except KeyError: - content_name = str(uuid.uuid4()) - else: - if content_name in contents_dict: - raise exceptions.InternalError( - "There is already a content with this name" - ) + if content_name in contents_dict: + raise exceptions.InternalError( + "There is already a content with this name" + ) contents_dict[content_name] = content_data # we construct the content element @@ -427,21 +450,21 @@ pass # then the description element - app_args = content.get("app_args", []) - app_kwargs = content.get("app_kwargs", {}) - desc_elt = yield application.handler.jingleSessionInit( + desc_elt = await utils.asDeferred( + application.handler.jingleSessionInit, client, session, content_name, *app_args, **app_kwargs ) content_elt.addChild(desc_elt) # and the transport one - transport_elt = yield transport.handler.jingleSessionInit( + transport_elt = await utils.asDeferred( + transport.handler.jingleSessionInit, client, session, content_name ) content_elt.addChild(transport_elt) try: - yield iq_elt.send() + await iq_elt.send() except Exception as e: failure_ = failure.Failure(e) self._iqError(failure_, sid, client) diff -r 404d4b29de52 -r 26a0af6e32c1 sat/plugins/plugin_xep_0234.py --- a/sat/plugins/plugin_xep_0234.py Thu Nov 12 14:53:15 2020 +0100 +++ b/sat/plugins/plugin_xep_0234.py Thu Nov 12 14:53:15 2020 +0100 @@ -405,6 +405,11 @@ # jingle callbacks + def jingleDescriptionElt( + self, client, session, content_name, filepath, name, extra, progress_id_d + ): + return domish.Element((NS_JINGLE_FT, "description")) + def jingleSessionInit( self, client, session, content_name, filepath, name, extra, progress_id_d ): @@ -423,7 +428,8 @@ assert "file_path" not in application_data application_data["file_path"] = filepath file_data = application_data["file_data"] = {} - desc_elt = domish.Element((NS_JINGLE_FT, "description")) + desc_elt = self.jingleDescriptionElt( + client, session, content_name, filepath, name, extra, progress_id_d) file_elt = desc_elt.addElement("file") if content_data["senders"] == self._j.ROLE_INITIATOR: