Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0298.py @ 4318:27bb22eace65
tests (unit/email gateway): add test for XEP-0131 handling:
rel 451
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Sep 2024 15:59:48 +0200 |
parents | a0ed5c976bf8 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin to handle events # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from urllib.parse import quote, unquote from twisted.words.protocols.jabber import jid from twisted.words.xish import domish from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger from wokkel import disco, iwokkel from zope.interface import implementer from twisted.words.protocols.jabber.xmlstream import XMPPHandler from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "Events", C.PI_IMPORT_NAME: "XEP-0298", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: ["XEP-0167"], C.PI_MAIN: "XEP_0298", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _( "Delivering Conference Information to Jingle Participants (Coin). This plugin " "is used to associate metadata about uses an call states in multi-party calls." ), } NS_COIN = "urn:xmpp:coin:1" NS_CONFERENCE_INFO = "urn:ietf:params:xml:ns:conference-info" class XEP_0298: namespace = NS_COIN def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host self._j: XEP_0166 = host.plugins["XEP-0166"] host.trigger.add( "XEP-0167_jingle_session_init", self._jingle_session_init_trigger ) host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) def get_handler(self, client): return CoinHandler(self) def _jingle_session_init_trigger( self, client: SatXMPPEntity, session: dict, content_name: str, media: str, media_data: dict, desc_elt: domish.Element, ) -> None: """Check for the presence of "user" metadata, and add relevant elements.""" if client.profile == "conferences": if content_name != next(iter(session["contents"].keys())): # We only apply metadata for the first content, as it is global. return try: user = session["metadata"]["user"] except KeyError: return jingle_elt = session["jingle_elt"] conference_info_elt = self.add_conference_info(jingle_elt, True) self.add_user(conference_info_elt, user) async def _jingle_handler_trigger( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, desc_elt: domish.Element, ) -> None: # this is a session metadata, so we only generate it on the first content if action == self._j.A_PREPARE_CONFIRMATION and content_name == next( iter(session["contents"]) ): jingle_elt = session["jingle_elt"] infos = self.parse(jingle_elt) try: user = infos["info"]["users"][0] except (KeyError, IndexError): pass else: session.setdefault("metadata", {})["peer_user"] = user def add_conference_info( self, jingle_elt: domish.Element, is_focus: bool = False, **kwargs ) -> domish.Element: """Create and return a <conference_info> element @param jingle_elt: parent element @param kwargs: attributes of the element. @return: created <conference-info> element. """ conference_info_elt = jingle_elt.addElement( (NS_CONFERENCE_INFO, "conference-info"), ) if is_focus: conference_info_elt["isfocus"] = C.BOOL_TRUE conference_info_elt.attributes.update(kwargs) return conference_info_elt def add_user( self, conference_info_elt: domish.Element, entity: jid.JID ) -> domish.Element: """Add an user to a cconference info element. If the parent <users> doesn't exist, it will be created. @param conference_info_elt: <conference-info> element where the <user> element need to be added @param entity: XMPP JID to use as entity. @return: created <user> element. """ try: users_elt = next(conference_info_elt.elements(NS_CONFERENCE_INFO, "users")) except StopIteration: users_elt = conference_info_elt.addElement("users") user_elt = users_elt.addElement("user") user_elt["entity"] = f"xmpp:{quote(entity.userhost())}" return user_elt def parse(self, jingle_elt: domish.Element) -> dict: """Parse a Jingle element and return a dictionary with conference info if present. @param jingle_elt: Jingle element to parse. @return: Dictionary with "info" key if conference info is found. """ try: conference_info_elt = next( jingle_elt.elements(NS_CONFERENCE_INFO, "conference-info") ) except StopIteration: return {} users = [] try: users_elt = next(conference_info_elt.elements(NS_CONFERENCE_INFO, "users")) for user_elt in users_elt.elements(NS_CONFERENCE_INFO, "user"): entity = user_elt.getAttribute("entity") if entity.startswith("xmpp:"): try: entity = jid.JID(unquote(entity[5:])) users.append(entity) except Exception as e: log.warning(f"Failed to parse entity {entity!r}: {e}") else: log.warning(f"Ignoring non-XMPP entity {entity!r}") except StopIteration: pass return {"info": {"users": users}} @implementer(iwokkel.IDisco) class CoinHandler(XMPPHandler): def __init__(self, plugin_parent): self.plugin_parent = plugin_parent def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [ disco.DiscoFeature(NS_COIN), ] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []