# HG changeset patch # User Goffi # Date 1443201552 -7200 # Node ID 0209f8d358736dcbdde2574ab0a2b54ee6b2865c # Parent 7d7e57a84792cb5542a76b5fe478e0ffb0d9bf22 plugin XEP-0166: (jingle) first draft. Not all actions are managed yet diff -r 7d7e57a84792 -r 0209f8d35873 src/plugins/plugin_xep_0166.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0166.py Fri Sep 25 19:19:12 2015 +0200 @@ -0,0 +1,673 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Jingle (XEP-0166) +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.i18n import _, D_ +from sat.core.constants import Const as C +from sat.core.log import getLogger +from sat.tools import xml_tools +log = getLogger(__name__) +from sat.core import exceptions +from twisted.words.protocols.jabber import jid +# from twisted.words.protocols import jabber +# from twisted.words.xish import domish +from twisted.internet import defer +# from wokkel import disco, iwokkel, data_form, compat +from wokkel import disco, iwokkel, compat +from twisted.words.protocols.jabber import error +from twisted.words.protocols.jabber import xmlstream +# from sat.core import exceptions +# from sat.memory.memory import Sessions +# from uuid import uuid4 +# from sat.tools import xml_tools +from collections import namedtuple +import uuid +import time + +from zope.interface import implements + + + +IQ_SET = '/iq[@type="set"]' +NS_JINGLE = "urn:xmpp:jingle:1" +JINGLE_REQUEST = IQ_SET + '/jingle[@xmlns="' + NS_JINGLE + '"]' +STATE_PENDING = "PENDING" +STATE_ACTIVE = "ACTIVE" +STATE_ENDED = "ENDED" +INITIATOR = "initiator" +RESPONDER = "responder" +CONFIRM_TXT = D_("{entity} want to start a jingle session with you, do you accept ?") + +PLUGIN_INFO = { + "name": "Jingle", + "import_name": "XEP-0166", + "type": "XEP", + "protocols": ["XEP-0166"], + "main": "XEP_0166", + "handler": "yes", + "description": _("""Implementation of Jingle""") +} + + +TransportData = namedtuple('TransportData', ('namespace', 'handler', 'priority')) + + +class XEP_0166(object): + TRANSPORT_DATAGRAM='UDP' + TRANSPORT_STREAMING='TCP' + REASON_SUCCESS='success' + REASON_DECLINE='decline' + REASON_FAILED_APPLICATION='failed-application' + REASON_FAILED_TRANSPORT='failed-transport' + A_SESSION_INITIATE = "session-initiate" + A_SESSION_ACCEPT = "session-accept" + A_SESSION_TERMINATE = "session-terminate" + # non standard actions + A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer + A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer + A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator + A_START = "start" # application can start + + def __init__(self, host): + log.info(_("plugin Jingle initialization")) + self.host = host + self._applications = {} # key: namespace, value: application data + self._transports = {} # key: namespace, value: transport data + # we also keep transports by type, they are then sorted by priority + self._type_transports = { XEP_0166.TRANSPORT_DATAGRAM: [], + XEP_0166.TRANSPORT_STREAMING: [], + } + + def profileConnected(self, profile): + client = self.host.getClient(profile) + client.jingle_sessions = {} # key = sid, value = session_data + + def getHandler(self, profile): + return XEP_0166_handler(self) + + def _delSession(self, client, sid): + try: + del client.jingle_sessions[sid] + except KeyError: + log.debug(u"Jingle session id [{}] is unknown, nothing to delete".format(sid)) + else: + log.debug(u"Jingle session id [{}] deleted".format(sid)) + + ## helpers methods to build stanzas ## + + def _buildJingleElt(self, client, session, action): + iq_elt = compat.IQ(client.xmlstream, 'set') + iq_elt['from'] = client.jid.full() + iq_elt['to'] = session['to_jid'].full() + jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) + jingle_elt["sid"] = session['id'] + jingle_elt['action'] = action + return iq_elt, jingle_elt + + def sendError(self, error_condition, sid, request, profile): + """Send error stanza + + @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys + @param sid(unicode,None): jingle session id, or None, if session must not be destroyed + @param request(domish.Element): original request + @param profile: %(doc_profile)s + """ + client = self.host.getClient(profile) + iq_elt = error.StanzaError(error_condition).toResponse(request) + if error.STANZA_CONDITIONS[error_condition]['type'] == 'cancel' and sid: + self._delSession(client, sid) + log.warning(u"Error while managing jingle session, cancelling: {condition}".format(error_condition)) + client.xmlstream.send(iq_elt) + + def terminate(self, reason, session, profile): + """Terminate the session + + send the session-terminate action, and delete the session data + @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element + if a list of element, add them as children of the element + @param session(dict): data of the session + @param profile: %(doc_profile)s + """ + client = self.host.getClient(profile) + iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_TERMINATE) + reason_elt = jingle_elt.addElement('reason') + if isinstance(reason, basestring): + reason_elt.addElement(reason) + else: + for elt in reason: + reason_elt.addChild(elt) + self._delSession(client, session['id']) + d = iq_elt.send() + return d + + ## errors which doesn't imply a stanza sending ## + + def _iqError(self, failure, sid, client): + """Called when we got an error + + @param failure(failure.Failure): the exceptions raised + @param sid(unicode): jingle session id + @param profile: %(doc_client)s + """ + log.warning(u"Error while sending jingle stanza: {failure}".format(failure=failure.value)) + self._delSession(client, sid) + + def _jingleErrorCb(self, fail, sid, request, client): + """Called when something is going wrong while parsing jingle request + + The error condition depend of the exceptions raised: + exceptions.DataError raise a bad-request condition + @param fail(failure.Failure): the exceptions raised + @param sid(unicode): jingle session id + @param request(domsih.Element): jingle request + @param client: %(doc_client)s + """ + log.warning("Error while processing jingle request") + if isinstance(fail, exceptions.DataError): + self.sendError('bad-request', sid, request, client.profile) + else: + log.error("Unmanaged jingle exception") + self._delSession(client, sid) + raise fail + + ## methods used by other plugins ## + + def registerApplication(self, namespace, handler): + """Register an application plugin + + @param namespace(unicode): application namespace managed by the plugin + @param handler(object): instance of a class which manage the application. + May have the following methods: + - requestConfirmation(session, desc_elt, client): + - if present, it is called on when session must be accepted. + - if it return True the session is accepted, else rejected. + A Deferred can be returned + - if not present, a generic accept dialog will be used + - jingleSessionInit(self, session, content_name[, *args, **kwargs], profile): must return the domish.Element used for initial content + - jingleHandler(self, action, session, content_name, transport_elt, profile): + called on several action to negociate the application or transport + """ + if namespace in self._applications: + raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace)) + self._applications[namespace] = handler + + def registerTransport(self, namespace, transport_type, handler, priority=0): + """Register a transport plugin + + @param namespace(unicode): the XML namespace used for this transport + @param transport_type(unicode): type of transport to use (see XEP-0166 §8) + @param handler(object): instance of a class which manage the application. + Must have the following methods: + - jingleSessionInit(self, session, content_name[, *args, **kwargs], profile): must return the domish.Element used for initial content + - jingleHandler(self, action, session, content_name, transport_elt, profile): + called on several action to negociate the application or transport + @param priority(int): priority of this transport + """ + assert transport_type in (XEP_0166.TRANSPORT_DATAGRAM, XEP_0166.TRANSPORT_STREAMING) + if namespace in self._transports: + raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace)) + transport_data = TransportData(namespace=namespace, handler=handler, priority=priority) + self._type_transports[transport_type].append(transport_data) + self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority) + self._transports[namespace] = transport_data + log.debug(u"new jingle transport registered") + + @defer.inlineCallbacks + def initiate(self, to_jid, contents, profile=C.PROF_KEY_NONE): + """Send a session initiation request + + @param to_jid(jid.JID): jid to establith session with + @param contents(list[dict]): list of contents to use: + The dict must have the following keys: + - app_ns(unicode): namespace of the application + the following keys are optional: + - transport_type(unicode): type of transport to use (see XEP-0166 §8) + default to TRANSPORT_STREAMING + - name(unicode): name of the content + - app_args(list): args to pass to the application plugin + - app_kwargs(dict): keyword args to pass to the application plugin + @param profile: %(doc_profile)s + """ + assert contents # there must be at least one content + client = self.host.getClient(profile) + initiator = client.jid + sid = unicode(uuid.uuid4()) + # TODO: session cleaning after timeout ? + session = client.jingle_sessions[sid] = {'id': sid, + 'state': STATE_PENDING, + 'initiator': initiator, + 'to_jid': to_jid, + 'started': time.time(), + 'contents': {} + } + iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_INITIATE) + jingle_elt["initiator"] = initiator.full() + + contents_dict = session['contents'] + + for content in contents: + # we get the application plugin + app_ns = content['app_ns'] + try: + application_handler = self._applications[app_ns] + except KeyError: + raise exceptions.InternalError(u"No application registered for {}".format(app_ns)) + + # and the transport plugin + transport_type = content.get('transport_type', XEP_0166.TRANSPORT_STREAMING) + try: + transport_handler = self._type_transports[transport_type][0].handler + except IndexError: + raise exceptions.InternalError(u"No transport registered for {}".format(transport_type)) + + # we build the session data + content_data = {'application': application_handler, + 'transport': transport_handler, + 'creator': INITIATOR, + } + try: + content_name = content['name'] + except KeyError: + content_name = unicode(uuid.uuid4()) + else: + if content_name in contents_dict: + raise exceptions.InternalError('There is already a content with this name') + contents_dict[content_name] = content_data + + # we construct the content element + content_elt = jingle_elt.addElement('content') + content_elt['creator'] = content_data['creator'] + content_elt['name'] = content_name + + # then the description element + app_args = content.get('app_args', []) + app_kwargs = content.get('app_kwargs', {}) + app_kwargs['profile'] = profile + desc_elt = yield application_handler.jingleSessionInit(session, content_name, *app_args, **app_kwargs) + content_elt.addChild(desc_elt) + + # and the transport one + transport_elt = yield transport_handler.jingleSessionInit(session, content_name, profile) + content_elt.addChild(transport_elt) + + d = iq_elt.send() + d.addErrback(self._iqError, sid, client) + yield d + + def contentTerminate(self, session, content_name, reason=REASON_SUCCESS, profile=C.PROF_KEY_NONE): + """Terminate and remove a content + + if there is no more content, then session is terminated + @param session(dict): jingle session + @param content_name(unicode): name of the content terminated + @param reason(unicode): reason of the termination + @param profile: %(doc_profile)s + """ + contents = session['contents'] + del contents[content_name] + if not contents: + self.terminate(reason, session, profile) + + ## defaults methods called when plugin doesn't have them ## + + def jingleRequestConfirmationDefault(self, session, desc_elt, profile): + """This method request confirmation for a jingle session""" + log.debug(u"Using generic jingle confirmation method") + return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['to_jid'].full()), _('Confirm Jingle session'), profile=profile) + + ## jingle events ## + + def _onJingleRequest(self, request, profile): + """Called when any jingle request is received + + The request will the be dispatched to appropriate method + according to current state + @param request(domish.Element): received IQ request + @para profile: %(doc_profile)s + """ + client = self.host.getClient(profile) + request.handled = True + jingle_elt = request.elements(NS_JINGLE, 'jingle').next() + + # first we need the session id + try: + sid = jingle_elt['sid'] + if not sid: + raise KeyError + except KeyError: + log.warning(u"Received jingle request has no sid attribute") + self.sendError('bad-request', None, request, profile) + return + + # then the action + try: + action = jingle_elt['action'] + if not action: + raise KeyError + except KeyError: + log.warning(u"Received jingle request has no action") + self.sendError('bad-request', None, request, profile) + return + + to_jid = jid.JID(request['from']) + + # we get or create the session + try: + session = client.jingle_sessions[sid] + except KeyError: + session = client.jingle_sessions[sid] = {'id': sid, + 'state': STATE_PENDING, + 'initiator': to_jid, + 'to_jid': to_jid, + 'started': time.time(), + } + else: + if session['to_jid'] != to_jid: + log.warning(u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(sid)) + self.sendError('service-unavailable', sid, request, profile) + return + if session['id'] != sid: + log.error(u"session id doesn't match") + self.sendError('service-unavailable', sid, request, profile) + raise exceptions.InternalError + + if action == XEP_0166.A_SESSION_INITIATE: + self.onSessionInitiate(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_TERMINATE: + self.onSessionTerminate(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_ACCEPT: + self.onSessionAccept(client, request, jingle_elt, session) + else: + raise exceptions.InternalError(u"Unknown action {}".format(session['state'])) + + ## Actions callbacks ## + + def _parseElements(self, jingle_elt, session, request, client, new=False, creator=INITIATOR): + """Parse contents elements and fill contents_dict accordingly + + after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" + @param jingle_elt(domish.Element): parent element, containing one or more + @param contents_dict(dict): session data for contents, the key is the name of the content + @param new(bool): if new the content is new and must be created, + else the content must exists, and session data will be filled + @param creator(unicode): only used if new is True: creating pear (see § 7.3) + @raise exceptions.CancelError: the error is treated the calling method can cancel the treatment (i.e. return) + """ + contents_dict = session['contents'] + content_elts = jingle_elt.elements(NS_JINGLE, 'content') + + for content_elt in content_elts: + name = content_elt['name'] + + if new: + # the content must not exist, we check it + if not name or name in contents_dict: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError + content_data = contents_dict[name] = {'creator': creator} + else: + # the content must exist, we check it + try: + content_data = contents_dict[name] + except KeyError: + log.warning(u"Other peer try to access an unknown content") + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError + + # application + desc_elt = content_elt.description + if not desc_elt: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError + + if new: + # the content is new, we need to check and link the application_handler + app_ns = desc_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError + + try: + application_handler = self._applications[app_ns] + except KeyError: + log.warning(u"Unmanaged application namespace [{}]".format(app_ns)) + self.sendError('service-unavailable', session['id'], request, client.profile) + raise exceptions.CancelError + + content_data['application'] = application_handler + else: + # the content exists, we check that we have not a former desc_elt + if 'desc_elt' in content_data: + raise exceptions.InternalError(u"desc_elt should not exist at this point") + + content_data['desc_elt'] = desc_elt + + # transport + transport_elt = content_elt.transport + if not transport_elt: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError + + if new: + # the content is new, we need to check and link the transport_handler + transport_ns = transport_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError('bad-request', session['id'], request, client.profile) + raise exceptions.CancelError + + try: + transport_handler = self._transports[transport_ns].handler + except KeyError: + raise exceptions.InternalError(u"No transport registered for namespace {}".format(transport_ns)) + content_data['transport'] = transport_handler + else: + # the content exists, we check that we have not a former transport_elt + if 'transport_elt' in content_data: + raise exceptions.InternalError(u"desc_elt should not exist at this point") + + content_data['transport_elt'] = transport_elt + + def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', app_default_cb=None, transp_default_cb=None, delete=True, elements=True, profile=C.PROF_KEY_NONE): + """Call application and transport plugin methods for all contents + + @param action(unicode): jingle action name + @param session(dict): jingle session data + @param app_method_name(unicode, None): name of the method to call for applications + None to ignore + @param transp_method_name(unicode, None): name of the method to call for transports + None to ignore + @param app_default_cb(callable, None): default callback to use if plugin has not app_method_name + None to raise an exception instead + @param transp_default_cb(callable, None): default callback to use if plugin has not transp_method_name + None to raise an exception instead + @param delete(bool): if True, remove desc_elt and transport_elt from session + ignored if elements is False + @param elements(bool): True if elements(desc_elt and tranport_elt) must be managed + must be True if _callPlugins is use in a request, and False if it used after a request (i.e. on result or error) + @param profile(unicode): %(doc_profile)s + @return (list[defer.Deferred]): list of launched Deferred + """ + contents_dict = session['contents'] + defers_list = [] + for content_name, content_data in contents_dict.iteritems(): + for method_name, handler_key, default_cb, elt_name in ( + (app_method_name, 'application', app_default_cb, 'desc_elt'), + (transp_method_name, 'transport', transp_default_cb, 'transport_elt')): + if method_name is None: + continue + + handler = content_data[handler_key] + try: + method = getattr(handler, method_name) + except AttributeError: + if default_cb is not None: + method = default_cb + else: + raise exceptions.InternalError(u'{} not implemented !'.format(method_name)) + finally: + if elements: + elt = content_data.pop(elt_name) if delete else content_data[elt_name] + else: + elt = None + d = defer.maybeDeferred(method, action, session, content_name, elt, profile) + defers_list.append(d) + + return defers_list + + def onSessionInitiate(self, client, request, jingle_elt, session): + """Called on session-initiate action + + The "jingleRequestConfirmation" method of each application will be called + (or self.jingleRequestConfirmationDefault if the former doesn't exist). + The session is only accepted if all application are confirmed. + The application must manage itself multiple contents scenari (e.g. audio/video). + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): element + @param session(dict): session data + """ + if 'contents' in session: + raise exceptions.InternalError("Contents dict should not already exist at this point") + session['contents'] = contents_dict = {} + + try: + self._parseElements(jingle_elt, session, request, client, True, INITIATOR) + except exceptions.CancelError: + return + + if not contents_dict: + # there MUST be at least one content + self.sendError('bad-request', session['id'], request, client.profile) + return + + # at this point we can send the result to confirm reception of the request + client.xmlstream.send(xmlstream.toResponse(request, 'result')) + + # we now request each application plugin confirmation + # and if all are accepted, we can accept the session + confirm_defers = self._callPlugins(XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False, profile=client.profile) + + confirm_dlist = defer.gatherResults(confirm_defers) + confirm_dlist.addCallback(self._confirmationCb, session, jingle_elt, client) + confirm_dlist.addErrback(self._jingleErrorCb, session['id'], request, client) + + def _confirmationCb(self, confirm_results, session, jingle_elt, client): + """Method called when confirmation from user has been received + + This method is only called for the responder + @param confirm_results(list[bool]): all True if session is accepted + @param session(dict): session data + @param jingle_elt(domish.Element): jingle data of this session + @param client: %(doc_client)s + """ + confirmed = all(confirm_results) + if not confirmed: + return self.terminate(XEP_0166.REASON_DECLINE, session, client.profile) + + iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_ACCEPT) + jingle_elt['responder'] = client.jid.full() + + # contents + + def addElement(domish_elt, content_elt): + content_elt.addChild(domish_elt) + + defers_list = [] + + for content_name, content_data in session['contents'].iteritems(): + content_elt = jingle_elt.addElement('content') + content_elt['creator'] = INITIATOR + content_elt['name'] = content_name + + application_handler = content_data['application'] + app_session_accept_cb = application_handler.jingleHandler + + app_d = defer.maybeDeferred(app_session_accept_cb, + XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt'), client.profile) + app_d.addCallback(addElement, content_elt) + defers_list.append(app_d) + + transport_handler = content_data['transport'] + transport_session_accept_cb = transport_handler.jingleHandler + + transport_d = defer.maybeDeferred(transport_session_accept_cb, + XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt'), client.profile) + transport_d.addCallback(addElement, content_elt) + defers_list.append(transport_d) + + d_list = defer.DeferredList(defers_list) + d_list.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False, profile=client.profile)) + d_list.addCallback(lambda dummy: iq_elt.send()) + def changeState(dummy, session): + session['state'] = STATE_ACTIVE + + d_list.addCallback(changeState, session) + d_list.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_ACCEPTED_ACK, session, elements=False, profile=client.profile)) + d_list.addErrback(self._iqError, session['id'], client) + return d_list + + def onSessionTerminate(self, client, request, jingle_elt, session): + # TODO: check reason, display a message to user if needed + log.debug("Jingle Session {} terminated".format(session['id'])) + self._delSession(client, session['id']) + client.xmlstream.send(xmlstream.toResponse(request, 'result')) + + def onSessionAccept(self, client, request, jingle_elt, session): + """Method called one sesion is accepted + + This method is only called for initiator + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): the element + @param session(dict): session data + """ + log.debug(u"Jingle session {} has been accepted".format(session['id'])) + + try: + self._parseElements(jingle_elt, session, request, client) + except exceptions.CancelError: + return + + # at this point we can send the result to confirm reception of the request + client.xmlstream.send(xmlstream.toResponse(request, 'result')) + # and change the state + session['state'] = STATE_ACTIVE + + negociate_defers = [] + negociate_defers = self._callPlugins(XEP_0166.A_SESSION_ACCEPT, session, profile=client.profile) + + negociate_dlist = defer.DeferredList(negociate_defers) + + # after negociations we start the transfer + negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) + + +class XEP_0166_handler(xmlstream.XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, profile=self.parent.profile) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_JINGLE)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return []