Mercurial > libervia-pubsub
comparison src/delegation.py @ 369:dabee42494ac
config file + cleaning:
- SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir).
Its options must be in the "pubsub" section
- options on command line override config options
- removed tap and http files which are not used anymore
- changed directory structure to put source in src, to be coherent with SàT and Libervia
- changed options name, db* become db_*, secret become xmpp_pwd
- an exception is raised if jid or xmpp_pwd is are not configured
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Mar 2018 12:59:38 +0100 |
parents | sat_pubsub/delegation.py@3bbab2173ebc |
children |
comparison
equal
deleted
inserted
replaced
368:618a92080812 | 369:dabee42494ac |
---|---|
1 #!/usr/bin/python | |
2 #-*- coding: utf-8 -*- | |
3 # | |
4 # Copyright (c) 2015 Jérôme Poisson | |
5 | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 # --- | |
21 | |
22 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service | |
23 | |
24 from wokkel.subprotocols import XMPPHandler | |
25 from wokkel import pubsub | |
26 from wokkel import data_form | |
27 from wokkel import disco, iwokkel | |
28 from wokkel.iwokkel import IPubSubService | |
29 from wokkel import mam | |
30 from twisted.python import log | |
31 from twisted.words.protocols.jabber import jid, error | |
32 from twisted.words.protocols.jabber.xmlstream import toResponse | |
33 from twisted.words.xish import domish | |
34 from zope.interface import implements | |
35 | |
36 DELEGATION_NS = 'urn:xmpp:delegation:1' | |
37 FORWARDED_NS = 'urn:xmpp:forward:0' | |
38 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) | |
39 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS) | |
40 | |
41 DELEGATION_MAIN_SEP = "::" | |
42 DELEGATION_BARE_SEP = ":bare:" | |
43 | |
44 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), | |
45 (mam.IMAMService, mam, "MAMRequest")) | |
46 | |
47 | |
48 class InvalidStanza(Exception): | |
49 pass | |
50 | |
51 | |
52 class DelegationsHandler(XMPPHandler): | |
53 implements(iwokkel.IDisco) | |
54 _service_hacked = False | |
55 | |
56 def __init__(self): | |
57 super(DelegationsHandler, self).__init__() | |
58 | |
59 def _service_hack(self): | |
60 """Patch the request classes of services to track delegated stanzas""" | |
61 # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest. | |
62 # As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while | |
63 # original wokkel.pubsub use directly pubsub.PubSubRequest, we need to | |
64 # check which version is used before monkeypatching | |
65 for handler in self.parent.handlers: | |
66 for service, module, default_base_cls in TO_HACK: | |
67 if service.providedBy(handler): | |
68 if hasattr(handler, '_request_class'): | |
69 request_base_class = handler._request_class | |
70 else: | |
71 request_base_class = getattr(module, default_base_cls) | |
72 | |
73 class RequestWithDelegation(request_base_class): | |
74 """A XxxRequest which put an indicator if the stanza comme from delegation""" | |
75 | |
76 @classmethod | |
77 def fromElement(cls, element): | |
78 """Check if element comme from delegation, and set a delegated flags | |
79 | |
80 delegated flag is either False, or it's a jid of the delegating server | |
81 the delegated flag must be set on element before use | |
82 """ | |
83 try: | |
84 # __getattr__ is overriden in domish.Element, so we use __getattribute__ | |
85 delegated = element.__getattribute__('delegated') | |
86 except AttributeError: | |
87 delegated = False | |
88 instance = cls.__base__.fromElement(element) | |
89 instance.delegated = delegated | |
90 return instance | |
91 | |
92 if hasattr(handler, '_request_class'): | |
93 handler._request_class = RequestWithDelegation | |
94 else: | |
95 setattr(module, default_base_cls, RequestWithDelegation) | |
96 DelegationsHandler._service_hacked = True | |
97 | |
98 def connectionInitialized(self): | |
99 if not self._service_hacked: | |
100 self._service_hack() | |
101 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) | |
102 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) | |
103 self._current_iqs = {} # dict of iq being handler by delegation | |
104 self._xs_send = self.xmlstream.send | |
105 self.xmlstream.send = self._sendHack | |
106 | |
107 def _sendHack(self, elt): | |
108 """This method is called instead of xmlstream to control sending | |
109 | |
110 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream | |
111 """ | |
112 if isinstance(elt, domish.Element) and elt.name=='iq': | |
113 try: | |
114 id_ = elt.getAttribute('id') | |
115 ori_iq, managed_entity = self._current_iqs[id_] | |
116 if jid.JID(elt['to']) != managed_entity: | |
117 log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" | |
118 .format(got=jid.JID(elt['to']), expected=managed_entity)) | |
119 raise KeyError | |
120 except KeyError: | |
121 # the iq is not a delegated one | |
122 self._xs_send(elt) | |
123 else: | |
124 del self._current_iqs[id_] | |
125 iq_result_elt = toResponse(ori_iq, 'result') | |
126 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) | |
127 fwd_elt.addChild(elt) | |
128 elt.uri = elt.defaultUri = 'jabber:client' | |
129 self._xs_send(iq_result_elt) | |
130 else: | |
131 self._xs_send(elt) | |
132 | |
133 def _obsWrapper(self, observer, stanza): | |
134 """Wrapper to observer which catch StanzaError | |
135 | |
136 @param observer(callable): method to wrap | |
137 """ | |
138 try: | |
139 observer(stanza) | |
140 except error.StanzaError as e: | |
141 error_elt = e.toResponse(stanza) | |
142 self._xs_send(error_elt) | |
143 stanza.handled = True | |
144 | |
145 def onAdvertise(self, message): | |
146 """Manage the <message/> advertising delegations""" | |
147 delegation_elt = message.elements(DELEGATION_NS, 'delegation').next() | |
148 delegated = {} | |
149 for delegated_elt in delegation_elt.elements(DELEGATION_NS): | |
150 try: | |
151 if delegated_elt.name != 'delegated': | |
152 raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name)) | |
153 try: | |
154 namespace = delegated_elt['namespace'] | |
155 except KeyError: | |
156 raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element') | |
157 delegated[namespace] = [] | |
158 for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'): | |
159 try: | |
160 delegated[namespace].append(attribute_elt["name"]) | |
161 except KeyError: | |
162 raise InvalidStanza(u'was expecting a "name" attribute in attribute element') | |
163 except InvalidStanza as e: | |
164 log.msg("Invalid stanza received ({})".format(e)) | |
165 | |
166 log.msg(u'delegations updated:\n{}'.format( | |
167 u'\n'.join([u" - namespace {}{}".format(ns, | |
168 u"" if not attributes else u" with filtering on {} attribute(s)".format( | |
169 u", ".join(attributes))) for ns, attributes in delegated.items()]))) | |
170 | |
171 if not pubsub.NS_PUBSUB in delegated: | |
172 log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service") | |
173 | |
174 def onForward(self, iq): | |
175 """Manage forwarded iq | |
176 | |
177 @param iq(domish.Element): full delegation stanza | |
178 """ | |
179 | |
180 # FIXME: we use a hack supposing that our delegation come from hostname | |
181 # and we are a component named [name].hostname | |
182 # but we need to manage properly allowed servers | |
183 # TODO: do proper origin security check | |
184 _, allowed = iq['to'].split('.', 1) | |
185 if jid.JID(iq['from']) != jid.JID(allowed): | |
186 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from our server: {}" | |
187 .format(iq.toXml())).encode('utf-8')) | |
188 raise error.StanzaError('not-allowed') | |
189 | |
190 try: | |
191 fwd_iq = (iq.elements(DELEGATION_NS, 'delegation').next() | |
192 .elements(FORWARDED_NS, 'forwarded').next() | |
193 .elements('jabber:client', 'iq').next()) | |
194 except StopIteration: | |
195 raise error.StanzaError('not-acceptable') | |
196 | |
197 managed_entity = jid.JID(fwd_iq['from']) | |
198 | |
199 self._current_iqs[fwd_iq['id']] = (iq, managed_entity) | |
200 fwd_iq.delegated = True | |
201 | |
202 # we need a recipient in pubsub request for PEP | |
203 # so we set "to" attribute if it doesn't exist | |
204 if not fwd_iq.hasAttribute('to'): | |
205 fwd_iq["to"] = jid.JID(fwd_iq["from"]).userhost() | |
206 | |
207 # we now inject the element in the stream | |
208 self.xmlstream.dispatch(fwd_iq) | |
209 | |
210 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | |
211 """Manage disco nesting | |
212 | |
213 This method looks for DiscoHandler in sibling handlers and use it to | |
214 collect main disco infos. It then filters by delegated namespace and return it. | |
215 An identity is added for PEP if pubsub namespace is requested. | |
216 | |
217 The same features/identities are returned for main and bare nodes | |
218 """ | |
219 if not nodeIdentifier.startswith(DELEGATION_NS): | |
220 return [] | |
221 | |
222 try: | |
223 _, namespace = nodeIdentifier.split(DELEGATION_MAIN_SEP, 1) | |
224 except ValueError: | |
225 try: | |
226 _, namespace = nodeIdentifier.split(DELEGATION_BARE_SEP, 1) | |
227 except ValueError: | |
228 log.msg("Unexpected disco node: {}".format(nodeIdentifier)) | |
229 raise error.StanzaError('not-acceptable') | |
230 | |
231 if not namespace: | |
232 log.msg("No namespace found in node {}".format(nodeIdentifier)) | |
233 return [] | |
234 | |
235 if namespace.startswith(pubsub.NS_PUBSUB): | |
236 # pubsub use several namespaces starting with NS_PUBSUB (e.g. http://jabber.org/protocol/pubsub#owner) | |
237 # we return the same disco for all of them | |
238 namespace = pubsub.NS_PUBSUB | |
239 | |
240 def gotInfos(infos): | |
241 ns_features = [] | |
242 for info in infos: | |
243 if isinstance(info, disco.DiscoFeature) and info.startswith(namespace): | |
244 ns_features.append(info) | |
245 elif (isinstance(info, data_form.Form) and info.formNamespace | |
246 and info.formNamespace.startwith(namespace)): | |
247 # extensions management (XEP-0128) | |
248 ns_features.append(info) | |
249 | |
250 if namespace == pubsub.NS_PUBSUB: | |
251 ns_features.append(disco.DiscoIdentity('pubsub', 'pep')) | |
252 | |
253 return ns_features | |
254 | |
255 for handler in self.parent.handlers: | |
256 if isinstance(handler, disco.DiscoHandler): | |
257 break | |
258 | |
259 if not isinstance(handler, disco.DiscoHandler): | |
260 log.err("Can't find DiscoHandler") | |
261 return [] | |
262 | |
263 d = handler.info(requestor, target, '') | |
264 d.addCallback(gotInfos) | |
265 return d | |
266 | |
267 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | |
268 return [] | |
269 | |
270 | |
271 # we monkeypatch DiscoHandler to add delegation informations | |
272 def _onDiscoItems(self, iq): | |
273 request = disco._DiscoRequest.fromElement(iq) | |
274 # it's really ugly to attach pep data to recipient | |
275 # but we don't have many options | |
276 request.recipient.pep = iq.delegated | |
277 | |
278 def toResponse(items): | |
279 response = disco.DiscoItems() | |
280 response.nodeIdentifier = request.nodeIdentifier | |
281 | |
282 for item in items: | |
283 response.append(item) | |
284 | |
285 return response.toElement() | |
286 | |
287 d = self.items(request.sender, request.recipient, | |
288 request.nodeIdentifier) | |
289 d.addCallback(toResponse) | |
290 return d | |
291 | |
292 | |
293 disco.DiscoHandler._onDiscoItems = _onDiscoItems |