Mercurial > libervia-backend
changeset 1556:cbfbe028d099
plugin XEP-0166, XEP-0234, XEP-0261:
- transport and application data are now managed in separate dictionaries
- new client.IQ method is used
- new buildAction helper method for applications and transports
- "role" is now stored in session_data
- "senders" is now stored in content_data
- plugin XEP-0166: "transport-info" action is now managed
- plugin XEP-0166: application namespace and handler are now managed in a namedtuple
- plugin XEP-0234: <range/> element is added by responder if not already present
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Nov 2015 22:02:41 +0100 |
parents | eb8aae35085b |
children | 22f0307864b4 |
files | src/plugins/plugin_xep_0166.py src/plugins/plugin_xep_0234.py src/plugins/plugin_xep_0261.py |
diffstat | 3 files changed, 177 insertions(+), 83 deletions(-) [+] |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0166.py Mon Nov 02 22:02:41 2015 +0100 +++ b/src/plugins/plugin_xep_0166.py Mon Nov 02 22:02:41 2015 +0100 @@ -28,7 +28,7 @@ # from twisted.words.xish import domish from twisted.internet import defer # from wokkel import disco, iwokkel, data_form, compat -from wokkel import disco, iwokkel, compat +from wokkel import disco, iwokkel from twisted.words.protocols.jabber import error from twisted.words.protocols.jabber import xmlstream # from sat.core import exceptions @@ -49,8 +49,6 @@ STATE_PENDING = "PENDING" STATE_ACTIVE = "ACTIVE" STATE_ENDED = "ENDED" -INITIATOR = "initiator" -RESPONDER = "responder" CONFIRM_TXT = D_("{entity} want to start a jingle session with you, do you accept ?") PLUGIN_INFO = { @@ -64,10 +62,13 @@ } +ApplicationData = namedtuple('ApplicationData', ('namespace', 'handler')) TransportData = namedtuple('TransportData', ('namespace', 'handler', 'priority')) class XEP_0166(object): + ROLE_INITIATOR = "initiator" + ROLE_RESPONDER = "responder" TRANSPORT_DATAGRAM='UDP' TRANSPORT_STREAMING='TCP' REASON_SUCCESS='success' @@ -77,6 +78,7 @@ A_SESSION_INITIATE = "session-initiate" A_SESSION_ACCEPT = "session-accept" A_SESSION_TERMINATE = "session-terminate" + A_TRANSPORT_INFO = "transport-info" # non standard actions A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer @@ -111,7 +113,7 @@ ## helpers methods to build stanzas ## def _buildJingleElt(self, client, session, action): - iq_elt = compat.IQ(client.xmlstream, 'set') + iq_elt = client.IQ('set') iq_elt['from'] = client.jid.full() iq_elt['to'] = session['to_jid'].full() jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) @@ -204,7 +206,8 @@ """ if namespace in self._applications: raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace)) - self._applications[namespace] = handler + self._applications[namespace] = ApplicationData(namespace=namespace, handler=handler) + log.debug(u"new jingle application registered") def registerTransport(self, namespace, transport_type, handler, priority=0): """Register a transport plugin @@ -223,10 +226,38 @@ raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace)) transport_data = TransportData(namespace=namespace, handler=handler, priority=priority) self._type_transports[transport_type].append(transport_data) - self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority) + self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True) self._transports[namespace] = transport_data log.debug(u"new jingle transport registered") + def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): + """Build an element according to requested action + + @param action(unicode): a jingle action (see XEP-0166 §7.2), + session-* actions are not managed here + @param session(dict): jingle session data + @param content_name(unicode): name of the content terminated + @param profile: %(doc_profile)s + @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action + """ + client = self.host.getClient(profile) + + # we first build iq, jingle and content element which are the same in every cases + iq_elt, jingle_elt = self._buildJingleElt(client, session, action) + # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked + content_data= session['contents'][content_name] + content_elt = jingle_elt.addElement('content') + content_elt['name'] = content_name + content_elt['creator'] = content_data['creator'] + + if action == XEP_0166.A_TRANSPORT_INFO: + context_elt = transport_elt = content_elt.addElement('transport', content_data['transport'].namespace) + transport_elt['sid'] = content_data['transport_data']['sid'] + else: + raise exceptions.InternalError(u"unmanaged action {}".format(action)) + + return iq_elt, context_elt + @defer.inlineCallbacks def initiate(self, to_jid, contents, profile=C.PROF_KEY_NONE): """Send a session initiation request @@ -239,6 +270,8 @@ - transport_type(unicode): type of transport to use (see XEP-0166 §8) default to TRANSPORT_STREAMING - name(unicode): name of the content + - senders(unicode): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none + default to BOTH (see XEP-0166 §7.3) - app_args(list): args to pass to the application plugin - app_kwargs(dict): keyword args to pass to the application plugin @param profile: %(doc_profile)s @@ -251,6 +284,7 @@ session = client.jingle_sessions[sid] = {'id': sid, 'state': STATE_PENDING, 'initiator': initiator, + 'role': XEP_0166.ROLE_INITIATOR, 'to_jid': to_jid, 'started': time.time(), 'contents': {} @@ -264,21 +298,24 @@ # we get the application plugin app_ns = content['app_ns'] try: - application_handler = self._applications[app_ns] + application = self._applications[app_ns] except KeyError: raise exceptions.InternalError(u"No application registered for {}".format(app_ns)) # and the transport plugin transport_type = content.get('transport_type', XEP_0166.TRANSPORT_STREAMING) try: - transport_handler = self._type_transports[transport_type][0].handler + transport = self._type_transports[transport_type][0] except IndexError: raise exceptions.InternalError(u"No transport registered for {}".format(transport_type)) # we build the session data - content_data = {'application': application_handler, - 'transport': transport_handler, - 'creator': INITIATOR, + content_data = {'application': application, + 'application_data': {}, + 'transport': transport, + 'transport_data': {}, + 'creator': XEP_0166.ROLE_INITIATOR, + 'senders': content.get('senders', 'both'), } try: content_name = content['name'] @@ -293,16 +330,20 @@ content_elt = jingle_elt.addElement('content') content_elt['creator'] = content_data['creator'] content_elt['name'] = content_name + try: + content_elt['senders'] = content['senders'] + except KeyError: + pass # then the description element app_args = content.get('app_args', []) app_kwargs = content.get('app_kwargs', {}) app_kwargs['profile'] = profile - desc_elt = yield application_handler.jingleSessionInit(session, content_name, *app_args, **app_kwargs) + desc_elt = yield application.handler.jingleSessionInit(session, content_name, *app_args, **app_kwargs) content_elt.addChild(desc_elt) # and the transport one - transport_elt = yield transport_handler.jingleSessionInit(session, content_name, profile) + transport_elt = yield transport.handler.jingleSessionInit(session, content_name, profile) content_elt.addChild(transport_elt) d = iq_elt.send() @@ -325,7 +366,7 @@ ## defaults methods called when plugin doesn't have them ## - def jingleRequestConfirmationDefault(self, session, desc_elt, profile): + def jingleRequestConfirmationDefault(self, action, session, content_name, desc_elt, profile): """This method request confirmation for a jingle session""" log.debug(u"Using generic jingle confirmation method") return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['to_jid'].full()), _('Confirm Jingle session'), profile=profile) @@ -373,6 +414,7 @@ session = client.jingle_sessions[sid] = {'id': sid, 'state': STATE_PENDING, 'initiator': to_jid, + 'role': XEP_0166.ROLE_RESPONDER, 'to_jid': to_jid, 'started': time.time(), } @@ -392,20 +434,26 @@ self.onSessionTerminate(client, request, jingle_elt, session) elif action == XEP_0166.A_SESSION_ACCEPT: self.onSessionAccept(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_INFO: + self.onTransportInfo(client, request, jingle_elt, session) else: - raise exceptions.InternalError(u"Unknown action {}".format(session['state'])) + raise exceptions.InternalError(u"Unknown action {}".format(action)) ## Actions callbacks ## - def _parseElements(self, jingle_elt, session, request, client, new=False, creator=INITIATOR): + def _parseElements(self, jingle_elt, session, request, client, new=False, creator=ROLE_INITIATOR, with_application=True, with_transport=True): """Parse contents elements and fill contents_dict accordingly after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content> - @param contents_dict(dict): session data for contents, the key is the name of the content + @param session(dict): session data + @param request(domish.Element): the whole request + @param client: %(doc_client)s @param new(bool): if new the content is new and must be created, else the content must exists, and session data will be filled @param creator(unicode): only used if new is True: creating pear (see § 7.3) + @param with_application(bool): if True, raise an error if there is no <description> element else ignore it + @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it @raise exceptions.CancelError: the error is treated the calling method can cancel the treatment (i.e. return) """ contents_dict = session['contents'] @@ -419,7 +467,9 @@ if not name or name in contents_dict: self.sendError('bad-request', session['id'], request, client.profile) raise exceptions.CancelError - content_data = contents_dict[name] = {'creator': creator} + content_data = contents_dict[name] = {'creator': creator, + 'senders': content_elt.attributes.get('senders', 'both'), + } else: # the content must exist, we check it try: @@ -430,57 +480,61 @@ raise exceptions.CancelError # application - desc_elt = content_elt.description - if not desc_elt: - self.sendError('bad-request', session['id'], request, client.profile) - raise exceptions.CancelError - - if new: - # the content is new, we need to check and link the application_handler - app_ns = desc_elt.uri - if not app_ns or app_ns == NS_JINGLE: + if with_application: + desc_elt = content_elt.description + if not desc_elt: self.sendError('bad-request', session['id'], request, client.profile) raise exceptions.CancelError - try: - application_handler = self._applications[app_ns] - except KeyError: - log.warning(u"Unmanaged application namespace [{}]".format(app_ns)) - self.sendError('service-unavailable', session['id'], request, client.profile) - raise exceptions.CancelError + if new: + # the content is new, we need to check and link the application + app_ns = desc_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError - content_data['application'] = application_handler - else: - # the content exists, we check that we have not a former desc_elt - if 'desc_elt' in content_data: - raise exceptions.InternalError(u"desc_elt should not exist at this point") + try: + application = self._applications[app_ns] + except KeyError: + log.warning(u"Unmanaged application namespace [{}]".format(app_ns)) + self.sendError('service-unavailable', session['id'], request, client.profile) + raise exceptions.CancelError - content_data['desc_elt'] = desc_elt + content_data['application'] = application + content_data['application_data'] = {} + else: + # the content exists, we check that we have not a former desc_elt + if 'desc_elt' in content_data: + raise exceptions.InternalError(u"desc_elt should not exist at this point") + + content_data['desc_elt'] = desc_elt # transport - transport_elt = content_elt.transport - if not transport_elt: - self.sendError('bad-request', session['id'], request, client.profile) - raise exceptions.CancelError - - if new: - # the content is new, we need to check and link the transport_handler - transport_ns = transport_elt.uri - if not app_ns or app_ns == NS_JINGLE: + if with_transport: + transport_elt = content_elt.transport + if not transport_elt: self.sendError('bad-request', session['id'], request, client.profile) raise exceptions.CancelError - try: - transport_handler = self._transports[transport_ns].handler - except KeyError: - raise exceptions.InternalError(u"No transport registered for namespace {}".format(transport_ns)) - content_data['transport'] = transport_handler - else: - # the content exists, we check that we have not a former transport_elt - if 'transport_elt' in content_data: - raise exceptions.InternalError(u"desc_elt should not exist at this point") + if new: + # the content is new, we need to check and link the transport + transport_ns = transport_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError - content_data['transport_elt'] = transport_elt + try: + transport = self._transports[transport_ns] + except KeyError: + raise exceptions.InternalError(u"No transport registered for namespace {}".format(transport_ns)) + content_data['transport'] = transport + content_data['transport_data'] = {} + else: + # the content exists, we check that we have not a former transport_elt + if 'transport_elt' in content_data: + raise exceptions.InternalError(u"desc_elt should not exist at this point") + + content_data['transport_elt'] = transport_elt def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', app_default_cb=None, transp_default_cb=None, delete=True, elements=True, profile=C.PROF_KEY_NONE): """Call application and transport plugin methods for all contents @@ -511,7 +565,7 @@ if method_name is None: continue - handler = content_data[handler_key] + handler = content_data[handler_key].handler try: method = getattr(handler, method_name) except AttributeError: @@ -546,7 +600,7 @@ session['contents'] = contents_dict = {} try: - self._parseElements(jingle_elt, session, request, client, True, INITIATOR) + self._parseElements(jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR) except exceptions.CancelError: return @@ -591,19 +645,19 @@ for content_name, content_data in session['contents'].iteritems(): content_elt = jingle_elt.addElement('content') - content_elt['creator'] = INITIATOR + content_elt['creator'] = XEP_0166.ROLE_INITIATOR content_elt['name'] = content_name - application_handler = content_data['application'] - app_session_accept_cb = application_handler.jingleHandler + application = content_data['application'] + app_session_accept_cb = application.handler.jingleHandler app_d = defer.maybeDeferred(app_session_accept_cb, XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt'), client.profile) app_d.addCallback(addElement, content_elt) defers_list.append(app_d) - transport_handler = content_data['transport'] - transport_session_accept_cb = transport_handler.jingleHandler + transport = content_data['transport'] + transport_session_accept_cb = transport.handler.jingleHandler transport_d = defer.maybeDeferred(transport_session_accept_cb, XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt'), client.profile) @@ -628,7 +682,7 @@ client.xmlstream.send(xmlstream.toResponse(request, 'result')) def onSessionAccept(self, client, request, jingle_elt, session): - """Method called one sesion is accepted + """Method called once session is accepted This method is only called for initiator @param client: %(doc_client)s @@ -656,6 +710,33 @@ # after negociations we start the transfer negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) + def onTransportInfo(self, client, request, jingle_elt, session): + """Method called when a transport-info action is received from other peer + + The request is parsed, and jingleHandler is called on concerned transport plugin(s) + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug(u"Jingle session {} has been accepted".format(session['id'])) + + try: + self._parseElements(jingle_elt, session, request, client, with_application=False) + except exceptions.CancelError: + return + + # The parsing was OK, we send the <iq> result + client.xmlstream.send(xmlstream.toResponse(request, 'result')) + + for content_name, content_data in session['contents'].iteritems(): + try: + transport_elt = content_data.pop('transport_elt') + except KeyError: + continue + else: + content_data['transport'].handler.jingleHandler(XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt, client.profile) + class XEP_0166_handler(xmlstream.XMPPHandler): implements(iwokkel.IDisco)
--- a/src/plugins/plugin_xep_0234.py Mon Nov 02 22:02:41 2015 +0100 +++ b/src/plugins/plugin_xep_0234.py Mon Nov 02 22:02:41 2015 +0100 @@ -93,17 +93,18 @@ @param profile: %(doc_profile)s return (defer.Deferred): True if transfer is accepted """ - file_data = content_data['file_data'] + application_data = content_data['application_data'] + file_data = application_data['file_data'] d = xml_tools.deferDialog(self.host, _(CONFIRM).format(entity=session['to_jid'].full(), **file_data), _(CONFIRM_TITLE), type_=C.XMLUI_DIALOG_FILE, options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR}, profile=profile) - d.addCallback(self._gotConfirmation, session, content_data, profile) + d.addCallback(self._gotConfirmation, session, content_data, application_data, profile) return d - def _gotConfirmation(self, data, session, content_data, profile): + def _gotConfirmation(self, data, session, content_data, application_data, profile): """Called when the permission and dest path have been received @param data(dict): xmlui data received from file dialog @@ -113,7 +114,7 @@ """ if data.get('cancelled', False): return False - file_data = content_data['file_data'] + file_data = application_data['file_data'] path = data['path'] file_data['file_path'] = file_path = os.path.join(path, file_data['name']) log.debug(u'destination file path set to {}'.format(file_path)) @@ -144,9 +145,10 @@ def jingleSessionInit(self, session, content_name, filepath, name=None, file_desc=None, profile=C.PROF_KEY_NONE): content_data = session['contents'][content_name] - assert 'file_path' not in content_data - content_data['file_path'] = filepath - file_data = content_data['file_data'] = {} + application_data = content_data['application_data'] + assert 'file_path' not in application_data + application_data['file_path'] = filepath + file_data = application_data['file_data'] = {} file_data['date'] = utils.xmpp_date() file_data['desc'] = file_desc or '' file_data['media-type'] = "application/octet-stream" # TODO @@ -190,7 +192,7 @@ # TODO: parse hash using plugin XEP-0300 - content_data['file_data'] = file_data + content_data['application_data']['file_data'] = file_data # now we actualy request permission to user return self._getDestDir(session, content_data, profile) @@ -198,12 +200,21 @@ def jingleHandler(self, action, session, content_name, desc_elt, profile): content_data = session['contents'][content_name] - if action in (self._j.A_SESSION_INITIATE, self._j.A_ACCEPTED_ACK): + application_data = content_data['application_data'] + if action in (self._j.A_ACCEPTED_ACK,): pass + elif action == self._j.A_SESSION_INITIATE: + file_elt = desc_elt.elements(NS_JINGLE_FT, 'file').next() + try: + file_elt.elements(NS_JINGLE_FT, 'range').next() + except StopIteration: + # initiator doesn't manage <range>, but we do so we advertise it + log.debug("adding <range> element") + file_elt.addElement('range') elif action == self._j.A_SESSION_ACCEPT: assert not 'file_obj' in content_data - file_path = content_data['file_path'] - size = content_data['file_data']['size'] + file_path = application_data['file_path'] + size = application_data['file_data']['size'] file_obj = content_data['file_obj'] = self._f.File(self.host, file_path, size=size,
--- a/src/plugins/plugin_xep_0261.py Mon Nov 02 22:02:41 2015 +0100 +++ b/src/plugins/plugin_xep_0261.py Mon Nov 02 22:02:41 2015 +0100 @@ -60,24 +60,26 @@ def jingleSessionInit(self, session, content_name, profile): transport_elt = domish.Element((NS_JINGLE_IBB, "transport")) content_data = session['contents'][content_name] - content_data['ibb_block_size'] = self._ibb.BLOCK_SIZE - transport_elt['block-size'] = unicode(content_data['ibb_block_size']) - transport_elt['sid'] = content_data['ibb_sid'] = unicode(uuid.uuid4()) + transport_data = content_data['transport_data'] + transport_data['block_size'] = self._ibb.BLOCK_SIZE + transport_elt['block-size'] = unicode(transport_data['block_size']) + transport_elt['sid'] = transport_data['sid'] = unicode(uuid.uuid4()) return transport_elt def jingleHandler(self, action, session, content_name, transport_elt, profile): content_data = session['contents'][content_name] + transport_data = content_data['transport_data'] if action in (self._j.A_SESSION_ACCEPT, self._j.A_ACCEPTED_ACK): pass elif action == self._j.A_SESSION_INITIATE: - content_data['ibb_sid'] = transport_elt['sid'] + transport_data['sid'] = transport_elt['sid'] elif action in (self._j.A_START, self._j.A_PREPARE_RESPONDER): to_jid = session['to_jid'] - sid = content_data['ibb_sid'] + sid = transport_data['sid'] file_obj = content_data['file_obj'] args = [session, content_name, profile] if action == self._j.A_START: - block_size = content_data['ibb_block_size'] + block_size = transport_data['block_size'] d = self._ibb.startStream(file_obj, to_jid, sid, block_size, profile) d.addErrback(self._streamEb, *args) else: