Mercurial > libervia-pubsub
comparison sat_pubsub/delegation.py @ 292:6918a0dad359
delegation: delegated stanza are tracked
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 16 Aug 2015 01:13:23 +0200 |
parents | 61fb4817b77f |
children | e6a9a3c93314 |
comparison
equal
deleted
inserted
replaced
291:61fb4817b77f | 292:6918a0dad359 |
---|---|
25 | 25 |
26 from wokkel.subprotocols import XMPPHandler | 26 from wokkel.subprotocols import XMPPHandler |
27 from wokkel import pubsub | 27 from wokkel import pubsub |
28 from wokkel import data_form | 28 from wokkel import data_form |
29 from wokkel import disco, iwokkel | 29 from wokkel import disco, iwokkel |
30 from wokkel.iwokkel import IPubSubService | |
30 from twisted.python import log | 31 from twisted.python import log |
31 from twisted.words.protocols.jabber import jid, error | 32 from twisted.words.protocols.jabber import jid, error |
32 from twisted.words.protocols.jabber.xmlstream import toResponse | 33 from twisted.words.protocols.jabber.xmlstream import toResponse |
33 from twisted.words.xish import domish | 34 from twisted.words.xish import domish |
34 from zope.interface import implements | 35 from zope.interface import implements |
42 DELEGATION_BARE_SEP = ":bare:" | 43 DELEGATION_BARE_SEP = ":bare:" |
43 | 44 |
44 class InvalidStanza(Exception): | 45 class InvalidStanza(Exception): |
45 pass | 46 pass |
46 | 47 |
48 | |
49 | |
47 class DelegationsHandler(XMPPHandler): | 50 class DelegationsHandler(XMPPHandler): |
48 implements(iwokkel.IDisco) | 51 implements(iwokkel.IDisco) |
52 _service_hacked = False | |
49 | 53 |
50 def __init__(self): | 54 def __init__(self): |
51 super(DelegationsHandler, self).__init__() | 55 super(DelegationsHandler, self).__init__() |
52 | 56 |
57 def _service_hack(self): | |
58 """Patch the PubSubService to track delegated stanzas""" | |
59 # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest. | |
60 # As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while | |
61 # original wokkel.pubsub use directly pubsub.PubSubRequest, we need to | |
62 # check which version is used before monkeypatching | |
63 for handler in self.parent.handlers: | |
64 if IPubSubService.providedBy(handler): | |
65 if hasattr(handler, '_request_class'): | |
66 request_base_class = handler._request_class | |
67 else: | |
68 request_base_class = pubsub.PubSubRequest | |
69 | |
70 class PubSubRequestWithDelegation(request_base_class): | |
71 """A PubSubReques which put an indicator if the stanza comme from delegation""" | |
72 | |
73 @classmethod | |
74 def fromElement(cls, element): | |
75 """Check if element comme from delegation, and set a delegated flags | |
76 | |
77 delegated flaf is either False, or it's a jid of the delegating server | |
78 the delegated flag must be set on element before use | |
79 """ | |
80 try: | |
81 # __getattr__ is overriden in domish.Element, so we use __getattribute__ | |
82 delegated = element.__getattribute__('delegated') | |
83 except AttributeError: | |
84 delegated = False | |
85 instance = cls.__base__.fromElement(element) | |
86 instance.delegated = delegated | |
87 return instance | |
88 | |
89 if hasattr(handler, '_request_class'): | |
90 handler._request_class = PubSubRequestWithDelegation | |
91 else: | |
92 pubsub.PubSubRequest = PubSubRequestWithDelegation | |
93 DelegationsHandler._service_hacked = True | |
94 | |
53 def connectionInitialized(self): | 95 def connectionInitialized(self): |
96 if not self._service_hacked: | |
97 self._service_hack() | |
54 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) | 98 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) |
55 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) | 99 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) |
56 self._current_iqs = {} # dict of iq being handler by delegation | 100 self._current_iqs = {} # dict of iq being handler by delegation |
57 self._xs_send = self.xmlstream.send | 101 self._xs_send = self.xmlstream.send |
58 self.xmlstream.send = self._sendHack | 102 self.xmlstream.send = self._sendHack |
62 | 106 |
63 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream | 107 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream |
64 """ | 108 """ |
65 if isinstance(elt, domish.Element) and elt.name=='iq': | 109 if isinstance(elt, domish.Element) and elt.name=='iq': |
66 try: | 110 try: |
67 ori_iq, managed_entity = self._current_iqs.pop(elt.getAttribute('id')) | 111 id_ = elt.getAttribute('id') |
112 ori_iq, managed_entity = self._current_iqs[id_] | |
68 if jid.JID(elt['to']) != managed_entity: | 113 if jid.JID(elt['to']) != managed_entity: |
69 log.msg("IQ id conflict: the managed entity doesn't match") | 114 log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" |
115 .format(got=jid.JID(elt['to']), expected=managed_entity)) | |
70 raise KeyError | 116 raise KeyError |
71 except KeyError: | 117 except KeyError: |
72 # the iq is not a delegated one | 118 # the iq is not a delegated one |
73 self._xs_send(elt) | 119 self._xs_send(elt) |
74 else: | 120 else: |
121 del self._current_iqs[id_] | |
75 iq_result_elt = toResponse(ori_iq, 'result') | 122 iq_result_elt = toResponse(ori_iq, 'result') |
76 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) | 123 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) |
77 fwd_elt.addChild(elt) | 124 fwd_elt.addChild(elt) |
125 elt.uri = elt.defaultUri = 'jabber:client' | |
78 self._xs_send(iq_result_elt) | 126 self._xs_send(iq_result_elt) |
79 else: | 127 else: |
80 self._xs_send(elt) | 128 self._xs_send(elt) |
81 | 129 |
82 def _obsWrapper(self, observer, stanza): | 130 def _obsWrapper(self, observer, stanza): |
130 .elements(FORWARDED_NS, 'forwarded').next() | 178 .elements(FORWARDED_NS, 'forwarded').next() |
131 .elements('jabber:client', 'iq').next()) | 179 .elements('jabber:client', 'iq').next()) |
132 except StopIteration: | 180 except StopIteration: |
133 raise error.StanzaError('not-acceptable') | 181 raise error.StanzaError('not-acceptable') |
134 | 182 |
135 managed_entity = jid.JID(fwd_iq.getAttribute('to') or fwd_iq['from']) | 183 managed_entity = jid.JID(fwd_iq['from']) |
136 | 184 |
137 if managed_entity.host != iq['from']: | 185 if managed_entity.host != iq['from']: |
138 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}" | 186 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}" |
139 .format(iq.toXml())).encode('utf-8')) | 187 .format(iq.toXml())).encode('utf-8')) |
140 raise error.StanzaError('not-allowed') | 188 raise error.StanzaError('not-allowed') |
141 | 189 |
142 self._current_iqs[fwd_iq['id']] = (iq, managed_entity) | 190 self._current_iqs[fwd_iq['id']] = (iq, managed_entity) |
191 fwd_iq.delegated = True | |
192 | |
193 # we need a recipient in pubsub request for PEP | |
194 # so we set "to" attribute if it doesn't exist | |
195 if not fwd_iq.hasAttribute('to'): | |
196 fwd_iq["to"] = jid.JID(fwd_iq["from"]).userhost() | |
197 | |
143 # we now inject the element in the stream | 198 # we now inject the element in the stream |
144 self.xmlstream.dispatch(fwd_iq) | 199 self.xmlstream.dispatch(fwd_iq) |
145 | 200 |
146 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 201 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
147 """Manage disco nesting | 202 """Manage disco nesting |