Mercurial > libervia-backend
view libervia/backend/core/main.py @ 4130:02f0adc745c6
core: notifications implementation, first draft:
add a new table for notifications, and methods/bridge methods to manipulate them.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 16 Oct 2023 17:29:31 +0200 |
parents | 10b6ad569157 |
children | 3b95704ab777 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys import os.path import uuid import hashlib import copy from pathlib import Path from typing import Optional, List, Tuple, Dict from wokkel.data_form import Option from libervia import backend from libervia.backend.core.i18n import _, D_, language_switch from libervia.backend.core import patches patches.apply() from twisted.application import service from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.internet import reactor from wokkel.xmppim import RosterItem from libervia.backend.core import xmpp from libervia.backend.core import exceptions from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.log import getLogger from libervia.backend.core.constants import Const as C from libervia.backend.memory import memory from libervia.backend.memory import cache from libervia.backend.memory import encryption from libervia.backend.tools import async_trigger as trigger from libervia.backend.tools import utils from libervia.backend.tools import image from libervia.backend.tools.common import dynamic_import from libervia.backend.tools.common import regex from libervia.backend.tools.common import data_format from libervia.backend.stdui import 
ui_contact_list, ui_profile_manager import libervia.backend.plugins log = getLogger(__name__) class LiberviaBackend(service.Service): def _init(self): # we don't use __init__ to avoid doule initialisation with twistd # this _init is called in startService log.info(f"{C.APP_NAME} {self.full_version}") self._cb_map = {} # map from callback_id to callbacks # dynamic menus. key: callback_id, value: menu data (dictionnary) self._menus = {} self._menus_paths = {} # path to id. key: (menu_type, lower case tuple of path), # value: menu id self.initialised = defer.Deferred() self.profiles = {} self.plugins = {} # map for short name to whole namespace, # extended by plugins with register_namespace self.ns_map = { "x-data": xmpp.NS_X_DATA, "disco#info": xmpp.NS_DISCO_INFO, } self.memory = memory.Memory(self) # trigger are used to change Libervia behaviour self.trigger = ( trigger.TriggerManager() ) bridge_name = ( os.getenv("LIBERVIA_BRIDGE_NAME") or self.memory.config_get("", "bridge", "dbus") ) bridge_module = dynamic_import.bridge(bridge_name) if bridge_module is None: log.error(f"Can't find bridge module of name {bridge_name}") sys.exit(1) log.info(f"using {bridge_name} bridge") try: self.bridge = bridge_module.bridge() except exceptions.BridgeInitError: log.exception("bridge can't be initialised, can't start Libervia Backend") sys.exit(1) defer.ensureDeferred(self._post_init()) @property def version(self): """Return the short version of Libervia""" return C.APP_VERSION @property def full_version(self): """Return the full version of Libervia In developement mode, release name and extra data are returned too """ version = self.version if version.split(".")[-1] == "dev0": # we are in debug version, we add extra data try: return self._version_cache except AttributeError: self._version_cache = "{} « {} » ({})".format( version, C.APP_RELEASE_NAME, utils.get_repository_data(backend) ) return self._version_cache else: return version @property def bridge_name(self): return 
os.path.splitext(os.path.basename(self.bridge.__file__))[0] async def _post_init(self): try: bridge_pi = self.bridge.post_init except AttributeError: pass else: try: await bridge_pi() except Exception: log.exception("Could not initialize bridge") # because init is not complete at this stage, we use callLater reactor.callLater(0, self.stop) return self.bridge.register_method("ready_get", lambda: self.initialised) self.bridge.register_method("version_get", lambda: self.full_version) self.bridge.register_method("features_get", self.features_get) self.bridge.register_method("profile_name_get", self.memory.get_profile_name) self.bridge.register_method("profiles_list_get", self.memory.get_profiles_list) self.bridge.register_method("entity_data_get", self.memory._get_entity_data) self.bridge.register_method("entities_data_get", self.memory._get_entities_data) self.bridge.register_method("profile_create", self.memory.create_profile) self.bridge.register_method("profile_delete_async", self.memory.profile_delete_async) self.bridge.register_method("profile_start_session", self.memory.start_session) self.bridge.register_method( "profile_is_session_started", self.memory._is_session_started ) self.bridge.register_method("profile_set_default", self.memory.profile_set_default) self.bridge.register_method("connect", self._connect) self.bridge.register_method("disconnect", self.disconnect) self.bridge.register_method("contact_get", self._contact_get) self.bridge.register_method("contacts_get", self.contacts_get) self.bridge.register_method("contacts_get_from_group", self.contacts_get_from_group) self.bridge.register_method("main_resource_get", self.memory._get_main_resource) self.bridge.register_method( "presence_statuses_get", self.memory._get_presence_statuses ) self.bridge.register_method("sub_waiting_get", self.memory.sub_waiting_get) self.bridge.register_method("message_send", self._message_send) self.bridge.register_method("message_encryption_start", 
self._message_encryption_start) self.bridge.register_method("message_encryption_stop", self._message_encryption_stop) self.bridge.register_method("message_encryption_get", self._message_encryption_get) self.bridge.register_method("encryption_namespace_get", self._encryption_namespace_get) self.bridge.register_method("encryption_plugins_get", self._encryption_plugins_get) self.bridge.register_method("encryption_trust_ui_get", self._encryption_trust_ui_get) self.bridge.register_method("config_get", self._get_config) self.bridge.register_method("param_set", self.param_set) self.bridge.register_method("param_get_a", self.memory.get_string_param_a) self.bridge.register_method("private_data_get", self.memory._private_data_get) self.bridge.register_method("private_data_set", self.memory._private_data_set) self.bridge.register_method("private_data_delete", self.memory._private_data_delete) self.bridge.register_method("param_get_a_async", self.memory.async_get_string_param_a) self.bridge.register_method( "params_values_from_category_get_async", self.memory._get_params_values_from_category, ) self.bridge.register_method("param_ui_get", self.memory._get_params_ui) self.bridge.register_method( "params_categories_get", self.memory.params_categories_get ) self.bridge.register_method("params_register_app", self.memory.params_register_app) self.bridge.register_method("history_get", self.memory._history_get) self.bridge.register_method("presence_set", self._set_presence) self.bridge.register_method("subscription", self.subscription) self.bridge.register_method("contact_add", self._add_contact) self.bridge.register_method("contact_update", self._update_contact) self.bridge.register_method("contact_del", self._del_contact) self.bridge.register_method("roster_resync", self._roster_resync) self.bridge.register_method("is_connected", self.is_connected) self.bridge.register_method("action_launch", self._action_launch) self.bridge.register_method("actions_get", self.actions_get) 
self.bridge.register_method("progress_get", self._progress_get) self.bridge.register_method("progress_get_all", self._progress_get_all) self.bridge.register_method("menus_get", self.get_menus) self.bridge.register_method("menu_help_get", self.get_menu_help) self.bridge.register_method("menu_launch", self._launch_menu) self.bridge.register_method("disco_infos", self.memory.disco._disco_infos) self.bridge.register_method("disco_items", self.memory.disco._disco_items) self.bridge.register_method("disco_find_by_features", self._find_by_features) self.bridge.register_method("params_template_save", self.memory.save_xml) self.bridge.register_method("params_template_load", self.memory.load_xml) self.bridge.register_method("session_infos_get", self.get_session_infos) self.bridge.register_method("devices_infos_get", self._get_devices_infos) self.bridge.register_method("namespaces_get", self.get_namespaces) self.bridge.register_method("image_check", self._image_check) self.bridge.register_method("image_resize", self._image_resize) self.bridge.register_method("image_generate_preview", self._image_generate_preview) self.bridge.register_method("image_convert", self._image_convert) self.bridge.register_method("notification_add", self.memory._add_notification) self.bridge.register_method("notifications_get", self.memory._get_notifications) self.bridge.register_method("notification_delete", self.memory._delete_notification) self.bridge.register_method("notifications_expired_clean", self.memory._notifications_expired_clean) await self.memory.initialise() self.common_cache = cache.Cache(self, None) log.info(_("Memory initialised")) try: self._import_plugins() ui_contact_list.ContactList(self) ui_profile_manager.ProfileManager(self) except Exception as e: log.error(f"Could not initialize backend: {e}") sys.exit(1) self._add_base_menus() self.initialised.callback(None) log.info(_("Backend is ready")) # profile autoconnection must be done after self.initialised is called because # 
start_session waits for it. autoconnect_dict = await self.memory.storage.get_ind_param_values( category='Connection', name='autoconnect_backend', ) profiles_autoconnect = [p for p, v in autoconnect_dict.items() if C.bool(v)] if not self.trigger.point("profilesAutoconnect", profiles_autoconnect): return if profiles_autoconnect: log.info(D_( "Following profiles will be connected automatically: {profiles}" ).format(profiles= ', '.join(profiles_autoconnect))) connect_d_list = [] for profile in profiles_autoconnect: connect_d_list.append(defer.ensureDeferred(self.connect(profile))) if connect_d_list: results = await defer.DeferredList(connect_d_list) for idx, (success, result) in enumerate(results): if not success: profile = profiles_autoconnect[0] log.warning( _("Can't autoconnect profile {profile}: {reason}").format( profile = profile, reason = result) ) def _add_base_menus(self): """Add base menus""" encryption.EncryptionHandler._import_menus(self) def _unimport_plugin(self, plugin_path): """remove a plugin from sys.modules if it is there""" try: del sys.modules[plugin_path] except KeyError: pass def _import_plugins(self): """import all plugins found in plugins directory""" # FIXME: module imported but cancelled should be deleted # TODO: make this more generic and reusable in tools.common # FIXME: should use imp # TODO: do not import all plugins if no needed: component plugins are not needed # if we just use a client, and plugin blacklisting should be possible in # libervia.conf plugins_path = Path(libervia.backend.plugins.__file__).parent plugins_to_import = {} # plugins we still have to import for plug_path in plugins_path.glob("plugin_*"): if plug_path.is_dir(): init_path = plug_path / f"__init__.{C.PLUGIN_EXT}" if not init_path.exists(): log.warning( f"{plug_path} doesn't appear to be a package, can't load it") continue plug_name = plug_path.name elif plug_path.is_file(): if plug_path.suffix != f".{C.PLUGIN_EXT}": continue plug_name = plug_path.stem else: 
log.warning( f"{plug_path} is not a file or a dir, ignoring it") continue if not plug_name.isidentifier(): log.warning( f"{plug_name!r} is not a valid name for a plugin, ignoring it") continue plugin_path = f"libervia.backend.plugins.{plug_name}" try: __import__(plugin_path) except exceptions.MissingModule as e: self._unimport_plugin(plugin_path) log.warning( "Can't import plugin [{path}] because of an unavailale third party " "module:\n{msg}".format( path=plugin_path, msg=e ) ) continue except exceptions.CancelError as e: log.info( "Plugin [{path}] cancelled its own import: {msg}".format( path=plugin_path, msg=e ) ) self._unimport_plugin(plugin_path) continue except Exception: import traceback log.error( _("Can't import plugin [{path}]:\n{error}").format( path=plugin_path, error=traceback.format_exc() ) ) self._unimport_plugin(plugin_path) continue mod = sys.modules[plugin_path] plugin_info = mod.PLUGIN_INFO import_name = plugin_info["import_name"] plugin_modes = plugin_info["modes"] = set( plugin_info.setdefault("modes", C.PLUG_MODE_DEFAULT) ) if not plugin_modes.intersection(C.PLUG_MODE_BOTH): log.error( f"Can't import plugin at {plugin_path}, invalid {C.PI_MODES!r} " f"value: {plugin_modes!r}" ) continue # if the plugin is an entry point, it must work in component mode if plugin_info["type"] == C.PLUG_TYPE_ENTRY_POINT: # if plugin is an entrypoint, we cache it if C.PLUG_MODE_COMPONENT not in plugin_modes: log.error( _( "{type} type must be used with {mode} mode, ignoring plugin" ).format(type=C.PLUG_TYPE_ENTRY_POINT, mode=C.PLUG_MODE_COMPONENT) ) self._unimport_plugin(plugin_path) continue if import_name in plugins_to_import: log.error( _( "Name conflict for import name [{import_name}], can't import " "plugin [{name}]" ).format(**plugin_info) ) continue plugins_to_import[import_name] = (plugin_path, mod, plugin_info) while True: try: self._import_plugins_from_dict(plugins_to_import) except ImportError: pass if not plugins_to_import: break def 
_import_plugins_from_dict( self, plugins_to_import, import_name=None, optional=False ): """Recursively import and their dependencies in the right order @param plugins_to_import(dict): key=import_name and values=(plugin_path, module, plugin_info) @param import_name(unicode, None): name of the plugin to import as found in PLUGIN_INFO['import_name'] @param optional(bool): if False and plugin is not found, an ImportError exception is raised """ if import_name in self.plugins: log.debug("Plugin {} already imported, passing".format(import_name)) return if not import_name: import_name, (plugin_path, mod, plugin_info) = plugins_to_import.popitem() else: if not import_name in plugins_to_import: if optional: log.warning( _("Recommended plugin not found: {}").format(import_name) ) return msg = "Dependency not found: {}".format(import_name) log.error(msg) raise ImportError(msg) plugin_path, mod, plugin_info = plugins_to_import.pop(import_name) dependencies = plugin_info.setdefault("dependencies", []) recommendations = plugin_info.setdefault("recommendations", []) for to_import in dependencies + recommendations: if to_import not in self.plugins: log.debug( "Recursively import dependency of [%s]: [%s]" % (import_name, to_import) ) try: self._import_plugins_from_dict( plugins_to_import, to_import, to_import not in dependencies ) except ImportError as e: log.warning( _("Can't import plugin {name}: {error}").format( name=plugin_info["name"], error=e ) ) if optional: return raise e log.info("importing plugin: {}".format(plugin_info["name"])) # we instanciate the plugin here try: self.plugins[import_name] = getattr(mod, plugin_info["main"])(self) except Exception as e: log.exception( f"Can't load plugin \"{plugin_info['name']}\", ignoring it: {e}" ) if optional: return raise ImportError("Error during initiation") if C.bool(plugin_info.get(C.PI_HANDLER, C.BOOL_FALSE)): self.plugins[import_name].is_handler = True else: self.plugins[import_name].is_handler = False # we keep metadata as 
a Class attribute self.plugins[import_name]._info = plugin_info # TODO: test xmppclient presence and register handler parent def plugins_unload(self): """Call unload method on every loaded plugin, if exists @return (D): A deferred which return None when all method have been called """ # TODO: in the futur, it should be possible to hot unload a plugin # pluging depending on the unloaded one should be unloaded too # for now, just a basic call on plugin.unload is done defers_list = [] for plugin in self.plugins.values(): try: unload = plugin.unload except AttributeError: continue else: defers_list.append(utils.as_deferred(unload)) return defers_list def _connect(self, profile_key, password="", options=None): profile = self.memory.get_profile_name(profile_key) return defer.ensureDeferred(self.connect(profile, password, options)) async def connect( self, profile, password="", options=None, max_retries=C.XMPP_MAX_RETRIES): """Connect a profile (i.e. connect client.component to XMPP server) Retrieve the individual parameters, authenticate the profile and initiate the connection to the associated XMPP server. @param profile: %(doc_profile)s @param password (string): the Libervia profile password @param options (dict): connection options. 
Key can be: - @param max_retries (int): max number of connection retries @return (D(bool)): - True if the XMPP connection was already established - False if the XMPP connection has been initiated (it may still fail) @raise exceptions.PasswordError: Profile password is wrong """ if options is None: options = {} await self.memory.start_session(password, profile) if self.is_connected(profile): log.info(_("already connected !")) return True if self.memory.is_component(profile): await xmpp.SatXMPPComponent.start_connection(self, profile, max_retries) else: await xmpp.SatXMPPClient.start_connection(self, profile, max_retries) return False def disconnect(self, profile_key): """disconnect from jabber server""" # FIXME: client should not be deleted if only disconnected # it shoud be deleted only when session is finished if not self.is_connected(profile_key): # is_connected is checked here and not on client # because client is deleted when session is ended log.info(_("not connected !")) return defer.succeed(None) client = self.get_client(profile_key) return client.entity_disconnect() def features_get(self, profile_key=C.PROF_KEY_NONE): """Get available features Return list of activated plugins and plugin specific data @param profile_key: %(doc_profile_key)s C.PROF_KEY_NONE can be used to have general plugins data (i.e. not profile dependent) @return (dict)[Deferred]: features data where: - key is plugin import name, present only for activated plugins - value is a an other dict, when meaning is specific to each plugin. this dict is return by plugin's getFeature method. If this method doesn't exists, an empty dict is returned. 
""" try: # FIXME: there is no method yet to check profile session # as soon as one is implemented, it should be used here self.get_client(profile_key) except KeyError: log.warning("Requesting features for a profile outside a session") profile_key = C.PROF_KEY_NONE except exceptions.ProfileNotSetError: pass features = [] for import_name, plugin in self.plugins.items(): try: features_d = utils.as_deferred(plugin.features_get, profile_key) except AttributeError: features_d = defer.succeed({}) features.append(features_d) d_list = defer.DeferredList(features) def build_features(result, import_names): assert len(result) == len(import_names) ret = {} for name, (success, data) in zip(import_names, result): if success: ret[name] = data else: log.warning( "Error while getting features for {name}: {failure}".format( name=name, failure=data ) ) ret[name] = {} return ret d_list.addCallback(build_features, list(self.plugins.keys())) return d_list def _contact_get(self, entity_jid_s, profile_key): client = self.get_client(profile_key) entity_jid = jid.JID(entity_jid_s) return defer.ensureDeferred(self.get_contact(client, entity_jid)) async def get_contact(self, client, entity_jid): # we want to be sure that roster has been received await client.roster.got_roster item = client.roster.get_item(entity_jid) if item is None: raise exceptions.NotFound(f"{entity_jid} is not in roster!") return (client.roster.get_attributes(item), list(item.groups)) def contacts_get(self, profile_key): client = self.get_client(profile_key) def got_roster(__): ret = [] for item in client.roster.get_items(): # we get all items for client's roster # and convert them to expected format attr = client.roster.get_attributes(item) # we use full() and not userhost() because jid with resources are allowed # in roster, even if it's not common. 
ret.append([item.entity.full(), attr, list(item.groups)]) return ret return client.roster.got_roster.addCallback(got_roster) def contacts_get_from_group(self, group, profile_key): client = self.get_client(profile_key) return [jid_.full() for jid_ in client.roster.get_jids_from_group(group)] def purge_entity(self, profile): """Remove reference to a profile client/component and purge cache the garbage collector can then free the memory """ try: del self.profiles[profile] except KeyError: log.error(_("Trying to remove reference to a client not referenced")) else: self.memory.purge_profile_session(profile) def startService(self): self._init() log.info("Salut à toi ô mon frère !") def stopService(self): log.info("Salut aussi à Rantanplan") return self.plugins_unload() def run(self): log.debug(_("running app")) reactor.run() def stop(self): log.debug(_("stopping app")) reactor.stop() ## Misc methods ## def get_jid_n_stream(self, profile_key): """Convenient method to get jid and stream from profile key @return: tuple (jid, xmlstream) from profile, can be None""" # TODO: deprecate this method (get_client is enough) profile = self.memory.get_profile_name(profile_key) if not profile or not self.profiles[profile].is_connected(): return (None, None) return (self.profiles[profile].jid, self.profiles[profile].xmlstream) def get_client(self, profile_key: str) -> xmpp.SatXMPPClient: """Convenient method to get client from profile key @return: the client @raise exceptions.ProfileKeyUnknown: the profile or profile key doesn't exist @raise exceptions.NotFound: client is not available This happen if profile has not been used yet """ profile = self.memory.get_profile_name(profile_key) if not profile: raise exceptions.ProfileKeyUnknown try: return self.profiles[profile] except KeyError: raise exceptions.NotFound(profile_key) def get_clients(self, profile_key): """Convenient method to get list of clients from profile key Manage list through profile_key like C.PROF_KEY_ALL @param 
profile_key: %(doc_profile_key)s @return: list of clients """ if not profile_key: raise exceptions.DataError(_("profile_key must not be empty")) try: profile = self.memory.get_profile_name(profile_key, True) except exceptions.ProfileUnknownError: return [] if profile == C.PROF_KEY_ALL: return list(self.profiles.values()) elif profile[0] == "@": # only profile keys can start with "@" raise exceptions.ProfileKeyUnknown return [self.profiles[profile]] def _get_config(self, section, name): """Get the main configuration option @param section: section of the config file (None or '' for DEFAULT) @param name: name of the option @return: unicode representation of the option """ return str(self.memory.config_get(section, name, "")) def log_errback(self, failure_, msg=_("Unexpected error: {failure_}")): """Generic errback logging @param msg(unicode): error message ("failure_" key will be use for format) can be used as last errback to show unexpected error """ log.error(msg.format(failure_=failure_)) return failure_ # namespaces def register_namespace(self, short_name, namespace): """associate a namespace to a short name""" if short_name in self.ns_map: raise exceptions.ConflictError("this short name is already used") log.debug(f"registering namespace {short_name} => {namespace}") self.ns_map[short_name] = namespace def get_namespaces(self): return self.ns_map def get_namespace(self, short_name): try: return self.ns_map[short_name] except KeyError: raise exceptions.NotFound("namespace {short_name} is not registered" .format(short_name=short_name)) def get_session_infos(self, profile_key): """compile interesting data on current profile session""" client = self.get_client(profile_key) data = { "jid": client.jid.full(), "started": str(int(client.started)) } return defer.succeed(data) def _get_devices_infos(self, bare_jid, profile_key): client = self.get_client(profile_key) if not bare_jid: bare_jid = None d = defer.ensureDeferred(self.get_devices_infos(client, bare_jid)) 
d.addCallback(lambda data: data_format.serialise(data)) return d async def get_devices_infos(self, client, bare_jid=None): """compile data on an entity devices @param bare_jid(jid.JID, None): bare jid of entity to check None to use client own jid @return (list[dict]): list of data, one item per resource. Following keys can be set: - resource(str): resource name """ own_jid = client.jid.userhostJID() if bare_jid is None: bare_jid = own_jid else: bare_jid = jid.JID(bare_jid) resources = self.memory.get_all_resources(client, bare_jid) if bare_jid == own_jid: # our own jid is not stored in memory's cache resources.add(client.jid.resource) ret_data = [] for resource in resources: res_jid = copy.copy(bare_jid) res_jid.resource = resource cache_data = self.memory.entity_data_get(client, res_jid) res_data = { "resource": resource, } try: presence = cache_data['presence'] except KeyError: pass else: res_data['presence'] = { "show": presence.show, "priority": presence.priority, "statuses": presence.statuses, } disco = await self.get_disco_infos(client, res_jid) for (category, type_), name in disco.identities.items(): identities = res_data.setdefault('identities', []) identities.append({ "name": name, "category": category, "type": type_, }) ret_data.append(res_data) return ret_data # images def _image_check(self, path): report = image.check(self, path) return data_format.serialise(report) def _image_resize(self, path, width, height): d = image.resize(path, (width, height)) d.addCallback(lambda new_image_path: str(new_image_path)) return d def _image_generate_preview(self, path, profile_key): client = self.get_client(profile_key) d = defer.ensureDeferred(self.image_generate_preview(client, Path(path))) d.addCallback(lambda preview_path: str(preview_path)) return d async def image_generate_preview(self, client, path): """Helper method to generate in cache a preview of an image @param path(Path): path to the image @return (Path): path to the generated preview """ report = 
image.check(self, path, max_size=(300, 300)) if not report['too_large']: # in the unlikely case that image is already smaller than a preview preview_path = path else: # we use hash as id, to re-use potentially existing preview path_hash = hashlib.sha256(str(path).encode()).hexdigest() uid = f"{path.stem}_{path_hash}_preview" filename = f"{uid}{path.suffix.lower()}" metadata = client.cache.get_metadata(uid=uid) if metadata is not None: preview_path = metadata['path'] else: with client.cache.cache_data( source='HOST_PREVIEW', uid=uid, filename=filename) as cache_f: preview_path = await image.resize( path, new_size=report['recommended_size'], dest=cache_f ) return preview_path def _image_convert(self, source, dest, extra, profile_key): client = self.get_client(profile_key) if profile_key else None source = Path(source) dest = None if not dest else Path(dest) extra = data_format.deserialise(extra) d = defer.ensureDeferred(self.image_convert(client, source, dest, extra)) d.addCallback(lambda dest_path: str(dest_path)) return d async def image_convert(self, client, source, dest=None, extra=None): """Helper method to convert an image from one format to an other @param client(SatClient, None): client to use for caching this parameter is only used if dest is None if client is None, common cache will be used insted of profile cache @param source(Path): path to the image to convert @param dest(None, Path, file): where to save the converted file - None: use a cache file (uid generated from hash of source) file will be converted to PNG - Path: path to the file to create/overwrite - file: a file object which must be opened for writing in binary mode @param extra(dict, None): conversion options see [image.convert] for details @return (Path): path to the converted image @raise ValueError: an issue happened with source of dest """ if not source.is_file: raise ValueError(f"Source file {source} doesn't exist!") if dest is None: # we use hash as id, to re-use potentially existing 
conversion path_hash = hashlib.sha256(str(source).encode()).hexdigest() uid = f"{source.stem}_{path_hash}_convert_png" filename = f"{uid}.png" if client is None: cache = self.common_cache else: cache = client.cache metadata = cache.get_metadata(uid=uid) if metadata is not None: # there is already a conversion for this image in cache return metadata['path'] else: with cache.cache_data( source='HOST_IMAGE_CONVERT', uid=uid, filename=filename) as cache_f: converted_path = await image.convert( source, dest=cache_f, extra=extra ) return converted_path else: return await image.convert(source, dest, extra) # local dirs def get_local_path( self, client: Optional[SatXMPPEntity], dir_name: str, *extra_path: str, component: bool = False, ) -> Path: """Retrieve path for local data if path doesn't exist, it will be created @param client: client instance if not none, client.profile will be used as last path element @param dir_name: name of the main path directory @param *extra_path: extra path element(s) to use @param component: if True, path will be prefixed with C.COMPONENTS_DIR @return: path """ local_dir = self.memory.config_get("", "local_dir") if not local_dir: raise exceptions.InternalError("local_dir must be set") path_elts = [] if component: path_elts.append(C.COMPONENTS_DIR) path_elts.append(regex.path_escape(dir_name)) if extra_path: path_elts.extend([regex.path_escape(p) for p in extra_path]) if client is not None: path_elts.append(regex.path_escape(client.profile)) local_path = Path(*path_elts) local_path.mkdir(0o700, parents=True, exist_ok=True) return local_path ## Client management ## def param_set(self, name, value, category, security_limit, profile_key): """set wanted paramater and notice observers""" self.memory.param_set(name, value, category, security_limit, profile_key) def is_connected(self, profile_key): """Return connection status of profile @param profile_key: key_word or profile name to determine profile name @return: True if connected """ profile = 
self.memory.get_profile_name(profile_key) if not profile: log.error(_("asking connection status for a non-existant profile")) raise exceptions.ProfileUnknownError(profile_key) if profile not in self.profiles: return False return self.profiles[profile].is_connected() ## Encryption ## def register_encryption_plugin(self, *args, **kwargs): return encryption.EncryptionHandler.register_plugin(*args, **kwargs) def _message_encryption_start(self, to_jid_s, namespace, replace=False, profile_key=C.PROF_KEY_NONE): client = self.get_client(profile_key) to_jid = jid.JID(to_jid_s) return defer.ensureDeferred( client.encryption.start(to_jid, namespace or None, replace)) def _message_encryption_stop(self, to_jid_s, profile_key=C.PROF_KEY_NONE): client = self.get_client(profile_key) to_jid = jid.JID(to_jid_s) return defer.ensureDeferred( client.encryption.stop(to_jid)) def _message_encryption_get(self, to_jid_s, profile_key=C.PROF_KEY_NONE): client = self.get_client(profile_key) to_jid = jid.JID(to_jid_s) session_data = client.encryption.getSession(to_jid) return client.encryption.get_bridge_data(session_data) def _encryption_namespace_get(self, name): return encryption.EncryptionHandler.get_ns_from_name(name) def _encryption_plugins_get(self): plugins = encryption.EncryptionHandler.getPlugins() ret = [] for p in plugins: ret.append({ "name": p.name, "namespace": p.namespace, "priority": p.priority, "directed": p.directed, }) return data_format.serialise(ret) def _encryption_trust_ui_get(self, to_jid_s, namespace, profile_key): client = self.get_client(profile_key) to_jid = jid.JID(to_jid_s) d = defer.ensureDeferred( client.encryption.get_trust_ui(to_jid, namespace=namespace or None)) d.addCallback(lambda xmlui: xmlui.toXml()) return d ## XMPP methods ## def _message_send( self, to_jid_s, message, subject=None, mess_type="auto", extra_s="", profile_key=C.PROF_KEY_NONE): client = self.get_client(profile_key) to_jid = jid.JID(to_jid_s) return client.sendMessage( to_jid, message, 
def presence_set(self, to_jid=None, show="", statuses=None,
                 profile_key=C.PROF_KEY_NONE):
    """Send our presence information

    @param to_jid: recipient given to presence.available (None by default)
    @param show: "show" value of the presence
    @param statuses(dict, None): statuses to send, None is handled as an empty dict
        the dict given by the caller is never modified
    @param profile_key: key used to retrieve the profile name
    """
    if statuses is None:
        statuses = {}
    profile = self.memory.get_profile_name(profile_key)
    assert profile
    client = self.profiles[profile]
    priority = int(
        self.memory.param_get_a("Priority", "Connection", profile_key=profile)
    )
    client.presence.available(to_jid, show, statuses, priority)
    # XXX: FIXME: temporary fix to work around openfire 3.7.0 bug (presence is not
    #   broadcasted to generating resource)
    if "" in statuses:
        # the key is renamed on a copy, so the caller's dict is left untouched
        statuses = dict(statuses)
        statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop("")
    self.bridge.presence_update(
        client.jid.full(), show, priority, statuses, profile
    )
