changeset 2581:395a3d1c2888

plugin XEP-0045: fixed joining workflow: - room keep joining workflow state, and display warning when something is received at unexpected moment - state is change immediatly when subject is received (which must be the last thing before live messages), avoiding timing issue if history storage in database is taking too much time - user joined/left data are not saved anymore in history as it takes a lot of space for little interest. A future option may allow to re-enable it if needed.
author Goffi <goffi@goffi.org>
date Fri, 11 May 2018 17:11:47 +0200
parents 5e54afd17321
children 2e6864b1d577
files sat/plugins/plugin_xep_0045.py sat/plugins/plugin_xep_0054.py
diffstat 2 files changed, 91 insertions(+), 86 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0045.py	Thu May 10 08:34:44 2018 +0200
+++ b/sat/plugins/plugin_xep_0045.py	Fri May 11 17:11:47 2018 +0200
@@ -58,6 +58,11 @@
 ROOM_USER_LEFT = 'ROOM_USER_LEFT'
 OCCUPANT_KEYS = ('nick', 'entity', 'affiliation', 'role')
 ENTITY_TYPE_MUC = "MUC"
+ROOM_STATE_OCCUPANTS = "occupants"
+ROOM_STATE_SELF_PRESENCE = "self-presence"
+ROOM_STATE_LIVE = "live"
+ROOM_STATES = (ROOM_STATE_OCCUPANTS, ROOM_STATE_SELF_PRESENCE, ROOM_STATE_LIVE)
+
 
 CONFIG_SECTION = u'plugin muc'
 
@@ -105,14 +110,14 @@
             self.text_cmds.addWhoIsCb(self._whois, 100)
 
         host.trigger.add("presence_available", self.presenceTrigger)
-        host.trigger.add("MessageReceived", self.MessageReceivedTrigger, priority=1000000)
+        host.trigger.add("MessageReceived", self.messageReceivedTrigger, priority=1000000)
 
     def profileConnected(self, client):
         def assign_service(service):
             client.muc_service = service
         return self.getMUCService(client).addCallback(assign_service)
 
-    def MessageReceivedTrigger(self, client, message_elt, post_treat):
+    def messageReceivedTrigger(self, client, message_elt, post_treat):
         if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT:
             if message_elt.subject or message_elt.delay:
                 return False
@@ -120,8 +125,10 @@
             room_jid = from_jid.userhostJID()
             if room_jid in client._muc_client.joined_rooms:
                 room = client._muc_client.joined_rooms[room_jid]
-                if not room._room_ok:
-                    log.warning(u"Received non delayed message in a room before its initialisation: {}".format(message_elt.toXml()))
+                if room.state != ROOM_STATE_LIVE:
+                    log.warning(_(u"Received non delayed message in a room before its initialisation: state={state}, msg={msg}").format(
+                        state=room.state,
+                        msg=message_elt.toXml()))
                     room._cache.append(message_elt)
                     return False
             else:
@@ -234,7 +241,7 @@
         """Return rooms where user is"""
         result = []
         for room in client._muc_client.joined_rooms.values():
-            if room._room_ok:
+            if room.state == ROOM_STATE_LIVE:
                 result.append((room.roomJID.userhost(), self._getOccupants(room), room.nick, room.subject))
         return result
 
@@ -253,46 +260,6 @@
         self.checkRoomJoined(client, room_jid)
         return client._muc_client.joined_rooms[room_jid].nick
 
