Mercurial > libervia-backend
comparison libervia/backend/core/xmpp.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/core/xmpp.py@c23cad65ae99 |
children | bc7d45dedeb0 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import calendar | |
20 import copy | |
21 from functools import partial | |
22 import mimetypes | |
23 from pathlib import Path | |
24 import sys | |
25 import time | |
26 from typing import Callable, Dict, Tuple, Optional | |
27 from urllib.parse import unquote, urlparse | |
28 import uuid | |
29 | |
30 import shortuuid | |
31 from twisted.internet import defer, error as internet_error | |
32 from twisted.internet import ssl | |
33 from twisted.python import failure | |
34 from twisted.words.protocols.jabber import xmlstream | |
35 from twisted.words.protocols.jabber import error | |
36 from twisted.words.protocols.jabber import jid | |
37 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
38 from twisted.words.xish import domish | |
39 from wokkel import client as wokkel_client, disco, generic, iwokkel, xmppim | |
40 from wokkel import component | |
41 from wokkel import delay | |
42 from zope.interface import implementer | |
43 | |
44 from libervia.backend.core import exceptions | |
45 from libervia.backend.core import core_types | |
46 from libervia.backend.core.constants import Const as C | |
47 from libervia.backend.core.i18n import _ | |
48 from libervia.backend.core.log import getLogger | |
49 from libervia.backend.memory import cache | |
50 from libervia.backend.memory import encryption | |
51 from libervia.backend.memory import persistent | |
52 from libervia.backend.tools import xml_tools | |
53 from libervia.backend.tools import utils | |
54 from libervia.backend.tools.common import data_format | |
55 | |
56 log = getLogger(__name__) | |
57 | |
58 | |
59 NS_X_DATA = "jabber:x:data" | |
60 NS_DISCO_INFO = "http://jabber.org/protocol/disco#info" | |
61 NS_XML_ELEMENT = "urn:xmpp:xml-element" | |
62 NS_ROSTER_VER = "urn:xmpp:features:rosterver" | |
63 # we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys | |
64 # with roster jids | |
65 ROSTER_VER_KEY = "@version@" | |
66 | |
67 | |
68 class ClientPluginWrapper: | |
69 """Use a plugin with default value if plugin is missing""" | |
70 | |
71 def __init__(self, client, plugin_name, missing): | |
72 self.client = client | |
73 self.plugin = client.host_app.plugins.get(plugin_name) | |
74 if self.plugin is None: | |
75 self.plugin_name = plugin_name | |
76 self.missing = missing | |
77 | |
78 def __getattr__(self, attr): | |
79 if self.plugin is None: | |
80 missing = self.missing | |
81 if isinstance(missing, type) and issubclass(missing, Exception): | |
82 raise missing(f"plugin {self.plugin_name!r} is not available") | |
83 elif isinstance(missing, Exception): | |
84 raise missing | |
85 else: | |
86 return lambda *args, **kwargs: missing | |
87 return partial(getattr(self.plugin, attr), self.client) | |
88 | |
89 | |
90 class SatXMPPEntity(core_types.SatXMPPEntity): | |
91 """Common code for Client and Component""" | |
92 # profile is added there when start_connection begins and removed when it is finished | |
93 profiles_connecting = set() | |
94 | |
95 def __init__(self, host_app, profile, max_retries): | |
96 factory = self.factory | |
97 | |
98 # we monkey patch clientConnectionLost to handle network_enabled/network_disabled | |
99 # and to allow plugins to tune reconnection mechanism | |
100 clientConnectionFailed_ori = factory.clientConnectionFailed | |
101 clientConnectionLost_ori = factory.clientConnectionLost | |
102 factory.clientConnectionFailed = partial( | |
103 self.connection_terminated, term_type="failed", cb=clientConnectionFailed_ori) | |
104 factory.clientConnectionLost = partial( | |
105 self.connection_terminated, term_type="lost", cb=clientConnectionLost_ori) | |
106 | |
107 factory.maxRetries = max_retries | |
108 factory.maxDelay = 30 | |
109 # when self._connected_d is None, we are not connected | |
110 # else, it's a deferred which fire on disconnection | |
111 self._connected_d = None | |
112 self.profile = profile | |
113 self.host_app = host_app | |
114 self.cache = cache.Cache(host_app, profile) | |
115 self.mess_id2uid = {} # map from message id to uid used in history. | |
116 # Key: (full_jid, message_id) Value: uid | |
117 # this Deferred fire when entity is connected | |
118 self.conn_deferred = defer.Deferred() | |
119 self._progress_cb = {} # callback called when a progress is requested | |
120 # (key = progress id) | |
121 self.actions = {} # used to keep track of actions for retrieval (key = action_id) | |
122 self.encryption = encryption.EncryptionHandler(self) | |
123 | |
124 def __str__(self): | |
125 return f"Client for profile {self.profile}" | |
126 | |
127 def __repr__(self): | |
128 return f"{super().__repr__()} - profile: {self.profile!r}" | |
129 | |
130 ## initialisation ## | |
131 | |
132 async def _call_connection_triggers(self, connection_timer): | |
133 """Call conneting trigger prepare connected trigger | |
134 | |
135 @param plugins(iterable): plugins to use | |
136 @return (list[object, callable]): plugin to trigger tuples with: | |
137 - plugin instance | |
138 - profile_connected* triggers (to call after connection) | |
139 """ | |
140 plugin_conn_cb = [] | |
141 for plugin in self._get_plugins_list(): | |
142 # we check if plugin handle client mode | |
143 if plugin.is_handler: | |
144 plugin.get_handler(self).setHandlerParent(self) | |
145 | |
146 # profile_connecting/profile_connected methods handling | |
147 | |
148 timer = connection_timer[plugin] = { | |
149 "total": 0 | |
150 } | |
151 # profile connecting is called right now (before actually starting client) | |
152 connecting_cb = getattr(plugin, "profile_connecting", None) | |
153 if connecting_cb is not None: | |
154 connecting_start = time.time() | |
155 await utils.as_deferred(connecting_cb, self) | |
156 timer["connecting"] = time.time() - connecting_start | |
157 timer["total"] += timer["connecting"] | |
158 | |
159 # profile connected is called after client is ready and roster is got | |
160 connected_cb = getattr(plugin, "profile_connected", None) | |
161 if connected_cb is not None: | |
162 plugin_conn_cb.append((plugin, connected_cb)) | |
163 | |
164 return plugin_conn_cb | |
165 | |
166 def _get_plugins_list(self): | |
167 """Return list of plugin to use | |
168 | |
169 need to be implemented by subclasses | |
170 this list is used to call profileConnect* triggers | |
171 @return(iterable[object]): plugins to use | |
172 """ | |
173 raise NotImplementedError | |
174 | |
175 def _create_sub_protocols(self): | |
176 return | |
177 | |
178 def entity_connected(self): | |
179 """Called once connection is done | |
180 | |
181 may return a Deferred, to perform initialisation tasks | |
182 """ | |
183 return | |
184 | |
185 @staticmethod | |
186 async def _run_profile_connected( | |
187 callback: Callable, | |
188 entity: "SatXMPPEntity", | |
189 timer: Dict[str, float] | |
190 ) -> None: | |
191 connected_start = time.time() | |
192 await utils.as_deferred(callback, entity) | |
193 timer["connected"] = time.time() - connected_start | |
194 timer["total"] += timer["connected"] | |
195 | |
196 @classmethod | |
197 async def start_connection(cls, host, profile, max_retries): | |
198 """instantiate the entity and start the connection""" | |
199 # FIXME: reconnection doesn't seems to be handled correclty | |
200 # (client is deleted then recreated from scratch) | |
201 # most of methods called here should be called once on first connection | |
202 # (e.g. adding subprotocols) | |
203 # but client should not be deleted except if session is finished | |
204 # (independently of connection/deconnection) | |
205 if profile in cls.profiles_connecting: | |
206 raise exceptions.CancelError(f"{profile} is already being connected") | |
207 cls.profiles_connecting.add(profile) | |
208 try: | |
209 try: | |
210 port = int( | |
211 host.memory.param_get_a( | |
212 C.FORCE_PORT_PARAM, "Connection", profile_key=profile | |
213 ) | |
214 ) | |
215 except ValueError: | |
216 log.debug(_("Can't parse port value, using default value")) | |
217 port = ( | |
218 None | |
219 ) # will use default value 5222 or be retrieved from a DNS SRV record | |
220 | |
221 password = await host.memory.param_get_a_async( | |
222 "Password", "Connection", profile_key=profile | |
223 ) | |
224 | |
225 entity_jid_s = await host.memory.param_get_a_async( | |
226 "JabberID", "Connection", profile_key=profile) | |
227 entity_jid = jid.JID(entity_jid_s) | |
228 | |
229 if not entity_jid.resource and not cls.is_component and entity_jid.user: | |
230 # if no resource is specified, we create our own instead of using | |
231 # server returned one, as it will then stay stable in case of | |
232 # reconnection. we only do that for client and if there is a user part, to | |
233 # let server decide for anonymous login | |
234 resource_dict = await host.memory.storage.get_privates( | |
235 "core:xmpp", ["resource"] , profile=profile) | |
236 try: | |
237 resource = resource_dict["resource"] | |
238 except KeyError: | |
239 resource = f"{C.APP_NAME_FILE}.{shortuuid.uuid()}" | |
240 await host.memory.storage.set_private_value( | |
241 "core:xmpp", "resource", resource, profile=profile) | |
242 | |
243 log.info(_("We'll use the stable resource {resource}").format( | |
244 resource=resource)) | |
245 entity_jid.resource = resource | |
246 | |
247 if profile in host.profiles: | |
248 if host.profiles[profile].is_connected(): | |
249 raise exceptions.InternalError( | |
250 f"There is already a connected profile of name {profile!r} in " | |
251 f"host") | |
252 log.debug( | |
253 "removing unconnected profile {profile!r}") | |
254 del host.profiles[profile] | |
255 entity = host.profiles[profile] = cls( | |
256 host, profile, entity_jid, password, | |
257 host.memory.param_get_a(C.FORCE_SERVER_PARAM, "Connection", | |
258 profile_key=profile) or None, | |
259 port, max_retries, | |
260 ) | |
261 | |
262 await entity.encryption.load_sessions() | |
263 | |
264 entity._create_sub_protocols() | |
265 | |
266 entity.fallBack = SatFallbackHandler(host) | |
267 entity.fallBack.setHandlerParent(entity) | |
268 | |
269 entity.versionHandler = SatVersionHandler(C.APP_NAME, host.full_version) | |
270 entity.versionHandler.setHandlerParent(entity) | |
271 | |
272 entity.identityHandler = SatIdentityHandler() | |
273 entity.identityHandler.setHandlerParent(entity) | |
274 | |
275 log.debug(_("setting plugins parents")) | |
276 | |
277 connection_timer: Dict[str, Dict[str, float]] = {} | |
278 plugin_conn_cb = await entity._call_connection_triggers(connection_timer) | |
279 | |
280 entity.startService() | |
281 | |
282 await entity.conn_deferred | |
283 | |
284 await defer.maybeDeferred(entity.entity_connected) | |
285 | |
286 # Call profile_connected callback for all plugins, | |
287 # and print error message if any of them fails | |
288 conn_cb_list = [] | |
289 for plugin, callback in plugin_conn_cb: | |
290 conn_cb_list.append( | |
291 defer.ensureDeferred( | |
292 cls._run_profile_connected( | |
293 callback, entity, connection_timer[plugin] | |
294 ) | |
295 ) | |
296 ) | |
297 list_d = defer.DeferredList(conn_cb_list) | |
298 | |
299 def log_plugin_results(results): | |
300 if not results: | |
301 log.info("no plugin loaded") | |
302 return | |
303 all_succeed = all([success for success, result in results]) | |
304 if not all_succeed: | |
305 log.error(_("Plugins initialisation error")) | |
306 for idx, (success, result) in enumerate(results): | |
307 if not success: | |
308 plugin_name = plugin_conn_cb[idx][0]._info["import_name"] | |
309 log.error(f"error (plugin {plugin_name}): {result}") | |
310 | |
311 log.debug(f"Plugin loading time for {profile!r} (longer to shorter):\n") | |
312 plugins_by_timer = sorted( | |
313 connection_timer, | |
314 key=lambda p: connection_timer[p]["total"], | |
315 reverse=True | |
316 ) | |
317 # total is the addition of all connecting and connected, doesn't really | |
318 # reflect the real loading time as connected are launched in a | |
319 # DeferredList | |
320 total_plugins = 0 | |
321 # total real sum all connecting (which happen sequentially) and the | |
322 # longuest connected (connected happen in parallel, thus the longuest is | |
323 # roughly the total time for connected) | |
324 total_real = 0 | |
325 total_real = max(t.get("connected", 0) for t in connection_timer.values()) | |
326 | |
327 for plugin in plugins_by_timer: | |
328 name = plugin._info["import_name"] | |
329 timer = connection_timer[plugin] | |
330 total_plugins += timer["total"] | |
331 try: | |
332 connecting = f"{timer['connecting']:.2f}s" | |
333 except KeyError: | |
334 connecting = "n/a" | |
335 else: | |
336 total_real += timer["connecting"] | |
337 try: | |
338 connected = f"{timer['connected']:.2f}s" | |
339 except KeyError: | |
340 connected = "n/a" | |
341 log.debug( | |
342 f" - {name}: total={timer['total']:.2f}s " | |
343 f"connecting={connecting} connected={connected}" | |
344 ) | |
345 log.debug( | |
346 f" Plugins total={total_plugins:.2f}s real={total_real:.2f}s\n" | |
347 ) | |
348 | |
349 await list_d.addCallback( | |
350 log_plugin_results | |
351 ) # FIXME: we should have a timeout here, and a way to know if a plugin freeze | |
352 # TODO: mesure launch time of each plugin | |
353 finally: | |
354 cls.profiles_connecting.remove(profile) | |
355 | |
356 def _disconnection_cb(self, __): | |
357 self._connected_d = None | |
358 | |
359 def _disconnection_eb(self, failure_): | |
360 log.error(_("Error while disconnecting: {}".format(failure_))) | |
361 | |
362 def _authd(self, xmlstream): | |
363 super(SatXMPPEntity, self)._authd(xmlstream) | |
364 log.debug(_("{profile} identified").format(profile=self.profile)) | |
365 self.stream_initialized() | |
366 | |
367 def _finish_connection(self, __): | |
368 if self.conn_deferred.called: | |
369 # can happen in case of forced disconnection by server | |
370 log.debug(f"{self} has already been connected") | |
371 else: | |
372 self.conn_deferred.callback(None) | |
373 | |
374 def stream_initialized(self): | |
375 """Called after _authd""" | |
376 log.debug(_("XML stream is initialized")) | |
377 if not self.host_app.trigger.point("xml_init", self): | |
378 return | |
379 self.post_stream_init() | |
380 | |
381 def post_stream_init(self): | |
382 """Workflow after stream initalisation.""" | |
383 log.info( | |
384 _("********** [{profile}] CONNECTED **********").format(profile=self.profile) | |
385 ) | |
386 | |
387 # the following Deferred is used to know when we are connected | |
388 # so we need to be set it to None when connection is lost | |
389 self._connected_d = defer.Deferred() | |
390 self._connected_d.addCallback(self._clean_connection) | |
391 self._connected_d.addCallback(self._disconnection_cb) | |
392 self._connected_d.addErrback(self._disconnection_eb) | |
393 | |
394 # we send the signal to the clients | |
395 self.host_app.bridge.connected(self.jid.full(), self.profile) | |
396 | |
397 self.disco = SatDiscoProtocol(self) | |
398 self.disco.setHandlerParent(self) | |
399 self.discoHandler = disco.DiscoHandler() | |
400 self.discoHandler.setHandlerParent(self) | |
401 disco_d = defer.succeed(None) | |
402 | |
403 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile): | |
404 return | |
405 | |
406 disco_d.addCallback(self._finish_connection) | |
407 | |
408 def initializationFailed(self, reason): | |
409 log.error( | |
410 _( | |
411 "ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" | |
412 % {"profile": self.profile, "reason": reason} | |
413 ) | |
414 ) | |
415 self.conn_deferred.errback(reason.value) | |
416 try: | |
417 super(SatXMPPEntity, self).initializationFailed(reason) | |
418 except: | |
419 # we already chained an errback, no need to raise an exception | |
420 pass | |
421 | |
422 ## connection ## | |
423 | |
424 def connection_terminated(self, connector, reason, term_type, cb): | |
425 """Display disconnection reason, and call factory method | |
426 | |
427 This method is monkey patched to factory, allowing plugins to handle finely | |
428 reconnection with the triggers. | |
429 @param connector(twisted.internet.base.BaseConnector): current connector | |
430 @param reason(failure.Failure): why connection has been terminated | |
431 @param term_type(unicode): on of 'failed' or 'lost' | |
432 @param cb(callable): original factory method | |
433 | |
434 @trigger connection_failed(connector, reason): connection can't be established | |
435 @trigger connection_lost(connector, reason): connection was available but it not | |
436 anymore | |
437 """ | |
438 # we save connector because it may be deleted when connection will be dropped | |
439 # if reconnection is disabled | |
440 self._saved_connector = connector | |
441 if reason is not None and not isinstance(reason.value, | |
442 internet_error.ConnectionDone): | |
443 try: | |
444 reason_str = str(reason.value) | |
445 except Exception: | |
446 # FIXME: workaround for Android were p4a strips docstrings | |
447 # while Twisted use docstring in __str__ | |
448 # TODO: create a ticket upstream, Twisted should work when optimization | |
449 # is used | |
450 reason_str = str(reason.value.__class__) | |
451 log.warning(f"[{self.profile}] Connection {term_type}: {reason_str}") | |
452 if not self.host_app.trigger.point("connection_" + term_type, connector, reason): | |
453 return | |
454 return cb(connector, reason) | |
455 | |
456 def network_disabled(self): | |
457 """Indicate that network has been completely disabled | |
458 | |
459 In other words, internet is not available anymore and transport must be stopped. | |
460 Retrying is disabled too, as it makes no sense to try without network, and it may | |
461 use resources (notably battery on mobiles). | |
462 """ | |
463 log.info(_("stopping connection because of network disabled")) | |
464 self.factory.continueTrying = 0 | |
465 self._network_disabled = True | |
466 if self.xmlstream is not None: | |
467 self.xmlstream.transport.abortConnection() | |
468 | |
469 def network_enabled(self): | |
470 """Indicate that network has been (re)enabled | |
471 | |
472 This happens when e.g. user activate WIFI connection. | |
473 """ | |
474 try: | |
475 connector = self._saved_connector | |
476 network_disabled = self._network_disabled | |
477 except AttributeError: | |
478 # connection has not been stopped by network_disabled | |
479 # we don't have to restart it | |
480 log.debug(f"no connection to restart [{self.profile}]") | |
481 return | |
482 else: | |
483 del self._network_disabled | |
484 if not network_disabled: | |
485 raise exceptions.InternalError("network_disabled should be True") | |
486 log.info(_("network is available, trying to connect")) | |
487 # we want to be sure to start fresh | |
488 self.factory.resetDelay() | |
489 # we have a saved connector, meaning the connection has been stopped previously | |
490 # we can now try to reconnect | |
491 connector.connect() | |
492 | |
493 def _connected(self, xs): | |
494 send_hooks = [] | |
495 receive_hooks = [] | |
496 self.host_app.trigger.point( | |
497 "stream_hooks", self, receive_hooks, send_hooks) | |
498 for hook in receive_hooks: | |
499 xs.add_hook(C.STREAM_HOOK_RECEIVE, hook) | |
500 for hook in send_hooks: | |
501 xs.add_hook(C.STREAM_HOOK_SEND, hook) | |
502 super(SatXMPPEntity, self)._connected(xs) | |
503 | |
504 def disconnect_profile(self, reason): | |
505 if self._connected_d is not None: | |
506 self.host_app.bridge.disconnected( | |
507 self.profile | |
508 ) # we send the signal to the clients | |
509 log.info( | |
510 _("********** [{profile}] DISCONNECTED **********").format( | |
511 profile=self.profile | |
512 ) | |
513 ) | |
514 # we purge only if no new connection attempt is expected | |
515 if not self.factory.continueTrying: | |
516 log.debug("continueTrying not set, purging entity") | |
517 self._connected_d.callback(None) | |
518 # and we remove references to this client | |
519 self.host_app.purge_entity(self.profile) | |
520 | |
521 if not self.conn_deferred.called: | |
522 if reason is None: | |
523 err = error.StreamError("Server unexpectedly closed the connection") | |
524 else: | |
525 err = reason | |
526 try: | |
527 if err.value.args[0][0][2] == "certificate verify failed": | |
528 err = exceptions.InvalidCertificate( | |
529 _("Your server certificate is not valid " | |
530 "(its identity can't be checked).\n\n" | |
531 "This should never happen and may indicate that " | |
532 "somebody is trying to spy on you.\n" | |
533 "Please contact your server administrator.")) | |
534 self.factory.stopTrying() | |
535 try: | |
536 # with invalid certificate, we should not retry to connect | |
537 # so we delete saved connector to avoid reconnection if | |
538 # network_enabled is called. | |
539 del self._saved_connector | |
540 except AttributeError: | |
541 pass | |
542 except (IndexError, TypeError): | |
543 pass | |
544 self.conn_deferred.errback(err) | |
545 | |
546 def _disconnected(self, reason): | |
547 super(SatXMPPEntity, self)._disconnected(reason) | |
548 if not self.host_app.trigger.point("disconnected", self, reason): | |
549 return | |
550 self.disconnect_profile(reason) | |
551 | |
552 @defer.inlineCallbacks | |
553 def _clean_connection(self, __): | |
554 """method called on disconnection | |
555 | |
556 used to call profile_disconnected* triggers | |
557 """ | |
558 trigger_name = "profile_disconnected" | |
559 for plugin in self._get_plugins_list(): | |
560 disconnected_cb = getattr(plugin, trigger_name, None) | |
561 if disconnected_cb is not None: | |
562 yield disconnected_cb(self) | |
563 | |
564 def is_connected(self): | |
565 """Return True is client is fully connected | |
566 | |
567 client is considered fully connected if transport is started and all plugins | |
568 are initialised | |
569 """ | |
570 try: | |
571 transport_connected = bool(self.xmlstream.transport.connected) | |
572 except AttributeError: | |
573 return False | |
574 | |
575 return self._connected_d is not None and transport_connected | |
576 | |
577 def entity_disconnect(self): | |
578 if not self.host_app.trigger.point("disconnecting", self): | |
579 return | |
580 log.info(_("Disconnecting...")) | |
581 self.stopService() | |
582 if self._connected_d is not None: | |
583 return self._connected_d | |
584 else: | |
585 return defer.succeed(None) | |
586 | |
587 ## sending ## | |
588 | |
589 def IQ(self, type_="set", timeout=60): | |
590 """shortcut to create an IQ element managing deferred | |
591 | |
592 @param type_(unicode): IQ type ('set' or 'get') | |
593 @param timeout(None, int): timeout in seconds | |
594 @return((D)domish.Element: result stanza | |
595 errback is called if an error stanza is returned | |
596 """ | |
597 iq_elt = xmlstream.IQ(self.xmlstream, type_) | |
598 iq_elt.timeout = timeout | |
599 return iq_elt | |
600 | |
601 def sendError(self, iq_elt, condition, text=None, appCondition=None): | |
602 """Send error stanza build from iq_elt | |
603 | |
604 @param iq_elt(domish.Element): initial IQ element | |
605 @param condition(unicode): error condition | |
606 """ | |
607 iq_error_elt = error.StanzaError( | |
608 condition, text=text, appCondition=appCondition | |
609 ).toResponse(iq_elt) | |
610 self.xmlstream.send(iq_error_elt) | |
611 | |
612 def generate_message_xml( | |
613 self, | |
614 data: core_types.MessageData, | |
615 post_xml_treatments: Optional[defer.Deferred] = None | |
616 ) -> core_types.MessageData: | |
617 """Generate <message/> stanza from message data | |
618 | |
619 @param data: message data | |
620 domish element will be put in data['xml'] | |
621 following keys are needed: | |
622 - from | |
623 - to | |
624 - uid: can be set to '' if uid attribute is not wanted | |
625 - message | |
626 - type | |
627 - subject | |
628 - extra | |
629 @param post_xml_treatments: a Deferred which will be called with data once XML is | |
630 generated | |
631 @return: message data | |
632 """ | |
633 data["xml"] = message_elt = domish.Element((None, "message")) | |
634 message_elt["to"] = data["to"].full() | |
635 message_elt["from"] = data["from"].full() | |
636 message_elt["type"] = data["type"] | |
637 if data["uid"]: # key must be present but can be set to '' | |
638 # by a plugin to avoid id on purpose | |
639 message_elt["id"] = data["uid"] | |
640 for lang, subject in data["subject"].items(): | |
641 subject_elt = message_elt.addElement("subject", content=subject) | |
642 if lang: | |
643 subject_elt[(C.NS_XML, "lang")] = lang | |
644 for lang, message in data["message"].items(): | |
645 body_elt = message_elt.addElement("body", content=message) | |
646 if lang: | |
647 body_elt[(C.NS_XML, "lang")] = lang | |
648 try: | |
649 thread = data["extra"]["thread"] | |
650 except KeyError: | |
651 if "thread_parent" in data["extra"]: | |
652 raise exceptions.InternalError( | |
653 "thread_parent found while there is not associated thread" | |
654 ) | |
655 else: | |
656 thread_elt = message_elt.addElement("thread", content=thread) | |
657 try: | |
658 thread_elt["parent"] = data["extra"]["thread_parent"] | |
659 except KeyError: | |
660 pass | |
661 | |
662 if post_xml_treatments is not None: | |
663 post_xml_treatments.callback(data) | |
664 return data | |
665 | |
666 @property | |
667 def is_admin(self) -> bool: | |
668 """True if a client is an administrator with extra privileges""" | |
669 return self.host_app.memory.is_admin(self.profile) | |
670 | |
671 def add_post_xml_callbacks(self, post_xml_treatments): | |
672 """Used to add class level callbacks at the end of the workflow | |
673 | |
674 @param post_xml_treatments(D): the same Deferred as in sendMessage trigger | |
675 """ | |
676 raise NotImplementedError | |
677 | |
678 async def a_send(self, obj): | |
679 # original send method accept string | |
680 # but we restrict to domish.Element to make trigger treatments easier | |
681 assert isinstance(obj, domish.Element) | |
682 # XXX: this trigger is the last one before sending stanza on wire | |
683 # it is intended for things like end 2 end encryption. | |
684 # *DO NOT* cancel (i.e. return False) without very good reason | |
685 # (out of band transmission for instance). | |
686 # e2e should have a priority of 0 here, and out of band transmission | |
687 # a lower priority | |
688 if not (await self.host_app.trigger.async_point("send", self, obj)): | |
689 return | |
690 super().send(obj) | |
691 | |
692 def send(self, obj): | |
693 defer.ensureDeferred(self.a_send(obj)) | |
694 | |
695 async def send_message_data(self, mess_data): | |
696 """Convenient method to send message data to stream | |
697 | |
698 This method will send mess_data[u'xml'] to stream, but a trigger is there | |
699 The trigger can't be cancelled, it's a good place for e2e encryption which | |
700 don't handle full stanza encryption | |
701 This trigger can return a Deferred (it's an async_point) | |
702 @param mess_data(dict): message data as constructed by onMessage workflow | |
703 @return (dict): mess_data (so it can be used in a deferred chain) | |
704 """ | |
705 # XXX: This is the last trigger before u"send" (last but one globally) | |
706 # for sending message. | |
707 # This is intented for e2e encryption which doesn't do full stanza | |
708 # encryption (e.g. OTR) | |
709 # This trigger point can't cancel the method | |
710 await self.host_app.trigger.async_point("send_message_data", self, mess_data, | |
711 triggers_no_cancel=True) | |
712 await self.a_send(mess_data["xml"]) | |
713 return mess_data | |
714 | |
715 def sendMessage( | |
716 self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None, | |
717 no_trigger=False): | |
718 r"""Send a message to an entity | |
719 | |
720 @param to_jid(jid.JID): destinee of the message | |
721 @param message(dict): message body, key is the language (use '' when unknown) | |
722 @param subject(dict): message subject, key is the language (use '' when unknown) | |
723 @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or: | |
724 - auto: for automatic type detection | |
725 - info: for information ("info_type" can be specified in extra) | |
726 @param extra(dict, None): extra data. Key can be: | |
727 - info_type: information type, can be | |
728 TODO | |
729 @param uid(unicode, None): unique id: | |
730 should be unique at least in this XMPP session | |
731 if None, an uuid will be generated | |
732 @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used | |
733 useful when a message need to be sent without any modification | |
734 /!\ this will also skip encryption methods! | |
735 """ | |
736 if subject is None: | |
737 subject = {} | |
738 if extra is None: | |
739 extra = {} | |
740 | |
741 assert mess_type in C.MESS_TYPE_ALL | |
742 | |
743 data = { # dict is similar to the one used in client.onMessage | |
744 "from": self.jid, | |
745 "to": to_jid, | |
746 "uid": uid or str(uuid.uuid4()), | |
747 "message": message, | |
748 "subject": subject, | |
749 "type": mess_type, | |
750 "extra": extra, | |
751 "timestamp": time.time(), | |
752 } | |
753 # XXX: plugin can add their pre XML treatments to this deferred | |
754 pre_xml_treatments = defer.Deferred() | |
755 # XXX: plugin can add their post XML treatments to this deferred | |
756 post_xml_treatments = defer.Deferred() | |
757 | |
758 if data["type"] == C.MESS_TYPE_AUTO: | |
759 # we try to guess the type | |
760 if data["subject"]: | |
761 data["type"] = C.MESS_TYPE_NORMAL | |
762 elif not data["to"].resource: | |
763 # we may have a groupchat message, we check if the we know this jid | |
764 try: | |
765 entity_type = self.host_app.memory.get_entity_datum( | |
766 self, data["to"], C.ENTITY_TYPE | |
767 ) | |
768 # FIXME: should entity_type manage resources ? | |
769 except (exceptions.UnknownEntityError, KeyError): | |
770 entity_type = "contact" | |
771 | |
772 if entity_type == C.ENTITY_TYPE_MUC: | |
773 data["type"] = C.MESS_TYPE_GROUPCHAT | |
774 else: | |
775 data["type"] = C.MESS_TYPE_CHAT | |
776 else: | |
777 data["type"] = C.MESS_TYPE_CHAT | |
778 | |
779 # FIXME: send_only is used by libervia's OTR plugin to avoid | |
780 # the triggers from frontend, and no_trigger do the same | |
781 # thing internally, this could be unified | |
782 send_only = data["extra"].get("send_only", False) | |
783 | |
784 if not no_trigger and not send_only: | |
785 # is the session encrypted? If so we indicate it in data | |
786 self.encryption.set_encryption_flag(data) | |
787 | |
788 if not self.host_app.trigger.point( | |
789 "sendMessage" + self.trigger_suffix, | |
790 self, | |
791 data, | |
792 pre_xml_treatments, | |
793 post_xml_treatments, | |
794 ): | |
795 return defer.succeed(None) | |
796 | |
797 log.debug(_("Sending message (type {type}, to {to})") | |
798 .format(type=data["type"], to=to_jid.full())) | |
799 | |
800 pre_xml_treatments.addCallback(lambda __: self.generate_message_xml(data, post_xml_treatments)) | |
801 pre_xml_treatments.addCallback(lambda __: post_xml_treatments) | |
802 pre_xml_treatments.addErrback(self._cancel_error_trap) | |
803 post_xml_treatments.addCallback( | |
804 lambda __: defer.ensureDeferred(self.send_message_data(data)) | |
805 ) | |
806 if send_only: | |
807 log.debug(_("Triggers, storage and echo have been inhibited by the " | |
808 "'send_only' parameter")) | |
809 else: | |
810 self.add_post_xml_callbacks(post_xml_treatments) | |
811 post_xml_treatments.addErrback(self._cancel_error_trap) | |
812 post_xml_treatments.addErrback(self.host_app.log_errback) | |
813 pre_xml_treatments.callback(data) | |
814 return pre_xml_treatments | |
815 | |
816 def _cancel_error_trap(self, failure): | |
817 """A message sending can be cancelled by a plugin treatment""" | |
818 failure.trap(exceptions.CancelError) | |
819 | |
820 def is_message_printable(self, mess_data): | |
821 """Return True if a message contain payload to show in frontends""" | |
822 return ( | |
823 mess_data["message"] or mess_data["subject"] | |
824 or mess_data["extra"].get(C.KEY_ATTACHMENTS) | |
825 or mess_data["type"] == C.MESS_TYPE_INFO | |
826 ) | |
827 | |
828 async def message_add_to_history(self, data): | |
829 """Store message into database (for local history) | |
830 | |
831 @param data: message data dictionnary | |
832 @param client: profile's client | |
833 """ | |
834 if data["type"] != C.MESS_TYPE_GROUPCHAT: | |
835 # we don't add groupchat message to history, as we get them back | |
836 # and they will be added then | |
837 | |
838 # we need a message to store | |
839 if self.is_message_printable(data): | |
840 await self.host_app.memory.add_to_history(self, data) | |
841 else: | |
842 log.warning( | |
843 "No message found" | |
844 ) # empty body should be managed by plugins before this point | |
845 return data | |
846 | |
847 def message_get_bridge_args(self, data): | |
848 """Generate args to use with bridge from data dict""" | |
849 return (data["uid"], data["timestamp"], data["from"].full(), | |
850 data["to"].full(), data["message"], data["subject"], | |
851 data["type"], data_format.serialise(data["extra"])) | |
852 | |
853 | |
854 def message_send_to_bridge(self, data): | |
855 """Send message to bridge, so frontends can display it | |
856 | |
857 @param data: message data dictionnary | |
858 @param client: profile's client | |
859 """ | |
860 if data["type"] != C.MESS_TYPE_GROUPCHAT: | |
861 # we don't send groupchat message to bridge, as we get them back | |
862 # and they will be added the | |
863 | |
864 # we need a message to send something | |
865 if self.is_message_printable(data): | |
866 | |
867 # We send back the message, so all frontends are aware of it | |
868 self.host_app.bridge.message_new( | |
869 *self.message_get_bridge_args(data), | |
870 profile=self.profile | |
871 ) | |
872 else: | |
873 log.warning(_("No message found")) | |
874 return data | |
875 | |
876 ## helper methods ## | |
877 | |
878 def p(self, plugin_name, missing=exceptions.MissingModule): | |
879 """Get a plugin if available | |
880 | |
881 @param plugin_name(str): name of the plugin | |
882 @param missing(object): value to return if plugin is missing | |
883 if it is a subclass of Exception, it will be raised with a helping str as | |
884 argument. | |
885 @return (object): requested plugin wrapper, or default value | |
886 The plugin wrapper will return the method with client set as first | |
887 positional argument | |
888 """ | |
889 return ClientPluginWrapper(self, plugin_name, missing) | |
890 | |
891 | |
892 ExtraDict = dict # TODO | |
893 | |
894 | |
895 @implementer(iwokkel.IDisco) | |
896 class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient): | |
897 trigger_suffix = "" | |
898 is_component = False | |
899 | |
900 def __init__(self, host_app, profile, user_jid, password, host=None, | |
901 port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES): | |
902 # XXX: DNS SRV records are checked when the host is not specified. | |
903 # If no SRV record is found, the host is directly extracted from the JID. | |
904 self.started = time.time() | |
905 | |
906 # Currently, we use "client/pc/Salut à Toi", but as | |
907 # SàT is multi-frontends and can be used on mobile devices, as a bot, | |
908 # with a web frontend, | |
909 # etc., we should implement a way to dynamically update identities through the | |
910 # bridge | |
911 self.identities = [disco.DiscoIdentity("client", "pc", C.APP_NAME)] | |
912 if sys.platform == "android": | |
913 # for now we consider Android devices to be always phones | |
914 self.identities = [disco.DiscoIdentity("client", "phone", C.APP_NAME)] | |
915 | |
916 hosts_map = host_app.memory.config_get(None, "hosts_dict", {}) | |
917 if host is None and user_jid.host in hosts_map: | |
918 host_data = hosts_map[user_jid.host] | |
919 if isinstance(host_data, str): | |
920 host = host_data | |
921 elif isinstance(host_data, dict): | |
922 if "host" in host_data: | |
923 host = host_data["host"] | |
924 if "port" in host_data: | |
925 port = host_data["port"] | |
926 else: | |
927 log.warning( | |
928 _("invalid data used for host: {data}").format(data=host_data) | |
929 ) | |
930 host_data = None | |
931 if host_data is not None: | |
932 log.info( | |
933 "using {host}:{port} for host {host_ori} as requested in config" | |
934 .format(host_ori=user_jid.host, host=host, port=port) | |
935 ) | |
936 | |
937 self.check_certificate = host_app.memory.param_get_a( | |
938 "check_certificate", "Connection", profile_key=profile) | |
939 | |
940 if self.check_certificate: | |
941 tls_required, configurationForTLS = True, None | |
942 else: | |
943 tls_required = False | |
944 configurationForTLS = ssl.CertificateOptions(trustRoot=None) | |
945 | |
946 wokkel_client.XMPPClient.__init__( | |
947 self, user_jid, password, host or None, port or C.XMPP_C2S_PORT, | |
948 tls_required=tls_required, configurationForTLS=configurationForTLS | |
949 ) | |
950 SatXMPPEntity.__init__(self, host_app, profile, max_retries) | |
951 | |
952 if not self.check_certificate: | |
953 msg = (_("Certificate validation is deactivated, this is unsecure and " | |
954 "somebody may be spying on you. If you have no good reason to disable " | |
955 "certificate validation, please activate \"Check certificate\" in your " | |
956 "settings in \"Connection\" tab.")) | |
957 xml_tools.quick_note(host_app, self, msg, _("Security notice"), | |
958 level = C.XMLUI_DATA_LVL_WARNING) | |
959 | |
960 @property | |
961 def server_jid(self): | |
962 return jid.JID(self.jid.host) | |
963 | |
964 def _get_plugins_list(self): | |
965 for p in self.host_app.plugins.values(): | |
966 if C.PLUG_MODE_CLIENT in p._info["modes"]: | |
967 yield p | |
968 | |
969 def _create_sub_protocols(self): | |
970 self.messageProt = SatMessageProtocol(self.host_app) | |
971 self.messageProt.setHandlerParent(self) | |
972 | |
973 self.roster = SatRosterProtocol(self.host_app) | |
974 self.roster.setHandlerParent(self) | |
975 | |
976 self.presence = SatPresenceProtocol(self.host_app) | |
977 self.presence.setHandlerParent(self) | |
978 | |
979 @classmethod | |
980 async def start_connection(cls, host, profile, max_retries): | |
981 try: | |
982 await super(SatXMPPClient, cls).start_connection(host, profile, max_retries) | |
983 except exceptions.CancelError as e: | |
984 log.warning(f"start_connection cancelled: {e}") | |
985 return | |
986 entity = host.profiles[profile] | |
987 # we finally send our presence | |
988 entity.presence.available() | |
989 | |
990 def entity_connected(self): | |
991 # we want to be sure that we got the roster | |
992 return self.roster.got_roster | |
993 | |
994 def add_post_xml_callbacks(self, post_xml_treatments): | |
995 post_xml_treatments.addCallback(self.messageProt.complete_attachments) | |
996 post_xml_treatments.addCallback( | |
997 lambda ret: defer.ensureDeferred(self.message_add_to_history(ret)) | |
998 ) | |
999 post_xml_treatments.addCallback(self.message_send_to_bridge) | |
1000 | |
1001 def feedback( | |
1002 self, | |
1003 to_jid: jid.JID, | |
1004 message: str, | |
1005 extra: Optional[ExtraDict] = None | |
1006 ) -> None: | |
1007 """Send message to frontends | |
1008 | |
1009 This message will be an info message, not recorded in history. | |
1010 It can be used to give feedback of a command | |
1011 @param to_jid: destinee jid | |
1012 @param message: message to send to frontends | |
1013 @param extra: extra data to use in particular, info subtype can be specified with | |
1014 MESS_EXTRA_INFO | |
1015 """ | |
1016 if extra is None: | |
1017 extra = {} | |
1018 self.host_app.bridge.message_new( | |
1019 uid=str(uuid.uuid4()), | |
1020 timestamp=time.time(), | |
1021 from_jid=self.jid.full(), | |
1022 to_jid=to_jid.full(), | |
1023 message={"": message}, | |
1024 subject={}, | |
1025 mess_type=C.MESS_TYPE_INFO, | |
1026 extra=data_format.serialise(extra), | |
1027 profile=self.profile, | |
1028 ) | |
1029 | |
1030 def _finish_connection(self, __): | |
1031 d = self.roster.request_roster() | |
1032 d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__)) | |
1033 | |
1034 | |
1035 @implementer(iwokkel.IDisco) | |
1036 class SatXMPPComponent(SatXMPPEntity, component.Component): | |
1037 """XMPP component | |
1038 | |
1039 This component are similar but not identical to clients. | |
1040 An entry point plugin is launched after component is connected. | |
1041 Component need to instantiate MessageProtocol itself | |
1042 """ | |
1043 | |
1044 trigger_suffix = ( | |
1045 "Component" | |
1046 ) # used for to distinguish some trigger points set in SatXMPPEntity | |
1047 is_component = True | |
1048 # XXX: set to True from entry plugin to keep messages in history for sent messages | |
1049 sendHistory = False | |
1050 # XXX: same as sendHistory but for received messaged | |
1051 receiveHistory = False | |
1052 | |
1053 def __init__(self, host_app, profile, component_jid, password, host=None, port=None, | |
1054 max_retries=C.XMPP_MAX_RETRIES): | |
1055 self.started = time.time() | |
1056 if port is None: | |
1057 port = C.XMPP_COMPONENT_PORT | |
1058 | |
1059 ## entry point ## | |
1060 entry_point = host_app.memory.get_entry_point(profile) | |
1061 try: | |
1062 self.entry_plugin = host_app.plugins[entry_point] | |
1063 except KeyError: | |
1064 raise exceptions.NotFound( | |
1065 _("The requested entry point ({entry_point}) is not available").format( | |
1066 entry_point=entry_point | |
1067 ) | |
1068 ) | |
1069 | |
1070 self.enabled_features = set() | |
1071 self.identities = [disco.DiscoIdentity("component", "generic", C.APP_NAME)] | |
1072 # jid is set automatically on bind by Twisted for Client, but not for Component | |
1073 self.jid = component_jid | |
1074 if host is None: | |
1075 try: | |
1076 host = component_jid.host.split(".", 1)[1] | |
1077 except IndexError: | |
1078 raise ValueError("Can't guess host from jid, please specify a host") | |
1079 # XXX: component.Component expect unicode jid, while Client expect jid.JID. | |
1080 # this is not consistent, so we use jid.JID for SatXMPP* | |
1081 component.Component.__init__(self, host, port, component_jid.full(), password) | |
1082 SatXMPPEntity.__init__(self, host_app, profile, max_retries) | |
1083 | |
1084 @property | |
1085 def server_jid(self): | |
1086 # FIXME: not the best way to get server jid, maybe use config option? | |
1087 return jid.JID(self.jid.host.split(".", 1)[-1]) | |
1088 | |
1089 @property | |
1090 def is_admin(self) -> bool: | |
1091 return False | |
1092 | |
1093 def _create_sub_protocols(self): | |
1094 self.messageProt = SatMessageProtocol(self.host_app) | |
1095 self.messageProt.setHandlerParent(self) | |
1096 | |
1097 def _build_dependencies(self, current, plugins, required=True): | |
1098 """build recursively dependencies needed for a plugin | |
1099 | |
1100 this method build list of plugin needed for a component and raises | |
1101 errors if they are not available or not allowed for components | |
1102 @param current(object): parent plugin to check | |
1103 use entry_point for first call | |
1104 @param plugins(list): list of validated plugins, will be filled by the method | |
1105 give an empty list for first call | |
1106 @param required(bool): True if plugin is mandatory | |
1107 for recursive calls only, should not be modified by inital caller | |
1108 @raise InternalError: one of the plugin is not handling components | |
1109 @raise KeyError: one plugin should be present in self.host_app.plugins but it | |
1110 is not | |
1111 """ | |
1112 if C.PLUG_MODE_COMPONENT not in current._info["modes"]: | |
1113 if not required: | |
1114 return | |
1115 else: | |
1116 log.error( | |
1117 _( | |
1118 "Plugin {current_name} is needed for {entry_name}, " | |
1119 "but it doesn't handle component mode" | |
1120 ).format( | |
1121 current_name=current._info["import_name"], | |
1122 entry_name=self.entry_plugin._info["import_name"], | |
1123 ) | |
1124 ) | |
1125 raise exceptions.InternalError(_("invalid plugin mode")) | |
1126 | |
1127 for import_name in current._info.get(C.PI_DEPENDENCIES, []): | |
1128 # plugins are already loaded as dependencies | |
1129 # so we know they are in self.host_app.plugins | |
1130 dep = self.host_app.plugins[import_name] | |
1131 self._build_dependencies(dep, plugins) | |
1132 | |
1133 for import_name in current._info.get(C.PI_RECOMMENDATIONS, []): | |
1134 # here plugins are only recommendations, | |
1135 # so they may not exist in self.host_app.plugins | |
1136 try: | |
1137 dep = self.host_app.plugins[import_name] | |
1138 except KeyError: | |
1139 continue | |
1140 self._build_dependencies(dep, plugins, required=False) | |
1141 | |
1142 if current not in plugins: | |
1143 # current can be required for several plugins and so | |
1144 # it can already be present in the list | |
1145 plugins.append(current) | |
1146 | |
1147 def _get_plugins_list(self): | |
1148 # XXX: for component we don't launch all plugins triggers | |
1149 # but only the ones from which there is a dependency | |
1150 plugins = [] | |
1151 self._build_dependencies(self.entry_plugin, plugins) | |
1152 return plugins | |
1153 | |
1154 def entity_connected(self): | |
1155 # we can now launch entry point | |
1156 try: | |
1157 start_cb = self.entry_plugin.componentStart | |
1158 except AttributeError: | |
1159 return | |
1160 else: | |
1161 return start_cb(self) | |
1162 | |
1163 def add_post_xml_callbacks(self, post_xml_treatments): | |
1164 if self.sendHistory: | |
1165 post_xml_treatments.addCallback( | |
1166 lambda ret: defer.ensureDeferred(self.message_add_to_history(ret)) | |
1167 ) | |
1168 | |
1169 def get_owner_from_jid(self, to_jid: jid.JID) -> jid.JID: | |
1170 """Retrieve "owner" of a component resource from the destination jid of the request | |
1171 | |
1172 This method needs plugin XEP-0106 for unescaping, if you use it you must add the | |
1173 plugin to your dependencies. | |
1174 A "user" part must be present in "to_jid" (otherwise, the component itself is addressed) | |
1175 @param to_jid: destination JID of the request | |
1176 """ | |
1177 try: | |
1178 unescape = self.host_app.plugins['XEP-0106'].unescape | |
1179 except KeyError: | |
1180 raise exceptions.MissingPlugin("Plugin XEP-0106 is needed to retrieve owner") | |
1181 else: | |
1182 user = unescape(to_jid.user) | |
1183 if '@' in user: | |
1184 # a full jid is specified | |
1185 return jid.JID(user) | |
1186 else: | |
1187 # only user part is specified, we use our own host to build the full jid | |
1188 return jid.JID(None, (user, self.host, None)) | |
1189 | |
1190 def get_owner_and_peer(self, iq_elt: domish.Element) -> Tuple[jid.JID, jid.JID]: | |
1191 """Retrieve owner of a component jid, and the jid of the requesting peer | |
1192 | |
1193 "owner" is found by either unescaping full jid from node, or by combining node | |
1194 with our host. | |
1195 Peer jid is the requesting jid from the IQ element | |
1196 @param iq_elt: IQ stanza sent from the requested | |
1197 @return: owner and peer JIDs | |
1198 """ | |
1199 to_jid = jid.JID(iq_elt['to']) | |
1200 if to_jid.user: | |
1201 owner = self.get_owner_from_jid(to_jid) | |
1202 else: | |
1203 owner = jid.JID(iq_elt["from"]).userhostJID() | |
1204 | |
1205 peer_jid = jid.JID(iq_elt["from"]) | |
1206 return peer_jid, owner | |
1207 | |
1208 def get_virtual_client(self, jid_: jid.JID) -> SatXMPPEntity: | |
1209 """Get client for this component with a specified jid | |
1210 | |
1211 This is needed to perform operations with a virtual JID corresponding to a virtual | |
1212 entity (e.g. identified of a legacy network account) instead of the JID of the | |
1213 gateway itself. | |
1214 @param jid_: virtual JID to use | |
1215 @return: virtual client | |
1216 """ | |
1217 client = copy.copy(self) | |
1218 client.jid = jid_ | |
1219 return client | |
1220 | |
1221 | |
1222 class SatMessageProtocol(xmppim.MessageProtocol): | |
1223 | |
1224 def __init__(self, host): | |
1225 xmppim.MessageProtocol.__init__(self) | |
1226 self.host = host | |
1227 | |
1228 @property | |
1229 def client(self): | |
1230 return self.parent | |
1231 | |
1232 def normalize_ns(self, elt: domish.Element, namespace: Optional[str]) -> None: | |
1233 if elt.uri == namespace: | |
1234 elt.defaultUri = elt.uri = C.NS_CLIENT | |
1235 for child in elt.elements(): | |
1236 self.normalize_ns(child, namespace) | |
1237 | |
1238 def parse_message(self, message_elt): | |
1239 """Parse a message XML and return message_data | |
1240 | |
1241 @param message_elt(domish.Element): raw <message> xml | |
1242 @param client(SatXMPPClient, None): client to map message id to uid | |
1243 if None, mapping will not be done | |
1244 @return(dict): message data | |
1245 """ | |
1246 if message_elt.name != "message": | |
1247 log.warning(_( | |
1248 "parse_message used with a non <message/> stanza, ignoring: {xml}" | |
1249 .format(xml=message_elt.toXml()))) | |
1250 return {} | |
1251 | |
1252 if message_elt.uri == None: | |
1253 # xmlns may be None when wokkel element parsing strip out root namespace | |
1254 self.normalize_ns(message_elt, None) | |
1255 elif message_elt.uri != C.NS_CLIENT: | |
1256 log.warning(_( | |
1257 "received <message> with a wrong namespace: {xml}" | |
1258 .format(xml=message_elt.toXml()))) | |
1259 | |
1260 client = self.parent | |
1261 | |
1262 if not message_elt.hasAttribute('to'): | |
1263 message_elt['to'] = client.jid.full() | |
1264 | |
1265 message = {} | |
1266 subject = {} | |
1267 extra = {} | |
1268 data = { | |
1269 "from": jid.JID(message_elt["from"]), | |
1270 "to": jid.JID(message_elt["to"]), | |
1271 "uid": message_elt.getAttribute( | |
1272 "uid", str(uuid.uuid4()) | |
1273 ), # XXX: uid is not a standard attribute but may be added by plugins | |
1274 "message": message, | |
1275 "subject": subject, | |
1276 "type": message_elt.getAttribute("type", "normal"), | |
1277 "extra": extra, | |
1278 } | |
1279 | |
1280 try: | |
1281 message_id = data["extra"]["message_id"] = message_elt["id"] | |
1282 except KeyError: | |
1283 pass | |
1284 else: | |
1285 client.mess_id2uid[(data["from"], message_id)] = data["uid"] | |
1286 | |
1287 # message | |
1288 for e in message_elt.elements(C.NS_CLIENT, "body"): | |
1289 message[e.getAttribute((C.NS_XML, "lang"), "")] = str(e) | |
1290 | |
1291 # subject | |
1292 for e in message_elt.elements(C.NS_CLIENT, "subject"): | |
1293 subject[e.getAttribute((C.NS_XML, "lang"), "")] = str(e) | |
1294 | |
1295 # delay and timestamp | |
1296 try: | |
1297 received_timestamp = message_elt._received_timestamp | |
1298 except AttributeError: | |
1299 # message_elt._received_timestamp should have been set in onMessage | |
1300 # but if parse_message is called directly, it can be missing | |
1301 log.debug("missing received timestamp for {message_elt}".format( | |
1302 message_elt=message_elt)) | |
1303 received_timestamp = time.time() | |
1304 | |
1305 try: | |
1306 delay_elt = next(message_elt.elements(delay.NS_DELAY, "delay")) | |
1307 except StopIteration: | |
1308 data["timestamp"] = received_timestamp | |
1309 else: | |
1310 parsed_delay = delay.Delay.fromElement(delay_elt) | |
1311 data["timestamp"] = calendar.timegm(parsed_delay.stamp.utctimetuple()) | |
1312 data["received_timestamp"] = received_timestamp | |
1313 if parsed_delay.sender: | |
1314 data["delay_sender"] = parsed_delay.sender.full() | |
1315 | |
1316 self.host.trigger.point("message_parse", client, message_elt, data) | |
1317 return data | |
1318 | |
1319 def _on_message_start_workflow(self, cont, client, message_elt, post_treat): | |
1320 """Parse message and do post treatments | |
1321 | |
1322 It is the first callback called after message_received trigger | |
1323 @param cont(bool): workflow will continue only if this is True | |
1324 @param message_elt(domish.Element): message stanza | |
1325 may have be modified by triggers | |
1326 @param post_treat(defer.Deferred): post parsing treatments | |
1327 """ | |
1328 if not cont: | |
1329 return | |
1330 data = self.parse_message(message_elt) | |
1331 post_treat.addCallback(self.complete_attachments) | |
1332 post_treat.addCallback(self.skip_empty_message) | |
1333 if not client.is_component or client.receiveHistory: | |
1334 post_treat.addCallback( | |
1335 lambda ret: defer.ensureDeferred(self.add_to_history(ret)) | |
1336 ) | |
1337 if not client.is_component: | |
1338 post_treat.addCallback(self.bridge_signal, data) | |
1339 post_treat.addErrback(self.cancel_error_trap) | |
1340 post_treat.callback(data) | |
1341 | |
1342 def onMessage(self, message_elt): | |
1343 # TODO: handle threads | |
1344 message_elt._received_timestamp = time.time() | |
1345 client = self.parent | |
1346 if not "from" in message_elt.attributes: | |
1347 message_elt["from"] = client.jid.host | |
1348 log.debug(_("got message from: {from_}").format(from_=message_elt["from"])) | |
1349 if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT: | |
1350 # we use client namespace all the time to simplify parsing | |
1351 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) | |
1352 | |
1353 # plugin can add their treatments to this deferred | |
1354 post_treat = defer.Deferred() | |
1355 | |
1356 d = self.host.trigger.async_point( | |
1357 "message_received", client, message_elt, post_treat | |
1358 ) | |
1359 | |
1360 d.addCallback(self._on_message_start_workflow, client, message_elt, post_treat) | |
1361 | |
1362 def complete_attachments(self, data): | |
1363 """Complete missing metadata of attachments""" | |
1364 for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []): | |
1365 if "name" not in attachment and "url" in attachment: | |
1366 name = (Path(unquote(urlparse(attachment['url']).path)).name | |
1367 or C.FILE_DEFAULT_NAME) | |
1368 attachment["name"] = name | |
1369 if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment | |
1370 and "name" in attachment)): | |
1371 media_type = mimetypes.guess_type(attachment['name'], strict=False)[0] | |
1372 if media_type: | |
1373 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type | |
1374 | |
1375 return data | |
1376 | |
1377 def skip_empty_message(self, data): | |
1378 if not data["message"] and not data["extra"] and not data["subject"]: | |
1379 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) | |
1380 return data | |
1381 | |
1382 async def add_to_history(self, data): | |
1383 if data.pop("history", None) == C.HISTORY_SKIP: | |
1384 log.debug("history is skipped as requested") | |
1385 data["extra"]["history"] = C.HISTORY_SKIP | |
1386 else: | |
1387 # we need a message to store | |
1388 if self.parent.is_message_printable(data): | |
1389 return await self.host.memory.add_to_history(self.parent, data) | |
1390 else: | |
1391 log.debug("not storing empty message to history: {data}" | |
1392 .format(data=data)) | |
1393 | |
1394 def bridge_signal(self, __, data): | |
1395 try: | |
1396 data["extra"]["received_timestamp"] = str(data["received_timestamp"]) | |
1397 data["extra"]["delay_sender"] = data["delay_sender"] | |
1398 except KeyError: | |
1399 pass | |
1400 if self.client.encryption.isEncrypted(data): | |
1401 data["extra"]["encrypted"] = True | |
1402 if data is not None: | |
1403 if self.parent.is_message_printable(data): | |
1404 self.host.bridge.message_new( | |
1405 data["uid"], | |
1406 data["timestamp"], | |
1407 data["from"].full(), | |
1408 data["to"].full(), | |
1409 data["message"], | |
1410 data["subject"], | |
1411 data["type"], | |
1412 data_format.serialise(data["extra"]), | |
1413 profile=self.parent.profile, | |
1414 ) | |
1415 else: | |
1416 log.debug("Discarding bridge signal for empty message: {data}".format( | |
1417 data=data)) | |
1418 return data | |
1419 | |
1420 def cancel_error_trap(self, failure_): | |
1421 """A message sending can be cancelled by a plugin treatment""" | |
1422 failure_.trap(exceptions.CancelError) | |
1423 | |
1424 | |
1425 class SatRosterProtocol(xmppim.RosterClientProtocol): | |
1426 | |
1427 def __init__(self, host): | |
1428 xmppim.RosterClientProtocol.__init__(self) | |
1429 self.host = host | |
1430 self.got_roster = defer.Deferred() # called when roster is received and ready | |
1431 # XXX: the two following dicts keep a local copy of the roster | |
1432 self._jids = {} # map from jids to RosterItem: key=jid value=RosterItem | |
1433 self._groups = {} # map from groups to jids: key=group value=set of jids | |
1434 | |
1435 def __contains__(self, entity_jid): | |
1436 return self.is_jid_in_roster(entity_jid) | |
1437 | |
1438 @property | |
1439 def versioning(self): | |
1440 """True if server support roster versioning""" | |
1441 return (NS_ROSTER_VER, 'ver') in self.parent.xmlstream.features | |
1442 | |
1443 @property | |
1444 def roster_cache(self): | |
1445 """Cache of roster from storage | |
1446 | |
1447 This property return a new PersistentDict on each call, it must be loaded | |
1448 manually if necessary | |
1449 """ | |
1450 return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile) | |
1451 | |
1452 def _register_item(self, item): | |
1453 """Register item in local cache | |
1454 | |
1455 item must be already registered in self._jids before this method is called | |
1456 @param item (RosterIem): item added | |
1457 """ | |
1458 log.debug("registering item: {}".format(item.entity.full())) | |
1459 if item.entity.resource: | |
1460 log.warning( | |
1461 "Received a roster item with a resource, this is not common but not " | |
1462 "restricted by RFC 6121, this case may be not well tested." | |
1463 ) | |
1464 if not item.subscriptionTo: | |
1465 if not item.subscriptionFrom: | |
1466 log.info( | |
1467 _("There's no subscription between you and [{}]!").format( | |
1468 item.entity.full() | |
1469 ) | |
1470 ) | |
1471 else: | |
1472 log.info(_("You are not subscribed to [{}]!").format(item.entity.full())) | |
1473 if not item.subscriptionFrom: | |
1474 log.info(_("[{}] is not subscribed to you!").format(item.entity.full())) | |
1475 | |
1476 for group in item.groups: | |
1477 self._groups.setdefault(group, set()).add(item.entity) | |
1478 | |
1479 @defer.inlineCallbacks | |
1480 def _cache_roster(self, version): | |
1481 """Serialise local roster and save it to storage | |
1482 | |
1483 @param version(unicode): version of roster in local cache | |
1484 """ | |
1485 roster_cache = self.roster_cache | |
1486 yield roster_cache.clear() | |
1487 roster_cache[ROSTER_VER_KEY] = version | |
1488 for roster_jid, roster_item in self._jids.items(): | |
1489 roster_jid_s = roster_jid.full() | |
1490 roster_item_elt = roster_item.toElement().toXml() | |
1491 roster_cache[roster_jid_s] = roster_item_elt | |
1492 | |
1493 @defer.inlineCallbacks | |
1494 def resync(self): | |
1495 """Ask full roster to resync database | |
1496 | |
1497 this should not be necessary, but may be used if user suspsect roster | |
1498 to be somehow corrupted | |
1499 """ | |
1500 roster_cache = self.roster_cache | |
1501 yield roster_cache.clear() | |
1502 self._jids.clear() | |
1503 self._groups.clear() | |
1504 yield self.request_roster() | |
1505 | |
1506 @defer.inlineCallbacks | |
1507 def request_roster(self): | |
1508 """Ask the server for Roster list """ | |
1509 if self.versioning: | |
1510 log.info(_("our server support roster versioning, we use it")) | |
1511 roster_cache = self.roster_cache | |
1512 yield roster_cache.load() | |
1513 try: | |
1514 version = roster_cache[ROSTER_VER_KEY] | |
1515 except KeyError: | |
1516 log.info(_("no roster in cache, we start fresh")) | |
1517 # u"" means we use versioning without valid roster in cache | |
1518 version = "" | |
1519 else: | |
1520 log.info(_("We have roster v{version} in cache").format(version=version)) | |
1521 # we deserialise cached roster to our local cache | |
1522 for roster_jid_s, roster_item_elt_s in roster_cache.items(): | |
1523 if roster_jid_s == ROSTER_VER_KEY: | |
1524 continue | |
1525 roster_jid = jid.JID(roster_jid_s) | |
1526 roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8')) | |
1527 roster_item = xmppim.RosterItem.fromElement(roster_item_elt) | |
1528 self._jids[roster_jid] = roster_item | |
1529 self._register_item(roster_item) | |
1530 else: | |
1531 log.warning(_("our server doesn't support roster versioning")) | |
1532 version = None | |
1533 | |
1534 log.debug("requesting roster") | |
1535 roster = yield self.getRoster(version=version) | |
1536 if roster is None: | |
1537 log.debug("empty roster result received, we'll get roster item with roster " | |
1538 "pushes") | |
1539 else: | |
1540 # a full roster is received | |
1541 self._groups.clear() | |
1542 self._jids = roster | |
1543 for item in roster.values(): | |
1544 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: | |
1545 # XXX: current behaviour: we don't want contact in our roster list | |
1546 # if there is no presence subscription | |
1547 # may change in the future | |
1548 log.info( | |
1549 "Removing contact {} from roster because there is no presence " | |
1550 "subscription".format( | |
1551 item.jid | |
1552 ) | |
1553 ) | |
1554 self.removeItem(item.entity) # FIXME: to be checked | |
1555 else: | |
1556 self._register_item(item) | |
1557 yield self._cache_roster(roster.version) | |
1558 | |
1559 if not self.got_roster.called: | |
1560 # got_roster may already be called if we use resync() | |
1561 self.got_roster.callback(None) | |
1562 | |
1563 def removeItem(self, to_jid): | |
1564 """Remove a contact from roster list | |
1565 @param to_jid: a JID instance | |
1566 @return: Deferred | |
1567 """ | |
1568 return xmppim.RosterClientProtocol.removeItem(self, to_jid) | |
1569 | |
1570 def get_attributes(self, item): | |
1571 """Return dictionary of attributes as used in bridge from a RosterItem | |
1572 | |
1573 @param item: RosterItem | |
1574 @return: dictionary of attributes | |
1575 """ | |
1576 item_attr = { | |
1577 "to": str(item.subscriptionTo), | |
1578 "from": str(item.subscriptionFrom), | |
1579 "ask": str(item.ask), | |
1580 } | |
1581 if item.name: | |
1582 item_attr["name"] = item.name | |
1583 return item_attr | |
1584 | |
1585 def setReceived(self, request): | |
1586 item = request.item | |
1587 entity = item.entity | |
1588 log.info(_("adding {entity} to roster").format(entity=entity.full())) | |
1589 if request.version is not None: | |
1590 # we update the cache in storage | |
1591 roster_cache = self.roster_cache | |
1592 roster_cache[entity.full()] = item.toElement().toXml() | |
1593 roster_cache[ROSTER_VER_KEY] = request.version | |
1594 | |
1595 try: # update the cache for the groups the contact has been removed from | |
1596 left_groups = set(self._jids[entity].groups).difference(item.groups) | |
1597 for group in left_groups: | |
1598 jids_set = self._groups[group] | |
1599 jids_set.remove(entity) | |
1600 if not jids_set: | |
1601 del self._groups[group] | |
1602 except KeyError: | |
1603 pass # no previous item registration (or it's been cleared) | |
1604 self._jids[entity] = item | |
1605 self._register_item(item) | |
1606 self.host.bridge.contact_new( | |
1607 entity.full(), self.get_attributes(item), list(item.groups), | |
1608 self.parent.profile | |
1609 ) | |
1610 | |
1611 def removeReceived(self, request): | |
1612 entity = request.item.entity | |
1613 log.info(_("removing {entity} from roster").format(entity=entity.full())) | |
1614 if request.version is not None: | |
1615 # we update the cache in storage | |
1616 roster_cache = self.roster_cache | |
1617 try: | |
1618 del roster_cache[request.item.entity.full()] | |
1619 except KeyError: | |
1620 # because we don't use load(), cache won't have the key, but it | |
1621 # will be deleted from storage anyway | |
1622 pass | |
1623 roster_cache[ROSTER_VER_KEY] = request.version | |
1624 | |
1625 # we first remove item from local cache (self._groups and self._jids) | |
1626 try: | |
1627 item = self._jids.pop(entity) | |
1628 except KeyError: | |
1629 log.error( | |
1630 "Received a roster remove event for an item not in cache ({})".format( | |
1631 entity | |
1632 ) | |
1633 ) | |
1634 return | |
1635 for group in item.groups: | |
1636 try: | |
1637 jids_set = self._groups[group] | |
1638 jids_set.remove(entity) | |
1639 if not jids_set: | |
1640 del self._groups[group] | |
1641 except KeyError: | |
1642 log.warning( | |
1643 f"there is no cache for the group [{group}] of the removed roster " | |
1644 f"item [{entity}]" | |
1645 ) | |
1646 | |
1647 # then we send the bridge signal | |
1648 self.host.bridge.contact_deleted(entity.full(), self.parent.profile) | |
1649 | |
1650 def get_groups(self): | |
1651 """Return a list of groups""" | |
1652 return list(self._groups.keys()) | |
1653 | |
1654 def get_item(self, entity_jid): | |
1655 """Return RosterItem for a given jid | |
1656 | |
1657 @param entity_jid(jid.JID): jid of the contact | |
1658 @return(RosterItem, None): RosterItem instance | |
1659 None if contact is not in cache | |
1660 """ | |
1661 return self._jids.get(entity_jid, None) | |
1662 | |
1663 def get_jids(self): | |
1664 """Return all jids of the roster""" | |
1665 return list(self._jids.keys()) | |
1666 | |
1667 def is_jid_in_roster(self, entity_jid): | |
1668 """Return True if jid is in roster""" | |
1669 if not isinstance(entity_jid, jid.JID): | |
1670 raise exceptions.InternalError( | |
1671 f"a JID is expected, not {type(entity_jid)}: {entity_jid!r}") | |
1672 return entity_jid in self._jids | |
1673 | |
1674 def is_subscribed_from(self, entity_jid: jid.JID) -> bool: | |
1675 """Return True if entity is authorised to see our presence""" | |
1676 try: | |
1677 item = self._jids[entity_jid.userhostJID()] | |
1678 except KeyError: | |
1679 return False | |
1680 return item.subscriptionFrom | |
1681 | |
1682 def is_subscribed_to(self, entity_jid: jid.JID) -> bool: | |
1683 """Return True if we are subscribed to entity""" | |
1684 try: | |
1685 item = self._jids[entity_jid.userhostJID()] | |
1686 except KeyError: | |
1687 return False | |
1688 return item.subscriptionTo | |
1689 | |
1690 def get_items(self): | |
1691 """Return all items of the roster""" | |
1692 return list(self._jids.values()) | |
1693 | |
1694 def get_jids_from_group(self, group): | |
1695 try: | |
1696 return self._groups[group] | |
1697 except KeyError: | |
1698 raise exceptions.UnknownGroupError(group) | |
1699 | |
1700 def get_jids_set(self, type_, groups=None): | |
1701 """Helper method to get a set of jids | |
1702 | |
1703 @param type_(unicode): one of: | |
1704 C.ALL: get all jids from roster | |
1705 C.GROUP: get jids from groups (listed in "groups") | |
1706 @groups(list[unicode]): list of groups used if type_==C.GROUP | |
1707 @return (set(jid.JID)): set of selected jids | |
1708 """ | |
1709 if type_ == C.ALL and groups is not None: | |
1710 raise ValueError("groups must not be set for {} type".format(C.ALL)) | |
1711 | |
1712 if type_ == C.ALL: | |
1713 return set(self.get_jids()) | |
1714 elif type_ == C.GROUP: | |
1715 jids = set() | |
1716 for group in groups: | |
1717 jids.update(self.get_jids_from_group(group)) | |
1718 return jids | |
1719 else: | |
1720 raise ValueError("Unexpected type_ {}".format(type_)) | |
1721 | |
1722 def get_nick(self, entity_jid): | |
1723 """Return a nick name for an entity | |
1724 | |
1725 return nick choosed by user if available | |
1726 else return user part of entity_jid | |
1727 """ | |
1728 item = self.get_item(entity_jid) | |
1729 if item is None: | |
1730 return entity_jid.user | |
1731 else: | |
1732 return item.name or entity_jid.user | |
1733 | |
1734 | |
1735 class SatPresenceProtocol(xmppim.PresenceClientProtocol): | |
1736 | |
1737 def __init__(self, host): | |
1738 xmppim.PresenceClientProtocol.__init__(self) | |
1739 self.host = host | |
1740 | |
1741 @property | |
1742 def client(self): | |
1743 return self.parent | |
1744 | |
1745 def send(self, obj): | |
1746 presence_d = defer.succeed(None) | |
1747 if not self.host.trigger.point("Presence send", self.parent, obj, presence_d): | |
1748 return | |
1749 presence_d.addCallback(lambda __: super(SatPresenceProtocol, self).send(obj)) | |
1750 return presence_d | |
1751 | |
1752 def availableReceived(self, entity, show=None, statuses=None, priority=0): | |
1753 if not statuses: | |
1754 statuses = {} | |
1755 | |
1756 if None in statuses: # we only want string keys | |
1757 statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None) | |
1758 | |
1759 if not self.host.trigger.point( | |
1760 "presence_received", self.parent, entity, show, priority, statuses | |
1761 ): | |
1762 return | |
1763 | |
1764 self.host.memory.set_presence_status( | |
1765 entity, show or "", int(priority), statuses, self.parent.profile | |
1766 ) | |
1767 | |
1768 # now it's time to notify frontends | |
1769 self.host.bridge.presence_update( | |
1770 entity.full(), show or "", int(priority), statuses, self.parent.profile | |
1771 ) | |
1772 | |
1773 def unavailableReceived(self, entity, statuses=None): | |
1774 log.debug( | |
1775 _("presence update for [%(entity)s] (unavailable, statuses=%(statuses)s)") | |
1776 % {"entity": entity, C.PRESENCE_STATUSES: statuses} | |
1777 ) | |
1778 | |
1779 if not statuses: | |
1780 statuses = {} | |
1781 | |
1782 if None in statuses: # we only want string keys | |
1783 statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None) | |
1784 | |
1785 if not self.host.trigger.point( | |
1786 "presence_received", self.parent, entity, C.PRESENCE_UNAVAILABLE, 0, statuses, | |
1787 ): | |
1788 return | |
1789 | |
1790 # now it's time to notify frontends | |
1791 # if the entity is not known yet in this session or is already unavailable, | |
1792 # there is no need to send an unavailable signal | |
1793 try: | |
1794 presence = self.host.memory.get_entity_datum( | |
1795 self.client, entity, "presence" | |
1796 ) | |
1797 except (KeyError, exceptions.UnknownEntityError): | |
1798 # the entity has not been seen yet in this session | |
1799 pass | |
1800 else: | |
1801 if presence.show != C.PRESENCE_UNAVAILABLE: | |
1802 self.host.bridge.presence_update( | |
1803 entity.full(), | |
1804 C.PRESENCE_UNAVAILABLE, | |
1805 0, | |
1806 statuses, | |
1807 self.parent.profile, | |
1808 ) | |
1809 | |
1810 self.host.memory.set_presence_status( | |
1811 entity, C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile | |
1812 ) | |
1813 | |
1814 def available(self, entity=None, show=None, statuses=None, priority=None): | |
1815 """Set a presence and statuses. | |
1816 | |
1817 @param entity (jid.JID): entity | |
1818 @param show (unicode): value in ('unavailable', '', 'away', 'xa', 'chat', 'dnd') | |
1819 @param statuses (dict{unicode: unicode}): multilingual statuses with | |
1820 the entry key beeing a language code on 2 characters or "default". | |
1821 """ | |
1822 if priority is None: | |
1823 try: | |
1824 priority = int( | |
1825 self.host.memory.param_get_a( | |
1826 "Priority", "Connection", profile_key=self.parent.profile | |
1827 ) | |
1828 ) | |
1829 except ValueError: | |
1830 priority = 0 | |
1831 | |
1832 if statuses is None: | |
1833 statuses = {} | |
1834 | |
1835 # default for us is None for wokkel | |
1836 # so we must temporarily switch to wokkel's convention... | |
1837 if C.PRESENCE_STATUSES_DEFAULT in statuses: | |
1838 statuses[None] = statuses.pop(C.PRESENCE_STATUSES_DEFAULT) | |
1839 | |
1840 presence_elt = xmppim.AvailablePresence(entity, show, statuses, priority) | |
1841 | |
1842 # ... before switching back | |
1843 if None in statuses: | |
1844 statuses["default"] = statuses.pop(None) | |
1845 | |
1846 if not self.host.trigger.point("presence_available", presence_elt, self.parent): | |
1847 return | |
1848 return self.send(presence_elt) | |
1849 | |
1850 @defer.inlineCallbacks | |
1851 def subscribed(self, entity): | |
1852 yield self.parent.roster.got_roster | |
1853 xmppim.PresenceClientProtocol.subscribed(self, entity) | |
1854 self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile) | |
1855 item = self.parent.roster.get_item(entity) | |
1856 if ( | |
1857 not item or not item.subscriptionTo | |
1858 ): # we automatically subscribe to 'to' presence | |
1859 log.debug(_('sending automatic "from" subscription request')) | |
1860 self.subscribe(entity) | |
1861 | |
1862 def unsubscribed(self, entity): | |
1863 xmppim.PresenceClientProtocol.unsubscribed(self, entity) | |
1864 self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile) | |
1865 | |
1866 def subscribedReceived(self, entity): | |
1867 log.debug(_("subscription approved for [%s]") % entity.userhost()) | |
1868 self.host.bridge.subscribe("subscribed", entity.userhost(), self.parent.profile) | |
1869 | |
1870 def unsubscribedReceived(self, entity): | |
1871 log.debug(_("unsubscription confirmed for [%s]") % entity.userhost()) | |
1872 self.host.bridge.subscribe("unsubscribed", entity.userhost(), self.parent.profile) | |
1873 | |
1874 @defer.inlineCallbacks | |
1875 def subscribeReceived(self, entity): | |
1876 log.debug(_("subscription request from [%s]") % entity.userhost()) | |
1877 yield self.parent.roster.got_roster | |
1878 item = self.parent.roster.get_item(entity) | |
1879 if item and item.subscriptionTo: | |
1880 # We automatically accept subscription if we are already subscribed to | |
1881 # contact presence | |
1882 log.debug(_("sending automatic subscription acceptance")) | |
1883 self.subscribed(entity) | |
1884 else: | |
1885 self.host.memory.add_waiting_sub( | |
1886 "subscribe", entity.userhost(), self.parent.profile | |
1887 ) | |
1888 self.host.bridge.subscribe( | |
1889 "subscribe", entity.userhost(), self.parent.profile | |
1890 ) | |
1891 | |
1892 @defer.inlineCallbacks | |
1893 def unsubscribeReceived(self, entity): | |
1894 log.debug(_("unsubscription asked for [%s]") % entity.userhost()) | |
1895 yield self.parent.roster.got_roster | |
1896 item = self.parent.roster.get_item(entity) | |
1897 if item and item.subscriptionFrom: # we automatically remove contact | |
1898 log.debug(_("automatic contact deletion")) | |
1899 self.host.contact_del(entity, self.parent.profile) | |
1900 self.host.bridge.subscribe("unsubscribe", entity.userhost(), self.parent.profile) | |
1901 | |
1902 | |
1903 @implementer(iwokkel.IDisco) | |
1904 class SatDiscoProtocol(disco.DiscoClientProtocol): | |
1905 | |
1906 def __init__(self, host): | |
1907 disco.DiscoClientProtocol.__init__(self) | |
1908 | |
1909 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | |
1910 # those features are implemented in Wokkel (or sat_tmp.wokkel) | |
1911 # and thus are always available | |
1912 return [disco.DiscoFeature(NS_X_DATA), | |
1913 disco.DiscoFeature(NS_XML_ELEMENT), | |
1914 disco.DiscoFeature(NS_DISCO_INFO)] | |
1915 | |
1916 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | |
1917 return [] | |
1918 | |
1919 | |
1920 class SatFallbackHandler(generic.FallbackHandler): | |
1921 def __init__(self, host): | |
1922 generic.FallbackHandler.__init__(self) | |
1923 | |
1924 def iqFallback(self, iq): | |
1925 if iq.handled is True: | |
1926 return | |
1927 log.debug("iqFallback: xml = [%s]" % (iq.toXml())) | |
1928 generic.FallbackHandler.iqFallback(self, iq) | |
1929 | |
1930 | |
1931 class SatVersionHandler(generic.VersionHandler): | |
1932 | |
1933 def getDiscoInfo(self, requestor, target, node): | |
1934 # XXX: We need to work around wokkel's behaviour (namespace not added if there | |
1935 # is a node) as it cause issues with XEP-0115 & PEP (XEP-0163): there is a | |
1936 # node when server ask for disco info, and not when we generate the key, so | |
1937 # the hash is used with different disco features, and when the server (seen | |
1938 # on ejabberd) generate its own hash for security check it reject our | |
1939 # features (resulting in e.g. no notification on PEP) | |
1940 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None) | |
1941 | |
1942 | |
1943 @implementer(iwokkel.IDisco) | |
1944 class SatIdentityHandler(XMPPHandler): | |
1945 """Manage disco Identity of SàT.""" | |
1946 # TODO: dynamic identity update (see docstring). Note that a XMPP entity can have | |
1947 # several identities | |
1948 | |
1949 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | |
1950 return self.parent.identities | |
1951 | |
1952 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | |
1953 return [] |