def contact_del(self, to_jid, profile_key):
    """Unsubscribe from a contact then remove it from the roster

    @param to_jid: jid of the contact to remove
    @param profile_key: key used to retrieve the profile name
    @return: whatever roster.removeItem returns for this contact
    """
    profile_name = self.memory.get_profile_name(profile_key)
    assert profile_name
    client = self.profiles[profile_name]
    # unsubscribe is not asynchronous: nothing to wait for before the removal
    client.presence.unsubscribe(to_jid)
    return client.roster.removeItem(to_jid)
def _find_by_features(self, namespaces, identities, bare_jids, service, roster,
                      own_jid, local_device, profile_key):
    """Bridge wrapper around find_by_features

    The client is retrieved from profile_key, and each identity is converted to
    a tuple (an empty or missing identities value is replaced by None).
    @return: Deferred wrapping the find_by_features coroutine
    """
    client = self.get_client(profile_key)
    if identities:
        identities_filter = [tuple(identity) for identity in identities]
    else:
        identities_filter = None
    search_coro = self.find_by_features(
        client, namespaces, identities_filter, bare_jids, service, roster, own_jid,
        local_device
    )
    return defer.ensureDeferred(search_coro)
client.jid) @return: found entities in a tuple with: - service entities - own entities - roster entities Each element is a dict mapping from jid to a tuple with category, type and name of the entity """ assert isinstance(namespaces, list) if not identities: identities = None if not namespaces and not identities: raise exceptions.DataError( "at least one namespace or one identity must be set" ) found_service = {} found_own = {} found_roster = {} if service: services_jids = await self.find_features_set(client, namespaces) services_jids = list(services_jids) # we need a list to map results below services_infos = await defer.DeferredList( [self.get_disco_infos(client, service_jid) for service_jid in services_jids] ) for idx, (success, infos) in enumerate(services_infos): service_jid = services_jids[idx] if not success: log.warning( _("Can't find features for service {service_jid}, ignoring") .format(service_jid=service_jid.full())) continue if (identities is not None and not set(infos.identities.keys()).issuperset(identities)): continue found_identities = [ (cat, type_, name or "") for (cat, type_), name in infos.identities.items() ] found_service[service_jid.full()] = found_identities to_find = [] if own_jid: to_find.append((found_own, [client.jid.userhostJID()])) if roster: to_find.append((found_roster, client.roster.get_jids())) for found, jids in to_find: full_jids = [] disco_defers = [] for jid_ in jids: if jid_.resource: if bare_jids: continue resources = [jid_.resource] else: if bare_jids: resources = [None] else: try: resources = self.memory.get_available_resources(client, jid_) except exceptions.UnknownEntityError: continue if not resources and jid_ == client.jid.userhostJID() and own_jid: # small hack to avoid missing our own resource when this # method is called at the very beginning of the session # and our presence has not been received yet resources = [client.jid.resource] for resource in resources: full_jid = jid.JID(tuple=(jid_.user, jid_.host, 
def _kill_action(self, keep_id, client):
    """Forget a kept action once its retention delay has expired

    This is the callback scheduled with reactor.callLater by action_new.
    @param keep_id: id under which the action is stored in client.actions
    @param client: client owning the action
    """
    log.debug("Killing action {} for timeout".format(keep_id))
    # a plain lookup would leave the entry in place; the action must be removed.
    # A missing id is ignored, there is nothing left to clean in that case.
    client.actions.pop(keep_id, None)
def progress_get(self, progress_id, profile):
    """Return a dict with progress information

    @param progress_id(unicode): unique id of the progressing element
    @param profile: %(doc_profile)s
    @return (dict): value returned by the callback registered for this id,
        expected to hold the following keys:
            'position' (int): current position
            'size' (int): end position
        if the id doesn't exist (may be a finished progression), an empty dict
        is returned
    """
    progress_callbacks = self.get_client(profile)._progress_cb
    try:
        registered = progress_callbacks[progress_id]
        # first item is the callback, second one is the metadata
        return registered[0](progress_id, profile)
    except KeyError:
        # unknown id (a KeyError raised by the callback ends here too)
        return {}
def register_callback(self, callback, *args, **kwargs):
    """Register a callback.

    @param callback(callable): method to call
    @param args: positional arguments stored with the callback
    @param kwargs: stored with the callback, can contain:
        with_data(bool): True if the callback use the optional data dict
        force_id(unicode): id to avoid generated id. Can lead to name conflict,
            avoid if possible
        one_shot(bool): True to delete callback once it has been called
    @return: id of the registered callback
    @raise exceptions.ConflictError: force_id is already registered
    """
    callback_id = kwargs.pop("force_id", None)
    if callback_id is None:
        callback_id = str(uuid.uuid4())
    elif callback_id in self._cb_map:
        raise exceptions.ConflictError(_("id already registered"))
    self._cb_map[callback_id] = (callback, args, kwargs)

    # the value is checked, not only the presence of the key: an explicit
    # one_shot=False must not schedule the purge
    if kwargs.get("one_shot", False):
        # One Shot callback are removed after 30 min
        def purge_callback():
            try:
                self.remove_callback(callback_id)
            except KeyError:
                # already removed (e.g. the callback has been launched)
                pass

        reactor.callLater(1800, purge_callback)

    return callback_id
If not, # client must be used instead of profile_key # FIXME: security limit need to be checked here try: client = self.get_client(profile_key) except exceptions.NotFound: # client is not available yet profile = self.memory.get_profile_name(profile_key) if not profile: raise exceptions.ProfileUnknownError( _("trying to launch action with a non-existant profile") ) else: profile = client.profile # we check if the action is kept, and remove it try: action_tuple = client.actions[callback_id] except KeyError: pass else: action_tuple[-1].cancel() # the last item is the action timer del client.actions[callback_id] try: callback, args, kwargs = self._cb_map[callback_id] except KeyError: raise exceptions.DataError("Unknown callback id {}".format(callback_id)) if kwargs.get("with_data", False): if data is None: raise exceptions.DataError("Required data for this callback is missing") args, kwargs = ( list(args)[:], kwargs.copy(), ) # we don't want to modify the original (kw)args args.insert(0, data) kwargs["profile"] = profile del kwargs["with_data"] if kwargs.pop("one_shot", False): self.remove_callback(callback_id) return utils.as_deferred(callback, *args, **kwargs) # Menus management def _get_menu_canonical_path(self, path): """give canonical form of path canonical form is a tuple of the path were every element is stripped and lowercase @param path(iterable[unicode]): untranslated path to menu @return (tuple[unicode]): canonical form of path """ return tuple((p.lower().strip() for p in path)) def import_menu(self, path, callback, security_limit=C.NO_SECURITY_LIMIT, help_string="", type_=C.MENU_GLOBAL): r"""register a new menu for frontends @param path(iterable[unicode]): path to go to the menu (category/subcategory/.../item) (e.g.: ("File", "Open")) /!\ use D_() instead of _() for translations (e.g. (D_("File"), D_("Open"))) untranslated/lower case path can be used to identity a menu, for this reason it must be unique independently of case. 
@param callback(callable): method to be called when menuitem is selected, callable or a callback id (string) as returned by [register_callback] @param security_limit(int): %(doc_security_limit)s /!\ security_limit MUST be added to data in launch_callback if used #TODO @param help_string(unicode): string used to indicate what the menu do (can be show as a tooltip). /!\ use D_() instead of _() for translations @param type(unicode): one of: - C.MENU_GLOBAL: classical menu, can be shown in a menubar on top (e.g. something like File/Open) - C.MENU_ROOM: like a global menu, but only shown in multi-user chat menu_data must contain a "room_jid" data - C.MENU_SINGLE: like a global menu, but only shown in one2one chat menu_data must contain a "jid" data - C.MENU_JID_CONTEXT: contextual menu, used with any jid (e.g.: ad hoc commands, jid is already filled) menu_data must contain a "jid" data - C.MENU_ROSTER_JID_CONTEXT: like JID_CONTEXT, but restricted to jids in roster. menu_data must contain a "room_jid" data - C.MENU_ROSTER_GROUP_CONTEXT: contextual menu, used with group (e.g.: publish microblog, group is already filled) menu_data must contain a "group" data @return (unicode): menu_id (same as callback_id) """ if callable(callback): callback_id = self.register_callback(callback, with_data=True) elif isinstance(callback, str): # The callback is already registered callback_id = callback try: callback, args, kwargs = self._cb_map[callback_id] except KeyError: raise exceptions.DataError("Unknown callback id") kwargs["with_data"] = True # we have to be sure that we use extra data else: raise exceptions.DataError("Unknown callback type") for menu_data in self._menus.values(): if menu_data["path"] == path and menu_data["type"] == type_: raise exceptions.ConflictError( _("A menu with the same path and type already exists") ) path_canonical = self._get_menu_canonical_path(path) menu_key = (type_, path_canonical) if menu_key in self._menus_paths: raise exceptions.ConflictError( 
def get_menus(self, language="", security_limit=C.NO_SECURITY_LIMIT):
    """Return all menus registered

    @param language: language used for translation, or empty string for default
    @param security_limit: %(doc_security_limit)s
        when a limit is set, menus without limit or with a higher limit are
        skipped
    @return: array of tuple with:
        - menu id (same as callback_id)
        - menu type
        - raw menu path (array of strings)
        - translated menu path
        - extra (dict(unicode, unicode)): extra data where key can be:
            - icon: name of the icon to use (TODO)
            - help_url: link to a page with more complete documentation (TODO)
    """
    restricted = security_limit != C.NO_SECURITY_LIMIT
    menus = []
    for menu_id, menu_data in self._menus.items():
        menu_type = menu_data["type"]
        raw_path = menu_data["path"]
        menu_limit = menu_data["security_limit"]
        if restricted:
            if menu_limit == C.NO_SECURITY_LIMIT or menu_limit > security_limit:
                continue
        # translation is done with the requested language, then
        # language_switch is called again without argument
        language_switch(language)
        translated_path = list(map(_, raw_path))
        language_switch()
        extra = {}  # TODO: manage extra data like icon
        menus.append((menu_id, menu_type, raw_path, translated_path, extra))
    return menus
def get_menu_help(self, menu_id, language=""):
    """Return the translated help string of a menu

    @param menu_id: id of the menu (same as callback_id)
    @param language: language used for translation, or empty string for default
    @return: translated help
    @raise exceptions.DataError: menu_id is not a registered menu
    """
    try:
        menu = self._menus[menu_id]
    except KeyError:
        raise exceptions.DataError("Trying to access an unknown menu")
    else:
        language_switch(language)
        translated_help = _(menu["help_string"])
        language_switch()
        return translated_help