Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0215.py @ 4095:684ba556a617
core (memory/sqla_mapping): fix legacy pickled values:
folloing packages refactoring, legacy pickled values could not be unpickled (due to use of
old classes). This temporary workaround fix it, but the right thing to do will be to move
from pickle to JSON at some point.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 12 Jun 2023 14:57:27 +0200 |
parents | 4b842c1fb686 |
children | 9658c534287e |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Dict, Final, List, Optional, Optional from twisted.internet import defer from twisted.words.protocols.jabber import error, jid from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import data_form, disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.tools import xml_tools from libervia.backend.tools import utils from libervia.backend.tools.common import data_format log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "External Service Discovery", C.PI_IMPORT_NAME: "XEP-0215", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "XEP_0215", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Discover services external to the XMPP network"""), } NS_EXTDISCO: Final = "urn:xmpp:extdisco:2" IQ_PUSH: Final = f'{C.IQ_SET}/services[@xmlns="{NS_EXTDISCO}"]' class XEP_0215: def __init__(self, host): log.info(_("External Service Discovery plugin initialization")) self.host = host host.bridge.add_method( "external_disco_get", ".plugin", in_sign="ss", out_sign="s", method=self._external_disco_get, async_=True, ) host.bridge.add_method( "external_disco_credentials_get", ".plugin", in_sign="ssis", out_sign="s", method=self._external_disco_credentials_get, async_=True, ) def get_handler(self, client): return XEP_0215_handler(self) async def profile_connecting(self, client: SatXMPPEntity) -> None: client._xep_0215_services = {} def parse_services( self, element: domish.Element, parent_elt_name: str = "services" ) -> List[dict]: """Retrieve services from element @param element: <[parent_elt_name]/> element or its parent @param parent_elt_name: name of the parent element can be "services" or "credentials" @return: list of parsed services """ if parent_elt_name not in ("services", "credentials"): raise exceptions.InternalError( f"invalid parent_elt_name: {parent_elt_name!r}" ) if element.name == parent_elt_name and element.uri == NS_EXTDISCO: services_elt = element else: try: services_elt = next(element.elements(NS_EXTDISCO, parent_elt_name)) except StopIteration: raise exceptions.DataError( f"XEP-0215 response is missing <{parent_elt_name}> element" ) services = [] for service_elt in services_elt.elements(NS_EXTDISCO, "service"): service = {} for key in [ "action", "expires", "host", "name", "password", "port", "restricted", "transport", "type", "username", ]: value = service_elt.getAttribute(key) if value is not None: if key == "expires": try: service[key] = utils.parse_xmpp_date(value) except ValueError: log.warning(f"invalid expiration date: {value!r}") continue elif key == "port": try: service[key] = int(value) except ValueError: log.warning(f"invalid port: {value!r}") continue elif key == "restricted": service[key] = C.bool(value) else: service[key] = value if not {"host", "type"}.issubset(service): log.warning( 'mandatory "host" or "type" are missing in service, ignoring it: ' "{service_elt.toXml()}" ) continue for x_elt in service_elt.elements(data_form.NS_X_DATA, "x"): form = data_form.Form.fromElement(x_elt) extended = service.setdefault("extended", []) extended.append(xml_tools.data_form_2_data_dict(form)) services.append(service) return services def _external_disco_get(self, entity: str, profile_key: str) -> defer.Deferred: client = self.host.get_client(profile_key) d = defer.ensureDeferred( self.get_external_services(client, jid.JID(entity) if entity else None) ) d.addCallback(data_format.serialise) return d async def get_external_services( self, client: SatXMPPEntity, entity: Optional[jid.JID] = None ) -> List[Dict]: """Get non XMPP service proposed by the entity Response is cached after first query @param entity: XMPP entity to query. Default to our own server @return: found services """ if entity is None: entity = client.server_jid if entity.resource: raise exceptions.DataError("A bare jid was expected for target entity") try: cached_services = client._xep_0215_services[entity] except KeyError: if not self.host.hasFeature(client, NS_EXTDISCO, entity): cached_services = client._xep_0215_services[entity] = None else: iq_elt = client.IQ("get") iq_elt["to"] = entity.full() iq_elt.addElement((NS_EXTDISCO, "services")) try: iq_result_elt = await iq_elt.send() except error.StanzaError as e: log.warning(f"Can't get external services: {e}") cached_services = client._xep_0215_services[entity] = None else: cached_services = self.parse_services(iq_result_elt) client._xep_0215_services[entity] = cached_services return cached_services or [] def _external_disco_credentials_get( self, entity: str, host: str, type_: str, port: int = 0, profile_key=C.PROF_KEY_NONE, ) -> defer.Deferred: client = self.host.get_client(profile_key) d = defer.ensureDeferred( self.request_credentials( client, host, type_, port or None, jid.JID(entity) if entity else None ) ) d.addCallback(data_format.serialise) return d async def request_credentials( self, client: SatXMPPEntity, host: str, type_: str, port: Optional[int] = None, entity: Optional[jid.JID] = None, ) -> List[dict]: """Request credentials for specified service(s) While usually a single service is expected, several may be returned if the same service is launched on several ports (cf. XEP-0215 §3.3) @param entity: XMPP entity to query. Defaut to our own server @param host: service host @param type_: service type @param port: service port (to be used when several services have same host and type but on different ports) @return: matching services with filled credentials """ if entity is None: entity = client.server_jid iq_elt = client.IQ("get") iq_elt["to"] = entity.full() iq_elt.addElement((NS_EXTDISCO, "credentials")) iq_result_elt = await iq_elt.send() return self.parse_services(iq_result_elt, parent_elt_name="credentials") def get_matching_service( self, services: List[dict], host: str, type_: str, port: Optional[int] ) -> Optional[dict]: """Retrieve service data from its characteristics""" try: return next( s for s in services if ( s["host"] == host and s["type"] == type_ and (port is None or s.get("port") == port) ) ) except StopIteration: return None def on_services_push(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None: iq_elt.handled = True entity = jid.JID(iq_elt["from"]).userhostJID() cached_services = client._xep_0215_services.get(entity) if cached_services is None: log.info(f"ignoring services push for uncached entity {entity}") return try: services = self.parse_services(iq_elt) except Exception: log.exception(f"Can't parse services push: {iq_elt.toXml()}") return for service in services: host = service["host"] type_ = service["type"] port = service.get("port") action = service.pop("action", None) if action is None: # action is not specified, we first check if the service exists found_service = self.get_matching_service( cached_services, host, type_, port ) if found_service is not None: # existing service, we replace by the new one found_service.clear() found_service.update(service) else: # new service cached_services.append(service) elif action == "add": cached_services.append(service) elif action in ("modify", "delete"): found_service = self.get_matching_service( cached_services, host, type_, port ) if found_service is None: log.warning( f"{entity} want to {action} an unknow service, we ask for the " "full list again" ) # we delete cache and request a fresh list to make a new one del client._xep_0215_services[entity] defer.ensureDeferred(self.get_external_services(client, entity)) elif action == "modify": found_service.clear() found_service.update(service) else: cached_services.remove(found_service) else: log.warning(f"unknown action for services push, ignoring: {action!r}") @implementer(iwokkel.IDisco) class XEP_0215_handler(XMPPHandler): def __init__(self, plugin_parent): self.plugin_parent = plugin_parent def connectionInitialized(self): self.xmlstream.addObserver( IQ_PUSH, self.plugin_parent.on_services_push, client=self.parent ) def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_EXTDISCO)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []