comparison sat/plugins/plugin_xep_0313.py @ 2718:bb6adaa580ee

plugin XEP-0313, XEP-0045: loop MAM requests until the whole archive is retrieved: - plugin xep-0313: new serialise method - plugin xep-0313: getArchives now returns an extra mam_response dict with "complete" and "stable" status - plugin xep-0313: MAMGet now returns a new metadata dict before profile (with serialised RSM and MAM response)
author Goffi <goffi@goffi.org>
date Mon, 10 Dec 2018 20:34:45 +0100
parents 19000c506d0c
children 3480d4fdf83a
comparison
equal deleted inserted replaced
2717:e3f6de6ce320 2718:bb6adaa580ee
63 self.host = host 63 self.host = host
64 self.host.registerNamespace(u"mam", mam.NS_MAM) 64 self.host.registerNamespace(u"mam", mam.NS_MAM)
65 self._rsm = host.plugins[u"XEP-0059"] 65 self._rsm = host.plugins[u"XEP-0059"]
66 self._sid = host.plugins[u"XEP-0359"] 66 self._sid = host.plugins[u"XEP-0359"]
67 host.bridge.addMethod( 67 host.bridge.addMethod(
68 "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives, 68 "MAMGet", ".plugin", in_sign='sss',
69 out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives,
69 async=True) 70 async=True)
70 71
71 @defer.inlineCallbacks 72 @defer.inlineCallbacks
72 def profileConnected(self, client): 73 def profileConnected(self, client):
73 last_mess = yield self.host.memory.historyGet( 74 last_mess = yield self.host.memory.historyGet(
76 profile=client.profile) 77 profile=client.profile)
77 if not last_mess: 78 if not last_mess:
78 log.info(_(u"It seems that we have no MAM history yet")) 79 log.info(_(u"It seems that we have no MAM history yet"))
79 return 80 return
80 stanza_id = last_mess[0][-1][u'stanza_id'] 81 stanza_id = last_mess[0][-1][u'stanza_id']
81 rsm_req = rsm.RSMRequest(after=stanza_id) 82 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
82 mam_req = mam.MAMRequest(rsm_=rsm_req) 83 mam_req = mam.MAMRequest(rsm_=rsm_req)
83 mam_data = yield self.getArchives(client, mam_req, 84 complete = False
84 service=client.jid.userhostJID()) 85 count = 0
85 elt_list, rsm_response = mam_data 86 while not complete:
86 if not elt_list: 87 mam_data = yield self.getArchives(client, mam_req,
88 service=client.jid.userhostJID())
89 elt_list, rsm_response, mam_response = mam_data
90 complete = mam_response[u"complete"]
91 # we update MAM request for next iteration
92 mam_req.rsm.after = rsm_response.last
93 if not elt_list:
94 break
95 else:
96 count += len(elt_list)
97
98 for mess_elt in elt_list:
99 try:
100 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
101 except exceptions.DataError:
102 continue
103
104 client.messageProt.onMessage(fwd_message_elt)
105
106 if not count:
87 log.info(_(u"We have received no message while offline")) 107 log.info(_(u"We have received no message while offline"))
88 return
89 else: 108 else:
90 log.info(_(u"We have received {num_mess} message(s) while offline.").format( 109 log.info(_(u"We have received {num_mess} message(s) while offline.")
91 num_mess=len(elt_list))) 110 .format(num_mess=count))
92
93 for mess_elt in elt_list:
94 try:
95 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
96 except exceptions.DataError:
97 continue
98
99 client.messageProt.onMessage(fwd_message_elt)
100 111
101 def getHandler(self, client): 112 def getHandler(self, client):
102 mam_client = client._mam = SatMAMClient() 113 mam_client = client._mam = SatMAMClient()
103 return mam_client 114 return mam_client
104 115
151 if form_args: 162 if form_args:
152 mam_args["form"] = mam.buildForm(**form_args) 163 mam_args["form"] = mam.buildForm(**form_args)
153 164
154 return mam.MAMRequest(**mam_args) if mam_args else None 165 return mam.MAMRequest(**mam_args) if mam_args else None
155 166
167 def serialise(self, mam_response, data=None):
168 """Serialise data for MAM
169
170 Keys set in data can be:
171 - mam_complete: a bool const indicating if all items have been received
172 - mam_stable: a bool const which is False if items order may be changed
173 All values are set as strings.
174 @param mam_response(dict): response data to serialise
175 @param data(dict, None): dict to update with mam_* data.
176 If None, a new dict is created
177 @return (dict): data dict
178 """
179 if data is None:
180 data = {}
181 data[u"mam_complete"] = C.boolConst(mam_response[u'complete'])
182 data[u"mam_stable"] = C.boolConst(mam_response[u'stable'])
183 return data
184
156 def getMessageFromResult(self, client, mess_elt, mam_req, service=None): 185 def getMessageFromResult(self, client, mess_elt, mam_req, service=None):
157 """Extract usable <message/> from MAM query result 186 """Extract usable <message/> from MAM query result
158 187
159 The message will be validated, and stanza-id/delay will be added if necessary. 188 The message will be validated, and stanza-id/delay will be added if necessary.
160 @param mess_elt(domish.Element): result <message/> element wrapping the message 189 @param mess_elt(domish.Element): result <message/> element wrapping the message
161 to retrieve 190 to retrieve
162 @param mam_req(mam.MAMRequest): request used 191 @param mam_req(mam.MAMRequest): request used (needed to get query_id)
163 @param service(jid.JID, None): MAM service where the request has been sent 192 @param service(jid.JID, None): MAM service where the request has been sent
164 None if it's user server 193 None if it's user server
165 @return (domish.Element): <message/> that can be used directly with onMessage 194 @return (domish.Element): <message/> that can be used directly with onMessage
166 """ 195 """
167 if mess_elt.name != u"message": 196 if mess_elt.name != u"message":
244 try: 273 try:
245 fin_elt = iq_result.elements(mam.NS_MAM, "fin").next() 274 fin_elt = iq_result.elements(mam.NS_MAM, "fin").next()
246 except StopIteration: 275 except StopIteration:
247 raise exceptions.DataError(u"Invalid MAM result") 276 raise exceptions.DataError(u"Invalid MAM result")
248 277
278 mam_response = {u"complete": C.bool(fin_elt.getAttribute(u"complete", C.BOOL_FALSE)),
279 u"stable": C.bool(fin_elt.getAttribute(u"stable", C.BOOL_TRUE))}
280
249 try: 281 try:
250 rsm_response = rsm.RSMResponse.fromElement(fin_elt) 282 rsm_response = rsm.RSMResponse.fromElement(fin_elt)
251 except rsm.RSMNotFoundError: 283 except rsm.RSMNotFoundError:
252 rsm_response = None 284 rsm_response = None
253 285
254 return (elt_list, rsm_response) 286 return (elt_list, rsm_response, mam_response)
255 287
256 def serializeArchiveResult(self, data, client, mam_req, service): 288 def serializeArchiveResult(self, data, client, mam_req, service):
257 elt_list, rsm_response = data 289 elt_list, rsm_response, mam_response = data
258 mess_list = [] 290 mess_list = []
259 for elt in elt_list: 291 for elt in elt_list:
260 fwd_message_elt = self.getMessageFromResult(client, elt, mam_req, 292 fwd_message_elt = self.getMessageFromResult(client, elt, mam_req,
261 service=service) 293 service=service)
262 mess_data = client.messageProt.parseMessage(fwd_message_elt) 294 mess_data = client.messageProt.parseMessage(fwd_message_elt)
263 mess_list.append(client.messageGetBridgeArgs(mess_data)) 295 mess_list.append(client.messageGetBridgeArgs(mess_data))
264 return mess_list, client.profile 296 metadata = self._rsm.serialise(rsm_response)
297 self.serialise(mam_response, metadata)
298 return mess_list, metadata, client.profile
265 299
266 def _getArchives(self, service, extra_ser, profile_key): 300 def _getArchives(self, service, extra_ser, profile_key):
301 """
302 @return: tuple with:
303 - list of messages with the same data as in bridge.messageNew
304 - response metadata with:
305 - rsm data (rsm_first, rsm_last, rsm_count, rsm_index)
306 - mam data (mam_complete, mam_stable)
307 - profile
308 """
267 client = self.host.getClient(profile_key) 309 client = self.host.getClient(profile_key)
268 service = jid.JID(service) if service else None 310 service = jid.JID(service) if service else None
269 extra = data_format.deserialise(extra_ser, {}) 311 extra = data_format.deserialise(extra_ser, {})
270 mam_req = self.parseExtra(extra) 312 mam_req = self.parseExtra(extra)
271 313
272 d = self.getArchives(client, mam_req, service=service) 314 d = self.getArchives(client, mam_req, service=service)
273 d.addCallback(self.serializeArchiveResult, client, mam_req, service) 315 d.addCallback(self.serializeArchiveResult, client, mam_req, service)
274 return d 316 return d
275 317
276 def getArchives(self, client, query, service=None, message_cb=None): 318 def getArchives(self, client, query, service=None, message_cb=None):
277 """Query archive then grab and return them all in the result 319 """Query archive and gather page result
278 320
321 @param query(mam.MAMRequest): MAM request
322 @param service(jid.JID, None): MAM service to use
323 None to use our own server
324 @param message_cb(callable, None): callback to use on each message
325 this method can be used to unwrap messages
326 @return (tuple[list[domish.Element], rsm.RSMResponse, dict]): result data with:
327 - list of found elements
328 - RSM response
329 - MAM response, which is a dict with the following values:
330 - complete: a boolean which is True if all items have been received
331 - stable: a boolean which is False if items order may be changed
279 """ 332 """
280 if query.query_id is None: 333 if query.query_id is None:
281 query.query_id = unicode(uuid.uuid4()) 334 query.query_id = unicode(uuid.uuid4())
282 elt_list = [] 335 elt_list = []
283 event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id) 336 event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id)