Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0280.py @ 4307:6a0155f410bd
test (unit): add test for plugin XEP-0033:
those replace the legacy XEP-0033 test from libervia/backend/test/test_plugin_xep_0033.py.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # SAT plugin for managing xep-0280 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from libervia.backend.core.i18n import _, D_ from libervia.backend.core.log import getLogger log = getLogger(__name__) from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from twisted.words.protocols.jabber.error import StanzaError from twisted.internet import defer from wokkel import disco, iwokkel from zope.interface import implementer try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler PARAM_CATEGORY = "Misc" PARAM_NAME = "carbon" PARAM_LABEL = D_("Message carbons") NS_CARBONS = "urn:xmpp:carbons:2" PLUGIN_INFO = { C.PI_NAME: "XEP-0280 Plugin", C.PI_IMPORT_NAME: "XEP-0280", C.PI_TYPE: "XEP", C.PI_PROTOCOLS: ["XEP-0280"], C.PI_DEPENDENCIES: [], C.PI_MAIN: "XEP_0280", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: D_("""Implementation of Message Carbons"""), } class XEP_0280(object): # TODO: param is only checked at profile connection # activate carbons on param change even after profile connection # TODO: chat state notifications are not handled yet (and potentially other XEPs?) params = """ <params> <individual> <category name="{category_name}" label="{category_label}"> <param name="{param_name}" label="{param_label}" value="true" type="bool" security="0" /> </category> </individual> </params> """.format( category_name=PARAM_CATEGORY, category_label=D_(PARAM_CATEGORY), param_name=PARAM_NAME, param_label=PARAM_LABEL, ) def __init__(self, host): log.info(_("Plugin XEP_0280 initialization")) self.host = host host.memory.update_params(self.params) host.trigger.add( "message_received", self.message_received_trigger, priority=200000 ) def get_handler(self, client): return XEP_0280_handler() def set_private(self, message_elt): """Add a <private/> element to a message this method is intented to be called on final domish.Element by other plugins (in particular end 2 end encryption plugins) @param message_elt(domish.Element): <message> stanza """ if message_elt.name != "message": log.error("addPrivateElt must be used with <message> stanzas") return message_elt.addElement((NS_CARBONS, "private")) @defer.inlineCallbacks def profile_connected(self, client): """activate message carbons on connection if possible and activated in config""" activate = self.host.memory.param_get_a( PARAM_NAME, PARAM_CATEGORY, profile_key=client.profile ) if not activate: log.info(_("Not activating message carbons as requested in params")) return try: yield self.host.check_features(client, (NS_CARBONS,)) except exceptions.FeatureNotFound: log.warning(_("server doesn't handle message carbons")) else: log.info(_("message carbons available, enabling it")) iq_elt = client.IQ() iq_elt.addElement((NS_CARBONS, "enable")) try: yield iq_elt.send() except StanzaError as e: log.warning("Can't activate message carbons: {}".format(e)) else: log.info(_("message carbons activated")) def message_received_trigger(self, client, message_elt, post_treat): """get message and handle it if carbons namespace is present""" for e in message_elt.elements(): if e.uri == NS_CARBONS and e.name in ("received", "sent"): carbons_elt = e break else: # this is not a message carbons, # we continue normal behaviour return True if message_elt["from"] != client.jid.userhost(): log.warning( "The message carbon received is not from our server, hack attempt?\n{xml}".format( xml=message_elt.toXml() ) ) return forwarded_elt = next(carbons_elt.elements(C.NS_FORWARD, "forwarded")) cc_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message")) # we replace the wrapping message with the CCed one # and continue the normal behaviour message_elt["type"] = cc_message_elt.getAttribute("type", C.MESS_TYPE_NORMAL) mess_id = cc_message_elt.getAttribute("id") if mess_id: message_elt[mess_id] = mess_id if carbons_elt.name == "received": message_elt["from"] = cc_message_elt["from"] lang = cc_message_elt.getAttribute("xml:lang") if lang: message_elt["xml:lang"] = lang elif carbons_elt.name == "sent": # the cc_message_elt is the full JID, so we copy it message_elt["from"] = cc_message_elt["from"] try: message_elt["to"] = cc_message_elt["to"] except KeyError: # we may not have "to" in case of message from ourself (from an other # device) pass else: log.warning( "invalid message carbons received:\n{xml}".format(xml=message_elt.toXml()) ) return False del message_elt.children[:] for c in cc_message_elt.children: message_elt.addChild(c) return True @implementer(iwokkel.IDisco) class XEP_0280_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_CARBONS)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []