comparison libervia/backend/core/xmpp.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/core/xmpp.py@c23cad65ae99
children bc7d45dedeb0
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia: an XMPP client
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import calendar
20 import copy
21 from functools import partial
22 import mimetypes
23 from pathlib import Path
24 import sys
25 import time
26 from typing import Callable, Dict, Tuple, Optional
27 from urllib.parse import unquote, urlparse
28 import uuid
29
30 import shortuuid
31 from twisted.internet import defer, error as internet_error
32 from twisted.internet import ssl
33 from twisted.python import failure
34 from twisted.words.protocols.jabber import xmlstream
35 from twisted.words.protocols.jabber import error
36 from twisted.words.protocols.jabber import jid
37 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
38 from twisted.words.xish import domish
39 from wokkel import client as wokkel_client, disco, generic, iwokkel, xmppim
40 from wokkel import component
41 from wokkel import delay
42 from zope.interface import implementer
43
44 from libervia.backend.core import exceptions
45 from libervia.backend.core import core_types
46 from libervia.backend.core.constants import Const as C
47 from libervia.backend.core.i18n import _
48 from libervia.backend.core.log import getLogger
49 from libervia.backend.memory import cache
50 from libervia.backend.memory import encryption
51 from libervia.backend.memory import persistent
52 from libervia.backend.tools import xml_tools
53 from libervia.backend.tools import utils
54 from libervia.backend.tools.common import data_format
55
56 log = getLogger(__name__)
57
58
59 NS_X_DATA = "jabber:x:data"
60 NS_DISCO_INFO = "http://jabber.org/protocol/disco#info"
61 NS_XML_ELEMENT = "urn:xmpp:xml-element"
62 NS_ROSTER_VER = "urn:xmpp:features:rosterver"
63 # we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys
64 # with roster jids
65 ROSTER_VER_KEY = "@version@"
66
67
68 class ClientPluginWrapper:
69 """Use a plugin with default value if plugin is missing"""
70
71 def __init__(self, client, plugin_name, missing):
72 self.client = client
73 self.plugin = client.host_app.plugins.get(plugin_name)
74 if self.plugin is None:
75 self.plugin_name = plugin_name
76 self.missing = missing
77
78 def __getattr__(self, attr):
79 if self.plugin is None:
80 missing = self.missing
81 if isinstance(missing, type) and issubclass(missing, Exception):
82 raise missing(f"plugin {self.plugin_name!r} is not available")
83 elif isinstance(missing, Exception):
84 raise missing
85 else:
86 return lambda *args, **kwargs: missing
87 return partial(getattr(self.plugin, attr), self.client)
88
89
90 class SatXMPPEntity(core_types.SatXMPPEntity):
91 """Common code for Client and Component"""
92 # profile is added there when start_connection begins and removed when it is finished
93 profiles_connecting = set()
94
95 def __init__(self, host_app, profile, max_retries):
96 factory = self.factory
97
98 # we monkey patch clientConnectionLost to handle network_enabled/network_disabled
99 # and to allow plugins to tune reconnection mechanism
100 clientConnectionFailed_ori = factory.clientConnectionFailed
101 clientConnectionLost_ori = factory.clientConnectionLost
102 factory.clientConnectionFailed = partial(
103 self.connection_terminated, term_type="failed", cb=clientConnectionFailed_ori)
104 factory.clientConnectionLost = partial(
105 self.connection_terminated, term_type="lost", cb=clientConnectionLost_ori)
106
107 factory.maxRetries = max_retries
108 factory.maxDelay = 30
109 # when self._connected_d is None, we are not connected
110 # else, it's a deferred which fire on disconnection
111 self._connected_d = None
112 self.profile = profile
113 self.host_app = host_app
114 self.cache = cache.Cache(host_app, profile)
115 self.mess_id2uid = {} # map from message id to uid used in history.
116 # Key: (full_jid, message_id) Value: uid
117 # this Deferred fire when entity is connected
118 self.conn_deferred = defer.Deferred()
119 self._progress_cb = {} # callback called when a progress is requested
120 # (key = progress id)
121 self.actions = {} # used to keep track of actions for retrieval (key = action_id)
122 self.encryption = encryption.EncryptionHandler(self)
123
124 def __str__(self):
125 return f"Client for profile {self.profile}"
126
127 def __repr__(self):
128 return f"{super().__repr__()} - profile: {self.profile!r}"
129
130 ## initialisation ##
131
132 async def _call_connection_triggers(self, connection_timer):
133 """Call conneting trigger prepare connected trigger
134
135 @param plugins(iterable): plugins to use
136 @return (list[object, callable]): plugin to trigger tuples with:
137 - plugin instance
138 - profile_connected* triggers (to call after connection)
139 """
140 plugin_conn_cb = []
141 for plugin in self._get_plugins_list():
142 # we check if plugin handle client mode
143 if plugin.is_handler:
144 plugin.get_handler(self).setHandlerParent(self)
145
146 # profile_connecting/profile_connected methods handling
147
148 timer = connection_timer[plugin] = {
149 "total": 0
150 }
151 # profile connecting is called right now (before actually starting client)
152 connecting_cb = getattr(plugin, "profile_connecting", None)
153 if connecting_cb is not None:
154 connecting_start = time.time()
155 await utils.as_deferred(connecting_cb, self)
156 timer["connecting"] = time.time() - connecting_start
157 timer["total"] += timer["connecting"]
158
159 # profile connected is called after client is ready and roster is got
160 connected_cb = getattr(plugin, "profile_connected", None)
161 if connected_cb is not None:
162 plugin_conn_cb.append((plugin, connected_cb))
163
164 return plugin_conn_cb
165
166 def _get_plugins_list(self):
167 """Return list of plugin to use
168
169 need to be implemented by subclasses
170 this list is used to call profileConnect* triggers
171 @return(iterable[object]): plugins to use
172 """
173 raise NotImplementedError
174
175 def _create_sub_protocols(self):
176 return
177
178 def entity_connected(self):
179 """Called once connection is done
180
181 may return a Deferred, to perform initialisation tasks
182 """
183 return
184
185 @staticmethod
186 async def _run_profile_connected(
187 callback: Callable,
188 entity: "SatXMPPEntity",
189 timer: Dict[str, float]
190 ) -> None:
191 connected_start = time.time()
192 await utils.as_deferred(callback, entity)
193 timer["connected"] = time.time() - connected_start
194 timer["total"] += timer["connected"]
195
196 @classmethod
197 async def start_connection(cls, host, profile, max_retries):
198 """instantiate the entity and start the connection"""
199 # FIXME: reconnection doesn't seems to be handled correclty
200 # (client is deleted then recreated from scratch)
201 # most of methods called here should be called once on first connection
202 # (e.g. adding subprotocols)
203 # but client should not be deleted except if session is finished
204 # (independently of connection/deconnection)
205 if profile in cls.profiles_connecting:
206 raise exceptions.CancelError(f"{profile} is already being connected")
207 cls.profiles_connecting.add(profile)
208 try:
209 try:
210 port = int(
211 host.memory.param_get_a(
212 C.FORCE_PORT_PARAM, "Connection", profile_key=profile
213 )
214 )
215 except ValueError:
216 log.debug(_("Can't parse port value, using default value"))
217 port = (
218 None
219 ) # will use default value 5222 or be retrieved from a DNS SRV record
220
221 password = await host.memory.param_get_a_async(
222 "Password", "Connection", profile_key=profile
223 )
224
225 entity_jid_s = await host.memory.param_get_a_async(
226 "JabberID", "Connection", profile_key=profile)
227 entity_jid = jid.JID(entity_jid_s)
228
229 if not entity_jid.resource and not cls.is_component and entity_jid.user:
230 # if no resource is specified, we create our own instead of using
231 # server returned one, as it will then stay stable in case of
232 # reconnection. we only do that for client and if there is a user part, to
233 # let server decide for anonymous login
234 resource_dict = await host.memory.storage.get_privates(
235 "core:xmpp", ["resource"] , profile=profile)
236 try:
237 resource = resource_dict["resource"]
238 except KeyError:
239 resource = f"{C.APP_NAME_FILE}.{shortuuid.uuid()}"
240 await host.memory.storage.set_private_value(
241 "core:xmpp", "resource", resource, profile=profile)
242
243 log.info(_("We'll use the stable resource {resource}").format(
244 resource=resource))
245 entity_jid.resource = resource
246
247 if profile in host.profiles:
248 if host.profiles[profile].is_connected():
249 raise exceptions.InternalError(
250 f"There is already a connected profile of name {profile!r} in "
251 f"host")
252 log.debug(
253 "removing unconnected profile {profile!r}")
254 del host.profiles[profile]
255 entity = host.profiles[profile] = cls(
256 host, profile, entity_jid, password,
257 host.memory.param_get_a(C.FORCE_SERVER_PARAM, "Connection",
258 profile_key=profile) or None,
259 port, max_retries,
260 )
261
262 await entity.encryption.load_sessions()
263
264 entity._create_sub_protocols()
265
266 entity.fallBack = SatFallbackHandler(host)
267 entity.fallBack.setHandlerParent(entity)
268
269 entity.versionHandler = SatVersionHandler(C.APP_NAME, host.full_version)
270 entity.versionHandler.setHandlerParent(entity)
271
272 entity.identityHandler = SatIdentityHandler()
273 entity.identityHandler.setHandlerParent(entity)
274
275 log.debug(_("setting plugins parents"))
276
277 connection_timer: Dict[str, Dict[str, float]] = {}
278 plugin_conn_cb = await entity._call_connection_triggers(connection_timer)
279
280 entity.startService()
281
282 await entity.conn_deferred
283
284 await defer.maybeDeferred(entity.entity_connected)
285
286 # Call profile_connected callback for all plugins,
287 # and print error message if any of them fails
288 conn_cb_list = []
289 for plugin, callback in plugin_conn_cb:
290 conn_cb_list.append(
291 defer.ensureDeferred(
292 cls._run_profile_connected(
293 callback, entity, connection_timer[plugin]
294 )
295 )
296 )
297 list_d = defer.DeferredList(conn_cb_list)
298
299 def log_plugin_results(results):
300 if not results:
301 log.info("no plugin loaded")
302 return
303 all_succeed = all([success for success, result in results])
304 if not all_succeed:
305 log.error(_("Plugins initialisation error"))
306 for idx, (success, result) in enumerate(results):
307 if not success:
308 plugin_name = plugin_conn_cb[idx][0]._info["import_name"]
309 log.error(f"error (plugin {plugin_name}): {result}")
310
311 log.debug(f"Plugin loading time for {profile!r} (longer to shorter):\n")
312 plugins_by_timer = sorted(
313 connection_timer,
314 key=lambda p: connection_timer[p]["total"],
315 reverse=True
316 )
317 # total is the addition of all connecting and connected, doesn't really
318 # reflect the real loading time as connected are launched in a
319 # DeferredList
320 total_plugins = 0
321 # total real sum all connecting (which happen sequentially) and the
322 # longuest connected (connected happen in parallel, thus the longuest is
323 # roughly the total time for connected)
324 total_real = 0
325 total_real = max(t.get("connected", 0) for t in connection_timer.values())
326
327 for plugin in plugins_by_timer:
328 name = plugin._info["import_name"]
329 timer = connection_timer[plugin]
330 total_plugins += timer["total"]
331 try:
332 connecting = f"{timer['connecting']:.2f}s"
333 except KeyError:
334 connecting = "n/a"
335 else:
336 total_real += timer["connecting"]
337 try:
338 connected = f"{timer['connected']:.2f}s"
339 except KeyError:
340 connected = "n/a"
341 log.debug(
342 f" - {name}: total={timer['total']:.2f}s "
343 f"connecting={connecting} connected={connected}"
344 )
345 log.debug(
346 f" Plugins total={total_plugins:.2f}s real={total_real:.2f}s\n"
347 )
348
349 await list_d.addCallback(
350 log_plugin_results
351 ) # FIXME: we should have a timeout here, and a way to know if a plugin freeze
352 # TODO: mesure launch time of each plugin
353 finally:
354 cls.profiles_connecting.remove(profile)
355
356 def _disconnection_cb(self, __):
357 self._connected_d = None
358
359 def _disconnection_eb(self, failure_):
360 log.error(_("Error while disconnecting: {}".format(failure_)))
361
362 def _authd(self, xmlstream):
363 super(SatXMPPEntity, self)._authd(xmlstream)
364 log.debug(_("{profile} identified").format(profile=self.profile))
365 self.stream_initialized()
366
367 def _finish_connection(self, __):
368 if self.conn_deferred.called:
369 # can happen in case of forced disconnection by server
370 log.debug(f"{self} has already been connected")
371 else:
372 self.conn_deferred.callback(None)
373
374 def stream_initialized(self):
375 """Called after _authd"""
376 log.debug(_("XML stream is initialized"))
377 if not self.host_app.trigger.point("xml_init", self):
378 return
379 self.post_stream_init()
380
381 def post_stream_init(self):
382 """Workflow after stream initalisation."""
383 log.info(
384 _("********** [{profile}] CONNECTED **********").format(profile=self.profile)
385 )
386
387 # the following Deferred is used to know when we are connected
388 # so we need to be set it to None when connection is lost
389 self._connected_d = defer.Deferred()
390 self._connected_d.addCallback(self._clean_connection)
391 self._connected_d.addCallback(self._disconnection_cb)
392 self._connected_d.addErrback(self._disconnection_eb)
393
394 # we send the signal to the clients
395 self.host_app.bridge.connected(self.jid.full(), self.profile)
396
397 self.disco = SatDiscoProtocol(self)
398 self.disco.setHandlerParent(self)
399 self.discoHandler = disco.DiscoHandler()
400 self.discoHandler.setHandlerParent(self)
401 disco_d = defer.succeed(None)
402
403 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile):
404 return
405
406 disco_d.addCallback(self._finish_connection)
407
408 def initializationFailed(self, reason):
409 log.error(
410 _(
411 "ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s"
412 % {"profile": self.profile, "reason": reason}
413 )
414 )
415 self.conn_deferred.errback(reason.value)
416 try:
417 super(SatXMPPEntity, self).initializationFailed(reason)
418 except:
419 # we already chained an errback, no need to raise an exception
420 pass
421
422 ## connection ##
423
424 def connection_terminated(self, connector, reason, term_type, cb):
425 """Display disconnection reason, and call factory method
426
427 This method is monkey patched to factory, allowing plugins to handle finely
428 reconnection with the triggers.
429 @param connector(twisted.internet.base.BaseConnector): current connector
430 @param reason(failure.Failure): why connection has been terminated
431 @param term_type(unicode): on of 'failed' or 'lost'
432 @param cb(callable): original factory method
433
434 @trigger connection_failed(connector, reason): connection can't be established
435 @trigger connection_lost(connector, reason): connection was available but it not
436 anymore
437 """
438 # we save connector because it may be deleted when connection will be dropped
439 # if reconnection is disabled
440 self._saved_connector = connector
441 if reason is not None and not isinstance(reason.value,
442 internet_error.ConnectionDone):
443 try:
444 reason_str = str(reason.value)
445 except Exception:
446 # FIXME: workaround for Android were p4a strips docstrings
447 # while Twisted use docstring in __str__
448 # TODO: create a ticket upstream, Twisted should work when optimization
449 # is used
450 reason_str = str(reason.value.__class__)
451 log.warning(f"[{self.profile}] Connection {term_type}: {reason_str}")
452 if not self.host_app.trigger.point("connection_" + term_type, connector, reason):
453 return
454 return cb(connector, reason)
455
456 def network_disabled(self):
457 """Indicate that network has been completely disabled
458
459 In other words, internet is not available anymore and transport must be stopped.
460 Retrying is disabled too, as it makes no sense to try without network, and it may
461 use resources (notably battery on mobiles).
462 """
463 log.info(_("stopping connection because of network disabled"))
464 self.factory.continueTrying = 0
465 self._network_disabled = True
466 if self.xmlstream is not None:
467 self.xmlstream.transport.abortConnection()
468
469 def network_enabled(self):
470 """Indicate that network has been (re)enabled
471
472 This happens when e.g. user activate WIFI connection.
473 """
474 try:
475 connector = self._saved_connector
476 network_disabled = self._network_disabled
477 except AttributeError:
478 # connection has not been stopped by network_disabled
479 # we don't have to restart it
480 log.debug(f"no connection to restart [{self.profile}]")
481 return
482 else:
483 del self._network_disabled
484 if not network_disabled:
485 raise exceptions.InternalError("network_disabled should be True")
486 log.info(_("network is available, trying to connect"))
487 # we want to be sure to start fresh
488 self.factory.resetDelay()
489 # we have a saved connector, meaning the connection has been stopped previously
490 # we can now try to reconnect
491 connector.connect()
492
493 def _connected(self, xs):
494 send_hooks = []
495 receive_hooks = []
496 self.host_app.trigger.point(
497 "stream_hooks", self, receive_hooks, send_hooks)
498 for hook in receive_hooks:
499 xs.add_hook(C.STREAM_HOOK_RECEIVE, hook)
500 for hook in send_hooks:
501 xs.add_hook(C.STREAM_HOOK_SEND, hook)
502 super(SatXMPPEntity, self)._connected(xs)
503
504 def disconnect_profile(self, reason):
505 if self._connected_d is not None:
506 self.host_app.bridge.disconnected(
507 self.profile
508 ) # we send the signal to the clients
509 log.info(
510 _("********** [{profile}] DISCONNECTED **********").format(
511 profile=self.profile
512 )
513 )
514 # we purge only if no new connection attempt is expected
515 if not self.factory.continueTrying:
516 log.debug("continueTrying not set, purging entity")
517 self._connected_d.callback(None)
518 # and we remove references to this client
519 self.host_app.purge_entity(self.profile)
520
521 if not self.conn_deferred.called:
522 if reason is None:
523 err = error.StreamError("Server unexpectedly closed the connection")
524 else:
525 err = reason
526 try:
527 if err.value.args[0][0][2] == "certificate verify failed":
528 err = exceptions.InvalidCertificate(
529 _("Your server certificate is not valid "
530 "(its identity can't be checked).\n\n"
531 "This should never happen and may indicate that "
532 "somebody is trying to spy on you.\n"
533 "Please contact your server administrator."))
534 self.factory.stopTrying()
535 try:
536 # with invalid certificate, we should not retry to connect
537 # so we delete saved connector to avoid reconnection if
538 # network_enabled is called.
539 del self._saved_connector
540 except AttributeError:
541 pass
542 except (IndexError, TypeError):
543 pass
544 self.conn_deferred.errback(err)
545
546 def _disconnected(self, reason):
547 super(SatXMPPEntity, self)._disconnected(reason)
548 if not self.host_app.trigger.point("disconnected", self, reason):
549 return
550 self.disconnect_profile(reason)
551
552 @defer.inlineCallbacks
553 def _clean_connection(self, __):
554 """method called on disconnection
555
556 used to call profile_disconnected* triggers
557 """
558 trigger_name = "profile_disconnected"
559 for plugin in self._get_plugins_list():
560 disconnected_cb = getattr(plugin, trigger_name, None)
561 if disconnected_cb is not None:
562 yield disconnected_cb(self)
563
564 def is_connected(self):
565 """Return True is client is fully connected
566
567 client is considered fully connected if transport is started and all plugins
568 are initialised
569 """
570 try:
571 transport_connected = bool(self.xmlstream.transport.connected)
572 except AttributeError:
573 return False
574
575 return self._connected_d is not None and transport_connected
576
577 def entity_disconnect(self):
578 if not self.host_app.trigger.point("disconnecting", self):
579 return
580 log.info(_("Disconnecting..."))
581 self.stopService()
582 if self._connected_d is not None:
583 return self._connected_d
584 else:
585 return defer.succeed(None)
586
587 ## sending ##
588
589 def IQ(self, type_="set", timeout=60):
590 """shortcut to create an IQ element managing deferred
591
592 @param type_(unicode): IQ type ('set' or 'get')
593 @param timeout(None, int): timeout in seconds
594 @return((D)domish.Element: result stanza
595 errback is called if an error stanza is returned
596 """
597 iq_elt = xmlstream.IQ(self.xmlstream, type_)
598 iq_elt.timeout = timeout
599 return iq_elt
600
601 def sendError(self, iq_elt, condition, text=None, appCondition=None):
602 """Send error stanza build from iq_elt
603
604 @param iq_elt(domish.Element): initial IQ element
605 @param condition(unicode): error condition
606 """
607 iq_error_elt = error.StanzaError(
608 condition, text=text, appCondition=appCondition
609 ).toResponse(iq_elt)
610 self.xmlstream.send(iq_error_elt)
611
612 def generate_message_xml(
613 self,
614 data: core_types.MessageData,
615 post_xml_treatments: Optional[defer.Deferred] = None
616 ) -> core_types.MessageData:
617 """Generate <message/> stanza from message data
618
619 @param data: message data
620 domish element will be put in data['xml']
621 following keys are needed:
622 - from
623 - to
624 - uid: can be set to '' if uid attribute is not wanted
625 - message
626 - type
627 - subject
628 - extra
629 @param post_xml_treatments: a Deferred which will be called with data once XML is
630 generated
631 @return: message data
632 """
633 data["xml"] = message_elt = domish.Element((None, "message"))
634 message_elt["to"] = data["to"].full()
635 message_elt["from"] = data["from"].full()
636 message_elt["type"] = data["type"]
637 if data["uid"]: # key must be present but can be set to ''
638 # by a plugin to avoid id on purpose
639 message_elt["id"] = data["uid"]
640 for lang, subject in data["subject"].items():
641 subject_elt = message_elt.addElement("subject", content=subject)
642 if lang:
643 subject_elt[(C.NS_XML, "lang")] = lang
644 for lang, message in data["message"].items():
645 body_elt = message_elt.addElement("body", content=message)
646 if lang:
647 body_elt[(C.NS_XML, "lang")] = lang
648 try:
649 thread = data["extra"]["thread"]
650 except KeyError:
651 if "thread_parent" in data["extra"]:
652 raise exceptions.InternalError(
653 "thread_parent found while there is not associated thread"
654 )
655 else:
656 thread_elt = message_elt.addElement("thread", content=thread)
657 try:
658 thread_elt["parent"] = data["extra"]["thread_parent"]
659 except KeyError:
660 pass
661
662 if post_xml_treatments is not None:
663 post_xml_treatments.callback(data)
664 return data
665
666 @property
667 def is_admin(self) -> bool:
668 """True if a client is an administrator with extra privileges"""
669 return self.host_app.memory.is_admin(self.profile)
670
671 def add_post_xml_callbacks(self, post_xml_treatments):
672 """Used to add class level callbacks at the end of the workflow
673
674 @param post_xml_treatments(D): the same Deferred as in sendMessage trigger
675 """
676 raise NotImplementedError
677
678 async def a_send(self, obj):
679 # original send method accept string
680 # but we restrict to domish.Element to make trigger treatments easier
681 assert isinstance(obj, domish.Element)
682 # XXX: this trigger is the last one before sending stanza on wire
683 # it is intended for things like end 2 end encryption.
684 # *DO NOT* cancel (i.e. return False) without very good reason
685 # (out of band transmission for instance).
686 # e2e should have a priority of 0 here, and out of band transmission
687 # a lower priority
688 if not (await self.host_app.trigger.async_point("send", self, obj)):
689 return
690 super().send(obj)
691
692 def send(self, obj):
693 defer.ensureDeferred(self.a_send(obj))
694
695 async def send_message_data(self, mess_data):
696 """Convenient method to send message data to stream
697
698 This method will send mess_data[u'xml'] to stream, but a trigger is there
699 The trigger can't be cancelled, it's a good place for e2e encryption which
700 don't handle full stanza encryption
701 This trigger can return a Deferred (it's an async_point)
702 @param mess_data(dict): message data as constructed by onMessage workflow
703 @return (dict): mess_data (so it can be used in a deferred chain)
704 """
705 # XXX: This is the last trigger before u"send" (last but one globally)
706 # for sending message.
707 # This is intented for e2e encryption which doesn't do full stanza
708 # encryption (e.g. OTR)
709 # This trigger point can't cancel the method
710 await self.host_app.trigger.async_point("send_message_data", self, mess_data,
711 triggers_no_cancel=True)
712 await self.a_send(mess_data["xml"])
713 return mess_data
714
715 def sendMessage(
716 self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None,
717 no_trigger=False):
718 r"""Send a message to an entity
719
720 @param to_jid(jid.JID): destinee of the message
721 @param message(dict): message body, key is the language (use '' when unknown)
722 @param subject(dict): message subject, key is the language (use '' when unknown)
723 @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or:
724 - auto: for automatic type detection
725 - info: for information ("info_type" can be specified in extra)
726 @param extra(dict, None): extra data. Key can be:
727 - info_type: information type, can be
728 TODO
729 @param uid(unicode, None): unique id:
730 should be unique at least in this XMPP session
731 if None, an uuid will be generated
732 @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used
733 useful when a message need to be sent without any modification
734 /!\ this will also skip encryption methods!
735 """
736 if subject is None:
737 subject = {}
738 if extra is None:
739 extra = {}
740
741 assert mess_type in C.MESS_TYPE_ALL
742
743 data = { # dict is similar to the one used in client.onMessage
744 "from": self.jid,
745 "to": to_jid,
746 "uid": uid or str(uuid.uuid4()),
747 "message": message,
748 "subject": subject,
749 "type": mess_type,
750 "extra": extra,
751 "timestamp": time.time(),
752 }
753 # XXX: plugin can add their pre XML treatments to this deferred
754 pre_xml_treatments = defer.Deferred()
755 # XXX: plugin can add their post XML treatments to this deferred
756 post_xml_treatments = defer.Deferred()
757
758 if data["type"] == C.MESS_TYPE_AUTO:
759 # we try to guess the type
760 if data["subject"]:
761 data["type"] = C.MESS_TYPE_NORMAL
762 elif not data["to"].resource:
763 # we may have a groupchat message, we check if the we know this jid
764 try:
765 entity_type = self.host_app.memory.get_entity_datum(
766 self, data["to"], C.ENTITY_TYPE
767 )
768 # FIXME: should entity_type manage resources ?
769 except (exceptions.UnknownEntityError, KeyError):
770 entity_type = "contact"
771
772 if entity_type == C.ENTITY_TYPE_MUC:
773 data["type"] = C.MESS_TYPE_GROUPCHAT
774 else:
775 data["type"] = C.MESS_TYPE_CHAT
776 else:
777 data["type"] = C.MESS_TYPE_CHAT
778
779 # FIXME: send_only is used by libervia's OTR plugin to avoid
780 # the triggers from frontend, and no_trigger do the same
781 # thing internally, this could be unified
782 send_only = data["extra"].get("send_only", False)
783
784 if not no_trigger and not send_only:
785 # is the session encrypted? If so we indicate it in data
786 self.encryption.set_encryption_flag(data)
787
788 if not self.host_app.trigger.point(
789 "sendMessage" + self.trigger_suffix,
790 self,
791 data,
792 pre_xml_treatments,
793 post_xml_treatments,
794 ):
795 return defer.succeed(None)
796
797 log.debug(_("Sending message (type {type}, to {to})")
798 .format(type=data["type"], to=to_jid.full()))
799
800 pre_xml_treatments.addCallback(lambda __: self.generate_message_xml(data, post_xml_treatments))
801 pre_xml_treatments.addCallback(lambda __: post_xml_treatments)
802 pre_xml_treatments.addErrback(self._cancel_error_trap)
803 post_xml_treatments.addCallback(
804 lambda __: defer.ensureDeferred(self.send_message_data(data))
805 )
806 if send_only:
807 log.debug(_("Triggers, storage and echo have been inhibited by the "
808 "'send_only' parameter"))
809 else:
810 self.add_post_xml_callbacks(post_xml_treatments)
811 post_xml_treatments.addErrback(self._cancel_error_trap)
812 post_xml_treatments.addErrback(self.host_app.log_errback)
813 pre_xml_treatments.callback(data)
814 return pre_xml_treatments
815
816 def _cancel_error_trap(self, failure):
817 """A message sending can be cancelled by a plugin treatment"""
818 failure.trap(exceptions.CancelError)
819
820 def is_message_printable(self, mess_data):
821 """Return True if a message contain payload to show in frontends"""
822 return (
823 mess_data["message"] or mess_data["subject"]
824 or mess_data["extra"].get(C.KEY_ATTACHMENTS)
825 or mess_data["type"] == C.MESS_TYPE_INFO
826 )
827
828 async def message_add_to_history(self, data):
829 """Store message into database (for local history)
830
831 @param data: message data dictionnary
832 @param client: profile's client
833 """
834 if data["type"] != C.MESS_TYPE_GROUPCHAT:
835 # we don't add groupchat message to history, as we get them back
836 # and they will be added then
837
838 # we need a message to store
839 if self.is_message_printable(data):
840 await self.host_app.memory.add_to_history(self, data)
841 else:
842 log.warning(
843 "No message found"
844 ) # empty body should be managed by plugins before this point
845 return data
846
847 def message_get_bridge_args(self, data):
848 """Generate args to use with bridge from data dict"""
849 return (data["uid"], data["timestamp"], data["from"].full(),
850 data["to"].full(), data["message"], data["subject"],
851 data["type"], data_format.serialise(data["extra"]))
852
853
854 def message_send_to_bridge(self, data):
855 """Send message to bridge, so frontends can display it
856
857 @param data: message data dictionnary
858 @param client: profile's client
859 """
860 if data["type"] != C.MESS_TYPE_GROUPCHAT:
861 # we don't send groupchat message to bridge, as we get them back
862 # and they will be added the
863
864 # we need a message to send something
865 if self.is_message_printable(data):
866
867 # We send back the message, so all frontends are aware of it
868 self.host_app.bridge.message_new(
869 *self.message_get_bridge_args(data),
870 profile=self.profile
871 )
872 else:
873 log.warning(_("No message found"))
874 return data
875
876 ## helper methods ##
877
878 def p(self, plugin_name, missing=exceptions.MissingModule):
879 """Get a plugin if available
880
881 @param plugin_name(str): name of the plugin
882 @param missing(object): value to return if plugin is missing
883 if it is a subclass of Exception, it will be raised with a helping str as
884 argument.
885 @return (object): requested plugin wrapper, or default value
886 The plugin wrapper will return the method with client set as first
887 positional argument
888 """
889 return ClientPluginWrapper(self, plugin_name, missing)
890
891
892 ExtraDict = dict # TODO
893
894
895 @implementer(iwokkel.IDisco)
896 class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient):
897 trigger_suffix = ""
898 is_component = False
899
900 def __init__(self, host_app, profile, user_jid, password, host=None,
901 port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES):
902 # XXX: DNS SRV records are checked when the host is not specified.
903 # If no SRV record is found, the host is directly extracted from the JID.
904 self.started = time.time()
905
906 # Currently, we use "client/pc/Salut à Toi", but as
907 # SàT is multi-frontends and can be used on mobile devices, as a bot,
908 # with a web frontend,
909 # etc., we should implement a way to dynamically update identities through the
910 # bridge
911 self.identities = [disco.DiscoIdentity("client", "pc", C.APP_NAME)]
912 if sys.platform == "android":
913 # for now we consider Android devices to be always phones
914 self.identities = [disco.DiscoIdentity("client", "phone", C.APP_NAME)]
915
916 hosts_map = host_app.memory.config_get(None, "hosts_dict", {})
917 if host is None and user_jid.host in hosts_map:
918 host_data = hosts_map[user_jid.host]
919 if isinstance(host_data, str):
920 host = host_data
921 elif isinstance(host_data, dict):
922 if "host" in host_data:
923 host = host_data["host"]
924 if "port" in host_data:
925 port = host_data["port"]
926 else:
927 log.warning(
928 _("invalid data used for host: {data}").format(data=host_data)
929 )
930 host_data = None
931 if host_data is not None:
932 log.info(
933 "using {host}:{port} for host {host_ori} as requested in config"
934 .format(host_ori=user_jid.host, host=host, port=port)
935 )
936
937 self.check_certificate = host_app.memory.param_get_a(
938 "check_certificate", "Connection", profile_key=profile)
939
940 if self.check_certificate:
941 tls_required, configurationForTLS = True, None
942 else:
943 tls_required = False
944 configurationForTLS = ssl.CertificateOptions(trustRoot=None)
945
946 wokkel_client.XMPPClient.__init__(
947 self, user_jid, password, host or None, port or C.XMPP_C2S_PORT,
948 tls_required=tls_required, configurationForTLS=configurationForTLS
949 )
950 SatXMPPEntity.__init__(self, host_app, profile, max_retries)
951
952 if not self.check_certificate:
953 msg = (_("Certificate validation is deactivated, this is unsecure and "
954 "somebody may be spying on you. If you have no good reason to disable "
955 "certificate validation, please activate \"Check certificate\" in your "
956 "settings in \"Connection\" tab."))
957 xml_tools.quick_note(host_app, self, msg, _("Security notice"),
958 level = C.XMLUI_DATA_LVL_WARNING)
959
960 @property
961 def server_jid(self):
962 return jid.JID(self.jid.host)
963
964 def _get_plugins_list(self):
965 for p in self.host_app.plugins.values():
966 if C.PLUG_MODE_CLIENT in p._info["modes"]:
967 yield p
968
969 def _create_sub_protocols(self):
970 self.messageProt = SatMessageProtocol(self.host_app)
971 self.messageProt.setHandlerParent(self)
972
973 self.roster = SatRosterProtocol(self.host_app)
974 self.roster.setHandlerParent(self)
975
976 self.presence = SatPresenceProtocol(self.host_app)
977 self.presence.setHandlerParent(self)
978
979 @classmethod
980 async def start_connection(cls, host, profile, max_retries):
981 try:
982 await super(SatXMPPClient, cls).start_connection(host, profile, max_retries)
983 except exceptions.CancelError as e:
984 log.warning(f"start_connection cancelled: {e}")
985 return
986 entity = host.profiles[profile]
987 # we finally send our presence
988 entity.presence.available()
989
990 def entity_connected(self):
991 # we want to be sure that we got the roster
992 return self.roster.got_roster
993
994 def add_post_xml_callbacks(self, post_xml_treatments):
995 post_xml_treatments.addCallback(self.messageProt.complete_attachments)
996 post_xml_treatments.addCallback(
997 lambda ret: defer.ensureDeferred(self.message_add_to_history(ret))
998 )
999 post_xml_treatments.addCallback(self.message_send_to_bridge)
1000
1001 def feedback(
1002 self,
1003 to_jid: jid.JID,
1004 message: str,
1005 extra: Optional[ExtraDict] = None
1006 ) -> None:
1007 """Send message to frontends
1008
1009 This message will be an info message, not recorded in history.
1010 It can be used to give feedback of a command
1011 @param to_jid: destinee jid
1012 @param message: message to send to frontends
1013 @param extra: extra data to use in particular, info subtype can be specified with
1014 MESS_EXTRA_INFO
1015 """
1016 if extra is None:
1017 extra = {}
1018 self.host_app.bridge.message_new(
1019 uid=str(uuid.uuid4()),
1020 timestamp=time.time(),
1021 from_jid=self.jid.full(),
1022 to_jid=to_jid.full(),
1023 message={"": message},
1024 subject={},
1025 mess_type=C.MESS_TYPE_INFO,
1026 extra=data_format.serialise(extra),
1027 profile=self.profile,
1028 )
1029
1030 def _finish_connection(self, __):
1031 d = self.roster.request_roster()
1032 d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__))
1033
1034
1035 @implementer(iwokkel.IDisco)
1036 class SatXMPPComponent(SatXMPPEntity, component.Component):
1037 """XMPP component
1038
1039 This component are similar but not identical to clients.
1040 An entry point plugin is launched after component is connected.
1041 Component need to instantiate MessageProtocol itself
1042 """
1043
1044 trigger_suffix = (
1045 "Component"
1046 ) # used for to distinguish some trigger points set in SatXMPPEntity
1047 is_component = True
1048 # XXX: set to True from entry plugin to keep messages in history for sent messages
1049 sendHistory = False
1050 # XXX: same as sendHistory but for received messaged
1051 receiveHistory = False
1052
1053 def __init__(self, host_app, profile, component_jid, password, host=None, port=None,
1054 max_retries=C.XMPP_MAX_RETRIES):
1055 self.started = time.time()
1056 if port is None:
1057 port = C.XMPP_COMPONENT_PORT
1058
1059 ## entry point ##
1060 entry_point = host_app.memory.get_entry_point(profile)
1061 try:
1062 self.entry_plugin = host_app.plugins[entry_point]
1063 except KeyError:
1064 raise exceptions.NotFound(
1065 _("The requested entry point ({entry_point}) is not available").format(
1066 entry_point=entry_point
1067 )
1068 )
1069
1070 self.enabled_features = set()
1071 self.identities = [disco.DiscoIdentity("component", "generic", C.APP_NAME)]
1072 # jid is set automatically on bind by Twisted for Client, but not for Component
1073 self.jid = component_jid
1074 if host is None:
1075 try:
1076 host = component_jid.host.split(".", 1)[1]
1077 except IndexError:
1078 raise ValueError("Can't guess host from jid, please specify a host")
1079 # XXX: component.Component expect unicode jid, while Client expect jid.JID.
1080 # this is not consistent, so we use jid.JID for SatXMPP*
1081 component.Component.__init__(self, host, port, component_jid.full(), password)
1082 SatXMPPEntity.__init__(self, host_app, profile, max_retries)
1083
1084 @property
1085 def server_jid(self):
1086 # FIXME: not the best way to get server jid, maybe use config option?
1087 return jid.JID(self.jid.host.split(".", 1)[-1])
1088
1089 @property
1090 def is_admin(self) -> bool:
1091 return False
1092
1093 def _create_sub_protocols(self):
1094 self.messageProt = SatMessageProtocol(self.host_app)
1095 self.messageProt.setHandlerParent(self)
1096
1097 def _build_dependencies(self, current, plugins, required=True):
1098 """build recursively dependencies needed for a plugin
1099
1100 this method build list of plugin needed for a component and raises
1101 errors if they are not available or not allowed for components
1102 @param current(object): parent plugin to check
1103 use entry_point for first call
1104 @param plugins(list): list of validated plugins, will be filled by the method
1105 give an empty list for first call
1106 @param required(bool): True if plugin is mandatory
1107 for recursive calls only, should not be modified by inital caller
1108 @raise InternalError: one of the plugin is not handling components
1109 @raise KeyError: one plugin should be present in self.host_app.plugins but it
1110 is not
1111 """
1112 if C.PLUG_MODE_COMPONENT not in current._info["modes"]:
1113 if not required:
1114 return
1115 else:
1116 log.error(
1117 _(
1118 "Plugin {current_name} is needed for {entry_name}, "
1119 "but it doesn't handle component mode"
1120 ).format(
1121 current_name=current._info["import_name"],
1122 entry_name=self.entry_plugin._info["import_name"],
1123 )
1124 )
1125 raise exceptions.InternalError(_("invalid plugin mode"))
1126
1127 for import_name in current._info.get(C.PI_DEPENDENCIES, []):
1128 # plugins are already loaded as dependencies
1129 # so we know they are in self.host_app.plugins
1130 dep = self.host_app.plugins[import_name]
1131 self._build_dependencies(dep, plugins)
1132
1133 for import_name in current._info.get(C.PI_RECOMMENDATIONS, []):
1134 # here plugins are only recommendations,
1135 # so they may not exist in self.host_app.plugins
1136 try:
1137 dep = self.host_app.plugins[import_name]
1138 except KeyError:
1139 continue
1140 self._build_dependencies(dep, plugins, required=False)
1141
1142 if current not in plugins:
1143 # current can be required for several plugins and so
1144 # it can already be present in the list
1145 plugins.append(current)
1146
1147 def _get_plugins_list(self):
1148 # XXX: for component we don't launch all plugins triggers
1149 # but only the ones from which there is a dependency
1150 plugins = []
1151 self._build_dependencies(self.entry_plugin, plugins)
1152 return plugins
1153
1154 def entity_connected(self):
1155 # we can now launch entry point
1156 try:
1157 start_cb = self.entry_plugin.componentStart
1158 except AttributeError:
1159 return
1160 else:
1161 return start_cb(self)
1162
1163 def add_post_xml_callbacks(self, post_xml_treatments):
1164 if self.sendHistory:
1165 post_xml_treatments.addCallback(
1166 lambda ret: defer.ensureDeferred(self.message_add_to_history(ret))
1167 )
1168
1169 def get_owner_from_jid(self, to_jid: jid.JID) -> jid.JID:
1170 """Retrieve "owner" of a component resource from the destination jid of the request
1171
1172 This method needs plugin XEP-0106 for unescaping, if you use it you must add the
1173 plugin to your dependencies.
1174 A "user" part must be present in "to_jid" (otherwise, the component itself is addressed)
1175 @param to_jid: destination JID of the request
1176 """
1177 try:
1178 unescape = self.host_app.plugins['XEP-0106'].unescape
1179 except KeyError:
1180 raise exceptions.MissingPlugin("Plugin XEP-0106 is needed to retrieve owner")
1181 else:
1182 user = unescape(to_jid.user)
1183 if '@' in user:
1184 # a full jid is specified
1185 return jid.JID(user)
1186 else:
1187 # only user part is specified, we use our own host to build the full jid
1188 return jid.JID(None, (user, self.host, None))
1189
1190 def get_owner_and_peer(self, iq_elt: domish.Element) -> Tuple[jid.JID, jid.JID]:
1191 """Retrieve owner of a component jid, and the jid of the requesting peer
1192
1193 "owner" is found by either unescaping full jid from node, or by combining node
1194 with our host.
1195 Peer jid is the requesting jid from the IQ element
1196 @param iq_elt: IQ stanza sent from the requested
1197 @return: owner and peer JIDs
1198 """
1199 to_jid = jid.JID(iq_elt['to'])
1200 if to_jid.user:
1201 owner = self.get_owner_from_jid(to_jid)
1202 else:
1203 owner = jid.JID(iq_elt["from"]).userhostJID()
1204
1205 peer_jid = jid.JID(iq_elt["from"])
1206 return peer_jid, owner
1207
1208 def get_virtual_client(self, jid_: jid.JID) -> SatXMPPEntity:
1209 """Get client for this component with a specified jid
1210
1211 This is needed to perform operations with a virtual JID corresponding to a virtual
1212 entity (e.g. identified of a legacy network account) instead of the JID of the
1213 gateway itself.
1214 @param jid_: virtual JID to use
1215 @return: virtual client
1216 """
1217 client = copy.copy(self)
1218 client.jid = jid_
1219 return client
1220
1221
1222 class SatMessageProtocol(xmppim.MessageProtocol):
1223
1224 def __init__(self, host):
1225 xmppim.MessageProtocol.__init__(self)
1226 self.host = host
1227
1228 @property
1229 def client(self):
1230 return self.parent
1231
1232 def normalize_ns(self, elt: domish.Element, namespace: Optional[str]) -> None:
1233 if elt.uri == namespace:
1234 elt.defaultUri = elt.uri = C.NS_CLIENT
1235 for child in elt.elements():
1236 self.normalize_ns(child, namespace)
1237
1238 def parse_message(self, message_elt):
1239 """Parse a message XML and return message_data
1240
1241 @param message_elt(domish.Element): raw <message> xml
1242 @param client(SatXMPPClient, None): client to map message id to uid
1243 if None, mapping will not be done
1244 @return(dict): message data
1245 """
1246 if message_elt.name != "message":
1247 log.warning(_(
1248 "parse_message used with a non <message/> stanza, ignoring: {xml}"
1249 .format(xml=message_elt.toXml())))
1250 return {}
1251
1252 if message_elt.uri == None:
1253 # xmlns may be None when wokkel element parsing strip out root namespace
1254 self.normalize_ns(message_elt, None)
1255 elif message_elt.uri != C.NS_CLIENT:
1256 log.warning(_(
1257 "received <message> with a wrong namespace: {xml}"
1258 .format(xml=message_elt.toXml())))
1259
1260 client = self.parent
1261
1262 if not message_elt.hasAttribute('to'):
1263 message_elt['to'] = client.jid.full()
1264
1265 message = {}
1266 subject = {}
1267 extra = {}
1268 data = {
1269 "from": jid.JID(message_elt["from"]),
1270 "to": jid.JID(message_elt["to"]),
1271 "uid": message_elt.getAttribute(
1272 "uid", str(uuid.uuid4())
1273 ), # XXX: uid is not a standard attribute but may be added by plugins
1274 "message": message,
1275 "subject": subject,
1276 "type": message_elt.getAttribute("type", "normal"),
1277 "extra": extra,
1278 }
1279
1280 try:
1281 message_id = data["extra"]["message_id"] = message_elt["id"]
1282 except KeyError:
1283 pass
1284 else:
1285 client.mess_id2uid[(data["from"], message_id)] = data["uid"]
1286
1287 # message
1288 for e in message_elt.elements(C.NS_CLIENT, "body"):
1289 message[e.getAttribute((C.NS_XML, "lang"), "")] = str(e)
1290
1291 # subject
1292 for e in message_elt.elements(C.NS_CLIENT, "subject"):
1293 subject[e.getAttribute((C.NS_XML, "lang"), "")] = str(e)
1294
1295 # delay and timestamp
1296 try:
1297 received_timestamp = message_elt._received_timestamp
1298 except AttributeError:
1299 # message_elt._received_timestamp should have been set in onMessage
1300 # but if parse_message is called directly, it can be missing
1301 log.debug("missing received timestamp for {message_elt}".format(
1302 message_elt=message_elt))
1303 received_timestamp = time.time()
1304
1305 try:
1306 delay_elt = next(message_elt.elements(delay.NS_DELAY, "delay"))
1307 except StopIteration:
1308 data["timestamp"] = received_timestamp
1309 else:
1310 parsed_delay = delay.Delay.fromElement(delay_elt)
1311 data["timestamp"] = calendar.timegm(parsed_delay.stamp.utctimetuple())
1312 data["received_timestamp"] = received_timestamp
1313 if parsed_delay.sender:
1314 data["delay_sender"] = parsed_delay.sender.full()
1315
1316 self.host.trigger.point("message_parse", client, message_elt, data)
1317 return data
1318
1319 def _on_message_start_workflow(self, cont, client, message_elt, post_treat):
1320 """Parse message and do post treatments
1321
1322 It is the first callback called after message_received trigger
1323 @param cont(bool): workflow will continue only if this is True
1324 @param message_elt(domish.Element): message stanza
1325 may have be modified by triggers
1326 @param post_treat(defer.Deferred): post parsing treatments
1327 """
1328 if not cont:
1329 return
1330 data = self.parse_message(message_elt)
1331 post_treat.addCallback(self.complete_attachments)
1332 post_treat.addCallback(self.skip_empty_message)
1333 if not client.is_component or client.receiveHistory:
1334 post_treat.addCallback(
1335 lambda ret: defer.ensureDeferred(self.add_to_history(ret))
1336 )
1337 if not client.is_component:
1338 post_treat.addCallback(self.bridge_signal, data)
1339 post_treat.addErrback(self.cancel_error_trap)
1340 post_treat.callback(data)
1341
1342 def onMessage(self, message_elt):
1343 # TODO: handle threads
1344 message_elt._received_timestamp = time.time()
1345 client = self.parent
1346 if not "from" in message_elt.attributes:
1347 message_elt["from"] = client.jid.host
1348 log.debug(_("got message from: {from_}").format(from_=message_elt["from"]))
1349 if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT:
1350 # we use client namespace all the time to simplify parsing
1351 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT)
1352
1353 # plugin can add their treatments to this deferred
1354 post_treat = defer.Deferred()
1355
1356 d = self.host.trigger.async_point(
1357 "message_received", client, message_elt, post_treat
1358 )
1359
1360 d.addCallback(self._on_message_start_workflow, client, message_elt, post_treat)
1361
1362 def complete_attachments(self, data):
1363 """Complete missing metadata of attachments"""
1364 for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []):
1365 if "name" not in attachment and "url" in attachment:
1366 name = (Path(unquote(urlparse(attachment['url']).path)).name
1367 or C.FILE_DEFAULT_NAME)
1368 attachment["name"] = name
1369 if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment
1370 and "name" in attachment)):
1371 media_type = mimetypes.guess_type(attachment['name'], strict=False)[0]
1372 if media_type:
1373 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type
1374
1375 return data
1376
1377 def skip_empty_message(self, data):
1378 if not data["message"] and not data["extra"] and not data["subject"]:
1379 raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
1380 return data
1381
1382 async def add_to_history(self, data):
1383 if data.pop("history", None) == C.HISTORY_SKIP:
1384 log.debug("history is skipped as requested")
1385 data["extra"]["history"] = C.HISTORY_SKIP
1386 else:
1387 # we need a message to store
1388 if self.parent.is_message_printable(data):
1389 return await self.host.memory.add_to_history(self.parent, data)
1390 else:
1391 log.debug("not storing empty message to history: {data}"
1392 .format(data=data))
1393
1394 def bridge_signal(self, __, data):
1395 try:
1396 data["extra"]["received_timestamp"] = str(data["received_timestamp"])
1397 data["extra"]["delay_sender"] = data["delay_sender"]
1398 except KeyError:
1399 pass
1400 if self.client.encryption.isEncrypted(data):
1401 data["extra"]["encrypted"] = True
1402 if data is not None:
1403 if self.parent.is_message_printable(data):
1404 self.host.bridge.message_new(
1405 data["uid"],
1406 data["timestamp"],
1407 data["from"].full(),
1408 data["to"].full(),
1409 data["message"],
1410 data["subject"],
1411 data["type"],
1412 data_format.serialise(data["extra"]),
1413 profile=self.parent.profile,
1414 )
1415 else:
1416 log.debug("Discarding bridge signal for empty message: {data}".format(
1417 data=data))
1418 return data
1419
1420 def cancel_error_trap(self, failure_):
1421 """A message sending can be cancelled by a plugin treatment"""
1422 failure_.trap(exceptions.CancelError)
1423
1424
1425 class SatRosterProtocol(xmppim.RosterClientProtocol):
1426
1427 def __init__(self, host):
1428 xmppim.RosterClientProtocol.__init__(self)
1429 self.host = host
1430 self.got_roster = defer.Deferred() # called when roster is received and ready
1431 # XXX: the two following dicts keep a local copy of the roster
1432 self._jids = {} # map from jids to RosterItem: key=jid value=RosterItem
1433 self._groups = {} # map from groups to jids: key=group value=set of jids
1434
1435 def __contains__(self, entity_jid):
1436 return self.is_jid_in_roster(entity_jid)
1437
1438 @property
1439 def versioning(self):
1440 """True if server support roster versioning"""
1441 return (NS_ROSTER_VER, 'ver') in self.parent.xmlstream.features
1442
1443 @property
1444 def roster_cache(self):
1445 """Cache of roster from storage
1446
1447 This property return a new PersistentDict on each call, it must be loaded
1448 manually if necessary
1449 """
1450 return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile)
1451
1452 def _register_item(self, item):
1453 """Register item in local cache
1454
1455 item must be already registered in self._jids before this method is called
1456 @param item (RosterIem): item added
1457 """
1458 log.debug("registering item: {}".format(item.entity.full()))
1459 if item.entity.resource:
1460 log.warning(
1461 "Received a roster item with a resource, this is not common but not "
1462 "restricted by RFC 6121, this case may be not well tested."
1463 )
1464 if not item.subscriptionTo:
1465 if not item.subscriptionFrom:
1466 log.info(
1467 _("There's no subscription between you and [{}]!").format(
1468 item.entity.full()
1469 )
1470 )
1471 else:
1472 log.info(_("You are not subscribed to [{}]!").format(item.entity.full()))
1473 if not item.subscriptionFrom:
1474 log.info(_("[{}] is not subscribed to you!").format(item.entity.full()))
1475
1476 for group in item.groups:
1477 self._groups.setdefault(group, set()).add(item.entity)
1478
1479 @defer.inlineCallbacks
1480 def _cache_roster(self, version):
1481 """Serialise local roster and save it to storage
1482
1483 @param version(unicode): version of roster in local cache
1484 """
1485 roster_cache = self.roster_cache
1486 yield roster_cache.clear()
1487 roster_cache[ROSTER_VER_KEY] = version
1488 for roster_jid, roster_item in self._jids.items():
1489 roster_jid_s = roster_jid.full()
1490 roster_item_elt = roster_item.toElement().toXml()
1491 roster_cache[roster_jid_s] = roster_item_elt
1492
1493 @defer.inlineCallbacks
1494 def resync(self):
1495 """Ask full roster to resync database
1496
1497 this should not be necessary, but may be used if user suspsect roster
1498 to be somehow corrupted
1499 """
1500 roster_cache = self.roster_cache
1501 yield roster_cache.clear()
1502 self._jids.clear()
1503 self._groups.clear()
1504 yield self.request_roster()
1505
1506 @defer.inlineCallbacks
1507 def request_roster(self):
1508 """Ask the server for Roster list """
1509 if self.versioning:
1510 log.info(_("our server support roster versioning, we use it"))
1511 roster_cache = self.roster_cache
1512 yield roster_cache.load()
1513 try:
1514 version = roster_cache[ROSTER_VER_KEY]
1515 except KeyError:
1516 log.info(_("no roster in cache, we start fresh"))
1517 # u"" means we use versioning without valid roster in cache
1518 version = ""
1519 else:
1520 log.info(_("We have roster v{version} in cache").format(version=version))
1521 # we deserialise cached roster to our local cache
1522 for roster_jid_s, roster_item_elt_s in roster_cache.items():
1523 if roster_jid_s == ROSTER_VER_KEY:
1524 continue
1525 roster_jid = jid.JID(roster_jid_s)
1526 roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8'))
1527 roster_item = xmppim.RosterItem.fromElement(roster_item_elt)
1528 self._jids[roster_jid] = roster_item
1529 self._register_item(roster_item)
1530 else:
1531 log.warning(_("our server doesn't support roster versioning"))
1532 version = None
1533
1534 log.debug("requesting roster")
1535 roster = yield self.getRoster(version=version)
1536 if roster is None:
1537 log.debug("empty roster result received, we'll get roster item with roster "
1538 "pushes")
1539 else:
1540 # a full roster is received
1541 self._groups.clear()
1542 self._jids = roster
1543 for item in roster.values():
1544 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask:
1545 # XXX: current behaviour: we don't want contact in our roster list
1546 # if there is no presence subscription
1547 # may change in the future
1548 log.info(
1549 "Removing contact {} from roster because there is no presence "
1550 "subscription".format(
1551 item.jid
1552 )
1553 )
1554 self.removeItem(item.entity) # FIXME: to be checked
1555 else:
1556 self._register_item(item)
1557 yield self._cache_roster(roster.version)
1558
1559 if not self.got_roster.called:
1560 # got_roster may already be called if we use resync()
1561 self.got_roster.callback(None)
1562
1563 def removeItem(self, to_jid):
1564 """Remove a contact from roster list
1565 @param to_jid: a JID instance
1566 @return: Deferred
1567 """
1568 return xmppim.RosterClientProtocol.removeItem(self, to_jid)
1569
1570 def get_attributes(self, item):
1571 """Return dictionary of attributes as used in bridge from a RosterItem
1572
1573 @param item: RosterItem
1574 @return: dictionary of attributes
1575 """
1576 item_attr = {
1577 "to": str(item.subscriptionTo),
1578 "from": str(item.subscriptionFrom),
1579 "ask": str(item.ask),
1580 }
1581 if item.name:
1582 item_attr["name"] = item.name
1583 return item_attr
1584
1585 def setReceived(self, request):
1586 item = request.item
1587 entity = item.entity
1588 log.info(_("adding {entity} to roster").format(entity=entity.full()))
1589 if request.version is not None:
1590 # we update the cache in storage
1591 roster_cache = self.roster_cache
1592 roster_cache[entity.full()] = item.toElement().toXml()
1593 roster_cache[ROSTER_VER_KEY] = request.version
1594
1595 try: # update the cache for the groups the contact has been removed from
1596 left_groups = set(self._jids[entity].groups).difference(item.groups)
1597 for group in left_groups:
1598 jids_set = self._groups[group]
1599 jids_set.remove(entity)
1600 if not jids_set:
1601 del self._groups[group]
1602 except KeyError:
1603 pass # no previous item registration (or it's been cleared)
1604 self._jids[entity] = item
1605 self._register_item(item)
1606 self.host.bridge.contact_new(
1607 entity.full(), self.get_attributes(item), list(item.groups),
1608 self.parent.profile
1609 )
1610
1611 def removeReceived(self, request):
1612 entity = request.item.entity
1613 log.info(_("removing {entity} from roster").format(entity=entity.full()))
1614 if request.version is not None:
1615 # we update the cache in storage
1616 roster_cache = self.roster_cache
1617 try:
1618 del roster_cache[request.item.entity.full()]
1619 except KeyError:
1620 # because we don't use load(), cache won't have the key, but it
1621 # will be deleted from storage anyway
1622 pass
1623 roster_cache[ROSTER_VER_KEY] = request.version
1624
1625 # we first remove item from local cache (self._groups and self._jids)
1626 try:
1627 item = self._jids.pop(entity)
1628 except KeyError:
1629 log.error(
1630 "Received a roster remove event for an item not in cache ({})".format(
1631 entity
1632 )
1633 )
1634 return
1635 for group in item.groups:
1636 try:
1637 jids_set = self._groups[group]
1638 jids_set.remove(entity)
1639 if not jids_set:
1640 del self._groups[group]
1641 except KeyError:
1642 log.warning(
1643 f"there is no cache for the group [{group}] of the removed roster "
1644 f"item [{entity}]"
1645 )
1646
1647 # then we send the bridge signal
1648 self.host.bridge.contact_deleted(entity.full(), self.parent.profile)
1649
1650 def get_groups(self):
1651 """Return a list of groups"""
1652 return list(self._groups.keys())
1653
1654 def get_item(self, entity_jid):
1655 """Return RosterItem for a given jid
1656
1657 @param entity_jid(jid.JID): jid of the contact
1658 @return(RosterItem, None): RosterItem instance
1659 None if contact is not in cache
1660 """
1661 return self._jids.get(entity_jid, None)
1662
1663 def get_jids(self):
1664 """Return all jids of the roster"""
1665 return list(self._jids.keys())
1666
1667 def is_jid_in_roster(self, entity_jid):
1668 """Return True if jid is in roster"""
1669 if not isinstance(entity_jid, jid.JID):
1670 raise exceptions.InternalError(
1671 f"a JID is expected, not {type(entity_jid)}: {entity_jid!r}")
1672 return entity_jid in self._jids
1673
1674 def is_subscribed_from(self, entity_jid: jid.JID) -> bool:
1675 """Return True if entity is authorised to see our presence"""
1676 try:
1677 item = self._jids[entity_jid.userhostJID()]
1678 except KeyError:
1679 return False
1680 return item.subscriptionFrom
1681
1682 def is_subscribed_to(self, entity_jid: jid.JID) -> bool:
1683 """Return True if we are subscribed to entity"""
1684 try:
1685 item = self._jids[entity_jid.userhostJID()]
1686 except KeyError:
1687 return False
1688 return item.subscriptionTo
1689
1690 def get_items(self):
1691 """Return all items of the roster"""
1692 return list(self._jids.values())
1693
1694 def get_jids_from_group(self, group):
1695 try:
1696 return self._groups[group]
1697 except KeyError:
1698 raise exceptions.UnknownGroupError(group)
1699
1700 def get_jids_set(self, type_, groups=None):
1701 """Helper method to get a set of jids
1702
1703 @param type_(unicode): one of:
1704 C.ALL: get all jids from roster
1705 C.GROUP: get jids from groups (listed in "groups")
1706 @groups(list[unicode]): list of groups used if type_==C.GROUP
1707 @return (set(jid.JID)): set of selected jids
1708 """
1709 if type_ == C.ALL and groups is not None:
1710 raise ValueError("groups must not be set for {} type".format(C.ALL))
1711
1712 if type_ == C.ALL:
1713 return set(self.get_jids())
1714 elif type_ == C.GROUP:
1715 jids = set()
1716 for group in groups:
1717 jids.update(self.get_jids_from_group(group))
1718 return jids
1719 else:
1720 raise ValueError("Unexpected type_ {}".format(type_))
1721
1722 def get_nick(self, entity_jid):
1723 """Return a nick name for an entity
1724
1725 return nick choosed by user if available
1726 else return user part of entity_jid
1727 """
1728 item = self.get_item(entity_jid)
1729 if item is None:
1730 return entity_jid.user
1731 else:
1732 return item.name or entity_jid.user
1733
1734
1735 class SatPresenceProtocol(xmppim.PresenceClientProtocol):
1736
1737 def __init__(self, host):
1738 xmppim.PresenceClientProtocol.__init__(self)
1739 self.host = host
1740
1741 @property
1742 def client(self):
1743 return self.parent
1744
1745 def send(self, obj):
1746 presence_d = defer.succeed(None)
1747 if not self.host.trigger.point("Presence send", self.parent, obj, presence_d):
1748 return
1749 presence_d.addCallback(lambda __: super(SatPresenceProtocol, self).send(obj))
1750 return presence_d
1751
1752 def availableReceived(self, entity, show=None, statuses=None, priority=0):
1753 if not statuses:
1754 statuses = {}
1755
1756 if None in statuses: # we only want string keys
1757 statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)
1758
1759 if not self.host.trigger.point(
1760 "presence_received", self.parent, entity, show, priority, statuses
1761 ):
1762 return
1763
1764 self.host.memory.set_presence_status(
1765 entity, show or "", int(priority), statuses, self.parent.profile
1766 )
1767
1768 # now it's time to notify frontends
1769 self.host.bridge.presence_update(
1770 entity.full(), show or "", int(priority), statuses, self.parent.profile
1771 )
1772
1773 def unavailableReceived(self, entity, statuses=None):
1774 log.debug(
1775 _("presence update for [%(entity)s] (unavailable, statuses=%(statuses)s)")
1776 % {"entity": entity, C.PRESENCE_STATUSES: statuses}
1777 )
1778
1779 if not statuses:
1780 statuses = {}
1781
1782 if None in statuses: # we only want string keys
1783 statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)
1784
1785 if not self.host.trigger.point(
1786 "presence_received", self.parent, entity, C.PRESENCE_UNAVAILABLE, 0, statuses,
1787 ):
1788 return
1789
1790 # now it's time to notify frontends
1791 # if the entity is not known yet in this session or is already unavailable,
1792 # there is no need to send an unavailable signal
1793 try:
1794 presence = self.host.memory.get_entity_datum(
1795 self.client, entity, "presence"
1796 )
1797 except (KeyError, exceptions.UnknownEntityError):
1798 # the entity has not been seen yet in this session
1799 pass
1800 else:
1801 if presence.show != C.PRESENCE_UNAVAILABLE:
1802 self.host.bridge.presence_update(
1803 entity.full(),
1804 C.PRESENCE_UNAVAILABLE,
1805 0,
1806 statuses,
1807 self.parent.profile,
1808 )
1809
1810 self.host.memory.set_presence_status(
1811 entity, C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile
1812 )
1813
1814 def available(self, entity=None, show=None, statuses=None, priority=None):
1815 """Set a presence and statuses.
1816
1817 @param entity (jid.JID): entity
1818 @param show (unicode): value in ('unavailable', '', 'away', 'xa', 'chat', 'dnd')
1819 @param statuses (dict{unicode: unicode}): multilingual statuses with
1820 the entry key beeing a language code on 2 characters or "default".
1821 """
1822 if priority is None:
1823 try:
1824 priority = int(
1825 self.host.memory.param_get_a(
1826 "Priority", "Connection", profile_key=self.parent.profile
1827 )
1828 )
1829 except ValueError:
1830 priority = 0
1831
1832 if statuses is None:
1833 statuses = {}
1834
1835 # default for us is None for wokkel
1836 # so we must temporarily switch to wokkel's convention...
1837 if C.PRESENCE_STATUSES_DEFAULT in statuses:
1838 statuses[None] = statuses.pop(C.PRESENCE_STATUSES_DEFAULT)
1839
1840 presence_elt = xmppim.AvailablePresence(entity, show, statuses, priority)
1841
1842 # ... before switching back
1843 if None in statuses:
1844 statuses["default"] = statuses.pop(None)
1845
1846 if not self.host.trigger.point("presence_available", presence_elt, self.parent):
1847 return
1848 return self.send(presence_elt)
1849
1850 @defer.inlineCallbacks
1851 def subscribed(self, entity):
1852 yield self.parent.roster.got_roster
1853 xmppim.PresenceClientProtocol.subscribed(self, entity)
1854 self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile)
1855 item = self.parent.roster.get_item(entity)
1856 if (
1857 not item or not item.subscriptionTo
1858 ): # we automatically subscribe to 'to' presence
1859 log.debug(_('sending automatic "from" subscription request'))
1860 self.subscribe(entity)
1861
1862 def unsubscribed(self, entity):
1863 xmppim.PresenceClientProtocol.unsubscribed(self, entity)
1864 self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile)
1865
1866 def subscribedReceived(self, entity):
1867 log.debug(_("subscription approved for [%s]") % entity.userhost())
1868 self.host.bridge.subscribe("subscribed", entity.userhost(), self.parent.profile)
1869
1870 def unsubscribedReceived(self, entity):
1871 log.debug(_("unsubscription confirmed for [%s]") % entity.userhost())
1872 self.host.bridge.subscribe("unsubscribed", entity.userhost(), self.parent.profile)
1873
1874 @defer.inlineCallbacks
1875 def subscribeReceived(self, entity):
1876 log.debug(_("subscription request from [%s]") % entity.userhost())
1877 yield self.parent.roster.got_roster
1878 item = self.parent.roster.get_item(entity)
1879 if item and item.subscriptionTo:
1880 # We automatically accept subscription if we are already subscribed to
1881 # contact presence
1882 log.debug(_("sending automatic subscription acceptance"))
1883 self.subscribed(entity)
1884 else:
1885 self.host.memory.add_waiting_sub(
1886 "subscribe", entity.userhost(), self.parent.profile
1887 )
1888 self.host.bridge.subscribe(
1889 "subscribe", entity.userhost(), self.parent.profile
1890 )
1891
1892 @defer.inlineCallbacks
1893 def unsubscribeReceived(self, entity):
1894 log.debug(_("unsubscription asked for [%s]") % entity.userhost())
1895 yield self.parent.roster.got_roster
1896 item = self.parent.roster.get_item(entity)
1897 if item and item.subscriptionFrom: # we automatically remove contact
1898 log.debug(_("automatic contact deletion"))
1899 self.host.contact_del(entity, self.parent.profile)
1900 self.host.bridge.subscribe("unsubscribe", entity.userhost(), self.parent.profile)
1901
1902
1903 @implementer(iwokkel.IDisco)
1904 class SatDiscoProtocol(disco.DiscoClientProtocol):
1905
1906 def __init__(self, host):
1907 disco.DiscoClientProtocol.__init__(self)
1908
1909 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
1910 # those features are implemented in Wokkel (or sat_tmp.wokkel)
1911 # and thus are always available
1912 return [disco.DiscoFeature(NS_X_DATA),
1913 disco.DiscoFeature(NS_XML_ELEMENT),
1914 disco.DiscoFeature(NS_DISCO_INFO)]
1915
1916 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
1917 return []
1918
1919
1920 class SatFallbackHandler(generic.FallbackHandler):
1921 def __init__(self, host):
1922 generic.FallbackHandler.__init__(self)
1923
1924 def iqFallback(self, iq):
1925 if iq.handled is True:
1926 return
1927 log.debug("iqFallback: xml = [%s]" % (iq.toXml()))
1928 generic.FallbackHandler.iqFallback(self, iq)
1929
1930
1931 class SatVersionHandler(generic.VersionHandler):
1932
1933 def getDiscoInfo(self, requestor, target, node):
1934 # XXX: We need to work around wokkel's behaviour (namespace not added if there
1935 # is a node) as it cause issues with XEP-0115 & PEP (XEP-0163): there is a
1936 # node when server ask for disco info, and not when we generate the key, so
1937 # the hash is used with different disco features, and when the server (seen
1938 # on ejabberd) generate its own hash for security check it reject our
1939 # features (resulting in e.g. no notification on PEP)
1940 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None)
1941
1942
1943 @implementer(iwokkel.IDisco)
1944 class SatIdentityHandler(XMPPHandler):
1945 """Manage disco Identity of SàT."""
1946 # TODO: dynamic identity update (see docstring). Note that a XMPP entity can have
1947 # several identities
1948
1949 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
1950 return self.parent.identities
1951
1952 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
1953 return []