Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0313.py @ 2930:32b6893240e0
plugin XEP-0313: fixed archive retrieval on connection:
last stanza-id is now saved for every message received, and not only retrieved from local message history anymore.
If we are the sender of a message from archive, it is only added to local history, else it is injected to normal workflow.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 03 May 2019 13:00:08 +0200 |
parents | 003b8b4b56a7 |
children | 93da7c6f8e0c |
comparison
equal
deleted
inserted
replaced
2929:e0429ff7f6b6 | 2930:32b6893240e0 |
---|---|
37 from wokkel import mam | 37 from wokkel import mam |
38 | 38 |
39 | 39 |
40 log = getLogger(__name__) | 40 log = getLogger(__name__) |
41 | 41 |
42 | |
43 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" | |
44 | |
45 PLUGIN_INFO = { | 42 PLUGIN_INFO = { |
46 C.PI_NAME: u"Message Archive Management", | 43 C.PI_NAME: u"Message Archive Management", |
47 C.PI_IMPORT_NAME: u"XEP-0313", | 44 C.PI_IMPORT_NAME: u"XEP-0313", |
48 C.PI_TYPE: u"XEP", | 45 C.PI_TYPE: u"XEP", |
49 C.PI_PROTOCOLS: [u"XEP-0313"], | 46 C.PI_PROTOCOLS: [u"XEP-0313"], |
53 C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""), | 50 C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""), |
54 } | 51 } |
55 | 52 |
56 MAM_PREFIX = u"mam_" | 53 MAM_PREFIX = u"mam_" |
57 FILTER_PREFIX = MAM_PREFIX + "filter_" | 54 FILTER_PREFIX = MAM_PREFIX + "filter_" |
55 KEY_LAST_STANZA_ID = u"last_stanza_id" | |
56 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" | |
57 MESSAGE_STANZA_ID = '/message/stanza-id[@xmlns="{ns_stanza_id}"]' | |
58 | 58 |
59 | 59 |
60 class XEP_0313(object): | 60 class XEP_0313(object): |
61 def __init__(self, host): | 61 def __init__(self, host): |
62 log.info(_("Message Archive Management plugin initialization")) | 62 log.info(_("Message Archive Management plugin initialization")) |
63 self.host = host | 63 self.host = host |
64 self.host.registerNamespace(u"mam", mam.NS_MAM) | 64 self.host.registerNamespace(u"mam", mam.NS_MAM) |
65 self._rsm = host.plugins[u"XEP-0059"] | 65 self._rsm = host.plugins[u"XEP-0059"] |
66 self._sid = host.plugins[u"XEP-0359"] | 66 self._sid = host.plugins[u"XEP-0359"] |
67 # Deferred used to store last stanza id in order of reception | |
68 self._last_stanza_id_d = defer.Deferred() | |
69 self._last_stanza_id_d.callback(None) | |
67 host.bridge.addMethod( | 70 host.bridge.addMethod( |
68 "MAMGet", ".plugin", in_sign='sss', | 71 "MAMGet", ".plugin", in_sign='sss', |
69 out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives, | 72 out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives, |
70 async=True) | 73 async=True) |
71 | 74 |
72 @defer.inlineCallbacks | 75 @defer.inlineCallbacks |
73 def profileConnected(self, client): | 76 def profileConnected(self, client): |
74 last_mess = yield self.host.memory.historyGet( | 77 stanza_id_data = yield self.host.memory.storage.getPrivates( |
75 None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT, | 78 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) |
76 u'last_stanza_id': True}, | 79 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) |
77 profile=client.profile) | 80 if stanza_id is None: |
78 if not last_mess: | 81 log.info(u"can't retrieve last stanza ID, checking history") |
79 log.info(_(u"It seems that we have no MAM history yet")) | 82 last_mess = yield self.host.memory.historyGet( |
80 return | 83 None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT, |
81 stanza_id = last_mess[0][-1][u'stanza_id'] | 84 u'last_stanza_id': True}, |
85 profile=client.profile) | |
86 if not last_mess: | |
87 log.info(_(u"It seems that we have no MAM history yet")) | |
88 return | |
89 stanza_id = last_mess[0][-1][u'stanza_id'] | |
82 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) | 90 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) |
83 mam_req = mam.MAMRequest(rsm_=rsm_req) | 91 mam_req = mam.MAMRequest(rsm_=rsm_req) |
84 complete = False | 92 complete = False |
85 count = 0 | 93 count = 0 |
86 while not complete: | 94 while not complete: |
95 else: | 103 else: |
96 count += len(elt_list) | 104 count += len(elt_list) |
97 | 105 |
98 for mess_elt in elt_list: | 106 for mess_elt in elt_list: |
99 try: | 107 try: |
100 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) | 108 fwd_message_elt = self.getMessageFromResult( |
109 client, mess_elt, mam_req) | |
101 except exceptions.DataError: | 110 except exceptions.DataError: |
102 continue | 111 continue |
103 | 112 |
104 client.messageProt.onMessage(fwd_message_elt) | 113 try: |
114 destinee = jid.JID(fwd_message_elt['to']) | |
115 except KeyError: | |
116 log.warning(_(u'missing "to" attribute in forwarded message')) | |
117 destinee = client.jid | |
118 if destinee.userhostJID() == client.jid.userhostJID(): | |
119 # message to use, we insert the forwarded message in the normal | |
120 # workflow | |
121 client.xmlstream.dispatch(fwd_message_elt) | |
122 else: | |
123 # this message should be from us, we just add it to history | |
124 try: | |
125 from_jid = jid.JID(fwd_message_elt['from']) | |
126 except KeyError: | |
127 log.warning(_(u'missing "from" attribute in forwarded message')) | |
128 from_jid = client.jid | |
129 if from_jid.userhostJID() != client.jid.userhostJID(): | |
130 log.warning(_( | |
131 u'was expecting a message sent by our jid, but this one if ' | |
132 u'from {from_jid}, ignoring\n{xml}').format( | |
133 from_jid=from_jid.full(), xml=mess_elt.toXml())) | |
134 continue | |
135 # adding message to history | |
136 mess_data = client.messageProt.parseMessage(fwd_message_elt) | |
137 yield client.messageProt.addToHistory(mess_data) | |
105 | 138 |
106 if not count: | 139 if not count: |
107 log.info(_(u"We have received no message while offline")) | 140 log.info(_(u"We have received no message while offline")) |
108 else: | 141 else: |
109 log.info(_(u"We have received {num_mess} message(s) while offline.") | 142 log.info(_(u"We have received {num_mess} message(s) while offline.") |
110 .format(num_mess=count)) | 143 .format(num_mess=count)) |
111 | 144 |
112 def getHandler(self, client): | 145 def getHandler(self, client): |
113 mam_client = client._mam = SatMAMClient() | 146 mam_client = client._mam = SatMAMClient(self) |
114 return mam_client | 147 return mam_client |
115 | 148 |
116 def parseExtra(self, extra, with_rsm=True): | 149 def parseExtra(self, extra, with_rsm=True): |
117 """Parse extra dictionnary to retrieve MAM arguments | 150 """Parse extra dictionnary to retrieve MAM arguments |
118 | 151 |
196 @param mess_elt(domish.Element): result <message/> element wrapping the message | 229 @param mess_elt(domish.Element): result <message/> element wrapping the message |
197 to retrieve | 230 to retrieve |
198 @param mam_req(mam.MAMRequest): request used (needed to get query_id) | 231 @param mam_req(mam.MAMRequest): request used (needed to get query_id) |
199 @param service(jid.JID, None): MAM service where the request has been sent | 232 @param service(jid.JID, None): MAM service where the request has been sent |
200 None if it's user server | 233 None if it's user server |
201 @return (domish.Element): <message/> that can be used directly with onMessage | 234 @return domish.Element): <message/> that can be used directly with onMessage |
202 """ | 235 """ |
203 if mess_elt.name != u"message": | 236 if mess_elt.name != u"message": |
204 log.warning(u"unexpected stanza in archive: {xml}".format( | 237 log.warning(u"unexpected stanza in archive: {xml}".format( |
205 xml=mess_elt.toXml())) | 238 xml=mess_elt.toXml())) |
206 raise exceptions.DataError(u"Invalid element") | 239 raise exceptions.DataError(u"Invalid element") |
374 @return: the server response as a Deferred domish.Element | 407 @return: the server response as a Deferred domish.Element |
375 """ | 408 """ |
376 # http://xmpp.org/extensions/xep-0313.html#prefs | 409 # http://xmpp.org/extensions/xep-0313.html#prefs |
377 return client._mam.setPrefs(service, default, always, never) | 410 return client._mam.setPrefs(service, default, always, never) |
378 | 411 |
412 def onMessageStanzaId(self, message_elt, client): | |
413 """Called when a message with a stanza-id is received | |
414 | |
415 the messages' stanza ids are stored when received, so the last one can be used | |
416 to retrieve missing history on next connection | |
417 @param message_elt(domish.Element): <message> with a stanza-id | |
418 """ | |
419 service_jid = client.jid.userhostJID() | |
420 stanza_id = self._sid.getStanzaId(message_elt, service_jid) | |
421 if stanza_id is None: | |
422 log.debug(u"Ignoring <message>, stanza id is not from our server") | |
423 else: | |
424 # we use self._last_stanza_id_d do be sure that last_stanza_id is stored in | |
425 # the order of reception | |
426 self._last_stanza_id_d.addCallback( | |
427 lambda __: self.host.memory.storage.setPrivateValue( | |
428 namespace=mam.NS_MAM, | |
429 key=KEY_LAST_STANZA_ID, | |
430 value=stanza_id, | |
431 profile=client.profile)) | |
432 | |
379 | 433 |
380 class SatMAMClient(mam.MAMClient): | 434 class SatMAMClient(mam.MAMClient): |
381 implements(disco.IDisco) | 435 implements(disco.IDisco) |
382 | 436 |
437 def __init__(self, plugin_parent): | |
438 self.plugin_parent = plugin_parent | |
439 | |
440 @property | |
441 def host(self): | |
442 return self.parent.host_app | |
443 | |
444 def connectionInitialized(self): | |
445 observer_xpath = MESSAGE_STANZA_ID.format( | |
446 ns_stanza_id=self.host.ns_map[u'stanza_id']) | |
447 self.xmlstream.addObserver( | |
448 observer_xpath, self.plugin_parent.onMessageStanzaId, client=self.parent | |
449 ) | |
450 | |
383 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | 451 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): |
384 return [disco.DiscoFeature(mam.NS_MAM)] | 452 return [disco.DiscoFeature(mam.NS_MAM)] |
385 | 453 |
386 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | 454 def getDiscoItems(self, requestor, target, nodeIdentifier=""): |
387 return [] | 455 return [] |