comparison sat/plugins/plugin_xep_0313.py @ 2930:32b6893240e0

plugin XEP-0313: fixed archive retrieval on connection: the last stanza-id is now saved for every message received, instead of only being retrieved from local message history. If we are the sender of a message from the archive, it is only added to local history; otherwise it is injected into the normal workflow.
author Goffi <goffi@goffi.org>
date Fri, 03 May 2019 13:00:08 +0200
parents 003b8b4b56a7
children 93da7c6f8e0c
comparison
equal deleted inserted replaced
2929:e0429ff7f6b6 2930:32b6893240e0
37 from wokkel import mam 37 from wokkel import mam
38 38
39 39
40 log = getLogger(__name__) 40 log = getLogger(__name__)
41 41
42
43 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
44
45 PLUGIN_INFO = { 42 PLUGIN_INFO = {
46 C.PI_NAME: u"Message Archive Management", 43 C.PI_NAME: u"Message Archive Management",
47 C.PI_IMPORT_NAME: u"XEP-0313", 44 C.PI_IMPORT_NAME: u"XEP-0313",
48 C.PI_TYPE: u"XEP", 45 C.PI_TYPE: u"XEP",
49 C.PI_PROTOCOLS: [u"XEP-0313"], 46 C.PI_PROTOCOLS: [u"XEP-0313"],
53 C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""), 50 C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""),
54 } 51 }
55 52
56 MAM_PREFIX = u"mam_" 53 MAM_PREFIX = u"mam_"
57 FILTER_PREFIX = MAM_PREFIX + "filter_" 54 FILTER_PREFIX = MAM_PREFIX + "filter_"
55 KEY_LAST_STANZA_ID = u"last_stanza_id"
56 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
57 MESSAGE_STANZA_ID = '/message/stanza-id[@xmlns="{ns_stanza_id}"]'
58 58
59 59
60 class XEP_0313(object): 60 class XEP_0313(object):
61 def __init__(self, host): 61 def __init__(self, host):
62 log.info(_("Message Archive Management plugin initialization")) 62 log.info(_("Message Archive Management plugin initialization"))
63 self.host = host 63 self.host = host
64 self.host.registerNamespace(u"mam", mam.NS_MAM) 64 self.host.registerNamespace(u"mam", mam.NS_MAM)
65 self._rsm = host.plugins[u"XEP-0059"] 65 self._rsm = host.plugins[u"XEP-0059"]
66 self._sid = host.plugins[u"XEP-0359"] 66 self._sid = host.plugins[u"XEP-0359"]
67 # Deferred used to store last stanza id in order of reception
68 self._last_stanza_id_d = defer.Deferred()
69 self._last_stanza_id_d.callback(None)
67 host.bridge.addMethod( 70 host.bridge.addMethod(
68 "MAMGet", ".plugin", in_sign='sss', 71 "MAMGet", ".plugin", in_sign='sss',
69 out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives, 72 out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives,
70 async=True) 73 async=True)
71 74
72 @defer.inlineCallbacks 75 @defer.inlineCallbacks
73 def profileConnected(self, client): 76 def profileConnected(self, client):
74 last_mess = yield self.host.memory.historyGet( 77 stanza_id_data = yield self.host.memory.storage.getPrivates(
75 None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT, 78 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
76 u'last_stanza_id': True}, 79 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
77 profile=client.profile) 80 if stanza_id is None:
78 if not last_mess: 81 log.info(u"can't retrieve last stanza ID, checking history")
79 log.info(_(u"It seems that we have no MAM history yet")) 82 last_mess = yield self.host.memory.historyGet(
80 return 83 None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT,
81 stanza_id = last_mess[0][-1][u'stanza_id'] 84 u'last_stanza_id': True},
85 profile=client.profile)
86 if not last_mess:
87 log.info(_(u"It seems that we have no MAM history yet"))
88 return
89 stanza_id = last_mess[0][-1][u'stanza_id']
82 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) 90 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
83 mam_req = mam.MAMRequest(rsm_=rsm_req) 91 mam_req = mam.MAMRequest(rsm_=rsm_req)
84 complete = False 92 complete = False
85 count = 0 93 count = 0
86 while not complete: 94 while not complete:
95 else: 103 else:
96 count += len(elt_list) 104 count += len(elt_list)
97 105
98 for mess_elt in elt_list: 106 for mess_elt in elt_list:
99 try: 107 try:
100 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) 108 fwd_message_elt = self.getMessageFromResult(
109 client, mess_elt, mam_req)
101 except exceptions.DataError: 110 except exceptions.DataError:
102 continue 111 continue
103 112
104 client.messageProt.onMessage(fwd_message_elt) 113 try:
114 destinee = jid.JID(fwd_message_elt['to'])
115 except KeyError:
116 log.warning(_(u'missing "to" attribute in forwarded message'))
117 destinee = client.jid
118 if destinee.userhostJID() == client.jid.userhostJID():
119 # message to us, we insert the forwarded message in the normal
120 # workflow
121 client.xmlstream.dispatch(fwd_message_elt)
122 else:
123 # this message should be from us, we just add it to history
124 try:
125 from_jid = jid.JID(fwd_message_elt['from'])
126 except KeyError:
127 log.warning(_(u'missing "from" attribute in forwarded message'))
128 from_jid = client.jid
129 if from_jid.userhostJID() != client.jid.userhostJID():
130 log.warning(_(
131 u'was expecting a message sent by our jid, but this one if '
132 u'from {from_jid}, ignoring\n{xml}').format(
133 from_jid=from_jid.full(), xml=mess_elt.toXml()))
134 continue
135 # adding message to history
136 mess_data = client.messageProt.parseMessage(fwd_message_elt)
137 yield client.messageProt.addToHistory(mess_data)
105 138
106 if not count: 139 if not count:
107 log.info(_(u"We have received no message while offline")) 140 log.info(_(u"We have received no message while offline"))
108 else: 141 else:
109 log.info(_(u"We have received {num_mess} message(s) while offline.") 142 log.info(_(u"We have received {num_mess} message(s) while offline.")
110 .format(num_mess=count)) 143 .format(num_mess=count))
111 144
112 def getHandler(self, client): 145 def getHandler(self, client):
113 mam_client = client._mam = SatMAMClient() 146 mam_client = client._mam = SatMAMClient(self)
114 return mam_client 147 return mam_client
115 148
116 def parseExtra(self, extra, with_rsm=True): 149 def parseExtra(self, extra, with_rsm=True):
117 """Parse extra dictionary to retrieve MAM arguments 150 """Parse extra dictionary to retrieve MAM arguments
118 151
196 @param mess_elt(domish.Element): result <message/> element wrapping the message 229 @param mess_elt(domish.Element): result <message/> element wrapping the message
197 to retrieve 230 to retrieve
198 @param mam_req(mam.MAMRequest): request used (needed to get query_id) 231 @param mam_req(mam.MAMRequest): request used (needed to get query_id)
199 @param service(jid.JID, None): MAM service where the request has been sent 232 @param service(jid.JID, None): MAM service where the request has been sent
200 None if it's user server 233 None if it's user server
201 @return (domish.Element): <message/> that can be used directly with onMessage 234 @return (domish.Element): <message/> that can be used directly with onMessage
202 """ 235 """
203 if mess_elt.name != u"message": 236 if mess_elt.name != u"message":
204 log.warning(u"unexpected stanza in archive: {xml}".format( 237 log.warning(u"unexpected stanza in archive: {xml}".format(
205 xml=mess_elt.toXml())) 238 xml=mess_elt.toXml()))
206 raise exceptions.DataError(u"Invalid element") 239 raise exceptions.DataError(u"Invalid element")
374 @return: the server response as a Deferred domish.Element 407 @return: the server response as a Deferred domish.Element
375 """ 408 """
376 # http://xmpp.org/extensions/xep-0313.html#prefs 409 # http://xmpp.org/extensions/xep-0313.html#prefs
377 return client._mam.setPrefs(service, default, always, never) 410 return client._mam.setPrefs(service, default, always, never)
378 411
412 def onMessageStanzaId(self, message_elt, client):
413 """Called when a message with a stanza-id is received
414
415 the messages' stanza ids are stored when received, so the last one can be used
416 to retrieve missing history on next connection
417 @param message_elt(domish.Element): <message> with a stanza-id
418 """
419 service_jid = client.jid.userhostJID()
420 stanza_id = self._sid.getStanzaId(message_elt, service_jid)
421 if stanza_id is None:
422 log.debug(u"Ignoring <message>, stanza id is not from our server")
423 else:
424 # we use self._last_stanza_id_d to be sure that last_stanza_id is stored in
425 # the order of reception
426 self._last_stanza_id_d.addCallback(
427 lambda __: self.host.memory.storage.setPrivateValue(
428 namespace=mam.NS_MAM,
429 key=KEY_LAST_STANZA_ID,
430 value=stanza_id,
431 profile=client.profile))
432
379 433
380 class SatMAMClient(mam.MAMClient): 434 class SatMAMClient(mam.MAMClient):
381 implements(disco.IDisco) 435 implements(disco.IDisco)
382 436
437 def __init__(self, plugin_parent):
438 self.plugin_parent = plugin_parent
439
440 @property
441 def host(self):
442 return self.parent.host_app
443
444 def connectionInitialized(self):
445 observer_xpath = MESSAGE_STANZA_ID.format(
446 ns_stanza_id=self.host.ns_map[u'stanza_id'])
447 self.xmlstream.addObserver(
448 observer_xpath, self.plugin_parent.onMessageStanzaId, client=self.parent
449 )
450
383 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 451 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
384 return [disco.DiscoFeature(mam.NS_MAM)] 452 return [disco.DiscoFeature(mam.NS_MAM)]
385 453
386 def getDiscoItems(self, requestor, target, nodeIdentifier=""): 454 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
387 return [] 455 return []