Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0338.py @ 4097:0f6fd28fde0d
component file sharing: fix method name case:
`render_OPTIONS` is a method inherited from Twisted, name has been put to lowercase
following massive refactoring, resulting in the method not being called anymore. This
patch fixes it.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Jun 2023 15:45:20 +0200 |
parents | 4b842c1fb686 |
children | 52a89ddf3087 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import List from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.core.core_types import SatXMPPEntity log = getLogger(__name__) NS_JINGLE_GROUPING = "urn:xmpp:jingle:apps:grouping:0" NS_RFC_5888 = "urn:ietf:rfc:5888" PLUGIN_INFO = { C.PI_NAME: "Jingle Grouping Framework", C.PI_IMPORT_NAME: "XEP-0338", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0338"], C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "XEP_0338", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Jingle mapping of RFC 5888 SDP Grouping Framework"""), } class XEP_0338: def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self._j = host.plugins["XEP-0166"] host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) host.trigger.add( "XEP-0167_generate_sdp_session", self._generate_sdp_session_trigger ) host.trigger.add("XEP-0167_jingle_session_init", self._jingle_session_init_trigger) host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) def get_handler(self, client): return XEP_0338_handler() def _parse_sdp_a_trigger( self, attribute: str, parts: List[str], call_data: dict, metadata: dict, media_type: str, application_data: dict, transport_data: dict, ) -> None: """Parse "group" attributes""" if attribute == "group": semantics = parts[0] content_names = parts[1:] metadata.setdefault("group", {})[semantics] = content_names def _generate_sdp_session_trigger( self, session: dict, local: bool, sdp_lines: List[str], ) -> None: """Generate "group" attributes""" key = "metadata" if local else "peer_metadata" group_data = session[key].get("group", {}) for semantics, content_names in group_data.items(): sdp_lines.append(f"a=group:{semantics} {' '.join(content_names)}") def parse_group_element( self, jingle_elt: domish.Element, session: dict ) -> None: """Parse the <group> and <content> elements""" for group_elt in jingle_elt.elements(NS_JINGLE_GROUPING, "group"): try: metadata = session["peer_metadata"] semantics = group_elt["semantics"] group_content = metadata.setdefault("group", {})[semantics] = [] for content_elt in group_elt.elements(NS_JINGLE_GROUPING, "content"): group_content.append(content_elt["name"]) except KeyError as e: log.warning(f"Error while parsing <group>: {e}\n{group_elt.toXml()}") def add_group_element( self, jingle_elt: domish.Element, session: dict ) -> None: """Build the <group> and <content> elements if possible""" for semantics, content_names in session["metadata"].get("group", {}).items(): group_elt = jingle_elt.addElement((NS_JINGLE_GROUPING, "group")) group_elt["semantics"] = semantics for content_name in content_names: content_elt = group_elt.addElement((NS_JINGLE_GROUPING, "content")) content_elt["name"] = content_name def _jingle_session_init_trigger( self, client: SatXMPPEntity, session: dict, content_name: str, media: str, media_data: dict, desc_elt: domish.Element, ) -> None: jingle_elt = session["jingle_elt"] self.add_group_element(jingle_elt, session) def _jingle_handler_trigger( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, desc_elt: domish.Element, ) -> None: # this is a session metadata, so we only generate it on the first content if content_name == next(iter(session["contents"])) and action in ( self._j.A_PREPARE_CONFIRMATION, self._j.A_SESSION_INITIATE, self._j.A_PREPARE_INITIATOR, ): jingle_elt = session["jingle_elt"] self.parse_group_element(jingle_elt, session) if action == self._j.A_SESSION_INITIATE: self.add_group_element(jingle_elt, session) @implementer(iwokkel.IDisco) class XEP_0338_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_RFC_5888)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []