comparison sat/plugins/plugin_xep_0045.py @ 4001:32d714a8ea51

plugin XEP-0045: do not wait for MAM retrieval to be completed: in `_join_MAM`, `room.fully_joined` is called before retrieving the MAM archive, as the process can be very long, and it is not necessary to have the room working (messages can be received after being in the room, and added out of order). This avoids blocking the `join` workflow for an extended time. Some renaming and coroutine integrations.
author Goffi <goffi@goffi.org>
date Fri, 10 Mar 2023 17:22:41 +0100
parents 8289ac1b34f4
children 524856bd7b19
comparison
equal deleted inserted replaced
4000:2d59974a8e3e 4001:32d714a8ea51
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _, D_ 20 import time
21 from sat.core.constants import Const as C 21 from typing import Optional
22 from sat.core.log import getLogger 22 import uuid
23
23 from twisted.internet import defer 24 from twisted.internet import defer
25 from twisted.python import failure
24 from twisted.words.protocols.jabber import jid 26 from twisted.words.protocols.jabber import jid
25 from twisted.words.protocols.jabber import error as xmpp_error 27 from twisted.words.protocols.jabber import error as xmpp_error
26 from twisted.python import failure 28 from wokkel import disco, iwokkel, muc
27
28 from sat.core import exceptions
29 from sat.core.xmpp import SatXMPPClient
30 from sat.memory import memory
31
32 import time
33 import uuid
34
35 from wokkel import muc, disco, iwokkel
36 from sat.tools import xml_tools
37
38 from zope.interface import implementer
39
40 # XXX: mam and rsm come from sat_tmp.wokkel
41 from wokkel import rsm 29 from wokkel import rsm
42 from wokkel import mam 30 from wokkel import mam
31 from zope.interface import implementer
32
33 from sat.core import exceptions
34 from sat.core.core_types import SatXMPPEntity
35 from sat.core.constants import Const as C
36 from sat.core.i18n import D_, _
37 from sat.core.log import getLogger
38 from sat.memory import memory
39 from sat.tools import xml_tools, utils
43 40
44 41
45 log = getLogger(__name__) 42 log = getLogger(__name__)
46 43
47 44
159 host.trigger.add("presence_available", self.presenceTrigger) 156 host.trigger.add("presence_available", self.presenceTrigger)
160 host.trigger.add("presence_received", self.presenceReceivedTrigger) 157 host.trigger.add("presence_received", self.presenceReceivedTrigger)
161 host.trigger.add("messageReceived", self.messageReceivedTrigger, priority=1000000) 158 host.trigger.add("messageReceived", self.messageReceivedTrigger, priority=1000000)
162 host.trigger.add("message_parse", self._message_parseTrigger) 159 host.trigger.add("message_parse", self._message_parseTrigger)
163 160
164 def profileConnected(self, client): 161 async def profileConnected(self, client):
165 def assign_service(service): 162 client.muc_service = await self.get_MUC_service(client)
166 client.muc_service = service
167 return self.getMUCService(client).addCallback(assign_service)
168 163
169 def _message_parseTrigger(self, client, message_elt, data): 164 def _message_parseTrigger(self, client, message_elt, data):
170 """Add stanza-id from the room if present""" 165 """Add stanza-id from the room if present"""
171 if message_elt.getAttribute("type") != C.MESS_TYPE_GROUPCHAT: 166 if message_elt.getAttribute("type") != C.MESS_TYPE_GROUPCHAT:
172 return True 167 return True
203 log.warning("Received groupchat message for a room which has not been " 198 log.warning("Received groupchat message for a room which has not been "
204 "joined, ignoring it: {}".format(message_elt.toXml())) 199 "joined, ignoring it: {}".format(message_elt.toXml()))
205 return False 200 return False
206 return True 201 return True
207 202
208 def getRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> muc.Room: 203 def getRoom(self, client: SatXMPPEntity, room_jid: jid.JID) -> muc.Room:
209 """Retrieve Room instance from its jid 204 """Retrieve Room instance from its jid
210 205
211 @param room_jid: jid of the room 206 @param room_jid: jid of the room
212 @raise exceptions.NotFound: the room has not been joined 207 @raise exceptions.NotFound: the room has not been joined
213 """ 208 """
222 @param room_jid (JID): room JID 217 @param room_jid (JID): room JID
223 """ 218 """
224 if room_jid not in client._muc_client.joined_rooms: 219 if room_jid not in client._muc_client.joined_rooms:
225 raise exceptions.NotFound(_("This room has not been joined")) 220 raise exceptions.NotFound(_("This room has not been joined"))
226 221
227 def isJoinedRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> bool: 222 def isJoinedRoom(self, client: SatXMPPEntity, room_jid: jid.JID) -> bool:
228 """Tell if a jid is a known and joined room 223 """Tell if a jid is a known and joined room
229 224
230 @room_jid: jid of the room 225 @room_jid: jid of the room
231 """ 226 """
232 try: 227 try:
456 self.checkRoomJoined(client, room_jid) 451 self.checkRoomJoined(client, room_jid)
457 return client._muc_client.joined_rooms[room_jid].inRoster(muc.User(nick)) 452 return client._muc_client.joined_rooms[room_jid].inRoster(muc.User(nick))
458 453
459 def _getMUCService(self, jid_=None, profile=C.PROF_KEY_NONE): 454 def _getMUCService(self, jid_=None, profile=C.PROF_KEY_NONE):
460 client = self.host.getClient(profile) 455 client = self.host.getClient(profile)
461 d = self.getMUCService(client, jid_ or None) 456 d = defer.ensureDeferred(self.get_MUC_service(client, jid_ or None))
462 d.addCallback(lambda service_jid: service_jid.full() if service_jid is not None else '') 457 d.addCallback(lambda service_jid: service_jid.full() if service_jid is not None else '')
463 return d 458 return d
464 459
465 @defer.inlineCallbacks 460 async def get_MUC_service(
466 def getMUCService(self, client, jid_=None): 461 self,
462 client: SatXMPPEntity,
463 jid_: Optional[jid.JID] = None) -> Optional[jid.JID]:
467 """Return first found MUC service of an entity 464 """Return first found MUC service of an entity
468 465
469 @param jid_: entity which may have a MUC service, or None for our own server 466 @param jid_: entity which may have a MUC service, or None for our own server
470 @return (jid.JID, None): found service jid or None 467 @return: found service jid or None
471 """ 468 """
472 if jid_ is None: 469 if jid_ is None:
473 try: 470 try:
474 muc_service = client.muc_service 471 muc_service = client.muc_service
475 except AttributeError: 472 except AttributeError:
476 pass 473 pass
477 else: 474 else:
478 # we have a cached value, we return it 475 # we have a cached value, we return it
479 defer.returnValue(muc_service) 476 return muc_service
480 services = yield self.host.findServiceEntities(client, "conference", "text", jid_) 477 services = await self.host.findServiceEntities(client, "conference", "text", jid_)
481 for service in services: 478 for service in services:
482 if ".irc." not in service.userhost(): 479 if ".irc." not in service.userhost():
483 # FIXME: 480 # FIXME:
484 # This ugly hack is here to avoid an issue with openfire: the IRC gateway 481 # This ugly hack is here to avoid an issue with openfire: the IRC gateway
485 # use "conference/text" identity (instead of "conference/irc") 482 # use "conference/text" identity (instead of "conference/irc")
486 muc_service = service 483 muc_service = service
487 break 484 break
488 else: 485 else:
489 muc_service = None 486 muc_service = None
490 defer.returnValue(muc_service) 487 return muc_service
491 488
492 def _getUniqueName(self, muc_service="", profile_key=C.PROF_KEY_NONE): 489 def _getUniqueName(self, muc_service="", profile_key=C.PROF_KEY_NONE):
493 client = self.host.getClient(profile_key) 490 client = self.host.getClient(profile_key)
494 return self.getUniqueName(client, muc_service or None).full() 491 return self.getUniqueName(client, muc_service or None).full()
495 492
547 d = self.join(client, room_jid, nick, options or None) 544 d = self.join(client, room_jid, nick, options or None)
548 d.addCallback(lambda room: [False] + self._getRoomJoinedArgs(room, client.profile)) 545 d.addCallback(lambda room: [False] + self._getRoomJoinedArgs(room, client.profile))
549 d.addErrback(self._join_eb, client) 546 d.addErrback(self._join_eb, client)
550 return d 547 return d
551 548
552 def join(self, client, room_jid, nick=None, options=None): 549 async def join(
550 self,
551 client: SatXMPPEntity,
552 room_jid: jid.JID,
553 nick: Optional[str] = None,
554 options: Optional[dict] = None
555 ) -> Optional[muc.Room]:
553 if not nick: 556 if not nick:
554 nick = client.jid.user 557 nick = client.jid.user
555 if options is None: 558 if options is None:
556 options = {} 559 options = {}
557 if room_jid in client._muc_client.joined_rooms: 560 if room_jid in client._muc_client.joined_rooms:
558 room = client._muc_client.joined_rooms[room_jid] 561 room = client._muc_client.joined_rooms[room_jid]
559 log.info(_('{profile} is already in room {room_jid}').format( 562 log.info(_('{profile} is already in room {room_jid}').format(
560 profile=client.profile, room_jid = room_jid.userhost())) 563 profile=client.profile, room_jid = room_jid.userhost()))
561 return defer.fail(AlreadyJoined(room)) 564 raise AlreadyJoined(room)
562 log.info(_("[{profile}] is joining room {room} with nick {nick}").format( 565 log.info(_("[{profile}] is joining room {room} with nick {nick}").format(
563 profile=client.profile, room=room_jid.userhost(), nick=nick)) 566 profile=client.profile, room=room_jid.userhost(), nick=nick))
564 self.host.bridge.mucRoomPrepareJoin(room_jid.userhost(), client.profile) 567 self.host.bridge.mucRoomPrepareJoin(room_jid.userhost(), client.profile)
565 568
566 password = options.get("password") 569 password = options.get("password")
567 570
568 d = client._muc_client.join(room_jid, nick, password) 571 try:
569 d.addCallbacks(self._joinCb, self._joinEb, 572 room = await client._muc_client.join(room_jid, nick, password)
570 (client, room_jid, nick), 573 except Exception as e:
571 errbackArgs=(client, room_jid, nick, password)) 574 room = await utils.asDeferred(
572 return d 575 self._joinEb(failure.Failure(e), client, room_jid, nick, password)
576 )
577 else:
578 await defer.ensureDeferred(
579 self._joinCb(room, client, room_jid, nick)
580 )
581 return room
573 582
574 def popRooms(self, client): 583 def popRooms(self, client):
575 """Remove rooms and return data needed to re-join them 584 """Remove rooms and return data needed to re-join them
576 585
577 This methods is to be called before a hot reconnection 586 This methods is to be called before a hot reconnection
609 self.checkRoomJoined(client, room_jid) 618 self.checkRoomJoined(client, room_jid)
610 return client._muc_client.subject(room_jid, subject) 619 return client._muc_client.subject(room_jid, subject)
611 620
612 def getHandler(self, client): 621 def getHandler(self, client):
613 # create a MUC client and associate it with profile' session 622 # create a MUC client and associate it with profile' session
614 muc_client = client._muc_client = SatMUCClient(self) 623 muc_client = client._muc_client = LiberviaMUCClient(self)
615 return muc_client 624 return muc_client
616 625
617 def kick(self, client, nick, room_jid, options=None): 626 def kick(self, client, nick, room_jid, options=None):
618 """Kick a participant from the room 627 """Kick a participant from the room
619 628
913 return False 922 return False
914 return True 923 return True
915 924
916 925
917 @implementer(iwokkel.IDisco) 926 @implementer(iwokkel.IDisco)
918 class SatMUCClient(muc.MUCClient): 927 class LiberviaMUCClient(muc.MUCClient):
919 928
920 def __init__(self, plugin_parent): 929 def __init__(self, plugin_parent):
921 self.plugin_parent = plugin_parent 930 self.plugin_parent = plugin_parent
922 muc.MUCClient.__init__(self) 931 muc.MUCClient.__init__(self)
923 self._changing_nicks = set() # used to keep trace of who is changing nick, 932 self._changing_nicks = set() # used to keep trace of who is changing nick,
965 current_state=room.state, 974 current_state=room.state,
966 expected_state=expected_state)) 975 expected_state=expected_state))
967 room.state = new_state 976 room.state = new_state
968 977
969 def _addRoom(self, room): 978 def _addRoom(self, room):
970 super(SatMUCClient, self)._addRoom(room) 979 super(LiberviaMUCClient, self)._addRoom(room)
971 room._roster_ok = False # True when occupants list has been fully received 980 room._roster_ok = False # True when occupants list has been fully received
972 room.state = ROOM_STATE_OCCUPANTS 981 room.state = ROOM_STATE_OCCUPANTS
973 # FIXME: check if history_d is not redundant with fully_joined 982 # FIXME: check if history_d is not redundant with fully_joined
974 room.fully_joined = defer.Deferred() # called when everything is OK 983 room.fully_joined = defer.Deferred() # called when everything is OK
975 # cache data until room is ready 984 # cache data until room is ready
976 # list of elements which will be re-injected in stream 985 # list of elements which will be re-injected in stream
977 room._cache = [] 986 room._cache = []
978 # we only need to keep last presence status for each jid, so a dict is suitable 987 # we only need to keep last presence status for each jid, so a dict is suitable
979 room._cache_presence = {} 988 room._cache_presence = {}
980 989
981 @defer.inlineCallbacks 990 async def _join_legacy(
982 def _joinLegacy(self, client, room_jid, nick, password): 991 self,
992 client: SatXMPPEntity,
993 room_jid: jid.JID,
994 nick: str,
995 password: Optional[str]
996 ) -> muc.Room:
983 """Join room an retrieve history with legacy method""" 997 """Join room an retrieve history with legacy method"""
984 mess_data_list = yield self.host.memory.historyGet(room_jid, 998 mess_data_list = await self.host.memory.historyGet(
985 client.jid.userhostJID(), 999 room_jid,
986 limit=1, 1000 client.jid.userhostJID(),
987 between=True, 1001 limit=1,
988 profile=client.profile) 1002 between=True,
1003 profile=client.profile
1004 )
989 if mess_data_list: 1005 if mess_data_list:
990 timestamp = mess_data_list[0][1] 1006 timestamp = mess_data_list[0][1]
991 # we use seconds since last message to get backlog without duplicates 1007 # we use seconds since last message to get backlog without duplicates
992 # and we remove 1 second to avoid getting the last message again 1008 # and we remove 1 second to avoid getting the last message again
993 seconds = int(time.time() - timestamp) - 1 1009 seconds = int(time.time() - timestamp) - 1
994 else: 1010 else:
995 seconds = None 1011 seconds = None
996 1012
997 room = yield super(SatMUCClient, self).join( 1013 room = await super(LiberviaMUCClient, self).join(
998 room_jid, nick, muc.HistoryOptions(seconds=seconds), password) 1014 room_jid, nick, muc.HistoryOptions(seconds=seconds), password)
999 # used to send bridge signal once backlog are written in history 1015 # used to send bridge signal once backlog are written in history
1000 room._history_type = HISTORY_LEGACY 1016 room._history_type = HISTORY_LEGACY
1001 room._history_d = defer.Deferred() 1017 room._history_d = defer.Deferred()
1002 room._history_d.callback(None) 1018 room._history_d.callback(None)
1003 defer.returnValue(room) 1019 return room
1004 1020
1005 @defer.inlineCallbacks 1021 async def _get_MAM_history(
1006 def _joinMAM(self, client, room_jid, nick, password): 1022 self,
1007 """Join room and retrieve history using MAM""" 1023 client: SatXMPPEntity,
1008 room = yield super(SatMUCClient, self).join( 1024 room: muc.Room,
1009 # we don't want any history from room as we'll get it with MAM 1025 room_jid: jid.JID
1010 room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password) 1026 ) -> None:
1011 room._history_type = HISTORY_MAM 1027 """Retrieve history for rooms handling MAM"""
1012 history_d = room._history_d = defer.Deferred() 1028 history_d = room._history_d = defer.Deferred()
1013 # we trigger now the deferred so all callback are processed as soon as possible 1029 # we trigger now the deferred so all callback are processed as soon as possible
1014 # and in order 1030 # and in order
1015 history_d.callback(None) 1031 history_d.callback(None)
1016 1032
1017 last_mess = yield self.host.memory.historyGet( 1033 last_mess = await self.host.memory.historyGet(
1018 room_jid, 1034 room_jid,
1019 None, 1035 None,
1020 limit=1, 1036 limit=1,
1021 between=False, 1037 between=False,
1022 filters={ 1038 filters={
1038 mam_req = mam.MAMRequest(rsm_=rsm_req) 1054 mam_req = mam.MAMRequest(rsm_=rsm_req)
1039 complete = False 1055 complete = False
1040 count = 0 1056 count = 0
1041 while not complete: 1057 while not complete:
1042 try: 1058 try:
1043 mam_data = yield self._mam.getArchives(client, mam_req, 1059 mam_data = await self._mam.getArchives(client, mam_req,
1044 service=room_jid) 1060 service=room_jid)
1045 except xmpp_error.StanzaError as e: 1061 except xmpp_error.StanzaError as e:
1046 if last_mess and e.condition == 'item-not-found': 1062 if last_mess and e.condition == 'item-not-found':
1047 log.info( 1063 log.info(
1048 f"requested item (with id {stanza_id!r}) can't be found in " 1064 f"requested item (with id {stanza_id!r}) can't be found in "
1105 self.changeRoomState(room, ROOM_STATE_LIVE) 1121 self.changeRoomState(room, ROOM_STATE_LIVE)
1106 history_d.addCallbacks(self._historyCb, self._historyEb, [room], 1122 history_d.addCallbacks(self._historyCb, self._historyEb, [room],
1107 errbackArgs=[room]) 1123 errbackArgs=[room])
1108 1124
1109 # we wait for all callbacks to be processed 1125 # we wait for all callbacks to be processed
1110 yield history_d 1126 await history_d
1111 1127
1112 defer.returnValue(room) 1128 async def _join_MAM(
1113 1129 self,
1114 @defer.inlineCallbacks 1130 client: SatXMPPEntity,
1115 def join(self, room_jid, nick, password=None): 1131 room_jid: jid.JID,
1132 nick: str,
1133 password: Optional[str]
1134 ) -> muc.Room:
1135 """Join room and retrieve history using MAM"""
1136 room = await super(LiberviaMUCClient, self).join(
1137 # we don't want any history from room as we'll get it with MAM
1138 room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password
1139 )
1140 room._history_type = HISTORY_MAM
1141 # MAM history retrieval can be very long, and doesn't need to be sync, so we don't
1142 # wait for it
1143 defer.ensureDeferred(self._get_MAM_history(client, room, room_jid))
1144 room.fully_joined.callback(room)
1145
1146 return room
1147
1148 async def join(self, room_jid, nick, password=None):
1116 room_service = jid.JID(room_jid.host) 1149 room_service = jid.JID(room_jid.host)
1117 has_mam = yield self.host.hasFeature(self.client, mam.NS_MAM, room_service) 1150 has_mam = await self.host.hasFeature(self.client, mam.NS_MAM, room_service)
1118 if not self._mam or not has_mam: 1151 if not self._mam or not has_mam:
1119 room = yield self._joinLegacy(self.client, room_jid, nick, password) 1152 return await self._join_legacy(self.client, room_jid, nick, password)
1120 defer.returnValue(room) 1153 else:
1121 else: 1154 return await self._join_MAM(self.client, room_jid, nick, password)
1122 room = yield self._joinMAM(self.client, room_jid, nick, password)
1123 defer.returnValue(room)
1124 1155
1125 ## presence/roster ## 1156 ## presence/roster ##
1126 1157
1127 def availableReceived(self, presence): 1158 def availableReceived(self, presence):
1128 """ 1159 """
1398 def _historyCb(self, __, room): 1429 def _historyCb(self, __, room):
1399 """Called when history have been written to database and subject is received 1430 """Called when history have been written to database and subject is received
1400 1431
1401 this method will finish joining by: 1432 this method will finish joining by:
1402 - sending message to bridge 1433 - sending message to bridge
1403 - calling fully_joined deferred 1434 - calling fully_joined deferred (for legacy history)
1404 - sending stanza put in cache 1435 - sending stanza put in cache
1405 - cleaning variables not needed anymore 1436 - cleaning variables not needed anymore
1406 """ 1437 """
1407 args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile) 1438 args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile)
1408 self.host.bridge.mucRoomJoined(*args) 1439 self.host.bridge.mucRoomJoined(*args)
1409 room.fully_joined.callback(room) 1440 if room._history_type == HISTORY_LEGACY:
1441 room.fully_joined.callback(room)
1410 del room._history_d 1442 del room._history_d
1411 del room._history_type 1443 del room._history_type
1412 cache = room._cache 1444 cache = room._cache
1413 del room._cache 1445 del room._cache
1414 cache_presence = room._cache_presence 1446 cache_presence = room._cache_presence