Mercurial > libervia-backend
changeset 4033:5a42c7842556
core (plugins): implementation of XEP-0215 "External Service Discovery":
rel 418
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 07 Apr 2023 15:16:39 +0200 |
parents | bb211f80c3e6 |
children | 9496f28dadff |
files | sat/core/core_types.py sat/plugins/plugin_xep_0215.py sat/tools/utils.py |
diffstat | 3 files changed, 334 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/core/core_types.py Fri Apr 07 14:59:01 2023 +0200 +++ b/sat/core/core_types.py Fri Apr 07 15:16:39 2023 +0200 @@ -17,9 +17,11 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from collections import namedtuple -from typing import Dict +from typing import Dict, Callable, Optional from typing_extensions import TypedDict + from twisted.words.protocols.jabber import jid as t_jid +from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish @@ -27,7 +29,8 @@ jid: t_jid.JID is_component: bool - + server_jid: t_jid.JID + IQ: Callable[[Optional[str], Optional[int]], xmlstream.IQ] EncryptionPlugin = namedtuple("EncryptionPlugin", ("instance", "name",
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0215.py Fri Apr 07 15:16:39 2023 +0200 @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Dict, Final, List, Optional, Optional + +from twisted.internet import defer +from twisted.words.protocols.jabber import error, jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import data_form, disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools import xml_tools +from sat.tools import utils +from sat.tools.common import data_format + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "External Service Discovery", + C.PI_IMPORT_NAME: "XEP-0215", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: [], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0215", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Discover services external to the XMPP network"""), +} + +NS_EXTDISCO: Final = "urn:xmpp:extdisco:2" +IQ_PUSH: Final = f'{C.IQ_SET}/services[@xmlns="{NS_EXTDISCO}"]' + + +class XEP_0215: + def __init__(self, host): + log.info(_("External Service Discovery plugin initialization")) + self.host = host + host.bridge.addMethod( + "external_disco_get", + ".plugin", + in_sign="ss", + out_sign="s", + method=self._external_disco_get, + async_=True, + ) + host.bridge.addMethod( + "external_disco_credentials_get", + ".plugin", + in_sign="ssis", + out_sign="s", + method=self._external_disco_credentials_get, + async_=True, + ) + + def getHandler(self, client): + return XEP_0215_handler(self) + + async def profileConnecting(self, client: SatXMPPEntity) -> None: + client._xep_0215_services = {} + + def parse_services( + self, element: domish.Element, parent_elt_name: str = "services" + ) -> List[dict]: + """Retrieve services from element + + @param element: <[parent_elt_name]/> element or its parent + @param parent_elt_name: name of the parent element + can be "services" or "credentials" + @return: list of parsed services + """ + if parent_elt_name not in ("services", "credentials"): + raise exceptions.InternalError( + f"invalid parent_elt_name: {parent_elt_name!r}" + ) + if element.name == parent_elt_name and element.uri == NS_EXTDISCO: + services_elt = element + else: + try: + services_elt = next(element.elements(NS_EXTDISCO, parent_elt_name)) + except StopIteration: + raise exceptions.DataError( + f"XEP-0215 response is missing <{parent_elt_name}> element" + ) + + services = [] + for service_elt in services_elt.elements(NS_EXTDISCO, "service"): + service = {} + for key in [ + "action", + "expires", + "host", + "name", + "password", + "port", + "restricted", + "transport", + "type", + "username", + ]: + value = service_elt.getAttribute(key) + if value is not None: + if key == "expires": + try: + service[key] = utils.parse_xmpp_date(value) + except ValueError: + log.warning(f"invalid expiration date: {value!r}") + continue + elif key == "port": + try: + service[key] = int(value) + except ValueError: + log.warning(f"invalid port: {value!r}") + continue + elif key == "restricted": + service[key] = C.bool(value) + else: + service[key] = value + if not {"host", "type"}.issubset(service): + log.warning( + 'mandatory "host" or "type" are missing in service, ignoring it: ' + "{service_elt.toXml()}" + ) + continue + for x_elt in service_elt.elements(data_form.NS_X_DATA, "x"): + form = data_form.Form.fromElement(x_elt) + extended = service.setdefault("extended", []) + extended.append(xml_tools.dataForm2dataDict(form)) + services.append(service) + + return services + + def _external_disco_get(self, entity: str, profile_key: str) -> defer.Deferred: + client = self.host.getClient(profile_key) + d = defer.ensureDeferred( + self.get_external_services(client, jid.JID(entity) if entity else None) + ) + d.addCallback(data_format.serialise) + return d + + async def get_external_services( + self, client: SatXMPPEntity, entity: Optional[jid.JID] = None + ) -> List[Dict]: + """Get non XMPP service proposed by the entity + + Response is cached after first query + + @param entity: XMPP entity to query. Defaut to our own server + @return: found services + """ + if entity is None: + entity = client.server_jid + + if entity.resource: + raise exceptions.DataError("A bare jid was expected for target entity") + + try: + cached_services = client._xep_0215_services[entity] + except KeyError: + if not self.host.hasFeature(client, NS_EXTDISCO, entity): + cached_services = client._xep_0215_services[entity] = None + else: + iq_elt = client.IQ("get") + iq_elt["to"] = entity.full() + iq_elt.addElement((NS_EXTDISCO, "services")) + try: + iq_result_elt = await iq_elt.send() + except error.StanzaError as e: + log.warning(f"Can't get external services: {e}") + cached_services = client._xep_0215_services[entity] = None + else: + cached_services = self.parse_services(iq_result_elt) + client._xep_0215_services[entity] = cached_services + + return cached_services or [] + + def _external_disco_credentials_get( + self, + entity: str, + host: str, + type_: str, + port: int = 0, + profile_key=C.PROF_KEY_NONE, + ) -> defer.Deferred: + client = self.host.getClient(profile_key) + d = defer.ensureDeferred( + self.request_credentials( + client, host, type_, port or None, jid.JID(entity) if entity else None + ) + ) + d.addCallback(data_format.serialise) + return d + + async def request_credentials( + self, + client: SatXMPPEntity, + host: str, + type_: str, + port: Optional[int] = None, + entity: Optional[jid.JID] = None, + ) -> List[dict]: + """Request credentials for specified service(s) + + While usually a single service is expected, several may be returned if the same + service is launched on several ports (cf. XEP-0215 §3.3) + @param entity: XMPP entity to query. Defaut to our own server + @param host: service host + @param type_: service type + @param port: service port (to be used when several services have same host and + type but on different ports) + @return: matching services with filled credentials + """ + if entity is None: + entity = client.server_jid + + iq_elt = client.IQ("get") + iq_elt["to"] = entity.full() + iq_elt.addElement((NS_EXTDISCO, "credentials")) + iq_result_elt = await iq_elt.send() + return self.parse_services(iq_result_elt, parent_elt_name="credentials") + + def get_matching_service( + self, services: List[dict], host: str, type_: str, port: Optional[int] + ) -> Optional[dict]: + """Retrieve service data from its characteristics""" + try: + return next( + s + for s in services + if ( + s["host"] == host + and s["type"] == type_ + and (port is None or s.get("port") == port) + ) + ) + except StopIteration: + return None + + def on_services_push(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None: + iq_elt.handled = True + entity = jid.JID(iq_elt["from"]).userhostJID() + cached_services = client._xep_0215_services.get(entity) + if cached_services is None: + log.info(f"ignoring services push for uncached entity {entity}") + return + try: + services = self.parse_services(iq_elt) + except Exception: + log.exception(f"Can't parse services push: {iq_elt.toXml()}") + return + for service in services: + host = service["host"] + type_ = service["type"] + port = service.get("port") + + action = service.pop("action", None) + if action is None: + # action is not specified, we first check if the service exists + found_service = self.get_matching_service( + cached_services, host, type_, port + ) + if found_service is not None: + # existing service, we replace by the new one + found_service.clear() + found_service.update(service) + else: + # new service + cached_services.append(service) + elif action == "add": + cached_services.append(service) + elif action in ("modify", "delete"): + found_service = self.get_matching_service( + cached_services, host, type_, port + ) + if found_service is None: + log.warning( + f"{entity} want to {action} an unknow service, we ask for the " + "full list again" + ) + # we delete cache and request a fresh list to make a new one + del client._xep_0215_services[entity] + defer.ensureDeferred(self.get_external_services(client, entity)) + elif action == "modify": + found_service.clear() + found_service.update(service) + else: + cached_services.remove(found_service) + else: + log.warning(f"unknown action for services push, ignoring: {action!r}") + + +@implementer(iwokkel.IDisco) +class XEP_0215_handler(XMPPHandler): + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + self.xmlstream.addObserver( + IQ_PUSH, self.plugin_parent.on_services_push, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_EXTDISCO)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []
--- a/sat/tools/utils.py Fri Apr 07 14:59:01 2023 +0200 +++ b/sat/tools/utils.py Fri Apr 07 15:16:39 2023 +0200 @@ -173,6 +173,7 @@ @param with_time: if True, ``xmpp_date_str`` must be a datetime, otherwise if must be a time profile. @return: datetime converted to unix time + @raise ValueError: the format is invalid """ if with_time: dt = xmpp_datetime.parse_datetime(xmpp_date_str)