comparison src/plugins/plugin_xep_0313.py @ 1776:4fc1bf1af48f

plugin XEP-0313: cleaning and improvments: - moved common namespaces to core.constants - removed useless dependencies and recommendations - removed all bride methods as they where there only for testing purpose - don't use a plugin dict to store MAM clients, put them in profiles' clients instead - removed message trigger in favor of an observer which is triggered only when specific queryid is used - new getArchives method which grab all result and return them all at once
author Goffi <goffi@goffi.org>
date Tue, 05 Jan 2016 23:20:22 +0100
parents d17772b0fe22
children 2daf7b4c6756
comparison
equal deleted inserted replaced
1775:0876352459e5 1776:4fc1bf1af48f
20 20
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.i18n import _ 22 from sat.core.i18n import _
23 from sat.core.log import getLogger 23 from sat.core.log import getLogger
24 log = getLogger(__name__) 24 log = getLogger(__name__)
25 from sat.core import exceptions
25 26
26 try:
27 from twisted.words.protocols.xmlstream import XMPPHandler
28 except ImportError:
29 from wokkel.subprotocols import XMPPHandler
30 from twisted.words.xish import domish
31 from twisted.words.protocols.jabber import jid 27 from twisted.words.protocols.jabber import jid
32 28
33 from zope.interface import implements 29 from zope.interface import implements
34 30
35 from wokkel import disco, data_form 31 from wokkel import disco
36 from wokkel.generic import parseXml 32 import uuid
37 from wokkel.pubsub import NS_PUBSUB_EVENT, ItemsEvent
38 33
39 # TODO: change this when RSM and MAM are in wokkel 34 # XXX: mam and rsm come from tmp.wokkel
40 from sat.tmp.wokkel.rsm import RSMRequest 35 from wokkel import rsm
41 from sat.tmp.wokkel import mam 36 from wokkel import mam
42 37
43 38
44 NS_MAM = 'urn:xmpp:mam:0' 39 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
45 NS_SF = 'urn:xmpp:forward:0'
46 NS_DD = 'urn:xmpp:delay'
47 NS_CLIENT = 'jabber:client'
48 40
49 PLUGIN_INFO = { 41 PLUGIN_INFO = {
50 "name": "Message Archive Management", 42 "name": "Message Archive Management",
51 "import_name": "XEP-0313", 43 "import_name": "XEP-0313",
52 "type": "XEP", 44 "type": "XEP",
53 "protocols": ["XEP-0313"], 45 "protocols": ["XEP-0313"],
54 "dependencies": ["XEP-0059", "XEP-0297", "XEP-0203"],
55 "recommendations": ["XEP-0334"],
56 "main": "XEP_0313", 46 "main": "XEP_0313",
57 "handler": "yes", 47 "handler": "yes",
58 "description": _("""Implementation of Message Archive Management""") 48 "description": _("""Implementation of Message Archive Management""")
59 } 49 }
60 50
62 class XEP_0313(object): 52 class XEP_0313(object):
63 53
64 def __init__(self, host): 54 def __init__(self, host):
65 log.info(_("Message Archive Management plugin initialization")) 55 log.info(_("Message Archive Management plugin initialization"))
66 self.host = host 56 self.host = host
67 self.clients = {} # bind profile name to SatMAMClient
68 host.bridge.addMethod("MAMqueryFields", ".plugin", in_sign='ss', out_sign='s',
69 method=self._queryFields,
70 async=True,
71 doc={})
72 host.bridge.addMethod("MAMqueryArchive", ".plugin", in_sign='ssa{ss}ss', out_sign='s',
73 method=self._queryArchive,
74 async=True,
75 doc={})
76 host.bridge.addMethod("MAMgetPrefs", ".plugin", in_sign='ss', out_sign='s',
77 method=self._getPrefs,
78 async=True,
79 doc={})
80 host.bridge.addMethod("MAMsetPrefs", ".plugin", in_sign='ssasass', out_sign='s',
81 method=self._setPrefs,
82 async=True,
83 doc={})
84 host.trigger.add("MessageReceived", self.messageReceivedTrigger)
85 57
86 def getHandler(self, profile): 58 def getHandler(self, profile):
87 self.clients[profile] = SatMAMClient(self, profile) 59 client = self.host.getClient(profile)
88 return self.clients[profile] 60 mam_client = client._mam = SatMAMClient()
61 return mam_client
89 62
90 def profileDisconnected(self, profile): 63 def queryFields(self, client, service=None):
91 try: 64 """Ask the server about supported fields.
92 del self.clients[profile]
93 except KeyError:
94 pass
95
96 def _queryFields(self, service_s=None, profile_key=C.PROF_KEY_NONE):
97 service = jid.JID(service_s) if service_s else None
98 return self.queryFields(service, profile_key)
99
100 def queryFields(self, service=None, profile_key=C.PROF_KEY_NONE):
101 """Ask the server about additional supported fields.
102 65
103 @param service: entity offering the MAM service (None for user archives) 66 @param service: entity offering the MAM service (None for user archives)
104 @param profile_key (unicode): %(doc_profile_key)s 67 @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 ยง4.1.5)
105 @return: the server response as a Deferred domish.Element
106 """ 68 """
107 # http://xmpp.org/extensions/xep-0313.html#query-form 69 return client._mam.queryFields(service)
108 def eb(failure):
109 # typically StanzaError with condition u'service-unavailable'
110 log.error(failure.getErrorMessage())
111 return ''
112 70
113 profile = self.host.memory.getProfileName(profile_key) 71 def queryArchive(self, client, mam_query, service=None):
114 d = self.clients[profile].queryFields(service)
115 return d.addCallbacks(lambda elt: elt.toXml(), eb)
116
117 def _queryArchive(self, service_s=None, form_xml=None, rsm_dict=None, node=None, profile_key=C.PROF_KEY_NONE):
118 service = jid.JID(service_s) if service_s else None
119 if form_xml:
120 form = data_form.Form.fromElement(parseXml(form_xml))
121 if form.formNamespace != NS_MAM:
122 log.error(_(u"Expected a MAM Data Form, got instead: %s") % form.formNamespace)
123 form = None
124 else:
125 form = None
126 rsm = RSMRequest(**rsm_dict) if rsm_dict else None
127 return self.queryArchive(service, form, rsm, node, profile_key)
128
129 def queryArchive(self, service=None, form=None, rsm=None, node=None, profile_key=C.PROF_KEY_NONE):
130 """Query a user, MUC or pubsub archive. 72 """Query a user, MUC or pubsub archive.
131 73
132 @param service: entity offering the MAM service (None for user archives) 74 @param mam_query(mam.MAMRequest): MAM query instance
133 @param form (Form): data form to filter the request 75 @param service(jid.JID, None): entity offering the MAM service
134 @param rsm (RSMRequest): RSM request instance 76 None for user server
135 @param node (unicode): pubsub node to query, or None if inappropriate 77 @return (D(domish.Element)): <IQ/> result
136 @param profile_key (unicode): %(doc_profile_key)s
137 @return: a Deferred when the message has been sent
138 """ 78 """
139 def eb(failure): 79 return client._mam.queryArchive(mam_query, service)
140 # typically StanzaError with condition u'service-unavailable'
141 log.error(failure.getErrorMessage())
142 return ''
143 80
144 profile = self.host.memory.getProfileName(profile_key) 81 def _appendMessage(self, elt_list, message_cb, message_elt):
145 d = self.clients[profile].queryArchive(service, form, rsm, node) 82 if message_cb is not None:
146 return d.addCallbacks(lambda elt: elt.toXml(), eb) 83 elt_list.append(message_cb(message_elt))
147 # TODO: add the handler for receiving the final message 84 else:
85 elt_list.append(message_elt)
148 86
149 def _getPrefs(self, service_s=None, profile_key=C.PROF_KEY_NONE): 87 def _queryFinished(self, iq_result, client, elt_list, event):
150 service = jid.JID(service_s) if service_s else None 88 client.xmlstream.removeObserver(event, self._appendMessage)
151 return self.getPrefs(service, profile_key) 89 try:
90 fin_elt = iq_result.elements(mam.NS_MAM, 'fin').next()
91 except StopIteration:
92 raise exceptions.DataError(u"Invalid MAM result")
152 93
153 def getPrefs(self, service=None, profile_key=C.PROF_KEY_NONE): 94 try:
95 rsm_response = rsm.RSMResponse.fromElement(fin_elt)
96 except rsm.RSMNotFoundError:
97 rsm_response = None
98
99 return (elt_list, rsm_response)
100
101 def getArchives(self, client, query, service=None, message_cb=None):
102 """Query archive then grab and return them all in the result
103
104 """
105 if query.query_id is None:
106 query.query_id = unicode(uuid.uuid4())
107 elt_list = []
108 event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id)
109 client.xmlstream.addObserver(event, self._appendMessage, 0, elt_list, message_cb)
110 d = self.queryArchive(client, query, service)
111 d.addCallback(self._queryFinished, client, elt_list, event)
112 return d
113
114 def getPrefs(self, client, service=None):
154 """Retrieve the current user preferences. 115 """Retrieve the current user preferences.
155 116
156 @param service: entity offering the MAM service (None for user archives) 117 @param service: entity offering the MAM service (None for user archives)
157 @param profile_key (unicode): %(doc_profile_key)s
158 @return: the server response as a Deferred domish.Element 118 @return: the server response as a Deferred domish.Element
159 """ 119 """
160 # http://xmpp.org/extensions/xep-0313.html#prefs 120 # http://xmpp.org/extensions/xep-0313.html#prefs
161 def eb(failure): 121 return client._mam.queryPrefs(service)
162 # typically StanzaError with condition u'service-unavailable'
163 log.error(failure.getErrorMessage())
164 return ''
165
166 profile = self.host.memory.getProfileName(profile_key)
167 d = self.clients[profile].queryPrefs(service)
168 return d.addCallbacks(lambda elt: elt.toXml(), eb)
169 122
170 def _setPrefs(self, service_s=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE): 123 def _setPrefs(self, service_s=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE):
171 service = jid.JID(service_s) if service_s else None 124 service = jid.JID(service_s) if service_s else None
172 always_jid = [jid.JID(entity) for entity in always] 125 always_jid = [jid.JID(entity) for entity in always]
173 never_jid = [jid.JID(entity) for entity in never] 126 never_jid = [jid.JID(entity) for entity in never]
174 #TODO: why not build here a MAMPrefs object instead of passing the args separately? 127 #TODO: why not build here a MAMPrefs object instead of passing the args separately?
175 return self.setPrefs(service, default, always_jid, never_jid, profile_key) 128 return self.setPrefs(service, default, always_jid, never_jid, profile_key)
176 129
177 def setPrefs(self, service=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE): 130 def setPrefs(self, client, service=None, default='roster', always=None, never=None):
178 """Set news user preferences. 131 """Set news user preferences.
179 132
180 @param service: entity offering the MAM service (None for user archives) 133 @param service: entity offering the MAM service (None for user archives)
181 @param default (unicode): a value in ('always', 'never', 'roster') 134 @param default (unicode): a value in ('always', 'never', 'roster')
182 @param always (list): a list of JID instances 135 @param always (list): a list of JID instances
183 @param never (list): a list of JID instances 136 @param never (list): a list of JID instances
184 @param profile_key (unicode): %(doc_profile_key)s 137 @param profile_key (unicode): %(doc_profile_key)s
185 @return: the server response as a Deferred domish.Element 138 @return: the server response as a Deferred domish.Element
186 """ 139 """
187 # http://xmpp.org/extensions/xep-0313.html#prefs 140 # http://xmpp.org/extensions/xep-0313.html#prefs
188 def eb(failure): 141 return client._mam.setPrefs(service, default, always, never)
189 # typically StanzaError with condition u'service-unavailable'
190 log.error(failure.getErrorMessage())
191 return ''
192
193 profile = self.host.memory.getProfileName(profile_key)
194 d = self.clients[profile].setPrefs(service, default, always, never)
195 return d.addCallbacks(lambda elt: elt.toXml(), eb)
196
197 def messageReceivedTrigger(self, message, post_treat, profile):
198 """Check if the message is a MAM result. If so, extract the original
199 message, stop processing the current message and process the original
200 message instead.
201 """
202 try:
203 result = domish.generateElementsQNamed(message.elements(), "result", NS_MAM).next()
204 except StopIteration:
205 return True
206 try:
207 forwarded = domish.generateElementsQNamed(result.elements(), "forwarded", NS_SF).next()
208 except StopIteration:
209 log.error(_("MAM result misses its <forwarded/> mandatory element!"))
210 return False
211 try:
212 # TODO: delay is not here for nothing, get benefice of it!
213 delay = domish.generateElementsQNamed(forwarded.elements(), "delay", NS_DD).next()
214 msg = domish.generateElementsQNamed(forwarded.elements(), "message", NS_CLIENT).next()
215 except StopIteration:
216 log.error(_("<forwarded/> element misses a mandatory child!"))
217 return False
218 log.debug(_("MAM found a forwarded message"))
219
220 if msg.event and msg.event.uri == NS_PUBSUB_EVENT:
221 event = ItemsEvent(jid.JID(message['from']),
222 jid.JID(message['to']),
223 msg.event.items['node'],
224 msg.event.items.elements(),
225 {})
226 self.host.plugins["XEP-0060"].clients[profile].itemsReceived(event)
227 return False
228
229 client = self.host.getClient(profile)
230 client.messageProt.onMessage(msg)
231 return False
232 142
233 143
234 class SatMAMClient(mam.MAMClient): 144 class SatMAMClient(mam.MAMClient):
235 implements(disco.IDisco) 145 implements(disco.IDisco)
236 146
237 def __init__(self, plugin_parent, profile):
238 self.plugin_parent = plugin_parent
239 self.host = plugin_parent.host
240 self.profile = profile
241 mam.MAMClient.__init__(self)
242
243 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 147 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
244 return [disco.DiscoFeature(NS_MAM)] 148 return [disco.DiscoFeature(mam.NS_MAM)]
245 149
246 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 150 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
247 return [] 151 return []