comparison sat_pubsub/delegation.py @ 291:61fb4817b77f

delegation: iq forwarded management: in addition to the onForward observer, the xmlstream.send message is monkey patched, so we can inject the forwarded stanza as if it was received normally, and intercept the result to send it back to the server.
author Goffi <goffi@goffi.org>
date Mon, 04 May 2015 18:40:47 +0200
parents f08f8536cab8
children 6918a0dad359
comparison
equal deleted inserted replaced
290:9f612fa19eea 291:61fb4817b77f
26 from wokkel.subprotocols import XMPPHandler 26 from wokkel.subprotocols import XMPPHandler
27 from wokkel import pubsub 27 from wokkel import pubsub
28 from wokkel import data_form 28 from wokkel import data_form
29 from wokkel import disco, iwokkel 29 from wokkel import disco, iwokkel
30 from twisted.python import log 30 from twisted.python import log
31 from twisted.words.protocols.jabber import error 31 from twisted.words.protocols.jabber import jid, error
32 from twisted.words.protocols.jabber.xmlstream import toResponse
33 from twisted.words.xish import domish
32 from zope.interface import implements 34 from zope.interface import implements
33 35
34 DELEGATION_NS = 'urn:xmpp:delegation:1' 36 DELEGATION_NS = 'urn:xmpp:delegation:1'
35 FORWARDED_NS = 'urn:xmpp:forward:0' 37 FORWARDED_NS = 'urn:xmpp:forward:0'
36 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) 38 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)
39 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS)
37 40
38 DELEGATION_MAIN_SEP = "::" 41 DELEGATION_MAIN_SEP = "::"
39 DELEGATION_BARE_SEP = ":bare:" 42 DELEGATION_BARE_SEP = ":bare:"
40 43
41 class InvalidStanza(Exception): 44 class InvalidStanza(Exception):
47 def __init__(self): 50 def __init__(self):
48 super(DelegationsHandler, self).__init__() 51 super(DelegationsHandler, self).__init__()
49 52
50 def connectionInitialized(self): 53 def connectionInitialized(self):
51 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) 54 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
55 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
56 self._current_iqs = {} # dict of iq being handler by delegation
57 self._xs_send = self.xmlstream.send
58 self.xmlstream.send = self._sendHack
59
60 def _sendHack(self, elt):
61 """This method is called instead of xmlstream to control sending
62
63 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream
64 """
65 if isinstance(elt, domish.Element) and elt.name=='iq':
66 try:
67 ori_iq, managed_entity = self._current_iqs.pop(elt.getAttribute('id'))
68 if jid.JID(elt['to']) != managed_entity:
69 log.msg("IQ id conflict: the managed entity doesn't match")
70 raise KeyError
71 except KeyError:
72 # the iq is not a delegated one
73 self._xs_send(elt)
74 else:
75 iq_result_elt = toResponse(ori_iq, 'result')
76 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
77 fwd_elt.addChild(elt)
78 self._xs_send(iq_result_elt)
79 else:
80 self._xs_send(elt)
81
82 def _obsWrapper(self, observer, stanza):
83 """Wrapper to observer which catch StanzaError
84
85 @param observer(callable): method to wrap
86 """
87 try:
88 observer(stanza)
89 except error.StanzaError as e:
90 error_elt = e.toResponse(stanza)
91 self._xs_send(error_elt)
92 stanza.handled = True
52 93
53 def onAdvertise(self, message): 94 def onAdvertise(self, message):
54 """Managage the <message/> advertising delegations""" 95 """Manage the <message/> advertising delegations"""
55 delegation_elt = message.elements(DELEGATION_NS, 'delegation').next() 96 delegation_elt = message.elements(DELEGATION_NS, 'delegation').next()
56 delegated = {} 97 delegated = {}
57 for delegated_elt in delegation_elt.elements(DELEGATION_NS): 98 for delegated_elt in delegation_elt.elements(DELEGATION_NS):
58 try: 99 try:
59 if delegated_elt.name != 'delegated': 100 if delegated_elt.name != 'delegated':
76 u"" if not attributes else u" with filtering on {} attribute(s)".format( 117 u"" if not attributes else u" with filtering on {} attribute(s)".format(
77 u", ".join(attributes))) for ns, attributes in delegated.items()]))) 118 u", ".join(attributes))) for ns, attributes in delegated.items()])))
78 119
79 if not pubsub.NS_PUBSUB in delegated: 120 if not pubsub.NS_PUBSUB in delegated:
80 log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service") 121 log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")
122
123 def onForward(self, iq):
124 """Manage forwarded iq
125
126 @param iq(domish.Element): full delegation stanza
127 """
128 try:
129 fwd_iq = (iq.elements(DELEGATION_NS, 'delegation').next()
130 .elements(FORWARDED_NS, 'forwarded').next()
131 .elements('jabber:client', 'iq').next())
132 except StopIteration:
133 raise error.StanzaError('not-acceptable')
134
135 managed_entity = jid.JID(fwd_iq.getAttribute('to') or fwd_iq['from'])
136
137 if managed_entity.host != iq['from']:
138 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}"
139 .format(iq.toXml())).encode('utf-8'))
140 raise error.StanzaError('not-allowed')
141
142 self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
143 # we now inject the element in the stream
144 self.xmlstream.dispatch(fwd_iq)
81 145
82 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 146 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
83 """Manage disco nesting 147 """Manage disco nesting
84 148
85 This method looks for DiscoHandler in sibling handlers and use it to 149 This method looks for DiscoHandler in sibling handlers and use it to