view sat/plugins/plugin_xep_0163.py @ 4007:1d5a81e3c9e8

plugin XEP-0384: skip MessageReceived trigger when in a component: OMEMO is not used in components so far, but the trigger is trying to request OMEMO PEP nodes, which causes an error with virtual pubsub service of AP component.
author Goffi <goffi@goffi.org>
date Thu, 16 Mar 2023 12:31:24 +0100
parents 5d72dc52ee4a
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for Personal Eventing Protocol (xep-0163)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Callable
from sat.core.i18n import _
from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.log import getLogger

from twisted.words.xish import domish

from wokkel import disco, pubsub
from wokkel.formats import Mood
from sat.tools.common import data_format


log = getLogger(__name__)

NS_USER_MOOD = "http://jabber.org/protocol/mood"

PLUGIN_INFO = {
    C.PI_NAME: "Personal Eventing Protocol Plugin",
    C.PI_IMPORT_NAME: "XEP-0163",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0163", "XEP-0107"],
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "XEP_0163",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of Personal Eventing Protocol"""),
}


class XEP_0163(object):
    def __init__(self, host):
        log.info(_("PEP plugin initialization"))
        self.host = host
        self.pep_events = set()
        self.pep_out_cb = {}
        host.trigger.add("PubSub Disco Info", self.disoInfoTrigger)
        host.bridge.addMethod(
            "PEPSend",
            ".plugin",
            in_sign="sa{ss}s",
            out_sign="",
            method=self.PEPSend,
            async_=True,
        )  # args: type(MOOD, TUNE, etc), data, profile_key;
        self.addPEPEvent("MOOD", NS_USER_MOOD, self.userMoodCB, self.sendMood)

    def disoInfoTrigger(self, disco_info, profile):
        """Add info from managed PEP

        @param disco_info: list of disco feature as returned by PubSub,
            will be filled with PEP features
        @param profile: profile we are handling
        """
        disco_info.extend(list(map(disco.DiscoFeature, self.pep_events)))
        return True

    def addPEPEvent(
        self,
        event_type: Optional[str],
        node: str,
        in_callback: Callable,
        out_callback: Optional[Callable] = None,
        notify: bool = True
    ) -> None:
        """Add a Personal Eventing Protocol event manager

        @param event_type: type of the event (stored uppercase),
            only used when out_callback is set.
            Can be MOOD, TUNE, etc.
        @param node: namespace of the node (e.g. http://jabber.org/protocol/mood
            for User Mood)
        @param in_callback: method to call when this event occur
            the callable will be called with (itemsEvent, profile) as arguments
        @param out_callback: method to call when we want to publish this
            event (must return a deferred)
            the callable will be called when sendPEPEvent is called
        @param notify: add autosubscribe (+notify) if True
        """
        if event_type and out_callback:
            event_type = event_type.upper()
            if event_type in self.pep_out_cb:
                raise exceptions.ConflictError(
                    f"event_type {event_type!r} already exists"
                )
            self.pep_out_cb[event_type] = out_callback
        self.pep_events.add(node)
        if notify:
            self.pep_events.add(node + "+notify")

        def filterPEPEvent(client, itemsEvent):
            """Ignore messages which are not coming from PEP (i.e. a bare jid)

            @param itemsEvent(pubsub.ItemsEvent): pubsub event
            """
            if not itemsEvent.sender.user or itemsEvent.sender.resource:
                log.debug(
                    "ignoring non PEP event from {} (profile={})".format(
                        itemsEvent.sender.full(), client.profile
                    )
                )
                return
            in_callback(itemsEvent, client.profile)

        self.host.plugins["XEP-0060"].addManagedNode(node, items_cb=filterPEPEvent)

    def sendPEPEvent(self, node, data, profile):
        """Publish the event data

        @param node(unicode): node namespace
        @param data: domish.Element to use as payload
        @param profile: profile which send the data
        """
        client = self.host.getClient(profile)
        item = pubsub.Item(payload=data)
        return self.host.plugins["XEP-0060"].publish(client, None, node, [item])

    def PEPSend(self, event_type, data, profile_key=C.PROF_KEY_NONE):
        """Send personal event after checking the data is alright

        @param event_type: type of event (eg: MOOD, TUNE),
            must be in self.pep_out_cb.keys()
        @param data: dict of {string:string} of event_type dependant data
        @param profile_key: profile who send the event
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            log.error(
                _("Trying to send personal event with an unknown profile key [%s]")
                % profile_key
            )
            raise exceptions.ProfileUnknownError
        if not event_type in list(self.pep_out_cb.keys()):
            log.error(_("Trying to send personal event for an unknown type"))
            raise exceptions.DataError("Type unknown")
        return self.pep_out_cb[event_type](data, profile)

    def userMoodCB(self, itemsEvent, profile):
        if not itemsEvent.items:
            log.debug(_("No item found"))
            return
        try:
            mood_elt = [
                child for child in itemsEvent.items[0].elements() if child.name == "mood"
            ][0]
        except IndexError:
            log.error(_("Can't find mood element in mood event"))
            return
        mood = Mood.fromXml(mood_elt)
        if not mood:
            log.debug(_("No mood found"))
            return
        self.host.bridge.psEvent(
            C.PS_PEP,
            itemsEvent.sender.full(),
            itemsEvent.nodeIdentifier,
            "MOOD",
            data_format.serialise({"mood": mood.value or "", "text": mood.text or ""}),
            profile,
        )

    def sendMood(self, data, profile):
        """Send XEP-0107's User Mood

        @param data: must include mood and text
        @param profile: profile which send the mood"""
        try:
            value = data["mood"].lower()
            text = data["text"] if "text" in data else ""
        except KeyError:
            raise exceptions.DataError("Mood data must contain at least 'mood' key")
        mood = UserMood(value, text)
        return self.sendPEPEvent(NS_USER_MOOD, mood, profile)


class UserMood(Mood, domish.Element):
    """Improved wokkel Mood which is also a domish.Element"""

    def __init__(self, value, text=None):
        Mood.__init__(self, value, text)
        domish.Element.__init__(self, (NS_USER_MOOD, "mood"))
        self.addElement(value)
        if text:
            self.addElement("text", content=text)