Mercurial > libervia-pubsub
comparison sat_pubsub/pam.py @ 478:b544109ab4c4
Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription
/!\ pgsql schema needs to be updated /!\
/!\ server conf needs to be updated for privileged entity: only the new
`urn:xmpp:privilege:2` namespace is handled now /!\
Privileged entity has been updated to handle the new namespace and IQ permission. Roster
pushes are not managed yet.
XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully
specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now
only "Subscribing", "Unsubscribing" and "Listing Subscriptions" are implemented; "Auto
Subscriptions" and "Filtering" are not.
Public Pubsub Subscription
(https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented;
the XEP has been accepted by council but is not yet published. It will be updated to use
subscription options instead of the <public> element currently specified; I'm waiting
for publication to update the XEP.
unsubscribe has been updated to return the `<subscription>` element as expected by
XEP-0060 (sat_tmp needs to be updated).
database schema has been updated to add columns necessary to keep track of subscriptions
to external nodes and to mark subscriptions as public.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 13:39:08 +0200 |
parents | |
children | 23a51b139024 |
comparison
equal
deleted
inserted
replaced
477:9125a6e440c0 | 478:b544109ab4c4 |
---|---|
1 #!/usr/bin/env python3 | |
2 # | |
3 # Copyright (c) 2015-2022 Jérôme Poisson | |
4 | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 "This module implements XEP-0376 (Pubsub Account Management)" | |
20 | |
from typing import Optional

from twisted.internet import defer
from twisted.python import log
from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error
from twisted.words.xish import domish
from wokkel import disco, iwokkel, pubsub, data_form
from wokkel.iwokkel import IPubSubService
from zope.interface import implementer

from sat_pubsub import const
from sat_pubsub import error
32 | |
# Namespace of XEP-0376 (Pubsub Account Management)
NS_PAM = "urn:xmpp:pam:0"

# <iq type="set"> carrying a <pam/> element wrapping a pubsub <subscribe/>
PAM_SUB_XPATH = (
    '/iq[@type="set"]'
    f'/pam[@xmlns="{NS_PAM}"]'
    f'/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
)

# <iq type="set"> carrying a <pam/> element wrapping a pubsub <unsubscribe/>
PAM_UNSUB_XPATH = (
    '/iq[@type="set"]'
    f'/pam[@xmlns="{NS_PAM}"]'
    f'/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
)

# <iq type="get"> asking for the list of subscriptions
PAM_SUBSCRIPTIONS_XPATH = (
    '/iq[@type="get"]'
    f'/subscriptions[@xmlns="{NS_PAM}"]'
)
43 | |
44 | |
@implementer(iwokkel.IDisco)
class PAMHandler(disco.DiscoClientProtocol):
    """Handler for XEP-0376 (Pubsub Account Management) requests.

    Observers are registered for PAM subscribe, unsubscribe and subscriptions
    listing IQ stanzas. Only delegated stanzas coming from users of our own
    server are processed.
    """

    def __init__(self, service_jid):
        """
        @param service_jid: kept for the constructor signature, not used by this
            handler
        """
        super().__init__()
        # both attributes are set in connectionInitialized
        self.backend = None
        self._pubsub_service = None

    def connectionInitialized(self):
        """Retrieve sibling services and register the PAM stanza observers"""
        for handler in self.parent.handlers:
            if IPubSubService.providedBy(handler):
                self._pubsub_service = handler
                break
        self.backend = self.parent.parent.getServiceNamed('backend')
        self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe)
        self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe)
        self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions)

    def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]:
        """Get JID of sender if it's a user from our server

        If it's a user from an external server, None is returned and a message is
        logged.

        @param iq_elt: received <iq/> stanza, its "from" attribute is parsed
        @return: JID of the sender, or None if the sender is not from our server
        """
        from_jid = jid.JID(iq_elt["from"])
        if not self.backend.isFromServer(from_jid):
            log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}")
            return None
        return from_jid

    @staticmethod
    def _getSubscriptionElt(iq_result_elt) -> Optional[domish.Element]:
        """Return the <subscription/> child of <pubsub/> in an IQ result, if any

        An IQ result may come without any payload, in which case None is returned
        instead of failing on a missing <pubsub/> element.
        """
        pubsub_elt = iq_result_elt.pubsub
        if pubsub_elt is None:
            return None
        return pubsub_elt.subscription

    def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt):
        """Notify the user and answer the PAM request once (un)subscription is done

        A <message/> holding a PAM <notify/> element is sent through the privileged
        entity component, then an IQ result is sent in reply to the PAM request.

        @param iq_req_elt: (un)subscribe <iq/> which has been sent to the pubsub
            service ("from" is the user, "to" is the service)
        @param iq_result_elt: <iq/> result received for iq_req_elt
        @param pam_iq_elt: original PAM <iq/> request which must be answered
        """
        destinee_jid = jid.JID(iq_req_elt["from"])
        sender_jid = jid.JID(iq_req_elt["to"])
        message_elt = domish.Element((None, "message"))
        # "to" and "from" are both the bare JID of the requesting user
        message_elt["to"] = destinee_jid.userhost()
        message_elt["from"] = destinee_jid.userhost()
        # XXX: we explicitly store the notification to be sure that all clients get it
        message_elt.addElement(("urn:xmpp:hints", "store"))
        notify_elt = message_elt.addElement((NS_PAM, "notify"))
        notify_elt["service"] = sender_jid.full()
        subscription_elt = self._getSubscriptionElt(iq_result_elt)
        if subscription_elt is not None:
            notify_elt.addChild(subscription_elt)
        # NOTE(review): when the result has no <subscription/> payload, the
        #   notification is sent without it so that the PAM request is still
        #   answered; confirm this is acceptable for clients.
        self.backend.privilege.sendMessage(message_elt)
        pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result')
        self.xmlstream.send(pam_iq_result_elt)

    async def onSubRequest(self, from_jid, iq_elt, subscribe=True):
        """Forward a PAM (un)subscribe request to the targeted pubsub service

        The <subscribe/> or <unsubscribe/> element (and the <options/> element if
        any) is moved to a new pubsub <iq/> sent on behalf of the user. If the
        service is local the stanza is dispatched on our own stream, otherwise it
        is sent with the privileged entity component and the external subscription
        is stored or removed according to the result.

        @param from_jid: JID of the user doing the request
        @param iq_elt: PAM <iq/> request
        @param subscribe: True for a subscription, False for an unsubscription
        """
        try:
            service_jid = jid.JID(iq_elt.pam.getAttribute("jid"))
        except (RuntimeError, jid.InvalidFormat):
            # RuntimeError is raised when the attribute is missing (None value),
            # InvalidFormat when the value can't be parsed as a JID
            log.msg(
                'Invalid PAM element (missing or invalid "jid" attribute): '
                f'{iq_elt.toXml()}'
            )
            return
        iq_elt.handled = True
        new_iq_elt = domish.Element((None, "iq"))
        new_iq_elt["from"] = from_jid.userhost()
        new_iq_elt["to"] = service_jid.full()
        new_iq_elt.addUniqueId()
        new_iq_elt["type"] = "set"
        new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub"))
        new_pubsub_elt.addChild(
            iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe
        )
        try:
            options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options"))
        except StopIteration:
            options_elt = None
        else:
            new_pubsub_elt.addChild(options_elt)

        if self.backend.isFromServer(service_jid):
            # the request is handled locally
            new_iq_elt.delegated = True
            self.backend.delegation.registerSendHook(
                new_iq_elt, self.onSubscribeResult, iq_elt
            )
            self.xmlstream.dispatch(new_iq_elt)
            return

        # this is a (un)subscribe request to an external server
        sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt)
        if sub_result_elt["type"] != "result":
            # NOTE(review): nothing is sent back to the requestor in this case;
            #   confirm whether an error should be returned.
            return

        owner_jid = from_jid.userhostJID()
        if subscribe:
            node = new_iq_elt.pubsub.subscribe["node"]
            subscription_elt = self._getSubscriptionElt(sub_result_elt)
            if subscription_elt is None:
                # no payload in the result: same default as a missing attribute
                state = "subscribed"
            else:
                state = subscription_elt.getAttribute("subscription", "subscribed")
            public = False
            if options_elt is not None:
                options_form = data_form.findForm(
                    options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS
                )
                if options_form is not None:
                    # NOTE(review): the raw form value is passed through as is;
                    #   confirm that storage copes with non boolean values.
                    public = options_form.get(f"{{{const.NS_PPS}}}public", False)
            await self.backend.storage.addExternalSubscription(
                owner_jid,
                service_jid,
                node,
                state,
                public
            )
        else:
            node = new_iq_elt.pubsub.unsubscribe["node"]
            try:
                await self.backend.storage.removeExternalSubscription(
                    owner_jid,
                    service_jid,
                    node,
                )
            except error.NotSubscribed:
                # the subscription was not stored on our side, nothing to remove
                pass
        self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt)

    def _onSubscribe(self, iq_elt: domish.Element) -> None:
        """Observer for PAM subscribe requests

        Only delegated stanzas sent by a user of our server are processed.
        """
        if not iq_elt.delegated:
            return
        from_jid = self.getServerUser(iq_elt)
        if from_jid is not None:
            defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt))

    def _onUnsubscribe(self, iq_elt: domish.Element) -> None:
        """Observer for PAM unsubscribe requests

        Only delegated stanzas sent by a user of our server are processed.
        """
        if not iq_elt.delegated:
            return
        from_jid = self.getServerUser(iq_elt)
        if from_jid is not None:
            defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False))

    async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None:
        """Answer a PAM subscriptions listing request

        Subscriptions are retrieved from storage. If the retrieval fails, an
        "internal-server-error" stanza error is sent back, otherwise an IQ result
        with a PAM <subscriptions/> element is sent.

        @param from_jid: JID of the user doing the request
        @param iq_elt: PAM <iq/> request
        """
        iq_elt.handled = True
        try:
            subs = await self.backend.storage.getAllSubscriptions(from_jid)
        except Exception as e:
            # any failure is reported to the requestor instead of being lost in
            # the Deferred
            error_elt = jabber_error.StanzaError(
                "internal-server-error",
                text=str(e)
            ).toResponse(iq_elt)
            self.xmlstream.send(error_elt)
        else:
            result_elt = xmlstream.toResponse(iq_elt, "result")
            subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions"))
            for sub in subs:
                self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub)
            self.xmlstream.send(result_elt)

    def _onSubscriptions(self, iq_elt: domish.Element) -> None:
        """Observer for PAM subscriptions listing requests

        Only delegated stanzas sent by a user of our server are processed.
        """
        if not iq_elt.delegated:
            return
        from_jid = self.getServerUser(iq_elt)
        if from_jid is not None:
            defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt))

    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
        """Advertise the PAM namespace as a disco feature"""
        return [disco.DiscoFeature(NS_PAM)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
        """This handler exposes no disco item"""
        return []