comparison sat_pubsub/delegation.py @ 405:c56a728412f1

file organisation + setup refactoring: - `/src` has been renamed to `/sat_pubsub`, this is the recommended naming convention - revamped `setup.py` on the basis of SàT's `setup.py` - added a `VERSION` which is the unique place where version number will now be set - use same trick as in SàT to specify dev version (`D` at the end) - use setuptools_scm to retrieve Mercurial hash when in dev version
author Goffi <goffi@goffi.org>
date Fri, 16 Aug 2019 12:00:02 +0200
parents src/delegation.py@dabee42494ac
children ccb2a22ea0fc
comparison
equal deleted inserted replaced
404:105a0772eedd 405:c56a728412f1
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3 #
4 # Copyright (c) 2015 Jérôme Poisson
5
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 # ---
21
22 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
23
24 from wokkel.subprotocols import XMPPHandler
25 from wokkel import pubsub
26 from wokkel import data_form
27 from wokkel import disco, iwokkel
28 from wokkel.iwokkel import IPubSubService
29 from wokkel import mam
30 from twisted.python import log
31 from twisted.words.protocols.jabber import jid, error
32 from twisted.words.protocols.jabber.xmlstream import toResponse
33 from twisted.words.xish import domish
34 from zope.interface import implements
35
36 DELEGATION_NS = 'urn:xmpp:delegation:1'
37 FORWARDED_NS = 'urn:xmpp:forward:0'
38 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)
39 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS)
40
41 DELEGATION_MAIN_SEP = "::"
42 DELEGATION_BARE_SEP = ":bare:"
43
44 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"),
45 (mam.IMAMService, mam, "MAMRequest"))
46
47
48 class InvalidStanza(Exception):
49 pass
50
51
52 class DelegationsHandler(XMPPHandler):
53 implements(iwokkel.IDisco)
54 _service_hacked = False
55
56 def __init__(self):
57 super(DelegationsHandler, self).__init__()
58
59 def _service_hack(self):
60 """Patch the request classes of services to track delegated stanzas"""
61 # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest.
62 # As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while
63 # original wokkel.pubsub use directly pubsub.PubSubRequest, we need to
64 # check which version is used before monkeypatching
65 for handler in self.parent.handlers:
66 for service, module, default_base_cls in TO_HACK:
67 if service.providedBy(handler):
68 if hasattr(handler, '_request_class'):
69 request_base_class = handler._request_class
70 else:
71 request_base_class = getattr(module, default_base_cls)
72
73 class RequestWithDelegation(request_base_class):
74 """A XxxRequest which put an indicator if the stanza comme from delegation"""
75
76 @classmethod
77 def fromElement(cls, element):
78 """Check if element comme from delegation, and set a delegated flags
79
80 delegated flag is either False, or it's a jid of the delegating server
81 the delegated flag must be set on element before use
82 """
83 try:
84 # __getattr__ is overriden in domish.Element, so we use __getattribute__
85 delegated = element.__getattribute__('delegated')
86 except AttributeError:
87 delegated = False
88 instance = cls.__base__.fromElement(element)
89 instance.delegated = delegated
90 return instance
91
92 if hasattr(handler, '_request_class'):
93 handler._request_class = RequestWithDelegation
94 else:
95 setattr(module, default_base_cls, RequestWithDelegation)
96 DelegationsHandler._service_hacked = True
97
98 def connectionInitialized(self):
99 if not self._service_hacked:
100 self._service_hack()
101 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
102 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
103 self._current_iqs = {} # dict of iq being handler by delegation
104 self._xs_send = self.xmlstream.send
105 self.xmlstream.send = self._sendHack
106
107 def _sendHack(self, elt):
108 """This method is called instead of xmlstream to control sending
109
110 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream
111 """
112 if isinstance(elt, domish.Element) and elt.name=='iq':
113 try:
114 id_ = elt.getAttribute('id')
115 ori_iq, managed_entity = self._current_iqs[id_]
116 if jid.JID(elt['to']) != managed_entity:
117 log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})"
118 .format(got=jid.JID(elt['to']), expected=managed_entity))
119 raise KeyError
120 except KeyError:
121 # the iq is not a delegated one
122 self._xs_send(elt)
123 else:
124 del self._current_iqs[id_]
125 iq_result_elt = toResponse(ori_iq, 'result')
126 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
127 fwd_elt.addChild(elt)
128 elt.uri = elt.defaultUri = 'jabber:client'
129 self._xs_send(iq_result_elt)
130 else:
131 self._xs_send(elt)
132
133 def _obsWrapper(self, observer, stanza):
134 """Wrapper to observer which catch StanzaError
135
136 @param observer(callable): method to wrap
137 """
138 try:
139 observer(stanza)
140 except error.StanzaError as e:
141 error_elt = e.toResponse(stanza)
142 self._xs_send(error_elt)
143 stanza.handled = True
144
145 def onAdvertise(self, message):
146 """Manage the <message/> advertising delegations"""
147 delegation_elt = message.elements(DELEGATION_NS, 'delegation').next()
148 delegated = {}
149 for delegated_elt in delegation_elt.elements(DELEGATION_NS):
150 try:
151 if delegated_elt.name != 'delegated':
152 raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name))
153 try:
154 namespace = delegated_elt['namespace']
155 except KeyError:
156 raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element')
157 delegated[namespace] = []
158 for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'):
159 try:
160 delegated[namespace].append(attribute_elt["name"])
161 except KeyError:
162 raise InvalidStanza(u'was expecting a "name" attribute in attribute element')
163 except InvalidStanza as e:
164 log.msg("Invalid stanza received ({})".format(e))
165
166 log.msg(u'delegations updated:\n{}'.format(
167 u'\n'.join([u" - namespace {}{}".format(ns,
168 u"" if not attributes else u" with filtering on {} attribute(s)".format(
169 u", ".join(attributes))) for ns, attributes in delegated.items()])))
170
171 if not pubsub.NS_PUBSUB in delegated:
172 log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")
173
174 def onForward(self, iq):
175 """Manage forwarded iq
176
177 @param iq(domish.Element): full delegation stanza
178 """
179
180 # FIXME: we use a hack supposing that our delegation come from hostname
181 # and we are a component named [name].hostname
182 # but we need to manage properly allowed servers
183 # TODO: do proper origin security check
184 _, allowed = iq['to'].split('.', 1)
185 if jid.JID(iq['from']) != jid.JID(allowed):
186 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from our server: {}"
187 .format(iq.toXml())).encode('utf-8'))
188 raise error.StanzaError('not-allowed')
189
190 try:
191 fwd_iq = (iq.elements(DELEGATION_NS, 'delegation').next()
192 .elements(FORWARDED_NS, 'forwarded').next()
193 .elements('jabber:client', 'iq').next())
194 except StopIteration:
195 raise error.StanzaError('not-acceptable')
196
197 managed_entity = jid.JID(fwd_iq['from'])
198
199 self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
200 fwd_iq.delegated = True
201
202 # we need a recipient in pubsub request for PEP
203 # so we set "to" attribute if it doesn't exist
204 if not fwd_iq.hasAttribute('to'):
205 fwd_iq["to"] = jid.JID(fwd_iq["from"]).userhost()
206
207 # we now inject the element in the stream
208 self.xmlstream.dispatch(fwd_iq)
209
210 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
211 """Manage disco nesting
212
213 This method looks for DiscoHandler in sibling handlers and use it to
214 collect main disco infos. It then filters by delegated namespace and return it.
215 An identity is added for PEP if pubsub namespace is requested.
216
217 The same features/identities are returned for main and bare nodes
218 """
219 if not nodeIdentifier.startswith(DELEGATION_NS):
220 return []
221
222 try:
223 _, namespace = nodeIdentifier.split(DELEGATION_MAIN_SEP, 1)
224 except ValueError:
225 try:
226 _, namespace = nodeIdentifier.split(DELEGATION_BARE_SEP, 1)
227 except ValueError:
228 log.msg("Unexpected disco node: {}".format(nodeIdentifier))
229 raise error.StanzaError('not-acceptable')
230
231 if not namespace:
232 log.msg("No namespace found in node {}".format(nodeIdentifier))
233 return []
234
235 if namespace.startswith(pubsub.NS_PUBSUB):
236 # pubsub use several namespaces starting with NS_PUBSUB (e.g. http://jabber.org/protocol/pubsub#owner)
237 # we return the same disco for all of them
238 namespace = pubsub.NS_PUBSUB
239
240 def gotInfos(infos):
241 ns_features = []
242 for info in infos:
243 if isinstance(info, disco.DiscoFeature) and info.startswith(namespace):
244 ns_features.append(info)
245 elif (isinstance(info, data_form.Form) and info.formNamespace
246 and info.formNamespace.startwith(namespace)):
247 # extensions management (XEP-0128)
248 ns_features.append(info)
249
250 if namespace == pubsub.NS_PUBSUB:
251 ns_features.append(disco.DiscoIdentity('pubsub', 'pep'))
252
253 return ns_features
254
255 for handler in self.parent.handlers:
256 if isinstance(handler, disco.DiscoHandler):
257 break
258
259 if not isinstance(handler, disco.DiscoHandler):
260 log.err("Can't find DiscoHandler")
261 return []
262
263 d = handler.info(requestor, target, '')
264 d.addCallback(gotInfos)
265 return d
266
267 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
268 return []
269
270
271 # we monkeypatch DiscoHandler to add delegation informations
272 def _onDiscoItems(self, iq):
273 request = disco._DiscoRequest.fromElement(iq)
274 # it's really ugly to attach pep data to recipient
275 # but we don't have many options
276 request.recipient.pep = iq.delegated
277
278 def toResponse(items):
279 response = disco.DiscoItems()
280 response.nodeIdentifier = request.nodeIdentifier
281
282 for item in items:
283 response.append(item)
284
285 return response.toElement()
286
287 d = self.items(request.sender, request.recipient,
288 request.nodeIdentifier)
289 d.addCallback(toResponse)
290 return d
291
292
293 disco.DiscoHandler._onDiscoItems = _onDiscoItems