Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0313.py @ 2701:2ea2369ae7de
plugin XEP-0313: implementation of MAM for messages:
- (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data
- : parseMessage is not static anymore
- : new "message_parse" trigger point
- (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of the local one
- : on profileConnected, if a previous MAM message is found (i.e. a message with a stanza_id), messages received while offline are retrieved and injected into the message workflow.
In other words, one2one history is synchronised on connection.
- : new "parseExtra" method which parses MAM (and optionally RSM) options from the extra dictionary used in the bridge.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 01 Dec 2018 10:33:43 +0100 |
parents | 56f94936df1e |
children | 19000c506d0c |
comparison
equal
deleted
inserted
replaced
2700:035901dc946d | 2701:2ea2369ae7de |
---|---|
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 19 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 | 20 |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.i18n import _ | 22 from sat.core.i18n import _ |
23 from sat.core.log import getLogger | 23 from sat.core.log import getLogger |
24 | |
25 log = getLogger(__name__) | |
26 from sat.core import exceptions | 24 from sat.core import exceptions |
27 | 25 from sat.tools.common import data_format |
28 from twisted.words.protocols.jabber import jid | 26 from twisted.words.protocols.jabber import jid |
29 | 27 from twisted.internet import defer |
30 from zope.interface import implements | 28 from zope.interface import implements |
31 | 29 from datetime import datetime |
30 from dateutil import tz | |
32 from wokkel import disco | 31 from wokkel import disco |
32 from wokkel import data_form | |
33 import uuid | 33 import uuid |
34 | 34 |
35 # XXX: mam and rsm come from sat_tmp.wokkel | 35 # XXX: mam and rsm come from sat_tmp.wokkel |
36 from wokkel import rsm | 36 from wokkel import rsm |
37 from wokkel import mam | 37 from wokkel import mam |
38 | 38 |
39 | 39 |
40 log = getLogger(__name__) | |
41 | |
42 | |
40 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" | 43 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" |
41 | 44 |
42 PLUGIN_INFO = { | 45 PLUGIN_INFO = { |
43 C.PI_NAME: "Message Archive Management", | 46 C.PI_NAME: u"Message Archive Management", |
44 C.PI_IMPORT_NAME: "XEP-0313", | 47 C.PI_IMPORT_NAME: u"XEP-0313", |
45 C.PI_TYPE: "XEP", | 48 C.PI_TYPE: u"XEP", |
46 C.PI_PROTOCOLS: ["XEP-0313"], | 49 C.PI_PROTOCOLS: [u"XEP-0313"], |
47 C.PI_MAIN: "XEP_0313", | 50 C.PI_DEPENDENCIES: [u"XEP-0059", u"XEP-0359"], |
48 C.PI_HANDLER: "yes", | 51 C.PI_MAIN: u"XEP_0313", |
49 C.PI_DESCRIPTION: _("""Implementation of Message Archive Management"""), | 52 C.PI_HANDLER: u"yes", |
53 C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""), | |
50 } | 54 } |
55 | |
56 MAM_PREFIX = u"mam_" | |
57 FILTER_PREFIX = MAM_PREFIX + "filter_" | |
51 | 58 |
52 | 59 |
53 class XEP_0313(object): | 60 class XEP_0313(object): |
54 def __init__(self, host): | 61 def __init__(self, host): |
55 log.info(_("Message Archive Management plugin initialization")) | 62 log.info(_("Message Archive Management plugin initialization")) |
56 self.host = host | 63 self.host = host |
64 self.host.registerNamespace(u"mam", mam.NS_MAM) | |
65 self._rsm = host.plugins[u"XEP-0059"] | |
66 self._sid = host.plugins[u"XEP-0359"] | |
67 host.bridge.addMethod( | |
68 "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives, | |
69 async=True) | |
70 | |
71 @defer.inlineCallbacks | |
72 def profileConnected(self, client): | |
73 last_mess = yield self.host.memory.historyGet( | |
74 None, None, limit=1, filters={u'last_stanza_id': True}, | |
75 profile=client.profile) | |
76 if not last_mess: | |
77 log.info(_(u"It seems that we have no MAM history yet")) | |
78 return | |
79 stanza_id = last_mess[0][-1][u'stanza_id'] | |
80 # XXX: test | |
81 # stanza_id = "IIheJOfiIhkPYkw6" | |
82 rsm_req = rsm.RSMRequest(after=stanza_id) | |
83 mam_req = mam.MAMRequest(rsm_=rsm_req) | |
84 mam_data = yield self.getArchives(client, mam_req, | |
85 service=client.jid.userhostJID()) | |
86 elt_list, rsm_response = mam_data | |
87 if not elt_list: | |
88 log.info(_(u"We have received no message while offline")) | |
89 return | |
90 else: | |
91 log.info(_(u"We have received {num_mess} message(s) while offline.").format( | |
92 num_mess=len(elt_list))) | |
93 | |
94 for mess_elt in elt_list: | |
95 try: | |
96 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) | |
97 except exceptions.DataError: | |
98 continue | |
99 | |
100 client.messageProt.onMessage(fwd_message_elt) | |
57 | 101 |
58 def getHandler(self, client): | 102 def getHandler(self, client): |
59 mam_client = client._mam = SatMAMClient() | 103 mam_client = client._mam = SatMAMClient() |
60 return mam_client | 104 return mam_client |
61 | 105 |
106 def parseExtra(self, extra, with_rsm=True): | |
107 """Parse extra dictionnary to retrieve MAM arguments | |
108 | |
109 @param extra(dict): data for parse | |
110 @param with_rsm(bool): if True, RSM data will be parsed too | |
111 @return (data_form, None): request with parsed arguments | |
112 or None if no MAM arguments have been found | |
113 """ | |
114 mam_args = {} | |
115 form_args = {} | |
116 for arg in (u"start", u"end"): | |
117 try: | |
118 value = extra.pop(MAM_PREFIX + arg) | |
119 form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc()) | |
120 except (TypeError, ValueError): | |
121 log.warning(u"Bad value for {arg} filter ({value}), ignoring".format( | |
122 arg=arg, value=value)) | |
123 except KeyError: | |
124 continue | |
125 | |
126 try: | |
127 form_args[u"with_jid"] = jid.JID(extra.pop( | |
128 MAM_PREFIX + u"with")) | |
129 except (jid.InvalidFormat): | |
130 log.warning(u"Bad value for jid filter") | |
131 except KeyError: | |
132 pass | |
133 | |
134 for name, value in extra.iteritems(): | |
135 if name.startswith(FILTER_PREFIX): | |
136 var = name[len(FILTER_PREFIX) :] | |
137 extra_fields = form_args.setdefault(u"extra_fields", []) | |
138 extra_fields.append(data_form.Field(var=var, value=value)) | |
139 | |
140 for arg in (u"node", u"query_id"): | |
141 try: | |
142 value = extra.pop(MAM_PREFIX + arg) | |
143 mam_args[arg] = value | |
144 except KeyError: | |
145 continue | |
146 | |
147 if with_rsm: | |
148 rsm_request = self._rsm.parseExtra(extra) | |
149 if rsm_request is not None: | |
150 mam_args["rsm_"] = rsm_request | |
151 | |
152 if form_args: | |
153 mam_args["form"] = mam.buildForm(**form_args) | |
154 | |
155 return mam.MAMRequest(**mam_args) if mam_args else None | |
156 | |
157 def getMessageFromResult(self, client, mess_elt, mam_req): | |
158 """Extract usable <message/> from MAM query result | |
159 | |
160 The message will be validated, and stanza-id/delay will be added if necessary. | |
161 @param mess_elt(domish.Element): result <message/> element wrapping the message | |
162 to retrieve | |
163 @param mam_req(mam.MAMRequest): request used | |
164 @return (domish.Element): <message/> that can be used directly with onMessage | |
165 """ | |
166 if mess_elt.name != u"message": | |
167 log.warning(u"unexpected stanza in archive: {xml}".format( | |
168 xml=mess_elt.toXml())) | |
169 raise exceptions.DataError(u"Invalid element") | |
170 mess_from = mess_elt[u"from"] | |
171 if mess_from != client.jid.host and mess_from != client.jid.userhost(): | |
172 log.error(u"Message is not from our server, something went wrong: " | |
173 u"{xml}".format(xml=mess_elt.toXml())) | |
174 raise exceptions.DataError(u"Invalid element") | |
175 try: | |
176 result_elt = next(mess_elt.elements(mam.NS_MAM, u"result")) | |
177 forwarded_elt = next(result_elt.elements(C.NS_FORWARD, u"forwarded")) | |
178 try: | |
179 delay_elt = next(forwarded_elt.elements(C.NS_DELAY, u"delay")) | |
180 except StopIteration: | |
181 # delay_elt is not mandatory | |
182 delay_elt = None | |
183 fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, u"message")) | |
184 except StopIteration: | |
185 log.warning(u"Invalid message received from MAM: {xml}".format( | |
186 xml=mess_elt.toXml())) | |
187 raise exceptions.DataError(u"Invalid element") | |
188 else: | |
189 if not result_elt[u"queryid"] == mam_req.query_id: | |
190 log.error(u"Unexpected query id (was expecting {query_id}): {xml}" | |
191 .format(query_id=mam.query_id, xml=mess_elt.toXml())) | |
192 raise exceptions.DataError(u"Invalid element") | |
193 stanza_id = self._sid.getStanzaId(fwd_message_elt, | |
194 client.jid.userhostJID()) | |
195 if stanza_id is None: | |
196 # not stanza-id element is present, we add one so message | |
197 # will be archived with it, and we won't request several times | |
198 # the same MAM achive | |
199 try: | |
200 stanza_id = result_elt[u"id"] | |
201 except AttributeError: | |
202 log.warning(u'Invalid MAM result: missing "id" attribute: {xml}' | |
203 .format(xml=result_elt.toXml())) | |
204 raise exceptions.DataError(u"Invalid element") | |
205 self._sid.addStanzaId(client, fwd_message_elt, stanza_id) | |
206 | |
207 if delay_elt is not None: | |
208 fwd_message_elt.addChild(delay_elt) | |
209 | |
210 return fwd_message_elt | |
211 | |
62 def queryFields(self, client, service=None): | 212 def queryFields(self, client, service=None): |
63 """Ask the server about supported fields. | 213 """Ask the server about supported fields. |
64 | 214 |
65 @param service: entity offering the MAM service (None for user archives) | 215 @param service: entity offering the MAM service (None for user archives) |
66 @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5) | 216 @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5) |
67 """ | 217 """ |
68 return client._mam.queryFields(service) | 218 return client._mam.queryFields(service) |
69 | 219 |
70 def queryArchive(self, client, mam_query, service=None): | 220 def queryArchive(self, client, mam_req, service=None): |
71 """Query a user, MUC or pubsub archive. | 221 """Query a user, MUC or pubsub archive. |
72 | 222 |
73 @param mam_query(mam.MAMRequest): MAM query instance | 223 @param mam_req(mam.MAMRequest): MAM query instance |
74 @param service(jid.JID, None): entity offering the MAM service | 224 @param service(jid.JID, None): entity offering the MAM service |
75 None for user server | 225 None for user server |
76 @return (D(domish.Element)): <IQ/> result | 226 @return (D(domish.Element)): <IQ/> result |
77 """ | 227 """ |
78 return client._mam.queryArchive(mam_query, service) | 228 return client._mam.queryArchive(mam_req, service) |
79 | 229 |
80 def _appendMessage(self, elt_list, message_cb, message_elt): | 230 def _appendMessage(self, elt_list, message_cb, message_elt): |
81 if message_cb is not None: | 231 if message_cb is not None: |
82 elt_list.append(message_cb(message_elt)) | 232 elt_list.append(message_cb(message_elt)) |
83 else: | 233 else: |
94 rsm_response = rsm.RSMResponse.fromElement(fin_elt) | 244 rsm_response = rsm.RSMResponse.fromElement(fin_elt) |
95 except rsm.RSMNotFoundError: | 245 except rsm.RSMNotFoundError: |
96 rsm_response = None | 246 rsm_response = None |
97 | 247 |
98 return (elt_list, rsm_response) | 248 return (elt_list, rsm_response) |
249 | |
250 def serializeArchiveResult(self, data, client, mam_req): | |
251 elt_list, rsm_response = data | |
252 mess_list = [] | |
253 for elt in elt_list: | |
254 fwd_message_elt = self.getMessageFromResult(client, elt, mam_req) | |
255 mess_data = client.messageProt.parseMessage(fwd_message_elt) | |
256 mess_list.append(client.messageGetBridgeArgs(mess_data)) | |
257 return mess_list, client.profile | |
258 | |
259 def _getArchives(self, service, extra_ser, profile_key): | |
260 client = self.host.getClient(profile_key) | |
261 service = jid.JID(service) if service else None | |
262 extra = data_format.deserialise(extra_ser, {}) | |
263 mam_req = self.parseExtra(extra) | |
264 | |
265 d = self.getArchives(client, mam_req, service=service) | |
266 d.addCallback(self.serializeArchiveResult, client, mam_req) | |
267 return d | |
99 | 268 |
100 def getArchives(self, client, query, service=None, message_cb=None): | 269 def getArchives(self, client, query, service=None, message_cb=None): |
101 """Query archive then grab and return them all in the result | 270 """Query archive then grab and return them all in the result |
102 | 271 |
103 """ | 272 """ |
117 @return: the server response as a Deferred domish.Element | 286 @return: the server response as a Deferred domish.Element |
118 """ | 287 """ |
119 # http://xmpp.org/extensions/xep-0313.html#prefs | 288 # http://xmpp.org/extensions/xep-0313.html#prefs |
120 return client._mam.queryPrefs(service) | 289 return client._mam.queryPrefs(service) |
121 | 290 |
122 def _setPrefs( | 291 def _setPrefs(self, service_s=None, default="roster", always=None, never=None, |
123 self, | 292 profile_key=C.PROF_KEY_NONE): |
124 service_s=None, | |
125 default="roster", | |
126 always=None, | |
127 never=None, | |
128 profile_key=C.PROF_KEY_NONE, | |
129 ): | |
130 service = jid.JID(service_s) if service_s else None | 293 service = jid.JID(service_s) if service_s else None |
131 always_jid = [jid.JID(entity) for entity in always] | 294 always_jid = [jid.JID(entity) for entity in always] |
132 never_jid = [jid.JID(entity) for entity in never] | 295 never_jid = [jid.JID(entity) for entity in never] |
133 # TODO: why not build here a MAMPrefs object instead of passing the args separately? | 296 # TODO: why not build here a MAMPrefs object instead of passing the args separately? |
134 return self.setPrefs(service, default, always_jid, never_jid, profile_key) | 297 return self.setPrefs(service, default, always_jid, never_jid, profile_key) |