Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0215.py @ 4033:5a42c7842556
core (plugins): implementation of XEP-0215 "External Service Discovery":
rel 418
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 07 Apr 2023 15:16:39 +0200 |
parents | |
children | 524856bd7b19 |
comparison
equal
deleted
inserted
replaced
4032:bb211f80c3e6 | 4033:5a42c7842556 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin | |
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Dict, Final, List, Optional, Optional | |
20 | |
21 from twisted.internet import defer | |
22 from twisted.words.protocols.jabber import error, jid | |
23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
24 from twisted.words.xish import domish | |
25 from wokkel import data_form, disco, iwokkel | |
26 from zope.interface import implementer | |
27 | |
28 from sat.core import exceptions | |
29 from sat.core.constants import Const as C | |
30 from sat.core.core_types import SatXMPPEntity | |
31 from sat.core.i18n import _ | |
32 from sat.core.log import getLogger | |
33 from sat.tools import xml_tools | |
34 from sat.tools import utils | |
35 from sat.tools.common import data_format | |
36 | |
# logger of this module
log = getLogger(__name__)


# Plugin metadata, keyed by the PI_* constants of sat.core.constants.Const.
# C.PI_MAIN names the main plugin class defined in this module, and C.PI_HANDLER
# is set to "yes" as the plugin provides a handler through XEP_0215.getHandler.
PLUGIN_INFO = {
    C.PI_NAME: "External Service Discovery",
    C.PI_IMPORT_NAME: "XEP-0215",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0215",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Discover services external to the XMPP network"""),
}

# XML namespace of the XEP-0215 elements (<services/>, <credentials/>, <service/>)
NS_EXTDISCO: Final = "urn:xmpp:extdisco:2"
# observer expression matching C.IQ_SET stanzas with a <services/> child in the
# namespace above; registered in XEP_0215_handler.connectionInitialized to receive
# services pushes
IQ_PUSH: Final = f'{C.IQ_SET}/services[@xmlns="{NS_EXTDISCO}"]'
55 | |
56 | |
57 class XEP_0215: | |
def __init__(self, host):
    """Initialise the plugin and register its bridge methods

    @param host: backend instance, its bridge is used to expose
        "external_disco_get" and "external_disco_credentials_get"
    """
    log.info(_("External Service Discovery plugin initialization"))
    self.host = host
    host.bridge.addMethod(
        "external_disco_get",
        ".plugin",
        # entity, profile_key
        in_sign="ss",
        out_sign="s",
        method=self._external_disco_get,
        async_=True,
    )
    host.bridge.addMethod(
        "external_disco_credentials_get",
        ".plugin",
        # entity, host, type_, port, profile_key: the signature must have one code
        # per argument of _external_disco_credentials_get ("ssis" was missing the
        # one of "type_", shifting "port" and "profile_key" on the wrong arguments)
        in_sign="sssis",
        out_sign="s",
        method=self._external_disco_credentials_get,
        async_=True,
    )
77 | |
def getHandler(self, client):
    """Create the XMPP handler of this plugin

    @param client: not used by this method
    @return: a new XEP_0215_handler instance bound to this plugin
    """
    handler = XEP_0215_handler(self)
    return handler
80 | |
async def profileConnecting(self, client: SatXMPPEntity) -> None:
    """Attach an empty external services cache to the connecting client

    The cache is filled by get_external_services, which stores one entry per
    queried entity.

    @param client: client whose cache is (re)initialised
    """
    services_cache: dict = {}
    client._xep_0215_services = services_cache
