Mercurial > libervia-pubsub
comparison sat_pubsub/delegation.py @ 291:61fb4817b77f
delegation: iq forwarded management:
in addition to the onForward observer, the xmlstream.send message is monkey patched, so we can inject the forwarded stanza as if it was received normally, and intercept the result to send it back to the server.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 04 May 2015 18:40:47 +0200 |
parents | f08f8536cab8 |
children | 6918a0dad359 |
comparison
equal
deleted
inserted
replaced
290:9f612fa19eea | 291:61fb4817b77f |
---|---|
26 from wokkel.subprotocols import XMPPHandler | 26 from wokkel.subprotocols import XMPPHandler |
27 from wokkel import pubsub | 27 from wokkel import pubsub |
28 from wokkel import data_form | 28 from wokkel import data_form |
29 from wokkel import disco, iwokkel | 29 from wokkel import disco, iwokkel |
30 from twisted.python import log | 30 from twisted.python import log |
31 from twisted.words.protocols.jabber import error | 31 from twisted.words.protocols.jabber import jid, error |
32 from twisted.words.protocols.jabber.xmlstream import toResponse | |
33 from twisted.words.xish import domish | |
32 from zope.interface import implements | 34 from zope.interface import implements |
33 | 35 |
34 DELEGATION_NS = 'urn:xmpp:delegation:1' | 36 DELEGATION_NS = 'urn:xmpp:delegation:1' |
35 FORWARDED_NS = 'urn:xmpp:forward:0' | 37 FORWARDED_NS = 'urn:xmpp:forward:0' |
36 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) | 38 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) |
39 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS) | |
37 | 40 |
38 DELEGATION_MAIN_SEP = "::" | 41 DELEGATION_MAIN_SEP = "::" |
39 DELEGATION_BARE_SEP = ":bare:" | 42 DELEGATION_BARE_SEP = ":bare:" |
40 | 43 |
41 class InvalidStanza(Exception): | 44 class InvalidStanza(Exception): |
47 def __init__(self): | 50 def __init__(self): |
48 super(DelegationsHandler, self).__init__() | 51 super(DelegationsHandler, self).__init__() |
49 | 52 |
50 def connectionInitialized(self): | 53 def connectionInitialized(self): |
51 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) | 54 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) |
55 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) | |
56 self._current_iqs = {} # dict of iq being handler by delegation | |
57 self._xs_send = self.xmlstream.send | |
58 self.xmlstream.send = self._sendHack | |
59 | |
60 def _sendHack(self, elt): | |
61 """This method is called instead of xmlstream to control sending | |
62 | |
63 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream | |
64 """ | |
65 if isinstance(elt, domish.Element) and elt.name=='iq': | |
66 try: | |
67 ori_iq, managed_entity = self._current_iqs.pop(elt.getAttribute('id')) | |
68 if jid.JID(elt['to']) != managed_entity: | |
69 log.msg("IQ id conflict: the managed entity doesn't match") | |
70 raise KeyError | |
71 except KeyError: | |
72 # the iq is not a delegated one | |
73 self._xs_send(elt) | |
74 else: | |
75 iq_result_elt = toResponse(ori_iq, 'result') | |
76 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) | |
77 fwd_elt.addChild(elt) | |
78 self._xs_send(iq_result_elt) | |
79 else: | |
80 self._xs_send(elt) | |
81 | |
82 def _obsWrapper(self, observer, stanza): | |
83 """Wrapper to observer which catch StanzaError | |
84 | |
85 @param observer(callable): method to wrap | |
86 """ | |
87 try: | |
88 observer(stanza) | |
89 except error.StanzaError as e: | |
90 error_elt = e.toResponse(stanza) | |
91 self._xs_send(error_elt) | |
92 stanza.handled = True | |
52 | 93 |
53 def onAdvertise(self, message): | 94 def onAdvertise(self, message): |
54 """Managage the <message/> advertising delegations""" | 95 """Manage the <message/> advertising delegations""" |
55 delegation_elt = message.elements(DELEGATION_NS, 'delegation').next() | 96 delegation_elt = message.elements(DELEGATION_NS, 'delegation').next() |
56 delegated = {} | 97 delegated = {} |
57 for delegated_elt in delegation_elt.elements(DELEGATION_NS): | 98 for delegated_elt in delegation_elt.elements(DELEGATION_NS): |
58 try: | 99 try: |
59 if delegated_elt.name != 'delegated': | 100 if delegated_elt.name != 'delegated': |
76 u"" if not attributes else u" with filtering on {} attribute(s)".format( | 117 u"" if not attributes else u" with filtering on {} attribute(s)".format( |
77 u", ".join(attributes))) for ns, attributes in delegated.items()]))) | 118 u", ".join(attributes))) for ns, attributes in delegated.items()]))) |
78 | 119 |
79 if not pubsub.NS_PUBSUB in delegated: | 120 if not pubsub.NS_PUBSUB in delegated: |
80 log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service") | 121 log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service") |
122 | |
123 def onForward(self, iq): | |
124 """Manage forwarded iq | |
125 | |
126 @param iq(domish.Element): full delegation stanza | |
127 """ | |
128 try: | |
129 fwd_iq = (iq.elements(DELEGATION_NS, 'delegation').next() | |
130 .elements(FORWARDED_NS, 'forwarded').next() | |
131 .elements('jabber:client', 'iq').next()) | |
132 except StopIteration: | |
133 raise error.StanzaError('not-acceptable') | |
134 | |
135 managed_entity = jid.JID(fwd_iq.getAttribute('to') or fwd_iq['from']) | |
136 | |
137 if managed_entity.host != iq['from']: | |
138 log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}" | |
139 .format(iq.toXml())).encode('utf-8')) | |
140 raise error.StanzaError('not-allowed') | |
141 | |
142 self._current_iqs[fwd_iq['id']] = (iq, managed_entity) | |
143 # we now inject the element in the stream | |
144 self.xmlstream.dispatch(fwd_iq) | |
81 | 145 |
82 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 146 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
83 """Manage disco nesting | 147 """Manage disco nesting |
84 | 148 |
85 This method looks for DiscoHandler in sibling handlers and use it to | 149 This method looks for DiscoHandler in sibling handlers and use it to |