comparison sat_pubsub/delegation.py @ 292:6918a0dad359

delegation: delegated stanza are tracked
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 01:13:23 +0200
parents 61fb4817b77f
children e6a9a3c93314
comparison
equal deleted inserted replaced
291:61fb4817b77f 292:6918a0dad359
25 25
26 from wokkel.subprotocols import XMPPHandler 26 from wokkel.subprotocols import XMPPHandler
27 from wokkel import pubsub 27 from wokkel import pubsub
28 from wokkel import data_form 28 from wokkel import data_form
29 from wokkel import disco, iwokkel 29 from wokkel import disco, iwokkel
30 from wokkel.iwokkel import IPubSubService
30 from twisted.python import log 31 from twisted.python import log
31 from twisted.words.protocols.jabber import jid, error 32 from twisted.words.protocols.jabber import jid, error
32 from twisted.words.protocols.jabber.xmlstream import toResponse 33 from twisted.words.protocols.jabber.xmlstream import toResponse
33 from twisted.words.xish import domish 34 from twisted.words.xish import domish
34 from zope.interface import implements 35 from zope.interface import implements
42 DELEGATION_BARE_SEP = ":bare:" 43 DELEGATION_BARE_SEP = ":bare:"
43 44
44 class InvalidStanza(Exception): 45 class InvalidStanza(Exception):
45 pass 46 pass
46 47
48
49
47 class DelegationsHandler(XMPPHandler): 50 class DelegationsHandler(XMPPHandler):
48 implements(iwokkel.IDisco) 51 implements(iwokkel.IDisco)
52 _service_hacked = False
49 53
50 def __init__(self): 54 def __init__(self):
51 super(DelegationsHandler, self).__init__() 55 super(DelegationsHandler, self).__init__()
52 56
57 def _service_hack(self):
58 """Patch the PubSubService to track delegated stanzas"""
59 # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest.
60 # As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while
61 # original wokkel.pubsub use directly pubsub.PubSubRequest, we need to
62 # check which version is used before monkeypatching
63 for handler in self.parent.handlers:
64 if IPubSubService.providedBy(handler):
65 if hasattr(handler, '_request_class'):
66 request_base_class = handler._request_class
67 else:
68 request_base_class = pubsub.PubSubRequest
69
70 class PubSubRequestWithDelegation(request_base_class):
71 """A PubSubReques which put an indicator if the stanza comme from delegation"""
72
73 @classmethod
74 def fromElement(cls, element):
75 """Check if element comme from delegation, and set a delegated flags
76
77 delegated flaf is either False, or it's a jid of the delegating server
78 the delegated flag must be set on element before use
79 """
80 try:
81 # __getattr__ is overriden in domish.Element, so we use __getattribute__
82 delegated = element.__getattribute__('delegated')
83 except AttributeError:
84 delegated = False
85 instance = cls.__base__.fromElement(element)
86 instance.delegated = delegated
87 return instance
88
89 if hasattr(handler, '_request_class'):
90 handler._request_class = PubSubRequestWithDelegation
91 else:
92 pubsub.PubSubRequest = PubSubRequestWithDelegation
93 DelegationsHandler._service_hacked = True
94
53 def connectionInitialized(self): 95 def connectionInitialized(self):
96 if not self._service_hacked:
97 self._service_hack()
54 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) 98 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
55 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) 99 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
56 self._current_iqs = {} # dict of iq being handler by delegation 100 self._current_iqs = {} # dict of iq being handler by delegation
57 self._xs_send = self.xmlstream.send 101 self._xs_send = self.xmlstream.send
58 self.xmlstream.send = self._sendHack 102 self.xmlstream.send = self._sendHack
62 106
63 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream 107 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream
64 """ 108 """
65 if isinstance(elt, domish.Element) and elt.name=='iq': 109 if isinstance(elt, domish.Element) and elt.name=='iq':
66 try: 110 try:
67 ori_iq, managed_entity = self._current_iqs.pop(elt.getAttribute('id')) 111 id_ = elt.getAttribute('id')
112 ori_iq, managed_entity = self._current_iqs[id_]
68 if jid.JID(elt['to']) != managed_entity: 113 if jid.JID(elt['to']) != managed_entity:
69 log.msg("IQ id conflict: the managed entity doesn't match") 114 log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})"
115 .format(got=jid.JID(elt['to']), expected=managed_entity))
70 raise KeyError 116 raise KeyError
71 except KeyError: 117 except KeyError:
72 # the iq is not a delegated one 118 # the iq is not a delegated one
73 self._xs_send(elt) 119 self._xs_send(elt)
74 else: 120 else:
121 del self._current_iqs[id_]
75 iq_result_elt = toResponse(ori_iq, 'result') 122 iq_result_elt = toResponse(ori_iq, 'result')
76 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) 123 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
77 fwd_elt.addChild(elt) 124 fwd_elt.addChild(elt)
125 elt.uri = elt.defaultUri = 'jabber:client'
78 self._xs_send(iq_result_elt) 126 self._xs_send(iq_result_elt)
79 else: 127 else:
80 self._xs_send(elt) 128 self._xs_send(elt)
81 129
82 def _obsWrapper(self, observer, stanza): 130 def _obsWrapper(self, observer, stanza):
130 .elements(FORWARDED_NS, 'forwarded').next() 178 .elements(FORWARDED_NS, 'forwarded').next()
131 .elements('jabber:client', 'iq').next()) 179 .elements('jabber:client', 'iq').next())
132 except StopIteration: 180 except StopIteration:
133 raise error.StanzaError('not-acceptable') 181 raise error.StanzaError('not-acceptable')
134 182
135 managed_entity = jid.JID(fwd_iq.getAttribute('to') or fwd_iq['from']) 183 managed_entity = jid.JID(fwd_iq['from'])
136 184
137 if managed_entity.host != iq['from']: 185 if managed_entity.host != iq['from']:
138 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}" 186 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}"
139 .format(iq.toXml())).encode('utf-8')) 187 .format(iq.toXml())).encode('utf-8'))
140 raise error.StanzaError('not-allowed') 188 raise error.StanzaError('not-allowed')
141 189
142 self._current_iqs[fwd_iq['id']] = (iq, managed_entity) 190 self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
191 fwd_iq.delegated = True
192
193 # we need a recipient in pubsub request for PEP
194 # so we set "to" attribute if it doesn't exist
195 if not fwd_iq.hasAttribute('to'):
196 fwd_iq["to"] = jid.JID(fwd_iq["from"]).userhost()
197
143 # we now inject the element in the stream 198 # we now inject the element in the stream
144 self.xmlstream.dispatch(fwd_iq) 199 self.xmlstream.dispatch(fwd_iq)
145 200
146 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 201 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
147 """Manage disco nesting 202 """Manage disco nesting