Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0313.py @ 2718:bb6adaa580ee
plugin XEP-0313, XEP-0045: loop MAM requests until whole archive is retrieved:
- plugin xep-0313: new serialise method
- : getArchives now returns an extra mam_response dict with "complete" and "stable" status
- : MAMGet now returns a new metadata dict before profile (with serialised RSM and MAM response)
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 10 Dec 2018 20:34:45 +0100 |
parents | 19000c506d0c |
children | 3480d4fdf83a |
comparison
equal
deleted
inserted
replaced
2717:e3f6de6ce320 | 2718:bb6adaa580ee |
---|---|
63 self.host = host | 63 self.host = host |
64 self.host.registerNamespace(u"mam", mam.NS_MAM) | 64 self.host.registerNamespace(u"mam", mam.NS_MAM) |
65 self._rsm = host.plugins[u"XEP-0059"] | 65 self._rsm = host.plugins[u"XEP-0059"] |
66 self._sid = host.plugins[u"XEP-0359"] | 66 self._sid = host.plugins[u"XEP-0359"] |
67 host.bridge.addMethod( | 67 host.bridge.addMethod( |
68 "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives, | 68 "MAMGet", ".plugin", in_sign='sss', |
69 out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives, | |
69 async=True) | 70 async=True) |
70 | 71 |
71 @defer.inlineCallbacks | 72 @defer.inlineCallbacks |
72 def profileConnected(self, client): | 73 def profileConnected(self, client): |
73 last_mess = yield self.host.memory.historyGet( | 74 last_mess = yield self.host.memory.historyGet( |
76 profile=client.profile) | 77 profile=client.profile) |
77 if not last_mess: | 78 if not last_mess: |
78 log.info(_(u"It seems that we have no MAM history yet")) | 79 log.info(_(u"It seems that we have no MAM history yet")) |
79 return | 80 return |
80 stanza_id = last_mess[0][-1][u'stanza_id'] | 81 stanza_id = last_mess[0][-1][u'stanza_id'] |
81 rsm_req = rsm.RSMRequest(after=stanza_id) | 82 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) |
82 mam_req = mam.MAMRequest(rsm_=rsm_req) | 83 mam_req = mam.MAMRequest(rsm_=rsm_req) |
83 mam_data = yield self.getArchives(client, mam_req, | 84 complete = False |
84 service=client.jid.userhostJID()) | 85 count = 0 |
85 elt_list, rsm_response = mam_data | 86 while not complete: |
86 if not elt_list: | 87 mam_data = yield self.getArchives(client, mam_req, |
88 service=client.jid.userhostJID()) | |
89 elt_list, rsm_response, mam_response = mam_data | |
90 complete = mam_response[u"complete"] | |
91 # we update MAM request for next iteration | |
92 mam_req.rsm.after = rsm_response.last | |
93 if not elt_list: | |
94 break | |
95 else: | |
96 count += len(elt_list) | |
97 | |
98 for mess_elt in elt_list: | |
99 try: | |
100 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) | |
101 except exceptions.DataError: | |
102 continue | |
103 | |
104 client.messageProt.onMessage(fwd_message_elt) | |
105 | |
106 if not count: | |
87 log.info(_(u"We have received no message while offline")) | 107 log.info(_(u"We have received no message while offline")) |
88 return | |
89 else: | 108 else: |
90 log.info(_(u"We have received {num_mess} message(s) while offline.").format( | 109 log.info(_(u"We have received {num_mess} message(s) while offline.") |
91 num_mess=len(elt_list))) | 110 .format(num_mess=count)) |
92 | |
93 for mess_elt in elt_list: | |
94 try: | |
95 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) | |
96 except exceptions.DataError: | |
97 continue | |
98 | |
99 client.messageProt.onMessage(fwd_message_elt) | |
100 | 111 |
101 def getHandler(self, client): | 112 def getHandler(self, client): |
102 mam_client = client._mam = SatMAMClient() | 113 mam_client = client._mam = SatMAMClient() |
103 return mam_client | 114 return mam_client |
104 | 115 |
151 if form_args: | 162 if form_args: |
152 mam_args["form"] = mam.buildForm(**form_args) | 163 mam_args["form"] = mam.buildForm(**form_args) |
153 | 164 |
154 return mam.MAMRequest(**mam_args) if mam_args else None | 165 return mam.MAMRequest(**mam_args) if mam_args else None |
155 | 166 |
167 def serialise(self, mam_response, data=None): | |
168 """Serialise data for MAM | |
169 | |
170 Key set in data can be: | |
171 - mam_complete: a bool const indicating if all items have been received | |
172 - mam_stable: a bool const which is False if items order may be changed | |
173 All values are set as strings. | |
174 @param mam_response(dict): response data to serialise | |
175 @param data(dict, None): dict to update with mam_* data. | |
176 If None, a new dict is created | |
177 @return (dict): data dict | |
178 """ | |
179 if data is None: | |
180 data = {} | |
181 data[u"mam_complete"] = C.boolConst(mam_response[u'complete']) | |
182 data[u"mam_stable"] = C.boolConst(mam_response[u'stable']) | |
183 return data | |
184 | |
156 def getMessageFromResult(self, client, mess_elt, mam_req, service=None): | 185 def getMessageFromResult(self, client, mess_elt, mam_req, service=None): |
157 """Extract usable <message/> from MAM query result | 186 """Extract usable <message/> from MAM query result |
158 | 187 |
159 The message will be validated, and stanza-id/delay will be added if necessary. | 188 The message will be validated, and stanza-id/delay will be added if necessary. |
160 @param mess_elt(domish.Element): result <message/> element wrapping the message | 189 @param mess_elt(domish.Element): result <message/> element wrapping the message |
161 to retrieve | 190 to retrieve |
162 @param mam_req(mam.MAMRequest): request used | 191 @param mam_req(mam.MAMRequest): request used (needed to get query_id) |
163 @param service(jid.JID, None): MAM service where the request has been sent | 192 @param service(jid.JID, None): MAM service where the request has been sent |
164 None if it's user server | 193 None if it's user server |
165 @return (domish.Element): <message/> that can be used directly with onMessage | 194 @return (domish.Element): <message/> that can be used directly with onMessage |
166 """ | 195 """ |
167 if mess_elt.name != u"message": | 196 if mess_elt.name != u"message": |
244 try: | 273 try: |
245 fin_elt = iq_result.elements(mam.NS_MAM, "fin").next() | 274 fin_elt = iq_result.elements(mam.NS_MAM, "fin").next() |
246 except StopIteration: | 275 except StopIteration: |
247 raise exceptions.DataError(u"Invalid MAM result") | 276 raise exceptions.DataError(u"Invalid MAM result") |
248 | 277 |
278 mam_response = {u"complete": C.bool(fin_elt.getAttribute(u"complete", C.BOOL_FALSE)), | |
279 u"stable": C.bool(fin_elt.getAttribute(u"stable", C.BOOL_TRUE))} | |
280 | |
249 try: | 281 try: |
250 rsm_response = rsm.RSMResponse.fromElement(fin_elt) | 282 rsm_response = rsm.RSMResponse.fromElement(fin_elt) |
251 except rsm.RSMNotFoundError: | 283 except rsm.RSMNotFoundError: |
252 rsm_response = None | 284 rsm_response = None |
253 | 285 |
254 return (elt_list, rsm_response) | 286 return (elt_list, rsm_response, mam_response) |
255 | 287 |
256 def serializeArchiveResult(self, data, client, mam_req, service): | 288 def serializeArchiveResult(self, data, client, mam_req, service): |
257 elt_list, rsm_response = data | 289 elt_list, rsm_response, mam_response = data |
258 mess_list = [] | 290 mess_list = [] |
259 for elt in elt_list: | 291 for elt in elt_list: |
260 fwd_message_elt = self.getMessageFromResult(client, elt, mam_req, | 292 fwd_message_elt = self.getMessageFromResult(client, elt, mam_req, |
261 service=service) | 293 service=service) |
262 mess_data = client.messageProt.parseMessage(fwd_message_elt) | 294 mess_data = client.messageProt.parseMessage(fwd_message_elt) |
263 mess_list.append(client.messageGetBridgeArgs(mess_data)) | 295 mess_list.append(client.messageGetBridgeArgs(mess_data)) |
264 return mess_list, client.profile | 296 metadata = self._rsm.serialise(rsm_response) |
297 self.serialise(mam_response, metadata) | |
298 return mess_list, metadata, client.profile | |
265 | 299 |
266 def _getArchives(self, service, extra_ser, profile_key): | 300 def _getArchives(self, service, extra_ser, profile_key): |
301 """ | |
302 @return: tuple with: | |
303 - list of message with same data as in bridge.messageNew | |
304 - response metadata with: | |
305 - rsm data (rsm_first, rsm_last, rsm_count, rsm_index) | |
306 - mam data (mam_complete, mam_stable) | |
307 - profile | |
308 """ | |
267 client = self.host.getClient(profile_key) | 309 client = self.host.getClient(profile_key) |
268 service = jid.JID(service) if service else None | 310 service = jid.JID(service) if service else None |
269 extra = data_format.deserialise(extra_ser, {}) | 311 extra = data_format.deserialise(extra_ser, {}) |
270 mam_req = self.parseExtra(extra) | 312 mam_req = self.parseExtra(extra) |
271 | 313 |
272 d = self.getArchives(client, mam_req, service=service) | 314 d = self.getArchives(client, mam_req, service=service) |
273 d.addCallback(self.serializeArchiveResult, client, mam_req, service) | 315 d.addCallback(self.serializeArchiveResult, client, mam_req, service) |
274 return d | 316 return d |
275 | 317 |
276 def getArchives(self, client, query, service=None, message_cb=None): | 318 def getArchives(self, client, query, service=None, message_cb=None): |
277 """Query archive then grab and return them all in the result | 319 """Query archive and gather page result |
278 | 320 |
321 @param query(mam.MAMRequest): MAM request | |
322 @param service(jid.JID, None): MAM service to use | |
323 None to use our own server | |
324 @param message_cb(callable, None): callback to use on each message | |
325 this method can be used to unwrap messages | |
326 @return (tuple[list[domish.Element], rsm.RSMResponse, dict): result data with: | |
327 - list of found elements | |
328 - RSM response | |
329 - MAM response, which is a dict with following value: | |
330 - complete: a boolean which is True if all items have been received | |
331 - stable: a boolean which is False if items order may be changed | |
279 """ | 332 """ |
280 if query.query_id is None: | 333 if query.query_id is None: |
281 query.query_id = unicode(uuid.uuid4()) | 334 query.query_id = unicode(uuid.uuid4()) |
282 elt_list = [] | 335 elt_list = [] |
283 event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id) | 336 event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id) |