-    # FIXME: broken, to be removed !
-    # def getRoomEntityNick(self, client, room_jid, entity_jid, =True):
-    #     """Returns the nick of the given user in the room.
-
-    #     @param room (wokkel.muc.Room): the room
-    #     @param user (jid.JID): bare JID of the user
-    #     @param secure (bool): set to True for a secure check
-    #     @return: unicode or None if the user didn't join the room.
-    #     """
-    #     for user in room.roster.values():
-    #         if user.entity is not None:
-    #             if user.entity.userhostJID() == user_jid.userhostJID():
-    #                 return user.nick
-    #         elif not secure:
-    #             # FIXME: this is NOT ENOUGH to check an identity!!
-    #             # See in which conditions user.entity could be None.
-    #             if user.nick == user_jid.user:
-    #                 return user.nick
-    #     return None
-
-    # def getRoomNicksOfUsers(self, room, users=[], secure=True):
-    #     """Returns the nicks of the given users in the room.
-
-    #     @param room (wokkel.muc.Room): the room
-    #     @param users (list[jid.JID]): list of users
-    #     @param secure (True): set to True for a secure check
-    #     @return: a couple (x, y) with:
-    #         - x (list[unicode]): nicks of the users who are in the room
-    #         - y (list[jid.JID]): JID of the missing users.
-    #     """
-    #     nicks = []
-    #     missing = []
-    #     for user in users:
-    #         nick = self.getRoomNickOfUser(room, user, secure)
-    #         if nick is None:
-    #             missing.append(user)
-    #         else:
-    #             nicks.append(nick)
-    #     return nicks, missing
-
     def _configureRoom(self, room_jid_s, profile_key=C.PROF_KEY_NONE):
         client = self.host.getClient(profile_key)
         d = self.configureRoom(client, jid.JID(room_jid_s))
@@ -456,18 +423,13 @@
             nick = client.jid.user
         if options is None:
             options = {}
-        def _errDeferred(exc_obj=Exception, txt=u'Error while joining room'):
-            d = defer.Deferred()
-            d.errback(exc_obj(txt))
-            return d
-
         if room_jid in client._muc_client.joined_rooms:
             room = client._muc_client.joined_rooms[room_jid]
-            log.warning(_(u'{profile} is already in room {room_jid}').format(profile=client.profile, room_jid = room_jid.userhost()))
+            log.info(_(u'{profile} is already in room {room_jid}').format(profile=client.profile, room_jid = room_jid.userhost()))
             return defer.fail(AlreadyJoined(room))
         log.info(_(u"[{profile}] is joining room {room} with nick {nick}").format(profile=client.profile, room=room_jid.userhost(), nick=nick))
 
-        password = options["password"] if "password" in options else None
+        password = options.get("password")
 
         return client._muc_client.join(room_jid, nick, password).addCallbacks(self._joinCb, self._joinEb, (client, room_jid, nick), errbackArgs=(client, room_jid, nick, password))
 
@@ -776,7 +738,6 @@
         self.plugin_parent = plugin_parent
         self.host = plugin_parent.host
         muc.MUCClient.__init__(self)
-        self.rec_subjects = {}
         self._changing_nicks = set()  # used to keep trace of who is changing nick,
                                       # and to discard userJoinedRoom signal in this case
         print "init SatMUCClient OK"
@@ -785,18 +746,45 @@
     def joined_rooms(self):
         return self._rooms
 
+    def changeRoomState(self, room, new_state):
+        """Check that room is in expected state, and change it
+
+        @param new_state: one of ROOM_STATE_*
+        """
+        new_state_idx = ROOM_STATES.index(new_state)
+        if new_state_idx == -1:
+            raise exceptions.InternalError("unknown room state")
+        if new_state_idx < 1:
+            raise exceptions.InternalError("unexpected new room state ({room}): {state}".format(
+                room=room.userhost(),
+                state=new_state))
+        expected_state = ROOM_STATES[new_state_idx-1]
+        if room.state != expected_state:
+            log.warning(_(u"room {room} is not in expected state: room is in state {current_state} while we were expecting {expected_state}").format(
+                room=room.userhost(),
+                current_state=room.state,
+                expected_state=expected_state))
+        room.state = new_state
+
     def _addRoom(self, room):
         super(SatMUCClient, self)._addRoom(room)
         room._roster_ok = False  # True when occupants list has been fully received
-        room._room_ok = None  # False when roster, history and subject are available
-                              # True when new messages are saved to database
+        room.state = ROOM_STATE_OCCUPANTS
         room._history_d = defer.Deferred()  # used to send bridge signal once backlog are written in history
         room._history_d.callback(None)
         # FIXME: check if history_d is not redundant with fully_joined
         room.fully_joined = defer.Deferred()  # called when everything is OK
