Mercurial > libervia-backend
changeset 742:03744d9ebc13
plugin XEP-0033: implementation of the addressing feature:
- frontends pass the recipients in the extra parameter of sendMessage
- backend checks if the target server supports the feature (this is not done yet by prosody plugin)
- features and identities are cached per profile and server
- messages are duplicated in history for now (TODO: redesign the database)
- echos signals are also duplicated to the sender (FIXME)
author | souliane <souliane@mailoo.org> |
---|---|
date | Wed, 11 Dec 2013 17:16:53 +0100 (2013-12-11) |
parents | 00318e60a06a |
children | 5a131930348d |
files | src/core/sat_main.py src/core/xmpp.py src/memory/memory.py src/plugins/plugin_misc_groupblog.py src/plugins/plugin_xep_0033.py src/plugins/plugin_xep_0045.py src/plugins/plugin_xep_0065.py src/tools/misc.py |
diffstat | 8 files changed, 373 insertions(+), 60 deletions(-) [+] |
line wrap: on
line diff
--- a/src/core/sat_main.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/core/sat_main.py Wed Dec 11 17:16:53 2013 +0100 @@ -62,6 +62,20 @@ return "sat_id_" + str(sat_id) +class MessageSentAndStored(Exception): + """ Exception to raise if the message has been already sent and stored in the + history by the trigger, so the rest of the process should be stopped. This + should normally be raised by the trigger with the minimal priority """ + pass + + +class AbortSendMessage(Exception): + """ Exception to raise if sending the message should be aborted. This can be + raised by any trigger but a side action should be planned by the trigger + to inform the user about what happened """ + pass + + class SAT(service.Service): def get_next_id(self): @@ -354,6 +368,14 @@ raise exceptions.ProfileKeyUnknownError return [self.profiles[profile]] + def getClientHostJid(self, profile_key): + """Convenient method to get the client host from profile key + @return: host jid or None if it doesn't exist""" + profile = self.memory.getProfileName(profile_key) + if not profile: + return None + return self.profiles[profile].getHostJid() + def registerNewAccount(self, login, password, email, server, port=5222, id=None, profile_key='@DEFAULT@'): """Connect to a server and create a new account using in-band registration""" profile = self.memory.getProfileName(profile_key) @@ -541,27 +563,52 @@ if mess_data["message"]: mess_data['xml'].addElement("body", None, mess_data["message"]) - def sendAndStore(mess_data): + def sendErrback(e): + text = '%s: %s' % (e.value.__class__.__name__, e.getErrorMessage()) + if e.check(MessageSentAndStored): + debug(text) + elif e.check(AbortSendMessage): + warning(text) + else: + error("Unmanaged exception: %s" % text) + return e + + treatments.addCallbacks(self.sendAndStoreMessage, sendErrback, [False, profile]) + treatments.callback(mess_data) + + def sendAndStoreMessage(self, mess_data, skip_send=False, profile=None): + """Actually send and store the message to history, after all the treatments + have been done. This has been moved outside the main sendMessage method + because it is used by XEP-0033 to complete a server-side feature not yet + implemented by the prosody plugin. + @param mess_data: message data dictionary + @param skip_send: set to True to skip sending the message to only store it + @param profile: profile + """ + try: + client = self.profiles[profile] + except KeyError: + error(_("Trying to send a message with no profile")) + return + current_jid = client.jid + if not skip_send: client.xmlstream.send(mess_data['xml']) - if mess_data["type"] != "groupchat": - # we don't add groupchat message to history, as we get them back - # and they will be added then - if mess_data['message']: # we need a message to save something - self.memory.addToHistory(current_jid, mess_data['to'], - unicode(mess_data["message"]), - unicode(mess_data["type"]), - mess_data['extra'], - profile=profile) + if mess_data["type"] != "groupchat": + # we don't add groupchat message to history, as we get them back + # and they will be added then + if mess_data['message']: # we need a message to save something + self.memory.addToHistory(current_jid, mess_data['to'], + unicode(mess_data["message"]), + unicode(mess_data["type"]), + mess_data['extra'], + profile=profile) # We send back the message, so all clients are aware of it - if mess_data["message"]: - self.bridge.newMessage(mess_data['xml']['from'], - unicode(mess_data["message"]), - mess_type=mess_data["type"], - to_jid=mess_data['xml']['to'], extra=mess_data['extra'], - profile=profile) - - treatments.addCallback(sendAndStore) - treatments.callback(mess_data) + self.bridge.newMessage(mess_data['xml']['from'], + unicode(mess_data["message"]), + mess_type=mess_data["type"], + to_jid=mess_data['xml']['to'], + extra=mess_data['extra'], + profile=profile) def setPresence(self, to="", show="", priority=0, statuses=None, profile_key='@NONE@'): """Send our presence information""" @@ -623,30 +670,100 @@ self.profiles[profile].roster.removeItem(to_jid) self.profiles[profile].presence.unsubscribe(to_jid) + def requestServerDisco(self, feature, jid_=None, cache_only=False, profile_key="@NONE"): + """Discover if a server or its items offer a given feature + @param feature: the feature to check + @param jid_: the jid of the server + @param cache_only: expect the result to be in cache and don't actually + make any request to avoid returning a Deferred. This can be used anytime + for requesting the local server because the data are cached for sure. + @result: the Deferred entity jid offering the feature, or None + """ + profile = self.memory.getProfileName(profile_key) + + if not profile: + return defer.succeed(None) + if jid_ is None: + jid_ = self.getClientHostJid(profile) + cache_only = True + hasServerFeature = lambda entity: entity if self.memory.hasServerFeature(feature, entity, profile) else None + + def haveItemsFeature(dummy=None): + if jid_ in self.memory.server_identities[profile]: + for entity in self.memory.server_identities[profile][jid_].values(): + if hasServerFeature(entity): + return entity + return None + + entity = hasServerFeature(jid_) or haveItemsFeature() + if entity: + return defer.succeed(entity) + elif entity is False or cache_only: + return defer.succeed(None) + + # data for this server are not in cache + disco = self.profiles[profile].disco + + def errback(failure, method, jid_, profile): + # the target server is not reachable + logging.error("disco.%s on %s failed! [%s]" % (method.func_name, jid_, profile)) + logging.error("reason: %s" % failure.getErrorMessage()) + if method == disco.requestInfo: + features = self.memory.server_features.setdefault(profile, {}) + features.setdefault(jid_, []) + elif method == disco.requestItems: + identities = self.memory.server_identities.setdefault(profile, {}) + identities.setdefault(jid_, {}) + return failure + + def callback(d): + if hasServerFeature(jid_): + return jid_ + else: + d2 = disco.requestItems(jid_).addCallback(self.serverDiscoItems, disco, jid_, profile) + d2.addErrback(errback, disco.requestItems, jid_, profile) + return d2.addCallback(haveItemsFeature) + + d = disco.requestInfo(jid_).addCallback(self.serverDisco, jid_, profile) + d.addCallbacks(callback, errback, [], [disco.requestInfo, jid_, profile]) + return d + ## callbacks ## - def serverDisco(self, disco, profile): - """xep-0030 Discovery Protocol.""" + def serverDisco(self, disco, jid_=None, profile=None): + """xep-0030 Discovery Protocol. + @param disco: result of the disco info query + @param jid_: the jid of the target server + @param profile: profile of the user + """ + if jid_ is None: + jid_ = self.getClientHostJid(profile) + debug(_("Requested disco info on %s") % jid_) for feature in disco.features: - debug(_("Feature found: %s"), feature) - self.memory.addServerFeature(feature, profile) - for cat, type in disco.identities: - debug(_("Identity found: [%(category)s/%(type)s] %(identity)s") % {'category': cat, 'type': type, 'identity': disco.identities[(cat, type)]}) + debug(_("Feature found: %s") % feature) + self.memory.addServerFeature(feature, jid_, profile) + for cat, type_ in disco.identities: + debug(_("Identity found: [%(category)s/%(type)s] %(identity)s") + % {'category': cat, 'type': type_, 'identity': disco.identities[(cat, type_)]}) - def serverDiscoItems(self, disco_result, disco_client, profile, initialized): + def serverDiscoItems(self, disco_result, disco_client, jid_, profile, initialized=None): """xep-0030 Discovery Protocol. @param disco_result: result of the disco item querry @param disco_client: SatDiscoProtocol instance + @param jid_: the jid of the target server @param profile: profile of the user @param initialized: deferred which must be chained when everything is done""" - def _check_entity_cb(result, entity, profile): - for category, type in result.identities: - debug(_('Identity added: (%(category)s,%(type)s) ==> %(entity)s [%(profile)s]') % { - 'category': category, 'type': type, 'entity': entity, 'profile': profile}) - self.memory.addServerIdentity(category, type, entity, profile) + def _check_entity_cb(result, entity, jid_, profile): + debug(_("Requested disco info on %s") % entity) + for category, type_ in result.identities: + debug(_('Identity added: (%(category)s,%(type)s) ==> %(entity)s [%(profile)s]') + % {'category': category, 'type': type_, 'entity': entity, 'profile': profile}) + self.memory.addServerIdentity(category, type_, entity, jid_, profile) + for feature in result.features: + self.memory.addServerFeature(feature, entity, profile) - def _errback(result, entity, profile): + def _errback(result, entity, jid_, profile): warning(_("Can't get information on identity [%(entity)s] for profile [%(profile)s]") % {'entity': entity, 'profile': profile}) defer_list = [] @@ -654,9 +771,11 @@ if item.entity.full().count('.') == 1: # XXX: workaround for a bug on jabberfr, tmp warning(_('Using jabberfr workaround, be sure your domain has at least two levels (e.g. "example.tld", not "example" alone)')) continue - args = [item.entity, profile] + args = [item.entity, jid_, profile] defer_list.append(disco_client.requestInfo(item.entity).addCallbacks(_check_entity_cb, _errback, args, None, args)) - defer.DeferredList(defer_list).chainDeferred(initialized) + if initialized: + defer.DeferredList(defer_list).chainDeferred(initialized) + ## Generic HMI ## def actionResult(self, action_id, action_type, data, profile):
--- a/src/core/xmpp.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/core/xmpp.py Wed Dec 11 17:16:53 2013 +0100 @@ -75,9 +75,10 @@ self.presence.available() - self.disco.requestInfo(jid.JID(self.jid.host)).addCallback(self.host_app.serverDisco, self.profile) # FIXME: use these informations + jid_ = self.getHostJid() + self.disco.requestInfo(jid_).addCallback(self.host_app.serverDisco, jid_, self.profile) # FIXME: use these informations - self.disco.requestItems(jid.JID(self.jid.host)).addCallback(self.host_app.serverDiscoItems, self.disco, self.profile, self.client_initialized) + self.disco.requestItems(jid_).addCallback(self.host_app.serverDiscoItems, self.disco, jid_, self.profile, self.client_initialized) self.conn_deferred.callback(None) def initializationFailed(self, reason): @@ -103,6 +104,10 @@ self.host_app.bridge.disconnected(self.profile) # we send the signal to the clients self.host_app.purgeClient(self.profile) # and we remove references to this client + def getHostJid(self): + """@return: the jid of the local server""" + return jid.JID(self.jid.host) + class SatMessageProtocol(xmppim.MessageProtocol):
--- a/src/memory/memory.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/memory/memory.py Wed Dec 11 17:16:53 2013 +0100 @@ -756,49 +756,63 @@ assert profile != "@NONE@" return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between, profile) - def addServerFeature(self, feature, profile): + def addServerFeature(self, feature, jid_, profile): """Add a feature discovered from server @param feature: string of the feature - @param profile: which profile is using this server ?""" + @param jid_: the jid of the target server + @param profile: which profile is asking this server ?""" if profile not in self.server_features: - self.server_features[profile] = [] - self.server_features[profile].append(feature) + self.server_features[profile] = {} + features = self.server_features[profile].setdefault(jid_, []) + features.append(feature) - def addServerIdentity(self, category, type_, entity, profile): + def addServerIdentity(self, category, type_, entity, jid_, profile): """Add an identity discovered from server @param feature: string of the feature - @param profile: which profile is using this server ?""" + @param jid_: the jid of the target server + @param profile: which profile is asking this server ?""" if not profile in self.server_identities: self.server_identities[profile] = {} - if (category, type_) not in self.server_identities[profile]: - self.server_identities[profile][(category, type_)] = set() - self.server_identities[profile][(category, type_)].add(entity) + identities = self.server_identities[profile].setdefault(jid_, {}) + if (category, type_) not in identities: + identities[(category, type_)] = set() + identities[(category, type_)].add(entity) - def getServerServiceEntities(self, category, type_, profile): + def getServerServiceEntities(self, category, type_, jid_=None, profile=None): """Return all available entities for a service""" - if profile in self.server_identities: - return self.server_identities[profile].get((category, type_), set()) + if jid_ is None: + jid_ = self.host.getClientHostJid(profile) + if profile in self.server_identities and jid_ in self.server_identities[profile]: + return self.server_identities[profile][jid_].get((category, type_), set()) else: return None - def getServerServiceEntity(self, category, type_, profile): + def getServerServiceEntity(self, category, type_, jid_=None, profile=None): """Helper method to get first available entity for a service""" - entities = self.getServerServiceEntities(category, type_, profile) + if jid_ is None: + jid_ = self.host.getClientHostJid(profile) + entities = self.getServerServiceEntities(category, type_, jid_, profile) if entities is None: - warning(_("Entities (%(category)s/%(type)s) not available, maybe they haven't been asked to server yet ?") % {"category": category, - "type": type_}) + warning(_("Entities (%(category)s/%(type)s) of %(server)s not available, maybe they haven't been asked yet?") + % {"category": category, "type": type_, "server": jid_}) return None else: return list(entities)[0] if entities else None - def hasServerFeature(self, feature, profile_key): + def hasServerFeature(self, feature, jid_=None, profile_key="@NONE@"): """Tell if the server of the profile has the required feature""" profile = self.getProfileName(profile_key) if not profile: error(_('Trying find server feature for a non-existant profile')) - return + return None assert profile in self.server_features - return feature in self.server_features[profile] + if jid_ is None: + jid_ = self.host.getClientHostJid(profile) + if jid_ in self.server_features[profile]: + return feature in self.server_features[profile][jid_] + else: + warning(_("Features of %s not available, maybe they haven't been asked yet?") % jid_) + return None def getLastResource(self, contact, profile_key): """Return the last resource used by a contact
--- a/src/plugins/plugin_misc_groupblog.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/plugins/plugin_misc_groupblog.py Wed Dec 11 17:16:53 2013 +0100 @@ -140,7 +140,7 @@ #we don't have any pubsub server featuring item access yet client.item_access_pubsub = None client._item_access_pubsub_pending = defer.Deferred() - for entity in self.host.memory.getServerServiceEntities("pubsub", "service", profile): + for entity in self.host.memory.getServerServiceEntities("pubsub", "service", profile=profile): _disco = yield client.disco.requestInfo(entity) #if set([NS_PUBSUB_ITEM_ACCESS, NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK]).issubset(_disco.features): if set([NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK]).issubset(_disco.features):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0033.py Wed Dec 11 17:16:53 2013 +0100 @@ -0,0 +1,174 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Extended Stanza Addressing (xep-0033) +# Copyright (C) 2013 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +from sat.core import exceptions +from wokkel import disco, iwokkel +from zope.interface import implements +from twisted.words.protocols.jabber.jid import JID +import copy +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler +from threading import Timer +from twisted.words.xish import domish +from twisted.internet import defer + +from sat.core.sat_main import MessageSentAndStored, AbortSendMessage +from sat.tools.misc import TriggerManager + +# TODO: fix Prosody "addressing" plugin to leave the concerned bcc according to the spec: +# +# http://xmpp.org/extensions/xep-0033.html#addr-type-bcc +# "This means that the server MUST remove these addresses before the stanza is delivered to anyone other than the given bcc addressee or the multicast service of the bcc addressee." +# +# http://xmpp.org/extensions/xep-0033.html#multicast +# "Each 'bcc' recipient MUST receive only the <address type='bcc'/> associated with that addressee." + +# TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports this XEP + + +NS_XMPP_CLIENT = "jabber:client" +NS_ADDRESS = "http://jabber.org/protocol/address" +ATTRIBUTES = ["jid", "uri", "node", "desc", "delivered", "type"] +ADDRESS_TYPES = ["to", "cc", "bcc", "replyto", "replyroom", "noreply"] + +PLUGIN_INFO = { + "name": "Extended Stanza Addressing Protocol Plugin", + "import_name": "XEP-0033", + "type": "XEP", + "protocols": ["XEP-0033"], + "dependencies": [], + "main": "XEP_0033", + "handler": "yes", + "description": _("""Implementation of Extended Stanza Addressing""") +} + + +class XEP_0033(object): + """ + Implementation for XEP 0033 + """ + def __init__(self, host): + logging.info(_("Extended Stanza Addressing plugin initialization")) + self.host = host + host.trigger.add("sendMessage", self.sendMessageTrigger, TriggerManager.MIN_PRIORITY) + host.trigger.add("MessageReceived", self.messageReceivedTrigger) + + def sendMessageTrigger(self, mess_data, treatments, profile): + """Process the XEP-0033 related data to be sent""" + + def treatment(mess_data): + if not 'address' in mess_data['extra']: + return mess_data + + def discoCallback(entity): + if entity is None: + raise AbortSendMessage(_("XEP-0033 is being used but the server doesn't support it!")) + to = JID(mess_data["to"].host) + if to != entity: + logging.warning(_("Stanzas using XEP-0033 should be addressed to %s, not %s!") % (entity, to)) + logging.warning(_("TODO: addressing has be fixed by the backend... fix it in the frontend!")) + mess_data["to"] = entity + element = mess_data['xml'].addElement('addresses', NS_ADDRESS) + entries = [entry.split(':') for entry in mess_data['extra']['address'].split('\n') if entry != ''] + for type_, jid_ in entries: + element.addChild(domish.Element((None, 'address'), None, {'type': type_, 'jid': jid_})) + # when the prosody plugin is completed, we can immediately return mess_data from here + return self.sendAndStoreMessage(mess_data, entries, profile) + + d = self.host.requestServerDisco(NS_ADDRESS, profile_key=profile) + d.addCallbacks(discoCallback, lambda dummy: discoCallback(None)) + return d + + treatments.addCallback(treatment) + return True + + def sendAndStoreMessage(self, mess_data, entries, profile): + """Check if target servers support XEP-0033, send and store the messages + @raise: a friendly failure to let the core know that we sent the message already + + Later we should be able to remove this method because: + # XXX: sending the messages should be done by the local server + # FIXME: for now we duplicate the messages in the history for each recipient, this should change + # FIXME: for now we duplicate the echoes to the sender, this should also change + Ideas: + - fix Prosody plugin to check if target server support the feature + - redesign the database to save only one entry to the database + - change the newMessage signal to eventually pass more than one recipient + """ + def discoCallback(entity, to_jid): + new_data = copy.deepcopy(mess_data) + new_data['to'] = JID(to_jid) + new_data['xml']['to'] = to_jid + if entity: + if 'address' in mess_data['extra']: + self.host.sendAndStoreMessage(mess_data, False, profile) + # just to remember that the message has been sent + del mess_data['extra']['address'] + # we still need to fill the history and signal the echo... + self.host.sendAndStoreMessage(new_data, True, profile) + else: + # target server misses the addressing feature + self.host.sendAndStoreMessage(new_data, False, profile) + + def errback(failure, to_jid): + discoCallback(None, to_jid) + + for type_, jid_ in entries: + d = defer.Deferred() + d.addCallback(self.host.requestServerDisco, JID(JID(jid_).host), profile_key=profile) + d.addCallbacks(discoCallback, errback, callbackArgs=[jid_], errbackArgs=[jid_]) + d.callback(NS_ADDRESS) + + raise MessageSentAndStored("XEP-0033 took over") + + def messageReceivedTrigger(self, message, post_treat, profile): + """In order to save the addressing information in the history""" + def post_treat_addr(data, addresses): + data['extra']['addresses'] = "" + for address in addresses: + data['extra']['addresses'] += '%s:%s\n' % (address['type'], address['jid']) + return data + + try: + addresses = message.elements(NS_ADDRESS, 'addresses').next() + post_treat.addCallback(post_treat_addr, addresses.children) + except StopIteration: + pass # no addresses + return True + + def getHandler(self, profile): + return XEP_0033_handler(self, profile) + + +class XEP_0033_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent, profile): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.profile = profile + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_ADDRESS)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return []
--- a/src/plugins/plugin_xep_0045.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/plugins/plugin_xep_0045.py Wed Dec 11 17:16:53 2013 +0100 @@ -187,7 +187,7 @@ def getMUCService(self, profile): """Return the MUC service or None""" muc_service = None - for service in self.host.memory.getServerServiceEntities("conference", "text", profile): + for service in self.host.memory.getServerServiceEntities("conference", "text", profile=profile): if not ".irc." in service.userhost(): #FIXME: #This awfull ugly hack is here to avoid an issue with openfire: the irc gateway
--- a/src/plugins/plugin_xep_0065.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/plugins/plugin_xep_0065.py Wed Dec 11 17:16:53 2013 +0100 @@ -807,7 +807,7 @@ def connectionInitialized(self): def after_init(ignore): - proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) + proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", profile=self.parent.profile) if not proxy_ent: debug(_("No proxy found on this server")) return
--- a/src/tools/misc.py Fri Dec 13 05:35:24 2013 +0100 +++ b/src/tools/misc.py Wed Dec 11 17:16:53 2013 +0100 @@ -22,18 +22,19 @@ import sys from logging import debug, warning + class TriggerException(Exception): pass class SkipOtherTriggers(Exception): """ Exception to raise if normal behaviour must be followed instead of - followind triggers list """ + following triggers list """ pass class TriggerManager(object): - """This class manage triggers: code which interact to change de behaviour + """This class manage triggers: code which interact to change the behaviour of SàT""" MIN_PRIORITY = float('-inf')