Mercurial > libervia-backend
changeset 4044:3900626bc100
plugin XEP-0166: refactoring, and various improvments:
- add models for transport and applications handlers and linked data
- split models into separate file
- some type hints
- some documentation comments
- add actions to prepare confirmation, useful to do initial parsing of all contents
- application arg/kwargs and some transport data can be initialised during Jingle
`initiate` call, this is notably useful when a call is made with transport data (this is
the call for A/V calls where codecs and ICE candidate can be specified when starting a
call)
- session data can be specified during Jingle `initiate` call
- new `store_in_session` argument in `_parse_elements`, which can be used to avoid
race-condition when a context element (<decription> or <transport>) is being parsed for
an action while an other action happens (like `transport-info`)
- don't sed `sid` in `transport_elt` during a `transport-info` action anymore in
`build_action`: this is specific to Jingle File Transfer and has been moved there
rel 419
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 15 May 2023 16:23:11 +0200 |
parents | 9641ce286e07 |
children | ae756bf7c3e8 |
files | sat/core/core_types.py sat/plugins/plugin_xep_0166.py sat/plugins/plugin_xep_0166/__init__.py sat/plugins/plugin_xep_0166/models.py sat/plugins/plugin_xep_0215.py sat/plugins/plugin_xep_0260.py sat/plugins/plugin_xep_0353.py |
diffstat | 7 files changed, 1596 insertions(+), 1255 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/core/core_types.py Mon May 15 16:20:58 2023 +0200 +++ b/sat/core/core_types.py Mon May 15 16:23:11 2023 +0200 @@ -27,6 +27,7 @@ class SatXMPPEntity: + profile: str jid: t_jid.JID is_component: bool server_jid: t_jid.JID
--- a/sat/plugins/plugin_xep_0166.py Mon May 15 16:20:58 2023 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1248 +0,0 @@ -#!/usr/bin/env python3 - -# SàT plugin for Jingle (XEP-0166) -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -from collections import namedtuple -import time -from typing import Any, Dict, Tuple -import uuid - -from twisted.internet import defer -from twisted.internet import reactor -from twisted.python import failure -from twisted.words.protocols.jabber import jid -from twisted.words.protocols.jabber import error -from twisted.words.protocols.jabber import xmlstream -from twisted.words.xish import domish -from wokkel import disco, iwokkel -from zope.interface import implementer - -from sat.core import exceptions -from sat.core.constants import Const as C -from sat.core.core_types import SatXMPPEntity -from sat.core.i18n import D_, _ -from sat.core.log import getLogger -from sat.tools import xml_tools -from sat.tools import utils - - -log = getLogger(__name__) - - -IQ_SET = '/iq[@type="set"]' -NS_JINGLE = "urn:xmpp:jingle:1" -NS_JINGLE_ERROR = "urn:xmpp:jingle:errors:1" -JINGLE_REQUEST = IQ_SET + '/jingle[@xmlns="' + NS_JINGLE + '"]' -STATE_PENDING = "PENDING" -STATE_ACTIVE = "ACTIVE" -STATE_ENDED = "ENDED" -CONFIRM_TXT = D_("{entity} want to start a jingle session with you, do you accept ?") - -PLUGIN_INFO = { - C.PI_NAME: "Jingle", - C.PI_IMPORT_NAME: "XEP-0166", - C.PI_TYPE: "XEP", - C.PI_MODES: C.PLUG_MODE_BOTH, - C.PI_PROTOCOLS: ["XEP-0166"], - C.PI_MAIN: "XEP_0166", - C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Implementation of Jingle"""), -} - - -ApplicationData = namedtuple("ApplicationData", ("namespace", "handler")) -TransportData = namedtuple("TransportData", ("namespace", "handler", "priority")) - - -class XEP_0166: - namespace = NS_JINGLE - ROLE_INITIATOR = "initiator" - ROLE_RESPONDER = "responder" - TRANSPORT_DATAGRAM = "UDP" - TRANSPORT_STREAMING = "TCP" - REASON_SUCCESS = "success" - REASON_DECLINE = "decline" - REASON_FAILED_APPLICATION = "failed-application" - REASON_FAILED_TRANSPORT = "failed-transport" - REASON_CONNECTIVITY_ERROR = "connectivity-error" - A_SESSION_INITIATE = "session-initiate" - A_SESSION_ACCEPT = "session-accept" - A_SESSION_TERMINATE = "session-terminate" - A_SESSION_INFO = "session-info" - A_TRANSPORT_REPLACE = "transport-replace" - A_TRANSPORT_ACCEPT = "transport-accept" - A_TRANSPORT_REJECT = "transport-reject" - A_TRANSPORT_INFO = "transport-info" - # non standard actions - A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer - A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer - A_ACCEPTED_ACK = ( - "accepted-ack" - ) # session accepted ack has been received from initiator - A_START = "start" # application can start - A_DESTROY = ( - "destroy" - ) # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations - - def __init__(self, host): - log.info(_("plugin Jingle initialization")) - self.host = host - self._applications = {} # key: namespace, value: application data - self._transports = {} # key: namespace, value: transport data - # we also keep transports by type, they are then sorted by priority - self._type_transports = { - XEP_0166.TRANSPORT_DATAGRAM: [], - XEP_0166.TRANSPORT_STREAMING: [], - } - - def profile_connected(self, client): - client.jingle_sessions = {} # key = sid, value = session_data - - def get_handler(self, client): - return XEP_0166_handler(self) - - def _del_session(self, client, sid): - try: - del client.jingle_sessions[sid] - except KeyError: - log.debug( - f"Jingle session id {sid!r} is unknown, nothing to delete " - f"[{client.profile}]") - else: - log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]") - - ## helpers methods to build stanzas ## - - def _build_jingle_elt(self, client, session, action): - iq_elt = client.IQ("set") - iq_elt["from"] = session['local_jid'].full() - iq_elt["to"] = session["peer_jid"].full() - jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) - jingle_elt["sid"] = session["id"] - jingle_elt["action"] = action - return iq_elt, jingle_elt - - def sendError(self, client, error_condition, sid, request, jingle_condition=None): - """Send error stanza - - @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys - @param sid(unicode,None): jingle session id, or None, if session must not be destroyed - @param request(domish.Element): original request - @param jingle_condition(None, unicode): if not None, additional jingle-specific error information - """ - iq_elt = error.StanzaError(error_condition).toResponse(request) - if jingle_condition is not None: - iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition)) - if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid: - self._del_session(client, sid) - log.warning( - "Error while managing jingle session, cancelling: {condition}".format( - condition=error_condition - ) - ) - return client.send(iq_elt) - - def _terminate_eb(self, failure_): - log.warning(_("Error while terminating session: {msg}").format(msg=failure_)) - - def terminate(self, client, reason, session, text=None): - """Terminate the session - - send the session-terminate action, and delete the session data - @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element - if a list of element, add them as children of the <reason/> element - @param session(dict): data of the session - """ - iq_elt, jingle_elt = self._build_jingle_elt( - client, session, XEP_0166.A_SESSION_TERMINATE - ) - reason_elt = jingle_elt.addElement("reason") - if isinstance(reason, str): - reason_elt.addElement(reason) - else: - for elt in reason: - reason_elt.addChild(elt) - if text is not None: - reason_elt.addElement("text", content=text) - self._del_session(client, session["id"]) - d = iq_elt.send() - d.addErrback(self._terminate_eb) - return d - - ## errors which doesn't imply a stanza sending ## - - def _iq_error(self, failure_, sid, client): - """Called when we got an <iq/> error - - @param failure_(failure.Failure): the exceptions raised - @param sid(unicode): jingle session id - """ - log.warning( - "Error while sending jingle <iq/> stanza: {failure_}".format( - failure_=failure_.value - ) - ) - self._del_session(client, sid) - - def _jingle_error_cb(self, failure_, session, request, client): - """Called when something is going wrong while parsing jingle request - - The error condition depend of the exceptions raised: - exceptions.DataError raise a bad-request condition - @param fail(failure.Failure): the exceptions raised - @param session(dict): data of the session - @param request(domsih.Element): jingle request - @param client: %(doc_client)s - """ - log.warning(f"Error while processing jingle request [{client.profile}]") - if isinstance(failure_.value, defer.FirstError): - failure_ = failure_.value.subFailure.value - if isinstance(failure_, exceptions.DataError): - return self.sendError(client, "bad-request", session["id"], request) - elif isinstance(failure_, error.StanzaError): - return self.terminate(client, self.REASON_FAILED_APPLICATION, session, - text=str(failure_)) - else: - log.error(f"Unmanaged jingle exception: {failure_}") - return self.terminate(client, self.REASON_FAILED_APPLICATION, session, - text=str(failure_)) - - ## methods used by other plugins ## - - def register_application(self, namespace, handler): - """Register an application plugin - - @param namespace(unicode): application namespace managed by the plugin - @param handler(object): instance of a class which manage the application. - May have the following methods: - - requestConfirmation(session, desc_elt, client): - - if present, it is called on when session must be accepted. - - if it return True the session is accepted, else rejected. - A Deferred can be returned - - if not present, a generic accept dialog will be used - - jingle_session_init(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content - - jingle_handler(client, self, action, session, content_name, transport_elt): - called on several action to negociate the application or transport - - jingle_terminate: called on session terminate, with reason_elt - May be used to clean session - """ - if namespace in self._applications: - raise exceptions.ConflictError( - "Trying to register already registered namespace {}".format(namespace) - ) - self._applications[namespace] = ApplicationData( - namespace=namespace, handler=handler - ) - log.debug("new jingle application registered") - - def register_transport(self, namespace, transport_type, handler, priority=0): - """Register a transport plugin - - @param namespace(unicode): the XML namespace used for this transport - @param transport_type(unicode): type of transport to use (see XEP-0166 §8) - @param handler(object): instance of a class which manage the application. - Must have the following methods: - - jingle_session_init(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content - - jingle_handler(client, self, action, session, content_name, transport_elt): - called on several action to negociate the application or transport - @param priority(int): priority of this transport - """ - assert transport_type in ( - XEP_0166.TRANSPORT_DATAGRAM, - XEP_0166.TRANSPORT_STREAMING, - ) - if namespace in self._transports: - raise exceptions.ConflictError( - "Trying to register already registered namespace {}".format(namespace) - ) - transport_data = TransportData( - namespace=namespace, handler=handler, priority=priority - ) - self._type_transports[transport_type].append(transport_data) - self._type_transports[transport_type].sort( - key=lambda transport_data: transport_data.priority, reverse=True - ) - self._transports[namespace] = transport_data - log.debug("new jingle transport registered") - - @defer.inlineCallbacks - def transport_replace(self, client, transport_ns, session, content_name): - """Replace a transport - - @param transport_ns(unicode): namespace of the new transport to use - @param session(dict): jingle session data - @param content_name(unicode): name of the content - """ - # XXX: for now we replace the transport before receiving confirmation from other peer - # this is acceptable because we terminate the session if transport is rejected. - # this behavious may change in the future. - content_data = session["contents"][content_name] - transport_data = content_data["transport_data"] - try: - transport = self._transports[transport_ns] - except KeyError: - raise exceptions.InternalError("Unkown transport") - yield content_data["transport"].handler.jingle_handler( - client, XEP_0166.A_DESTROY, session, content_name, None - ) - content_data["transport"] = transport - transport_data.clear() - - iq_elt, jingle_elt = self._build_jingle_elt( - client, session, XEP_0166.A_TRANSPORT_REPLACE - ) - content_elt = jingle_elt.addElement("content") - content_elt["name"] = content_name - content_elt["creator"] = content_data["creator"] - - transport_elt = transport.handler.jingle_session_init(client, session, content_name) - content_elt.addChild(transport_elt) - iq_elt.send() - - def build_action(self, client, action, session, content_name): - """Build an element according to requested action - - @param action(unicode): a jingle action (see XEP-0166 §7.2), - session-* actions are not managed here - transport-replace is managed in the dedicated [transport_replace] method - @param session(dict): jingle session data - @param content_name(unicode): name of the content - @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action - """ - # we first build iq, jingle and content element which are the same in every cases - iq_elt, jingle_elt = self._build_jingle_elt(client, session, action) - # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked - content_data = session["contents"][content_name] - content_elt = jingle_elt.addElement("content") - content_elt["name"] = content_name - content_elt["creator"] = content_data["creator"] - - if action == XEP_0166.A_TRANSPORT_INFO: - context_elt = transport_elt = content_elt.addElement( - "transport", content_data["transport"].namespace - ) - transport_elt["sid"] = content_data["transport_data"]["sid"] - else: - raise exceptions.InternalError("unmanaged action {}".format(action)) - - return iq_elt, context_elt - - def build_session_info(self, client, session): - """Build a session-info action - - @param session(dict): jingle session data - @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element - """ - return self._build_jingle_elt(client, session, XEP_0166.A_SESSION_INFO) - - def getApplication(self, namespace: str) -> object: - """Retreive application corresponding to a namespace - - @raise exceptions.NotFound if application can't be found - """ - try: - return self._applications[namespace] - except KeyError: - raise exceptions.NotFound( - f"No application registered for {namespace}" - ) - - def get_content_data(self, content: dict) -> Tuple[object, list, dict, str]: - """"Retrieve application and its argument from content""" - app_ns = content["app_ns"] - try: - application = self.getApplication(app_ns) - except exceptions.NotFound as e: - raise exceptions.InternalError(str(e)) - app_args = content.get("app_args", []) - app_kwargs = content.get("app_kwargs", {}) - try: - content_name = content["name"] - except KeyError: - content_name = content["name"] = str(uuid.uuid4()) - return application, app_args, app_kwargs, content_name - - async def initiate(self, client, peer_jid, contents, encrypted=False): - """Send a session initiation request - - @param peer_jid(jid.JID): jid to establith session with - @param contents(list[dict]): list of contents to use: - The dict must have the following keys: - - app_ns(unicode): namespace of the application - the following keys are optional: - - transport_type(unicode): type of transport to use (see XEP-0166 §8) - default to TRANSPORT_STREAMING - - name(unicode): name of the content - - senders(unicode): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none - default to BOTH (see XEP-0166 §7.3) - - app_args(list): args to pass to the application plugin - - app_kwargs(dict): keyword args to pass to the application plugin - @param encrypted: if True, session must be encrypted and "encryption" must be set - to all content data of session - @return (unicode): jingle session id - """ - assert contents # there must be at least one content - if (peer_jid == client.jid - or client.is_component and peer_jid.host == client.jid.host): - raise ValueError(_("You can't do a jingle session with yourself")) - initiator = client.jid - sid = str(uuid.uuid4()) - # TODO: session cleaning after timeout ? - session = client.jingle_sessions[sid] = { - "id": sid, - "state": STATE_PENDING, - "initiator": initiator, - "role": XEP_0166.ROLE_INITIATOR, - "local_jid": client.jid, - "peer_jid": peer_jid, - "started": time.time(), - "contents": {}, - } - - if not await self.host.trigger.async_point( - "XEP-0166_initiate", - client, session, contents - ): - return - - iq_elt, jingle_elt = self._build_jingle_elt( - client, session, XEP_0166.A_SESSION_INITIATE - ) - jingle_elt["initiator"] = initiator.full() - - contents_dict = session["contents"] - - for content in contents: - # we get the application plugin - application, app_args, app_kwargs, content_name = self.get_content_data(content) - - # and the transport plugin - transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING) - try: - transport = self._type_transports[transport_type][0] - except IndexError: - raise exceptions.InternalError( - "No transport registered for {}".format(transport_type) - ) - - # we build the session data - content_data = { - "application": application, - "application_data": {}, - "transport": transport, - "transport_data": {}, - "creator": XEP_0166.ROLE_INITIATOR, - "senders": content.get("senders", "both"), - } - if content_name in contents_dict: - raise exceptions.InternalError( - "There is already a content with this name" - ) - contents_dict[content_name] = content_data - - # we construct the content element - content_elt = jingle_elt.addElement("content") - content_elt["creator"] = content_data["creator"] - content_elt["name"] = content_name - try: - content_elt["senders"] = content["senders"] - except KeyError: - pass - - # then the description element - desc_elt = await utils.as_deferred( - application.handler.jingle_session_init, - client, session, content_name, *app_args, **app_kwargs - ) - content_elt.addChild(desc_elt) - - # and the transport one - transport_elt = await utils.as_deferred( - transport.handler.jingle_session_init, - client, session, content_name - ) - content_elt.addChild(transport_elt) - - if not await self.host.trigger.async_point( - "XEP-0166_initiate_elt_built", - client, session, iq_elt, jingle_elt - ): - return - if encrypted: - for content in session["contents"].values(): - if "encryption" not in content: - raise exceptions.EncryptionError( - "Encryption is requested, but no encryption has been set" - ) - - try: - await iq_elt.send() - except Exception as e: - failure_ = failure.Failure(e) - self._iq_error(failure_, sid, client) - raise failure_ - - def delayed_content_terminate(self, *args, **kwargs): - """Put content_terminate in queue but don't execute immediately - - This is used to terminate a content inside a handler, to avoid modifying contents - """ - reactor.callLater(0, self.content_terminate, *args, **kwargs) - - def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS): - """Terminate and remove a content - - if there is no more content, then session is terminated - @param session(dict): jingle session - @param content_name(unicode): name of the content terminated - @param reason(unicode): reason of the termination - """ - contents = session["contents"] - del contents[content_name] - if not contents: - self.terminate(client, reason, session) - - ## defaults methods called when plugin doesn't have them ## - - def jingle_request_confirmation_default( - self, client, action, session, content_name, desc_elt - ): - """This method request confirmation for a jingle session""" - log.debug("Using generic jingle confirmation method") - return xml_tools.defer_confirm( - self.host, - _(CONFIRM_TXT).format(entity=session["peer_jid"].full()), - _("Confirm Jingle session"), - profile=client.profile, - ) - - ## jingle events ## - - def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None: - defer.ensureDeferred(self.on_jingle_request(client, request)) - - async def on_jingle_request( - self, - client: SatXMPPEntity, - request: domish.Element - ) -> None: - """Called when any jingle request is received - - The request will then be dispatched to appropriate method - according to current state - @param request(domish.Element): received IQ request - """ - request.handled = True - jingle_elt = next(request.elements(NS_JINGLE, "jingle")) - - # first we need the session id - try: - sid = jingle_elt["sid"] - if not sid: - raise KeyError - except KeyError: - log.warning("Received jingle request has no sid attribute") - self.sendError(client, "bad-request", None, request) - return - - # then the action - try: - action = jingle_elt["action"] - if not action: - raise KeyError - except KeyError: - log.warning("Received jingle request has no action") - self.sendError(client, "bad-request", None, request) - return - - peer_jid = jid.JID(request["from"]) - - # we get or create the session - try: - session = client.jingle_sessions[sid] - except KeyError: - if action == XEP_0166.A_SESSION_INITIATE: - pass - elif action == XEP_0166.A_SESSION_TERMINATE: - log.debug( - "ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format( - request_id=sid, profile=client.profile - ) - ) - return - else: - log.warning( - "Received request for an unknown session id: {request_id} [{profile}]".format( - request_id=sid, profile=client.profile - ) - ) - self.sendError(client, "item-not-found", None, request, "unknown-session") - return - - session = client.jingle_sessions[sid] = { - "id": sid, - "state": STATE_PENDING, - "initiator": peer_jid, - "role": XEP_0166.ROLE_RESPONDER, - # we store local_jid using request['to'] because for a component the jid - # used may not be client.jid (if a local part is used). - "local_jid": jid.JID(request['to']), - "peer_jid": peer_jid, - "started": time.time(), - } - else: - if session["peer_jid"] != peer_jid: - log.warning( - "sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format( - sid - ) - ) - self.sendError(client, "service-unavailable", sid, request) - return - if session["id"] != sid: - log.error("session id doesn't match") - self.sendError(client, "service-unavailable", sid, request) - raise exceptions.InternalError - - if action == XEP_0166.A_SESSION_INITIATE: - await self.on_session_initiate(client, request, jingle_elt, session) - elif action == XEP_0166.A_SESSION_TERMINATE: - self.on_session_terminate(client, request, jingle_elt, session) - elif action == XEP_0166.A_SESSION_ACCEPT: - self.on_session_accept(client, request, jingle_elt, session) - elif action == XEP_0166.A_SESSION_INFO: - self.on_session_info(client, request, jingle_elt, session) - elif action == XEP_0166.A_TRANSPORT_INFO: - self.on_transport_info(client, request, jingle_elt, session) - elif action == XEP_0166.A_TRANSPORT_REPLACE: - self.on_transport_replace(client, request, jingle_elt, session) - elif action == XEP_0166.A_TRANSPORT_ACCEPT: - self.on_transport_accept(client, request, jingle_elt, session) - elif action == XEP_0166.A_TRANSPORT_REJECT: - self.on_transport_reject(client, request, jingle_elt, session) - else: - raise exceptions.InternalError("Unknown action {}".format(action)) - - ## Actions callbacks ## - - def _parse_elements( - self, - jingle_elt, - session, - request, - client, - new=False, - creator=ROLE_INITIATOR, - with_application=True, - with_transport=True, - ): - """Parse contents elements and fill contents_dict accordingly - - after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" - @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content> - @param session(dict): session data - @param request(domish.Element): the whole request - @param client: %(doc_client)s - @param new(bool): True if the content is new and must be created, - else the content must exists, and session data will be filled - @param creator(unicode): only used if new is True: creating pear (see § 7.3) - @param with_application(bool): if True, raise an error if there is no <description> element else ignore it - @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it - @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return) - """ - contents_dict = session["contents"] - content_elts = jingle_elt.elements(NS_JINGLE, "content") - - for content_elt in content_elts: - name = content_elt["name"] - - if new: - # the content must not exist, we check it - if not name or name in contents_dict: - self.sendError(client, "bad-request", session["id"], request) - raise exceptions.CancelError - content_data = contents_dict[name] = { - "creator": creator, - "senders": content_elt.attributes.get("senders", "both"), - } - else: - # the content must exist, we check it - try: - content_data = contents_dict[name] - except KeyError: - log.warning("Other peer try to access an unknown content") - self.sendError(client, "bad-request", session["id"], request) - raise exceptions.CancelError - - # application - if with_application: - desc_elt = content_elt.description - if not desc_elt: - self.sendError(client, "bad-request", session["id"], request) - raise exceptions.CancelError - - if new: - # the content is new, we need to check and link the application - app_ns = desc_elt.uri - if not app_ns or app_ns == NS_JINGLE: - self.sendError(client, "bad-request", session["id"], request) - raise exceptions.CancelError - - try: - application = self._applications[app_ns] - except KeyError: - log.warning( - "Unmanaged application namespace [{}]".format(app_ns) - ) - self.sendError( - client, "service-unavailable", session["id"], request - ) - raise exceptions.CancelError - - content_data["application"] = application - content_data["application_data"] = {} - else: - # the content exists, we check that we have not a former desc_elt - if "desc_elt" in content_data: - raise exceptions.InternalError( - "desc_elt should not exist at this point" - ) - - content_data["desc_elt"] = desc_elt - - # transport - if with_transport: - transport_elt = content_elt.transport - if not transport_elt: - self.sendError(client, "bad-request", session["id"], request) - raise exceptions.CancelError - - if new: - # the content is new, we need to check and link the transport - transport_ns = transport_elt.uri - if not app_ns or app_ns == NS_JINGLE: - self.sendError(client, "bad-request", session["id"], request) - raise exceptions.CancelError - - try: - transport = self._transports[transport_ns] - except KeyError: - raise exceptions.InternalError( - "No transport registered for namespace {}".format( - transport_ns - ) - ) - content_data["transport"] = transport - content_data["transport_data"] = {} - else: - # the content exists, we check that we have not a former transport_elt - if "transport_elt" in content_data: - raise exceptions.InternalError( - "transport_elt should not exist at this point" - ) - - content_data["transport_elt"] = transport_elt - - def _ignore(self, client, action, session, content_name, elt): - """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins - - must be used as app_default_cb and/or transp_default_cb - """ - return elt - - def _call_plugins(self, client, action, session, app_method_name="jingle_handler", - transp_method_name="jingle_handler", app_default_cb=None, - transp_default_cb=None, delete=True, elements=True, - force_element=None): - """Call application and transport plugin methods for all contents - - @param action(unicode): jingle action name - @param session(dict): jingle session data - @param app_method_name(unicode, None): name of the method to call for applications - None to ignore - @param transp_method_name(unicode, None): name of the method to call for transports - None to ignore - @param app_default_cb(callable, None): default callback to use if plugin has not app_method_name - None to raise an exception instead - @param transp_default_cb(callable, None): default callback to use if plugin has not transp_method_name - None to raise an exception instead - @param delete(bool): if True, remove desc_elt and transport_elt from session - ignored if elements is False - @param elements(bool): True if elements(desc_elt and tranport_elt) must be managed - must be True if _call_plugins is used in a request, and False if it used after a request - (i.e. on <iq> result or error) - @param force_element(None, domish.Element, object): if elements is False, it is used as element parameter - else it is ignored - @return (list[defer.Deferred]): list of launched Deferred - @raise exceptions.NotFound: method is not implemented - """ - contents_dict = session["contents"] - defers_list = [] - for content_name, content_data in contents_dict.items(): - for method_name, handler_key, default_cb, elt_name in ( - (app_method_name, "application", app_default_cb, "desc_elt"), - (transp_method_name, "transport", transp_default_cb, "transport_elt"), - ): - if method_name is None: - continue - - handler = content_data[handler_key].handler - try: - method = getattr(handler, method_name) - except AttributeError: - if default_cb is None: - raise exceptions.NotFound( - "{} not implemented !".format(method_name) - ) - else: - method = default_cb - if elements: - elt = content_data.pop(elt_name) if delete else content_data[elt_name] - else: - elt = force_element - d = utils.as_deferred( - method, client, action, session, content_name, elt - ) - defers_list.append(d) - - return defers_list - - async def on_session_initiate( - self, - client: SatXMPPEntity, - request: domish.Element, - jingle_elt: domish.Element, - session: Dict[str, Any] - ) -> None: - """Called on session-initiate action - - The "jingle_request_confirmation" method of each application will be called - (or self.jingle_request_confirmation_default if the former doesn't exist). - The session is only accepted if all application are confirmed. - The application must manage itself multiple contents scenari (e.g. audio/video). - @param client: %(doc_client)s - @param request(domish.Element): full request - @param jingle_elt(domish.Element): <jingle> element - @param session(dict): session data - """ - if "contents" in session: - raise exceptions.InternalError( - "Contents dict should not already exist at this point" - ) - session["contents"] = contents_dict = {} - - try: - self._parse_elements( - jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR - ) - except exceptions.CancelError: - return - - if not contents_dict: - # there MUST be at least one content - self.sendError(client, "bad-request", session["id"], request) - return - - # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, "result")) - - - if not await self.host.trigger.async_point( - "XEP-0166_on_session_initiate", - client, session, request, jingle_elt - ): - return - - # we now request each application plugin confirmation - # and if all are accepted, we can accept the session - confirm_defers = self._call_plugins( - client, - XEP_0166.A_SESSION_INITIATE, - session, - "jingle_request_confirmation", - None, - self.jingle_request_confirmation_default, - delete=False, - ) - - confirm_dlist = defer.gatherResults(confirm_defers) - confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client) - confirm_dlist.addErrback(self._jingle_error_cb, session, request, client) - - def _confirmation_cb(self, confirm_results, session, jingle_elt, client): - """Method called when confirmation from user has been received - - This method is only called for the responder - @param confirm_results(list[bool]): all True if session is accepted - @param session(dict): session data - @param jingle_elt(domish.Element): jingle data of this session - @param client: %(doc_client)s - """ - confirmed = all(confirm_results) - if not confirmed: - return self.terminate(client, XEP_0166.REASON_DECLINE, session) - - iq_elt, jingle_elt = self._build_jingle_elt( - client, session, XEP_0166.A_SESSION_ACCEPT - ) - jingle_elt["responder"] = session['local_jid'].full() - - # contents - - def addElement(domish_elt, content_elt): - content_elt.addChild(domish_elt) - - defers_list = [] - - for content_name, content_data in session["contents"].items(): - content_elt = jingle_elt.addElement("content") - content_elt["creator"] = XEP_0166.ROLE_INITIATOR - content_elt["name"] = content_name - - application = content_data["application"] - app_session_accept_cb = application.handler.jingle_handler - - app_d = utils.as_deferred( - app_session_accept_cb, - client, - XEP_0166.A_SESSION_INITIATE, - session, - content_name, - content_data.pop("desc_elt"), - ) - app_d.addCallback(addElement, content_elt) - defers_list.append(app_d) - - transport = content_data["transport"] - transport_session_accept_cb = transport.handler.jingle_handler - - transport_d = utils.as_deferred( - transport_session_accept_cb, - client, - XEP_0166.A_SESSION_INITIATE, - session, - content_name, - content_data.pop("transport_elt"), - ) - transport_d.addCallback(addElement, content_elt) - defers_list.append(transport_d) - - d_list = defer.DeferredList(defers_list) - d_list.addCallback( - lambda __: self._call_plugins( - client, - XEP_0166.A_PREPARE_RESPONDER, - session, - app_method_name=None, - elements=False, - ) - ) - d_list.addCallback(lambda __: iq_elt.send()) - - def change_state(__, session): - session["state"] = STATE_ACTIVE - - d_list.addCallback(change_state, session) - d_list.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_ACCEPTED_ACK, session, elements=False - ) - ) - d_list.addErrback(self._iq_error, session["id"], client) - return d_list - - def on_session_terminate(self, client, request, jingle_elt, session): - # TODO: check reason, display a message to user if needed - log.debug("Jingle Session {} terminated".format(session["id"])) - try: - reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason")) - except StopIteration: - log.warning("No reason given for session termination") - reason_elt = jingle_elt.addElement("reason") - - terminate_defers = self._call_plugins( - client, - XEP_0166.A_SESSION_TERMINATE, - session, - "jingle_terminate", - "jingle_terminate", - self._ignore, - self._ignore, - elements=False, - force_element=reason_elt, - ) - terminate_dlist = defer.DeferredList(terminate_defers) - - terminate_dlist.addCallback(lambda __: self._del_session(client, session["id"])) - client.send(xmlstream.toResponse(request, "result")) - - def on_session_accept(self, client, request, jingle_elt, session): - """Method called once session is accepted - - This method is only called for initiator - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data - """ - log.debug("Jingle session {} has been accepted".format(session["id"])) - - try: - self._parse_elements(jingle_elt, session, request, client) - except exceptions.CancelError: - return - - # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, "result")) - # and change the state - session["state"] = STATE_ACTIVE - - negociate_defers = [] - negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) - - negociate_dlist = defer.gatherResults(negociate_defers) - - # after negociations we start the transfer - negociate_dlist.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_START, session, app_method_name=None, elements=False - ) - ) - - def _on_session_cb(self, result, client, request, jingle_elt, session): - client.send(xmlstream.toResponse(request, "result")) - - def _on_session_eb(self, failure_, client, request, jingle_elt, session): - log.error("Error while handling on_session_info: {}".format(failure_.value)) - # XXX: only error managed so far, maybe some applications/transports need more - self.sendError( - client, "feature-not-implemented", None, request, "unsupported-info" - ) - - def on_session_info(self, client, request, jingle_elt, session): - """Method called when a session-info action is received from other peer - - This method is only called for initiator - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data - """ - if not jingle_elt.children: - # this is a session ping, see XEP-0166 §6.8 - client.send(xmlstream.toResponse(request, "result")) - return - - try: - # XXX: session-info is most likely only used for application, so we don't call transport plugins - # if a future transport use it, this behaviour must be adapted - defers = self._call_plugins( - client, - XEP_0166.A_SESSION_INFO, - session, - "jingle_session_info", - None, - elements=False, - force_element=jingle_elt, - ) - except exceptions.NotFound as e: - self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) - return - - dlist = defer.DeferredList(defers, fireOnOneErrback=True) - dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) - dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) - - @defer.inlineCallbacks - def on_transport_replace(self, client, request, jingle_elt, session): - """A transport change is requested - - The request is parsed, and jingle_handler is called on concerned transport plugin(s) - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data - """ - log.debug("Other peer wants to replace the transport") - try: - self._parse_elements( - jingle_elt, session, request, client, with_application=False - ) - except exceptions.CancelError: - defer.returnValue(None) - - client.send(xmlstream.toResponse(request, "result")) - - content_name = None - to_replace = [] - - for content_name, content_data in session["contents"].items(): - try: - transport_elt = content_data.pop("transport_elt") - except KeyError: - continue - transport_ns = transport_elt.uri - try: - transport = self._transports[transport_ns] - except KeyError: - log.warning( - "Other peer want to replace current transport with an unknown one: {}".format( - transport_ns - ) - ) - content_name = None - break - to_replace.append((content_name, content_data, transport, transport_elt)) - - if content_name is None: - # wa can't accept the replacement - iq_elt, reject_jingle_elt = self._build_jingle_elt( - client, session, XEP_0166.A_TRANSPORT_REJECT - ) - for child in jingle_elt.children: - reject_jingle_elt.addChild(child) - - iq_elt.send() - defer.returnValue(None) - - # at this point, everything is alright and we can replace the transport(s) - # this is similar to an session-accept action, but for transports only - iq_elt, accept_jingle_elt = self._build_jingle_elt( - client, session, XEP_0166.A_TRANSPORT_ACCEPT - ) - for content_name, content_data, transport, transport_elt in to_replace: - # we can now actually replace the transport - yield content_data["transport"].handler.jingle_handler( - client, XEP_0166.A_DESTROY, session, content_name, None - ) - content_data["transport"] = transport - content_data["transport_data"].clear() - # and build the element - content_elt = accept_jingle_elt.addElement("content") - content_elt["name"] = content_name - content_elt["creator"] = content_data["creator"] - # we notify the transport and insert its <transport/> in the answer - accept_transport_elt = yield transport.handler.jingle_handler( - client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt - ) - content_elt.addChild(accept_transport_elt) - # there is no confirmation needed here, so we can directly prepare it - yield transport.handler.jingle_handler( - client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None - ) - - iq_elt.send() - - def on_transport_accept(self, client, request, jingle_elt, session): - """Method called once transport replacement is accepted - - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data - """ - log.debug("new transport has been accepted") - - try: - self._parse_elements( - jingle_elt, session, request, client, with_application=False - ) - except exceptions.CancelError: - return - - # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, "result")) - - negociate_defers = [] - negociate_defers = self._call_plugins( - client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None - ) - - negociate_dlist = defer.DeferredList(negociate_defers) - - # after negociations we start the transfer - negociate_dlist.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_START, session, app_method_name=None, elements=False - ) - ) - - def on_transport_reject(self, client, request, jingle_elt, session): - """Method called when a transport replacement is refused - - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data - """ - # XXX: for now, we terminate the session in case of transport-reject - # this behaviour may change in the future - self.terminate(client, "failed-transport", session) - - def on_transport_info(self, client, request, jingle_elt, session): - """Method called when a transport-info action is received from other peer - - The request is parsed, and jingle_handler is called on concerned transport plugin(s) - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data - """ - log.debug("Jingle session {} has been accepted".format(session["id"])) - - try: - self._parse_elements( - jingle_elt, session, request, client, with_application=False - ) - except exceptions.CancelError: - return - - # The parsing was OK, we send the <iq> result - client.send(xmlstream.toResponse(request, "result")) - - for content_name, content_data in session["contents"].items(): - try: - transport_elt = content_data.pop("transport_elt") - except KeyError: - continue - else: - content_data["transport"].handler.jingle_handler( - client, - XEP_0166.A_TRANSPORT_INFO, - session, - content_name, - transport_elt, - ) - - -@implementer(iwokkel.IDisco) -class XEP_0166_handler(xmlstream.XMPPHandler): - - def __init__(self, plugin_parent): - self.plugin_parent = plugin_parent - - def connectionInitialized(self): - self.xmlstream.addObserver( - JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent - ) - - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): - return [disco.DiscoFeature(NS_JINGLE)] - - def getDiscoItems(self, requestor, target, nodeIdentifier=""): - return []
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0166/__init__.py Mon May 15 16:23:11 2023 +0200 @@ -0,0 +1,1390 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Jingle (XEP-0166) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import time +from typing import Any, Callable, Dict, Final, List, Optional, Tuple +import uuid + +from twisted.internet import defer +from twisted.internet import reactor +from twisted.python import failure +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import error +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import D_, _ +from sat.core.log import getLogger +from sat.tools import xml_tools +from sat.tools import utils + +from .models import ( + ApplicationData, + BaseApplicationHandler, + BaseTransportHandler, + ContentData, + TransportData, +) + + +log = getLogger(__name__) + + +IQ_SET : Final = '/iq[@type="set"]' +NS_JINGLE : Final = "urn:xmpp:jingle:1" +NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1" +JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]' +STATE_PENDING : Final = "PENDING" +STATE_ACTIVE : Final = "ACTIVE" +STATE_ENDED : Final = "ENDED" +CONFIRM_TXT : Final = D_( + "{entity} want to start a jingle session with you, do you accept ?" +) + +PLUGIN_INFO : Final = { + C.PI_NAME: "Jingle", + C.PI_IMPORT_NAME: "XEP-0166", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0166"], + C.PI_MAIN: "XEP_0166", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of Jingle"""), +} + + +class XEP_0166: + namespace : Final = NS_JINGLE + + ROLE_INITIATOR : Final = "initiator" + ROLE_RESPONDER : Final = "responder" + + TRANSPORT_DATAGRAM : Final = "UDP" + TRANSPORT_STREAMING : Final = "TCP" + + REASON_SUCCESS : Final = "success" + REASON_DECLINE : Final = "decline" + REASON_FAILED_APPLICATION : Final = "failed-application" + REASON_FAILED_TRANSPORT : Final = "failed-transport" + REASON_CONNECTIVITY_ERROR : Final = "connectivity-error" + + # standard actions + + A_SESSION_INITIATE : Final = "session-initiate" + A_SESSION_ACCEPT : Final = "session-accept" + A_SESSION_TERMINATE : Final = "session-terminate" + A_SESSION_INFO : Final = "session-info" + A_TRANSPORT_REPLACE : Final = "transport-replace" + A_TRANSPORT_ACCEPT : Final = "transport-accept" + A_TRANSPORT_REJECT : Final = "transport-reject" + A_TRANSPORT_INFO : Final = "transport-info" + + # non standard actions + + #: called before the confirmation request, first event for responder, useful for + #: parsing + A_PREPARE_CONFIRMATION : Final = "prepare-confirmation" + #: initiator must prepare tranfer + A_PREPARE_INITIATOR : Final = "prepare-initiator" + #: responder must prepare tranfer + A_PREPARE_RESPONDER : Final = "prepare-responder" + #; session accepted ack has been received from initiator + A_ACCEPTED_ACK : Final = ( + "accepted-ack" + ) + A_START : Final = "start" # application can start + #: called when a transport is destroyed (e.g. because it is remplaced). Used to do + #: cleaning operations + A_DESTROY : Final = ( + "destroy" + ) + + def __init__(self, host): + log.info(_("plugin Jingle initialization")) + self.host = host + self._applications = {} # key: namespace, value: application data + self._transports = {} # key: namespace, value: transport data + # we also keep transports by type, they are then sorted by priority + self._type_transports = { + XEP_0166.TRANSPORT_DATAGRAM: [], + XEP_0166.TRANSPORT_STREAMING: [], + } + + def profile_connected(self, client): + client.jingle_sessions = {} # key = sid, value = session_data + + def get_handler(self, client): + return XEP_0166_handler(self) + + def get_session(self, client: SatXMPPEntity, session_id: str) -> dict: + """Retrieve session from its SID + + @param session_id: session ID + @return: found session + + @raise exceptions.NotFound: no session with this SID has been found + """ + try: + return client.jingle_sessions[session_id] + except KeyError: + raise exceptions.NotFound( + f"No session with SID {session_id} found" + ) + + + def _del_session(self, client, sid): + try: + del client.jingle_sessions[sid] + except KeyError: + log.debug( + f"Jingle session id {sid!r} is unknown, nothing to delete " + f"[{client.profile}]") + else: + log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]") + + ## helpers methods to build stanzas ## + + def _build_jingle_elt( + self, + client: SatXMPPEntity, + session: dict, + action: str + ) -> Tuple[xmlstream.IQ, domish.Element]: + iq_elt = client.IQ("set") + iq_elt["from"] = session['local_jid'].full() + iq_elt["to"] = session["peer_jid"].full() + jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) + jingle_elt["sid"] = session["id"] + jingle_elt["action"] = action + return iq_elt, jingle_elt + + def sendError(self, client, error_condition, sid, request, jingle_condition=None): + """Send error stanza + + @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys + @param sid(unicode,None): jingle session id, or None, if session must not be destroyed + @param request(domish.Element): original request + @param jingle_condition(None, unicode): if not None, additional jingle-specific error information + """ + iq_elt = error.StanzaError(error_condition).toResponse(request) + if jingle_condition is not None: + iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition)) + if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid: + self._del_session(client, sid) + log.warning( + "Error while managing jingle session, cancelling: {condition}".format( + condition=error_condition + ) + ) + return client.send(iq_elt) + + def _terminate_eb(self, failure_): + log.warning(_("Error while terminating session: {msg}").format(msg=failure_)) + + def terminate(self, client, reason, session, text=None): + """Terminate the session + + send the session-terminate action, and delete the session data + @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element + if a list of element, add them as children of the <reason/> element + @param session(dict): data of the session + """ + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_SESSION_TERMINATE + ) + reason_elt = jingle_elt.addElement("reason") + if isinstance(reason, str): + reason_elt.addElement(reason) + else: + for elt in reason: + reason_elt.addChild(elt) + if text is not None: + reason_elt.addElement("text", content=text) + self._del_session(client, session["id"]) + d = iq_elt.send() + d.addErrback(self._terminate_eb) + return d + + ## errors which doesn't imply a stanza sending ## + + def _iq_error(self, failure_, sid, client): + """Called when we got an <iq/> error + + @param failure_(failure.Failure): the exceptions raised + @param sid(unicode): jingle session id + """ + log.warning( + "Error while sending jingle <iq/> stanza: {failure_}".format( + failure_=failure_.value + ) + ) + self._del_session(client, sid) + + def _jingle_error_cb(self, failure_, session, request, client): + """Called when something is going wrong while parsing jingle request + + The error condition depend of the exceptions raised: + exceptions.DataError raise a bad-request condition + @param fail(failure.Failure): the exceptions raised + @param session(dict): data of the session + @param request(domsih.Element): jingle request + @param client: %(doc_client)s + """ + log.warning(f"Error while processing jingle request [{client.profile}]") + if isinstance(failure_.value, defer.FirstError): + failure_ = failure_.value.subFailure.value + if isinstance(failure_, exceptions.DataError): + return self.sendError(client, "bad-request", session["id"], request) + elif isinstance(failure_, error.StanzaError): + return self.terminate(client, self.REASON_FAILED_APPLICATION, session, + text=str(failure_)) + else: + log.error(f"Unmanaged jingle exception: {failure_}") + return self.terminate(client, self.REASON_FAILED_APPLICATION, session, + text=str(failure_)) + + ## methods used by other plugins ## + + def register_application( + self, + namespace: str, + handler: BaseApplicationHandler + ) -> None: + """Register an application plugin + + @param namespace(unicode): application namespace managed by the plugin + @param handler(object): instance of a class which manage the application. + May have the following methods: + - request_confirmation(session, desc_elt, client): + - if present, it is called on when session must be accepted. + - if it return True the session is accepted, else rejected. + A Deferred can be returned + - if not present, a generic accept dialog will be used + - jingle_session_init( + client, self, session, content_name[, *args, **kwargs] + ): must return the domish.Element used for initial content + - jingle_handler( + client, self, action, session, content_name, transport_elt + ): + called on several action to negociate the application or transport + - jingle_terminate: called on session terminate, with reason_elt + May be used to clean session + """ + if namespace in self._applications: + raise exceptions.ConflictError( + f"Trying to register already registered namespace {namespace}" + ) + self._applications[namespace] = ApplicationData( + namespace=namespace, handler=handler + ) + log.debug("new jingle application registered") + + def register_transport( + self, + namespace: str, + transport_type: str, + handler: BaseTransportHandler, + priority: int = 0 + ) -> None: + """Register a transport plugin + + @param namespace: the XML namespace used for this transport + @param transport_type: type of transport to use (see XEP-0166 §8) + @param handler: instance of a class which manage the application. + @param priority: priority of this transport + """ + assert transport_type in ( + XEP_0166.TRANSPORT_DATAGRAM, + XEP_0166.TRANSPORT_STREAMING, + ) + if namespace in self._transports: + raise exceptions.ConflictError( + "Trying to register already registered namespace {}".format(namespace) + ) + transport_data = TransportData( + namespace=namespace, handler=handler, priority=priority + ) + self._type_transports[transport_type].append(transport_data) + self._type_transports[transport_type].sort( + key=lambda transport_data: transport_data.priority, reverse=True + ) + self._transports[namespace] = transport_data + log.debug("new jingle transport registered") + + @defer.inlineCallbacks + def transport_replace(self, client, transport_ns, session, content_name): + """Replace a transport + + @param transport_ns(unicode): namespace of the new transport to use + @param session(dict): jingle session data + @param content_name(unicode): name of the content + """ + # XXX: for now we replace the transport before receiving confirmation from other peer + # this is acceptable because we terminate the session if transport is rejected. + # this behavious may change in the future. + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + try: + transport = self._transports[transport_ns] + except KeyError: + raise exceptions.InternalError("Unkown transport") + yield content_data["transport"].handler.jingle_handler( + client, XEP_0166.A_DESTROY, session, content_name, None + ) + content_data["transport"] = transport + transport_data.clear() + + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_TRANSPORT_REPLACE + ) + content_elt = jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] + + transport_elt = transport.handler.jingle_session_init(client, session, content_name) + content_elt.addChild(transport_elt) + iq_elt.send() + + def build_action( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + iq_elt: Optional[xmlstream.IQ] = None, + context_elt: Optional[domish.Element] = None + ) -> Tuple[xmlstream.IQ, domish.Element]: + """Build an element according to requested action + + @param action: a jingle action (see XEP-0166 §7.2), + session-* actions are not managed here + transport-replace is managed in the dedicated [transport_replace] method + @param session: jingle session data + @param content_name: name of the content + @param iq_elt: use this IQ instead of creating a new one if provided + @param context_elt: use this element instead of creating a new one if provided + @return: parent <iq> element, <transport> or <description> element, according to action + """ + # we first build iq, jingle and content element which are the same in every cases + if iq_elt is not None: + try: + jingle_elt = next(iq_elt.elements(NS_JINGLE, "jingle")) + except StopIteration: + raise exceptions.InternalError( + "The <iq> element provided doesn't have a <jingle> element" + ) + else: + iq_elt, jingle_elt = self._build_jingle_elt(client, session, action) + # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked + content_data = session["contents"][content_name] + content_elt = jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] + + if context_elt is not None: + pass + elif action == XEP_0166.A_TRANSPORT_INFO: + context_elt = transport_elt = content_elt.addElement( + "transport", content_data["transport"].namespace + ) + else: + raise exceptions.InternalError(f"unmanaged action {action}") + + return iq_elt, context_elt + + def build_session_info(self, client, session): + """Build a session-info action + + @param session(dict): jingle session data + @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element + """ + return self._build_jingle_elt(client, session, XEP_0166.A_SESSION_INFO) + + def get_application(self, namespace: str) -> ApplicationData: + """Retreive application corresponding to a namespace + + @raise exceptions.NotFound if application can't be found + """ + try: + return self._applications[namespace] + except KeyError: + raise exceptions.NotFound( + f"No application registered for {namespace}" + ) + + def get_content_data(self, content: dict) -> ContentData: + """"Retrieve application and its argument from content""" + app_ns = content["app_ns"] + try: + application = self.get_application(app_ns) + except exceptions.NotFound as e: + raise exceptions.InternalError(str(e)) + app_args = content.get("app_args", []) + app_kwargs = content.get("app_kwargs", {}) + transport_data = content.get("transport_data", {}) + try: + content_name = content["name"] + except KeyError: + content_name = content["name"] = str(uuid.uuid4()) + return ContentData( + application, + app_args, + app_kwargs, + transport_data, + content_name + ) + + async def initiate( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + contents: List[dict], + encrypted: bool = False, + **extra_data: Any + ) -> str: + """Send a session initiation request + + @param peer_jid: jid to establith session with + @param contents: list of contents to use: + The dict must have the following keys: + - app_ns(str): namespace of the application + the following keys are optional: + - transport_type(str): type of transport to use (see XEP-0166 §8) + default to TRANSPORT_STREAMING + - name(str): name of the content + - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none + default to BOTH (see XEP-0166 §7.3) + - app_args(list): args to pass to the application plugin + - app_kwargs(dict): keyword args to pass to the application plugin + @param encrypted: if True, session must be encrypted and "encryption" must be set + to all content data of session + @return: jingle session id + """ + assert contents # there must be at least one content + if (peer_jid == client.jid + or client.is_component and peer_jid.host == client.jid.host): + raise ValueError(_("You can't do a jingle session with yourself")) + initiator = client.jid + sid = str(uuid.uuid4()) + # TODO: session cleaning after timeout ? + session = client.jingle_sessions[sid] = { + "id": sid, + "state": STATE_PENDING, + "initiator": initiator, + "role": XEP_0166.ROLE_INITIATOR, + "local_jid": client.jid, + "peer_jid": peer_jid, + "started": time.time(), + "contents": {}, + **extra_data, + } + + if not await self.host.trigger.async_point( + "XEP-0166_initiate", + client, session, contents + ): + return sid + + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_SESSION_INITIATE + ) + jingle_elt["initiator"] = initiator.full() + + session_contents = session["contents"] + + for content in contents: + # we get the application plugin + content_data = self.get_content_data(content) + + # and the transport plugin + transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING) + try: + transport = self._type_transports[transport_type][0] + except IndexError: + raise exceptions.InternalError( + "No transport registered for {}".format(transport_type) + ) + + # we build the session data for this content + session_content = { + "application": content_data.application, + "application_data": {}, + "transport": transport, + "transport_data": content_data.transport_data, + "creator": XEP_0166.ROLE_INITIATOR, + "senders": content.get("senders", "both"), + } + if content_data.content_name in session_contents: + raise exceptions.InternalError( + "There is already a content with this name" + ) + session_contents[content_data.content_name] = session_content + + # we construct the content element + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = session_content["creator"] + content_elt["name"] = content_data.content_name + try: + content_elt["senders"] = content["senders"] + except KeyError: + pass + + # then the description element + desc_elt = await utils.as_deferred( + content_data.application.handler.jingle_session_init, + client, session, content_data.content_name, + *content_data.app_args, **content_data.app_kwargs + ) + content_elt.addChild(desc_elt) + + # and the transport one + transport_elt = await utils.as_deferred( + transport.handler.jingle_session_init, + client, session, content_data.content_name, + ) + content_elt.addChild(transport_elt) + + if not await self.host.trigger.async_point( + "XEP-0166_initiate_elt_built", + client, session, iq_elt, jingle_elt + ): + return sid + if encrypted: + for content in session["contents"].values(): + if "encryption" not in content: + raise exceptions.EncryptionError( + "Encryption is requested, but no encryption has been set" + ) + + try: + await iq_elt.send() + except Exception as e: + failure_ = failure.Failure(e) + self._iq_error(failure_, sid, client) + raise failure_ + return sid + + def delayed_content_terminate(self, *args, **kwargs): + """Put content_terminate in queue but don't execute immediately + + This is used to terminate a content inside a handler, to avoid modifying contents + """ + reactor.callLater(0, self.content_terminate, *args, **kwargs) + + def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS): + """Terminate and remove a content + + if there is no more content, then session is terminated + @param session(dict): jingle session + @param content_name(unicode): name of the content terminated + @param reason(unicode): reason of the termination + """ + contents = session["contents"] + del contents[content_name] + if not contents: + self.terminate(client, reason, session) + + ## defaults methods called when plugin doesn't have them ## + + def jingle_request_confirmation_default( + self, client, action, session, content_name, desc_elt + ): + """This method request confirmation for a jingle session""" + log.debug("Using generic jingle confirmation method") + return xml_tools.defer_confirm( + self.host, + _(CONFIRM_TXT).format(entity=session["peer_jid"].full()), + _("Confirm Jingle session"), + profile=client.profile, + ) + + ## jingle events ## + + def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None: + defer.ensureDeferred(self.on_jingle_request(client, request)) + + async def on_jingle_request( + self, + client: SatXMPPEntity, + request: domish.Element + ) -> None: + """Called when any jingle request is received + + The request will then be dispatched to appropriate method + according to current state + @param request(domish.Element): received IQ request + """ + request.handled = True + jingle_elt = next(request.elements(NS_JINGLE, "jingle")) + + # first we need the session id + try: + sid = jingle_elt["sid"] + if not sid: + raise KeyError + except KeyError: + log.warning("Received jingle request has no sid attribute") + self.sendError(client, "bad-request", None, request) + return + + # then the action + try: + action = jingle_elt["action"] + if not action: + raise KeyError + except KeyError: + log.warning("Received jingle request has no action") + self.sendError(client, "bad-request", None, request) + return + + peer_jid = jid.JID(request["from"]) + + # we get or create the session + try: + session = client.jingle_sessions[sid] + except KeyError: + if action == XEP_0166.A_SESSION_INITIATE: + pass + elif action == XEP_0166.A_SESSION_TERMINATE: + log.debug( + "ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format( + request_id=sid, profile=client.profile + ) + ) + return + else: + log.warning( + "Received request for an unknown session id: {request_id} [{profile}]".format( + request_id=sid, profile=client.profile + ) + ) + self.sendError(client, "item-not-found", None, request, "unknown-session") + return + + session = client.jingle_sessions[sid] = { + "id": sid, + "state": STATE_PENDING, + "initiator": peer_jid, + "role": XEP_0166.ROLE_RESPONDER, + # we store local_jid using request['to'] because for a component the jid + # used may not be client.jid (if a local part is used). + "local_jid": jid.JID(request['to']), + "peer_jid": peer_jid, + "started": time.time(), + } + else: + if session["peer_jid"] != peer_jid: + log.warning( + "sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format( + sid + ) + ) + self.sendError(client, "service-unavailable", sid, request) + return + if session["id"] != sid: + log.error("session id doesn't match") + self.sendError(client, "service-unavailable", sid, request) + raise exceptions.InternalError + + if action == XEP_0166.A_SESSION_INITIATE: + await self.on_session_initiate(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_TERMINATE: + self.on_session_terminate(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_ACCEPT: + await self.on_session_accept(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_INFO: + self.on_session_info(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_INFO: + self.on_transport_info(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_REPLACE: + await self.on_transport_replace(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_ACCEPT: + self.on_transport_accept(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_REJECT: + self.on_transport_reject(client, request, jingle_elt, session) + else: + raise exceptions.InternalError(f"Unknown action {action}") + + ## Actions callbacks ## + + def _parse_elements( + self, + jingle_elt: domish.Element, + session: dict, + request: domish.Element, + client: SatXMPPEntity, + new: bool = False, + creator: str = ROLE_INITIATOR, + with_application: bool =True, + with_transport: bool = True, + store_in_session: bool = True, + ) -> Dict[str, dict]: + """Parse contents elements and fill contents_dict accordingly + + after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" + @param jingle_elt: parent <jingle> element, containing one or more <content> + @param session: session data + @param request: the whole request + @param client: %(doc_client)s + @param new: True if the content is new and must be created, + else the content must exists, and session data will be filled + @param creator: only used if new is True: creating pear (see § 7.3) + @param with_application: if True, raise an error if there is no <description> + element else ignore it + @param with_transport: if True, raise an error if there is no <transport> element + else ignore it + @param store_in_session: if True, the ``session`` contents will be updated with + the parsed elements. + Use False when you parse an action which can happen at any time (e.g. + transport-info) and meaning that a parsed element may already be present in + the session (e.g. if an authorisation request is waiting for user answer), + This can't be used when ``new`` is set. + @return: contents_dict (from session, or a new one if "store_in_session" is False) + @raise exceptions.CancelError: the error is treated and the calling method can + cancel the treatment (i.e. return) + """ + if store_in_session: + contents_dict = session["contents"] + else: + if new: + raise exceptions.InternalError( + '"store_in_session" must not be used when "new" is set' + ) + contents_dict = {n: {} for n in session["contents"]} + content_elts = jingle_elt.elements(NS_JINGLE, "content") + + for content_elt in content_elts: + name = content_elt["name"] + + if new: + # the content must not exist, we check it + if not name or name in contents_dict: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + content_data = contents_dict[name] = { + "creator": creator, + "senders": content_elt.attributes.get("senders", "both"), + } + else: + # the content must exist, we check it + try: + content_data = contents_dict[name] + except KeyError: + log.warning("Other peer try to access an unknown content") + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + # application + if with_application: + desc_elt = content_elt.description + if not desc_elt: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + if new: + # the content is new, we need to check and link the application + app_ns = desc_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + try: + application = self._applications[app_ns] + except KeyError: + log.warning( + "Unmanaged application namespace [{}]".format(app_ns) + ) + self.sendError( + client, "service-unavailable", session["id"], request + ) + raise exceptions.CancelError + + content_data["application"] = application + content_data["application_data"] = {} + else: + # the content exists, we check that we have not a former desc_elt + if "desc_elt" in content_data: + raise exceptions.InternalError( + "desc_elt should not exist at this point" + ) + + content_data["desc_elt"] = desc_elt + + # transport + if with_transport: + transport_elt = content_elt.transport + if not transport_elt: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + if new: + # the content is new, we need to check and link the transport + transport_ns = transport_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + try: + transport = self._transports[transport_ns] + except KeyError: + raise exceptions.InternalError( + "No transport registered for namespace {}".format( + transport_ns + ) + ) + content_data["transport"] = transport + content_data["transport_data"] = {} + else: + # the content exists, we check that we have not a former transport_elt + if "transport_elt" in content_data: + raise exceptions.InternalError( + "transport_elt should not exist at this point" + ) + + content_data["transport_elt"] = transport_elt + + return contents_dict + + def _ignore(self, client, action, session, content_name, elt): + """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins + + must be used as app_default_cb and/or transp_default_cb + """ + return elt + + def _call_plugins( + self, + client: SatXMPPEntity, + action: str, + session: dict, + app_method_name: Optional[str] = "jingle_handler", + transp_method_name: Optional[str] = "jingle_handler", + app_default_cb: Optional[Callable] = None, + transp_default_cb: Optional[Callable] = None, + delete: bool = True, + elements: bool = True, + force_element: Optional[domish.Element] = None + ) -> List[defer.Deferred]: + """Call application and transport plugin methods for all contents + + @param action: jingle action name + @param session: jingle session data + @param app_method_name: name of the method to call for applications + None to ignore + @param transp_method_name: name of the method to call for transports + None to ignore + @param app_default_cb: default callback to use if plugin has not app_method_name + None to raise an exception instead + @param transp_default_cb: default callback to use if plugin has not transp_method_name + None to raise an exception instead + @param delete: if True, remove desc_elt and transport_elt from session + ignored if elements is False + @param elements: True if elements(desc_elt and tranport_elt) must be managed + must be True if _call_plugins is used in a request, and False if it is used + after a request (i.e. on <iq> result or error) + @param force_element: if elements is False, it is used as element parameter + else it is ignored + @return : list of launched Deferred + @raise exceptions.NotFound: method is not implemented + """ + contents_dict = session["contents"] + defers_list = [] + for content_name, content_data in contents_dict.items(): + for method_name, handler_key, default_cb, elt_name in ( + (app_method_name, "application", app_default_cb, "desc_elt"), + (transp_method_name, "transport", transp_default_cb, "transport_elt"), + ): + if method_name is None: + continue + + handler = content_data[handler_key].handler + try: + method = getattr(handler, method_name) + except AttributeError: + if default_cb is None: + raise exceptions.NotFound( + "{} not implemented !".format(method_name) + ) + else: + method = default_cb + if elements: + elt = content_data.pop(elt_name) if delete else content_data[elt_name] + else: + elt = force_element + d = utils.as_deferred( + method, client, action, session, content_name, elt + ) + defers_list.append(d) + + return defers_list + + async def on_session_initiate( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: Dict[str, Any] + ) -> None: + """Called on session-initiate action + + The "jingle_request_confirmation" method of each application will be called + (or self.jingle_request_confirmation_default if the former doesn't exist). + The session is only accepted if all application are confirmed. + The application must manage itself multiple contents scenari (e.g. audio/video). + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): <jingle> element + @param session(dict): session data + """ + if "contents" in session: + raise exceptions.InternalError( + "Contents dict should not already exist at this point" + ) + session["contents"] = contents_dict = {} + + try: + self._parse_elements( + jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR + ) + except exceptions.CancelError: + return + + if not contents_dict: + # there MUST be at least one content + self.sendError(client, "bad-request", session["id"], request) + return + + # at this point we can send the <iq/> result to confirm reception of the request + client.send(xmlstream.toResponse(request, "result")) + + + if not await self.host.trigger.async_point( + "XEP-0166_on_session_initiate", + client, session, request, jingle_elt + ): + return + + await defer.DeferredList(self._call_plugins( + client, + XEP_0166.A_PREPARE_CONFIRMATION, + session, + delete=False + )) + + # we now request each application plugin confirmation + # and if all are accepted, we can accept the session + confirm_defers = self._call_plugins( + client, + XEP_0166.A_SESSION_INITIATE, + session, + "jingle_request_confirmation", + None, + self.jingle_request_confirmation_default, + delete=False, + ) + + confirm_dlist = defer.gatherResults(confirm_defers) + confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client) + confirm_dlist.addErrback(self._jingle_error_cb, session, request, client) + + def _confirmation_cb(self, confirm_results, session, jingle_elt, client): + """Method called when confirmation from user has been received + + This method is only called for the responder + @param confirm_results(list[bool]): all True if session is accepted + @param session(dict): session data + @param jingle_elt(domish.Element): jingle data of this session + @param client: %(doc_client)s + """ + confirmed = all(confirm_results) + if not confirmed: + return self.terminate(client, XEP_0166.REASON_DECLINE, session) + + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_SESSION_ACCEPT + ) + jingle_elt["responder"] = session['local_jid'].full() + + # contents + + def addElement(domish_elt, content_elt): + content_elt.addChild(domish_elt) + + defers_list = [] + + for content_name, content_data in session["contents"].items(): + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = XEP_0166.ROLE_INITIATOR + content_elt["name"] = content_name + + application = content_data["application"] + app_session_accept_cb = application.handler.jingle_handler + + app_d = utils.as_deferred( + app_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("desc_elt"), + ) + app_d.addCallback(addElement, content_elt) + defers_list.append(app_d) + + transport = content_data["transport"] + transport_session_accept_cb = transport.handler.jingle_handler + + transport_d = utils.as_deferred( + transport_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("transport_elt"), + ) + transport_d.addCallback(addElement, content_elt) + defers_list.append(transport_d) + + d_list = defer.DeferredList(defers_list) + d_list.addCallback( + lambda __: self._call_plugins( + client, + XEP_0166.A_PREPARE_RESPONDER, + session, + app_method_name=None, + elements=False, + ) + ) + d_list.addCallback(lambda __: iq_elt.send()) + + def change_state(__, session): + session["state"] = STATE_ACTIVE + + d_list.addCallback(change_state, session) + d_list.addCallback( + lambda __: self._call_plugins( + client, XEP_0166.A_ACCEPTED_ACK, session, elements=False + ) + ) + d_list.addErrback(self._iq_error, session["id"], client) + return d_list + + def on_session_terminate(self, client, request, jingle_elt, session): + # TODO: check reason, display a message to user if needed + log.debug("Jingle Session {} terminated".format(session["id"])) + try: + reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason")) + except StopIteration: + log.warning("No reason given for session termination") + reason_elt = jingle_elt.addElement("reason") + + terminate_defers = self._call_plugins( + client, + XEP_0166.A_SESSION_TERMINATE, + session, + "jingle_terminate", + "jingle_terminate", + self._ignore, + self._ignore, + elements=False, + force_element=reason_elt, + ) + terminate_dlist = defer.DeferredList(terminate_defers) + + terminate_dlist.addCallback(lambda __: self._del_session(client, session["id"])) + client.send(xmlstream.toResponse(request, "result")) + + async def on_session_accept(self, client, request, jingle_elt, session): + """Method called once session is accepted + + This method is only called for initiator + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug(f"Jingle session {session['id']} has been accepted") + + try: + self._parse_elements(jingle_elt, session, request, client) + except exceptions.CancelError: + return + + # at this point we can send the <iq/> result to confirm reception of the request + client.send(xmlstream.toResponse(request, "result")) + # and change the state + session["state"] = STATE_ACTIVE + + await defer.DeferredList(self._call_plugins( + client, + XEP_0166.A_PREPARE_INITIATOR, + session, + delete=False + )) + + negociate_defers = [] + negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) + + negociate_dlist = defer.gatherResults(negociate_defers) + + # after negociations we start the transfer + negociate_dlist.addCallback( + lambda __: self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False + ) + ) + + def _on_session_cb(self, result, client, request, jingle_elt, session): + client.send(xmlstream.toResponse(request, "result")) + + def _on_session_eb(self, failure_, client, request, jingle_elt, session): + log.error("Error while handling on_session_info: {}".format(failure_.value)) + # XXX: only error managed so far, maybe some applications/transports need more + self.sendError( + client, "feature-not-implemented", None, request, "unsupported-info" + ) + + def on_session_info(self, client, request, jingle_elt, session): + """Method called when a session-info action is received from other peer + + This method is only called for initiator + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + if not jingle_elt.children: + # this is a session ping, see XEP-0166 §6.8 + client.send(xmlstream.toResponse(request, "result")) + return + + try: + # XXX: session-info is most likely only used for application, so we don't call transport plugins + # if a future transport use it, this behaviour must be adapted + defers = self._call_plugins( + client, + XEP_0166.A_SESSION_INFO, + session, + "jingle_session_info", + None, + elements=False, + force_element=jingle_elt, + ) + except exceptions.NotFound as e: + self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) + return + + dlist = defer.DeferredList(defers, fireOnOneErrback=True) + dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) + dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) + + async def on_transport_replace(self, client, request, jingle_elt, session): + """A transport change is requested + + The request is parsed, and jingle_handler is called on concerned transport plugin(s) + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug("Other peer wants to replace the transport") + try: + self._parse_elements( + jingle_elt, session, request, client, with_application=False + ) + except exceptions.CancelError: + defer.returnValue(None) + + client.send(xmlstream.toResponse(request, "result")) + + content_name = None + to_replace = [] + + for content_name, content_data in session["contents"].items(): + try: + transport_elt = content_data.pop("transport_elt") + except KeyError: + continue + transport_ns = transport_elt.uri + try: + transport = self._transports[transport_ns] + except KeyError: + log.warning( + "Other peer want to replace current transport with an unknown one: {}".format( + transport_ns + ) + ) + content_name = None + break + to_replace.append((content_name, content_data, transport, transport_elt)) + + if content_name is None: + # wa can't accept the replacement + iq_elt, reject_jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_TRANSPORT_REJECT + ) + for child in jingle_elt.children: + reject_jingle_elt.addChild(child) + + iq_elt.send() + defer.returnValue(None) + + # at this point, everything is alright and we can replace the transport(s) + # this is similar to an session-accept action, but for transports only + iq_elt, accept_jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_TRANSPORT_ACCEPT + ) + for content_name, content_data, transport, transport_elt in to_replace: + # we can now actually replace the transport + await utils.as_deferred( + content_data["transport"].handler.jingle_handler, + client, XEP_0166.A_DESTROY, session, content_name, None + ) + content_data["transport"] = transport + content_data["transport_data"].clear() + # and build the element + content_elt = accept_jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] + # we notify the transport and insert its <transport/> in the answer + accept_transport_elt = await utils.as_deferred( + transport.handler.jingle_handler, + client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt + ) + content_elt.addChild(accept_transport_elt) + # there is no confirmation needed here, so we can directly prepare it + await utils.as_deferred( + transport.handler.jingle_handler, + client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None + ) + + iq_elt.send() + + def on_transport_accept(self, client, request, jingle_elt, session): + """Method called once transport replacement is accepted + + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug("new transport has been accepted") + + try: + self._parse_elements( + jingle_elt, session, request, client, with_application=False + ) + except exceptions.CancelError: + return + + # at this point we can send the <iq/> result to confirm reception of the request + client.send(xmlstream.toResponse(request, "result")) + + negociate_defers = [] + negociate_defers = self._call_plugins( + client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None + ) + + negociate_dlist = defer.DeferredList(negociate_defers) + + # after negociations we start the transfer + negociate_dlist.addCallback( + lambda __: self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False + ) + ) + + def on_transport_reject(self, client, request, jingle_elt, session): + """Method called when a transport replacement is refused + + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + # XXX: for now, we terminate the session in case of transport-reject + # this behaviour may change in the future + self.terminate(client, "failed-transport", session) + + def on_transport_info( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: dict + ) -> None: + """Method called when a transport-info action is received from other peer + + The request is parsed, and jingle_handler is called on concerned transport + plugin(s) + @param client: %(doc_client)s + @param request: full <iq> request + @param jingle_elt: the <jingle> element + @param session: session data + """ + log.debug(f"Jingle session {session['id']} has been accepted") + + try: + parsed_contents = self._parse_elements( + jingle_elt, session, request, client, with_application=False, + store_in_session=False + ) + except exceptions.CancelError: + return + + # The parsing was OK, we send the <iq> result + client.send(xmlstream.toResponse(request, "result")) + + for content_name, content_data in session["contents"].items(): + try: + transport_elt = parsed_contents[content_name]["transport_elt"] + except KeyError: + continue + else: + utils.as_deferred( + content_data["transport"].handler.jingle_handler, + client, + XEP_0166.A_TRANSPORT_INFO, + session, + content_name, + transport_elt, + ) + + +@implementer(iwokkel.IDisco) +class XEP_0166_handler(xmlstream.XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + self.xmlstream.addObserver( + JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0166/models.py Mon May 15 16:23:11 2023 +0200 @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Jingle (XEP-0166) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import abc +from dataclasses import dataclass +from typing import Awaitable, Callable, Union + +from twisted.internet import defer +from twisted.words.xish import domish + +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ + + +class BaseApplicationHandler(abc.ABC): + + @abc.abstractmethod + def jingle_request_confirmation( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + desc_elt: domish.Element, + ) -> Union[ + Callable[..., Union[bool, defer.Deferred]], + Callable[..., Awaitable[bool]] + ]: + """ + If present, it is called on when session must be accepted. + If not present, a generic accept dialog will be used. + + @param session: Jingle Session + @param desc_elt: <description> element + @return: True if the session is accepted. + A Deferred can be returned. + """ + pass + + @abc.abstractmethod + def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + *args, **kwargs + ) -> Union[ + Callable[..., domish.Element], + Callable[..., Awaitable[domish.Element]] + ]: + """ + Must return the domish.Element used for initial content. + + @param client: SatXMPPEntity instance + @param session: Jingle Session + @param content_name: Name of the content + @return: The domish.Element used for initial content + """ + pass + + @abc.abstractmethod + def jingle_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element + ) -> Union[ + Callable[..., None], + Callable[..., Awaitable[None]] + ]: + """ + Called on several actions to negotiate the application or transport. + + @param client: SatXMPPEntity instance + @param action: Jingle action + @param session: Jingle Session + @param content_name: Name of the content + @param transport_elt: Transport element + """ + pass + + @abc.abstractmethod + def jingle_terminate( + self, + reason_elt: domish.Element + ) -> Union[ + Callable[..., None], + Callable[..., Awaitable[None]] + ]: + """ + Called on session terminate, with reason_elt. + May be used to clean session. + + @param reason_elt: Reason element + """ + pass + + +class BaseTransportHandler(abc.ABC): + + @abc.abstractmethod + def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + *args, **kwargs + ) -> Union[ + Callable[..., domish.Element], + Callable[..., Awaitable[domish.Element]] + ]: + """ + Must return the domish.Element used for initial content. + + @param client: SatXMPPEntity instance + @param session: Jingle Session + @param content_name: Name of the content + @return: The domish.Element used for initial content + """ + pass + + @abc.abstractmethod + def jingle_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element + ) -> Union[ + Callable[..., None], + Callable[..., Awaitable[None]] + ]: + """ + Called on several actions to negotiate the application or transport. + + @param client: SatXMPPEntity instance + @param action: Jingle action + @param session: Jingle Session + @param content_name: Name of the content + @param transport_elt: Transport element + """ + pass + + +@dataclass(frozen=True) +class ApplicationData: + namespace: str + handler: BaseApplicationHandler + + +@dataclass(frozen=True) +class TransportData: + namespace: str + handler: BaseTransportHandler + priority: int + + +@dataclass(frozen=True) +class ContentData: + application: ApplicationData + app_args: list + app_kwargs: dict + transport_data: dict + content_name: str
--- a/sat/plugins/plugin_xep_0215.py Mon May 15 16:20:58 2023 +0200 +++ b/sat/plugins/plugin_xep_0215.py Mon May 15 16:23:11 2023 +0200 @@ -167,7 +167,7 @@ Response is cached after first query - @param entity: XMPP entity to query. Defaut to our own server + @param entity: XMPP entity to query. Default to our own server @return: found services """ if entity is None:
--- a/sat/plugins/plugin_xep_0260.py Mon May 15 16:20:58 2023 +0200 +++ b/sat/plugins/plugin_xep_0260.py Mon May 15 16:23:11 2023 +0200 @@ -161,9 +161,11 @@ cf XEP-0260 § 2.4 """ # now that the proxy is activated, we have to inform other peer + content_data = session["contents"][content_name] iq_elt, transport_elt = self._j.build_action( client, self._j.A_TRANSPORT_INFO, session, content_name ) + transport_elt["sid"] = content_data["transport_data"]["sid"] activated_elt = transport_elt.addElement("activated") activated_elt["cid"] = candidate.id iq_elt.send() @@ -175,9 +177,11 @@ """ # TODO: fallback to IBB # now that the proxy is activated, we have to inform other peer + content_data = session["contents"][content_name] iq_elt, transport_elt = self._j.build_action( client, self._j.A_TRANSPORT_INFO, session, content_name ) + transport_elt["sid"] = content_data["transport_data"]["sid"] transport_elt.addElement("proxy-error") iq_elt.send() log.warning( @@ -200,6 +204,7 @@ @param client(unicode): %(doc_client)s """ + content_data = session["contents"][content_name] transport_data["best_candidate"] = candidate # we need to disconnect all non selected candidates before removing them for c in transport_data["peer_candidates"]: @@ -210,6 +215,7 @@ iq_elt, transport_elt = self._j.build_action( client, self._j.A_TRANSPORT_INFO, session, content_name ) + transport_elt["sid"] = content_data["transport_data"]["sid"] if candidate is None: log.warning("Can't connect to any peer candidate") candidate_elt = transport_elt.addElement("candidate-error")
--- a/sat/plugins/plugin_xep_0353.py Mon May 15 16:20:58 2023 +0200 +++ b/sat/plugins/plugin_xep_0353.py Mon May 15 16:23:11 2023 +0200 @@ -105,22 +105,30 @@ mess_data = self.build_message_data(client, peer_jid, "propose", session['id']) for content in contents: - application, app_args, app_kwargs, content_name = self._j.get_content_data( + content_data = self._j.get_content_data( content) try: - jingle_description_elt = application.handler.jingle_description_elt + jingle_description_elt = ( + content_data.application.handler.jingle_description_elt + ) except AttributeError: - log.debug(f"no jingle_description_elt set for {application.handler}") + log.debug( + "no jingle_description_elt set for " + f"{content_data.application.handler}" + ) description_elt = domish.Element((content["app_ns"], "description")) else: description_elt = await utils.as_deferred( jingle_description_elt, - client, session, content_name, *app_args, **app_kwargs + client, session, content_data.content_name, *content_data.app_args, + **content_data.app_kwargs ) mess_data["xml"].propose.addChild(description_elt) response_d = defer.Deferred() # we wait for 2 min before cancelling the session init - response_d.addTimeout(2*60, reactor) + # response_d.addTimeout(2*60, reactor) + # FIXME: let's application decide timeout? + response_d.addTimeout(2, reactor) client._xep_0353_pending_sessions[session['id']] = response_d await client.send_message_data(mess_data) try: @@ -158,7 +166,7 @@ if peer_jid.userhostJID() not in client.roster: app_ns = elt.description.uri try: - application = self._j.getApplication(app_ns) + application = self._j.get_application(app_ns) human_name = getattr(application.handler, "human_name", application.name) except (exceptions.NotFound, AttributeError): if app_ns.startswith("urn:xmpp:jingle:apps:"): @@ -193,6 +201,7 @@ else: await client.presence.available(peer_jid) session_id = elt["id"] + # FIXME: accept is not used anymore in new specification, check it and remove it mess_data = self.build_message_data( client, client.jid.userhostJID(), "accept", session_id) await client.send_message_data(mess_data)