+        # cache data until room is ready
+        # list of elements which will be re-injected in stream
         room._cache = []
 
-    def _gotLastDbHistory(self, mess_data_list, room_jid, nick, password):
+    @defer.inlineCallbacks
+    def join(self, room_jid, nick, password=None):
+        mess_data_list = yield self.host.memory.historyGet(self.parent.jid.userhostJID(),
+                                                           room_jid,
+                                                           1,
+                                                           True,
+                                                           profile=self.parent.profile)
         if mess_data_list:
             timestamp = mess_data_list[0][1]
             # we use seconds since last message to get backlog without duplicates
@@ -804,13 +792,8 @@
             seconds = int(time.time() - timestamp) - 1
         else:
             seconds = None
-        d = super(SatMUCClient, self).join(room_jid, nick, muc.HistoryOptions(seconds=seconds), password)
-        return d
-
-    def join(self, room_jid, nick, password=None):
-        d = self.host.memory.historyGet(self.parent.jid.userhostJID(), room_jid, 1, True, profile=self.parent.profile)
-        d.addCallback(self._gotLastDbHistory, room_jid, nick, password)
-        return d
+        room = yield super(SatMUCClient, self).join(room_jid, nick, muc.HistoryOptions(seconds=seconds), password)
+        defer.returnValue(room)
 
     ## presence/roster ##
 
@@ -828,6 +811,10 @@
 
         if user is None:
             nick = presence.sender.resource
+            if not nick:
+                log.warning(_(u"missing nick in presence: {xml}").format(
+                    xml = presence.toElement().toXml()))
+                return
             user = muc.User(nick, presence.entity)
 
         # Update user data
@@ -867,17 +854,20 @@
     def userJoinedRoom(self, room, user):
         if user.nick == room.nick:
             # we have received our own nick, this mean that the full room roster was received
-            room._roster_ok = True
+            self.changeRoomState(room, ROOM_STATE_SELF_PRESENCE)
             log.debug(u"room {room} joined with nick {nick}".format(room=room.occupantJID.userhost(), nick=user.nick))
-            # We set type so we don't have use a deferred with disco to check entity type
+            # We set type so we don't have to use a deferred with disco to check entity type
             self.host.memory.updateEntityData(room.roomJID, C.ENTITY_TYPE, ENTITY_TYPE_MUC, profile_key=self.parent.profile)
-        elif not room._room_ok:
-            log.warning(u"Received user presence data in a room before its initialisation (and after our own presence),"
-                "this is not standard! Ignoring it: {} ({})".format(
-                room.roomJID.userhost(),
-                user.nick))
+        elif room.state not in (ROOM_STATE_OCCUPANTS, ROOM_STATE_LIVE):
+            log.warning(u"Received user presence data in a room before its initialisation (current state: {state}),"
+                "this is not standard! Ignoring it: {room} ({nick})".format(
+                state=room.state,
+                room=room.roomJID.userhost(),
+                nick=user.nick))
             return
-        elif room._roster_ok:
+        else:
+            if not room.fully_joined.called:
+                return
             try:
                 self._changing_nicks.remove(user.nick)
             except KeyError:
@@ -903,7 +893,10 @@
                     "extra": extra,
                     "timestamp": time.time(),
                 }
-                self.parent.messageAddToHistory(mess_data)
+                # FIXME: we disable presence in history as it's taking a lot of space
+                #        while not indispensable. In the future an option may allow
+                #        to re-enable it
+                # self.parent.messageAddToHistory(mess_data)
                 self.parent.messageSendToBridge(mess_data)
 
 
@@ -917,13 +910,16 @@
                 room = room_jid_s, profile = self.parent.profile))
             self.host.memory.delEntityCache(room.roomJID, profile_key=self.parent.profile)
             self.host.bridge.mucRoomLeft(room.roomJID.userhost(), self.parent.profile)