83 | |
def parse_services(
    self, element: domish.Element, parent_elt_name: str = "services"
) -> List[dict]:
    """Retrieve services from element

    Each <service/> child is converted to a dict holding the attributes which are
    present. "expires" is parsed with utils.parse_xmpp_date, "port" is converted to
    int and "restricted" to bool; an invalid "expires" or "port" is logged and left
    out of the result. Data forms attached to a service are stored as a list under
    the "extended" key. Services without "host" or "type" are skipped with a
    warning.

    @param element: <[parent_elt_name]/> element or its parent
    @param parent_elt_name: name of the parent element
        can be "services" or "credentials"
    @return: list of parsed services
    @raise exceptions.InternalError: parent_elt_name is not an accepted value
    @raise exceptions.DataError: no <[parent_elt_name]/> element has been found
    """
    if parent_elt_name not in ("services", "credentials"):
        raise exceptions.InternalError(
            f"invalid parent_elt_name: {parent_elt_name!r}"
        )
    if element.name == parent_elt_name and element.uri == NS_EXTDISCO:
        services_elt = element
    else:
        try:
            services_elt = next(element.elements(NS_EXTDISCO, parent_elt_name))
        except StopIteration:
            raise exceptions.DataError(
                f"XEP-0215 response is missing <{parent_elt_name}> element"
            )

    services = []
    for service_elt in services_elt.elements(NS_EXTDISCO, "service"):
        service = {}
        for key in [
            "action",
            "expires",
            "host",
            "name",
            "password",
            "port",
            "restricted",
            "transport",
            "type",
            "username",
        ]:
            value = service_elt.getAttribute(key)
            if value is not None:
                if key == "expires":
                    try:
                        service[key] = utils.parse_xmpp_date(value)
                    except ValueError:
                        # only this attribute is dropped, the service is kept
                        log.warning(f"invalid expiration date: {value!r}")
                        continue
                elif key == "port":
                    try:
                        service[key] = int(value)
                    except ValueError:
                        # only this attribute is dropped, the service is kept
                        log.warning(f"invalid port: {value!r}")
                        continue
                elif key == "restricted":
                    service[key] = C.bool(value)
                else:
                    service[key] = value
        if not {"host", "type"}.issubset(service):
            # the second part must be an f-string, otherwise the placeholder is
            # logged literally instead of the faulty element
            log.warning(
                'mandatory "host" or "type" are missing in service, ignoring it: '
                f"{service_elt.toXml()}"
            )
            continue
        for x_elt in service_elt.elements(data_form.NS_X_DATA, "x"):
            form = data_form.Form.fromElement(x_elt)
            extended = service.setdefault("extended", [])
            extended.append(xml_tools.dataForm2dataDict(form))
        services.append(service)

    return services
154 | |
def _external_disco_get(self, entity: str, profile_key: str) -> defer.Deferred:
    """Bridge method wrapping get_external_services

    @param entity: jid of the entity to query, an empty value is converted to None
    @param profile_key: key used to retrieve the client
    @return: Deferred firing with the found services, serialised with
        data_format.serialise
    """
    client = self.host.getClient(profile_key)
    entity_jid = jid.JID(entity) if entity else None
    services_d = defer.ensureDeferred(
        self.get_external_services(client, entity_jid)
    )
    services_d.addCallback(data_format.serialise)
    return services_d
162 | |
async def get_external_services(
    self, client: SatXMPPEntity, entity: Optional[jid.JID] = None
) -> List[Dict]:
    """Get non XMPP service proposed by the entity

    Response is cached after first query. None is cached when the entity doesn't
    advertise the feature or when the request fails with a stanza error, in which
    case an empty list is returned without sending a new request on later calls.

    @param client: client used to send the request, holding the cache
    @param entity: XMPP entity to query. Default to our own server
    @return: found services
    @raise exceptions.DataError: entity is not a bare jid
    """
    if entity is None:
        entity = client.server_jid

    if entity.resource:
        raise exceptions.DataError("A bare jid was expected for target entity")

    try:
        cached_services = client._xep_0215_services[entity]
    except KeyError:
        # The feature check result must be resolved before being tested: if
        # hasFeature returns a Deferred, the Deferred itself is always truthy and
        # the "not supported" branch would never be taken. maybeDeferred accepts
        # a plain value as well as a Deferred.
        is_supported = await defer.maybeDeferred(
            self.host.hasFeature, client, NS_EXTDISCO, entity
        )
        if not is_supported:
            cached_services = client._xep_0215_services[entity] = None
        else:
            iq_elt = client.IQ("get")
            iq_elt["to"] = entity.full()
            iq_elt.addElement((NS_EXTDISCO, "services"))
            try:
                iq_result_elt = await iq_elt.send()
            except error.StanzaError as e:
                log.warning(f"Can't get external services: {e}")
                cached_services = client._xep_0215_services[entity] = None
            else:
                cached_services = self.parse_services(iq_result_elt)
                client._xep_0215_services[entity] = cached_services

    # the cached value may be None, callers always get a list
    return cached_services or []
198 | |
def _external_disco_credentials_get(
    self,
    entity: str,
    host: str,
    type_: str,
    port: int = 0,
    profile_key=C.PROF_KEY_NONE,
) -> defer.Deferred:
    """Bridge method wrapping request_credentials

    @param entity: jid of the entity to query, an empty value is converted to None
    @param host: service host
    @param type_: service type
    @param port: service port, 0 is converted to None
    @param profile_key: key used to retrieve the client
    @return: Deferred firing with the matching services, serialised with
        data_format.serialise
    """
    client = self.host.getClient(profile_key)
    requested_port = port or None
    entity_jid = jid.JID(entity) if entity else None
    credentials_d = defer.ensureDeferred(
        self.request_credentials(client, host, type_, requested_port, entity_jid)
    )
    credentials_d.addCallback(data_format.serialise)
    return credentials_d
