Mercurial > libervia-pubsub
changeset 322:54d90c73b8b5
mam: various improvments:
- put common namespaces ton const
- VAL_RSM_MAX_DEFAULT can be None if default limit is not wanted
- ItemDate now has a 'date' attribute
- MAMService is MonkeyPatched the same way as PubSubService to handle PEP
- fixed error mapping in mam module
- PEP is handled
- properly manage date in a payload independent way
- when PEP is used, send privileged messages
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 05 Jan 2016 23:13:13 +0100 |
parents | c7fe09894952 |
children | 8496af26be45 |
files | sat_pubsub/const.py sat_pubsub/container.py sat_pubsub/delegation.py sat_pubsub/mam.py sat_pubsub/pgsql_storage.py |
diffstat | 5 files changed, 118 insertions(+), 72 deletions(-) [+] |
line wrap: on
line diff
--- a/sat_pubsub/const.py Tue Jan 05 22:16:37 2016 +0100 +++ b/sat_pubsub/const.py Tue Jan 05 23:13:13 2016 +0100 @@ -51,10 +51,11 @@ # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - +NS_CLIENT = 'jabber:client' NS_GROUPBLOG_PREFIX = 'urn:xmpp:groupblog:' NS_ITEM_CONFIG = "http://jabber.org/protocol/pubsub#item-config" NS_ATOM = "http://www.w3.org/2005/Atom" +NS_FORWARD = 'urn:xmpp:forward:0' OPT_ACCESS_MODEL = 'pubsub#access_model' OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' OPT_PERSIST_ITEMS = "pubsub#persist_items" @@ -69,6 +70,6 @@ VAL_PMODEL_SUBSCRIBERS = 'subscribers' VAL_PMODEL_OPEN = 'open' VAL_PMODEL_DEFAULT = VAL_PMODEL_PUBLISHERS -VAL_RSM_MAX_DEFAULT = 10 +VAL_RSM_MAX_DEFAULT = 10 # None for no limit FLAG_ENABLE_RSM = True FLAG_ENABLE_MAM = True
--- a/sat_pubsub/container.py Tue Jan 05 22:16:37 2016 +0100 +++ b/sat_pubsub/container.py Tue Jan 05 23:13:13 2016 +0100 @@ -20,5 +20,5 @@ from collections import namedtuple -ItemData = namedtuple('ItemData', ('item', 'access_model', 'config', 'categories')) +ItemData = namedtuple('ItemData', ('item', 'access_model', 'config', 'categories', 'date')) ItemData.__new__.__defaults__ = (None,) * (len(ItemData._fields) - 1) # Only item is mandatory
--- a/sat_pubsub/delegation.py Tue Jan 05 22:16:37 2016 +0100 +++ b/sat_pubsub/delegation.py Tue Jan 05 23:13:13 2016 +0100 @@ -26,6 +26,7 @@ from wokkel import data_form from wokkel import disco, iwokkel from wokkel.iwokkel import IPubSubService +from wokkel import mam from twisted.python import log from twisted.words.protocols.jabber import jid, error from twisted.words.protocols.jabber.xmlstream import toResponse @@ -40,11 +41,14 @@ DELEGATION_MAIN_SEP = "::" DELEGATION_BARE_SEP = ":bare:" +TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), + (mam.IMAMService, mam, "MAMRequest")) + + class InvalidStanza(Exception): pass - class DelegationsHandler(XMPPHandler): implements(iwokkel.IDisco) _service_hacked = False @@ -53,41 +57,42 @@ super(DelegationsHandler, self).__init__() def _service_hack(self): - """Patch the PubSubService to track delegated stanzas""" + """Patch the request classes of services to track delegated stanzas""" # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest. # As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while # original wokkel.pubsub use directly pubsub.PubSubRequest, we need to # check which version is used before monkeypatching for handler in self.parent.handlers: - if IPubSubService.providedBy(handler): - if hasattr(handler, '_request_class'): - request_base_class = handler._request_class - else: - request_base_class = pubsub.PubSubRequest + for service, module, default_base_cls in TO_HACK: + if service.providedBy(handler): + if hasattr(handler, '_request_class'): + request_base_class = handler._request_class + else: + request_base_class = getattr(module, default_base_cls) - class PubSubRequestWithDelegation(request_base_class): - """A PubSubReques which put an indicator if the stanza comme from delegation""" + class RequestWithDelegation(request_base_class): + """A XxxRequest which put an indicator if the stanza comme from delegation""" - @classmethod - def fromElement(cls, element): - """Check if element comme from delegation, and set a delegated flags + @classmethod + def fromElement(cls, element): + """Check if element comme from delegation, and set a delegated flags - delegated flaf is either False, or it's a jid of the delegating server - the delegated flag must be set on element before use - """ - try: - # __getattr__ is overriden in domish.Element, so we use __getattribute__ - delegated = element.__getattribute__('delegated') - except AttributeError: - delegated = False - instance = cls.__base__.fromElement(element) - instance.delegated = delegated - return instance + delegated flag is either False, or it's a jid of the delegating server + the delegated flag must be set on element before use + """ + try: + # __getattr__ is overriden in domish.Element, so we use __getattribute__ + delegated = element.__getattribute__('delegated') + except AttributeError: + delegated = False + instance = cls.__base__.fromElement(element) + instance.delegated = delegated + return instance - if hasattr(handler, '_request_class'): - handler._request_class = PubSubRequestWithDelegation - else: - pubsub.PubSubRequest = PubSubRequestWithDelegation + if hasattr(handler, '_request_class'): + handler._request_class = RequestWithDelegation + else: + setattr(module, default_base_cls, RequestWithDelegation) DelegationsHandler._service_hacked = True def connectionInitialized(self):
--- a/sat_pubsub/mam.py Tue Jan 05 22:16:37 2016 +0100 +++ b/sat_pubsub/mam.py Tue Jan 05 23:13:13 2016 +0100 @@ -28,32 +28,44 @@ from zope.interface import implements from twisted.words.xish import domish +from twisted.python import log +from twisted.words.protocols.jabber import error from sat_pubsub import const -from sat_pubsub.backend import PubSubResourceFromBackend -from wokkel.pubsub import NS_PUBSUB_EVENT - -from dateutil import parser +from sat_pubsub import backend +from wokkel import pubsub -# TODO: change this when RSM and MAM are in wokkel -from sat.tmp.wokkel import rsm -from sat.tmp.wokkel import mam - -NS_CLIENT = 'jabber:client' +from wokkel import rsm +from wokkel import mam +from wokkel import delay class MAMResource(object): + implements(mam.IMAMResource) + _errorMap = backend.PubSubResourceFromBackend._errorMap - implements(mam.IMAMResource) + def __init__(self, backend_): + self.backend = backend_ + + def _mapErrors(self, failure): + # XXX: come from backend.PubsubResourceFromBackend + e = failure.trap(*self._errorMap.keys()) - def __init__(self, backend): - self.backend = backend + condition, pubsubCondition, feature = self._errorMap[e] + msg = failure.value.msg - def onArchiveRequest(self, mam, requestor): + if pubsubCondition: + exc = pubsub.PubSubError(condition, pubsubCondition, feature, msg) + else: + exc = error.StanzaError(condition, text=msg) + + raise exc + + def onArchiveRequest(self, mam_request, requestor): """ - @param mam: The MAM <query/> request. - @type mam: L{MAMQueryReques<wokkel.mam.MAMQueryRequest>} + @param mam_request: The MAM archive request. + @type mam_request: L{MAMQueryReques<wokkel.mam.MAMRequest>} @param requestor: JID of the requestor. @type requestor: L{JID<twisted.words.protocols.jabber.jid.JID>} @@ -62,42 +74,68 @@ @rtype: C{tuple} """ # FIXME: bad result ordering - ext_data = {} - if mam.form: - ext_data['filters'] = mam.form.fields.values() - if mam.rsm is None: - mam.rsm = rsm.RSMRequest(const.VAL_RSM_MAX_DEFAULT) - ext_data['rsm'] = mam.rsm + try: + pep = mam_request.delegated + except AttributeError: + pep = False + ext_data = {'pep': pep} + if mam_request.form: + ext_data['filters'] = mam_request.form.fields.values() + if mam_request.rsm is None: + if const.VAL_RSM_MAX_DEFAULT != None: + log.msg("MAM request without RSM limited to {}".format(const.VAL_RSM_MAX_DEFAULT)) + ext_data['rsm'] = rsm.RSMRequest(const.VAL_RSM_MAX_DEFAULT) + else: + ext_data['rsm'] = mam_request.rsm - d = self.backend.getItems(mam.node, requestor, mam.rsm.max, None, ext_data) + d = self.backend.getItemsData(mam_request.node, requestor, None, None, ext_data) def make_message(elt): # XXX: http://xmpp.org/extensions/xep-0297.html#sect-idp629952 (rule 3) - message = domish.Element((NS_CLIENT, "message")) - event = message.addElement((NS_PUBSUB_EVENT, "event")) + message = domish.Element((const.NS_CLIENT, "message")) + event = message.addElement((pubsub.NS_PUBSUB_EVENT, "event")) items = event.addElement('items') - items["node"] = mam.node + items["node"] = mam_request.node items.addChild(elt) return message - def cb(elts): + def cb(items_data): msg_data = [] rsm_elt = None - for elt in elts: - if elt.name == 'set' and elt.uri == rsm.NS_RSM: + for item_data in items_data: + if item_data.item.name == 'set' and item_data.item.uri == rsm.NS_RSM: assert rsm_elt is None - rsm_elt = elt - elif elt.name == 'item': - # FIXME: this is not good as it is dependant on payload - # TODO: remove this and use date field in database - date = parser.parse(''.join(elt.entry.published.children)) - msg_data.append([elt['id'], make_message(elt), date]) + rsm_elt = item_data.item + elif item_data.item.name == 'item': + msg_data.append([item_data.item['id'], make_message(item_data.item), item_data.date]) + else: + log.msg("WARNING: unknown element: {}".format(item_data.item.name)) + if pep: + # we need to send privileged message + # so me manage the sending ourself, and return + # an empty msg_data list to avoid double sending + for data in msg_data: + self.forwardPEPMessage(mam_request, requestor, *data) + msg_data = [] return (msg_data, rsm_elt) - d.addErrback(PubSubResourceFromBackend._mapErrors) + d.addErrback(self._mapErrors) d.addCallback(cb) return d + def forwardPEPMessage(self, mam_request, requestor, id_, elt, date): + msg = domish.Element((None, 'message')) + msg['from'] = self.backend.privilege.server_jid.full() + msg['to'] = requestor.full() + result = msg.addElement((mam.NS_MAM, 'result')) + if mam_request.query_id is not None: + result['queryid'] = mam_request.query_id + result['id'] = id_ + forward = result.addElement((const.NS_FORWARD, 'forwarded')) + forward.addChild(delay.Delay(date).toElement()) + forward.addChild(elt) + self.backend.privilege.sendMessage(msg) + def onPrefsGetRequest(self, requestor): """
--- a/sat_pubsub/pgsql_storage.py Tue Jan 05 22:16:37 2016 +0100 +++ b/sat_pubsub/pgsql_storage.py Tue Jan 05 23:13:13 2016 +0100 @@ -749,7 +749,7 @@ args = [] # SELECT - query = ["SELECT data,items.access_model,item_id"] + query = ["SELECT data,items.access_model,item_id,date"] query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) @@ -801,15 +801,16 @@ item = generic.stripNamespace(parseXml(data[0])) access_model = data[1] item_id = data[2] + date = data[3] access_list = {} if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] - ret.append(container.ItemData(item, access_model, access_list)) + ret.append(container.ItemData(item, access_model, access_list, date=date)) return ret - items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), None, None) for r in result] + items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), r[1], r[2], date=r[3]) for r in result] return items_data def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): @@ -828,7 +829,7 @@ ret = [] if unrestricted: #we get everything without checking permissions for itemIdentifier in itemIdentifiers: - cursor.execute("""SELECT data,items.access_model,item_id FROM nodes + cursor.execute("""SELECT data,items.access_model,item_id,date FROM nodes INNER JOIN items USING (node_id) WHERE node_id=%s AND item=%s""", (self.nodeDbId, @@ -840,18 +841,19 @@ item = generic.stripNamespace(parseXml(result[0])) access_model = result[1] item_id = result[2] + date= result[3] access_list = {} if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] - ret.append(container.ItemData(item, access_model, access_list)) + ret.append(container.ItemData(item, access_model, access_list, date=date)) else: #we check permission before returning items for itemIdentifier in itemIdentifiers: args = [self.nodeDbId, itemIdentifier] if authorized_groups: args.append(authorized_groups) - cursor.execute("""SELECT data FROM nodes + cursor.execute("""SELECT data, date FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) WHERE node_id=%s AND item=%s AND @@ -861,7 +863,7 @@ result = cursor.fetchone() if result: - ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), None, None)) + ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), date=result[1])) return ret