view sat/plugins/plugin_xep_0163.py @ 3875:a0666f17f300

plugin merge-requests: fix `await` use on blocking method
author Goffi <goffi@goffi.org>
date Tue, 23 Aug 2022 13:00:07 +0200
parents 5d72dc52ee4a
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for Personal Eventing Protocol (xep-0163)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Callable
from sat.core.i18n import _
from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.log import getLogger

from twisted.words.xish import domish

from wokkel import disco, pubsub
from wokkel.formats import Mood
from sat.tools.common import data_format


log = getLogger(__name__)

NS_USER_MOOD = "http://jabber.org/protocol/mood"

PLUGIN_INFO = {
    C.PI_NAME: "Personal Eventing Protocol Plugin",
    C.PI_IMPORT_NAME: "XEP-0163",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0163", "XEP-0107"],
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "XEP_0163",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of Personal Eventing Protocol"""),
}


class XEP_0163(object):
    def __init__(self, host):
        log.info(_("PEP plugin initialization"))
        self.host = host
        self.pep_events = set()
        self.pep_out_cb = {}
        host.trigger.add("PubSub Disco Info", self.disoInfoTrigger)
        host.bridge.addMethod(
            "PEPSend",
            ".plugin",
            in_sign="sa{ss}s",
            out_sign="",
            method=self.PEPSend,
            async_=True,
        )  # args: type(MOOD, TUNE, etc), data, profile_key;
        self.addPEPEvent("MOOD", NS_USER_MOOD, self.userMoodCB, self.sendMood)

    def disoInfoTrigger(self, disco_info, profile):
        """Add info from managed PEP

        @param disco_info: list of disco feature as returned by PubSub,
            will be filled with PEP features
        @param profile: profile we are handling
        """
        disco_info.extend(list(map(disco.DiscoFeature, self.pep_events)))
        return True

    def addPEPEvent(
        self,
        event_type: Optional[str],
        node: str,
        in_callback: Callable,
        out_callback: Optional[Callable] = None,
        notify: bool = True
    ) -> None:
        """Add a Personal Eventing Protocol event manager

        @param event_type: type of the event (stored uppercase),
            only used when out_callback is set.
            Can be MOOD, TUNE, etc.
        @param node: namespace of the node (e.g. http://jabber.org/protocol/mood
            for User Mood)
        @param in_callback: method to call when this event occur
            the callable will be called with (itemsEvent, profile) as arguments
        @param out_callback: method to call when we want to publish this
            event (must return a deferred)
            the callable will be called when sendPEPEvent is called
        @param notify: add autosubscribe (+notify) if True
        """
        if event_type and out_callback:
            event_type = event_type.upper()
            if event_type in self.pep_out_cb:
                raise exceptions.ConflictError(
                    f"event_type {event_type!r} already exists"
                )
            self.pep_out_cb[event_type] = out_callback
        self.pep_events.add(node)
        if notify:
            self.pep_events.add(node + "+notify")

        def filterPEPEvent(client, itemsEvent):
            """Ignore messages which are not coming from PEP (i.e. a bare jid)

            @param itemsEvent(pubsub.ItemsEvent): pubsub event
            """
            if not itemsEvent.sender.user or itemsEvent.sender.resource:
                log.debug(
                    "ignoring non PEP event from {} (profile={})".format(
                        itemsEvent.sender.full(), client.profile
                    )
                )
                return
            in_callback(itemsEvent, client.profile)

        self.host.plugins["XEP-0060"].addManagedNode(node, items_cb=filterPEPEvent)

    def sendPEPEvent(self, node, data, profile):
        """Publish the event data

        @param node(unicode): node namespace
        @param data: domish.Element to use as payload
        @param profile: profile which send the data
        """
        client = self.host.getClient(profile)
        item = pubsub.Item(payload=data)
        return self.host.plugins["XEP-0060"].publish(client, None, node, [item])

    def PEPSend(self, event_type, data, profile_key=C.PROF_KEY_NONE):
        """Send personal event after checking the data is alright

        @param event_type: type of event (eg: MOOD, TUNE),
            must be in self.pep_out_cb.keys()
        @param data: dict of {string:string} of event_type dependant data
        @param profile_key: profile who send the event
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            log.error(
                _("Trying to send personal event with an unknown profile key [%s]")
                % profile_key
            )
            raise exceptions.ProfileUnknownError
        if not event_type in list(self.pep_out_cb.keys()):
            log.error(_("Trying to send personal event for an unknown type"))
            raise exceptions.DataError("Type unknown")
        return self.pep_out_cb[event_type](data, profile)

    def userMoodCB(self, itemsEvent, profile):
        if not itemsEvent.items:
            log.debug(_("No item found"))
            return
        try:
            mood_elt = [
                child for child in itemsEvent.items[0].elements() if child.name == "mood"
            ][0]
        except IndexError:
            log.error(_("Can't find mood element in mood event"))
            return
        mood = Mood.fromXml(mood_elt)
        if not mood:
            log.debug(_("No mood found"))
            return
        self.host.bridge.psEvent(
            C.PS_PEP,
            itemsEvent.sender.full(),
            itemsEvent.nodeIdentifier,
            "MOOD",
            data_format.serialise({"mood": mood.value or "", "text": mood.text or ""}),
            profile,
        )

    def sendMood(self, data, profile):
        """Send XEP-0107's User Mood

        @param data: must include mood and text
        @param profile: profile which send the mood"""
        try:
            value = data["mood"].lower()
            text = data["text"] if "text" in data else ""
        except KeyError:
            raise exceptions.DataError("Mood data must contain at least 'mood' key")
        mood = UserMood(value, text)
        return self.sendPEPEvent(NS_USER_MOOD, mood, profile)


class UserMood(Mood, domish.Element):
    """Improved wokkel Mood which is also a domish.Element"""

    def __init__(self, value, text=None):
        Mood.__init__(self, value, text)
        domish.Element.__init__(self, (NS_USER_MOOD, "mood"))
        self.addElement(value)
        if text:
            self.addElement("text", content=text)