# HG changeset patch # User Goffi # Date 1678465361 -3600 # Node ID 32d714a8ea5104667d246aef9c159ab6c43801b6 # Parent 2d59974a8e3e99ed5ab8ea99b05993bdd2095e89 plugin XEP-0045: dot not wait for MAM retrieval to be completed: in `_join_MAM`, `room.fully_joined` is called before retrieving the MAM archive, as the process can be very long, and is not necessary to have the room working (message can be received after being in the room, and added out of order). This avoid blocking the `join` workflow for an extended time. Some renaming and coroutine integrations. diff -r 2d59974a8e3e -r 32d714a8ea51 sat/plugins/plugin_xep_0045.py --- a/sat/plugins/plugin_xep_0045.py Fri Mar 10 17:01:09 2023 +0100 +++ b/sat/plugins/plugin_xep_0045.py Fri Mar 10 17:22:41 2023 +0100 @@ -17,29 +17,26 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from sat.core.i18n import _, D_ -from sat.core.constants import Const as C -from sat.core.log import getLogger +import time +from typing import Optional +import uuid + from twisted.internet import defer +from twisted.python import failure from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error as xmpp_error -from twisted.python import failure +from wokkel import disco, iwokkel, muc +from wokkel import rsm +from wokkel import mam +from zope.interface import implementer from sat.core import exceptions -from sat.core.xmpp import SatXMPPClient +from sat.core.core_types import SatXMPPEntity +from sat.core.constants import Const as C +from sat.core.i18n import D_, _ +from sat.core.log import getLogger from sat.memory import memory - -import time -import uuid - -from wokkel import muc, disco, iwokkel -from sat.tools import xml_tools - -from zope.interface import implementer - -# XXX: mam and rsm come from sat_tmp.wokkel -from wokkel import rsm -from wokkel import mam +from sat.tools import xml_tools, utils log = getLogger(__name__) @@ -161,10 +158,8 @@ host.trigger.add("messageReceived", self.messageReceivedTrigger, priority=1000000) host.trigger.add("message_parse", self._message_parseTrigger) - def profileConnected(self, client): - def assign_service(service): - client.muc_service = service - return self.getMUCService(client).addCallback(assign_service) + async def profileConnected(self, client): + client.muc_service = await self.get_MUC_service(client) def _message_parseTrigger(self, client, message_elt, data): """Add stanza-id from the room if present""" @@ -205,7 +200,7 @@ return False return True - def getRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> muc.Room: + def getRoom(self, client: SatXMPPEntity, room_jid: jid.JID) -> muc.Room: """Retrieve Room instance from its jid @param room_jid: jid of the room @@ -224,7 +219,7 @@ if room_jid not in client._muc_client.joined_rooms: raise exceptions.NotFound(_("This room has not been joined")) - def isJoinedRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> bool: + def isJoinedRoom(self, client: SatXMPPEntity, room_jid: jid.JID) -> bool: """Tell if a jid is a known and joined room @room_jid: jid of the room @@ -458,16 +453,18 @@ def _getMUCService(self, jid_=None, profile=C.PROF_KEY_NONE): client = self.host.getClient(profile) - d = self.getMUCService(client, jid_ or None) + d = defer.ensureDeferred(self.get_MUC_service(client, jid_ or None)) d.addCallback(lambda service_jid: service_jid.full() if service_jid is not None else '') return d - @defer.inlineCallbacks - def getMUCService(self, client, jid_=None): + async def get_MUC_service( + self, + client: SatXMPPEntity, + jid_: Optional[jid.JID] = None) -> Optional[jid.JID]: """Return first found MUC service of an entity @param jid_: entity which may have a MUC service, or None for our own server - @return (jid.JID, None): found service jid or None + @return: found service jid or None """ if jid_ is None: try: @@ -476,8 +473,8 @@ pass else: # we have a cached value, we return it - defer.returnValue(muc_service) - services = yield self.host.findServiceEntities(client, "conference", "text", jid_) + return muc_service + services = await self.host.findServiceEntities(client, "conference", "text", jid_) for service in services: if ".irc." not in service.userhost(): # FIXME: @@ -487,7 +484,7 @@ break else: muc_service = None - defer.returnValue(muc_service) + return muc_service def _getUniqueName(self, muc_service="", profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) @@ -549,7 +546,13 @@ d.addErrback(self._join_eb, client) return d - def join(self, client, room_jid, nick=None, options=None): + async def join( + self, + client: SatXMPPEntity, + room_jid: jid.JID, + nick: Optional[str] = None, + options: Optional[dict] = None + ) -> Optional[muc.Room]: if not nick: nick = client.jid.user if options is None: @@ -558,18 +561,24 @@ room = client._muc_client.joined_rooms[room_jid] log.info(_('{profile} is already in room {room_jid}').format( profile=client.profile, room_jid = room_jid.userhost())) - return defer.fail(AlreadyJoined(room)) + raise AlreadyJoined(room) log.info(_("[{profile}] is joining room {room} with nick {nick}").format( profile=client.profile, room=room_jid.userhost(), nick=nick)) self.host.bridge.mucRoomPrepareJoin(room_jid.userhost(), client.profile) password = options.get("password") - d = client._muc_client.join(room_jid, nick, password) - d.addCallbacks(self._joinCb, self._joinEb, - (client, room_jid, nick), - errbackArgs=(client, room_jid, nick, password)) - return d + try: + room = await client._muc_client.join(room_jid, nick, password) + except Exception as e: + room = await utils.asDeferred( + self._joinEb(failure.Failure(e), client, room_jid, nick, password) + ) + else: + await defer.ensureDeferred( + self._joinCb(room, client, room_jid, nick) + ) + return room def popRooms(self, client): """Remove rooms and return data needed to re-join them @@ -611,7 +620,7 @@ def getHandler(self, client): # create a MUC client and associate it with profile' session - muc_client = client._muc_client = SatMUCClient(self) + muc_client = client._muc_client = LiberviaMUCClient(self) return muc_client def kick(self, client, nick, room_jid, options=None): @@ -915,7 +924,7 @@ @implementer(iwokkel.IDisco) -class SatMUCClient(muc.MUCClient): +class LiberviaMUCClient(muc.MUCClient): def __init__(self, plugin_parent): self.plugin_parent = plugin_parent @@ -967,7 +976,7 @@ room.state = new_state def _addRoom(self, room): - super(SatMUCClient, self)._addRoom(room) + super(LiberviaMUCClient, self)._addRoom(room) room._roster_ok = False # True when occupants list has been fully received room.state = ROOM_STATE_OCCUPANTS # FIXME: check if history_d is not redundant with fully_joined @@ -978,14 +987,21 @@ # we only need to keep last presence status for each jid, so a dict is suitable room._cache_presence = {} - @defer.inlineCallbacks - def _joinLegacy(self, client, room_jid, nick, password): + async def _join_legacy( + self, + client: SatXMPPEntity, + room_jid: jid.JID, + nick: str, + password: Optional[str] + ) -> muc.Room: """Join room an retrieve history with legacy method""" - mess_data_list = yield self.host.memory.historyGet(room_jid, - client.jid.userhostJID(), - limit=1, - between=True, - profile=client.profile) + mess_data_list = await self.host.memory.historyGet( + room_jid, + client.jid.userhostJID(), + limit=1, + between=True, + profile=client.profile + ) if mess_data_list: timestamp = mess_data_list[0][1] # we use seconds since last message to get backlog without duplicates @@ -994,27 +1010,27 @@ else: seconds = None - room = yield super(SatMUCClient, self).join( + room = await super(LiberviaMUCClient, self).join( room_jid, nick, muc.HistoryOptions(seconds=seconds), password) # used to send bridge signal once backlog are written in history room._history_type = HISTORY_LEGACY room._history_d = defer.Deferred() room._history_d.callback(None) - defer.returnValue(room) + return room - @defer.inlineCallbacks - def _joinMAM(self, client, room_jid, nick, password): - """Join room and retrieve history using MAM""" - room = yield super(SatMUCClient, self).join( - # we don't want any history from room as we'll get it with MAM - room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password) - room._history_type = HISTORY_MAM + async def _get_MAM_history( + self, + client: SatXMPPEntity, + room: muc.Room, + room_jid: jid.JID + ) -> None: + """Retrieve history for rooms handling MAM""" history_d = room._history_d = defer.Deferred() # we trigger now the deferred so all callback are processed as soon as possible # and in order history_d.callback(None) - last_mess = yield self.host.memory.historyGet( + last_mess = await self.host.memory.historyGet( room_jid, None, limit=1, @@ -1040,7 +1056,7 @@ count = 0 while not complete: try: - mam_data = yield self._mam.getArchives(client, mam_req, + mam_data = await self._mam.getArchives(client, mam_req, service=room_jid) except xmpp_error.StanzaError as e: if last_mess and e.condition == 'item-not-found': @@ -1107,20 +1123,35 @@ errbackArgs=[room]) # we wait for all callbacks to be processed - yield history_d + await history_d - defer.returnValue(room) + async def _join_MAM( + self, + client: SatXMPPEntity, + room_jid: jid.JID, + nick: str, + password: Optional[str] + ) -> muc.Room: + """Join room and retrieve history using MAM""" + room = await super(LiberviaMUCClient, self).join( + # we don't want any history from room as we'll get it with MAM + room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password + ) + room._history_type = HISTORY_MAM + # MAM history retrieval can be very long, and doesn't need to be sync, so we don't + # wait for it + defer.ensureDeferred(self._get_MAM_history(client, room, room_jid)) + room.fully_joined.callback(room) - @defer.inlineCallbacks - def join(self, room_jid, nick, password=None): + return room + + async def join(self, room_jid, nick, password=None): room_service = jid.JID(room_jid.host) - has_mam = yield self.host.hasFeature(self.client, mam.NS_MAM, room_service) + has_mam = await self.host.hasFeature(self.client, mam.NS_MAM, room_service) if not self._mam or not has_mam: - room = yield self._joinLegacy(self.client, room_jid, nick, password) - defer.returnValue(room) + return await self._join_legacy(self.client, room_jid, nick, password) else: - room = yield self._joinMAM(self.client, room_jid, nick, password) - defer.returnValue(room) + return await self._join_MAM(self.client, room_jid, nick, password) ## presence/roster ## @@ -1400,13 +1431,14 @@ this method will finish joining by: - sending message to bridge - - calling fully_joined deferred + - calling fully_joined deferred (for legacy history) - sending stanza put in cache - cleaning variables not needed anymore """ args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile) self.host.bridge.mucRoomJoined(*args) - room.fully_joined.callback(room) + if room._history_type == HISTORY_LEGACY: + room.fully_joined.callback(room) del room._history_d del room._history_type cache = room._cache diff -r 2d59974a8e3e -r 32d714a8ea51 sat/plugins/plugin_xep_0048.py --- a/sat/plugins/plugin_xep_0048.py Fri Mar 10 17:01:09 2023 +0100 +++ b/sat/plugins/plugin_xep_0048.py Fri Mar 10 17:22:41 2023 +0100 @@ -101,16 +101,15 @@ except KeyError: log.info(_("Text commands not available")) - @defer.inlineCallbacks - def profileConnected(self, client): + async def profileConnected(self, client): local = client.bookmarks_local = PersistentBinaryDict( NS_BOOKMARKS, client.profile ) - yield local.load() + await local.load() if not local: local[XEP_0048.MUC_TYPE] = dict() local[XEP_0048.URL_TYPE] = dict() - private = yield self._getServerBookmarks("private", client.profile) + private = await self._getServerBookmarks("private", client.profile) pubsub = client.bookmarks_pubsub = None for bookmarks in (local, private, pubsub): @@ -118,7 +117,9 @@ for (room_jid, data) in list(bookmarks[XEP_0048.MUC_TYPE].items()): if data.get("autojoin", "false") == "true": nick = data.get("nick", client.jid.user) - self.host.plugins["XEP-0045"].join(client, room_jid, nick, {}) + defer.ensureDeferred( + self.host.plugins["XEP-0045"].join(client, room_jid, nick, {}) + ) # we don't use a DeferredList to gather result here, as waiting for all room would # slow down a lot the connection process, and result in a bad user experience. diff -r 2d59974a8e3e -r 32d714a8ea51 sat/plugins/plugin_xep_0198.py --- a/sat/plugins/plugin_xep_0198.py Fri Mar 10 17:01:09 2023 +0100 +++ b/sat/plugins/plugin_xep_0198.py Fri Mar 10 17:22:41 2023 +0100 @@ -454,7 +454,9 @@ if plg_0045 is not None: # we re-join MUC rooms muc_d_list = defer.DeferredList( - [plg_0045.join(*args) for args in muc_join_args]) + [defer.ensureDeferred(plg_0045.join(*args)) + for args in muc_join_args] + ) d.addCallback(lambda __: muc_d_list) # at the end we replay the buffer, as those stanzas have probably not # been received diff -r 2d59974a8e3e -r 32d714a8ea51 sat/plugins/plugin_xep_0249.py --- a/sat/plugins/plugin_xep_0249.py Fri Mar 10 17:01:09 2023 +0100 +++ b/sat/plugins/plugin_xep_0249.py Fri Mar 10 17:22:41 2023 +0100 @@ -17,19 +17,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from sat.core.i18n import _, D_ +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions from sat.core.constants import Const as C -from sat.core import exceptions +from sat.core.i18n import D_, _ from sat.core.log import getLogger +from sat.tools import xml_tools log = getLogger(__name__) -from sat.tools import xml_tools -from twisted.words.xish import domish -from twisted.words.protocols.jabber import jid -from zope.interface import implementer - -from wokkel import disco, iwokkel try: @@ -140,7 +141,9 @@ _("Invitation accepted for room %(room)s [%(profile)s]") % {"room": room_jid.userhost(), "profile": client.profile} ) - d = self.host.plugins["XEP-0045"].join(client, room_jid, client.jid.user, {}) + d = defer.ensureDeferred( + self.host.plugins["XEP-0045"].join(client, room_jid, client.jid.user, {}) + ) return d def _messageReceivedTrigger(self, client, message_elt, post_treat):