# HG changeset patch # User Goffi # Date 1544206408 -3600 # Node ID b35c84ea73cf652f4c302545ba102bc5b34ff063 # Parent 57eac4fd0ec0e6182b4c162cf23085b7de74a03c plugin XEP-0045: MAM implementation for MUC diff -r 57eac4fd0ec0 -r b35c84ea73cf CHANGELOG --- a/CHANGELOG Fri Dec 07 17:46:50 2018 +0100 +++ b/CHANGELOG Fri Dec 07 19:13:28 2018 +0100 @@ -10,6 +10,7 @@ - XEP-0231 implementation (Bits of Binary) - XEP-0264 implementation (Thumbnails) - XEP-0280 implementation (Mesage Carbons) + - XEP-0313 implementation of messages part (one2one + MUC) - XEP-0329 implementation (File Information Sharing) - XEP-0384 implementation (OMEMO encryption) - new bridges are available: pb (perspective browser), and embedded (use backend as a module) @@ -38,6 +39,7 @@ - new info/session command, to get data on current session - new blog/get command, to retrieve locally XMPP blog - new message/encryption command, to handle encryption sessions + - new message/mam command, to check MAM archives (this command may be renamed or merged in an other one in the future) - new pubsub commands, for low level pubsub manipulations - include pubusb/search command for "grepping" pubsub nodes - new invitation commands diff -r 57eac4fd0ec0 -r b35c84ea73cf sat/core/xmpp.py --- a/sat/core/xmpp.py Fri Dec 07 17:46:50 2018 +0100 +++ b/sat/core/xmpp.py Fri Dec 07 19:13:28 2018 +0100 @@ -925,8 +925,8 @@ return data = self.parseMessage(message_elt) post_treat.addCallback(self.skipEmptyMessage) - post_treat.addCallback(self.addToHistory, client) - post_treat.addCallback(self.bridgeSignal, client, data) + post_treat.addCallback(self.addToHistory) + post_treat.addCallback(self.bridgeSignal, data) post_treat.addErrback(self.cancelErrorTrap) post_treat.callback(data) @@ -951,14 +951,14 @@ raise failure.Failure(exceptions.CancelError("Cancelled empty message")) return data - def addToHistory(self, data, client): + def addToHistory(self, data): if data.pop(u"history", None) == C.HISTORY_SKIP: log.info(u"history is skipped as requested") data[u"extra"][u"history"] = C.HISTORY_SKIP else: - return self.host.memory.addToHistory(client, data) + return self.host.memory.addToHistory(self.parent, data) - def bridgeSignal(self, __, client, data): + def bridgeSignal(self, __, data): try: data["extra"]["received_timestamp"] = data["received_timestamp"] data["extra"]["delay_sender"] = data["delay_sender"] @@ -976,7 +976,7 @@ data["subject"], data["type"], data["extra"], - profile=client.profile, + profile=self.parent.profile, ) return data diff -r 57eac4fd0ec0 -r b35c84ea73cf sat/memory/sqlite.py --- a/sat/memory/sqlite.py Fri Dec 07 17:46:50 2018 +0100 +++ b/sat/memory/sqlite.py Fri Dec 07 19:13:28 2018 +0100 @@ -556,32 +556,33 @@ order = False if filters: - if 'body' in filters: + if u'body' in filters: # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html query_parts.append(u"AND message LIKE ?") values.append(u"%{}%".format(filters['body'])) - if 'search' in filters: + if u'search' in filters: query_parts.append(u"AND (message LIKE ? OR source_res LIKE ?)") values.extend([u"%{}%".format(filters['search'])] * 2) - if 'types' in filters: + if u'types' in filters: types = filters['types'].split() query_parts.append(u"AND type IN ({})".format(u','.join("?"*len(types)))) values.extend(types) - if 'not_types' in filters: + if u'not_types' in filters: types = filters['not_types'].split() query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) values.extend(types) - if 'last_stanza_id' in filters: + if u'last_stanza_id' in filters: # this request get the last message with a "stanza_id" that we # have in history. This is mainly used to retrieve messages sent # while we were offline, using MAM (XEP-0313). + # It must be set after all other filters, because it contains an ORDER BY if (filters[u'last_stanza_id'] is not True - or from_jid is not None or to_jid is not None or limit != 1): raise ValueError(u"Unexpected values for last_stanza_id filter") query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC") order = True + if not order: query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList # we use DESC here so LIMIT keep the last messages diff -r 57eac4fd0ec0 -r b35c84ea73cf sat/plugins/plugin_xep_0045.py --- a/sat/plugins/plugin_xep_0045.py Fri Dec 07 17:46:50 2018 +0100 +++ b/sat/plugins/plugin_xep_0045.py Fri Dec 07 19:13:28 2018 +0100 @@ -24,12 +24,10 @@ from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.python import failure -from dateutil.tz import tzutc from sat.core import exceptions from sat.memory import memory -import calendar import time import uuid import copy @@ -39,14 +37,18 @@ from zope.interface import implements +# XXX: mam and rsm come from sat_tmp.wokkel +from wokkel import rsm +from wokkel import mam + PLUGIN_INFO = { C.PI_NAME: "XEP-0045 Plugin", C.PI_IMPORT_NAME: "XEP-0045", C.PI_TYPE: "XEP", C.PI_PROTOCOLS: ["XEP-0045"], - C.PI_DEPENDENCIES: [], - C.PI_RECOMMENDATIONS: [C.TEXT_CMDS], + C.PI_DEPENDENCIES: ["XEP-0359"], + C.PI_RECOMMENDATIONS: [C.TEXT_CMDS, u"XEP-0313"], C.PI_MAIN: "XEP_0045", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Implementation of Multi-User Chat""") @@ -62,6 +64,8 @@ ROOM_STATE_SELF_PRESENCE = "self-presence" ROOM_STATE_LIVE = "live" ROOM_STATES = (ROOM_STATE_OCCUPANTS, ROOM_STATE_SELF_PRESENCE, ROOM_STATE_LIVE) +HISTORY_LEGACY = u"legacy" +HISTORY_MAM = u"mam" CONFIG_SECTION = u'plugin muc' @@ -109,14 +113,30 @@ self.text_cmds.registerTextCommands(self) self.text_cmds.addWhoIsCb(self._whois, 100) + self._mam = self.host.plugins.get(u"XEP-0313") + self._si = self.host.plugins[u"XEP-0359"] + host.trigger.add("presence_available", self.presenceTrigger) host.trigger.add("MessageReceived", self.messageReceivedTrigger, priority=1000000) + host.trigger.add("message_parse", self._message_parseTrigger) def profileConnected(self, client): def assign_service(service): client.muc_service = service return self.getMUCService(client).addCallback(assign_service) + def _message_parseTrigger(self, client, message_elt, data): + """Add stanza-id from the room if present""" + if message_elt.getAttribute(u"type") != C.MESS_TYPE_GROUPCHAT: + return True + + # stanza_id will not be filled by parseMessage because the emitter + # is the room and not our server, so we have to parse it here + room_jid = data[u"from"].userhostJID() + stanza_id = self._si.getStanzaId(message_elt, room_jid) + if stanza_id: + data[u"extra"][u"stanza_id"] = stanza_id + def messageReceivedTrigger(self, client, message_elt, post_treat): if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT: if message_elt.subject or message_elt.delay: @@ -126,13 +146,21 @@ if room_jid in client._muc_client.joined_rooms: room = client._muc_client.joined_rooms[room_jid] if room.state != ROOM_STATE_LIVE: - log.warning(_(u"Received non delayed message in a room before its initialisation: state={state}, msg={msg}").format( + if getattr(room, "_history_type", HISTORY_LEGACY) == HISTORY_LEGACY: + # With MAM history, order is different, and we can get live + # messages before history is complete, so this is not a warning + # but an expected case. + # On the other hand, with legacy history, it's not normal. + log.warning(_( + u"Received non delayed message in a room before its " + u"initialisation: state={state}, msg={msg}").format( state=room.state, msg=message_elt.toXml())) room._cache.append(message_elt) return False else: - log.warning(u"Received groupchat message for a room which has not been joined, ignoring it: {}".format(message_elt.toXml())) + log.warning(u"Received groupchat message for a room which has not been " + u"joined, ignoring it: {}".format(message_elt.toXml())) return False return True @@ -197,7 +225,7 @@ # a proper configuration management should be done log.debug(_(u"room locked !")) d = client._muc_client.configure(room.roomJID, {}) - d.addErrback(lambda dummy: log.error(_(u'Error while configuring the room'))) + d.addErrback(lambda __: log.error(_(u'Error while configuring the room'))) return room.fully_joined def _joinEb(self, failure, client, room_jid, nick, password): @@ -580,7 +608,7 @@ d = self.kick(client, nick, mess_data["to"], {} if len(options) == 1 else {'reason': options[1]}) - def cb(dummy): + def cb(__): feedback_msg = _(u'You have kicked {}').format(nick) if len(options) > 1: feedback_msg += _(u' for the following reason: {}').format(options[1]) @@ -609,7 +637,7 @@ d = self.ban(client, entity_jid, mess_data["to"], {} if len(options) == 1 else {'reason': options[1]}) - def cb(dummy): + def cb(__): feedback_msg = _(u'You have banned {}').format(entity_jid) if len(options) > 1: feedback_msg += _(u' for the following reason: {}').format(options[1]) @@ -648,7 +676,7 @@ d = self.affiliate(client, entity_jid, mess_data["to"], {'affiliation': affiliation}) - def cb(dummy): + def cb(__): feedback_msg = _(u'New affiliation for %(entity)s: %(affiliation)s').format(entity=entity_jid, affiliation=affiliation) self.text_cmds.feedBack(client, feedback_msg, mess_data) return True @@ -744,7 +772,6 @@ def __init__(self, plugin_parent): self.plugin_parent = plugin_parent - self.host = plugin_parent.host muc.MUCClient.__init__(self) self._changing_nicks = set() # used to keep trace of who is changing nick, # and to discard userJoinedRoom signal in this case @@ -754,6 +781,22 @@ def joined_rooms(self): return self._rooms + @property + def host(self): + return self.plugin_parent.host + + @property + def client(self): + return self.parent + + @property + def _mam(self): + return self.plugin_parent._mam + + @property + def _si(self): + return self.plugin_parent._si + def changeRoomState(self, room, new_state): """Check that room is in expected state, and change it @@ -768,7 +811,9 @@ state=new_state)) expected_state = ROOM_STATES[new_state_idx-1] if room.state != expected_state: - log.warning(_(u"room {room} is not in expected state: room is in state {current_state} while we were expecting {expected_state}").format( + log.error(_( + u"room {room} is not in expected state: room is in state {current_state} " + u"while we were expecting {expected_state}").format( room=room.userhost(), current_state=room.state, expected_state=expected_state)) @@ -778,8 +823,6 @@ super(SatMUCClient, self)._addRoom(room) room._roster_ok = False # True when occupants list has been fully received room.state = ROOM_STATE_OCCUPANTS - room._history_d = defer.Deferred() # used to send bridge signal once backlog are written in history - room._history_d.callback(None) # FIXME: check if history_d is not redundant with fully_joined room.fully_joined = defer.Deferred() # called when everything is OK # cache data until room is ready @@ -787,12 +830,13 @@ room._cache = [] @defer.inlineCallbacks - def join(self, room_jid, nick, password=None): - mess_data_list = yield self.host.memory.historyGet(self.parent.jid.userhostJID(), - room_jid, - 1, - True, - profile=self.parent.profile) + def _joinLegacy(self, client, room_jid, nick, password): + """Join room an retrieve history with legacy method""" + mess_data_list = yield self.host.memory.historyGet(room_jid, + client.jid.userhostJID(), + limit=1, + between=True, + profile=client.profile) if mess_data_list: timestamp = mess_data_list[0][1] # we use seconds since last message to get backlog without duplicates @@ -800,9 +844,89 @@ seconds = int(time.time() - timestamp) - 1 else: seconds = None - room = yield super(SatMUCClient, self).join(room_jid, nick, muc.HistoryOptions(seconds=seconds), password) + + room = yield super(SatMUCClient, self).join( + room_jid, nick, muc.HistoryOptions(seconds=seconds), password) + # used to send bridge signal once backlog are written in history + room._history_type = HISTORY_LEGACY + room._history_d = defer.Deferred() + room._history_d.callback(None) defer.returnValue(room) + @defer.inlineCallbacks + def _joinMAM(self, client, room_jid, nick, password): + """Join room and retrieve history using MAM""" + room = yield super(SatMUCClient, self).join( + # we don't want any history from room as we'll get it with MAM + room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password) + room._history_type = HISTORY_MAM + room._history_d = defer.Deferred() + + last_mess = yield self.host.memory.historyGet(room_jid, + None, + limit=1, + between=False, + filters={u'last_stanza_id': True}, + profile=client.profile) + if last_mess: + stanza_id = last_mess[0][-1][u'stanza_id'] + rsm_req = rsm.RSMRequest(after=stanza_id) + else: + log.info(u"We have no MAM archive for room {room_jid}.".format( + room_jid=room_jid)) + rsm_req = rsm.RSMRequest(max_=20) + + mam_req = mam.MAMRequest(rsm_=rsm_req) + mam_data = yield self._mam.getArchives(client, mam_req, + service=room_jid) + elt_list, rsm_response = mam_data + + if not elt_list: + log.info(_(u"No message received while offline in {room_jid}".format( + room_jid=room_jid))) + else: + log.info( + _(u"We have received {num_mess} message(s) in {room_jid} while offline.") + .format(num_mess=len(elt_list), room_jid=room_jid)) + + for mess_elt in elt_list: + try: + fwd_message_elt = self._mam.getMessageFromResult( + client, mess_elt, mam_req, service=room_jid) + except exceptions.DataError: + continue + if fwd_message_elt.getAttribute(u"to"): + log.warning( + u'Forwarded message element has a "to" attribute while it is ' + u'forbidden by specifications') + fwd_message_elt[u"to"] = client.jid.full() + mess_data = client.messageProt.parseMessage(fwd_message_elt) + # we attache parsed message data to element, to avoid parsing + # again in _addToHistory + fwd_message_elt._mess_data = mess_data + # and we inject to MUC workflow + client._muc_client._onGroupChat(fwd_message_elt) + + + # for legacy history, the following steps are done in receivedSubject but for MAM + # the order is different (we have to join then get MAM archive, so subject + # is received before archive), so we change state and add the callbacks here. + self.changeRoomState(room, ROOM_STATE_LIVE) + room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], + errbackArgs=[room]) + + # callback is done now that all needed Deferred have been added to _history_d + room._history_d.callback(None) + + defer.returnValue(room) + + def join(self, room_jid, nick, password=None): + if (not self._mam or not self.host.hasFeature(self.client, + mam.NS_MAM, room_jid)): + return self._joinLegacy(self.client, room_jid, nick, password) + else: + return self._joinMAM(self.client, room_jid, nick, password) + ## presence/roster ## def availableReceived(self, presence): @@ -865,7 +989,7 @@ self.changeRoomState(room, ROOM_STATE_SELF_PRESENCE) log.debug(u"room {room} joined with nick {nick}".format(room=room.occupantJID.userhost(), nick=user.nick)) # We set type so we don't have to use a deferred with disco to check entity type - self.host.memory.updateEntityData(room.roomJID, C.ENTITY_TYPE, ENTITY_TYPE_MUC, profile_key=self.parent.profile) + self.host.memory.updateEntityData(room.roomJID, C.ENTITY_TYPE, ENTITY_TYPE_MUC, profile_key=self.client.profile) elif room.state not in (ROOM_STATE_OCCUPANTS, ROOM_STATE_LIVE): log.warning(u"Received user presence data in a room before its initialisation (current state: {state})," "this is not standard! Ignoring it: {room} ({nick})".format( @@ -881,7 +1005,7 @@ except KeyError: # this is a new user log.debug(_(u"user {nick} has joined room {room_id}").format(nick=user.nick, room_id=room.occupantJID.userhost())) - if not self.host.trigger.point("MUC user joined", room, user, self.parent.profile): + if not self.host.trigger.point("MUC user joined", room, user, self.client.profile): return extra = {'info_type': ROOM_USER_JOINED, @@ -893,7 +1017,7 @@ extra['user_entity'] = user.entity.full() mess_data = { # dict is similar to the one used in client.onMessage "from": room.roomJID, - "to": self.parent.jid, + "to": self.client.jid, "uid": unicode(uuid.uuid4()), "message": {'': D_(u"=> {} has joined the room").format(user.nick)}, "subject": {}, @@ -904,20 +1028,20 @@ # FIXME: we disable presence in history as it's taking a lot of space # while not indispensable. In the future an option may allow # to re-enable it - # self.parent.messageAddToHistory(mess_data) - self.parent.messageSendToBridge(mess_data) + # self.client.messageAddToHistory(mess_data) + self.client.messageSendToBridge(mess_data) def userLeftRoom(self, room, user): - if not self.host.trigger.point("MUC user left", room, user, self.parent.profile): + if not self.host.trigger.point("MUC user left", room, user, self.client.profile): return if user.nick == room.nick: # we left the room room_jid_s = room.roomJID.userhost() log.info(_(u"Room ({room}) left ({profile})").format( - room = room_jid_s, profile = self.parent.profile)) - self.host.memory.delEntityCache(room.roomJID, profile_key=self.parent.profile) - self.host.bridge.mucRoomLeft(room.roomJID.userhost(), self.parent.profile) + room = room_jid_s, profile = self.client.profile)) + self.host.memory.delEntityCache(room.roomJID, profile_key=self.client.profile) + self.host.bridge.mucRoomLeft(room.roomJID.userhost(), self.client.profile) elif room.state != ROOM_STATE_LIVE: log.warning(u"Received user presence data in a room before its initialisation (current state: {state})," "this is not standard! Ignoring it: {room} ({nick})".format( @@ -938,7 +1062,7 @@ extra['user_entity'] = user.entity.full() mess_data = { # dict is similar to the one used in client.onMessage "from": room.roomJID, - "to": self.parent.jid, + "to": self.client.jid, "uid": unicode(uuid.uuid4()), "message": {'': D_(u"<= {} has left the room").format(user.nick)}, "subject": {}, @@ -947,40 +1071,29 @@ "timestamp": time.time(), } # FIXME: disable history, see userJoinRoom comment - # self.parent.messageAddToHistory(mess_data) - self.parent.messageSendToBridge(mess_data) + # self.client.messageAddToHistory(mess_data) + self.client.messageSendToBridge(mess_data) def userChangedNick(self, room, user, new_nick): - self.host.bridge.mucRoomUserChangedNick(room.roomJID.userhost(), user.nick, new_nick, self.parent.profile) + self.host.bridge.mucRoomUserChangedNick(room.roomJID.userhost(), user.nick, new_nick, self.client.profile) def userUpdatedStatus(self, room, user, show, status): - self.host.bridge.presenceUpdate(room.roomJID.userhost() + '/' + user.nick, show or '', 0, {C.PRESENCE_STATUSES_DEFAULT: status or ''}, self.parent.profile) + self.host.bridge.presenceUpdate(room.roomJID.userhost() + '/' + user.nick, show or '', 0, {C.PRESENCE_STATUSES_DEFAULT: status or ''}, self.client.profile) ## messages ## def receivedGroupChat(self, room, user, body): log.debug(u'receivedGroupChat: room=%s user=%s body=%s' % (room.roomJID.full(), user, body)) - def _addToHistory(self, dummy, user, message): - # we check if message is not in history - # and raise ConflictError else - stamp = message.delay.stamp.astimezone(tzutc()).timetuple() - timestamp = float(calendar.timegm(stamp)) - data = { # dict is similar to the one used in client.onMessage - "from": message.sender, - "to": message.recipient, - "uid": unicode(uuid.uuid4()), - "type": C.MESS_TYPE_GROUPCHAT, - "extra": {}, - "timestamp": timestamp, - "received_timestamp": unicode(time.time()), - } - # FIXME: message and subject don't handle xml:lang - data['message'] = {'': message.body} if message.body is not None else {} - data['subject'] = {'': message.subject} if message.subject is not None else {} - - if data['message'] or data['subject']: - return self.host.memory.addToHistory(self.parent, data) + def _addToHistory(self, __, user, message): + try: + # message can be already parsed (with MAM), in this case mess_data + # it attached to the element + mess_data = message.element._mess_data + except AttributeError: + mess_data = self.client.messageProt.parseMessage(message.element) + if mess_data[u'message'] or mess_data[u'subject']: + return self.host.memory.addToHistory(self.client, mess_data) else: return defer.succeed(None) @@ -998,9 +1111,10 @@ @param message(muc.GroupChat): the parsed message """ if room.state != ROOM_STATE_SELF_PRESENCE: - log.warning(_(u"received history in unexpected state in room {room} (state: {state})").format( - room = room.userhost(), - state = room.state)) + log.warning(_( + u"received history in unexpected state in room {room} (state: " + u"{state})").format(room = room.roomJID.userhost(), + state = room.state)) room._history_d.addCallback(self._addToHistory, user, message) room._history_d.addErrback(self._addToHistoryEb) @@ -1030,7 +1144,7 @@ def subject(self, room, subject): return muc.MUCClientProtocol.subject(self, room, subject) - def _historyCb(self, dummy, room): + def _historyCb(self, __, room): """Called when history have been written to database and subject is received this method will finish joining by: @@ -1039,14 +1153,15 @@ - sending stanza put in cache - cleaning variables not needed anymore """ - args = self.plugin_parent._getRoomJoinedArgs(room, self.parent.profile) + args = self.plugin_parent._getRoomJoinedArgs(room, self.client.profile) self.host.bridge.mucRoomJoined(*args) room.fully_joined.callback(room) del room._history_d + del room._history_type cache = room._cache del room._cache for elem in cache: - self.parent.xmlstream.dispatch(elem) + self.client.xmlstream.dispatch(elem) def _historyEb(self, failure_, room): log.error(u"Error while managing history: {}".format(failure_)) @@ -1057,12 +1172,13 @@ # cf. http://xmpp.org/extensions/xep-0045.html#enter-subject room.subject = subject # FIXME: subject doesn't handle xml:lang if room.state != ROOM_STATE_LIVE: - self.changeRoomState(room, ROOM_STATE_LIVE) - room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], errbackArgs=[room]) + if room._history_type == HISTORY_LEGACY: + self.changeRoomState(room, ROOM_STATE_LIVE) + room._history_d.addCallbacks(self._historyCb, self._historyEb, [room], errbackArgs=[room]) else: # the subject has been changed log.debug(_(u"New subject for room ({room_id}): {subject}").format(room_id = room.roomJID.full(), subject = subject)) - self.host.bridge.mucRoomNewSubject(room.roomJID.userhost(), subject, self.parent.profile) + self.host.bridge.mucRoomNewSubject(room.roomJID.userhost(), subject, self.client.profile) ## disco ##