comparison libervia/backend/plugins/plugin_xep_0465.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0465.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for XEP-0465
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Optional, List, Dict, Union
20
21 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
22 from twisted.words.protocols.jabber import jid
23 from twisted.words.protocols.jabber import error
24 from twisted.words.xish import domish
25 from zope.interface import implementer
26 from wokkel import disco, iwokkel
27
28 from libervia.backend.core.constants import Const as C
29 from libervia.backend.core.i18n import _
30 from libervia.backend.core.log import getLogger
31 from libervia.backend.core import exceptions
32 from libervia.backend.core.core_types import SatXMPPEntity
33 from libervia.backend.tools import utils
34 from libervia.backend.tools.common import data_format
35
36 log = getLogger(__name__)
37
38 PLUGIN_INFO = {
39 C.PI_NAME: "Pubsub Public Subscriptions",
40 C.PI_IMPORT_NAME: "XEP-0465",
41 C.PI_TYPE: C.PLUG_TYPE_XEP,
42 C.PI_MODES: C.PLUG_MODE_BOTH,
43 C.PI_PROTOCOLS: ["XEP-0465"],
44 C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0376"],
45 C.PI_MAIN: "XEP_0465",
46 C.PI_HANDLER: "yes",
47 C.PI_DESCRIPTION: _("""Pubsub Public Subscriptions implementation"""),
48 }
49
50 NS_PPS = "urn:xmpp:pps:0"
51 NS_PPS_SUBSCRIPTIONS = "urn:xmpp:pps:subscriptions:0"
52 NS_PPS_SUBSCRIBERS = "urn:xmpp:pps:subscribers:0"
53 SUBSCRIBERS_NODE_PREFIX = f"{NS_PPS_SUBSCRIBERS}/"
54 NOT_IMPLEMENTED_MSG = (
55 "The service at {service!s} doesn't seem to support Pubsub Public Subscriptions "
56 "(XEP-0465), please request support from your service administrator."
57 )
58
59
60 class XEP_0465:
61
62 def __init__(self, host):
63 log.info(_("Pubsub Public Subscriptions initialization"))
64 host.register_namespace("pps", NS_PPS)
65 self.host = host
66 host.bridge.add_method(
67 "ps_public_subscriptions_get",
68 ".plugin",
69 in_sign="sss",
70 out_sign="s",
71 method=self._subscriptions,
72 async_=True,
73 )
74 host.bridge.add_method(
75 "ps_public_subscriptions_get",
76 ".plugin",
77 in_sign="sss",
78 out_sign="s",
79 method=self._subscriptions,
80 async_=True,
81 )
82 host.bridge.add_method(
83 "ps_public_node_subscriptions_get",
84 ".plugin",
85 in_sign="sss",
86 out_sign="a{ss}",
87 method=self._get_public_node_subscriptions,
88 async_=True,
89 )
90
91 def get_handler(self, client):
92 return XEP_0465_Handler()
93
94 @property
95 def subscriptions_node(self) -> str:
96 return NS_PPS_SUBSCRIPTIONS
97
98 @property
99 def subscribers_node_prefix(self) -> str:
100 return SUBSCRIBERS_NODE_PREFIX
101
102 def build_subscription_elt(self, node: str, service: jid.JID) -> domish.Element:
103 """Generate a <subscriptions> element
104
105 This is the element that a service returns on public subscriptions request
106 """
107 subscription_elt = domish.Element((NS_PPS, "subscription"))
108 subscription_elt["node"] = node
109 subscription_elt["service"] = service.full()
110 return subscription_elt
111
112 def build_subscriber_elt(self, subscriber: jid.JID) -> domish.Element:
113 """Generate a <subscriber> element
114
115 This is the element that a service returns on node public subscriptions request
116 """
117 subscriber_elt = domish.Element((NS_PPS, "subscriber"))
118 subscriber_elt["jid"] = subscriber.full()
119 return subscriber_elt
120
121 @utils.ensure_deferred
122 async def _subscriptions(
123 self,
124 service="",
125 nodeIdentifier="",
126 profile_key=C.PROF_KEY_NONE
127 ) -> str:
128 client = self.host.get_client(profile_key)
129 service = None if not service else jid.JID(service)
130 subs = await self.subscriptions(client, service, nodeIdentifier or None)
131 return data_format.serialise(subs)
132
133 async def subscriptions(
134 self,
135 client: SatXMPPEntity,
136 service: Optional[jid.JID] = None,
137 node: Optional[str] = None
138 ) -> List[Dict[str, Union[str, bool]]]:
139 """Retrieve public subscriptions from a service
140
141 @param service(jid.JID): PubSub service
142 @param nodeIdentifier(unicode, None): node to filter
143 None to get all subscriptions
144 """
145 if service is None:
146 service = client.jid.userhostJID()
147 try:
148 items, __ = await self.host.plugins["XEP-0060"].get_items(
149 client, service, NS_PPS_SUBSCRIPTIONS
150 )
151 except error.StanzaError as e:
152 if e.condition == "forbidden":
153 log.warning(NOT_IMPLEMENTED_MSG.format(service=service))
154 return []
155 else:
156 raise e
157 ret = []
158 for item in items:
159 try:
160 subscription_elt = next(item.elements(NS_PPS, "subscription"))
161 except StopIteration:
162 log.warning(f"no <subscription> element found: {item.toXml()}")
163 continue
164
165 try:
166 sub_dict = {
167 "service": subscription_elt["service"],
168 "node": subscription_elt["node"],
169 "subscriber": service.full(),
170 "state": subscription_elt.getAttribute("subscription", "subscribed"),
171 }
172 except KeyError:
173 log.warning(
174 f"invalid <subscription> element: {subscription_elt.toXml()}"
175 )
176 continue
177 if node is not None and sub_dict["node"] != node:
178 # if not is specified, we filter out any other node
179 # FIXME: should node filtering be done by server?
180 continue
181 ret.append(sub_dict)
182 return ret
183
184 @utils.ensure_deferred
185 async def _get_public_node_subscriptions(
186 self,
187 service: str,
188 node: str,
189 profile_key: str
190 ) -> Dict[str, str]:
191 client = self.host.get_client(profile_key)
192 subs = await self.get_public_node_subscriptions(
193 client, jid.JID(service) if service else None, node
194 )
195 return {j.full(): a for j, a in subs.items()}
196
197 def get_public_subscribers_node(self, node: str) -> str:
198 """Return prefixed node to retrieve public subscribers"""
199 return f"{NS_PPS_SUBSCRIBERS}/{node}"
200
201 async def get_public_node_subscriptions(
202 self,
203 client: SatXMPPEntity,
204 service: Optional[jid.JID],
205 nodeIdentifier: str
206 ) -> Dict[jid.JID, str]:
207 """Retrieve public subscriptions to a node
208
209 @param nodeIdentifier(unicode): node to get subscriptions from
210 """
211 if not nodeIdentifier:
212 raise exceptions.DataError("node identifier can't be empty")
213
214 if service is None:
215 service = client.jid.userhostJID()
216
217 subscribers_node = self.get_public_subscribers_node(nodeIdentifier)
218
219 try:
220 items, __ = await self.host.plugins["XEP-0060"].get_items(
221 client, service, subscribers_node
222 )
223 except error.StanzaError as e:
224 if e.condition == "forbidden":
225 log.warning(NOT_IMPLEMENTED_MSG.format(service=service))
226 return {}
227 else:
228 raise e
229 ret = {}
230 for item in items:
231 try:
232 subscriber_elt = next(item.elements(NS_PPS, "subscriber"))
233 except StopIteration:
234 log.warning(f"no <subscriber> element found: {item.toXml()}")
235 continue
236
237 try:
238 ret[jid.JID(subscriber_elt["jid"])] = "subscribed"
239 except (KeyError, RuntimeError):
240 log.warning(
241 f"invalid <subscriber> element: {subscriber_elt.toXml()}"
242 )
243 continue
244 return ret
245
246 def set_public_opt(self, options: Optional[dict] = None) -> dict:
247 """Set option to make a subscription public
248
249 @param options: dict where the option must be set
250 if None, a new dict will be created
251
252 @return: the options dict
253 """
254 if options is None:
255 options = {}
256 options[f'{{{NS_PPS}}}public'] = True
257 return options
258
259
260 @implementer(iwokkel.IDisco)
261 class XEP_0465_Handler(XMPPHandler):
262
263 def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
264 return [disco.DiscoFeature(NS_PPS)]
265
266 def getDiscoItems(self, requestor, service, nodeIdentifier=""):
267 return []