Mercurial > libervia-backend
view libervia/backend/core/xmpp.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if server doesn't support
multicast, and send to local multicast service first. Delivering to local multicast
service first is due to bad support of XEP-0033 in server (notably Prosody which has an
incomplete implementation), and the current impossibility to detect if a sub-domain
service handles fully multicast or only for local domains. This is a workaround to have
a good balance between backward compatilibity and use of bandwith, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` corountines. `host` implementation still use
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0f953ce5f0a8 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import asyncio import calendar import copy from functools import partial import mimetypes from pathlib import Path import sys import time from typing import Callable, Dict, Tuple, Optional from urllib.parse import unquote, urlparse import uuid import shortuuid from twisted.internet import defer, error as internet_error, reactor from twisted.internet import ssl from twisted.python import failure from twisted.words.protocols.jabber import xmlstream from twisted.words.protocols.jabber import error from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import client as wokkel_client, disco, generic, iwokkel, xmppim from wokkel import component from wokkel import delay from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core import core_types from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.memory import cache from libervia.backend.memory import encryption from libervia.backend.memory import persistent from libervia.backend.models.core import MessageData from libervia.backend.tools import xml_tools from libervia.backend.tools import utils from libervia.backend.tools.common import data_format log = getLogger(__name__) NS_X_DATA = "jabber:x:data" NS_DISCO_INFO = "http://jabber.org/protocol/disco#info" NS_XML_ELEMENT = "urn:xmpp:xml-element" NS_ROSTER_VER = "urn:xmpp:features:rosterver" # we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys # with roster jids ROSTER_VER_KEY = "@version@" class ClientPluginWrapper: """Use a plugin with default value if plugin is missing""" def __init__(self, client, plugin_name, missing): self.client = client self.plugin = client.host_app.plugins.get(plugin_name) if self.plugin is None: self.plugin_name = plugin_name self.missing = missing def __getattr__(self, attr): if self.plugin is None: missing = self.missing if isinstance(missing, type) and issubclass(missing, Exception): raise missing(f"plugin {self.plugin_name!r} is not available") elif isinstance(missing, Exception): raise missing else: return lambda *args, **kwargs: missing return partial(getattr(self.plugin, attr), self.client) class SatXMPPEntity(core_types.SatXMPPEntity): """Common code for Client and Component""" # profile is added there when start_connection begins and removed when it is finished profiles_connecting = set() def __init__(self, host_app, profile, max_retries): factory = self.factory # we monkey patch clientConnectionLost to handle network_enabled/network_disabled # and to allow plugins to tune reconnection mechanism clientConnectionFailed_ori = factory.clientConnectionFailed clientConnectionLost_ori = factory.clientConnectionLost factory.clientConnectionFailed = partial( self.connection_terminated, term_type="failed", cb=clientConnectionFailed_ori ) factory.clientConnectionLost = partial( self.connection_terminated, term_type="lost", cb=clientConnectionLost_ori ) factory.maxRetries = max_retries factory.maxDelay = 30 # when self._connected_d is None, we are not connected # else, it's a deferred which fire on disconnection self._connected_d = None self.profile = profile self.host_app = host_app self.cache = cache.Cache(host_app, profile) self.mess_id2uid = {} # map from message id to uid used in history. # Key: (full_jid, message_id) Value: uid # this Deferred fire when entity is connected self.conn_deferred = defer.Deferred() self._progress_cb = {} # callback called when a progress is requested # (key = progress id) self.actions = {} # used to keep track of actions for retrieval (key = action_id) self.encryption = encryption.EncryptionHandler(self) def __str__(self): return f"Client for profile {self.profile}" def __repr__(self): return f"{super().__repr__()} - profile: {self.profile!r}" ## initialisation ## async def _call_connection_triggers(self, connection_timer): """Call conneting trigger prepare connected trigger @param plugins(iterable): plugins to use @return (list[object, callable]): plugin to trigger tuples with: - plugin instance - profile_connected* triggers (to call after connection) """ plugin_conn_cb = [] self.plugins = plugins = self._get_plugins_list() for plugin in plugins: # we check if plugin handle client mode if plugin.is_handler: plugin.get_handler(self).setHandlerParent(self) # profile_connecting/profile_connected methods handling timer = connection_timer[plugin] = {"total": 0} # profile connecting is called right now (before actually starting client) connecting_cb = getattr(plugin, "profile_connecting", None) if connecting_cb is not None: connecting_start = time.time() await utils.as_deferred(connecting_cb, self) timer["connecting"] = time.time() - connecting_start timer["total"] += timer["connecting"] # profile connected is called after client is ready and roster is got connected_cb = getattr(plugin, "profile_connected", None) if connected_cb is not None: plugin_conn_cb.append((plugin, connected_cb)) return plugin_conn_cb def _get_plugins_list(self): """Return list of plugin to use need to be implemented by subclasses this list is used to call profileConnect* triggers @return(iterable[object]): plugins to use """ raise NotImplementedError def _create_sub_protocols(self): return def entity_connected(self): """Called once connection is done may return a Deferred, to perform initialisation tasks """ return @staticmethod async def _run_profile_connected( callback: Callable, entity: "SatXMPPEntity", timer: Dict[str, float] ) -> None: connected_start = time.time() await utils.as_deferred(callback, entity) timer["connected"] = time.time() - connected_start timer["total"] += timer["connected"] @classmethod async def start_connection(cls, host, profile, max_retries): """instantiate the entity and start the connection""" # FIXME: reconnection doesn't seems to be handled correclty # (client is deleted then recreated from scratch) # most of methods called here should be called once on first connection # (e.g. adding subprotocols) # but client should not be deleted except if session is finished # (independently of connection/deconnection) if profile in cls.profiles_connecting: raise exceptions.CancelError(f"{profile} is already being connected") cls.profiles_connecting.add(profile) try: try: port = int( host.memory.param_get_a( C.FORCE_PORT_PARAM, "Connection", profile_key=profile ) ) except ValueError: log.debug(_("Can't parse port value, using default value")) port = None # will use default value 5222 or be retrieved from a DNS SRV record password = await host.memory.param_get_a_async( "Password", "Connection", profile_key=profile ) entity_jid_s = await host.memory.param_get_a_async( "JabberID", "Connection", profile_key=profile ) entity_jid = jid.JID(entity_jid_s) if not entity_jid.resource and not cls.is_component and entity_jid.user: # if no resource is specified, we create our own instead of using # server returned one, as it will then stay stable in case of # reconnection. we only do that for client and if there is a user part, to # let server decide for anonymous login resource_dict = await host.memory.storage.get_privates( "core:xmpp", ["resource"], profile=profile ) try: resource = resource_dict["resource"] except KeyError: resource = f"{C.APP_NAME_FILE}.{shortuuid.uuid()}" await host.memory.storage.set_private_value( "core:xmpp", "resource", resource, profile=profile ) log.info( _("We'll use the stable resource {resource}").format( resource=resource ) ) entity_jid.resource = resource if profile in host.profiles: if host.profiles[profile].is_connected(): raise exceptions.InternalError( f"There is already a connected profile of name {profile!r} in " f"host" ) log.debug("removing unconnected profile {profile!r}") del host.profiles[profile] entity = host.profiles[profile] = cls( host, profile, entity_jid, password, host.memory.param_get_a( C.FORCE_SERVER_PARAM, "Connection", profile_key=profile ) or None, port, max_retries, ) await entity.encryption.load_sessions() entity._create_sub_protocols() entity.fallBack = SatFallbackHandler(host) entity.fallBack.setHandlerParent(entity) entity.versionHandler = SatVersionHandler(C.APP_NAME, host.full_version) entity.versionHandler.setHandlerParent(entity) entity.identityHandler = SatIdentityHandler() entity.identityHandler.setHandlerParent(entity) log.debug(_("setting plugins parents")) connection_timer: Dict[str, Dict[str, float]] = {} plugin_conn_cb = await entity._call_connection_triggers(connection_timer) entity.startService() await entity.conn_deferred await defer.maybeDeferred(entity.entity_connected) # Call profile_connected callback for all plugins, # and print error message if any of them fails conn_cb_list = [] for plugin, callback in plugin_conn_cb: conn_cb_list.append( defer.ensureDeferred( cls._run_profile_connected( callback, entity, connection_timer[plugin] ) ) ) list_d = defer.DeferredList(conn_cb_list) def log_plugin_results(results): if not results: log.info("no plugin loaded") return all_succeed = all([success for success, result in results]) if not all_succeed: log.error(_("Plugins initialisation error")) for idx, (success, result) in enumerate(results): if not success: plugin_name = plugin_conn_cb[idx][0]._info["import_name"] log.error(f"error (plugin {plugin_name}): {result}") log.debug(f"Plugin loading time for {profile!r} (longer to shorter):\n") plugins_by_timer = sorted( connection_timer, key=lambda p: connection_timer[p]["total"], reverse=True, ) # total is the addition of all connecting and connected, doesn't really # reflect the real loading time as connected are launched in a # DeferredList total_plugins = 0 # total real sum all connecting (which happen sequentially) and the # longuest connected (connected happen in parallel, thus the longuest is # roughly the total time for connected) total_real = 0 total_real = max(t.get("connected", 0) for t in connection_timer.values()) for plugin in plugins_by_timer: name = plugin._info["import_name"] timer = connection_timer[plugin] total_plugins += timer["total"] try: connecting = f"{timer['connecting']:.2f}s" except KeyError: connecting = "n/a" else: total_real += timer["connecting"] try: connected = f"{timer['connected']:.2f}s" except KeyError: connected = "n/a" log.debug( f" - {name}: total={timer['total']:.2f}s " f"connecting={connecting} connected={connected}" ) log.debug( f" Plugins total={total_plugins:.2f}s real={total_real:.2f}s\n" ) await list_d.addCallback( log_plugin_results ) # FIXME: we should have a timeout here, and a way to know if a plugin freeze # TODO: mesure launch time of each plugin finally: cls.profiles_connecting.remove(profile) def _disconnection_cb(self, __): self._connected_d = None def _disconnection_eb(self, failure_): log.error(_("Error while disconnecting: {}".format(failure_))) def _authd(self, xmlstream): super(SatXMPPEntity, self)._authd(xmlstream) log.debug(_("{profile} identified").format(profile=self.profile)) self.stream_initialized() def _finish_connection(self, __): if self.conn_deferred.called: # can happen in case of forced disconnection by server log.debug(f"{self} has already been connected") else: self.conn_deferred.callback(None) def stream_initialized(self): """Called after _authd""" log.debug(_("XML stream is initialized")) if not self.host_app.trigger.point("xml_init", self): return self.post_stream_init() def post_stream_init(self): """Workflow after stream initalisation.""" log.info( _("********** [{profile}] CONNECTED **********").format(profile=self.profile) ) # the following Deferred is used to know when we are connected # so we need to be set it to None when connection is lost self._connected_d = defer.Deferred() self._connected_d.addCallback(self._clean_connection) self._connected_d.addCallback(self._disconnection_cb) self._connected_d.addErrback(self._disconnection_eb) # we send the signal to the clients self.host_app.bridge.connected(self.jid.full(), self.profile) self.disco = SatDiscoProtocol(self) self.disco.setHandlerParent(self) self.discoHandler = disco.DiscoHandler() self.discoHandler.setHandlerParent(self) disco_d = defer.succeed(None) if not self.host_app.trigger.point("Disco handled", disco_d, self.profile): return disco_d.addCallback(self._finish_connection) def initializationFailed(self, reason): log.error( _( "ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {"profile": self.profile, "reason": reason} ) ) self.conn_deferred.errback(reason.value) try: super(SatXMPPEntity, self).initializationFailed(reason) except: # we already chained an errback, no need to raise an exception pass ## connection ## def connection_terminated(self, connector, reason, term_type, cb): """Display disconnection reason, and call factory method This method is monkey patched to factory, allowing plugins to handle finely reconnection with the triggers. @param connector(twisted.internet.base.BaseConnector): current connector @param reason(failure.Failure): why connection has been terminated @param term_type(unicode): on of 'failed' or 'lost' @param cb(callable): original factory method @trigger connection_failed(connector, reason): connection can't be established @trigger connection_lost(connector, reason): connection was available but it not anymore """ # we save connector because it may be deleted when connection will be dropped # if reconnection is disabled self._saved_connector = connector if reason is not None and not isinstance( reason.value, internet_error.ConnectionDone ): try: reason_str = str(reason.value) except Exception: # FIXME: workaround for Android were p4a strips docstrings # while Twisted use docstring in __str__ # TODO: create a ticket upstream, Twisted should work when optimization # is used reason_str = str(reason.value.__class__) log.warning(f"[{self.profile}] Connection {term_type}: {reason_str}") if not self.host_app.trigger.point("connection_" + term_type, connector, reason): return return cb(connector, reason) def network_disabled(self): """Indicate that network has been completely disabled In other words, internet is not available anymore and transport must be stopped. Retrying is disabled too, as it makes no sense to try without network, and it may use resources (notably battery on mobiles). """ log.info(_("stopping connection because of network disabled")) self.factory.continueTrying = 0 self._network_disabled = True if self.xmlstream is not None: self.xmlstream.transport.abortConnection() def network_enabled(self): """Indicate that network has been (re)enabled This happens when e.g. user activate WIFI connection. """ try: connector = self._saved_connector network_disabled = self._network_disabled except AttributeError: # connection has not been stopped by network_disabled # we don't have to restart it log.debug(f"no connection to restart [{self.profile}]") return else: del self._network_disabled if not network_disabled: raise exceptions.InternalError("network_disabled should be True") log.info(_("network is available, trying to connect")) # we want to be sure to start fresh self.factory.resetDelay() # we have a saved connector, meaning the connection has been stopped previously # we can now try to reconnect connector.connect() def _connected(self, xs): send_hooks = [] receive_hooks = [] self.host_app.trigger.point("stream_hooks", self, receive_hooks, send_hooks) for hook in receive_hooks: xs.add_hook(C.STREAM_HOOK_RECEIVE, hook) for hook in send_hooks: xs.add_hook(C.STREAM_HOOK_SEND, hook) super(SatXMPPEntity, self)._connected(xs) def disconnect_profile(self, reason): if self._connected_d is not None: self.host_app.bridge.disconnected( self.profile ) # we send the signal to the clients log.info( _("********** [{profile}] DISCONNECTED **********").format( profile=self.profile ) ) # we purge only if no new connection attempt is expected if not self.factory.continueTrying: log.debug("continueTrying not set, purging entity") self._connected_d.callback(None) # and we remove references to this client self.host_app.purge_entity(self.profile) if not self.conn_deferred.called: if reason is None: err = error.StreamError("Server unexpectedly closed the connection") else: err = reason try: if err.value.args[0][0][2] == "certificate verify failed": err = exceptions.InvalidCertificate( _( "Your server certificate is not valid " "(its identity can't be checked).\n\n" "This should never happen and may indicate that " "somebody is trying to spy on you.\n" "Please contact your server administrator." ) ) self.factory.stopTrying() try: # with invalid certificate, we should not retry to connect # so we delete saved connector to avoid reconnection if # network_enabled is called. del self._saved_connector except AttributeError: pass except (IndexError, TypeError): pass self.conn_deferred.errback(err) def _disconnected(self, reason): super(SatXMPPEntity, self)._disconnected(reason) if not self.host_app.trigger.point("disconnected", self, reason): return self.disconnect_profile(reason) @defer.inlineCallbacks def _clean_connection(self, __): """method called on disconnection used to call profile_disconnected* triggers """ trigger_name = "profile_disconnected" for plugin in self._get_plugins_list(): disconnected_cb = getattr(plugin, trigger_name, None) if disconnected_cb is not None: yield disconnected_cb(self) def is_connected(self): """Return True is client is fully connected client is considered fully connected if transport is started and all plugins are initialised """ try: transport_connected = bool(self.xmlstream.transport.connected) except AttributeError: return False return self._connected_d is not None and transport_connected def entity_disconnect(self): if not self.host_app.trigger.point("disconnecting", self): return log.info(_("Disconnecting...")) self.stopService() if self._connected_d is not None: return self._connected_d else: return defer.succeed(None) ## sending ## def IQ(self, type_="set", timeout=60): """shortcut to create an IQ element managing deferred @param type_(unicode): IQ type ('set' or 'get') @param timeout(None, int): timeout in seconds @return((D)domish.Element: result stanza errback is called if an error stanza is returned """ iq_elt = xmlstream.IQ(self.xmlstream, type_) iq_elt.timeout = timeout return iq_elt def sendError(self, iq_elt, condition, text=None, appCondition=None): """Send error stanza build from iq_elt @param iq_elt(domish.Element): initial IQ element @param condition(unicode): error condition """ iq_error_elt = error.StanzaError( condition, text=text, appCondition=appCondition ).toResponse(iq_elt) self.xmlstream.send(iq_error_elt) def generate_message_xml( self, data: core_types.MessageData, post_xml_treatments: Optional[defer.Deferred] = None, ) -> core_types.MessageData: """Generate <message/> stanza from message data @param data: message data domish element will be put in data['xml'] following keys are needed: - from - to - uid: can be set to '' if uid attribute is not wanted - message - type - subject - extra @param post_xml_treatments: a Deferred which will be called with data once XML is generated @return: message data """ data["xml"] = message_elt = domish.Element((None, "message")) message_elt["to"] = data["to"].full() message_elt["from"] = data["from"].full() message_elt["type"] = data["type"] if data["uid"]: # key must be present but can be set to '' # by a plugin to avoid id on purpose message_elt["id"] = data["uid"] for lang, subject in data["subject"].items(): subject_elt = message_elt.addElement("subject", content=subject) if lang: subject_elt[(C.NS_XML, "lang")] = lang for lang, message in data["message"].items(): body_elt = message_elt.addElement("body", content=message) if lang: body_elt[(C.NS_XML, "lang")] = lang try: thread = data["extra"]["thread"] except KeyError: if "thread_parent" in data["extra"]: raise exceptions.InternalError( "thread_parent found while there is not associated thread" ) else: thread_elt = message_elt.addElement("thread", content=thread) try: thread_elt["parent"] = data["extra"]["thread_parent"] except KeyError: pass if post_xml_treatments is not None: post_xml_treatments.callback(data) return data @property def is_admin(self) -> bool: """True if a client is an administrator with extra privileges""" return self.host_app.memory.is_admin(self.profile) def add_post_xml_callbacks(self, post_xml_treatments): """Used to add class level callbacks at the end of the workflow @param post_xml_treatments(D): the same Deferred as in sendMessage trigger """ raise NotImplementedError async def a_send(self, obj: domish.Element) -> None: # original send method accept string # but we restrict to domish.Element to make trigger treatments easier assert isinstance(obj, domish.Element) # XXX: this trigger is the last one before sending stanza on wire # it is intended for things like end 2 end encryption. # *DO NOT* cancel (i.e. return False) without very good reason # (out of band transmission for instance). # e2e should have a priority of 0 here, and out of band transmission # a lower priority if not (await self.host_app.trigger.async_point("send", self, obj)): return super().send(obj) def send(self, obj: domish.Element): defer.ensureDeferred(self.a_send(obj)) async def send_message_data(self, mess_data): """Convenient method to send message data to stream This method will send mess_data[u'xml'] to stream, but a trigger is there The trigger can't be cancelled, it's a good place for e2e encryption which don't handle full stanza encryption This trigger can return a Deferred (it's an async_point) @param mess_data(dict): message data as constructed by onMessage workflow @return (dict): mess_data (so it can be used in a deferred chain) """ # XXX: This is the last trigger before "send" (last but one globally) # for sending message. # This is intented for e2e encryption which doesn't do full stanza # encryption (e.g. OTR) # This trigger point can't cancel the method await self.host_app.trigger.async_point( "send_message_data", self, mess_data, triggers_no_cancel=True ) await self.a_send(mess_data["xml"]) return mess_data def sendMessage( self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None, no_trigger=False, ): r"""Send a message to an entity @param to_jid(jid.JID): destinee of the message @param message(dict): message body, key is the language (use '' when unknown) @param subject(dict): message subject, key is the language (use '' when unknown) @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or: - auto: for automatic type detection - info: for information ("info_type" can be specified in extra) @param extra(dict, None): extra data. Key can be: - info_type: information type, can be TODO @param uid(unicode, None): unique id: should be unique at least in this XMPP session if None, an uuid will be generated @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used useful when a message need to be sent without any modification /!\ this will also skip encryption methods! """ if subject is None: subject = {} if extra is None: extra = {} assert mess_type in C.MESS_TYPE_ALL data = { # dict is similar to the one used in client.onMessage "from": self.jid, "to": to_jid, "uid": uid or str(uuid.uuid4()), "message": message, "subject": subject, "type": mess_type, "extra": extra, "timestamp": time.time(), } # XXX: plugin can add their pre XML treatments to this deferred pre_xml_treatments = defer.Deferred() # XXX: plugin can add their post XML treatments to this deferred post_xml_treatments = defer.Deferred() if data["type"] == C.MESS_TYPE_AUTO: # we try to guess the type if data["subject"]: data["type"] = C.MESS_TYPE_NORMAL elif not data["to"].resource: # we may have a groupchat message, we check if the we know this jid try: entity_type = self.host_app.memory.get_entity_datum( self, data["to"], C.ENTITY_TYPE ) # FIXME: should entity_type manage resources ? except (exceptions.UnknownEntityError, KeyError): entity_type = "contact" if entity_type == C.ENTITY_TYPE_MUC: data["type"] = C.MESS_TYPE_GROUPCHAT else: data["type"] = C.MESS_TYPE_CHAT else: data["type"] = C.MESS_TYPE_CHAT # FIXME: send_only is used by libervia's OTR plugin to avoid # the triggers from frontend, and no_trigger do the same # thing internally, this could be unified send_only = data["extra"].get("send_only", False) if not no_trigger and not send_only: # is the session encrypted? If so we indicate it in data self.encryption.set_encryption_flag(data) if not self.host_app.trigger.point( "sendMessage" + self.trigger_suffix, self, data, pre_xml_treatments, post_xml_treatments, ): return defer.succeed(None) log.debug( _("Sending message (type {type}, to {to})").format( type=data["type"], to=to_jid.full() ) ) pre_xml_treatments.addCallback( lambda __: self.generate_message_xml(data, post_xml_treatments) ) pre_xml_treatments.addCallback(lambda __: post_xml_treatments) pre_xml_treatments.addErrback(self._cancel_error_trap) post_xml_treatments.addCallback( lambda __: defer.ensureDeferred(self.send_message_data(data)) ) if send_only: log.debug( _( "Triggers, storage and echo have been inhibited by the " "'send_only' parameter" ) ) else: self.add_post_xml_callbacks(post_xml_treatments) post_xml_treatments.addErrback(self._cancel_error_trap) post_xml_treatments.addErrback(self.host_app.log_errback) pre_xml_treatments.callback(data) return pre_xml_treatments def _cancel_error_trap(self, failure): """A message sending can be cancelled by a plugin treatment""" failure.trap(exceptions.CancelError) def is_message_printable(self, mess_data): """Return True if a message contain payload to show in frontends""" return ( mess_data["message"] or mess_data["subject"] or mess_data["extra"].get(C.KEY_ATTACHMENTS) or mess_data["type"] == C.MESS_TYPE_INFO ) async def message_add_to_history(self, data): """Store message into database (for local history) @param data: message data dictionnary @param client: profile's client """ if data["type"] != C.MESS_TYPE_GROUPCHAT: # we don't add groupchat message to history, as we get them back # and they will be added then # we need a message to store if self.is_message_printable(data): await self.host_app.memory.add_to_history(self, data) else: log.warning( "No message found" ) # empty body should be managed by plugins before this point return data def message_get_bridge_args(self, data): """Generate args to use with bridge from data dict""" return ( data["uid"], data["timestamp"], data["from"].full(), data["to"].full(), data["message"], data["subject"], data["type"], data_format.serialise(data["extra"]), ) def message_send_to_bridge(self, data): """Send message to bridge, so frontends can display it @param data: message data dictionnary """ if data["type"] != C.MESS_TYPE_GROUPCHAT: # we don't send groupchat message to bridge, as we get them back # and they will be added the # we need a message to send something if self.is_message_printable(data): # We send back the message, so all frontends are aware of it self.host_app.bridge.message_new( *self.message_get_bridge_args(data), profile=self.profile ) else: log.warning(_("No message found")) return data ## helper methods ## def p(self, plugin_name, missing=exceptions.MissingModule): """Get a plugin if available @param plugin_name(str): name of the plugin @param missing(object): value to return if plugin is missing if it is a subclass of Exception, it will be raised with a helping str as argument. @return (object): requested plugin wrapper, or default value The plugin wrapper will return the method with client set as first positional argument """ return ClientPluginWrapper(self, plugin_name, missing) ExtraDict = dict # TODO @implementer(iwokkel.IDisco) class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient): trigger_suffix = "" is_component = False def __init__( self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES, ): # XXX: DNS SRV records are checked when the host is not specified. # If no SRV record is found, the host is directly extracted from the JID. self.started = time.time() # Currently, we use "client/pc/Salut à Toi", but as # SàT is multi-frontends and can be used on mobile devices, as a bot, # with a web frontend, # etc., we should implement a way to dynamically update identities through the # bridge self.identities = [disco.DiscoIdentity("client", "pc", C.APP_NAME)] if sys.platform == "android": # for now we consider Android devices to be always phones self.identities = [disco.DiscoIdentity("client", "phone", C.APP_NAME)] hosts_map = host_app.memory.config_get(None, "hosts_dict", {}) if host is None and user_jid.host in hosts_map: host_data = hosts_map[user_jid.host] if isinstance(host_data, str): host = host_data elif isinstance(host_data, dict): if "host" in host_data: host = host_data["host"] if "port" in host_data: port = host_data["port"] else: log.warning( _("invalid data used for host: {data}").format(data=host_data) ) host_data = None if host_data is not None: log.info( "using {host}:{port} for host {host_ori} as requested in config".format( host_ori=user_jid.host, host=host, port=port ) ) self.check_certificate = host_app.memory.param_get_a( "check_certificate", "Connection", profile_key=profile ) if self.check_certificate: tls_required, configurationForTLS = True, None else: tls_required = False configurationForTLS = ssl.CertificateOptions(trustRoot=None) wokkel_client.XMPPClient.__init__( self, user_jid, password, host or None, port or C.XMPP_C2S_PORT, tls_required=tls_required, configurationForTLS=configurationForTLS, ) SatXMPPEntity.__init__(self, host_app, profile, max_retries) if not self.check_certificate: msg = _( "Certificate validation is deactivated, this is unsecure and " "somebody may be spying on you. If you have no good reason to disable " 'certificate validation, please activate "Check certificate" in your ' 'settings in "Connection" tab.' ) xml_tools.quick_note( host_app, self, msg, _("Security notice"), level=C.XMLUI_DATA_LVL_WARNING ) @property def server_jid(self): return jid.JID(self.jid.host) def _get_plugins_list(self): for p in self.host_app.plugins.values(): if C.PLUG_MODE_CLIENT in p._info["modes"]: yield p def _create_sub_protocols(self): self.messageProt = SatMessageProtocol(self.host_app) self.messageProt.setHandlerParent(self) self.roster = LiberviaRosterProtocol(self.host_app) self.roster.setHandlerParent(self) self.presence = SatPresenceProtocol(self.host_app) self.presence.setHandlerParent(self) @classmethod async def start_connection(cls, host, profile, max_retries): try: await super(SatXMPPClient, cls).start_connection(host, profile, max_retries) except exceptions.CancelError as e: log.warning(f"start_connection cancelled: {e}") return entity = host.profiles[profile] # we finally send our presence entity.presence.available() def entity_connected(self): # we want to be sure that we got the roster return self.roster.got_roster def add_post_xml_callbacks(self, post_xml_treatments): post_xml_treatments.addCallback(self.messageProt.complete_attachments) post_xml_treatments.addCallback( lambda ret: defer.ensureDeferred(self.message_add_to_history(ret)) ) post_xml_treatments.addCallback(self.message_send_to_bridge) def feedback( self, to_jid: jid.JID, message: str, extra: Optional[ExtraDict] = None ) -> None: """Send message to frontends This message will be an info message, not recorded in history. It can be used to give feedback of a command @param to_jid: destinee jid @param message: message to send to frontends @param extra: extra data to use in particular, info subtype can be specified with MESS_EXTRA_INFO """ if extra is None: extra = {} self.host_app.bridge.message_new( uid=str(uuid.uuid4()), timestamp=time.time(), from_jid=self.jid.full(), to_jid=to_jid.full(), message={"": message}, subject={}, mess_type=C.MESS_TYPE_INFO, extra=data_format.serialise(extra), profile=self.profile, ) def _finish_connection(self, __): d = self.roster.request_roster() d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__)) @implementer(iwokkel.IDisco) class SatXMPPComponent(SatXMPPEntity, component.Component): """XMPP component This component are similar but not identical to clients. An entry point plugin is launched after component is connected. Component need to instantiate MessageProtocol itself """ trigger_suffix = ( "Component" # used for to distinguish some trigger points set in SatXMPPEntity ) is_component = True # XXX: set to True from entry plugin to keep messages in history for sent messages sendHistory = False # XXX: same as sendHistory but for received messaged receiveHistory = False def __init__( self, host_app, profile, component_jid, password, host=None, port=None, max_retries=C.XMPP_MAX_RETRIES, ): self.started = time.time() if port is None: port = C.XMPP_COMPONENT_PORT ## entry point ## entry_point = host_app.memory.get_entry_point(profile) try: self.entry_plugin = host_app.plugins[entry_point] except KeyError: raise exceptions.NotFound( _("The requested entry point ({entry_point}) is not available").format( entry_point=entry_point ) ) self.enabled_features = set() self.identities = [disco.DiscoIdentity("component", "generic", C.APP_NAME)] # jid is set automatically on bind by Twisted for Client, but not for Component self.jid = component_jid if host is None: try: host = component_jid.host.split(".", 1)[1] except IndexError: raise ValueError("Can't guess host from jid, please specify a host") # XXX: component.Component expect unicode jid, while Client expect jid.JID. # this is not consistent, so we use jid.JID for SatXMPP* component.Component.__init__(self, host, port, component_jid.full(), password) SatXMPPEntity.__init__(self, host_app, profile, max_retries) @property def server_jid(self): # FIXME: not the best way to get server jid, maybe use config option? return jid.JID(self.jid.host.split(".", 1)[-1]) @property def is_admin(self) -> bool: return False def is_local(self, jid_: jid.JID) -> bool: """Returns True if jid_ use a domain or subdomain of component's host""" local_host = self.host.split(".") assert local_host return jid_.host.split(".")[-len(local_host) :] == local_host def _create_sub_protocols(self): self.messageProt = SatMessageProtocol(self.host_app) self.messageProt.setHandlerParent(self) def _build_dependencies(self, current, plugins, required=True): """build recursively dependencies needed for a plugin this method build list of plugin needed for a component and raises errors if they are not available or not allowed for components @param current(object): parent plugin to check use entry_point for first call @param plugins(list): list of validated plugins, will be filled by the method give an empty list for first call @param required(bool): True if plugin is mandatory for recursive calls only, should not be modified by inital caller @raise InternalError: one of the plugin is not handling components @raise KeyError: one plugin should be present in self.host_app.plugins but it is not """ if C.PLUG_MODE_COMPONENT not in current._info["modes"]: if not required: return else: log.error( _( "Plugin {current_name} is needed for {entry_name}, " "but it doesn't handle component mode" ).format( current_name=current._info["import_name"], entry_name=self.entry_plugin._info["import_name"], ) ) raise exceptions.InternalError(_("invalid plugin mode")) for import_name in current._info.get(C.PI_DEPENDENCIES, []): # plugins are already loaded as dependencies # so we know they are in self.host_app.plugins dep = self.host_app.plugins[import_name] self._build_dependencies(dep, plugins) for import_name in current._info.get(C.PI_RECOMMENDATIONS, []): # here plugins are only recommendations, # so they may not exist in self.host_app.plugins try: dep = self.host_app.plugins[import_name] except KeyError: continue self._build_dependencies(dep, plugins, required=False) if current not in plugins: # current can be required for several plugins and so # it can already be present in the list plugins.append(current) def _get_plugins_list(self): # XXX: for component we don't launch all plugins triggers # but only the ones from which there is a dependency plugins = [] self._build_dependencies(self.entry_plugin, plugins) return plugins def entity_connected(self): # we can now launch entry point try: start_cb = self.entry_plugin.componentStart except AttributeError: return else: return start_cb(self) def add_post_xml_callbacks(self, post_xml_treatments): if self.sendHistory: post_xml_treatments.addCallback( lambda ret: defer.ensureDeferred(self.message_add_to_history(ret)) ) def get_owner_from_jid(self, to_jid: jid.JID) -> jid.JID: """Retrieve "owner" of a component resource from the destination jid of the request This method needs plugin XEP-0106 for unescaping, if you use it you must add the plugin to your dependencies. A "user" part must be present in "to_jid" (otherwise, the component itself is addressed) @param to_jid: destination JID of the request """ try: unescape = self.host_app.plugins["XEP-0106"].unescape except KeyError: raise exceptions.MissingPlugin("Plugin XEP-0106 is needed to retrieve owner") else: user = unescape(to_jid.user) if "@" in user: # a full jid is specified return jid.JID(user) else: # only user part is specified, we use our own host to build the full jid return jid.JID(None, (user, self.host, None)) def get_owner_and_peer(self, iq_elt: domish.Element) -> Tuple[jid.JID, jid.JID]: """Retrieve owner of a component jid, and the jid of the requesting peer "owner" is found by either unescaping full jid from node, or by combining node with our host. Peer jid is the requesting jid from the IQ element @param iq_elt: IQ stanza sent from the requested @return: owner and peer JIDs """ to_jid = jid.JID(iq_elt["to"]) if to_jid.user: owner = self.get_owner_from_jid(to_jid) else: owner = jid.JID(iq_elt["from"]).userhostJID() peer_jid = jid.JID(iq_elt["from"]) return peer_jid, owner def get_virtual_client(self, jid_: jid.JID) -> SatXMPPEntity: """Get client for this component with a specified jid This is needed to perform operations with a virtual JID corresponding to a virtual entity (e.g. identified of a legacy network account) instead of the JID of the gateway itself. @param jid_: virtual JID to use @return: virtual client """ client = copy.copy(self) client.jid = jid_ return client class SatMessageProtocol(xmppim.MessageProtocol): def __init__(self, host): xmppim.MessageProtocol.__init__(self) self.host = host self.messages_queue = defer.DeferredQueue() def setHandlerParent(self, parent): super().setHandlerParent(parent) defer.ensureDeferred(self.process_messages()) @property def client(self): return self.parent def normalize_ns(self, elt: domish.Element, namespace: Optional[str]) -> None: if elt.uri == namespace: elt.defaultUri = elt.uri = C.NS_CLIENT for child in elt.elements(): self.normalize_ns(child, namespace) def parse_message(self, message_elt: domish.Element) -> MessageData: """Parse a message XML and return message_data @param message_elt(domish.Element): raw <message> xml @param client(SatXMPPClient, None): client to map message id to uid if None, mapping will not be done @return(dict): message data """ if message_elt.name != "message": log.error( _( "parse_message used with a non <message/> stanza, ignoring: {xml}".format( xml=message_elt.toXml() ) ) ) return {} if message_elt.uri == None: # xmlns may be None when wokkel element parsing strip out root namespace self.normalize_ns(message_elt, None) elif message_elt.uri != C.NS_CLIENT: log.error( _( "received <message> with a wrong namespace: {xml}".format( xml=message_elt.toXml() ) ) ) client = self.parent if not message_elt.hasAttribute("to"): message_elt["to"] = client.jid.full() message = {} subject = {} extra = {} data: MessageData = { "from": jid.JID(message_elt["from"]), "to": jid.JID(message_elt["to"]), "uid": message_elt.getAttribute( "uid", str(uuid.uuid4()) ), # XXX: uid is not a standard attribute but may be added by plugins "message": message, "subject": subject, "type": message_elt.getAttribute("type", "normal"), "extra": extra, } try: message_id = data["extra"]["message_id"] = message_elt["id"] except KeyError: pass else: client.mess_id2uid[(data["from"], message_id)] = data["uid"] # message for e in message_elt.elements(C.NS_CLIENT, "body"): message[e.getAttribute((C.NS_XML, "lang"), "")] = str(e) # subject for e in message_elt.elements(C.NS_CLIENT, "subject"): subject[e.getAttribute((C.NS_XML, "lang"), "")] = str(e) # delay and timestamp try: received_timestamp = message_elt._received_timestamp except AttributeError: # message_elt._received_timestamp should have been set in onMessage # but if parse_message is called directly, it can be missing log.debug( "missing received timestamp for {message_elt}".format( message_elt=message_elt ) ) received_timestamp = time.time() try: delay_elt = next(message_elt.elements(delay.NS_DELAY, "delay")) except StopIteration: data["timestamp"] = received_timestamp else: parsed_delay = delay.Delay.fromElement(delay_elt) data["timestamp"] = calendar.timegm(parsed_delay.stamp.utctimetuple()) data["received_timestamp"] = received_timestamp if parsed_delay.sender: data["delay_sender"] = parsed_delay.sender.full() self.host.trigger.point("message_parse", client, message_elt, data) return data def onMessage(self, message_elt: domish.Element) -> None: message_elt._received_timestamp = time.time() self.messages_queue.put(message_elt) async def process_messages(self) -> None: """Process message in order Messages are processed in a queue to avoid race conditions and ensure orderly processing. """ client = self.parent if client is None: log.error("client should not be None!") raise exceptions.InternalError() while True: message_elt = await self.messages_queue.get() try: await self.process_message(client, message_elt) except Exception: log.exception(f"Can't process message {message_elt.toXml()}") def _on_processing_timeout( self, message_elt: domish.Element, async_point_d: defer.Deferred ) -> None: log.error( "Processing of following message took too long, cancelling:" f"{message_elt.toXml()}" ) async_point_d.cancel() async def process_message( self, client: SatXMPPEntity, message_elt: domish.Element ) -> None: # TODO: handle threads if not "from" in message_elt.attributes: message_elt["from"] = client.jid.host log.debug(_("got message from: {from_}").format(from_=message_elt["from"])) if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT: # we use client namespace all the time to simplify parsing self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) # plugin can add their treatments to this deferred post_treat = defer.Deferred() async_point_d = defer.ensureDeferred( self.host.trigger.async_point( "message_received", client, message_elt, post_treat ) ) # message_received triggers block the messages queue, so they must not take too # long to proceed. delayed_call = reactor.callLater( 10, self._on_processing_timeout, message_elt, async_point_d ) trigger_ret_continue = await async_point_d if delayed_call.active(): delayed_call.cancel() log.debug(f"delayed_call for {async_point_d} cancelled") if not trigger_ret_continue: # trigger returned False, we stop the workflow. return try: data = self.parse_message(message_elt) # we now do all post treatments added by plugins post_treat.callback(data) await post_treat self.complete_attachments(data) if not data["message"] and not data["extra"] and not data["subject"]: raise exceptions.CancelError("Cancelled empty message") if not client.is_component or client.receiveHistory: await self.add_to_history(data) if not client.is_component: self.bridge_signal(data) except exceptions.CancelError: pass def complete_attachments(self, data: MessageData) -> MessageData: """Complete missing metadata of attachments""" for attachment in data["extra"].get(C.KEY_ATTACHMENTS, []): if "name" not in attachment and "url" in attachment: name = ( Path(unquote(urlparse(attachment["url"]).path)).name or C.FILE_DEFAULT_NAME ) attachment["name"] = name if C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment and "name" in attachment: media_type = mimetypes.guess_type(attachment["name"], strict=False)[0] if media_type: attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type return data async def add_to_history(self, data: MessageData) -> MessageData: if data.pop("history", None) == C.HISTORY_SKIP: log.debug("history is skipped as requested") data["extra"]["history"] = C.HISTORY_SKIP else: # we need a message to store if self.parent.is_message_printable(data): return await self.host.memory.add_to_history(self.parent, data) else: log.debug( "not storing empty message to history: {data}".format(data=data) ) return data def bridge_signal(self, data: MessageData) -> MessageData: """Send signal to frontends for the given message""" try: data["extra"]["received_timestamp"] = str(data["received_timestamp"]) data["extra"]["delay_sender"] = data["delay_sender"] except KeyError: pass if self.client.encryption.isEncrypted(data): data["extra"]["encrypted"] = True if data is not None: if self.parent.is_message_printable(data): self.host.bridge.message_new( data["uid"], data["timestamp"], data["from"].full(), data["to"].full(), data["message"], data["subject"], data["type"], data_format.serialise(data["extra"]), profile=self.parent.profile, ) else: log.debug( "Discarding bridge signal for empty message: {data}".format(data=data) ) return data class LiberviaRosterProtocol(xmppim.RosterClientProtocol): def __init__(self, host): xmppim.RosterClientProtocol.__init__(self) self.host = host self.got_roster = defer.Deferred() # called when roster is received and ready # XXX: the two following dicts keep a local copy of the roster self._jids = {} # map from jids to RosterItem: key=jid value=RosterItem self._groups = {} # map from groups to jids: key=group value=set of jids def __contains__(self, entity_jid): return self.is_jid_in_roster(entity_jid) @property def versioning(self): """True if server support roster versioning""" return (NS_ROSTER_VER, "ver") in self.parent.xmlstream.features @property def roster_cache(self): """Cache of roster from storage This property return a new PersistentDict on each call, it must be loaded manually if necessary """ return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile) def _register_item(self, item): """Register item in local cache item must be already registered in self._jids before this method is called @param item (RosterIem): item added """ log.debug("registering item: {}".format(item.entity.full())) if item.entity.resource: log.warning( "Received a roster item with a resource, this is not common but not " "restricted by RFC 6121, this case may be not well tested." ) if not item.subscriptionTo: if not item.subscriptionFrom: log.info( _("There's no subscription between you and [{}]!").format( item.entity.full() ) ) else: log.info(_("You are not subscribed to [{}]!").format(item.entity.full())) if not item.subscriptionFrom: log.info(_("[{}] is not subscribed to you!").format(item.entity.full())) for group in item.groups: self._groups.setdefault(group, set()).add(item.entity) @defer.inlineCallbacks def _cache_roster(self, version): """Serialise local roster and save it to storage @param version(unicode): version of roster in local cache """ roster_cache = self.roster_cache yield roster_cache.clear() roster_cache[ROSTER_VER_KEY] = version for roster_jid, roster_item in self._jids.items(): roster_jid_s = roster_jid.full() roster_item_elt = roster_item.toElement().toXml() roster_cache[roster_jid_s] = roster_item_elt @defer.inlineCallbacks def resync(self): """Ask full roster to resync database this should not be necessary, but may be used if user suspsect roster to be somehow corrupted """ roster_cache = self.roster_cache yield roster_cache.clear() self._jids.clear() self._groups.clear() yield self.request_roster() @defer.inlineCallbacks def request_roster(self): """Ask the server for Roster list""" if self.versioning: log.info(_("our server support roster versioning, we use it")) roster_cache = self.roster_cache yield roster_cache.load() try: version = roster_cache[ROSTER_VER_KEY] except KeyError: log.info(_("no roster in cache, we start fresh")) # u"" means we use versioning without valid roster in cache version = "" else: log.info(_("We have roster v{version} in cache").format(version=version)) # we deserialise cached roster to our local cache for roster_jid_s, roster_item_elt_s in roster_cache.items(): if roster_jid_s == ROSTER_VER_KEY: continue roster_jid = jid.JID(roster_jid_s) roster_item_elt = generic.parseXml(roster_item_elt_s.encode("utf-8")) roster_item = xmppim.RosterItem.fromElement(roster_item_elt) self._jids[roster_jid] = roster_item self._register_item(roster_item) else: log.warning(_("our server doesn't support roster versioning")) version = None log.debug("requesting roster") roster = yield self.getRoster(version=version) if roster is None: log.debug( "empty roster result received, we'll get roster item with roster " "pushes" ) else: # a full roster is received self._groups.clear() self._jids = roster for item in roster.values(): if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: # XXX: current behaviour: we don't want contact in our roster list # if there is no presence subscription # may change in the future log.info( "Removing contact {} from roster because there is no presence " "subscription".format(item.jid) ) self.removeItem(item.entity) # FIXME: to be checked else: self._register_item(item) yield self._cache_roster(roster.version) if not self.got_roster.called: # got_roster may already be called if we use resync() self.got_roster.callback(None) def removeItem(self, to_jid): """Remove a contact from roster list @param to_jid: a JID instance @return: Deferred """ return xmppim.RosterClientProtocol.removeItem(self, to_jid) def get_attributes(self, item): """Return dictionary of attributes as used in bridge from a RosterItem @param item: RosterItem @return: dictionary of attributes """ item_attr = { "to": str(item.subscriptionTo), "from": str(item.subscriptionFrom), "ask": str(item.ask), } if item.name: item_attr["name"] = item.name return item_attr def setReceived(self, request): item = request.item entity = item.entity log.info(_("adding {entity} to roster").format(entity=entity.full())) if request.version is not None: # we update the cache in storage roster_cache = self.roster_cache roster_cache[entity.full()] = item.toElement().toXml() roster_cache[ROSTER_VER_KEY] = request.version try: # update the cache for the groups the contact has been removed from left_groups = set(self._jids[entity].groups).difference(item.groups) for group in left_groups: jids_set = self._groups[group] jids_set.remove(entity) if not jids_set: del self._groups[group] except KeyError: pass # no previous item registration (or it's been cleared) self._jids[entity] = item self._register_item(item) self.host.bridge.contact_new( entity.full(), self.get_attributes(item), list(item.groups), self.parent.profile, ) def removeReceived(self, request): entity = request.item.entity log.info(_("removing {entity} from roster").format(entity=entity.full())) if request.version is not None: # we update the cache in storage roster_cache = self.roster_cache try: del roster_cache[request.item.entity.full()] except KeyError: # because we don't use load(), cache won't have the key, but it # will be deleted from storage anyway pass roster_cache[ROSTER_VER_KEY] = request.version # we first remove item from local cache (self._groups and self._jids) try: item = self._jids.pop(entity) except KeyError: log.error( "Received a roster remove event for an item not in cache ({})".format( entity ) ) return for group in item.groups: try: jids_set = self._groups[group] jids_set.remove(entity) if not jids_set: del self._groups[group] except KeyError: log.warning( f"there is no cache for the group [{group}] of the removed roster " f"item [{entity}]" ) # then we send the bridge signal self.host.bridge.contact_deleted(entity.full(), self.parent.profile) def get_groups(self): """Return a list of groups""" return list(self._groups.keys()) def get_item(self, entity_jid): """Return RosterItem for a given jid @param entity_jid(jid.JID): jid of the contact @return(RosterItem, None): RosterItem instance None if contact is not in cache """ return self._jids.get(entity_jid, None) def get_jids(self): """Return all jids of the roster""" return list(self._jids.keys()) def is_jid_in_roster(self, entity_jid): """Return True if jid is in roster""" if not isinstance(entity_jid, jid.JID): raise exceptions.InternalError( f"a JID is expected, not {type(entity_jid)}: {entity_jid!r}" ) return entity_jid in self._jids def is_subscribed_from(self, entity_jid: jid.JID) -> bool: """Return True if entity is authorised to see our presence""" try: item = self._jids[entity_jid.userhostJID()] except KeyError: return False return item.subscriptionFrom def is_subscribed_to(self, entity_jid: jid.JID) -> bool: """Return True if we are subscribed to entity""" try: item = self._jids[entity_jid.userhostJID()] except KeyError: return False return item.subscriptionTo def get_items(self): """Return all items of the roster""" return list(self._jids.values()) def get_jids_from_group(self, group): try: return self._groups[group] except KeyError: raise exceptions.UnknownGroupError(group) def get_jids_set(self, type_, groups=None): """Helper method to get a set of jids @param type_(unicode): one of: C.ALL: get all jids from roster C.GROUP: get jids from groups (listed in "groups") @groups(list[unicode]): list of groups used if type_==C.GROUP @return (set(jid.JID)): set of selected jids """ if type_ == C.ALL and groups is not None: raise ValueError("groups must not be set for {} type".format(C.ALL)) if type_ == C.ALL: return set(self.get_jids()) elif type_ == C.GROUP: jids = set() for group in groups: jids.update(self.get_jids_from_group(group)) return jids else: raise ValueError("Unexpected type_ {}".format(type_)) def get_nick(self, entity_jid): """Return a nick name for an entity return nick choosed by user if available else return user part of entity_jid """ item = self.get_item(entity_jid) if item is None: return entity_jid.user else: return item.name or entity_jid.user class SatPresenceProtocol(xmppim.PresenceClientProtocol): def __init__(self, host): xmppim.PresenceClientProtocol.__init__(self) self.host = host @property def client(self): return self.parent def send(self, obj): presence_d = defer.succeed(None) if not self.host.trigger.point("Presence send", self.parent, obj, presence_d): return presence_d.addCallback(lambda __: super(SatPresenceProtocol, self).send(obj)) return presence_d def availableReceived(self, entity, show=None, statuses=None, priority=0): if not statuses: statuses = {} if None in statuses: # we only want string keys statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None) if not self.host.trigger.point( "presence_received", self.parent, entity, show, priority, statuses ): return self.host.memory.set_presence_status( entity, show or "", int(priority), statuses, self.parent.profile ) # now it's time to notify frontends self.host.bridge.presence_update( entity.full(), show or "", int(priority), statuses, self.parent.profile ) def unavailableReceived(self, entity, statuses=None): log.debug( _("presence update for [%(entity)s] (unavailable, statuses=%(statuses)s)") % {"entity": entity, C.PRESENCE_STATUSES: statuses} ) if not statuses: statuses = {} if None in statuses: # we only want string keys statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None) if not self.host.trigger.point( "presence_received", self.parent, entity, C.PRESENCE_UNAVAILABLE, 0, statuses, ): return # now it's time to notify frontends # if the entity is not known yet in this session or is already unavailable, # there is no need to send an unavailable signal try: presence = self.host.memory.get_entity_datum(self.client, entity, "presence") except (KeyError, exceptions.UnknownEntityError): # the entity has not been seen yet in this session pass else: if presence.show != C.PRESENCE_UNAVAILABLE: self.host.bridge.presence_update( entity.full(), C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile, ) self.host.memory.set_presence_status( entity, C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile ) def available(self, entity=None, show=None, statuses=None, priority=None): """Set a presence and statuses. @param entity (jid.JID): entity @param show (unicode): value in ('unavailable', '', 'away', 'xa', 'chat', 'dnd') @param statuses (dict{unicode: unicode}): multilingual statuses with the entry key beeing a language code on 2 characters or "default". """ if priority is None: try: priority = int( self.host.memory.param_get_a( "Priority", "Connection", profile_key=self.parent.profile ) ) except ValueError: priority = 0 if statuses is None: statuses = {} # default for us is None for wokkel # so we must temporarily switch to wokkel's convention... if C.PRESENCE_STATUSES_DEFAULT in statuses: statuses[None] = statuses.pop(C.PRESENCE_STATUSES_DEFAULT) presence_elt = xmppim.AvailablePresence(entity, show, statuses, priority) # ... before switching back if None in statuses: statuses["default"] = statuses.pop(None) if not self.host.trigger.point("presence_available", presence_elt, self.parent): return return self.send(presence_elt) @defer.inlineCallbacks def subscribed(self, entity): yield self.parent.roster.got_roster xmppim.PresenceClientProtocol.subscribed(self, entity) self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile) item = self.parent.roster.get_item(entity) if ( not item or not item.subscriptionTo ): # we automatically subscribe to 'to' presence log.debug(_('sending automatic "from" subscription request')) self.subscribe(entity) def unsubscribed(self, entity): xmppim.PresenceClientProtocol.unsubscribed(self, entity) self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile) def subscribedReceived(self, entity): log.debug(_("subscription approved for [%s]") % entity.userhost()) self.host.bridge.subscribe("subscribed", entity.userhost(), self.parent.profile) def unsubscribedReceived(self, entity): log.debug(_("unsubscription confirmed for [%s]") % entity.userhost()) self.host.bridge.subscribe("unsubscribed", entity.userhost(), self.parent.profile) @defer.inlineCallbacks def subscribeReceived(self, entity): log.debug(_("subscription request from [%s]") % entity.userhost()) yield self.parent.roster.got_roster item = self.parent.roster.get_item(entity) if item and item.subscriptionTo: # We automatically accept subscription if we are already subscribed to # contact presence log.debug(_("sending automatic subscription acceptance")) self.subscribed(entity) else: self.host.memory.add_waiting_sub( "subscribe", entity.userhost(), self.parent.profile ) self.host.bridge.subscribe( "subscribe", entity.userhost(), self.parent.profile ) @defer.inlineCallbacks def unsubscribeReceived(self, entity): log.debug(_("unsubscription asked for [%s]") % entity.userhost()) yield self.parent.roster.got_roster item = self.parent.roster.get_item(entity) if item and item.subscriptionFrom: # we automatically remove contact log.debug(_("automatic contact deletion")) self.host.contact_del(entity, self.parent.profile) self.host.bridge.subscribe("unsubscribe", entity.userhost(), self.parent.profile) @implementer(iwokkel.IDisco) class SatDiscoProtocol(disco.DiscoClientProtocol): def __init__(self, host): disco.DiscoClientProtocol.__init__(self) def getDiscoInfo(self, requestor, target, nodeIdentifier=""): # those features are implemented in Wokkel (or sat_tmp.wokkel) # and thus are always available return [ disco.DiscoFeature(NS_X_DATA), disco.DiscoFeature(NS_XML_ELEMENT), disco.DiscoFeature(NS_DISCO_INFO), ] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return [] class SatFallbackHandler(generic.FallbackHandler): def __init__(self, host): generic.FallbackHandler.__init__(self) def iqFallback(self, iq): if iq.handled is True: return log.debug("iqFallback: xml = [%s]" % (iq.toXml())) generic.FallbackHandler.iqFallback(self, iq) class SatVersionHandler(generic.VersionHandler): def getDiscoInfo(self, requestor, target, node): # XXX: We need to work around wokkel's behaviour (namespace not added if there # is a node) as it cause issues with XEP-0115 & PEP (XEP-0163): there is a # node when server ask for disco info, and not when we generate the key, so # the hash is used with different disco features, and when the server (seen # on ejabberd) generate its own hash for security check it reject our # features (resulting in e.g. no notification on PEP) return generic.VersionHandler.getDiscoInfo(self, requestor, target, None) @implementer(iwokkel.IDisco) class SatIdentityHandler(XMPPHandler): """Manage disco Identity of SàT.""" # TODO: dynamic identity update (see docstring). Note that a XMPP entity can have # several identities def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return self.parent.identities def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []