diff sat/plugins/plugin_xep_0045.py @ 2715:b35c84ea73cf

plugin XEP-0045: MAM implementation for MUC
author Goffi <goffi@goffi.org>
date Fri, 07 Dec 2018 19:13:28 +0100
parents d715d912afac
children bb6adaa580ee
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0045.py	Fri Dec 07 17:46:50 2018 +0100
+++ b/sat/plugins/plugin_xep_0045.py	Fri Dec 07 19:13:28 2018 +0100
@@ -24,12 +24,10 @@
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid
 from twisted.python import failure
-from dateutil.tz import tzutc
 
 from sat.core import exceptions
 from sat.memory import memory
 
-import calendar
 import time
 import uuid
 import copy
@@ -39,14 +37,18 @@
 
 from zope.interface import implements
 
+# XXX: mam and rsm come from sat_tmp.wokkel
+from wokkel import rsm
+from wokkel import mam
+
 
 PLUGIN_INFO = {
     C.PI_NAME: "XEP-0045 Plugin",
     C.PI_IMPORT_NAME: "XEP-0045",
     C.PI_TYPE: "XEP",
     C.PI_PROTOCOLS: ["XEP-0045"],
-    C.PI_DEPENDENCIES: [],
-    C.PI_RECOMMENDATIONS: [C.TEXT_CMDS],
+    C.PI_DEPENDENCIES: ["XEP-0359"],
+    C.PI_RECOMMENDATIONS: [C.TEXT_CMDS, u"XEP-0313"],
     C.PI_MAIN: "XEP_0045",
     C.PI_HANDLER: "yes",
     C.PI_DESCRIPTION: _("""Implementation of Multi-User Chat""")
@@ -62,6 +64,8 @@
 ROOM_STATE_SELF_PRESENCE = "self-presence"
 ROOM_STATE_LIVE = "live"
 ROOM_STATES = (ROOM_STATE_OCCUPANTS, ROOM_STATE_SELF_PRESENCE, ROOM_STATE_LIVE)
+HISTORY_LEGACY = u"legacy"
+HISTORY_MAM = u"mam"
 
 
 CONFIG_SECTION = u'plugin muc'
@@ -109,14 +113,30 @@
             self.text_cmds.registerTextCommands(self)
             self.text_cmds.addWhoIsCb(self._whois, 100)
 
+        self._mam = self.host.plugins.get(u"XEP-0313")
+        self._si = self.host.plugins[u"XEP-0359"]
+
         host.trigger.add("presence_available", self.presenceTrigger)
         host.trigger.add("MessageReceived", self.messageReceivedTrigger, priority=1000000)
+        host.trigger.add("message_parse", self._message_parseTrigger)
 
     def profileConnected(self, client):
         def assign_service(service):
             client.muc_service = service
         return self.getMUCService(client).addCallback(assign_service)
 
+    def _message_parseTrigger(self, client, message_elt, data):
+        """Add stanza-id from the room if present"""
+        if message_elt.getAttribute(u"type") != C.MESS_TYPE_GROUPCHAT:
+            return True
+
+        # stanza_id will not be filled by parseMessage because the emitter
+        # is the room and not our server, so we have to parse it here
+        room_jid = data[u"from"].userhostJID()
+        stanza_id = self._si.getStanzaId(message_elt, room_jid)
+        if stanza_id:
+            data[u"extra"][u"stanza_id"] = stanza_id
+
     def messageReceivedTrigger(self, client, message_elt, post_treat):
         if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT:
             if message_elt.subject or message_elt.delay:
@@ -126,13 +146,21 @@
             if room_jid in client._muc_client.joined_rooms:
                 room = client._muc_client.joined_rooms[room_jid]
                 if room.state != ROOM_STATE_LIVE:
-                    log.warning(_(u"Received non delayed message in a room before its initialisation: state={state}, msg={msg}").format(
+                    if getattr(room, "_history_type", HISTORY_LEGACY) == HISTORY_LEGACY:
+                        # With MAM history, order is different, and we can get live
+                        # messages before history is complete, so this is not a warning
+                        # but an expected case.
+                        # On the other hand, with legacy history, it's not normal.
+                        log.warning(_(
+                            u"Received non delayed message in a room before its "
+                            u"initialisation: state={state}, msg={msg}").format(
                         state=room.state,
                         msg=message_elt.toXml()))
                     room._cache.append(message_elt)
                     return False
             else:
-                log.warning(u"Received groupchat message for a room which has not been joined, ignoring it: {}".format(message_elt.toXml()))
+                log.warning(u"Received groupchat message for a room which has not been "
+                            u"joined, ignoring it: {}".format(message_elt.toXml()))
                 return False
         return True
 
@@ -197,7 +225,7 @@
             # a proper configuration management should be done
             log.debug(_(u"room locked !"))
             d = client._muc_client.configure(room.roomJID, {})
-            d.addErrback(lambda dummy: log.error(_(u'Error while configuring the room')))
+            d.addErrback(lambda __: log.error(_(u'Error while configuring the room')))
         return room.fully_joined
 
     def _joinEb(self, failure, client, room_jid, nick, password):
@@ -580,7 +608,7 @@
 
         d = self.kick(client, nick, mess_data["to"], {} if len(options) == 1 else {'reason': options[1]})
 
-        def cb(dummy):
+        def cb(__):
             feedback_msg = _(u'You have kicked {}').format(nick)
             if len(options) > 1:
                 feedback_msg += _(u' for the following reason: {}').format(options[1])
@@ -609,7 +637,7 @@
 
         d = self.ban(client, entity_jid, mess_data["to"], {} if len(options) == 1 else {'reason': options[1]})
 
-        def cb(dummy):
+        def cb(__):
             feedback_msg = _(u'You have banned {}').format(entity_jid)
             if len(options) > 1:
                 feedback_msg += _(u' for the following reason: {}').format(options[1])
@@ -648,7 +676,7 @@
 
         d = self.affiliate(client, entity_jid, mess_data["to"], {'affiliation': affiliation})
 
-        def cb(dummy):
+        def cb(__):
             feedback_msg = _(u'New affiliation for %(entity)s: %(affiliation)s').format(entity=entity_jid, affiliation=affiliation)
             self.text_cmds.feedBack(client, feedback_msg, mess_data)
             return True
@@ -744,7 +772,6 @@
 
     def __init__(self, plugin_parent):
         self.plugin_parent = plugin_parent
-        self.host = plugin_parent.host
         muc.MUCClient.__init__(self)
         self._changing_nicks = set()  # used to keep trace of who is changing nick,
                                       # and to discard userJoinedRoom signal in this case
@@ -754,6 +781,22 @@
     def joined_rooms(self):
         return self._rooms
 
+    @property
+    def host(self):
+        return self.plugin_parent.host
+
+    @property
+    def client(self):
+        return self.parent
+
+    @property
+    def _mam(self):
+        return self.plugin_parent._mam
+
+    @property
+    def _si(self):
+        return self.plugin_parent._si
+
     def changeRoomState(self, room, new_state):
         """Check that room is in expected state, and change it
 
@@ -768,7 +811,9 @@
                 state=new_state))
         expected_state = ROOM_STATES[new_state_idx-1]
         if room.state != expected_state:
-            log.warning(_(u"room {room} is not in expected state: room is in state {current_state} while we were expecting {expected_state}").format(
+            log.error(_(
+                u"room {room} is not in expected state: room is in state {current_state} "
+                u"while we were expecting {expected_state}").format(
                 room=room.userhost(),
                 current_state=room.state,
                 expected_state=expected_state))
@@ -778,8 +823,6 @@
         super(SatMUCClient, self)._addRoom(room)
         room._roster_ok = False  # True when occupants list has been fully received
         room.state = ROOM_STATE_OCCUPANTS
-        room._history_d = defer.Deferred()  # used to send bridge signal once backlog are written in history
-        room._history_d.callback(None)
         # FIXME: check if history_d is not redundant with fully_joined
         room.fully_joined = defer.Deferred()  # called when everything is OK
         # cache data until room is ready
@@ -787,12 +830,13 @@
         room._cache = []
 
     @defer.inlineCallbacks
-    def join(self, room_jid, nick, password=None):
-        mess_data_list = yield self.host.memory.historyGet(self.parent.jid.userhostJID(),
-                                                           room_jid,
-                                                           1,
-                                                           True,
-                                                           profile=self.parent.profile)
+    def _joinLegacy(self, client, room_jid, nick, password):
+        """Join room an retrieve history with legacy method"""
+        mess_data_list = yield self.host.memory.historyGet(room_jid,
+                                                           client.jid.userhostJID(),
+                                                           limit=1,
+                                                           between=True,
+                                                           profile=client.profile)
         if mess_data_list:
             timestamp = mess_data_list[0][1]
             # we use seconds since last message to get backlog without duplicates
@@ -800,9 +844,89 @@
             seconds = int(time.time() - timestamp) - 1
         else:
             seconds = None
-        room = yield super(SatMUCClient, self).join(room_jid, nick, muc.HistoryOptions(seconds=seconds), password)
+
+        room = yield super(SatMUCClient, self).join(
+            room_jid, nick, muc.HistoryOptions(seconds=seconds), password)
+        # used to send bridge signal once backlog are written in history
+        room._history_type = HISTORY_LEGACY
+        room._history_d = defer.Deferred()
+        room._history_d.callback(None)
         defer.returnValue(room)
 
+    @defer.inlineCallbacks
+    def _joinMAM(self, client, room_jid, nick, password):
+        """Join room and retrieve history using MAM"""
+        room = yield super(SatMUCClient, self).join(
+            # we don't want any history from room as we'll get it with MAM
+            room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password)
+        room._history_type = HISTORY_MAM
+        room._history_d = defer.Deferred()
+
+        last_mess = yield self.host.memory.historyGet(room_jid,
+                                                      None,
+                                                      limit=1,
+                                                      between=False,
+                                                      filters={u'last_stanza_id': True},
+                                                      profile=client.profile)
+        if last_mess:
+            stanza_id = last_mess[0][-1][u'stanza_id']
+            rsm_req = rsm.RSMRequest(after=stanza_id)
+        else:
+            log.info(u"We have no MAM archive for room {room_jid}.".format(
+                room_jid=room_jid))
+            rsm_req = rsm.RSMRequest(max_=20)
+
+        mam_req = mam.MAMRequest(rsm_=rsm_req)
+        mam_data = yield self._mam.getArchives(client, mam_req,
+                                               service=room_jid)
+        elt_list, rsm_response = mam_data
+
+        if not elt_list:
+            log.info(_(u"No message received while offline in {room_jid}".format(
+                room_jid=room_jid)))
+        else:
+            log.info(
+                _(u"We have received {num_mess} message(s) in {room_jid} while offline.")
+                .format(num_mess=len(elt_list), room_jid=room_jid))
+
+            for mess_elt in elt_list:
+                try:
+                    fwd_message_elt = self._mam.getMessageFromResult(
+                        client, mess_elt, mam_req, service=room_jid)
+                except exceptions.DataError:
+                    continue
+                if fwd_message_elt.getAttribute(u"to"):
+                    log.warning(
+                        u'Forwarded message element has a "to" attribute while it is '
+                        u'forbidden by specifications')
+                fwd_message_elt[u"to"] = client.jid.full()
+                mess_data = client.messageProt.parseMessage(fwd_message_elt)
+                # we attache parsed message data to element, to avoid parsing
+                # again in _addToHistory
+                fwd_message_elt._mess_data = mess_data
+                # and we inject to MUC workflow
+                client._muc_client._onGroupChat(fwd_message_elt)
+
+
+        # for legacy history, the following steps are done in receivedSubject but for MAM
+        # the order is different (we have to join then get MAM archive, so subject
+        # is received before archive), so we change state and add the callbacks here.
+        self.changeRoomState(room, ROOM_STATE_LIVE)
+        room._history_d.addCallbacks(self._historyCb, self._historyEb, [room],
+                                     errbackArgs=[room])
+
+        # callback is done now that all needed Deferred have been added to _history_d
+        room._history_d.callback(None)
+
+        defer.returnValue(room)
+
+    def join(self, room_jid, nick, password=None):
+        if (not self._mam or not self.host.hasFeature(self.client,
+                                                      mam.NS_MAM, room_jid)):
+            return self._joinLegacy(self.client, room_jid, nick, password)
+        else:
+            return self._joinMAM(self.client, room_jid, nick, password)
+
     ## presence/roster ##
 
     def availableReceived(self, presence):
@@ -865,7 +989,7 @@
             self.changeRoomState(room, ROOM_STATE_SELF_PRESENCE)
             log.debug(u"room {room} joined with nick {nick}".format(room=room.occupantJID.userhost(), nick=user.nick))
             # We set type so we don't have to use a deferred with disco to check entity type
-            self.host.memory.updateEntityData(room.roomJID, C.ENTITY_TYPE, ENTITY_TYPE_MUC, profile_key=self.parent.profile)
+            self.host.memory.updateEntityData(room.roomJID, C.ENTITY_TYPE, ENTITY_TYPE_MUC, profile_key=self.client.profile)
         elif room.state not in (ROOM_STATE_OCCUPANTS, ROOM_STATE_LIVE):
             log.warning(u"Received user presence data in a room before its initialisation (current state: {state}),"
                 "this is not standard! Ignoring it: {room} ({nick})".format(
@@ -881,7 +1005,7 @@
             except KeyError:
                 # this is a new user
                 log.debug(_(u"user {nick} has joined room {room_id}").format(nick=user.nick, room_id=room.occupantJID.userhost()))
-                if not self.host.trigger.point("MUC user joined", room, user, self.parent.profile):
+                if not self.host.trigger.point("MUC user joined", room, user, self.client.profile):
                     return
 
                 extra = {'info_type': ROOM_USER_JOINED,
@@ -893,7 +1017,7 @@
                     extra['user_entity'] = user.entity.full()
                 mess_data = {  # dict is similar to the one used in client.onMessage
                     "from": room.roomJID,
-                    "to": self.parent.jid,
+                    "to": self.client.jid,
                     "uid": unicode(uuid.uuid4()),
                     "message": {'': D_(u"=> {} has joined the room").format(user.nick)},
                     "subject": {},
@@ -904,20 +1028,20 @@
                 # FIXME: we disable presence in history as it's taking a lot of space
                 #        while not indispensable. In the future an option may allow
                 #        to re-enable it
-                # self.parent.messageAddToHistory(mess_data)
-                self.parent.messageSendToBridge(mess_data)
+                # self.client.messageAddToHistory(mess_data)
+                self.client.messageSendToBridge(mess_data)
 
 
     def userLeftRoom(self, room, user):
-        if not self.host.trigger.point("MUC user left", room, user, self.parent.profile):
+        if not self.host.trigger.point("MUC user left", room, user, self.client.profile):
             return
         if user.nick == room.nick:
             # we left the room
             room_jid_s = room.roomJID.userhost()
             log.info(_(u"Room ({room}) left ({profile})").format(
-                room = room_jid_s, profile = self.parent.profile))
-            self.host.memory.delEntityCache(room.roomJID, profile_key=self.parent.profile)
-            self.host.bridge.mucRoomLeft(room.roomJID.userhost(), self.parent.profile)
+                room = room_jid_s, profile = self.client.profile))
+            self.host.memory.delEntityCache(room.roomJID, profile_key=self.client.profile)
+            self.host.bridge.mucRoomLeft(room.roomJID.userhost(), self.client.profile)
         elif room.state != ROOM_STATE_LIVE:
             log.warning(u"Received user presence data in a room before its initialisation (current state: {state}),"
                 "this is not standard! Ignoring it: {room} ({nick})".format(
@@ -938,7 +1062,7 @@
                 extra['user_entity'] = user.entity.full()
             mess_data = {  # dict is similar to the one used in client.onMessage
                 "from": room.roomJID,
-                "to": self.parent.jid,
+                "to": self.client.jid,
                 "uid": unicode(uuid.uuid4()),
                 "message": {'': D_(u"<= {} has left the room").format(user.nick)},
                 "subject": {},
@@ -947,40 +1071,29 @@
                 "timestamp": time.time(),
             }
             # FIXME: disable history, see userJoinRoom comment
-            # self.parent.messageAddToHistory(mess_data)
-            self.parent.messageSendToBridge(mess_data)
+            # self.client.messageAddToHistory(mess_data)
+            self.client.messageSendToBridge(mess_data)
 
     def userChangedNick(self, room, user, new_nick):
-        self.host.bridge.mucRoomUserChangedNick(room.roomJID.userhost(), user.nick, new_nick, self.parent.profile)
+        self.host.bridge.mucRoomUserChangedNick(room.roomJID.userhost(), user.nick, new_nick, self.client.profile)
 
     def userUpdatedStatus(self, room, user, show, status):
-        self.host.bridge.presenceUpdate(room.roomJID.userhost() + '/' + user.nick, show or '', 0, {C.PRESENCE_STATUSES_DEFAULT: status or ''}, self.parent.profile)
+        self.host.bridge.presenceUpdate(room.roomJID.userhost() + '/' + user.nick, show or '', 0, {C.PRESENCE_STATUSES_DEFAULT: status or ''}, self.client.profile)
 
     ## messages ##
 
     def receivedGroupChat(self, room, user, body):
         log.debug(u'receivedGroupChat: room=%s user=%s body=%s' % (room.roomJID.full(), user, body))
 
-    def _addToHistory(self, dummy, user, message):
-        # we check if message is not in history
-        # and raise ConflictError else
-        stamp = message.delay.stamp.astimezone(tzutc()).timetuple()
-        timestamp = float(calendar.timegm(stamp))
-        data = {  # dict is similar to the one used in client.onMessage
-            "from": message.sender,
-            "to": message.recipient,
-            "uid": unicode(uuid.uuid4()),
-            "type": C.MESS_TYPE_GROUPCHAT,
-            "extra": {},
-            "timestamp": timestamp,
-            "received_timestamp": unicode(time.time()),
-        }
-        # FIXME: message and subject don't handle xml:lang
-        data['message'] = {'': message.body} if message.body is not None else {}
-        data['subject'] = {'': message.subject} if message.subject is not None else {}
-
-        if data['message'] or data['subject']:
-            return self.host.memory.addToHistory(self.parent, data)
+    def _addToHistory(self, __, user, message):
+        try:
+            # message can be already parsed (with MAM), in this case mess_data
+            # it attached to the element
+            mess_data = message.element._mess_data
+        except AttributeError:
+            mess_data = self.client.messageProt.parseMessage(message.element)
+        if mess_data[u'message'] or mess_data[u'subject']:
+            return self.host.memory.addToHistory(self.client, mess_data)
         else:
             return defer.succeed(None)
 
@@ -998,9 +1111,10 @@
         @param message(muc.GroupChat): the parsed message
         """
         if room.state != ROOM_STATE_SELF_PRESENCE:
-            log.warning(_(u"received history in unexpected state in room {room} (state: {state})").format(
-                room = room.userhost(),
-                state = room.state))
+            log.warning(_(
+                u"received history in unexpected state in room {room} (state: "
+                u"{state})").format(room = room.roomJID.userhost(),
+                                    state = room.state))
         room._history_d.addCallback(self._addToHistory, user, message)
         room._history_d.addErrback(self._addToHistoryEb)
 
@@ -1030,7 +1144,7 @@
     def subject(self, room, subject):
         return muc.MUCClientProtocol.subject(self, room, subject)
 
-    def _historyCb(self, dummy, room):
+    def _historyCb(self, __, room):
         """Called when history have been written to database and subject is received
 
         this method will finish joining by:
@@ -1039,14 +1153,15 @@
             - sending stanza put in cache
             - cleaning variables not needed anymore
         """
-        args = self.plugin_parent._getRoomJoinedArgs(room, self.parent.profile)
+        args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile)
         self.host.bridge.mucRoomJoined(*args)
         room.fully_joined.callback(room)
         del room._history_d
+        del room._history_type
         cache = room._cache
         del room._cache
         for elem in cache:
-            self.parent.xmlstream.dispatch(elem)
+            self.client.xmlstream.dispatch(elem)
 
     def _historyEb(self, failure_, room):
         log.error(u"Error while managing history: {}".format(failure_))
@@ -1057,12 +1172,13 @@
         # cf. http://xmpp.org/extensions/xep-0045.html#enter-subject
         room.subject = subject  # FIXME: subject doesn't handle xml:lang
         if room.state != ROOM_STATE_LIVE:
-            self.changeRoomState(room, ROOM_STATE_LIVE)
-            room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], errbackArgs=[room])
+            if room._history_type == HISTORY_LEGACY:
+                self.changeRoomState(room, ROOM_STATE_LIVE)
+                room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], errbackArgs=[room])
         else:
             # the subject has been changed
             log.debug(_(u"New subject for room ({room_id}): {subject}").format(room_id = room.roomJID.full(), subject = subject))
-            self.host.bridge.mucRoomNewSubject(room.roomJID.userhost(), subject, self.parent.profile)
+            self.host.bridge.mucRoomNewSubject(room.roomJID.userhost(), subject, self.client.profile)
 
     ## disco ##