comparison sat_pubsub/delegation.py @ 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to handle the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" are implemented; "Auto Subscriptions" and "Filtering" are not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element currently specified; I'm waiting for publication to update the XEP. Unsubscribe has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). The database schema has been updated to add columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents 607616f9ef5b
children
comparison
equal deleted inserted replaced
477:9125a6e440c0 478:b544109ab4c4
18 18
19 # --- 19 # ---
20 20
21 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service 21 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
22 22
23 from wokkel.subprotocols import XMPPHandler 23 from typing import Callable, Any
24
25 from twisted.python import log
26 from twisted.internet import reactor, defer
27 from twisted.words.protocols.jabber import error, jid
28 from twisted.words.protocols.jabber import xmlstream
29 from twisted.words.xish import domish
24 from wokkel import pubsub 30 from wokkel import pubsub
25 from wokkel import data_form 31 from wokkel import data_form
26 from wokkel import disco, iwokkel, generic 32 from wokkel import disco, iwokkel
33 from wokkel import mam
27 from wokkel.iwokkel import IPubSubService 34 from wokkel.iwokkel import IPubSubService
28 from wokkel import mam 35 from wokkel.subprotocols import XMPPHandler
29 from twisted.python import log
30 from twisted.words.protocols.jabber import ijabber, jid, error
31 from twisted.words.protocols.jabber.xmlstream import toResponse
32 from twisted.words.xish import domish
33 from zope.interface import implementer 36 from zope.interface import implementer
34 37
35 DELEGATION_NS = 'urn:xmpp:delegation:2' 38 DELEGATION_NS = 'urn:xmpp:delegation:2'
36 FORWARDED_NS = 'urn:xmpp:forward:0' 39 FORWARDED_NS = 'urn:xmpp:forward:0'
37 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) 40 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)
38 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS) 41 DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS)
39 42
40 DELEGATION_MAIN_SEP = "::" 43 DELEGATION_MAIN_SEP = "::"
41 DELEGATION_BARE_SEP = ":bare:" 44 DELEGATION_BARE_SEP = ":bare:"
42 45
46 SEND_HOOK_TIMEOUT = 300
43 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), 47 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"),
44 (mam.IMAMService, mam, "MAMRequest"), 48 (mam.IMAMService, mam, "MAMRequest"),
45 (None, disco, "_DiscoRequest")) 49 (None, disco, "_DiscoRequest"))
46 50
47 51
106 self.backend = self.parent.parent.getServiceNamed('backend') 110 self.backend = self.parent.parent.getServiceNamed('backend')
107 if not self._service_hacked: 111 if not self._service_hacked:
108 self._service_hack() 112 self._service_hack()
109 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) 113 self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
110 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) 114 self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
111 self._current_iqs = {} # dict of iq being handler by delegation 115 self._current_iqs = {} # dict of iq being handled by delegation
112 self._xs_send = self.xmlstream.send 116 self.xs_send = self.xmlstream.send
113 self.xmlstream.send = self._sendHack 117 self.xmlstream.send = self._sendHack
118
119 def delegatedResult(
120 self,
121 iq_req_elt: domish.Element,
122 iq_resp_elt: domish.Element,
123 wrapping_iq_elt: domish.Element
124 ) -> None:
125 """Method called when a result to a delegated stanza has been received
126
127 The result is wrapped and sent back to server
128 """
129 iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result')
130 fwd_elt = iq_result_elt.addElement(
131 'delegation', DELEGATION_NS
132 ).addElement('forwarded', FORWARDED_NS)
133 fwd_elt.addChild(iq_resp_elt)
134 iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client'
135 self.xs_send(iq_result_elt)
114 136
115 def _sendHack(self, elt): 137 def _sendHack(self, elt):
116 """This method is called instead of xmlstream to control sending 138 """This method is called instead of xmlstream to control sending
117 139
118 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream 140 @param obj(domsish.Element, unicode, str): obj sent to real xmlstream
119 """ 141 """
120 if isinstance(elt, domish.Element) and elt.name=='iq': 142 if isinstance(elt, domish.Element) and elt.name=='iq':
121 try: 143 try:
122 id_ = elt.getAttribute('id') 144 iq_id = elt["id"]
123 ori_iq, managed_entity = self._current_iqs[id_] 145 iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id]
124 if jid.JID(elt['to']) != managed_entity: 146 if elt['to'] != iq_req_elt["from"]:
125 log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" 147 log.err(
126 .format(got=jid.JID(elt['to']), expected=managed_entity)) 148 "IQ id conflict: the managed entity doesn't match (got "
149 f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})"
150 )
127 raise KeyError 151 raise KeyError
128 except KeyError: 152 except KeyError:
129 # the iq is not a delegated one 153 # the iq is not a delegated one
130 self._xs_send(elt) 154 self.xs_send(elt)
131 else: 155 else:
132 del self._current_iqs[id_] 156 if not timeout.called:
133 iq_result_elt = toResponse(ori_iq, 'result') 157 timeout.cancel()
134 fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) 158 del self._current_iqs[iq_id]
135 fwd_elt.addChild(elt) 159 callback(iq_req_elt, elt, *cb_args)
136 elt.uri = elt.defaultUri = 'jabber:client'
137 self._xs_send(iq_result_elt)
138 else: 160 else:
139 self._xs_send(elt) 161 self.xs_send(elt)
140 162
141 def _obsWrapper(self, observer, stanza): 163 def _obsWrapper(self, observer, stanza):
142 """Wrapper to observer which catch StanzaError 164 """Wrapper to observer which catch StanzaError
143 165
144 @param observer(callable): method to wrap 166 @param observer(callable): method to wrap
145 """ 167 """
146 try: 168 try:
147 observer(stanza) 169 observer(stanza)
148 except error.StanzaError as e: 170 except error.StanzaError as e:
149 error_elt = e.toResponse(stanza) 171 error_elt = e.toResponse(stanza)
150 self._xs_send(error_elt) 172 self.xs_send(error_elt)
151 stanza.handled = True 173 stanza.handled = True
152 174
153 def onAdvertise(self, message): 175 def onAdvertise(self, message):
154 """Manage the <message/> advertising delegations""" 176 """Manage the <message/> advertising delegations"""
155 if self.backend.config["server_jid"] is None: 177 if self.backend.config["server_jid"] is None:
184 raise InvalidStanza('was expecting a "name" attribute in attribute element') 206 raise InvalidStanza('was expecting a "name" attribute in attribute element')
185 except InvalidStanza as e: 207 except InvalidStanza as e:
186 log.msg("Invalid stanza received ({})".format(e)) 208 log.msg("Invalid stanza received ({})".format(e))
187 209
188 log.msg('delegations updated:\n{}'.format( 210 log.msg('delegations updated:\n{}'.format(
189 '\n'.join([" - namespace {}{}".format(ns, 211 '\n'.join(["- namespace {}{}".format(ns,
190 "" if not attributes else " with filtering on {} attribute(s)".format( 212 "" if not attributes else " with filtering on {} attribute(s)".format(
191 ", ".join(attributes))) for ns, attributes in list(delegated.items())]))) 213 ", ".join(attributes))) for ns, attributes in list(delegated.items())])))
192 214
193 if not pubsub.NS_PUBSUB in delegated: 215 if not pubsub.NS_PUBSUB in delegated:
194 log.msg("Didn't got pubsub delegation from server, can't act as a PEP service") 216 log.msg(
217 "Didn't got pubsub delegation from server, can't act as a PEP service"
218 )
219
220 def registerSendHook(
221 self,
222 iq_elt: domish.Element,
223 callback: Callable[[domish.Element, domish.Element, ...], None],
224 *args
225 ) -> None:
226 """Register a methode to call when an IQ element response is received
227
228 If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted
229 @param iq_elt: source IQ element sent. Its "id" attribute will be used to track
230 response
231 @param callback: method to call when answer is seen
232 Will be called with:
233 - original IQ request
234 - received IQ result (or error)
235 - optional extra arguments
236 self.xs_send should be used to send final result
237 @param *args: argument to use with callback
238 """
239 iq_id = iq_elt["id"]
240 timeout = reactor.callLater(
241 SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None)
242 )
243 self._current_iqs[iq_id] = (iq_elt, callback, args, timeout)
195 244
196 def onForward(self, iq): 245 def onForward(self, iq):
197 """Manage forwarded iq 246 """Manage forwarded iq
198 247
199 @param iq(domish.Element): full delegation stanza 248 @param iq(domish.Element): full delegation stanza
208 forwarded_elt = next(delegation_elt.elements(FORWARDED_NS, 'forwarded')) 257 forwarded_elt = next(delegation_elt.elements(FORWARDED_NS, 'forwarded'))
209 fwd_iq = next(forwarded_elt.elements('jabber:client', 'iq')) 258 fwd_iq = next(forwarded_elt.elements('jabber:client', 'iq'))
210 except StopIteration: 259 except StopIteration:
211 raise error.StanzaError('not-acceptable') 260 raise error.StanzaError('not-acceptable')
212 261
213 managed_entity = jid.JID(fwd_iq['from']) 262 self.registerSendHook(fwd_iq, self.delegatedResult, iq)
214
215 self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
216 fwd_iq.delegated = True 263 fwd_iq.delegated = True
217 264
218 # we need a recipient in pubsub request for PEP 265 # we need a recipient in pubsub request for PEP
219 # so we set "to" attribute if it doesn't exist 266 # so we set "to" attribute if it doesn't exist
220 if not fwd_iq.hasAttribute('to'): 267 if not fwd_iq.hasAttribute('to'):