view libervia/backend/plugins/plugin_xep_0215.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in a same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit to have methods called in parallels is very low, and it cause a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some method to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have has many Data Channel as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents 4b842c1fb686
children 9658c534287e
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Dict, Final, List, Optional, Optional

from twisted.internet import defer
from twisted.words.protocols.jabber import error, jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "External Service Discovery",
    C.PI_IMPORT_NAME: "XEP-0215",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0215",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Discover services external to the XMPP network"""),
}

NS_EXTDISCO: Final = "urn:xmpp:extdisco:2"
IQ_PUSH: Final = f'{C.IQ_SET}/services[@xmlns="{NS_EXTDISCO}"]'


class XEP_0215:
    def __init__(self, host):
        log.info(_("External Service Discovery plugin initialization"))
        self.host = host
        host.bridge.add_method(
            "external_disco_get",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._external_disco_get,
            async_=True,
        )
        host.bridge.add_method(
            "external_disco_credentials_get",
            ".plugin",
            in_sign="ssis",
            out_sign="s",
            method=self._external_disco_credentials_get,
            async_=True,
        )

    def get_handler(self, client):
        return XEP_0215_handler(self)

    async def profile_connecting(self, client: SatXMPPEntity) -> None:
        client._xep_0215_services = {}

    def parse_services(
        self, element: domish.Element, parent_elt_name: str = "services"
    ) -> List[dict]:
        """Retrieve services from element

        @param element: <[parent_elt_name]/> element or its parent
        @param parent_elt_name: name of the parent element
            can be "services" or "credentials"
        @return: list of parsed services
        """
        if parent_elt_name not in ("services", "credentials"):
            raise exceptions.InternalError(
                f"invalid parent_elt_name: {parent_elt_name!r}"
            )
        if element.name == parent_elt_name and element.uri == NS_EXTDISCO:
            services_elt = element
        else:
            try:
                services_elt = next(element.elements(NS_EXTDISCO, parent_elt_name))
            except StopIteration:
                raise exceptions.DataError(
                    f"XEP-0215 response is missing <{parent_elt_name}> element"
                )

        services = []
        for service_elt in services_elt.elements(NS_EXTDISCO, "service"):
            service = {}
            for key in [
                "action",
                "expires",
                "host",
                "name",
                "password",
                "port",
                "restricted",
                "transport",
                "type",
                "username",
            ]:
                value = service_elt.getAttribute(key)
                if value is not None:
                    if key == "expires":
                        try:
                            service[key] = utils.parse_xmpp_date(value)
                        except ValueError:
                            log.warning(f"invalid expiration date: {value!r}")
                            continue
                    elif key == "port":
                        try:
                            service[key] = int(value)
                        except ValueError:
                            log.warning(f"invalid port: {value!r}")
                            continue
                    elif key == "restricted":
                        service[key] = C.bool(value)
                    else:
                        service[key] = value
            if not {"host", "type"}.issubset(service):
                log.warning(
                    'mandatory "host" or "type" are missing in service, ignoring it: '
                    "{service_elt.toXml()}"
                )
                continue
            for x_elt in service_elt.elements(data_form.NS_X_DATA, "x"):
                form = data_form.Form.fromElement(x_elt)
                extended = service.setdefault("extended", [])
                extended.append(xml_tools.data_form_2_data_dict(form))
            services.append(service)

        return services

    def _external_disco_get(self, entity: str, profile_key: str) -> defer.Deferred:
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.get_external_services(client, jid.JID(entity) if entity else None)
        )
        d.addCallback(data_format.serialise)
        return d

    async def get_external_services(
        self, client: SatXMPPEntity, entity: Optional[jid.JID] = None
    ) -> List[Dict]:
        """Get non XMPP service proposed by the entity

        Response is cached after first query

        @param entity: XMPP entity to query. Default to our own server
        @return: found services
        """
        if entity is None:
            entity = client.server_jid

        if entity.resource:
            raise exceptions.DataError("A bare jid was expected for target entity")

        try:
            cached_services = client._xep_0215_services[entity]
        except KeyError:
            if not self.host.hasFeature(client, NS_EXTDISCO, entity):
                cached_services = client._xep_0215_services[entity] = None
            else:
                iq_elt = client.IQ("get")
                iq_elt["to"] = entity.full()
                iq_elt.addElement((NS_EXTDISCO, "services"))
                try:
                    iq_result_elt = await iq_elt.send()
                except error.StanzaError as e:
                    log.warning(f"Can't get external services: {e}")
                    cached_services = client._xep_0215_services[entity] = None
                else:
                    cached_services = self.parse_services(iq_result_elt)
                    client._xep_0215_services[entity] = cached_services

        return cached_services or []

    def _external_disco_credentials_get(
        self,
        entity: str,
        host: str,
        type_: str,
        port: int = 0,
        profile_key=C.PROF_KEY_NONE,
    ) -> defer.Deferred:
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.request_credentials(
                client, host, type_, port or None, jid.JID(entity) if entity else None
            )
        )
        d.addCallback(data_format.serialise)
        return d

    async def request_credentials(
        self,
        client: SatXMPPEntity,
        host: str,
        type_: str,
        port: Optional[int] = None,
        entity: Optional[jid.JID] = None,
    ) -> List[dict]:
        """Request credentials for specified service(s)

        While usually a single service is expected, several may be returned if the same
        service is launched on several ports (cf. XEP-0215 §3.3)
        @param entity: XMPP entity to query. Defaut to our own server
        @param host: service host
        @param type_: service type
        @param port: service port (to be used when several services have same host and
            type but on different ports)
        @return: matching services with filled credentials
        """
        if entity is None:
            entity = client.server_jid

        iq_elt = client.IQ("get")
        iq_elt["to"] = entity.full()
        iq_elt.addElement((NS_EXTDISCO, "credentials"))
        iq_result_elt = await iq_elt.send()
        return self.parse_services(iq_result_elt, parent_elt_name="credentials")

    def get_matching_service(
        self, services: List[dict], host: str, type_: str, port: Optional[int]
    ) -> Optional[dict]:
        """Retrieve service data from its characteristics"""
        try:
            return next(
                s
                for s in services
                if (
                    s["host"] == host
                    and s["type"] == type_
                    and (port is None or s.get("port") == port)
                )
            )
        except StopIteration:
            return None

    def on_services_push(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        iq_elt.handled = True
        entity = jid.JID(iq_elt["from"]).userhostJID()
        cached_services = client._xep_0215_services.get(entity)
        if cached_services is None:
            log.info(f"ignoring services push for uncached entity {entity}")
            return
        try:
            services = self.parse_services(iq_elt)
        except Exception:
            log.exception(f"Can't parse services push: {iq_elt.toXml()}")
            return
        for service in services:
            host = service["host"]
            type_ = service["type"]
            port = service.get("port")

            action = service.pop("action", None)
            if action is None:
                # action is not specified, we first check if the service exists
                found_service = self.get_matching_service(
                    cached_services, host, type_, port
                )
                if found_service is not None:
                    # existing service, we replace by the new one
                    found_service.clear()
                    found_service.update(service)
                else:
                    # new service
                    cached_services.append(service)
            elif action == "add":
                cached_services.append(service)
            elif action in ("modify", "delete"):
                found_service = self.get_matching_service(
                    cached_services, host, type_, port
                )
                if found_service is None:
                    log.warning(
                        f"{entity} want to {action} an unknow service, we ask for the "
                        "full list again"
                    )
                    # we delete cache and request a fresh list to make a new one
                    del client._xep_0215_services[entity]
                    defer.ensureDeferred(self.get_external_services(client, entity))
                elif action == "modify":
                    found_service.clear()
                    found_service.update(service)
                else:
                    cached_services.remove(found_service)
            else:
                log.warning(f"unknown action for services push, ignoring: {action!r}")


@implementer(iwokkel.IDisco)
class XEP_0215_handler(XMPPHandler):
    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        self.xmlstream.addObserver(
            IQ_PUSH, self.plugin_parent.on_services_push, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_EXTDISCO)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []