Mercurial > libervia-pubsub
comparison sat_pubsub/delegation.py @ 478:b544109ab4c4
Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription
/!\ pgsql schema needs to be updated /!\
/!\ server conf needs to be updated for privileged entity: only the new
`urn:xmpp:privilege:2` namespace is handled now /!\
Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster
pushes are not managed yet.
XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully
specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now
only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto
Subscriptions" and "Filtering" is not.
Public Pubsub Subscription
(https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented;
the XEP has been accepted by council but is not yet published. It will be updated to use
subscription options instead of the <public> element actually specified, I'm waiting
for publication to update the XEP.
unsubscribe has been updated to return the `<subscription>` element as expected by
XEP-0060 (sat_tmp needs to be updated).
database schema has been updated to add columns necessary to keep track of subscriptions
to external nodes and to mark subscriptions as public.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 13:39:08 +0200 |
parents | 607616f9ef5b |
children |
comparison
equal
deleted
inserted
replaced
477:9125a6e440c0 | 478:b544109ab4c4 |
---|---|
18 | 18 |
19 # --- | 19 # --- |
20 | 20 |
21 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service | 21 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service |
22 | 22 |
23 from wokkel.subprotocols import XMPPHandler | 23 from typing import Callable, Any |
24 | |
25 from twisted.python import log | |
26 from twisted.internet import reactor, defer | |
27 from twisted.words.protocols.jabber import error, jid | |
28 from twisted.words.protocols.jabber import xmlstream | |
29 from twisted.words.xish import domish | |
24 from wokkel import pubsub | 30 from wokkel import pubsub |
25 from wokkel import data_form | 31 from wokkel import data_form |
26 from wokkel import disco, iwokkel, generic | 32 from wokkel import disco, iwokkel |
33 from wokkel import mam | |
27 from wokkel.iwokkel import IPubSubService | 34 from wokkel.iwokkel import IPubSubService |
28 from wokkel import mam | 35 from wokkel.subprotocols import XMPPHandler |
29 from twisted.python import log | |
30 from twisted.words.protocols.jabber import ijabber, jid, error | |
31 from twisted.words.protocols.jabber.xmlstream import toResponse | |
32 from twisted.words.xish import domish | |
33 from zope.interface import implementer | 36 from zope.interface import implementer |
34 | 37 |
35 DELEGATION_NS = 'urn:xmpp:delegation:2' | 38 DELEGATION_NS = 'urn:xmpp:delegation:2' |
36 FORWARDED_NS = 'urn:xmpp:forward:0' | 39 FORWARDED_NS = 'urn:xmpp:forward:0' |
37 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) | 40 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) |
38 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS) | 41 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS) |
39 | 42 |
40 DELEGATION_MAIN_SEP = "::" | 43 DELEGATION_MAIN_SEP = "::" |
41 DELEGATION_BARE_SEP = ":bare:" | 44 DELEGATION_BARE_SEP = ":bare:" |
42 | 45 |
46 SEND_HOOK_TIMEOUT = 300 | |
43 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), | 47 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), |
44 (mam.IMAMService, mam, "MAMRequest"), | 48 (mam.IMAMService, mam, "MAMRequest"), |
45 (None, disco, "_DiscoRequest")) | 49 (None, disco, "_DiscoRequest")) |
46 | 50 |
47 | 51 |
106 self.backend = self.parent.parent.getServiceNamed('backend') | 110 self.backend = self.parent.parent.getServiceNamed('backend') |
107 if not self._service_hacked: | 111 if not self._service_hacked: |
108 self._service_hack() | 112 self._service_hack() |
109 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) | 113 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) |
110 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) | 114 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) |
111 self._current_iqs = {} # dict of iq being handler by delegation | 115 self._current_iqs = {} # dict of iq being handled by delegation |
112 self._xs_send = self.xmlstream.send | 116 self.xs_send = self.xmlstream.send |
113 self.xmlstream.send = self._sendHack | 117 self.xmlstream.send = self._sendHack |
118 | |
119 def delegatedResult( | |
120 self, | |
121 iq_req_elt: domish.Element, | |
122 iq_resp_elt: domish.Element, | |
123 wrapping_iq_elt: domish.Element | |
124 ) -> None: | |
125 """Method called when a result to a delegated stanza has been received | |
126 | |
127 The result is wrapped and sent back to server | |
128 """ | |
129 iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result') | |
130 fwd_elt = iq_result_elt.addElement( | |
131 'delegation', DELEGATION_NS | |
132 ).addElement('forwarded', FORWARDED_NS) | |
133 fwd_elt.addChild(iq_resp_elt) | |
134 iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client' | |
135 self.xs_send(iq_result_elt) | |
114 | 136 |
115 def _sendHack(self, elt): | 137 def _sendHack(self, elt): |
116 """This method is called instead of xmlstream to control sending | 138 """This method is called instead of xmlstream to control sending |
117 | 139 |
118 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream | 140 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream |
119 """ | 141 """ |
120 if isinstance(elt, domish.Element) and elt.name=='iq': | 142 if isinstance(elt, domish.Element) and elt.name=='iq': |
121 try: | 143 try: |
122 id_ = elt.getAttribute('id') | 144 iq_id = elt["id"] |
123 ori_iq, managed_entity = self._current_iqs[id_] | 145 iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id] |
124 if jid.JID(elt['to']) != managed_entity: | 146 if elt['to'] != iq_req_elt["from"]: |
125 log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" | 147 log.err( |
126 .format(got=jid.JID(elt['to']), expected=managed_entity)) | 148 "IQ id conflict: the managed entity doesn't match (got " |
149 f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})" | |
150 ) | |
127 raise KeyError | 151 raise KeyError |
128 except KeyError: | 152 except KeyError: |
129 # the iq is not a delegated one | 153 # the iq is not a delegated one |
130 self._xs_send(elt) | 154 self.xs_send(elt) |
131 else: | 155 else: |
132 del self._current_iqs[id_] | 156 if not timeout.called: |
133 iq_result_elt = toResponse(ori_iq, 'result') | 157 timeout.cancel() |
134 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) | 158 del self._current_iqs[iq_id] |
135 fwd_elt.addChild(elt) | 159 callback(iq_req_elt, elt, *cb_args) |
136 elt.uri = elt.defaultUri = 'jabber:client' | |
137 self._xs_send(iq_result_elt) | |
138 else: | 160 else: |
139 self._xs_send(elt) | 161 self.xs_send(elt) |
140 | 162 |
141 def _obsWrapper(self, observer, stanza): | 163 def _obsWrapper(self, observer, stanza): |
142 """Wrapper to observer which catch StanzaError | 164 """Wrapper to observer which catch StanzaError |
143 | 165 |
144 @param observer(callable): method to wrap | 166 @param observer(callable): method to wrap |
145 """ | 167 """ |
146 try: | 168 try: |
147 observer(stanza) | 169 observer(stanza) |
148 except error.StanzaError as e: | 170 except error.StanzaError as e: |
149 error_elt = e.toResponse(stanza) | 171 error_elt = e.toResponse(stanza) |
150 self._xs_send(error_elt) | 172 self.xs_send(error_elt) |
151 stanza.handled = True | 173 stanza.handled = True |
152 | 174 |
153 def onAdvertise(self, message): | 175 def onAdvertise(self, message): |
154 """Manage the <message/> advertising delegations""" | 176 """Manage the <message/> advertising delegations""" |
155 if self.backend.config["server_jid"] is None: | 177 if self.backend.config["server_jid"] is None: |
184 raise InvalidStanza('was expecting a "name" attribute in attribute element') | 206 raise InvalidStanza('was expecting a "name" attribute in attribute element') |
185 except InvalidStanza as e: | 207 except InvalidStanza as e: |
186 log.msg("Invalid stanza received ({})".format(e)) | 208 log.msg("Invalid stanza received ({})".format(e)) |
187 | 209 |
188 log.msg('delegations updated:\n{}'.format( | 210 log.msg('delegations updated:\n{}'.format( |
189 '\n'.join([" - namespace {}{}".format(ns, | 211 '\n'.join(["- namespace {}{}".format(ns, |
190 "" if not attributes else " with filtering on {} attribute(s)".format( | 212 "" if not attributes else " with filtering on {} attribute(s)".format( |
191 ", ".join(attributes))) for ns, attributes in list(delegated.items())]))) | 213 ", ".join(attributes))) for ns, attributes in list(delegated.items())]))) |
192 | 214 |
193 if not pubsub.NS_PUBSUB in delegated: | 215 if not pubsub.NS_PUBSUB in delegated: |
194 log.msg("Didn't got pubsub delegation from server, can't act as a PEP service") | 216 log.msg( |
217 "Didn't got pubsub delegation from server, can't act as a PEP service" | |
218 ) | |
219 | |
220 def registerSendHook( | |
221 self, | |
222 iq_elt: domish.Element, | |
223 callback: Callable[[domish.Element, domish.Element, ...], None], | |
224 *args | |
225 ) -> None: | |
226 """Register a methode to call when an IQ element response is received | |
227 | |
228 If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted | |
229 @param iq_elt: source IQ element sent. Its "id" attribute will be used to track | |
230 response | |
231 @param callback: method to call when answer is seen | |
232 Will be called with: | |
233 - original IQ request | |
234 - received IQ result (or error) | |
235 - optional extra arguments | |
236 self.xs_send should be used to send final result | |
237 @param *args: argument to use with callback | |
238 """ | |
239 iq_id = iq_elt["id"] | |
240 timeout = reactor.callLater( | |
241 SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None) | |
242 ) | |
243 self._current_iqs[iq_id] = (iq_elt, callback, args, timeout) | |
195 | 244 |
196 def onForward(self, iq): | 245 def onForward(self, iq): |
197 """Manage forwarded iq | 246 """Manage forwarded iq |
198 | 247 |
199 @param iq(domish.Element): full delegation stanza | 248 @param iq(domish.Element): full delegation stanza |
208 forwarded_elt = next(delegation_elt.elements(FORWARDED_NS, 'forwarded')) | 257 forwarded_elt = next(delegation_elt.elements(FORWARDED_NS, 'forwarded')) |
209 fwd_iq = next(forwarded_elt.elements('jabber:client', 'iq')) | 258 fwd_iq = next(forwarded_elt.elements('jabber:client', 'iq')) |
210 except StopIteration: | 259 except StopIteration: |
211 raise error.StanzaError('not-acceptable') | 260 raise error.StanzaError('not-acceptable') |
212 | 261 |
213 managed_entity = jid.JID(fwd_iq['from']) | 262 self.registerSendHook(fwd_iq, self.delegatedResult, iq) |
214 | |
215 self._current_iqs[fwd_iq['id']] = (iq, managed_entity) | |
216 fwd_iq.delegated = True | 263 fwd_iq.delegated = True |
217 | 264 |
218 # we need a recipient in pubsub request for PEP | 265 # we need a recipient in pubsub request for PEP |
219 # so we set "to" attribute if it doesn't exist | 266 # so we set "to" attribute if it doesn't exist |
220 if not fwd_iq.hasAttribute('to'): | 267 if not fwd_iq.hasAttribute('to'): |