Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0045.py @ 2581:395a3d1c2888
plugin XEP-0045: fixed joining workflow:
- room keep joining workflow state, and display warning when something is received at unexpected moment
- state is change immediatly when subject is received (which must be the last thing before live messages), avoiding timing issue if history storage in database is taking too much time
- user joined/left data are not saved anymore in history as it takes a lot of space for little interest. A future option may allow to re-enable it if needed.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 May 2018 17:11:47 +0200 |
parents | 26edcf3a30eb |
children | 8378806a70fe |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0045.py Thu May 10 08:34:44 2018 +0200 +++ b/sat/plugins/plugin_xep_0045.py Fri May 11 17:11:47 2018 +0200 @@ -58,6 +58,11 @@ ROOM_USER_LEFT = 'ROOM_USER_LEFT' OCCUPANT_KEYS = ('nick', 'entity', 'affiliation', 'role') ENTITY_TYPE_MUC = "MUC" +ROOM_STATE_OCCUPANTS = "occupants" +ROOM_STATE_SELF_PRESENCE = "self-presence" +ROOM_STATE_LIVE = "live" +ROOM_STATES = (ROOM_STATE_OCCUPANTS, ROOM_STATE_SELF_PRESENCE, ROOM_STATE_LIVE) + CONFIG_SECTION = u'plugin muc' @@ -105,14 +110,14 @@ self.text_cmds.addWhoIsCb(self._whois, 100) host.trigger.add("presence_available", self.presenceTrigger) - host.trigger.add("MessageReceived", self.MessageReceivedTrigger, priority=1000000) + host.trigger.add("MessageReceived", self.messageReceivedTrigger, priority=1000000) def profileConnected(self, client): def assign_service(service): client.muc_service = service return self.getMUCService(client).addCallback(assign_service) - def MessageReceivedTrigger(self, client, message_elt, post_treat): + def messageReceivedTrigger(self, client, message_elt, post_treat): if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT: if message_elt.subject or message_elt.delay: return False @@ -120,8 +125,10 @@ room_jid = from_jid.userhostJID() if room_jid in client._muc_client.joined_rooms: room = client._muc_client.joined_rooms[room_jid] - if not room._room_ok: - log.warning(u"Received non delayed message in a room before its initialisation: {}".format(message_elt.toXml())) + if room.state != ROOM_STATE_LIVE: + log.warning(_(u"Received non delayed message in a room before its initialisation: state={state}, msg={msg}").format( + state=room.state, + msg=message_elt.toXml())) room._cache.append(message_elt) return False else: @@ -234,7 +241,7 @@ """Return rooms where user is""" result = [] for room in client._muc_client.joined_rooms.values(): - if room._room_ok: + if room.state == ROOM_STATE_LIVE: result.append((room.roomJID.userhost(), self._getOccupants(room), room.nick, room.subject)) return result @@ -253,46 +260,6 @@ self.checkRoomJoined(client, room_jid) return client._muc_client.joined_rooms[room_jid].nick - # FIXME: broken, to be removed ! - # def getRoomEntityNick(self, client, room_jid, entity_jid, =True): - # """Returns the nick of the given user in the room. - - # @param room (wokkel.muc.Room): the room - # @param user (jid.JID): bare JID of the user - # @param secure (bool): set to True for a secure check - # @return: unicode or None if the user didn't join the room. - # """ - # for user in room.roster.values(): - # if user.entity is not None: - # if user.entity.userhostJID() == user_jid.userhostJID(): - # return user.nick - # elif not secure: - # # FIXME: this is NOT ENOUGH to check an identity!! - # # See in which conditions user.entity could be None. - # if user.nick == user_jid.user: - # return user.nick - # return None - - # def getRoomNicksOfUsers(self, room, users=[], secure=True): - # """Returns the nicks of the given users in the room. - - # @param room (wokkel.muc.Room): the room - # @param users (list[jid.JID]): list of users - # @param secure (True): set to True for a secure check - # @return: a couple (x, y) with: - # - x (list[unicode]): nicks of the users who are in the room - # - y (list[jid.JID]): JID of the missing users. - # """ - # nicks = [] - # missing = [] - # for user in users: - # nick = self.getRoomNickOfUser(room, user, secure) - # if nick is None: - # missing.append(user) - # else: - # nicks.append(nick) - # return nicks, missing - def _configureRoom(self, room_jid_s, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) d = self.configureRoom(client, jid.JID(room_jid_s)) @@ -456,18 +423,13 @@ nick = client.jid.user if options is None: options = {} - def _errDeferred(exc_obj=Exception, txt=u'Error while joining room'): - d = defer.Deferred() - d.errback(exc_obj(txt)) - return d - if room_jid in client._muc_client.joined_rooms: room = client._muc_client.joined_rooms[room_jid] - log.warning(_(u'{profile} is already in room {room_jid}').format(profile=client.profile, room_jid = room_jid.userhost())) + log.info(_(u'{profile} is already in room {room_jid}').format(profile=client.profile, room_jid = room_jid.userhost())) return defer.fail(AlreadyJoined(room)) log.info(_(u"[{profile}] is joining room {room} with nick {nick}").format(profile=client.profile, room=room_jid.userhost(), nick=nick)) - password = options["password"] if "password" in options else None + password = options.get("password") return client._muc_client.join(room_jid, nick, password).addCallbacks(self._joinCb, self._joinEb, (client, room_jid, nick), errbackArgs=(client, room_jid, nick, password)) @@ -776,7 +738,6 @@ self.plugin_parent = plugin_parent self.host = plugin_parent.host muc.MUCClient.__init__(self) - self.rec_subjects = {} self._changing_nicks = set() # used to keep trace of who is changing nick, # and to discard userJoinedRoom signal in this case print "init SatMUCClient OK" @@ -785,18 +746,45 @@ def joined_rooms(self): return self._rooms + def changeRoomState(self, room, new_state): + """Check that room is in expected state, and change it + + @param new_state: one of ROOM_STATE_* + """ + new_state_idx = ROOM_STATES.index(new_state) + if new_state_idx == -1: + raise exceptions.InternalError("unknown room state") + if new_state_idx < 1: + raise exceptions.InternalError("unexpected new room state ({room}): {state}".format( + room=room.userhost(), + state=new_state)) + expected_state = ROOM_STATES[new_state_idx-1] + if room.state != expected_state: + log.warning(_(u"room {room} is not in expected state: room is in state {current_state} while we were expecting {expected_state}").format( + room=room.userhost(), + current_state=room.state, + expected_state=expected_state)) + room.state = new_state + def _addRoom(self, room): super(SatMUCClient, self)._addRoom(room) room._roster_ok = False # True when occupants list has been fully received - room._room_ok = None # False when roster, history and subject are available - # True when new messages are saved to database + room.state = ROOM_STATE_OCCUPANTS room._history_d = defer.Deferred() # used to send bridge signal once backlog are written in history room._history_d.callback(None) # FIXME: check if history_d is not redundant with fully_joined room.fully_joined = defer.Deferred() # called when everything is OK + # cache data until room is ready + # list of elements which will be re-injected in stream room._cache = [] - def _gotLastDbHistory(self, mess_data_list, room_jid, nick, password): + @defer.inlineCallbacks + def join(self, room_jid, nick, password=None): + mess_data_list = yield self.host.memory.historyGet(self.parent.jid.userhostJID(), + room_jid, + 1, + True, + profile=self.parent.profile) if mess_data_list: timestamp = mess_data_list[0][1] # we use seconds since last message to get backlog without duplicates @@ -804,13 +792,8 @@ seconds = int(time.time() - timestamp) - 1 else: seconds = None - d = super(SatMUCClient, self).join(room_jid, nick, muc.HistoryOptions(seconds=seconds), password) - return d - - def join(self, room_jid, nick, password=None): - d = self.host.memory.historyGet(self.parent.jid.userhostJID(), room_jid, 1, True, profile=self.parent.profile) - d.addCallback(self._gotLastDbHistory, room_jid, nick, password) - return d + room = yield super(SatMUCClient, self).join(room_jid, nick, muc.HistoryOptions(seconds=seconds), password) + defer.returnValue(room) ## presence/roster ## @@ -828,6 +811,10 @@ if user is None: nick = presence.sender.resource + if not nick: + log.warning(_(u"missing nick in presence: {xml}").format( + xml = presence.toElement().toXml())) + return user = muc.User(nick, presence.entity) # Update user data @@ -867,17 +854,20 @@ def userJoinedRoom(self, room, user): if user.nick == room.nick: # we have received our own nick, this mean that the full room roster was received - room._roster_ok = True + self.changeRoomState(room, ROOM_STATE_SELF_PRESENCE) log.debug(u"room {room} joined with nick {nick}".format(room=room.occupantJID.userhost(), nick=user.nick)) - # We set type so we don't have use a deferred with disco to check entity type + # We set type so we don't have to use a deferred with disco to check entity type self.host.memory.updateEntityData(room.roomJID, C.ENTITY_TYPE, ENTITY_TYPE_MUC, profile_key=self.parent.profile) - elif not room._room_ok: - log.warning(u"Received user presence data in a room before its initialisation (and after our own presence)," - "this is not standard! Ignoring it: {} ({})".format( - room.roomJID.userhost(), - user.nick)) + elif room.state not in (ROOM_STATE_OCCUPANTS, ROOM_STATE_LIVE): + log.warning(u"Received user presence data in a room before its initialisation (current state: {state})," + "this is not standard! Ignoring it: {room} ({nick})".format( + state=room.state, + room=room.roomJID.userhost(), + nick=user.nick)) return - elif room._roster_ok: + else: + if not room.fully_joined.called: + return try: self._changing_nicks.remove(user.nick) except KeyError: @@ -903,7 +893,10 @@ "extra": extra, "timestamp": time.time(), } - self.parent.messageAddToHistory(mess_data) + # FIXME: we disable presence in history as it's taking a lot of space + # while not indispensable. In the future an option may allow + # to re-enable it + # self.parent.messageAddToHistory(mess_data) self.parent.messageSendToBridge(mess_data) @@ -917,13 +910,16 @@ room = room_jid_s, profile = self.parent.profile)) self.host.memory.delEntityCache(room.roomJID, profile_key=self.parent.profile) self.host.bridge.mucRoomLeft(room.roomJID.userhost(), self.parent.profile) - elif not room._room_ok: - log.warning(u"Received user presence data in a room before its initialisation (and after our own presence)," - "this is not standard! Ignoring it: {} ({})".format( - room.roomJID.userhost(), - user.nick)) + elif room.state != ROOM_STATE_LIVE: + log.warning(u"Received user presence data in a room before its initialisation (current state: {state})," + "this is not standard! Ignoring it: {room} ({nick})".format( + state=room.state, + room=room.roomJID.userhost(), + nick=user.nick)) return else: + if not room.fully_joined.called: + return log.debug(_(u"user {nick} left room {room_id}").format(nick=user.nick, room_id=room.occupantJID.userhost())) extra = {'info_type': ROOM_USER_LEFT, 'user_affiliation': user.affiliation, @@ -942,7 +938,8 @@ "extra": extra, "timestamp": time.time(), } - self.parent.messageAddToHistory(mess_data) + # FIXME: disable history, see userJoinRoom comment + # self.parent.messageAddToHistory(mess_data) self.parent.messageSendToBridge(mess_data) def userChangedNick(self, room, user, new_nick): @@ -992,6 +989,10 @@ None if the message come from the room @param message(muc.GroupChat): the parsed message """ + if room.state != ROOM_STATE_SELF_PRESENCE: + log.warning(_(u"received history in unexpected state in room {room} (state: {state})").format( + room = room.userhost(), + state = room.state)) room._history_d.addCallback(self._addToHistory, user, message) room._history_d.addErrback(self._addToHistoryEb) @@ -1022,12 +1023,20 @@ return muc.MUCClientProtocol.subject(self, room, subject) def _historyCb(self, dummy, room): + """Called when history have been written to database and subject is received + + this method will finish joining by: + - sending message to bridge + - calling fully_joined deferred + - sending stanza put in cache + - cleaning variables not needed anymore + """ args = self.plugin_parent._getRoomJoinedArgs(room, self.parent.profile) self.host.bridge.mucRoomJoined(*args) + room.fully_joined.callback(room) del room._history_d cache = room._cache del room._cache - room._room_ok = True for elem in cache: self.parent.xmlstream.dispatch(elem) @@ -1039,13 +1048,9 @@ # when subject is received, we know that we have whole roster and history # cf. http://xmpp.org/extensions/xep-0045.html#enter-subject room.subject = subject # FIXME: subject doesn't handle xml:lang - self.rec_subjects[room.roomJID.userhost()] = (room.roomJID.userhost(), subject) - if room._room_ok is None: - # this is the first subject we receive - # that mean that we have received everything we need - room._room_ok = False + if room.state != ROOM_STATE_LIVE: + self.changeRoomState(room, ROOM_STATE_LIVE) room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], errbackArgs=[room]) - room.fully_joined.callback(room) else: # the subject has been changed log.debug(_(u"New subject for room ({room_id}): {subject}").format(room_id = room.roomJID.full(), subject = subject))