Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0045.py @ 4001:32d714a8ea51
plugin XEP-0045: do not wait for MAM retrieval to be completed:
in `_join_MAM`, `room.fully_joined` is called before retrieving the MAM archive, as the
process can be very long and is not necessary to have the room working (messages can be
received after being in the room, and added out of order). This avoids blocking the `join`
workflow for an extended time.
Some renaming and coroutine integrations.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 10 Mar 2023 17:22:41 +0100 |
parents | 8289ac1b34f4 |
children | 524856bd7b19 |
comparison
equal
deleted
inserted
replaced
4000:2d59974a8e3e | 4001:32d714a8ea51 |
---|---|
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _, D_ | 20 import time |
21 from sat.core.constants import Const as C | 21 from typing import Optional |
22 from sat.core.log import getLogger | 22 import uuid |
23 | |
23 from twisted.internet import defer | 24 from twisted.internet import defer |
25 from twisted.python import failure | |
24 from twisted.words.protocols.jabber import jid | 26 from twisted.words.protocols.jabber import jid |
25 from twisted.words.protocols.jabber import error as xmpp_error | 27 from twisted.words.protocols.jabber import error as xmpp_error |
26 from twisted.python import failure | 28 from wokkel import disco, iwokkel, muc |
27 | |
28 from sat.core import exceptions | |
29 from sat.core.xmpp import SatXMPPClient | |
30 from sat.memory import memory | |
31 | |
32 import time | |
33 import uuid | |
34 | |
35 from wokkel import muc, disco, iwokkel | |
36 from sat.tools import xml_tools | |
37 | |
38 from zope.interface import implementer | |
39 | |
40 # XXX: mam and rsm come from sat_tmp.wokkel | |
41 from wokkel import rsm | 29 from wokkel import rsm |
42 from wokkel import mam | 30 from wokkel import mam |
31 from zope.interface import implementer | |
32 | |
33 from sat.core import exceptions | |
34 from sat.core.core_types import SatXMPPEntity | |
35 from sat.core.constants import Const as C | |
36 from sat.core.i18n import D_, _ | |
37 from sat.core.log import getLogger | |
38 from sat.memory import memory | |
39 from sat.tools import xml_tools, utils | |
43 | 40 |
44 | 41 |
45 log = getLogger(__name__) | 42 log = getLogger(__name__) |
46 | 43 |
47 | 44 |
159 host.trigger.add("presence_available", self.presenceTrigger) | 156 host.trigger.add("presence_available", self.presenceTrigger) |
160 host.trigger.add("presence_received", self.presenceReceivedTrigger) | 157 host.trigger.add("presence_received", self.presenceReceivedTrigger) |
161 host.trigger.add("messageReceived", self.messageReceivedTrigger, priority=1000000) | 158 host.trigger.add("messageReceived", self.messageReceivedTrigger, priority=1000000) |
162 host.trigger.add("message_parse", self._message_parseTrigger) | 159 host.trigger.add("message_parse", self._message_parseTrigger) |
163 | 160 |
164 def profileConnected(self, client): | 161 async def profileConnected(self, client): |
165 def assign_service(service): | 162 client.muc_service = await self.get_MUC_service(client) |
166 client.muc_service = service | |
167 return self.getMUCService(client).addCallback(assign_service) | |
168 | 163 |
169 def _message_parseTrigger(self, client, message_elt, data): | 164 def _message_parseTrigger(self, client, message_elt, data): |
170 """Add stanza-id from the room if present""" | 165 """Add stanza-id from the room if present""" |
171 if message_elt.getAttribute("type") != C.MESS_TYPE_GROUPCHAT: | 166 if message_elt.getAttribute("type") != C.MESS_TYPE_GROUPCHAT: |
172 return True | 167 return True |
203 log.warning("Received groupchat message for a room which has not been " | 198 log.warning("Received groupchat message for a room which has not been " |
204 "joined, ignoring it: {}".format(message_elt.toXml())) | 199 "joined, ignoring it: {}".format(message_elt.toXml())) |
205 return False | 200 return False |
206 return True | 201 return True |
207 | 202 |
208 def getRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> muc.Room: | 203 def getRoom(self, client: SatXMPPEntity, room_jid: jid.JID) -> muc.Room: |
209 """Retrieve Room instance from its jid | 204 """Retrieve Room instance from its jid |
210 | 205 |
211 @param room_jid: jid of the room | 206 @param room_jid: jid of the room |
212 @raise exceptions.NotFound: the room has not been joined | 207 @raise exceptions.NotFound: the room has not been joined |
213 """ | 208 """ |
222 @param room_jid (JID): room JID | 217 @param room_jid (JID): room JID |
223 """ | 218 """ |
224 if room_jid not in client._muc_client.joined_rooms: | 219 if room_jid not in client._muc_client.joined_rooms: |
225 raise exceptions.NotFound(_("This room has not been joined")) | 220 raise exceptions.NotFound(_("This room has not been joined")) |
226 | 221 |
227 def isJoinedRoom(self, client: SatXMPPClient, room_jid: jid.JID) -> bool: | 222 def isJoinedRoom(self, client: SatXMPPEntity, room_jid: jid.JID) -> bool: |
228 """Tell if a jid is a known and joined room | 223 """Tell if a jid is a known and joined room |
229 | 224 |
230 @room_jid: jid of the room | 225 @room_jid: jid of the room |
231 """ | 226 """ |
232 try: | 227 try: |
456 self.checkRoomJoined(client, room_jid) | 451 self.checkRoomJoined(client, room_jid) |
457 return client._muc_client.joined_rooms[room_jid].inRoster(muc.User(nick)) | 452 return client._muc_client.joined_rooms[room_jid].inRoster(muc.User(nick)) |
458 | 453 |
459 def _getMUCService(self, jid_=None, profile=C.PROF_KEY_NONE): | 454 def _getMUCService(self, jid_=None, profile=C.PROF_KEY_NONE): |
460 client = self.host.getClient(profile) | 455 client = self.host.getClient(profile) |
461 d = self.getMUCService(client, jid_ or None) | 456 d = defer.ensureDeferred(self.get_MUC_service(client, jid_ or None)) |
462 d.addCallback(lambda service_jid: service_jid.full() if service_jid is not None else '') | 457 d.addCallback(lambda service_jid: service_jid.full() if service_jid is not None else '') |
463 return d | 458 return d |
464 | 459 |
465 @defer.inlineCallbacks | 460 async def get_MUC_service( |
466 def getMUCService(self, client, jid_=None): | 461 self, |
462 client: SatXMPPEntity, | |
463 jid_: Optional[jid.JID] = None) -> Optional[jid.JID]: | |
467 """Return first found MUC service of an entity | 464 """Return first found MUC service of an entity |
468 | 465 |
469 @param jid_: entity which may have a MUC service, or None for our own server | 466 @param jid_: entity which may have a MUC service, or None for our own server |
470 @return (jid.JID, None): found service jid or None | 467 @return: found service jid or None |
471 """ | 468 """ |
472 if jid_ is None: | 469 if jid_ is None: |
473 try: | 470 try: |
474 muc_service = client.muc_service | 471 muc_service = client.muc_service |
475 except AttributeError: | 472 except AttributeError: |
476 pass | 473 pass |
477 else: | 474 else: |
478 # we have a cached value, we return it | 475 # we have a cached value, we return it |
479 defer.returnValue(muc_service) | 476 return muc_service |
480 services = yield self.host.findServiceEntities(client, "conference", "text", jid_) | 477 services = await self.host.findServiceEntities(client, "conference", "text", jid_) |
481 for service in services: | 478 for service in services: |
482 if ".irc." not in service.userhost(): | 479 if ".irc." not in service.userhost(): |
483 # FIXME: | 480 # FIXME: |
484 # This ugly hack is here to avoid an issue with openfire: the IRC gateway | 481 # This ugly hack is here to avoid an issue with openfire: the IRC gateway |
485 # use "conference/text" identity (instead of "conference/irc") | 482 # use "conference/text" identity (instead of "conference/irc") |
486 muc_service = service | 483 muc_service = service |
487 break | 484 break |
488 else: | 485 else: |
489 muc_service = None | 486 muc_service = None |
490 defer.returnValue(muc_service) | 487 return muc_service |
491 | 488 |
492 def _getUniqueName(self, muc_service="", profile_key=C.PROF_KEY_NONE): | 489 def _getUniqueName(self, muc_service="", profile_key=C.PROF_KEY_NONE): |
493 client = self.host.getClient(profile_key) | 490 client = self.host.getClient(profile_key) |
494 return self.getUniqueName(client, muc_service or None).full() | 491 return self.getUniqueName(client, muc_service or None).full() |
495 | 492 |
547 d = self.join(client, room_jid, nick, options or None) | 544 d = self.join(client, room_jid, nick, options or None) |
548 d.addCallback(lambda room: [False] + self._getRoomJoinedArgs(room, client.profile)) | 545 d.addCallback(lambda room: [False] + self._getRoomJoinedArgs(room, client.profile)) |
549 d.addErrback(self._join_eb, client) | 546 d.addErrback(self._join_eb, client) |
550 return d | 547 return d |
551 | 548 |
552 def join(self, client, room_jid, nick=None, options=None): | 549 async def join( |
550 self, | |
551 client: SatXMPPEntity, | |
552 room_jid: jid.JID, | |
553 nick: Optional[str] = None, | |
554 options: Optional[dict] = None | |
555 ) -> Optional[muc.Room]: | |
553 if not nick: | 556 if not nick: |
554 nick = client.jid.user | 557 nick = client.jid.user |
555 if options is None: | 558 if options is None: |
556 options = {} | 559 options = {} |
557 if room_jid in client._muc_client.joined_rooms: | 560 if room_jid in client._muc_client.joined_rooms: |
558 room = client._muc_client.joined_rooms[room_jid] | 561 room = client._muc_client.joined_rooms[room_jid] |
559 log.info(_('{profile} is already in room {room_jid}').format( | 562 log.info(_('{profile} is already in room {room_jid}').format( |
560 profile=client.profile, room_jid = room_jid.userhost())) | 563 profile=client.profile, room_jid = room_jid.userhost())) |
561 return defer.fail(AlreadyJoined(room)) | 564 raise AlreadyJoined(room) |
562 log.info(_("[{profile}] is joining room {room} with nick {nick}").format( | 565 log.info(_("[{profile}] is joining room {room} with nick {nick}").format( |
563 profile=client.profile, room=room_jid.userhost(), nick=nick)) | 566 profile=client.profile, room=room_jid.userhost(), nick=nick)) |
564 self.host.bridge.mucRoomPrepareJoin(room_jid.userhost(), client.profile) | 567 self.host.bridge.mucRoomPrepareJoin(room_jid.userhost(), client.profile) |
565 | 568 |
566 password = options.get("password") | 569 password = options.get("password") |
567 | 570 |
568 d = client._muc_client.join(room_jid, nick, password) | 571 try: |
569 d.addCallbacks(self._joinCb, self._joinEb, | 572 room = await client._muc_client.join(room_jid, nick, password) |
570 (client, room_jid, nick), | 573 except Exception as e: |
571 errbackArgs=(client, room_jid, nick, password)) | 574 room = await utils.asDeferred( |
572 return d | 575 self._joinEb(failure.Failure(e), client, room_jid, nick, password) |
576 ) | |
577 else: | |
578 await defer.ensureDeferred( | |
579 self._joinCb(room, client, room_jid, nick) | |
580 ) | |
581 return room | |
573 | 582 |
574 def popRooms(self, client): | 583 def popRooms(self, client): |
575 """Remove rooms and return data needed to re-join them | 584 """Remove rooms and return data needed to re-join them |
576 | 585 |
577 This methods is to be called before a hot reconnection | 586 This methods is to be called before a hot reconnection |
609 self.checkRoomJoined(client, room_jid) | 618 self.checkRoomJoined(client, room_jid) |
610 return client._muc_client.subject(room_jid, subject) | 619 return client._muc_client.subject(room_jid, subject) |
611 | 620 |
612 def getHandler(self, client): | 621 def getHandler(self, client): |
613 # create a MUC client and associate it with profile' session | 622 # create a MUC client and associate it with profile' session |
614 muc_client = client._muc_client = SatMUCClient(self) | 623 muc_client = client._muc_client = LiberviaMUCClient(self) |
615 return muc_client | 624 return muc_client |
616 | 625 |
617 def kick(self, client, nick, room_jid, options=None): | 626 def kick(self, client, nick, room_jid, options=None): |
618 """Kick a participant from the room | 627 """Kick a participant from the room |
619 | 628 |
913 return False | 922 return False |
914 return True | 923 return True |
915 | 924 |
916 | 925 |
917 @implementer(iwokkel.IDisco) | 926 @implementer(iwokkel.IDisco) |
918 class SatMUCClient(muc.MUCClient): | 927 class LiberviaMUCClient(muc.MUCClient): |
919 | 928 |
920 def __init__(self, plugin_parent): | 929 def __init__(self, plugin_parent): |
921 self.plugin_parent = plugin_parent | 930 self.plugin_parent = plugin_parent |
922 muc.MUCClient.__init__(self) | 931 muc.MUCClient.__init__(self) |
923 self._changing_nicks = set() # used to keep trace of who is changing nick, | 932 self._changing_nicks = set() # used to keep trace of who is changing nick, |
965 current_state=room.state, | 974 current_state=room.state, |
966 expected_state=expected_state)) | 975 expected_state=expected_state)) |
967 room.state = new_state | 976 room.state = new_state |
968 | 977 |
969 def _addRoom(self, room): | 978 def _addRoom(self, room): |
970 super(SatMUCClient, self)._addRoom(room) | 979 super(LiberviaMUCClient, self)._addRoom(room) |
971 room._roster_ok = False # True when occupants list has been fully received | 980 room._roster_ok = False # True when occupants list has been fully received |
972 room.state = ROOM_STATE_OCCUPANTS | 981 room.state = ROOM_STATE_OCCUPANTS |
973 # FIXME: check if history_d is not redundant with fully_joined | 982 # FIXME: check if history_d is not redundant with fully_joined |
974 room.fully_joined = defer.Deferred() # called when everything is OK | 983 room.fully_joined = defer.Deferred() # called when everything is OK |
975 # cache data until room is ready | 984 # cache data until room is ready |
976 # list of elements which will be re-injected in stream | 985 # list of elements which will be re-injected in stream |
977 room._cache = [] | 986 room._cache = [] |
978 # we only need to keep last presence status for each jid, so a dict is suitable | 987 # we only need to keep last presence status for each jid, so a dict is suitable |
979 room._cache_presence = {} | 988 room._cache_presence = {} |
980 | 989 |
981 @defer.inlineCallbacks | 990 async def _join_legacy( |
982 def _joinLegacy(self, client, room_jid, nick, password): | 991 self, |
992 client: SatXMPPEntity, | |
993 room_jid: jid.JID, | |
994 nick: str, | |
995 password: Optional[str] | |
996 ) -> muc.Room: | |
983 """Join room an retrieve history with legacy method""" | 997 """Join room an retrieve history with legacy method""" |
984 mess_data_list = yield self.host.memory.historyGet(room_jid, | 998 mess_data_list = await self.host.memory.historyGet( |
985 client.jid.userhostJID(), | 999 room_jid, |
986 limit=1, | 1000 client.jid.userhostJID(), |
987 between=True, | 1001 limit=1, |
988 profile=client.profile) | 1002 between=True, |
1003 profile=client.profile | |
1004 ) | |
989 if mess_data_list: | 1005 if mess_data_list: |
990 timestamp = mess_data_list[0][1] | 1006 timestamp = mess_data_list[0][1] |
991 # we use seconds since last message to get backlog without duplicates | 1007 # we use seconds since last message to get backlog without duplicates |
992 # and we remove 1 second to avoid getting the last message again | 1008 # and we remove 1 second to avoid getting the last message again |
993 seconds = int(time.time() - timestamp) - 1 | 1009 seconds = int(time.time() - timestamp) - 1 |
994 else: | 1010 else: |
995 seconds = None | 1011 seconds = None |
996 | 1012 |
997 room = yield super(SatMUCClient, self).join( | 1013 room = await super(LiberviaMUCClient, self).join( |
998 room_jid, nick, muc.HistoryOptions(seconds=seconds), password) | 1014 room_jid, nick, muc.HistoryOptions(seconds=seconds), password) |
999 # used to send bridge signal once backlog are written in history | 1015 # used to send bridge signal once backlog are written in history |
1000 room._history_type = HISTORY_LEGACY | 1016 room._history_type = HISTORY_LEGACY |
1001 room._history_d = defer.Deferred() | 1017 room._history_d = defer.Deferred() |
1002 room._history_d.callback(None) | 1018 room._history_d.callback(None) |
1003 defer.returnValue(room) | 1019 return room |
1004 | 1020 |
1005 @defer.inlineCallbacks | 1021 async def _get_MAM_history( |
1006 def _joinMAM(self, client, room_jid, nick, password): | 1022 self, |
1007 """Join room and retrieve history using MAM""" | 1023 client: SatXMPPEntity, |
1008 room = yield super(SatMUCClient, self).join( | 1024 room: muc.Room, |
1009 # we don't want any history from room as we'll get it with MAM | 1025 room_jid: jid.JID |
1010 room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password) | 1026 ) -> None: |
1011 room._history_type = HISTORY_MAM | 1027 """Retrieve history for rooms handling MAM""" |
1012 history_d = room._history_d = defer.Deferred() | 1028 history_d = room._history_d = defer.Deferred() |
1013 # we trigger now the deferred so all callback are processed as soon as possible | 1029 # we trigger now the deferred so all callback are processed as soon as possible |
1014 # and in order | 1030 # and in order |
1015 history_d.callback(None) | 1031 history_d.callback(None) |
1016 | 1032 |
1017 last_mess = yield self.host.memory.historyGet( | 1033 last_mess = await self.host.memory.historyGet( |
1018 room_jid, | 1034 room_jid, |
1019 None, | 1035 None, |
1020 limit=1, | 1036 limit=1, |
1021 between=False, | 1037 between=False, |
1022 filters={ | 1038 filters={ |
1038 mam_req = mam.MAMRequest(rsm_=rsm_req) | 1054 mam_req = mam.MAMRequest(rsm_=rsm_req) |
1039 complete = False | 1055 complete = False |
1040 count = 0 | 1056 count = 0 |
1041 while not complete: | 1057 while not complete: |
1042 try: | 1058 try: |
1043 mam_data = yield self._mam.getArchives(client, mam_req, | 1059 mam_data = await self._mam.getArchives(client, mam_req, |
1044 service=room_jid) | 1060 service=room_jid) |
1045 except xmpp_error.StanzaError as e: | 1061 except xmpp_error.StanzaError as e: |
1046 if last_mess and e.condition == 'item-not-found': | 1062 if last_mess and e.condition == 'item-not-found': |
1047 log.info( | 1063 log.info( |
1048 f"requested item (with id {stanza_id!r}) can't be found in " | 1064 f"requested item (with id {stanza_id!r}) can't be found in " |
1105 self.changeRoomState(room, ROOM_STATE_LIVE) | 1121 self.changeRoomState(room, ROOM_STATE_LIVE) |
1106 history_d.addCallbacks(self._historyCb, self._historyEb, [room], | 1122 history_d.addCallbacks(self._historyCb, self._historyEb, [room], |
1107 errbackArgs=[room]) | 1123 errbackArgs=[room]) |
1108 | 1124 |
1109 # we wait for all callbacks to be processed | 1125 # we wait for all callbacks to be processed |
1110 yield history_d | 1126 await history_d |
1111 | 1127 |
1112 defer.returnValue(room) | 1128 async def _join_MAM( |
1113 | 1129 self, |
1114 @defer.inlineCallbacks | 1130 client: SatXMPPEntity, |
1115 def join(self, room_jid, nick, password=None): | 1131 room_jid: jid.JID, |
1132 nick: str, | |
1133 password: Optional[str] | |
1134 ) -> muc.Room: | |
1135 """Join room and retrieve history using MAM""" | |
1136 room = await super(LiberviaMUCClient, self).join( | |
1137 # we don't want any history from room as we'll get it with MAM | |
1138 room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password | |
1139 ) | |
1140 room._history_type = HISTORY_MAM | |
1141 # MAM history retrieval can be very long, and doesn't need to be sync, so we don't | |
1142 # wait for it | |
1143 defer.ensureDeferred(self._get_MAM_history(client, room, room_jid)) | |
1144 room.fully_joined.callback(room) | |
1145 | |
1146 return room | |
1147 | |
1148 async def join(self, room_jid, nick, password=None): | |
1116 room_service = jid.JID(room_jid.host) | 1149 room_service = jid.JID(room_jid.host) |
1117 has_mam = yield self.host.hasFeature(self.client, mam.NS_MAM, room_service) | 1150 has_mam = await self.host.hasFeature(self.client, mam.NS_MAM, room_service) |
1118 if not self._mam or not has_mam: | 1151 if not self._mam or not has_mam: |
1119 room = yield self._joinLegacy(self.client, room_jid, nick, password) | 1152 return await self._join_legacy(self.client, room_jid, nick, password) |
1120 defer.returnValue(room) | 1153 else: |
1121 else: | 1154 return await self._join_MAM(self.client, room_jid, nick, password) |
1122 room = yield self._joinMAM(self.client, room_jid, nick, password) | |
1123 defer.returnValue(room) | |
1124 | 1155 |
1125 ## presence/roster ## | 1156 ## presence/roster ## |
1126 | 1157 |
1127 def availableReceived(self, presence): | 1158 def availableReceived(self, presence): |
1128 """ | 1159 """ |
1398 def _historyCb(self, __, room): | 1429 def _historyCb(self, __, room): |
1399 """Called when history have been written to database and subject is received | 1430 """Called when history have been written to database and subject is received |
1400 | 1431 |
1401 this method will finish joining by: | 1432 this method will finish joining by: |
1402 - sending message to bridge | 1433 - sending message to bridge |
1403 - calling fully_joined deferred | 1434 - calling fully_joined deferred (for legacy history) |
1404 - sending stanza put in cache | 1435 - sending stanza put in cache |
1405 - cleaning variables not needed anymore | 1436 - cleaning variables not needed anymore |
1406 """ | 1437 """ |
1407 args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile) | 1438 args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile) |
1408 self.host.bridge.mucRoomJoined(*args) | 1439 self.host.bridge.mucRoomJoined(*args) |
1409 room.fully_joined.callback(room) | 1440 if room._history_type == HISTORY_LEGACY: |
1441 room.fully_joined.callback(room) | |
1410 del room._history_d | 1442 del room._history_d |
1411 del room._history_type | 1443 del room._history_type |
1412 cache = room._cache | 1444 cache = room._cache |
1413 del room._cache | 1445 del room._cache |
1414 cache_presence = room._cache_presence | 1446 cache_presence = room._cache_presence |