-        elif not room._room_ok:
-            log.warning(u"Received user presence data in a room before its initialisation (and after our own presence),"
-                "this is not standard! Ignoring it: {} ({})".format(
-                room.roomJID.userhost(),
-                user.nick))
+        elif room.state != ROOM_STATE_LIVE:
+            log.warning(u"Received user presence data in a room before its initialisation (current state: {state}),"
+                "this is not standard! Ignoring it: {room} ({nick})".format(
+                state=room.state,
+                room=room.roomJID.userhost(),
+                nick=user.nick))
             return
         else:
+            if not room.fully_joined.called:
+                return
             log.debug(_(u"user {nick} left room {room_id}").format(nick=user.nick, room_id=room.occupantJID.userhost()))
             extra = {'info_type': ROOM_USER_LEFT,
                      'user_affiliation': user.affiliation,
@@ -942,7 +938,8 @@
                 "extra": extra,
                 "timestamp": time.time(),
             }
-            self.parent.messageAddToHistory(mess_data)
+            # FIXME: disable history, see userJoinRoom comment
+            # self.parent.messageAddToHistory(mess_data)
             self.parent.messageSendToBridge(mess_data)
 
     def userChangedNick(self, room, user, new_nick):
@@ -992,6 +989,10 @@
             None if the message come from the room
         @param message(muc.GroupChat): the parsed message
         """
+        if room.state != ROOM_STATE_SELF_PRESENCE:
+            log.warning(_(u"received history in unexpected state in room {room} (state: {state})").format(
+                room = room.userhost(),
+                state = room.state))
         room._history_d.addCallback(self._addToHistory, user, message)
         room._history_d.addErrback(self._addToHistoryEb)
 
@@ -1022,12 +1023,20 @@
         return muc.MUCClientProtocol.subject(self, room, subject)
 
     def _historyCb(self, dummy, room):
+        """Called when history have been written to database and subject is received
+
+        this method will finish joining by:
+            - sending message to bridge
+            - calling fully_joined deferred
+            - sending stanza put in cache
+            - cleaning variables not needed anymore
+        """
         args = self.plugin_parent._getRoomJoinedArgs(room, self.parent.profile)
         self.host.bridge.mucRoomJoined(*args)
+        room.fully_joined.callback(room)
         del room._history_d
         cache = room._cache
         del room._cache
-        room._room_ok = True
         for elem in cache:
             self.parent.xmlstream.dispatch(elem)
 
@@ -1039,13 +1048,9 @@
         # when subject is received, we know that we have whole roster and history
         # cf. http://xmpp.org/extensions/xep-0045.html#enter-subject
         room.subject = subject  # FIXME: subject doesn't handle xml:lang
-        self.rec_subjects[room.roomJID.userhost()] = (room.roomJID.userhost(), subject)
-        if room._room_ok is None:
-            # this is the first subject we receive
-            # that mean that we have received everything we need
-            room._room_ok = False
+        if room.state != ROOM_STATE_LIVE:
+            self.changeRoomState(room, ROOM_STATE_LIVE)
             room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], errbackArgs=[room])
-            room.fully_joined.callback(room)
         else:
             # the subject has been changed
             log.debug(_(u"New subject for room ({room_id}): {subject}").format(room_id = room.roomJID.full(), subject = subject))
--- a/sat/plugins/plugin_xep_0054.py	Thu May 10 08:34:44 2018 +0200
+++ b/sat/plugins/plugin_xep_0054.py	Fri May 11 17:11:47 2018 +0200
@@ -145,7 +145,7 @@
         #       Hashes should be shared between profiles (or not ? what
         #       if the avatar is different depending on who is requesting it
         #       this is not possible with vcard-tmp, but it is with XEP-0084).
-        #       Loading avatar on demand per jid may be a option to investigate.
+        #       Loading avatar on demand per jid may be an option to investigate.
         client = self.host.getClient(profile)
         for jid_s, data in client._cache_0054.iteritems():
             jid_ = jid.JID(jid_s)