# HG changeset patch # User Goffi # Date 1452032422 -3600 # Node ID 4fc1bf1af48fe9d85afbafcd7b6ef477d3f10527 # Parent 0876352459e5b292668c35178457f416e3daff0b plugin XEP-0313: cleaning and improvments: - moved common namespaces to core.constants - removed useless dependencies and recommendations - removed all bride methods as they where there only for testing purpose - don't use a plugin dict to store MAM clients, put them in profiles' clients instead - removed message trigger in favor of an observer which is triggered only when specific queryid is used - new getArchives method which grab all result and return them all at once diff -r 0876352459e5 -r 4fc1bf1af48f src/core/constants.py --- a/src/core/constants.py Tue Jan 05 23:20:22 2016 +0100 +++ b/src/core/constants.py Tue Jan 05 23:20:22 2016 +0100 @@ -100,7 +100,7 @@ MESS_TYPE_NORMAL = 'normal' - ## PRESENCE ## + ## Presence ## PRESENCE_UNAVAILABLE = 'unavailable' PRESENCE_SHOW_AWAY = 'away' PRESENCE_SHOW_CHAT = 'chat' @@ -112,6 +112,12 @@ PRESENCE_PRIORITY = 'priority' + ## Common namespaces ## + NS_CLIENT = 'jabber:client' + NS_FORWARD = 'urn:xmpp:forward:0' + NS_DELAY = 'urn:xmpp:delay' + + ## Configuration ## if BaseDirectory: # skipped when xdg module is not available (should not happen in backend) diff -r 0876352459e5 -r 4fc1bf1af48f src/plugins/plugin_xep_0313.py --- a/src/plugins/plugin_xep_0313.py Tue Jan 05 23:20:22 2016 +0100 +++ b/src/plugins/plugin_xep_0313.py Tue Jan 05 23:20:22 2016 +0100 @@ -22,37 +22,27 @@ from sat.core.i18n import _ from sat.core.log import getLogger log = getLogger(__name__) +from sat.core import exceptions -try: - from twisted.words.protocols.xmlstream import XMPPHandler -except ImportError: - from wokkel.subprotocols import XMPPHandler -from twisted.words.xish import domish from twisted.words.protocols.jabber import jid from zope.interface import implements -from wokkel import disco, data_form -from wokkel.generic import parseXml -from wokkel.pubsub import NS_PUBSUB_EVENT, ItemsEvent +from wokkel import disco +import uuid -# TODO: change this when RSM and MAM are in wokkel -from sat.tmp.wokkel.rsm import RSMRequest -from sat.tmp.wokkel import mam +# XXX: mam and rsm come from tmp.wokkel +from wokkel import rsm +from wokkel import mam -NS_MAM = 'urn:xmpp:mam:0' -NS_SF = 'urn:xmpp:forward:0' -NS_DD = 'urn:xmpp:delay' -NS_CLIENT = 'jabber:client' +MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" PLUGIN_INFO = { "name": "Message Archive Management", "import_name": "XEP-0313", "type": "XEP", "protocols": ["XEP-0313"], - "dependencies": ["XEP-0059", "XEP-0297", "XEP-0203"], - "recommendations": ["XEP-0334"], "main": "XEP_0313", "handler": "yes", "description": _("""Implementation of Message Archive Management""") @@ -64,108 +54,71 @@ def __init__(self, host): log.info(_("Message Archive Management plugin initialization")) self.host = host - self.clients = {} # bind profile name to SatMAMClient - host.bridge.addMethod("MAMqueryFields", ".plugin", in_sign='ss', out_sign='s', - method=self._queryFields, - async=True, - doc={}) - host.bridge.addMethod("MAMqueryArchive", ".plugin", in_sign='ssa{ss}ss', out_sign='s', - method=self._queryArchive, - async=True, - doc={}) - host.bridge.addMethod("MAMgetPrefs", ".plugin", in_sign='ss', out_sign='s', - method=self._getPrefs, - async=True, - doc={}) - host.bridge.addMethod("MAMsetPrefs", ".plugin", in_sign='ssasass', out_sign='s', - method=self._setPrefs, - async=True, - doc={}) - host.trigger.add("MessageReceived", self.messageReceivedTrigger) def getHandler(self, profile): - self.clients[profile] = SatMAMClient(self, profile) - return self.clients[profile] + client = self.host.getClient(profile) + mam_client = client._mam = SatMAMClient() + return mam_client - def profileDisconnected(self, profile): - try: - del self.clients[profile] - except KeyError: - pass - - def _queryFields(self, service_s=None, profile_key=C.PROF_KEY_NONE): - service = jid.JID(service_s) if service_s else None - return self.queryFields(service, profile_key) - - def queryFields(self, service=None, profile_key=C.PROF_KEY_NONE): - """Ask the server about additional supported fields. + def queryFields(self, client, service=None): + """Ask the server about supported fields. @param service: entity offering the MAM service (None for user archives) - @param profile_key (unicode): %(doc_profile_key)s - @return: the server response as a Deferred domish.Element + @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 ยง4.1.5) """ - # http://xmpp.org/extensions/xep-0313.html#query-form - def eb(failure): - # typically StanzaError with condition u'service-unavailable' - log.error(failure.getErrorMessage()) - return '' - - profile = self.host.memory.getProfileName(profile_key) - d = self.clients[profile].queryFields(service) - return d.addCallbacks(lambda elt: elt.toXml(), eb) + return client._mam.queryFields(service) - def _queryArchive(self, service_s=None, form_xml=None, rsm_dict=None, node=None, profile_key=C.PROF_KEY_NONE): - service = jid.JID(service_s) if service_s else None - if form_xml: - form = data_form.Form.fromElement(parseXml(form_xml)) - if form.formNamespace != NS_MAM: - log.error(_(u"Expected a MAM Data Form, got instead: %s") % form.formNamespace) - form = None - else: - form = None - rsm = RSMRequest(**rsm_dict) if rsm_dict else None - return self.queryArchive(service, form, rsm, node, profile_key) - - def queryArchive(self, service=None, form=None, rsm=None, node=None, profile_key=C.PROF_KEY_NONE): + def queryArchive(self, client, mam_query, service=None): """Query a user, MUC or pubsub archive. - @param service: entity offering the MAM service (None for user archives) - @param form (Form): data form to filter the request - @param rsm (RSMRequest): RSM request instance - @param node (unicode): pubsub node to query, or None if inappropriate - @param profile_key (unicode): %(doc_profile_key)s - @return: a Deferred when the message has been sent + @param mam_query(mam.MAMRequest): MAM query instance + @param service(jid.JID, None): entity offering the MAM service + None for user server + @return (D(domish.Element)): result """ - def eb(failure): - # typically StanzaError with condition u'service-unavailable' - log.error(failure.getErrorMessage()) - return '' + return client._mam.queryArchive(mam_query, service) + + def _appendMessage(self, elt_list, message_cb, message_elt): + if message_cb is not None: + elt_list.append(message_cb(message_elt)) + else: + elt_list.append(message_elt) + + def _queryFinished(self, iq_result, client, elt_list, event): + client.xmlstream.removeObserver(event, self._appendMessage) + try: + fin_elt = iq_result.elements(mam.NS_MAM, 'fin').next() + except StopIteration: + raise exceptions.DataError(u"Invalid MAM result") - profile = self.host.memory.getProfileName(profile_key) - d = self.clients[profile].queryArchive(service, form, rsm, node) - return d.addCallbacks(lambda elt: elt.toXml(), eb) - # TODO: add the handler for receiving the final message + try: + rsm_response = rsm.RSMResponse.fromElement(fin_elt) + except rsm.RSMNotFoundError: + rsm_response = None + + return (elt_list, rsm_response) + + def getArchives(self, client, query, service=None, message_cb=None): + """Query archive then grab and return them all in the result - def _getPrefs(self, service_s=None, profile_key=C.PROF_KEY_NONE): - service = jid.JID(service_s) if service_s else None - return self.getPrefs(service, profile_key) + """ + if query.query_id is None: + query.query_id = unicode(uuid.uuid4()) + elt_list = [] + event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id) + client.xmlstream.addObserver(event, self._appendMessage, 0, elt_list, message_cb) + d = self.queryArchive(client, query, service) + d.addCallback(self._queryFinished, client, elt_list, event) + return d - def getPrefs(self, service=None, profile_key=C.PROF_KEY_NONE): + def getPrefs(self, client, service=None): """Retrieve the current user preferences. @param service: entity offering the MAM service (None for user archives) - @param profile_key (unicode): %(doc_profile_key)s @return: the server response as a Deferred domish.Element """ # http://xmpp.org/extensions/xep-0313.html#prefs - def eb(failure): - # typically StanzaError with condition u'service-unavailable' - log.error(failure.getErrorMessage()) - return '' - - profile = self.host.memory.getProfileName(profile_key) - d = self.clients[profile].queryPrefs(service) - return d.addCallbacks(lambda elt: elt.toXml(), eb) + return client._mam.queryPrefs(service) def _setPrefs(self, service_s=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE): service = jid.JID(service_s) if service_s else None @@ -174,7 +127,7 @@ #TODO: why not build here a MAMPrefs object instead of passing the args separately? return self.setPrefs(service, default, always_jid, never_jid, profile_key) - def setPrefs(self, service=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE): + def setPrefs(self, client, service=None, default='roster', always=None, never=None): """Set news user preferences. @param service: entity offering the MAM service (None for user archives) @@ -185,63 +138,14 @@ @return: the server response as a Deferred domish.Element """ # http://xmpp.org/extensions/xep-0313.html#prefs - def eb(failure): - # typically StanzaError with condition u'service-unavailable' - log.error(failure.getErrorMessage()) - return '' - - profile = self.host.memory.getProfileName(profile_key) - d = self.clients[profile].setPrefs(service, default, always, never) - return d.addCallbacks(lambda elt: elt.toXml(), eb) - - def messageReceivedTrigger(self, message, post_treat, profile): - """Check if the message is a MAM result. If so, extract the original - message, stop processing the current message and process the original - message instead. - """ - try: - result = domish.generateElementsQNamed(message.elements(), "result", NS_MAM).next() - except StopIteration: - return True - try: - forwarded = domish.generateElementsQNamed(result.elements(), "forwarded", NS_SF).next() - except StopIteration: - log.error(_("MAM result misses its mandatory element!")) - return False - try: - # TODO: delay is not here for nothing, get benefice of it! - delay = domish.generateElementsQNamed(forwarded.elements(), "delay", NS_DD).next() - msg = domish.generateElementsQNamed(forwarded.elements(), "message", NS_CLIENT).next() - except StopIteration: - log.error(_(" element misses a mandatory child!")) - return False - log.debug(_("MAM found a forwarded message")) - - if msg.event and msg.event.uri == NS_PUBSUB_EVENT: - event = ItemsEvent(jid.JID(message['from']), - jid.JID(message['to']), - msg.event.items['node'], - msg.event.items.elements(), - {}) - self.host.plugins["XEP-0060"].clients[profile].itemsReceived(event) - return False - - client = self.host.getClient(profile) - client.messageProt.onMessage(msg) - return False + return client._mam.setPrefs(service, default, always, never) class SatMAMClient(mam.MAMClient): implements(disco.IDisco) - def __init__(self, plugin_parent, profile): - self.plugin_parent = plugin_parent - self.host = plugin_parent.host - self.profile = profile - mam.MAMClient.__init__(self) - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): - return [disco.DiscoFeature(NS_MAM)] + return [disco.DiscoFeature(mam.NS_MAM)] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []