comparison sat_pubsub/pam.py @ 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto Subscriptions" and "Filtering" is not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element actually specified, I'm waiting for publication to update the XEP. unsubscribe has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). database schema has been updated to add columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents
children 23a51b139024
comparison
equal deleted inserted replaced
477:9125a6e440c0 478:b544109ab4c4
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2015-2022 Jérôme Poisson
4
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 "This module implements XEP-0376 (Pubsub Account Management)"
20
21 from typing import Optional
22
23 from twisted.internet import defer
24 from twisted.python import log
25 from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error
26 from twisted.words.xish import domish
27 from wokkel import disco, iwokkel, pubsub, data_form
28 from wokkel.iwokkel import IPubSubService
29 from zope.interface import implementer
30
31 from sat_pubsub import error
32
33 NS_PAM = "urn:xmpp:pam:0"
34 PAM_SUB_XPATH = (
35 f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
36 )
37 PAM_UNSUB_XPATH = (
38 f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
39 )
40 PAM_SUBSCRIPTIONS_XPATH = (
41 f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]'
42 )
43
44
45 @implementer(iwokkel.IDisco)
46 class PAMHandler(disco.DiscoClientProtocol):
47
48 def __init__(self, service_jid):
49 super(PAMHandler, self).__init__()
50 self.backend = None
51
52 def connectionInitialized(self):
53 for handler in self.parent.handlers:
54 if IPubSubService.providedBy(handler):
55 self._pubsub_service = handler
56 break
57 self.backend = self.parent.parent.getServiceNamed('backend')
58 self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe)
59 self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe)
60 self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions)
61
62 def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]:
63 """Get JID of sender if it's a user from our server
64
65 If it's a user from an external server, None is returned and a message is log
66 """
67 from_jid = jid.JID(iq_elt["from"])
68 if not self.backend.isFromServer(from_jid):
69 log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}")
70 else:
71 return jid.JID(iq_elt["from"])
72
73 def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt):
74 destinee_jid = jid.JID(iq_req_elt["from"])
75 sender_jid = jid.JID(iq_req_elt["to"])
76 message_elt = domish.Element((None, "message"))
77 message_elt["to"] = destinee_jid.userhost()
78 message_elt["from"] = destinee_jid.userhost()
79 # XXX: we explicitely store the notification to be sure that all clients get it
80 message_elt.addElement(("urn:xmpp:hints", "store"))
81 notify_elt = message_elt.addElement((NS_PAM, "notify"))
82 notify_elt["service"] = sender_jid.full()
83 notify_elt.addChild(iq_result_elt.pubsub.subscription)
84 self.backend.privilege.sendMessage(message_elt)
85 pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result')
86 self.xmlstream.send(pam_iq_result_elt)
87
88 async def onSubRequest(self, from_jid, iq_elt, subscribe=True):
89 try:
90 service_jid = jid.JID(iq_elt.pam.getAttribute("jid"))
91 except RuntimeError:
92 log.msg(
93 f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}'
94 )
95 return
96 iq_elt.handled = True
97 new_iq_elt = domish.Element((None, "iq"))
98 new_iq_elt["from"] = from_jid.userhost()
99 new_iq_elt["to"] = service_jid.full()
100 new_iq_elt.addUniqueId()
101 new_iq_elt["type"] = "set"
102 new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub"))
103 new_pubsub_elt.addChild(
104 iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe
105 )
106 try:
107 options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options"))
108 except StopIteration:
109 options_elt = None
110 else:
111 new_pubsub_elt.addChild(options_elt)
112
113 if self.backend.isFromServer(service_jid):
114 # the request is handled locally
115 new_iq_elt.delegated = True
116 self.backend.delegation.registerSendHook(
117 new_iq_elt, self.onSubscribeResult, iq_elt
118 )
119 self.xmlstream.dispatch(new_iq_elt)
120 else:
121 # this is a (un)subscribe request to an external server
122 sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt)
123 if sub_result_elt["type"] == "result":
124 if subscribe:
125 node = new_iq_elt.pubsub.subscribe["node"]
126 state = sub_result_elt.pubsub.subscription.getAttribute(
127 "subscription", "subscribed"
128 )
129 public = False
130 if options_elt is not None:
131 options_form = data_form.findForm(
132 options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS
133 )
134 if options_form is not None:
135 public = options_form.get(f"{{{const.NS_PPS}}}public", False)
136 await self.backend.storage.addExternalSubscription(
137 from_jid.userhostJID(),
138 service_jid,
139 node,
140 state,
141 public
142 )
143 else:
144 node = new_iq_elt.pubsub.unsubscribe["node"]
145 try:
146 await self.backend.storage.removeExternalSubscription(
147 from_jid.userhostJID(),
148 service_jid,
149 node,
150 )
151 except error.NotSubscribed:
152 pass
153 self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt)
154
155 def _onSubscribe(self, iq_elt: domish.Element) -> None:
156 if not iq_elt.delegated:
157 return
158 from_jid = self.getServerUser(iq_elt)
159 if from_jid is not None:
160 defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt))
161
162 def _onUnsubscribe(self, iq_elt: domish.Element) -> None:
163 if not iq_elt.delegated:
164 return
165 from_jid = self.getServerUser(iq_elt)
166 if from_jid is not None:
167 defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False))
168
169 async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None:
170 iq_elt.handled = True
171 try:
172 subs = await self.backend.storage.getAllSubscriptions(from_jid)
173 except Exception as e:
174 error_elt = jabber_error.StanzaError(
175 "internal-server-error",
176 text=str(e)
177 ).toResponse(iq_elt)
178 self.xmlstream.send(error_elt)
179 else:
180 result_elt = xmlstream.toResponse(iq_elt, "result")
181 subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions"))
182 for sub in subs:
183 self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub)
184 self.xmlstream.send(result_elt)
185
186 def _onSubscriptions(self, iq_elt: domish.Element) -> None:
187 if not iq_elt.delegated:
188 return
189 from_jid = self.getServerUser(iq_elt)
190 if from_jid is not None:
191 defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt))
192
193 def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
194 return [disco.DiscoFeature(NS_PAM)]
195
196 def getDiscoItems(self, requestor, service, nodeIdentifier=''):
197 return []