Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0166.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 378188abe941 |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0166.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_xep_0166.py Wed Jun 27 20:14:46 2018 +0200 @@ -21,6 +21,7 @@ from sat.core.constants import Const as C from sat.core.log import getLogger from sat.tools import xml_tools + log = getLogger(__name__) from sat.core import exceptions from twisted.words.protocols.jabber import jid @@ -37,7 +38,6 @@ from zope.interface import implements - IQ_SET = '/iq[@type="set"]' NS_JINGLE = "urn:xmpp:jingle:1" NS_JINGLE_ERROR = "urn:xmpp:jingle:errors:1" @@ -55,24 +55,24 @@ C.PI_PROTOCOLS: ["XEP-0166"], C.PI_MAIN: "XEP_0166", C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Implementation of Jingle""") + C.PI_DESCRIPTION: _("""Implementation of Jingle"""), } -ApplicationData = namedtuple('ApplicationData', ('namespace', 'handler')) -TransportData = namedtuple('TransportData', ('namespace', 'handler', 'priority')) +ApplicationData = namedtuple("ApplicationData", ("namespace", "handler")) +TransportData = namedtuple("TransportData", ("namespace", "handler", "priority")) class XEP_0166(object): ROLE_INITIATOR = "initiator" ROLE_RESPONDER = "responder" - TRANSPORT_DATAGRAM='UDP' - TRANSPORT_STREAMING='TCP' - REASON_SUCCESS='success' - REASON_DECLINE='decline' - REASON_FAILED_APPLICATION='failed-application' - REASON_FAILED_TRANSPORT='failed-transport' - REASON_CONNECTIVITY_ERROR='connectivity-error' + TRANSPORT_DATAGRAM = "UDP" + TRANSPORT_STREAMING = "TCP" + REASON_SUCCESS = "success" + REASON_DECLINE = "decline" + REASON_FAILED_APPLICATION = "failed-application" + REASON_FAILED_TRANSPORT = "failed-transport" + REASON_CONNECTIVITY_ERROR = "connectivity-error" A_SESSION_INITIATE = "session-initiate" A_SESSION_ACCEPT = "session-accept" A_SESSION_TERMINATE = "session-terminate" @@ -82,21 +82,26 @@ A_TRANSPORT_REJECT = "transport-reject" A_TRANSPORT_INFO = "transport-info" # non standard actions - A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer - A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer - A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator - A_START = "start" # application can start - A_DESTROY = "destroy" # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations + A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer + A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer + A_ACCEPTED_ACK = ( + "accepted-ack" + ) # session accepted ack has been received from initiator + A_START = "start" # application can start + A_DESTROY = ( + "destroy" + ) # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations def __init__(self, host): log.info(_("plugin Jingle initialization")) self.host = host - self._applications = {} # key: namespace, value: application data - self._transports = {} # key: namespace, value: transport data + self._applications = {} # key: namespace, value: application data + self._transports = {} # key: namespace, value: transport data # we also keep transports by type, they are then sorted by priority - self._type_transports = { XEP_0166.TRANSPORT_DATAGRAM: [], - XEP_0166.TRANSPORT_STREAMING: [], - } + self._type_transports = { + XEP_0166.TRANSPORT_DATAGRAM: [], + XEP_0166.TRANSPORT_STREAMING: [], + } def profileConnected(self, client): client.jingle_sessions = {} # key = sid, value = session_data @@ -115,12 +120,12 @@ ## helpers methods to build stanzas ## def _buildJingleElt(self, client, session, action): - iq_elt = client.IQ('set') - iq_elt['from'] = client.jid.full() - iq_elt['to'] = session['peer_jid'].full() + iq_elt = client.IQ("set") + iq_elt["from"] = client.jid.full() + iq_elt["to"] = session["peer_jid"].full() jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) - jingle_elt["sid"] = session['id'] - jingle_elt['action'] = action + jingle_elt["sid"] = session["id"] + jingle_elt["action"] = action return iq_elt, jingle_elt def sendError(self, client, error_condition, sid, request, jingle_condition=None): @@ -134,9 +139,13 @@ iq_elt = error.StanzaError(error_condition).toResponse(request) if jingle_condition is not None: iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition)) - if error.STANZA_CONDITIONS[error_condition]['type'] == 'cancel' and sid: + if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid: self._delSession(client, sid) - log.warning(u"Error while managing jingle session, cancelling: {condition}".format(error_condition)) + log.warning( + u"Error while managing jingle session, cancelling: {condition}".format( + error_condition + ) + ) client.send(iq_elt) def _terminateEb(self, failure_): @@ -150,14 +159,16 @@ if a list of element, add them as children of the <reason/> element @param session(dict): data of the session """ - iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_TERMINATE) - reason_elt = jingle_elt.addElement('reason') + iq_elt, jingle_elt = self._buildJingleElt( + client, session, XEP_0166.A_SESSION_TERMINATE + ) + reason_elt = jingle_elt.addElement("reason") if isinstance(reason, basestring): reason_elt.addElement(reason) else: for elt in reason: reason_elt.addChild(elt) - self._delSession(client, session['id']) + self._delSession(client, session["id"]) d = iq_elt.send() d.addErrback(self._terminateEb) return d @@ -170,7 +181,11 @@ @param failure_(failure.Failure): the exceptions raised @param sid(unicode): jingle session id """ - log.warning(u"Error while sending jingle <iq/> stanza: {failure_}".format(failure_=failure_.value)) + log.warning( + u"Error while sending jingle <iq/> stanza: {failure_}".format( + failure_=failure_.value + ) + ) self._delSession(client, sid) def _jingleErrorCb(self, fail, sid, request, client): @@ -185,7 +200,7 @@ """ log.warning("Error while processing jingle request") if isinstance(fail, exceptions.DataError): - self.sendError(client, 'bad-request', sid, request) + self.sendError(client, "bad-request", sid, request) else: log.error("Unmanaged jingle exception") self._delSession(client, sid) @@ -211,8 +226,12 @@ May be used to clean session """ if namespace in self._applications: - raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace)) - self._applications[namespace] = ApplicationData(namespace=namespace, handler=handler) + raise exceptions.ConflictError( + u"Trying to register already registered namespace {}".format(namespace) + ) + self._applications[namespace] = ApplicationData( + namespace=namespace, handler=handler + ) log.debug(u"new jingle application registered") def registerTransport(self, namespace, transport_type, handler, priority=0): @@ -227,12 +246,21 @@ called on several action to negociate the application or transport @param priority(int): priority of this transport """ - assert transport_type in (XEP_0166.TRANSPORT_DATAGRAM, XEP_0166.TRANSPORT_STREAMING) + assert transport_type in ( + XEP_0166.TRANSPORT_DATAGRAM, + XEP_0166.TRANSPORT_STREAMING, + ) if namespace in self._transports: - raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace)) - transport_data = TransportData(namespace=namespace, handler=handler, priority=priority) + raise exceptions.ConflictError( + u"Trying to register already registered namespace {}".format(namespace) + ) + transport_data = TransportData( + namespace=namespace, handler=handler, priority=priority + ) self._type_transports[transport_type].append(transport_data) - self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True) + self._type_transports[transport_type].sort( + key=lambda transport_data: transport_data.priority, reverse=True + ) self._transports[namespace] = transport_data log.debug(u"new jingle transport registered") @@ -247,20 +275,24 @@ # XXX: for now we replace the transport before receiving confirmation from other peer # this is acceptable because we terminate the session if transport is rejected. # this behavious may change in the future. - content_data= session['contents'][content_name] - transport_data = content_data['transport_data'] + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] try: transport = self._transports[transport_ns] except KeyError: raise exceptions.InternalError(u"Unkown transport") - yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None) - content_data['transport'] = transport + yield content_data["transport"].handler.jingleHandler( + client, XEP_0166.A_DESTROY, session, content_name, None + ) + content_data["transport"] = transport transport_data.clear() - iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE) - content_elt = jingle_elt.addElement('content') - content_elt['name'] = content_name - content_elt['creator'] = content_data['creator'] + iq_elt, jingle_elt = self._buildJingleElt( + client, session, XEP_0166.A_TRANSPORT_REPLACE + ) + content_elt = jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] transport_elt = transport.handler.jingleSessionInit(client, session, content_name) content_elt.addChild(transport_elt) @@ -279,14 +311,16 @@ # we first build iq, jingle and content element which are the same in every cases iq_elt, jingle_elt = self._buildJingleElt(client, session, action) # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked - content_data= session['contents'][content_name] - content_elt = jingle_elt.addElement('content') - content_elt['name'] = content_name - content_elt['creator'] = content_data['creator'] + content_data = session["contents"][content_name] + content_elt = jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] if action == XEP_0166.A_TRANSPORT_INFO: - context_elt = transport_elt = content_elt.addElement('transport', content_data['transport'].namespace) - transport_elt['sid'] = content_data['transport_data']['sid'] + context_elt = transport_elt = content_elt.addElement( + "transport", content_data["transport"].namespace + ) + transport_elt["sid"] = content_data["transport_data"]["sid"] else: raise exceptions.InternalError(u"unmanaged action {}".format(action)) @@ -298,7 +332,7 @@ @param session(dict): jingle session data @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element """ - return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO) + return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO) @defer.inlineCallbacks def initiate(self, client, peer_jid, contents): @@ -318,74 +352,88 @@ - app_kwargs(dict): keyword args to pass to the application plugin @return D(unicode): jingle session id """ - assert contents # there must be at least one content + assert contents # there must be at least one content if peer_jid == client.jid: raise ValueError(_(u"You can't do a jingle session with yourself")) initiator = client.jid sid = unicode(uuid.uuid4()) # TODO: session cleaning after timeout ? - session = client.jingle_sessions[sid] = {'id': sid, - 'state': STATE_PENDING, - 'initiator': initiator, - 'role': XEP_0166.ROLE_INITIATOR, - 'peer_jid': peer_jid, - 'started': time.time(), - 'contents': {} - } - iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_INITIATE) + session = client.jingle_sessions[sid] = { + "id": sid, + "state": STATE_PENDING, + "initiator": initiator, + "role": XEP_0166.ROLE_INITIATOR, + "peer_jid": peer_jid, + "started": time.time(), + "contents": {}, + } + iq_elt, jingle_elt = self._buildJingleElt( + client, session, XEP_0166.A_SESSION_INITIATE + ) jingle_elt["initiator"] = initiator.full() - contents_dict = session['contents'] + contents_dict = session["contents"] for content in contents: # we get the application plugin - app_ns = content['app_ns'] + app_ns = content["app_ns"] try: application = self._applications[app_ns] except KeyError: - raise exceptions.InternalError(u"No application registered for {}".format(app_ns)) + raise exceptions.InternalError( + u"No application registered for {}".format(app_ns) + ) # and the transport plugin - transport_type = content.get('transport_type', XEP_0166.TRANSPORT_STREAMING) + transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING) try: transport = self._type_transports[transport_type][0] except IndexError: - raise exceptions.InternalError(u"No transport registered for {}".format(transport_type)) + raise exceptions.InternalError( + u"No transport registered for {}".format(transport_type) + ) # we build the session data - content_data = {'application': application, - 'application_data': {}, - 'transport': transport, - 'transport_data': {}, - 'creator': XEP_0166.ROLE_INITIATOR, - 'senders': content.get('senders', 'both'), - } + content_data = { + "application": application, + "application_data": {}, + "transport": transport, + "transport_data": {}, + "creator": XEP_0166.ROLE_INITIATOR, + "senders": content.get("senders", "both"), + } try: - content_name = content['name'] + content_name = content["name"] except KeyError: content_name = unicode(uuid.uuid4()) else: if content_name in contents_dict: - raise exceptions.InternalError('There is already a content with this name') + raise exceptions.InternalError( + "There is already a content with this name" + ) contents_dict[content_name] = content_data # we construct the content element - content_elt = jingle_elt.addElement('content') - content_elt['creator'] = content_data['creator'] - content_elt['name'] = content_name + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = content_data["creator"] + content_elt["name"] = content_name try: - content_elt['senders'] = content['senders'] + content_elt["senders"] = content["senders"] except KeyError: pass # then the description element - app_args = content.get('app_args', []) - app_kwargs = content.get('app_kwargs', {}) - desc_elt = yield application.handler.jingleSessionInit(client, session, content_name, *app_args, **app_kwargs) + app_args = content.get("app_args", []) + app_kwargs = content.get("app_kwargs", {}) + desc_elt = yield application.handler.jingleSessionInit( + client, session, content_name, *app_args, **app_kwargs + ) content_elt.addChild(desc_elt) # and the transport one - transport_elt = yield transport.handler.jingleSessionInit(client, session, content_name) + transport_elt = yield transport.handler.jingleSessionInit( + client, session, content_name + ) content_elt.addChild(transport_elt) try: @@ -410,17 +458,24 @@ @param content_name(unicode): name of the content terminated @param reason(unicode): reason of the termination """ - contents = session['contents'] + contents = session["contents"] del contents[content_name] if not contents: self.terminate(client, reason, session) ## defaults methods called when plugin doesn't have them ## - def jingleRequestConfirmationDefault(self, client, action, session, content_name, desc_elt): + def jingleRequestConfirmationDefault( + self, client, action, session, content_name, desc_elt + ): """This method request confirmation for a jingle session""" log.debug(u"Using generic jingle confirmation method") - return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['peer_jid'].full()), _('Confirm Jingle session'), profile=client.profile) + return xml_tools.deferConfirm( + self.host, + _(CONFIRM_TXT).format(entity=session["peer_jid"].full()), + _("Confirm Jingle session"), + profile=client.profile, + ) ## jingle events ## @@ -432,29 +487,29 @@ @param request(domish.Element): received IQ request """ request.handled = True - jingle_elt = request.elements(NS_JINGLE, 'jingle').next() + jingle_elt = request.elements(NS_JINGLE, "jingle").next() # first we need the session id try: - sid = jingle_elt['sid'] + sid = jingle_elt["sid"] if not sid: raise KeyError except KeyError: log.warning(u"Received jingle request has no sid attribute") - self.sendError(client, 'bad-request', None, request) + self.sendError(client, "bad-request", None, request) return # then the action try: - action = jingle_elt['action'] + action = jingle_elt["action"] if not action: raise KeyError except KeyError: log.warning(u"Received jingle request has no action") - self.sendError(client, 'bad-request', None, request) + self.sendError(client, "bad-request", None, request) return - peer_jid = jid.JID(request['from']) + peer_jid = jid.JID(request["from"]) # we get or create the session try: @@ -463,32 +518,41 @@ if action == XEP_0166.A_SESSION_INITIATE: pass elif action == XEP_0166.A_SESSION_TERMINATE: - log.debug(u"ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format( - request_id=sid, - profile = client.profile)) + log.debug( + u"ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format( + request_id=sid, profile=client.profile + ) + ) return else: - log.warning(u"Received request for an unknown session id: {request_id} [{profile}]".format( - request_id=sid, - profile = client.profile)) - self.sendError(client, 'item-not-found', None, request, 'unknown-session') + log.warning( + u"Received request for an unknown session id: {request_id} [{profile}]".format( + request_id=sid, profile=client.profile + ) + ) + self.sendError(client, "item-not-found", None, request, "unknown-session") return - session = client.jingle_sessions[sid] = {'id': sid, - 'state': STATE_PENDING, - 'initiator': peer_jid, - 'role': XEP_0166.ROLE_RESPONDER, - 'peer_jid': peer_jid, - 'started': time.time(), - } + session = client.jingle_sessions[sid] = { + "id": sid, + "state": STATE_PENDING, + "initiator": peer_jid, + "role": XEP_0166.ROLE_RESPONDER, + "peer_jid": peer_jid, + "started": time.time(), + } else: - if session['peer_jid'] != peer_jid: - log.warning(u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(sid)) - self.sendError(client, 'service-unavailable', sid, request) + if session["peer_jid"] != peer_jid: + log.warning( + u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format( + sid + ) + ) + self.sendError(client, "service-unavailable", sid, request) return - if session['id'] != sid: + if session["id"] != sid: log.error(u"session id doesn't match") - self.sendError(client, 'service-unavailable', sid, request) + self.sendError(client, "service-unavailable", sid, request) raise exceptions.InternalError if action == XEP_0166.A_SESSION_INITIATE: @@ -512,7 +576,17 @@ ## Actions callbacks ## - def _parseElements(self, jingle_elt, session, request, client, new=False, creator=ROLE_INITIATOR, with_application=True, with_transport=True): + def _parseElements( + self, + jingle_elt, + session, + request, + client, + new=False, + creator=ROLE_INITIATOR, + with_application=True, + with_transport=True, + ): """Parse contents elements and fill contents_dict accordingly after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" @@ -527,85 +601,98 @@ @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return) """ - contents_dict = session['contents'] - content_elts = jingle_elt.elements(NS_JINGLE, 'content') + contents_dict = session["contents"] + content_elts = jingle_elt.elements(NS_JINGLE, "content") for content_elt in content_elts: - name = content_elt['name'] + name = content_elt["name"] if new: # the content must not exist, we check it if not name or name in contents_dict: - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) raise exceptions.CancelError - content_data = contents_dict[name] = {'creator': creator, - 'senders': content_elt.attributes.get('senders', 'both'), - } + content_data = contents_dict[name] = { + "creator": creator, + "senders": content_elt.attributes.get("senders", "both"), + } else: # the content must exist, we check it try: content_data = contents_dict[name] except KeyError: log.warning(u"Other peer try to access an unknown content") - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) raise exceptions.CancelError # application if with_application: desc_elt = content_elt.description if not desc_elt: - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) raise exceptions.CancelError if new: # the content is new, we need to check and link the application app_ns = desc_elt.uri if not app_ns or app_ns == NS_JINGLE: - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) raise exceptions.CancelError try: application = self._applications[app_ns] except KeyError: - log.warning(u"Unmanaged application namespace [{}]".format(app_ns)) - self.sendError(client, 'service-unavailable', session['id'], request) + log.warning( + u"Unmanaged application namespace [{}]".format(app_ns) + ) + self.sendError( + client, "service-unavailable", session["id"], request + ) raise exceptions.CancelError - content_data['application'] = application - content_data['application_data'] = {} + content_data["application"] = application + content_data["application_data"] = {} else: # the content exists, we check that we have not a former desc_elt - if 'desc_elt' in content_data: - raise exceptions.InternalError(u"desc_elt should not exist at this point") + if "desc_elt" in content_data: + raise exceptions.InternalError( + u"desc_elt should not exist at this point" + ) - content_data['desc_elt'] = desc_elt + content_data["desc_elt"] = desc_elt # transport if with_transport: transport_elt = content_elt.transport if not transport_elt: - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) raise exceptions.CancelError if new: # the content is new, we need to check and link the transport transport_ns = transport_elt.uri if not app_ns or app_ns == NS_JINGLE: - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) raise exceptions.CancelError try: transport = self._transports[transport_ns] except KeyError: - raise exceptions.InternalError(u"No transport registered for namespace {}".format(transport_ns)) - content_data['transport'] = transport - content_data['transport_data'] = {} + raise exceptions.InternalError( + u"No transport registered for namespace {}".format( + transport_ns + ) + ) + content_data["transport"] = transport + content_data["transport_data"] = {} else: # the content exists, we check that we have not a former transport_elt - if 'transport_elt' in content_data: - raise exceptions.InternalError(u"transport_elt should not exist at this point") + if "transport_elt" in content_data: + raise exceptions.InternalError( + u"transport_elt should not exist at this point" + ) - content_data['transport_elt'] = transport_elt + content_data["transport_elt"] = transport_elt def _ignore(self, client, action, session, content_name, elt): """Dummy method used when not exception must be raised if a method is not implemented in _callPlugins @@ -614,10 +701,19 @@ """ return elt - def _callPlugins(self, client, action, session, app_method_name='jingleHandler', - transp_method_name='jingleHandler', - app_default_cb=None, transp_default_cb=None, delete=True, - elements=True, force_element=None): + def _callPlugins( + self, + client, + action, + session, + app_method_name="jingleHandler", + transp_method_name="jingleHandler", + app_default_cb=None, + transp_default_cb=None, + delete=True, + elements=True, + force_element=None, + ): """Call application and transport plugin methods for all contents @param action(unicode): jingle action name @@ -640,12 +736,13 @@ @return (list[defer.Deferred]): list of launched Deferred @raise exceptions.NotFound: method is not implemented """ - contents_dict = session['contents'] + contents_dict = session["contents"] defers_list = [] for content_name, content_data in contents_dict.iteritems(): for method_name, handler_key, default_cb, elt_name in ( - (app_method_name, 'application', app_default_cb, 'desc_elt'), - (transp_method_name, 'transport', transp_default_cb, 'transport_elt')): + (app_method_name, "application", app_default_cb, "desc_elt"), + (transp_method_name, "transport", transp_default_cb, "transport_elt"), + ): if method_name is None: continue @@ -654,14 +751,18 @@ method = getattr(handler, method_name) except AttributeError: if default_cb is None: - raise exceptions.NotFound(u'{} not implemented !'.format(method_name)) + raise exceptions.NotFound( + u"{} not implemented !".format(method_name) + ) else: method = default_cb if elements: elt = content_data.pop(elt_name) if delete else content_data[elt_name] else: elt = force_element - d = defer.maybeDeferred(method, client, action, session, content_name, elt) + d = defer.maybeDeferred( + method, client, action, session, content_name, elt + ) defers_list.append(d) return defers_list @@ -678,30 +779,42 @@ @param jingle_elt(domish.Element): <jingle> element @param session(dict): session data """ - if 'contents' in session: - raise exceptions.InternalError("Contents dict should not already exist at this point") - session['contents'] = contents_dict = {} + if "contents" in session: + raise exceptions.InternalError( + "Contents dict should not already exist at this point" + ) + session["contents"] = contents_dict = {} try: - self._parseElements(jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR) + self._parseElements( + jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR + ) except exceptions.CancelError: return if not contents_dict: # there MUST be at least one content - self.sendError(client, 'bad-request', session['id'], request) + self.sendError(client, "bad-request", session["id"], request) return # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) # we now request each application plugin confirmation # and if all are accepted, we can accept the session - confirm_defers = self._callPlugins(client, XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False) + confirm_defers = self._callPlugins( + client, + XEP_0166.A_SESSION_INITIATE, + session, + "jingleRequestConfirmation", + None, + self.jingleRequestConfirmationDefault, + delete=False, + ) confirm_dlist = defer.gatherResults(confirm_defers) confirm_dlist.addCallback(self._confirmationCb, session, jingle_elt, client) - confirm_dlist.addErrback(self._jingleErrorCb, session['id'], request, client) + confirm_dlist.addErrback(self._jingleErrorCb, session["id"], request, client) def _confirmationCb(self, confirm_results, session, jingle_elt, client): """Method called when confirmation from user has been received @@ -716,8 +829,10 @@ if not confirmed: return self.terminate(client, XEP_0166.REASON_DECLINE, session) - iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_ACCEPT) - jingle_elt['responder'] = client.jid.full() + iq_elt, jingle_elt = self._buildJingleElt( + client, session, XEP_0166.A_SESSION_ACCEPT + ) + jingle_elt["responder"] = client.jid.full() # contents @@ -726,52 +841,87 @@ defers_list = [] - for content_name, content_data in session['contents'].iteritems(): - content_elt = jingle_elt.addElement('content') - content_elt['creator'] = XEP_0166.ROLE_INITIATOR - content_elt['name'] = content_name + for content_name, content_data in session["contents"].iteritems(): + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = XEP_0166.ROLE_INITIATOR + content_elt["name"] = content_name - application = content_data['application'] + application = content_data["application"] app_session_accept_cb = application.handler.jingleHandler - app_d = defer.maybeDeferred(app_session_accept_cb, client, - XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt')) + app_d = defer.maybeDeferred( + app_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("desc_elt"), + ) app_d.addCallback(addElement, content_elt) defers_list.append(app_d) - transport = content_data['transport'] + transport = content_data["transport"] transport_session_accept_cb = transport.handler.jingleHandler - transport_d = defer.maybeDeferred(transport_session_accept_cb, client, - XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt')) + transport_d = defer.maybeDeferred( + transport_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("transport_elt"), + ) transport_d.addCallback(addElement, content_elt) defers_list.append(transport_d) d_list = defer.DeferredList(defers_list) - d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False)) + d_list.addCallback( + lambda dummy: self._callPlugins( + client, + XEP_0166.A_PREPARE_RESPONDER, + session, + app_method_name=None, + elements=False, + ) + ) d_list.addCallback(lambda dummy: iq_elt.send()) + def changeState(dummy, session): - session['state'] = STATE_ACTIVE + session["state"] = STATE_ACTIVE d_list.addCallback(changeState, session) - d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_ACCEPTED_ACK, session, elements=False)) - d_list.addErrback(self._iqError, session['id'], client) + d_list.addCallback( + lambda dummy: self._callPlugins( + client, XEP_0166.A_ACCEPTED_ACK, session, elements=False + ) + ) + d_list.addErrback(self._iqError, session["id"], client) return d_list def onSessionTerminate(self, client, request, jingle_elt, session): # TODO: check reason, display a message to user if needed - log.debug("Jingle Session {} terminated".format(session['id'])) + log.debug("Jingle Session {} terminated".format(session["id"])) try: - reason_elt = jingle_elt.elements(NS_JINGLE, 'reason').next() + reason_elt = jingle_elt.elements(NS_JINGLE, "reason").next() except StopIteration: log.warning(u"No reason given for session termination") - reason_elt = jingle_elt.addElement('reason') + reason_elt = jingle_elt.addElement("reason") - terminate_defers = self._callPlugins(client, XEP_0166.A_SESSION_TERMINATE, session, 'jingleTerminate', 'jingleTerminate', self._ignore, self._ignore, elements=False, force_element=reason_elt) + terminate_defers = self._callPlugins( + client, + XEP_0166.A_SESSION_TERMINATE, + session, + "jingleTerminate", + "jingleTerminate", + self._ignore, + self._ignore, + elements=False, + force_element=reason_elt, + ) terminate_dlist = defer.DeferredList(terminate_defers) - terminate_dlist.addCallback(lambda dummy: self._delSession(client, session['id'])) - client.send(xmlstream.toResponse(request, 'result')) + terminate_dlist.addCallback(lambda dummy: self._delSession(client, session["id"])) + client.send(xmlstream.toResponse(request, "result")) def onSessionAccept(self, client, request, jingle_elt, session): """Method called once session is accepted @@ -782,7 +932,7 @@ @param jingle_elt(domish.Element): the <jingle> element @param session(dict): session data """ - log.debug(u"Jingle session {} has been accepted".format(session['id'])) + log.debug(u"Jingle session {} has been accepted".format(session["id"])) try: self._parseElements(jingle_elt, session, request, client) @@ -790,9 +940,9 @@ return # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) # and change the state - session['state'] = STATE_ACTIVE + session["state"] = STATE_ACTIVE negociate_defers = [] negociate_defers = self._callPlugins(client, XEP_0166.A_SESSION_ACCEPT, session) @@ -800,15 +950,21 @@ negociate_dlist = defer.DeferredList(negociate_defers) # after negociations we start the transfer - negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False)) + negociate_dlist.addCallback( + lambda dummy: self._callPlugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False + ) + ) def _onSessionCb(self, result, client, request, jingle_elt, session): - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) def _onSessionEb(self, failure_, client, request, jingle_elt, session): log.error(u"Error while handling onSessionInfo: {}".format(failure_.value)) # XXX: only error managed so far, maybe some applications/transports need more - self.sendError(client, 'feature-not-implemented', None, request, 'unsupported-info') + self.sendError( + client, "feature-not-implemented", None, request, "unsupported-info" + ) def onSessionInfo(self, client, request, jingle_elt, session): """Method called when a session-info action is received from other peer @@ -821,14 +977,21 @@ """ if not jingle_elt.children: # this is a session ping, see XEP-0166 §6.8 - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) return try: # XXX: session-info is most likely only used for application, so we don't call transport plugins # if a future transport use it, this behaviour must be adapted - defers = self._callPlugins(client, XEP_0166.A_SESSION_INFO, session, 'jingleSessionInfo', None, - elements=False, force_element=jingle_elt) + defers = self._callPlugins( + client, + XEP_0166.A_SESSION_INFO, + session, + "jingleSessionInfo", + None, + elements=False, + force_element=jingle_elt, + ) except exceptions.NotFound as e: self._onSessionEb(failure.Failure(e), client, request, jingle_elt, session) return @@ -849,32 +1012,40 @@ """ log.debug(u"Other peer wants to replace the transport") try: - self._parseElements(jingle_elt, session, request, client, with_application=False) + self._parseElements( + jingle_elt, session, request, client, with_application=False + ) except exceptions.CancelError: defer.returnValue(None) - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) content_name = None to_replace = [] - for content_name, content_data in session['contents'].iteritems(): + for content_name, content_data in session["contents"].iteritems(): try: - transport_elt = content_data.pop('transport_elt') + transport_elt = content_data.pop("transport_elt") except KeyError: continue transport_ns = transport_elt.uri try: transport = self._transports[transport_ns] except KeyError: - log.warning(u"Other peer want to replace current transport with an unknown one: {}".format(transport_ns)) + log.warning( + u"Other peer want to replace current transport with an unknown one: {}".format( + transport_ns + ) + ) content_name = None break to_replace.append((content_name, content_data, transport, transport_elt)) if content_name is None: # wa can't accept the replacement - iq_elt, reject_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REJECT) + iq_elt, reject_jingle_elt = self._buildJingleElt( + client, session, XEP_0166.A_TRANSPORT_REJECT + ) for child in jingle_elt.children: reject_jingle_elt.addChild(child) @@ -883,21 +1054,29 @@ # at this point, everything is alright and we can replace the transport(s) # this is similar to an session-accept action, but for transports only - iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT) + iq_elt, accept_jingle_elt = self._buildJingleElt( + client, session, XEP_0166.A_TRANSPORT_ACCEPT + ) for content_name, content_data, transport, transport_elt in to_replace: # we can now actually replace the transport - yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None) - content_data['transport'] = transport - content_data['transport_data'].clear() + yield content_data["transport"].handler.jingleHandler( + client, XEP_0166.A_DESTROY, session, content_name, None + ) + content_data["transport"] = transport + content_data["transport_data"].clear() # and build the element - content_elt = accept_jingle_elt.addElement('content') - content_elt['name'] = content_name - content_elt['creator'] = content_data['creator'] + content_elt = accept_jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] # we notify the transport and insert its <transport/> in the answer - accept_transport_elt = yield transport.handler.jingleHandler(client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt) + accept_transport_elt = yield transport.handler.jingleHandler( + client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt + ) content_elt.addChild(accept_transport_elt) # there is no confirmation needed here, so we can directly prepare it - yield transport.handler.jingleHandler(client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None) + yield transport.handler.jingleHandler( + client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None + ) iq_elt.send() @@ -912,20 +1091,28 @@ log.debug(u"new transport has been accepted") try: - self._parseElements(jingle_elt, session, request, client, with_application=False) + self._parseElements( + jingle_elt, session, request, client, with_application=False + ) except exceptions.CancelError: return # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) negociate_defers = [] - negociate_defers = self._callPlugins(client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None) + negociate_defers = self._callPlugins( + client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None + ) negociate_dlist = defer.DeferredList(negociate_defers) # after negociations we start the transfer - negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False)) + negociate_dlist.addCallback( + lambda dummy: self._callPlugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False + ) + ) def onTransportReject(self, client, request, jingle_elt, session): """Method called when a transport replacement is refused @@ -937,7 +1124,7 @@ """ # XXX: for now, we terminate the session in case of transport-reject # this behaviour may change in the future - self.terminate(client, 'failed-transport', session) + self.terminate(client, "failed-transport", session) def onTransportInfo(self, client, request, jingle_elt, session): """Method called when a transport-info action is received from other peer @@ -948,23 +1135,31 @@ @param jingle_elt(domish.Element): the <jingle> element @param session(dict): session data """ - log.debug(u"Jingle session {} has been accepted".format(session['id'])) + log.debug(u"Jingle session {} has been accepted".format(session["id"])) try: - self._parseElements(jingle_elt, session, request, client, with_application=False) + self._parseElements( + jingle_elt, session, request, client, with_application=False + ) except exceptions.CancelError: return # The parsing was OK, we send the <iq> result - client.send(xmlstream.toResponse(request, 'result')) + client.send(xmlstream.toResponse(request, "result")) - for content_name, content_data in session['contents'].iteritems(): + for content_name, content_data in session["contents"].iteritems(): try: - transport_elt = content_data.pop('transport_elt') + transport_elt = content_data.pop("transport_elt") except KeyError: continue else: - content_data['transport'].handler.jingleHandler(client, XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt) + content_data["transport"].handler.jingleHandler( + client, + XEP_0166.A_TRANSPORT_INFO, + session, + content_name, + transport_elt, + ) class XEP_0166_handler(xmlstream.XMPPHandler): @@ -974,10 +1169,12 @@ self.plugin_parent = plugin_parent def connectionInitialized(self): - self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent) + self.xmlstream.addObserver( + JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent + ) - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_JINGLE)] - def getDiscoItems(self, requestor, target, nodeIdentifier=''): + def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []