changeset 322:54d90c73b8b5

mam: various improvments: - put common namespaces ton const - VAL_RSM_MAX_DEFAULT can be None if default limit is not wanted - ItemDate now has a 'date' attribute - MAMService is MonkeyPatched the same way as PubSubService to handle PEP - fixed error mapping in mam module - PEP is handled - properly manage date in a payload independent way - when PEP is used, send privileged messages
author Goffi <goffi@goffi.org>
date Tue, 05 Jan 2016 23:13:13 +0100
parents c7fe09894952
children 8496af26be45
files sat_pubsub/const.py sat_pubsub/container.py sat_pubsub/delegation.py sat_pubsub/mam.py sat_pubsub/pgsql_storage.py
diffstat 5 files changed, 118 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
--- a/sat_pubsub/const.py	Tue Jan 05 22:16:37 2016 +0100
+++ b/sat_pubsub/const.py	Tue Jan 05 23:13:13 2016 +0100
@@ -51,10 +51,11 @@
 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 
-
+NS_CLIENT = 'jabber:client'
 NS_GROUPBLOG_PREFIX = 'urn:xmpp:groupblog:'
 NS_ITEM_CONFIG = "http://jabber.org/protocol/pubsub#item-config"
 NS_ATOM = "http://www.w3.org/2005/Atom"
+NS_FORWARD = 'urn:xmpp:forward:0'
 OPT_ACCESS_MODEL = 'pubsub#access_model'
 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
 OPT_PERSIST_ITEMS = "pubsub#persist_items"
@@ -69,6 +70,6 @@
 VAL_PMODEL_SUBSCRIBERS = 'subscribers'
 VAL_PMODEL_OPEN = 'open'
 VAL_PMODEL_DEFAULT = VAL_PMODEL_PUBLISHERS
-VAL_RSM_MAX_DEFAULT = 10
+VAL_RSM_MAX_DEFAULT = 10 # None for no limit
 FLAG_ENABLE_RSM = True
 FLAG_ENABLE_MAM = True
--- a/sat_pubsub/container.py	Tue Jan 05 22:16:37 2016 +0100
+++ b/sat_pubsub/container.py	Tue Jan 05 23:13:13 2016 +0100
@@ -20,5 +20,5 @@
 from collections import namedtuple
 
 
-ItemData = namedtuple('ItemData', ('item', 'access_model', 'config', 'categories'))
+ItemData = namedtuple('ItemData', ('item', 'access_model', 'config', 'categories', 'date'))
 ItemData.__new__.__defaults__ = (None,) * (len(ItemData._fields) - 1) # Only item is mandatory
--- a/sat_pubsub/delegation.py	Tue Jan 05 22:16:37 2016 +0100
+++ b/sat_pubsub/delegation.py	Tue Jan 05 23:13:13 2016 +0100
@@ -26,6 +26,7 @@
 from wokkel import data_form
 from wokkel import disco, iwokkel
 from wokkel.iwokkel import IPubSubService
+from wokkel import mam
 from twisted.python import log
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.protocols.jabber.xmlstream import toResponse
@@ -40,11 +41,14 @@
 DELEGATION_MAIN_SEP = "::"
 DELEGATION_BARE_SEP = ":bare:"
 
+TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"),
+           (mam.IMAMService, mam, "MAMRequest"))
+
+
 class InvalidStanza(Exception):
     pass
 
 
-
 class DelegationsHandler(XMPPHandler):
     implements(iwokkel.IDisco)
     _service_hacked = False
@@ -53,41 +57,42 @@
         super(DelegationsHandler, self).__init__()
 
     def _service_hack(self):
-        """Patch the PubSubService to track delegated stanzas"""
+        """Patch the request classes of services to track delegated stanzas"""
         # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest.
         #      As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while
         #      original wokkel.pubsub use directly pubsub.PubSubRequest, we need to
         #      check which version is used before monkeypatching
         for handler in self.parent.handlers:
-            if IPubSubService.providedBy(handler):
-                if hasattr(handler, '_request_class'):
-                    request_base_class = handler._request_class
-                else:
-                    request_base_class = pubsub.PubSubRequest
+            for service, module, default_base_cls in TO_HACK:
+                if service.providedBy(handler):
+                    if hasattr(handler, '_request_class'):
+                        request_base_class = handler._request_class
+                    else:
+                        request_base_class = getattr(module, default_base_cls)
 
-                class PubSubRequestWithDelegation(request_base_class):
-                    """A PubSubReques which put an indicator if the stanza comme from delegation"""
+                    class RequestWithDelegation(request_base_class):
+                        """A XxxRequest which put an indicator if the stanza comme from delegation"""
 
-                    @classmethod
-                    def fromElement(cls, element):
-                        """Check if element comme from delegation, and set a delegated flags
+                        @classmethod
+                        def fromElement(cls, element):
+                            """Check if element comme from delegation, and set a delegated flags
 
-                        delegated flaf is either False, or it's a jid of the delegating server
-                        the delegated flag must be set on element before use
-                        """
-                        try:
-                            # __getattr__ is overriden in domish.Element, so we use __getattribute__
-                            delegated = element.__getattribute__('delegated')
-                        except AttributeError:
-                            delegated = False
-                        instance = cls.__base__.fromElement(element)
-                        instance.delegated = delegated
-                        return instance
+                            delegated flag is either False, or it's a jid of the delegating server
+                            the delegated flag must be set on element before use
+                            """
+                            try:
+                                # __getattr__ is overriden in domish.Element, so we use __getattribute__
+                                delegated = element.__getattribute__('delegated')
+                            except AttributeError:
+                                delegated = False
+                            instance = cls.__base__.fromElement(element)
+                            instance.delegated = delegated
+                            return instance
 
-                if hasattr(handler, '_request_class'):
-                    handler._request_class = PubSubRequestWithDelegation
-                else:
-                    pubsub.PubSubRequest = PubSubRequestWithDelegation
+                    if hasattr(handler, '_request_class'):
+                        handler._request_class = RequestWithDelegation
+                    else:
+                        setattr(module, default_base_cls, RequestWithDelegation)
         DelegationsHandler._service_hacked = True
 
     def connectionInitialized(self):
--- a/sat_pubsub/mam.py	Tue Jan 05 22:16:37 2016 +0100
+++ b/sat_pubsub/mam.py	Tue Jan 05 23:13:13 2016 +0100
@@ -28,32 +28,44 @@
 from zope.interface import implements
 
 from twisted.words.xish import domish
+from twisted.python import log
+from twisted.words.protocols.jabber import error
 
 from sat_pubsub import const
-from sat_pubsub.backend import PubSubResourceFromBackend
-from wokkel.pubsub import NS_PUBSUB_EVENT
-
-from dateutil import parser
+from sat_pubsub import backend
+from wokkel import pubsub
 
-# TODO: change this when RSM and MAM are in wokkel
-from sat.tmp.wokkel import rsm
-from sat.tmp.wokkel import mam
-
-NS_CLIENT = 'jabber:client'
+from wokkel import rsm
+from wokkel import mam
+from wokkel import delay
 
 
 class MAMResource(object):
+    implements(mam.IMAMResource)
+    _errorMap = backend.PubSubResourceFromBackend._errorMap
 
-    implements(mam.IMAMResource)
+    def __init__(self, backend_):
+        self.backend = backend_
+
+    def _mapErrors(self, failure):
+        # XXX: come from backend.PubsubResourceFromBackend
+        e = failure.trap(*self._errorMap.keys())
 
-    def __init__(self, backend):
-        self.backend = backend
+        condition, pubsubCondition, feature = self._errorMap[e]
+        msg = failure.value.msg
 
-    def onArchiveRequest(self, mam, requestor):
+        if pubsubCondition:
+            exc = pubsub.PubSubError(condition, pubsubCondition, feature, msg)
+        else:
+            exc = error.StanzaError(condition, text=msg)
+
+        raise exc
+
+    def onArchiveRequest(self, mam_request, requestor):
         """
 
-        @param mam: The MAM <query/> request.
-        @type mam: L{MAMQueryReques<wokkel.mam.MAMQueryRequest>}
+        @param mam_request: The MAM archive request.
+        @type mam_request: L{MAMQueryReques<wokkel.mam.MAMRequest>}
 
         @param requestor: JID of the requestor.
         @type requestor: L{JID<twisted.words.protocols.jabber.jid.JID>}
@@ -62,42 +74,68 @@
         @rtype: C{tuple}
         """
         # FIXME: bad result ordering
-        ext_data = {}
-        if mam.form:
-            ext_data['filters'] = mam.form.fields.values()
-        if mam.rsm is None:
-            mam.rsm = rsm.RSMRequest(const.VAL_RSM_MAX_DEFAULT)
-        ext_data['rsm'] = mam.rsm
+        try:
+            pep = mam_request.delegated
+        except AttributeError:
+            pep = False
+        ext_data = {'pep': pep}
+        if mam_request.form:
+            ext_data['filters'] = mam_request.form.fields.values()
+        if mam_request.rsm is None:
+            if const.VAL_RSM_MAX_DEFAULT != None:
+                log.msg("MAM request without RSM limited to {}".format(const.VAL_RSM_MAX_DEFAULT))
+                ext_data['rsm'] = rsm.RSMRequest(const.VAL_RSM_MAX_DEFAULT)
+        else:
+            ext_data['rsm'] = mam_request.rsm
 
-        d = self.backend.getItems(mam.node, requestor, mam.rsm.max, None, ext_data)
+        d = self.backend.getItemsData(mam_request.node, requestor, None, None, ext_data)
 
         def make_message(elt):
             # XXX: http://xmpp.org/extensions/xep-0297.html#sect-idp629952 (rule 3)
-            message = domish.Element((NS_CLIENT, "message"))
-            event = message.addElement((NS_PUBSUB_EVENT, "event"))
+            message = domish.Element((const.NS_CLIENT, "message"))
+            event = message.addElement((pubsub.NS_PUBSUB_EVENT, "event"))
             items = event.addElement('items')
-            items["node"] = mam.node
+            items["node"] = mam_request.node
             items.addChild(elt)
             return message
 
-        def cb(elts):
+        def cb(items_data):
             msg_data = []
             rsm_elt = None
-            for elt in elts:
-                if elt.name == 'set' and elt.uri == rsm.NS_RSM:
+            for item_data in items_data:
+                if item_data.item.name == 'set' and item_data.item.uri == rsm.NS_RSM:
                     assert rsm_elt is None
-                    rsm_elt = elt
-                elif elt.name == 'item':
-                    # FIXME: this is not good as it is dependant on payload
-                    # TODO: remove this and use date field in database
-                    date = parser.parse(''.join(elt.entry.published.children))
-                    msg_data.append([elt['id'], make_message(elt), date])
+                    rsm_elt = item_data.item
+                elif item_data.item.name == 'item':
+                    msg_data.append([item_data.item['id'], make_message(item_data.item), item_data.date])
+                else:
+                    log.msg("WARNING: unknown element: {}".format(item_data.item.name))
+            if pep:
+                # we need to send privileged message
+                # so me manage the sending ourself, and return
+                # an empty msg_data list to avoid double sending
+                for data in msg_data:
+                    self.forwardPEPMessage(mam_request, requestor, *data)
+                msg_data = []
             return (msg_data, rsm_elt)
 
-        d.addErrback(PubSubResourceFromBackend._mapErrors)
+        d.addErrback(self._mapErrors)
         d.addCallback(cb)
         return d
 
+    def forwardPEPMessage(self, mam_request, requestor, id_, elt, date):
+        msg = domish.Element((None, 'message'))
+        msg['from'] = self.backend.privilege.server_jid.full()
+        msg['to'] = requestor.full()
+        result = msg.addElement((mam.NS_MAM, 'result'))
+        if mam_request.query_id is not None:
+            result['queryid'] = mam_request.query_id
+        result['id'] = id_
+        forward = result.addElement((const.NS_FORWARD, 'forwarded'))
+        forward.addChild(delay.Delay(date).toElement())
+        forward.addChild(elt)
+        self.backend.privilege.sendMessage(msg)
+
     def onPrefsGetRequest(self, requestor):
         """
 
--- a/sat_pubsub/pgsql_storage.py	Tue Jan 05 22:16:37 2016 +0100
+++ b/sat_pubsub/pgsql_storage.py	Tue Jan 05 23:13:13 2016 +0100
@@ -749,7 +749,7 @@
         args = []
 
         # SELECT
-        query = ["SELECT data,items.access_model,item_id"]
+        query = ["SELECT data,items.access_model,item_id,date"]
 
         query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
 
@@ -801,15 +801,16 @@
                 item = generic.stripNamespace(parseXml(data[0]))
                 access_model = data[1]
                 item_id = data[2]
+                date = data[3]
                 access_list = {}
                 if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
                     cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
                     access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
 
-                ret.append(container.ItemData(item, access_model, access_list))
+                ret.append(container.ItemData(item, access_model, access_list, date=date))
             return ret
 
-        items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), None, None) for r in result]
+        items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), r[1], r[2], date=r[3]) for r in result]
         return items_data
 
     def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
@@ -828,7 +829,7 @@
         ret = []
         if unrestricted: #we get everything without checking permissions
             for itemIdentifier in itemIdentifiers:
-                cursor.execute("""SELECT data,items.access_model,item_id FROM nodes
+                cursor.execute("""SELECT data,items.access_model,item_id,date FROM nodes
                                   INNER JOIN items USING (node_id)
                                   WHERE node_id=%s AND item=%s""",
                                (self.nodeDbId,
@@ -840,18 +841,19 @@
                 item = generic.stripNamespace(parseXml(result[0]))
                 access_model = result[1]
                 item_id = result[2]
+                date= result[3]
                 access_list = {}
                 if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
                     cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
                     access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
 
-                ret.append(container.ItemData(item, access_model, access_list))
+                ret.append(container.ItemData(item, access_model, access_list, date=date))
         else: #we check permission before returning items
             for itemIdentifier in itemIdentifiers:
                 args = [self.nodeDbId, itemIdentifier]
                 if authorized_groups:
                     args.append(authorized_groups)
-                cursor.execute("""SELECT data FROM nodes
+                cursor.execute("""SELECT data, date FROM nodes
                            INNER  JOIN items USING (node_id)
                            LEFT JOIN item_groups_authorized USING (item_id)
                            WHERE node_id=%s AND item=%s AND
@@ -861,7 +863,7 @@
 
                 result = cursor.fetchone()
                 if result:
-                    ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), None, None))
+                    ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), date=result[1]))
 
         return ret