Mercurial > libervia-backend
diff libervia/backend/core/xmpp.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 2417ad1d0f23 |
children | 0f953ce5f0a8 |
line wrap: on
line diff
--- a/libervia/backend/core/xmpp.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/core/xmpp.py Wed Jun 19 18:44:57 2024 +0200 @@ -91,6 +91,7 @@ class SatXMPPEntity(core_types.SatXMPPEntity): """Common code for Client and Component""" + # profile is added there when start_connection begins and removed when it is finished profiles_connecting = set() @@ -102,9 +103,11 @@ clientConnectionFailed_ori = factory.clientConnectionFailed clientConnectionLost_ori = factory.clientConnectionLost factory.clientConnectionFailed = partial( - self.connection_terminated, term_type="failed", cb=clientConnectionFailed_ori) + self.connection_terminated, term_type="failed", cb=clientConnectionFailed_ori + ) factory.clientConnectionLost = partial( - self.connection_terminated, term_type="lost", cb=clientConnectionLost_ori) + self.connection_terminated, term_type="lost", cb=clientConnectionLost_ori + ) factory.maxRetries = max_retries factory.maxDelay = 30 @@ -115,11 +118,11 @@ self.host_app = host_app self.cache = cache.Cache(host_app, profile) self.mess_id2uid = {} # map from message id to uid used in history. - # Key: (full_jid, message_id) Value: uid + # Key: (full_jid, message_id) Value: uid # this Deferred fire when entity is connected self.conn_deferred = defer.Deferred() self._progress_cb = {} # callback called when a progress is requested - # (key = progress id) + # (key = progress id) self.actions = {} # used to keep track of actions for retrieval (key = action_id) self.encryption = encryption.EncryptionHandler(self) @@ -148,9 +151,7 @@ # profile_connecting/profile_connected methods handling - timer = connection_timer[plugin] = { - "total": 0 - } + timer = connection_timer[plugin] = {"total": 0} # profile connecting is called right now (before actually starting client) connecting_cb = getattr(plugin, "profile_connecting", None) if connecting_cb is not None: @@ -187,9 +188,7 @@ @staticmethod async def _run_profile_connected( - callback: Callable, - entity: "SatXMPPEntity", - timer: Dict[str, float] + callback: Callable, entity: "SatXMPPEntity", timer: Dict[str, float] ) -> None: connected_start = time.time() await utils.as_deferred(callback, entity) @@ -217,16 +216,15 @@ ) except ValueError: log.debug(_("Can't parse port value, using default value")) - port = ( - None - ) # will use default value 5222 or be retrieved from a DNS SRV record + port = None # will use default value 5222 or be retrieved from a DNS SRV record password = await host.memory.param_get_a_async( "Password", "Connection", profile_key=profile ) entity_jid_s = await host.memory.param_get_a_async( - "JabberID", "Connection", profile_key=profile) + "JabberID", "Connection", profile_key=profile + ) entity_jid = jid.JID(entity_jid_s) if not entity_jid.resource and not cls.is_component and entity_jid.user: @@ -235,32 +233,43 @@ # reconnection. we only do that for client and if there is a user part, to # let server decide for anonymous login resource_dict = await host.memory.storage.get_privates( - "core:xmpp", ["resource"] , profile=profile) + "core:xmpp", ["resource"], profile=profile + ) try: resource = resource_dict["resource"] except KeyError: resource = f"{C.APP_NAME_FILE}.{shortuuid.uuid()}" await host.memory.storage.set_private_value( - "core:xmpp", "resource", resource, profile=profile) + "core:xmpp", "resource", resource, profile=profile + ) - log.info(_("We'll use the stable resource {resource}").format( - resource=resource)) + log.info( + _("We'll use the stable resource {resource}").format( + resource=resource + ) + ) entity_jid.resource = resource if profile in host.profiles: if host.profiles[profile].is_connected(): raise exceptions.InternalError( f"There is already a connected profile of name {profile!r} in " - f"host") - log.debug( - "removing unconnected profile {profile!r}") + f"host" + ) + log.debug("removing unconnected profile {profile!r}") del host.profiles[profile] entity = host.profiles[profile] = cls( - host, profile, entity_jid, password, - host.memory.param_get_a(C.FORCE_SERVER_PARAM, "Connection", - profile_key=profile) or None, - port, max_retries, + host, + profile, + entity_jid, + password, + host.memory.param_get_a( + C.FORCE_SERVER_PARAM, "Connection", profile_key=profile ) + or None, + port, + max_retries, + ) await entity.encryption.load_sessions() @@ -315,7 +324,7 @@ plugins_by_timer = sorted( connection_timer, key=lambda p: connection_timer[p]["total"], - reverse=True + reverse=True, ) # total is the addition of all connecting and connected, doesn't really # reflect the real loading time as connected are launched in a @@ -441,8 +450,9 @@ # we save connector because it may be deleted when connection will be dropped # if reconnection is disabled self._saved_connector = connector - if reason is not None and not isinstance(reason.value, - internet_error.ConnectionDone): + if reason is not None and not isinstance( + reason.value, internet_error.ConnectionDone + ): try: reason_str = str(reason.value) except Exception: @@ -496,8 +506,7 @@ def _connected(self, xs): send_hooks = [] receive_hooks = [] - self.host_app.trigger.point( - "stream_hooks", self, receive_hooks, send_hooks) + self.host_app.trigger.point("stream_hooks", self, receive_hooks, send_hooks) for hook in receive_hooks: xs.add_hook(C.STREAM_HOOK_RECEIVE, hook) for hook in send_hooks: @@ -529,11 +538,14 @@ try: if err.value.args[0][0][2] == "certificate verify failed": err = exceptions.InvalidCertificate( - _("Your server certificate is not valid " - "(its identity can't be checked).\n\n" - "This should never happen and may indicate that " - "somebody is trying to spy on you.\n" - "Please contact your server administrator.")) + _( + "Your server certificate is not valid " + "(its identity can't be checked).\n\n" + "This should never happen and may indicate that " + "somebody is trying to spy on you.\n" + "Please contact your server administrator." + ) + ) self.factory.stopTrying() try: # with invalid certificate, we should not retry to connect @@ -615,7 +627,7 @@ def generate_message_xml( self, data: core_types.MessageData, - post_xml_treatments: Optional[defer.Deferred] = None + post_xml_treatments: Optional[defer.Deferred] = None, ) -> core_types.MessageData: """Generate <message/> stanza from message data @@ -710,14 +722,22 @@ # This is intented for e2e encryption which doesn't do full stanza # encryption (e.g. OTR) # This trigger point can't cancel the method - await self.host_app.trigger.async_point("send_message_data", self, mess_data, - triggers_no_cancel=True) + await self.host_app.trigger.async_point( + "send_message_data", self, mess_data, triggers_no_cancel=True + ) await self.a_send(mess_data["xml"]) return mess_data def sendMessage( - self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None, - no_trigger=False): + self, + to_jid, + message, + subject=None, + mess_type="auto", + extra=None, + uid=None, + no_trigger=False, + ): r"""Send a message to an entity @param to_jid(jid.JID): destinee of the message @@ -797,18 +817,27 @@ ): return defer.succeed(None) - log.debug(_("Sending message (type {type}, to {to})") - .format(type=data["type"], to=to_jid.full())) + log.debug( + _("Sending message (type {type}, to {to})").format( + type=data["type"], to=to_jid.full() + ) + ) - pre_xml_treatments.addCallback(lambda __: self.generate_message_xml(data, post_xml_treatments)) + pre_xml_treatments.addCallback( + lambda __: self.generate_message_xml(data, post_xml_treatments) + ) pre_xml_treatments.addCallback(lambda __: post_xml_treatments) pre_xml_treatments.addErrback(self._cancel_error_trap) post_xml_treatments.addCallback( lambda __: defer.ensureDeferred(self.send_message_data(data)) ) if send_only: - log.debug(_("Triggers, storage and echo have been inhibited by the " - "'send_only' parameter")) + log.debug( + _( + "Triggers, storage and echo have been inhibited by the " + "'send_only' parameter" + ) + ) else: self.add_post_xml_callbacks(post_xml_treatments) post_xml_treatments.addErrback(self._cancel_error_trap) @@ -823,7 +852,8 @@ def is_message_printable(self, mess_data): """Return True if a message contain payload to show in frontends""" return ( - mess_data["message"] or mess_data["subject"] + mess_data["message"] + or mess_data["subject"] or mess_data["extra"].get(C.KEY_ATTACHMENTS) or mess_data["type"] == C.MESS_TYPE_INFO ) @@ -849,10 +879,16 @@ def message_get_bridge_args(self, data): """Generate args to use with bridge from data dict""" - return (data["uid"], data["timestamp"], data["from"].full(), - data["to"].full(), data["message"], data["subject"], - data["type"], data_format.serialise(data["extra"])) - + return ( + data["uid"], + data["timestamp"], + data["from"].full(), + data["to"].full(), + data["message"], + data["subject"], + data["type"], + data_format.serialise(data["extra"]), + ) def message_send_to_bridge(self, data): """Send message to bridge, so frontends can display it @@ -869,8 +905,7 @@ # We send back the message, so all frontends are aware of it self.host_app.bridge.message_new( - *self.message_get_bridge_args(data), - profile=self.profile + *self.message_get_bridge_args(data), profile=self.profile ) else: log.warning(_("No message found")) @@ -900,8 +935,16 @@ trigger_suffix = "" is_component = False - def __init__(self, host_app, profile, user_jid, password, host=None, - port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES): + def __init__( + self, + host_app, + profile, + user_jid, + password, + host=None, + port=C.XMPP_C2S_PORT, + max_retries=C.XMPP_MAX_RETRIES, + ): # XXX: DNS SRV records are checked when the host is not specified. # If no SRV record is found, the host is directly extracted from the JID. self.started = time.time() @@ -933,12 +976,14 @@ host_data = None if host_data is not None: log.info( - "using {host}:{port} for host {host_ori} as requested in config" - .format(host_ori=user_jid.host, host=host, port=port) + "using {host}:{port} for host {host_ori} as requested in config".format( + host_ori=user_jid.host, host=host, port=port + ) ) self.check_certificate = host_app.memory.param_get_a( - "check_certificate", "Connection", profile_key=profile) + "check_certificate", "Connection", profile_key=profile + ) if self.check_certificate: tls_required, configurationForTLS = True, None @@ -947,18 +992,26 @@ configurationForTLS = ssl.CertificateOptions(trustRoot=None) wokkel_client.XMPPClient.__init__( - self, user_jid, password, host or None, port or C.XMPP_C2S_PORT, - tls_required=tls_required, configurationForTLS=configurationForTLS + self, + user_jid, + password, + host or None, + port or C.XMPP_C2S_PORT, + tls_required=tls_required, + configurationForTLS=configurationForTLS, ) SatXMPPEntity.__init__(self, host_app, profile, max_retries) if not self.check_certificate: - msg = (_("Certificate validation is deactivated, this is unsecure and " + msg = _( + "Certificate validation is deactivated, this is unsecure and " "somebody may be spying on you. If you have no good reason to disable " - "certificate validation, please activate \"Check certificate\" in your " - "settings in \"Connection\" tab.")) - xml_tools.quick_note(host_app, self, msg, _("Security notice"), - level = C.XMLUI_DATA_LVL_WARNING) + 'certificate validation, please activate "Check certificate" in your ' + 'settings in "Connection" tab.' + ) + xml_tools.quick_note( + host_app, self, msg, _("Security notice"), level=C.XMLUI_DATA_LVL_WARNING + ) @property def server_jid(self): @@ -1002,10 +1055,7 @@ post_xml_treatments.addCallback(self.message_send_to_bridge) def feedback( - self, - to_jid: jid.JID, - message: str, - extra: Optional[ExtraDict] = None + self, to_jid: jid.JID, message: str, extra: Optional[ExtraDict] = None ) -> None: """Send message to frontends @@ -1045,16 +1095,24 @@ """ trigger_suffix = ( - "Component" - ) # used for to distinguish some trigger points set in SatXMPPEntity + "Component" # used for to distinguish some trigger points set in SatXMPPEntity + ) is_component = True # XXX: set to True from entry plugin to keep messages in history for sent messages sendHistory = False # XXX: same as sendHistory but for received messaged receiveHistory = False - def __init__(self, host_app, profile, component_jid, password, host=None, port=None, - max_retries=C.XMPP_MAX_RETRIES): + def __init__( + self, + host_app, + profile, + component_jid, + password, + host=None, + port=None, + max_retries=C.XMPP_MAX_RETRIES, + ): self.started = time.time() if port is None: port = C.XMPP_COMPONENT_PORT @@ -1178,12 +1236,12 @@ @param to_jid: destination JID of the request """ try: - unescape = self.host_app.plugins['XEP-0106'].unescape + unescape = self.host_app.plugins["XEP-0106"].unescape except KeyError: raise exceptions.MissingPlugin("Plugin XEP-0106 is needed to retrieve owner") else: user = unescape(to_jid.user) - if '@' in user: + if "@" in user: # a full jid is specified return jid.JID(user) else: @@ -1199,7 +1257,7 @@ @param iq_elt: IQ stanza sent from the requested @return: owner and peer JIDs """ - to_jid = jid.JID(iq_elt['to']) + to_jid = jid.JID(iq_elt["to"]) if to_jid.user: owner = self.get_owner_from_jid(to_jid) else: @@ -1227,7 +1285,7 @@ def __init__(self, host): xmppim.MessageProtocol.__init__(self) self.host = host - self.messages_queue = defer.DeferredQueue() + self.messages_queue = defer.DeferredQueue() def setHandlerParent(self, parent): super().setHandlerParent(parent) @@ -1252,23 +1310,31 @@ @return(dict): message data """ if message_elt.name != "message": - log.warning(_( - "parse_message used with a non <message/> stanza, ignoring: {xml}" - .format(xml=message_elt.toXml()))) + log.warning( + _( + "parse_message used with a non <message/> stanza, ignoring: {xml}".format( + xml=message_elt.toXml() + ) + ) + ) return {} if message_elt.uri == None: # xmlns may be None when wokkel element parsing strip out root namespace self.normalize_ns(message_elt, None) elif message_elt.uri != C.NS_CLIENT: - log.warning(_( - "received <message> with a wrong namespace: {xml}" - .format(xml=message_elt.toXml()))) + log.warning( + _( + "received <message> with a wrong namespace: {xml}".format( + xml=message_elt.toXml() + ) + ) + ) client = self.parent - if not message_elt.hasAttribute('to'): - message_elt['to'] = client.jid.full() + if not message_elt.hasAttribute("to"): + message_elt["to"] = client.jid.full() message = {} subject = {} @@ -1306,8 +1372,11 @@ except AttributeError: # message_elt._received_timestamp should have been set in onMessage # but if parse_message is called directly, it can be missing - log.debug("missing received timestamp for {message_elt}".format( - message_elt=message_elt)) + log.debug( + "missing received timestamp for {message_elt}".format( + message_elt=message_elt + ) + ) received_timestamp = time.time() try: @@ -1321,10 +1390,9 @@ if parsed_delay.sender: data["delay_sender"] = parsed_delay.sender.full() - self.host.trigger.point("message_parse", client, message_elt, data) + self.host.trigger.point("message_parse", client, message_elt, data) return data - def onMessage(self, message_elt: domish.Element) -> None: message_elt._received_timestamp = time.time() self.messages_queue.put(message_elt) @@ -1347,9 +1415,7 @@ log.exception(f"Can't process message {message_elt.toXml()}") def _on_processing_timeout( - self, - message_elt: domish.Element, - async_point_d: defer.Deferred + self, message_elt: domish.Element, async_point_d: defer.Deferred ) -> None: log.error( "Processing of following message took too long, cancelling:" @@ -1358,9 +1424,7 @@ async_point_d.cancel() async def process_message( - self, - client: SatXMPPEntity, - message_elt: domish.Element + self, client: SatXMPPEntity, message_elt: domish.Element ) -> None: # TODO: handle threads if not "from" in message_elt.attributes: @@ -1372,16 +1436,15 @@ # plugin can add their treatments to this deferred post_treat = defer.Deferred() - async_point_d = defer.ensureDeferred(self.host.trigger.async_point( - "message_received", client, message_elt, post_treat - )) + async_point_d = defer.ensureDeferred( + self.host.trigger.async_point( + "message_received", client, message_elt, post_treat + ) + ) # message_received triggers block the messages queue, so they must not take too # long to proceed. delayed_call = reactor.callLater( - 10, - self._on_processing_timeout, - message_elt, - async_point_d + 10, self._on_processing_timeout, message_elt, async_point_d ) trigger_ret_continue = await async_point_d @@ -1411,14 +1474,15 @@ def complete_attachments(self, data: MessageData) -> MessageData: """Complete missing metadata of attachments""" - for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []): + for attachment in data["extra"].get(C.KEY_ATTACHMENTS, []): if "name" not in attachment and "url" in attachment: - name = (Path(unquote(urlparse(attachment['url']).path)).name - or C.FILE_DEFAULT_NAME) + name = ( + Path(unquote(urlparse(attachment["url"]).path)).name + or C.FILE_DEFAULT_NAME + ) attachment["name"] = name - if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment - and "name" in attachment)): - media_type = mimetypes.guess_type(attachment['name'], strict=False)[0] + if C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment and "name" in attachment: + media_type = mimetypes.guess_type(attachment["name"], strict=False)[0] if media_type: attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type return data @@ -1432,8 +1496,9 @@ if self.parent.is_message_printable(data): return await self.host.memory.add_to_history(self.parent, data) else: - log.debug("not storing empty message to history: {data}" - .format(data=data)) + log.debug( + "not storing empty message to history: {data}".format(data=data) + ) return data def bridge_signal(self, data: MessageData) -> MessageData: @@ -1459,8 +1524,9 @@ profile=self.parent.profile, ) else: - log.debug("Discarding bridge signal for empty message: {data}".format( - data=data)) + log.debug( + "Discarding bridge signal for empty message: {data}".format(data=data) + ) return data @@ -1480,7 +1546,7 @@ @property def versioning(self): """True if server support roster versioning""" - return (NS_ROSTER_VER, 'ver') in self.parent.xmlstream.features + return (NS_ROSTER_VER, "ver") in self.parent.xmlstream.features @property def roster_cache(self): @@ -1547,7 +1613,7 @@ @defer.inlineCallbacks def request_roster(self): - """Ask the server for Roster list """ + """Ask the server for Roster list""" if self.versioning: log.info(_("our server support roster versioning, we use it")) roster_cache = self.roster_cache @@ -1565,7 +1631,7 @@ if roster_jid_s == ROSTER_VER_KEY: continue roster_jid = jid.JID(roster_jid_s) - roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8')) + roster_item_elt = generic.parseXml(roster_item_elt_s.encode("utf-8")) roster_item = xmppim.RosterItem.fromElement(roster_item_elt) self._jids[roster_jid] = roster_item self._register_item(roster_item) @@ -1576,8 +1642,10 @@ log.debug("requesting roster") roster = yield self.getRoster(version=version) if roster is None: - log.debug("empty roster result received, we'll get roster item with roster " - "pushes") + log.debug( + "empty roster result received, we'll get roster item with roster " + "pushes" + ) else: # a full roster is received self._groups.clear() @@ -1589,9 +1657,7 @@ # may change in the future log.info( "Removing contact {} from roster because there is no presence " - "subscription".format( - item.jid - ) + "subscription".format(item.jid) ) self.removeItem(item.entity) # FIXME: to be checked else: @@ -1646,8 +1712,10 @@ self._jids[entity] = item self._register_item(item) self.host.bridge.contact_new( - entity.full(), self.get_attributes(item), list(item.groups), - self.parent.profile + entity.full(), + self.get_attributes(item), + list(item.groups), + self.parent.profile, ) def removeReceived(self, request): @@ -1710,7 +1778,8 @@ """Return True if jid is in roster""" if not isinstance(entity_jid, jid.JID): raise exceptions.InternalError( - f"a JID is expected, not {type(entity_jid)}: {entity_jid!r}") + f"a JID is expected, not {type(entity_jid)}: {entity_jid!r}" + ) return entity_jid in self._jids def is_subscribed_from(self, entity_jid: jid.JID) -> bool: @@ -1825,7 +1894,12 @@ statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None) if not self.host.trigger.point( - "presence_received", self.parent, entity, C.PRESENCE_UNAVAILABLE, 0, statuses, + "presence_received", + self.parent, + entity, + C.PRESENCE_UNAVAILABLE, + 0, + statuses, ): return @@ -1833,9 +1907,7 @@ # if the entity is not known yet in this session or is already unavailable, # there is no need to send an unavailable signal try: - presence = self.host.memory.get_entity_datum( - self.client, entity, "presence" - ) + presence = self.host.memory.get_entity_datum(self.client, entity, "presence") except (KeyError, exceptions.UnknownEntityError): # the entity has not been seen yet in this session pass @@ -1951,9 +2023,11 @@ def getDiscoInfo(self, requestor, target, nodeIdentifier=""): # those features are implemented in Wokkel (or sat_tmp.wokkel) # and thus are always available - return [disco.DiscoFeature(NS_X_DATA), - disco.DiscoFeature(NS_XML_ELEMENT), - disco.DiscoFeature(NS_DISCO_INFO)] + return [ + disco.DiscoFeature(NS_X_DATA), + disco.DiscoFeature(NS_XML_ELEMENT), + disco.DiscoFeature(NS_DISCO_INFO), + ] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return [] @@ -1985,6 +2059,7 @@ @implementer(iwokkel.IDisco) class SatIdentityHandler(XMPPHandler): """Manage disco Identity of SàT.""" + # TODO: dynamic identity update (see docstring). Note that a XMPP entity can have # several identities