215 | |
async def request_credentials(
    self,
    client: SatXMPPEntity,
    host: str,
    type_: str,
    port: Optional[int] = None,
    entity: Optional[jid.JID] = None,
) -> List[dict]:
    """Request credentials for specified service(s)

    While usually a single service is expected, several may be returned if the same
    service is launched on several ports (cf. XEP-0215 §3.3)
    @param client: client used to send the request
    @param host: service host
    @param type_: service type
    @param port: service port (to be used when several services have same host and
        type but on different ports)
    @param entity: XMPP entity to query. Default to our own server
    @return: matching services with filled credentials
    """
    if entity is None:
        entity = client.server_jid

    iq_elt = client.IQ("get")
    iq_elt["to"] = entity.full()
    credentials_elt = iq_elt.addElement((NS_EXTDISCO, "credentials"))
    # the requested service must be identified in the request, otherwise the
    # queried entity can't know which credentials are wanted
    service_elt = credentials_elt.addElement((NS_EXTDISCO, "service"))
    service_elt["host"] = host
    service_elt["type"] = type_
    if port is not None:
        service_elt["port"] = str(port)
    iq_result_elt = await iq_elt.send()
    return self.parse_services(iq_result_elt, parent_elt_name="credentials")
243 | |
def get_matching_service(
    self, services: List[dict], host: str, type_: str, port: Optional[int]
) -> Optional[dict]:
    """Find the first service with the given characteristics

    @param services: services to search in, each one must have "host" and "type"
    @param host: expected service host
    @param type_: expected service type
    @param port: expected service port, None to accept any port
    @return: first matching service, or None if nothing matches
    """
    for candidate in services:
        if candidate["host"] != host or candidate["type"] != type_:
            continue
        if port is None or candidate.get("port") == port:
            return candidate
    return None
260 | |
def on_services_push(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
    """Update the services cache from a services push

    The push is ignored if nothing is cached for the bare jid of the sender.
    Services without "action" replace the matching cached service or are added,
    "add" appends the service, "modify" replaces the matching service and "delete"
    removes it. If a service to modify or delete is not in cache, the cache of the
    entity is dropped and the full list is requested again.

    @param iq_elt: <iq/> stanza of the push
    @param client: client which received the push, holding the cache
    """
    iq_elt.handled = True
    entity = jid.JID(iq_elt["from"]).userhostJID()
    cached_services = client._xep_0215_services.get(entity)
    if cached_services is None:
        log.info(f"ignoring services push for uncached entity {entity}")
        return
    try:
        services = self.parse_services(iq_elt)
    except Exception:
        log.exception(f"Can't parse services push: {iq_elt.toXml()}")
        return
    for service in services:
        host = service["host"]
        type_ = service["type"]
        port = service.get("port")

        action = service.pop("action", None)
        if action is None:
            # action is not specified, we first check if the service exists
            found_service = self.get_matching_service(
                cached_services, host, type_, port
            )
            if found_service is not None:
                # existing service, we replace by the new one
                found_service.clear()
                found_service.update(service)
            else:
                # new service
                cached_services.append(service)
        elif action == "add":
            cached_services.append(service)
        elif action in ("modify", "delete"):
            found_service = self.get_matching_service(
                cached_services, host, type_, port
            )
            if found_service is None:
                log.warning(
                    f"{entity} want to {action} an unknow service, we ask for the "
                    "full list again"
                )
                # we delete cache and request a fresh list to make a new one
                client._xep_0215_services.pop(entity, None)
                refresh_d = defer.ensureDeferred(
                    self.get_external_services(client, entity)
                )
                # without errback, a failing request would end as an unhandled
                # error in Deferred
                refresh_d.addErrback(
                    lambda failure: log.warning(
                        f"Can't refresh external services of {entity}: "
                        f"{failure.value}"
                    )
                )
                # the list used in this loop is not the cache anymore, and the
                # fresh list replaces whatever is left in this push; going on
                # would also fail on the next unknown service, as the cache
                # entry is already removed
                return
            elif action == "modify":
                found_service.clear()
                found_service.update(service)
            else:
                cached_services.remove(found_service)
        else:
            log.warning(f"unknown action for services push, ignoring: {action!r}")
312 | |
313 | |
@implementer(iwokkel.IDisco)
class XEP_0215_handler(XMPPHandler):
    """XMPP handler observing services pushes and advertising the feature"""

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: plugin instance whose on_services_push method is
            called when a push is received
        """
        # let the base class initialise its own state
        super().__init__()
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Register the observer for services pushes on the XML stream"""
        self.xmlstream.addObserver(
            IQ_PUSH, self.plugin_parent.on_services_push, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the NS_EXTDISCO feature

        @return: list with the single disco feature of this plugin
        """
        return [disco.DiscoFeature(NS_EXTDISCO)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """This handler has no disco item

        @return: empty list
        """
        return []