Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0045.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # SAT plugin for managing xep-0045 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import time from typing import Optional import uuid import shortuuid from twisted.internet import defer from twisted.python import failure from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error as xmpp_error from wokkel import disco, iwokkel, muc from wokkel import rsm from wokkel import mam from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger from libervia.backend.memory import memory from libervia.backend.tools import xml_tools, utils log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "XEP-0045 Plugin", C.PI_IMPORT_NAME: "XEP-0045", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0045"], C.PI_DEPENDENCIES: ["XEP-0359"], C.PI_RECOMMENDATIONS: [C.TEXT_CMDS, "XEP-0313"], C.PI_MAIN: "XEP_0045", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Implementation of Multi-User Chat"""), } NS_MUC = "http://jabber.org/protocol/muc" AFFILIATIONS = ("owner", "admin", "member", "none", "outcast") ROOM_USER_JOINED = "ROOM_USER_JOINED" ROOM_USER_LEFT = "ROOM_USER_LEFT" OCCUPANT_KEYS = ("nick", "entity", "affiliation", "role") ROOM_STATE_OCCUPANTS = "occupants" ROOM_STATE_SELF_PRESENCE = "self-presence" ROOM_STATE_LIVE = "live" ROOM_STATES = (ROOM_STATE_OCCUPANTS, ROOM_STATE_SELF_PRESENCE, ROOM_STATE_LIVE) HISTORY_LEGACY = "legacy" HISTORY_MAM = "mam" CONFIG_SECTION = "plugin muc" default_conf = {"default_muc": "sat@chat.jabberfr.org"} class AlreadyJoined(exceptions.ConflictError): def __init__(self, room): super(AlreadyJoined, self).__init__() self.room = room class XEP_0045: # TODO: handle invitations # FIXME: this plugin need a good cleaning, join method is messy def __init__(self, host): log.info(_("Plugin XEP_0045 initialization")) self.host = host self._sessions = memory.Sessions() # return same arguments as muc_room_joined + a boolean set to True is the room was # already joined (first argument) host.bridge.add_method( "muc_join", ".plugin", in_sign="ssa{ss}s", out_sign="(bsa{sa{ss}}ssass)", method=self._join, async_=True, ) host.bridge.add_method( "muc_nick", ".plugin", in_sign="sss", out_sign="", method=self._nick ) host.bridge.add_method( "muc_nick_get", ".plugin", in_sign="ss", out_sign="s", method=self._get_room_nick, ) host.bridge.add_method( "muc_leave", ".plugin", in_sign="ss", out_sign="", method=self._leave, async_=True, ) host.bridge.add_method( "muc_occupants_get", ".plugin", in_sign="ss", out_sign="a{sa{ss}}", method=self._get_room_occupants, ) host.bridge.add_method( "muc_subject", ".plugin", in_sign="sss", out_sign="", method=self._subject ) host.bridge.add_method( "muc_get_rooms_joined", ".plugin", in_sign="s", out_sign="a(sa{sa{ss}}ssas)", method=self._get_rooms_joined, ) host.bridge.add_method( "muc_get_unique_room_name", ".plugin", in_sign="ss", out_sign="s", method=self._get_unique_name, ) host.bridge.add_method( "muc_configure_room", ".plugin", in_sign="ss", out_sign="s", method=self._configure_room, async_=True, ) host.bridge.add_method( "muc_get_default_service", ".plugin", in_sign="", out_sign="s", method=self.get_default_muc, ) host.bridge.add_method( "muc_get_service", ".plugin", in_sign="ss", out_sign="s", method=self._get_muc_service, async_=True, ) # called when a room will be joined but must be locked until join is received # (room is prepared, history is getting retrieved) # args: room_jid, profile host.bridge.add_signal("muc_room_prepare_join", ".plugin", signature="ss") # args: room_jid, occupants, user_nick, subject, profile host.bridge.add_signal("muc_room_joined", ".plugin", signature="sa{sa{ss}}ssass") # args: room_jid, profile host.bridge.add_signal("muc_room_left", ".plugin", signature="ss") # args: room_jid, old_nick, new_nick, profile host.bridge.add_signal("muc_room_user_changed_nick", ".plugin", signature="ssss") # args: room_jid, subject, profile host.bridge.add_signal("muc_room_new_subject", ".plugin", signature="sss") self.__submit_conf_id = host.register_callback( self._submit_configuration, with_data=True ) self._room_join_id = host.register_callback(self._ui_room_join_cb, with_data=True) host.import_menu( (D_("MUC"), D_("configure")), self._configure_room_menu, security_limit=0, help_string=D_("Configure Multi-User Chat room"), type_=C.MENU_ROOM, ) try: self.text_cmds = self.host.plugins[C.TEXT_CMDS] except KeyError: log.info(_("Text commands not available")) else: self.text_cmds.register_text_commands(self) self.text_cmds.add_who_is_cb(self._whois, 100) self._mam = self.host.plugins.get("XEP-0313") self._si = self.host.plugins["XEP-0359"] host.trigger.add("presence_available", self.presence_trigger) host.trigger.add("presence_received", self.presence_received_trigger) host.trigger.add( "message_received", self.message_received_trigger, priority=1000000 ) host.trigger.add("message_parse", self._message_parse_trigger) async def profile_connected(self, client): client.muc_service = await self.get_muc_service(client) def _message_parse_trigger(self, client, message_elt, data): """Add stanza-id from the room if present""" if message_elt.getAttribute("type") != C.MESS_TYPE_GROUPCHAT: return True # stanza_id will not be filled by parse_message because the emitter # is the room and not our server, so we have to parse it here room_jid = data["from"].userhostJID() stanza_id = self._si.get_stanza_id(message_elt, room_jid) if stanza_id: data["extra"]["stanza_id"] = stanza_id def message_received_trigger(self, client, message_elt, post_treat): if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT: if message_elt.subject: return False from_jid = jid.JID(message_elt["from"]) room_jid = from_jid.userhostJID() if room_jid in client._muc_client.joined_rooms: room = client._muc_client.joined_rooms[room_jid] if room.state != ROOM_STATE_LIVE: if getattr(room, "_history_type", HISTORY_LEGACY) == HISTORY_LEGACY: # With MAM history, order is different, and we can get live # messages before history is complete, so this is not a warning # but an expected case. # On the other hand, with legacy history, it's not normal. log.warning( _( "Received non delayed message in a room before its " "initialisation: state={state}, msg={msg}" ).format(state=room.state, msg=message_elt.toXml()) ) room._cache.append(message_elt) return False else: log.warning( "Received groupchat message for a room which has not been " "joined, ignoring it: {}".format(message_elt.toXml()) ) return False return True def get_room(self, client: SatXMPPEntity, room_jid: jid.JID) -> muc.Room: """Retrieve Room instance from its jid @param room_jid: jid of the room @raise exceptions.NotFound: the room has not been joined """ try: return client._muc_client.joined_rooms[room_jid] except KeyError: raise exceptions.NotFound(_("This room has not been joined")) def check_room_joined(self, client, room_jid): """Check that given room has been joined in current session @param room_jid (JID): room JID """ if room_jid not in client._muc_client.joined_rooms: raise exceptions.NotFound(_("This room has not been joined")) def is_joined_room(self, client: SatXMPPEntity, room_jid: jid.JID) -> bool: """Tell if a jid is a known and joined room @room_jid: jid of the room """ try: self.check_room_joined(client, room_jid) except exceptions.NotFound: return False else: return True def is_room(self, client, entity_jid): """Tell if a jid is a joined MUC similar to is_joined_room but returns a boolean @param entity_jid(jid.JID): full or bare jid of the entity check @return (bool): True if the bare jid of the entity is a room jid """ try: self.check_room_joined(client, entity_jid.userhostJID()) except exceptions.NotFound: return False else: return True def get_bare_or_full(self, client, peer_jid): """use full jid if peer_jid is an occupant of a room, bare jid else @param peer_jid(jid.JID): entity to test @return (jid.JID): bare or full jid """ if peer_jid.resource: if not self.is_room(client, peer_jid): return peer_jid.userhostJID() return peer_jid def get_room_user_jid(self, client: SatXMPPEntity, room_jid: jid.JID) -> jid.JID: """Get the nick used in the room @param room_jid: jid of the room to use @return: JID used in room @raise exceptions.NotFound: this room has not been joined @raise exceptions.InternalError: invalid room_jid """ # FIXME: doesn't check if room is anonymous or not # TODO: check if room is (semi)anonymous if room_jid.resource: raise exceptions.InternalError(f"{room_jid} should not have a ressource.") room = self.get_room(client, room_jid) return jid.JID(tuple=(room_jid.user, room_jid.host, room.nick)) def _get_room_joined_args(self, room, profile): return [ room.roomJID.userhost(), XEP_0045._get_occupants(room), room.nick, # FIXME: getattr below is a Q&D fix as `subject` may not be set in early call # of _get_rooms_joined (during `join` call). The whole workflow of this # plugin should be refactored. getattr(room, "subject", ""), [s.name for s in room.statuses], profile, ] def _ui_room_join_cb(self, data, profile): room_jid = jid.JID(data["index"]) client = self.host.get_client(profile) defer.ensureDeferred(self.join(client, room_jid)) return {} def _password_ui_cb(self, data, client, room_jid, nick): """Called when the user has given room password (or cancelled)""" if C.bool(data.get(C.XMLUI_DATA_CANCELLED, "false")): log.info("room join for {} is cancelled".format(room_jid.userhost())) raise failure.Failure( exceptions.CancelError(D_("Room joining cancelled by user")) ) password = data[xml_tools.form_escape("password")] return client._muc_client.join(room_jid, nick, password).addCallbacks( self._join_cb, self._join_eb, (client, room_jid, nick), errbackArgs=(client, room_jid, nick, password), ) def _show_list_ui(self, items, client, service): xmlui = xml_tools.XMLUI(title=D_("Rooms in {}".format(service.full()))) adv_list = xmlui.change_container( "advanced_list", columns=1, selectable="single", callback_id=self._room_join_id, ) items = sorted(items, key=lambda i: i.name.lower()) for item in items: adv_list.set_row_index(item.entity.full()) xmlui.addText(item.name) adv_list.end() self.host.action_new({"xmlui": xmlui.toXml()}, profile=client.profile) def _join_cb(self, room, client, room_jid, nick): """Called when the user is in the requested room""" if room.locked: # FIXME: the current behaviour is to create an instant room # and send the signal only when the room is unlocked # a proper configuration management should be done log.debug(_("room locked !")) d = client._muc_client.configure(room.roomJID, {}) d.addErrback( self.host.log_errback, msg=_("Error while configuring the room: {failure_}"), ) return room.fully_joined def _join_eb(self, failure_, client, room_jid, nick, password): """Called when something is going wrong when joining the room""" try: condition = failure_.value.condition except AttributeError: msg_suffix = f": {failure_}" else: if condition == "conflict": # we have a nickname conflict, we try again with "_" suffixed to current nickname nick += "_" return client._muc_client.join(room_jid, nick, password).addCallbacks( self._join_cb, self._join_eb, (client, room_jid, nick), errbackArgs=(client, room_jid, nick, password), ) elif condition == "not-allowed": # room is restricted, we need a password password_ui = xml_tools.XMLUI( "form", title=D_("Room {} is restricted").format(room_jid.userhost()), submit_id="", ) password_ui.addText( D_("This room is restricted, please enter the password") ) password_ui.addPassword("password") d = xml_tools.defer_xmlui(self.host, password_ui, profile=client.profile) d.addCallback(self._password_ui_cb, client, room_jid, nick) return d msg_suffix = ' with condition "{}"'.format(failure_.value.condition) mess = D_( "Error while joining the room {room}{suffix}".format( room=room_jid.userhost(), suffix=msg_suffix ) ) log.warning(mess) xmlui = xml_tools.note(mess, D_("Group chat error"), level=C.XMLUI_DATA_LVL_ERROR) self.host.action_new({"xmlui": xmlui.toXml()}, profile=client.profile) @staticmethod def _get_occupants(room): """Get occupants of a room in a form suitable for bridge""" return { u.nick: {k: str(getattr(u, k) or "") for k in OCCUPANT_KEYS} for u in list(room.roster.values()) } def _get_room_occupants(self, room_jid_s, profile_key): client = self.host.get_client(profile_key) room_jid = jid.JID(room_jid_s) return self.get_room_occupants(client, room_jid) def get_room_occupants(self, client, room_jid): room = self.get_room(client, room_jid) return self._get_occupants(room) def _get_rooms_joined(self, profile_key=C.PROF_KEY_NONE): client = self.host.get_client(profile_key) return self.get_rooms_joined(client) def get_rooms_joined(self, client): """Return rooms where user is""" result = [] for room in list(client._muc_client.joined_rooms.values()): if room.state == ROOM_STATE_LIVE: result.append( ( room.roomJID.userhost(), self._get_occupants(room), room.nick, room.subject, [s.name for s in room.statuses], ) ) return result def _get_room_nick(self, room_jid_s, profile_key=C.PROF_KEY_NONE): client = self.host.get_client(profile_key) return self.get_room_nick(client, jid.JID(room_jid_s)) def get_room_nick(self, client, room_jid): """return nick used in room by user @param room_jid (jid.JID): JID of the room @profile_key: profile @return: nick or empty string in case of error @raise exceptions.Notfound: use has not joined the room """ self.check_room_joined(client, room_jid) return client._muc_client.joined_rooms[room_jid].nick def _configure_room(self, room_jid_s, profile_key=C.PROF_KEY_NONE): client = self.host.get_client(profile_key) d = self.configure_room(client, jid.JID(room_jid_s)) d.addCallback(lambda xmlui: xmlui.toXml()) return d def _configure_room_menu(self, menu_data, profile): """Return room configuration form @param menu_data: %(menu_data)s @param profile: %(doc_profile)s """ client = self.host.get_client(profile) try: room_jid = jid.JID(menu_data["room_jid"]) except KeyError: log.error(_("room_jid key is not present !")) return defer.fail(exceptions.DataError) def xmlui_received(xmlui): if not xmlui: msg = D_("No configuration available for this room") return {"xmlui": xml_tools.note(msg).toXml()} return {"xmlui": xmlui.toXml()} return self.configure_room(client, room_jid).addCallback(xmlui_received) def configure_room(self, client, room_jid): """return the room configuration form @param room: jid of the room to configure @return: configuration form as XMLUI """ self.check_room_joined(client, room_jid) def config_2_xmlui(result): if not result: return "" session_id, session_data = self._sessions.new_session(profile=client.profile) session_data["room_jid"] = room_jid xmlui = xml_tools.data_form_2_xmlui(result, submit_id=self.__submit_conf_id) xmlui.session_id = session_id return xmlui d = client._muc_client.getConfiguration(room_jid) d.addCallback(config_2_xmlui) return d def _submit_configuration(self, raw_data, profile): cancelled = C.bool(raw_data.get("cancelled", C.BOOL_FALSE)) if cancelled: return defer.succeed({}) client = self.host.get_client(profile) try: session_data = self._sessions.profile_get(raw_data["session_id"], profile) except KeyError: log.warning(D_("Session ID doesn't exist, session has probably expired.")) _dialog = xml_tools.XMLUI("popup", title=D_("Room configuration failed")) _dialog.addText(D_("Session ID doesn't exist, session has probably expired.")) return defer.succeed({"xmlui": _dialog.toXml()}) data = xml_tools.xmlui_result_2_data_form_result(raw_data) d = client._muc_client.configure(session_data["room_jid"], data) _dialog = xml_tools.XMLUI("popup", title=D_("Room configuration succeed")) _dialog.addText(D_("The new settings have been saved.")) d.addCallback(lambda ignore: {"xmlui": _dialog.toXml()}) del self._sessions[raw_data["session_id"]] return d def is_nick_in_room(self, client, room_jid, nick): """Tell if a nick is currently present in a room""" self.check_room_joined(client, room_jid) return client._muc_client.joined_rooms[room_jid].inRoster(muc.User(nick)) def _get_muc_service(self, jid_=None, profile=C.PROF_KEY_NONE): client = self.host.get_client(profile) d = defer.ensureDeferred(self.get_muc_service(client, jid_ or None)) d.addCallback( lambda service_jid: service_jid.full() if service_jid is not None else "" ) return d async def get_muc_service( self, client: SatXMPPEntity, jid_: Optional[jid.JID] = None ) -> Optional[jid.JID]: """Return first found MUC service of an entity @param jid_: entity which may have a MUC service, or None for our own server @return: found service jid or None """ if jid_ is None: try: muc_service = client.muc_service except AttributeError: pass else: # we have a cached value, we return it return muc_service services = await self.host.find_service_entities( client, "conference", "text", jid_ ) for service in services: if ".irc." not in service.userhost(): # FIXME: # This ugly hack is here to avoid an issue with openfire: the IRC gateway # use "conference/text" identity (instead of "conference/irc") muc_service = service break else: muc_service = None return muc_service def _get_unique_name(self, muc_service="", profile_key=C.PROF_KEY_NONE): client = self.host.get_client(profile_key) return self.get_unique_name(client, muc_service or None).full() def get_unique_name( self, client: SatXMPPEntity, muc_service: jid.JID | None = None, prefix: str = "" ) -> jid.JID: """Return unique name for a room, avoiding collision @param client: Client instance. @param muc_service: leave empty string to use the default service @param prefix: prefix to use in room name. @return: unique room bare JID """ room_name = f"{prefix}{shortuuid.uuid()}" if muc_service is None: try: muc_service = client.muc_service except AttributeError: raise exceptions.NotReady( "Main server MUC service has not been checked yet" ) if muc_service is None: log.warning(_("No MUC service found on main server")) raise exceptions.FeatureNotFound muc_service = muc_service.userhost() return jid.JID(f"{room_name}@{muc_service}") def get_default_muc(self): """Return the default MUC. @return: unicode """ return self.host.memory.config_get( CONFIG_SECTION, "default_muc", default_conf["default_muc"] ) def _bridge_join_eb(self, failure_, client): failure_.trap(AlreadyJoined) room = failure_.value.room return [True] + self._get_room_joined_args(room, client.profile) def _join(self, room_jid_s, nick, options, profile_key=C.PROF_KEY_NONE): """join method used by bridge @return (tuple): already_joined boolean + room joined arguments (see [_get_room_joined_args]) """ client = self.host.get_client(profile_key) if room_jid_s: muc_service = client.muc_service try: room_jid = jid.JID(room_jid_s) except (RuntimeError, jid.InvalidFormat, AttributeError): return defer.fail( jid.InvalidFormat( _( "Invalid room identifier: {room_id}'. Please give a room short or full identifier like 'room' or 'room@{muc_service}'." ).format(room_id=room_jid_s, muc_service=str(muc_service)) ) ) if not room_jid.user: room_jid.user, room_jid.host = room_jid.host, muc_service else: room_jid = self.get_unique_name(client) # TODO: error management + signal in bridge d = defer.ensureDeferred(self.join(client, room_jid, nick, options or None)) d.addCallback( lambda room: [False] + self._get_room_joined_args(room, client.profile) ) d.addErrback(self._bridge_join_eb, client) return d async def join( self, client: SatXMPPEntity, room_jid: jid.JID, nick: Optional[str] = None, options: Optional[dict] = None, ) -> Optional[muc.Room]: if not nick: nick = client.jid.user if options is None: options = {} if room_jid in client._muc_client.joined_rooms: room = client._muc_client.joined_rooms[room_jid] log.info( _("{profile} is already in room {room_jid}").format( profile=client.profile, room_jid=room_jid.userhost() ) ) raise AlreadyJoined(room) log.info( _("[{profile}] is joining room {room} with nick {nick}").format( profile=client.profile, room=room_jid.userhost(), nick=nick ) ) self.host.bridge.muc_room_prepare_join(room_jid.userhost(), client.profile) password = options.get("password") try: room = await client._muc_client.join(room_jid, nick, password) except Exception as e: room = await utils.as_deferred( self._join_eb(failure.Failure(e), client, room_jid, nick, password) ) else: room.on_joined_callbacks = [] room.on_left_callbacks = [] await defer.ensureDeferred(self._join_cb(room, client, room_jid, nick)) return room def pop_rooms(self, client): """Remove rooms and return data needed to re-join them This methods is to be called before a hot reconnection @return (list[(jid.JID, unicode)]): arguments needed to re-join the rooms This list can be used directly (unpacked) with self.join """ args_list = [] for room in list(client._muc_client.joined_rooms.values()): client._muc_client._removeRoom(room.roomJID) args_list.append((client, room.roomJID, room.nick)) return args_list def _nick(self, room_jid_s, nick, profile_key=C.PROF_KEY_NONE): client = self.host.get_client(profile_key) return self.nick(client, jid.JID(room_jid_s), nick) def nick(self, client, room_jid, nick): """Change nickname in a room""" self.check_room_joined(client, room_jid) return client._muc_client.nick(room_jid, nick) def _leave(self, room_jid, profile_key): client = self.host.get_client(profile_key) return self.leave(client, jid.JID(room_jid)) def leave(self, client, room_jid): self.check_room_joined(client, room_jid) return client._muc_client.leave(room_jid) def _subject(self, room_jid_s, new_subject, profile_key): client = self.host.get_client(profile_key) return self.subject(client, jid.JID(room_jid_s), new_subject) def subject(self, client, room_jid, subject): self.check_room_joined(client, room_jid) return client._muc_client.subject(room_jid, subject) def get_handler(self, client): # create a MUC client and associate it with profile' session muc_client = client._muc_client = LiberviaMUCClient(self) return muc_client def kick(self, client, nick, room_jid, options=None): """Kick a participant from the room @param nick (str): nick of the user to kick @param room_jid_s (JID): jid of the room @param options (dict): attribute with extra info (reason, password) as in #XEP-0045 """ if options is None: options = {} self.check_room_joined(client, room_jid) return client._muc_client.kick(room_jid, nick, reason=options.get("reason", None)) def ban(self, client, entity_jid, room_jid, options=None): """Ban an entity from the room @param entity_jid (JID): bare jid of the entity to be banned @param room_jid (JID): jid of the room @param options: attribute with extra info (reason, password) as in #XEP-0045 """ self.check_room_joined(client, room_jid) if options is None: options = {} assert not entity_jid.resource assert not room_jid.resource return client._muc_client.ban( room_jid, entity_jid, reason=options.get("reason", None) ) def affiliate(self, client, entity_jid, room_jid, options): """Change the affiliation of an entity @param entity_jid (JID): bare jid of the entity @param room_jid_s (JID): jid of the room @param options: attribute with extra info (reason, nick) as in #XEP-0045 """ self.check_room_joined(client, room_jid) assert not entity_jid.resource assert not room_jid.resource assert "affiliation" in options # TODO: handles reason and nick return client._muc_client.modifyAffiliationList( room_jid, [entity_jid], options["affiliation"] ) # Text commands # def cmd_nick(self, client, mess_data): """change nickname @command (group): new_nick - new_nick: new nick to use """ nick = mess_data["unparsed"].strip() if nick: room = mess_data["to"] self.nick(client, room, nick) return False def cmd_join(self, client, mess_data): """join a new room @command (all): JID - JID: room to join (on the same service if full jid is not specified) """ room_raw = mess_data["unparsed"].strip() if room_raw: if self.is_joined_room(client, mess_data["to"]): # we use the same service as the one from the room where the command has # been entered if full jid is not entered muc_service = mess_data["to"].host nick = self.get_room_nick(client, mess_data["to"]) or client.jid.user else: # the command has been entered in a one2one conversation, so we use # our server MUC service as default service muc_service = client.muc_service or "" nick = client.jid.user room_jid = self.text_cmds.get_room_jid(room_raw, muc_service) defer.ensureDeferred(self.join(client, room_jid, nick, {})) return False def cmd_leave(self, client, mess_data): """quit a room @command (group): [ROOM_JID] - ROOM_JID: jid of the room to live (current room if not specified) """ room_raw = mess_data["unparsed"].strip() if room_raw: room = self.text_cmds.get_room_jid(room_raw, mess_data["to"].host) else: room = mess_data["to"] self.leave(client, room) return False def cmd_part(self, client, mess_data): """just a synonym of /leave @command (group): [ROOM_JID] - ROOM_JID: jid of the room to live (current room if not specified) """ return self.cmd_leave(client, mess_data) def cmd_kick(self, client, mess_data): """kick a room member @command (group): ROOM_NICK - ROOM_NICK: the nick of the person to kick """ options = mess_data["unparsed"].strip().split() try: nick = options[0] assert self.is_nick_in_room(client, mess_data["to"], nick) except (IndexError, AssertionError): feedback = _("You must provide a member's nick to kick.") self.text_cmds.feed_back(client, feedback, mess_data) return False reason = " ".join(options[1:]) if len(options) > 1 else None d = self.kick(client, nick, mess_data["to"], {"reason": reason}) def cb(__): feedback_msg = _("You have kicked {}").format(nick) if reason is not None: feedback_msg += _(" for the following reason: {reason}").format( reason=reason ) self.text_cmds.feed_back(client, feedback_msg, mess_data) return True d.addCallback(cb) return d def cmd_ban(self, client, mess_data): """ban an entity from the room @command (group): (JID) [reason] - JID: the JID of the entity to ban - reason: the reason why this entity is being banned """ options = mess_data["unparsed"].strip().split() try: jid_s = options[0] entity_jid = jid.JID(jid_s).userhostJID() assert entity_jid.user assert entity_jid.host except ( RuntimeError, jid.InvalidFormat, AttributeError, IndexError, AssertionError, ): feedback = _( "You must provide a valid JID to ban, like in '/ban contact@example.net'" ) self.text_cmds.feed_back(client, feedback, mess_data) return False reason = " ".join(options[1:]) if len(options) > 1 else None d = self.ban(client, entity_jid, mess_data["to"], {"reason": reason}) def cb(__): feedback_msg = _("You have banned {}").format(entity_jid) if reason is not None: feedback_msg += _(" for the following reason: {reason}").format( reason=reason ) self.text_cmds.feed_back(client, feedback_msg, mess_data) return True d.addCallback(cb) return d def cmd_affiliate(self, client, mess_data): """affiliate an entity to the room @command (group): (JID) [owner|admin|member|none|outcast] - JID: the JID of the entity to affiliate - owner: grant owner privileges - admin: grant admin privileges - member: grant member privileges - none: reset entity privileges - outcast: ban entity """ options = mess_data["unparsed"].strip().split() try: jid_s = options[0] entity_jid = jid.JID(jid_s).userhostJID() assert entity_jid.user assert entity_jid.host except ( RuntimeError, jid.InvalidFormat, AttributeError, IndexError, AssertionError, ): feedback = _( "You must provide a valid JID to affiliate, like in '/affiliate contact@example.net member'" ) self.text_cmds.feed_back(client, feedback, mess_data) return False affiliation = options[1] if len(options) > 1 else "none" if affiliation not in AFFILIATIONS: feedback = _("You must provide a valid affiliation: %s") % " ".join( AFFILIATIONS ) self.text_cmds.feed_back(client, feedback, mess_data) return False d = self.affiliate( client, entity_jid, mess_data["to"], {"affiliation": affiliation} ) def cb(__): feedback_msg = _("New affiliation for {entity}: {affiliation}").format( entity=entity_jid, affiliation=affiliation ) self.text_cmds.feed_back(client, feedback_msg, mess_data) return True d.addCallback(cb) return d def cmd_title(self, client, mess_data): """change room's subject @command (group): title - title: new room subject """ subject = mess_data["unparsed"].strip() if subject: room = mess_data["to"] self.subject(client, room, subject) return False def cmd_topic(self, client, mess_data): """just a synonym of /title @command (group): title - title: new room subject """ return self.cmd_title(client, mess_data) def cmd_list(self, client, mess_data): """list available rooms in a muc server @command (all): [MUC_SERVICE] - MUC_SERVICE: service to request empty value will request room's service for a room, or user's server default MUC service in a one2one chat """ unparsed = mess_data["unparsed"].strip() try: service = jid.JID(unparsed) except RuntimeError: if mess_data["type"] == C.MESS_TYPE_GROUPCHAT: room_jid = mess_data["to"] service = jid.JID(room_jid.host) elif client.muc_service is not None: service = client.muc_service else: msg = D_("No known default MUC service {unparsed}").format( unparsed=unparsed ) self.text_cmds.feed_back(client, msg, mess_data) return False except jid.InvalidFormat: msg = D_("{} is not a valid JID!".format(unparsed)) self.text_cmds.feed_back(client, msg, mess_data) return False d = self.host.getDiscoItems(client, service) d.addCallback(self._show_list_ui, client, service) return False def _whois(self, client, whois_msg, mess_data, target_jid): """Add MUC user information to whois""" if mess_data["type"] != "groupchat": return if target_jid.userhostJID() not in client._muc_client.joined_rooms: log.warning(_("This room has not been joined")) return if not target_jid.resource: return user = client._muc_client.joined_rooms[target_jid.userhostJID()].getUser( target_jid.resource ) whois_msg.append(_("Nickname: %s") % user.nick) if user.entity: whois_msg.append(_("Entity: %s") % user.entity) if user.affiliation != "none": whois_msg.append(_("Affiliation: %s") % user.affiliation) if user.role != "none": whois_msg.append(_("Role: %s") % user.role) if user.status: whois_msg.append(_("Status: %s") % user.status) if user.show: whois_msg.append(_("Show: %s") % user.show) def presence_trigger(self, presence_elt, client): # FIXME: should we add a privacy parameters in settings to activate before # broadcasting the presence to all MUC rooms ? muc_client = client._muc_client for room_jid, room in muc_client.joined_rooms.items(): elt = xml_tools.element_copy(presence_elt) elt["to"] = room_jid.userhost() + "/" + room.nick client.presence.send(elt) return True def presence_received_trigger(self, client, entity, show, priority, statuses): entity_bare = entity.userhostJID() muc_client = client._muc_client if entity_bare in muc_client.joined_rooms: # presence is already handled in (un)availableReceived return False return True @implementer(iwokkel.IDisco) class LiberviaMUCClient(muc.MUCClient): def __init__(self, plugin_parent): self.plugin_parent = plugin_parent muc.MUCClient.__init__(self) self._changing_nicks = set() # used to keep trace of who is changing nick, # and to discard userJoinedRoom signal in this case print("init SatMUCClient OK") @property def joined_rooms(self): return self._rooms @property def host(self): return self.plugin_parent.host @property def client(self): return self.parent @property def _mam(self): return self.plugin_parent._mam @property def _si(self): return self.plugin_parent._si def change_room_state(self, room, new_state): """Check that room is in expected state, and change it @param new_state: one of ROOM_STATE_* """ new_state_idx = ROOM_STATES.index(new_state) if new_state_idx == -1: raise exceptions.InternalError("unknown room state") if new_state_idx < 1: raise exceptions.InternalError( "unexpected new room state ({room}): {state}".format( room=room.userhost(), state=new_state ) ) expected_state = ROOM_STATES[new_state_idx - 1] if room.state != expected_state: log.error( _( "room {room} is not in expected state: room is in state {current_state} " "while we were expecting {expected_state}" ).format( room=room.roomJID.userhost(), current_state=room.state, expected_state=expected_state, ) ) room.state = new_state def _addRoom(self, room): super(LiberviaMUCClient, self)._addRoom(room) room._roster_ok = False # True when occupants list has been fully received room.state = ROOM_STATE_OCCUPANTS # FIXME: check if history_d is not redundant with fully_joined room.fully_joined = defer.Deferred() # called when everything is OK # cache data until room is ready # list of elements which will be re-injected in stream room._cache = [] # we only need to keep last presence status for each jid, so a dict is suitable room._cache_presence = {} async def _join_legacy( self, client: SatXMPPEntity, room_jid: jid.JID, nick: str, password: Optional[str] ) -> muc.Room: """Join room an retrieve history with legacy method""" mess_data_list = await self.host.memory.history_get( room_jid, client.jid.userhostJID(), limit=1, between=True, profile=client.profile, ) if mess_data_list: timestamp = mess_data_list[0][1] # we use seconds since last message to get backlog without duplicates # and we remove 1 second to avoid getting the last message again seconds = int(time.time() - timestamp) - 1 else: seconds = None room = await super(LiberviaMUCClient, self).join( room_jid, nick, muc.HistoryOptions(seconds=seconds), password ) # used to send bridge signal once backlog are written in history room._history_type = HISTORY_LEGACY room._history_d = defer.Deferred() room._history_d.callback(None) return room async def _get_mam_history( self, client: SatXMPPEntity, room: muc.Room, room_jid: jid.JID ) -> None: """Retrieve history for rooms handling MAM""" history_d = room._history_d = defer.Deferred() # we trigger now the deferred so all callback are processed as soon as possible # and in order history_d.callback(None) last_mess = await self.host.memory.history_get( room_jid, None, limit=1, between=False, filters={"types": C.MESS_TYPE_GROUPCHAT, "last_stanza_id": True}, profile=client.profile, ) if last_mess: stanza_id = last_mess[0][-1]["stanza_id"] rsm_req = rsm.RSMRequest(max_=20, after=stanza_id) no_loop = False else: log.info( "We have no MAM archive for room {room_jid}.".format(room_jid=room_jid) ) # we don't want the whole archive if we have no archive yet # as it can be huge rsm_req = rsm.RSMRequest(max_=50, before="") no_loop = True mam_req = mam.MAMRequest(rsm_=rsm_req) complete = False count = 0 while not complete: try: mam_data = await self._mam.get_archives(client, mam_req, service=room_jid) except xmpp_error.StanzaError as e: if last_mess and e.condition == "item-not-found": log.warning( f"requested item (with id {stanza_id!r}) can't be found in " f"history of {room_jid}, history has probably been purged on " f"server." ) # we get last items like for a new room rsm_req = rsm.RSMRequest(max_=50, before="") mam_req = mam.MAMRequest(rsm_=rsm_req) no_loop = True continue else: raise e elt_list, rsm_response, mam_response = mam_data complete = True if no_loop else mam_response["complete"] # we update MAM request for next iteration mam_req.rsm.after = rsm_response.last if not elt_list: break else: count += len(elt_list) for mess_elt in elt_list: try: fwd_message_elt = self._mam.get_message_from_result( client, mess_elt, mam_req, service=room_jid ) except exceptions.DataError: continue if fwd_message_elt.getAttribute("to"): log.warning( 'Forwarded message element has a "to" attribute while it is ' "forbidden by specifications" ) fwd_message_elt["to"] = client.jid.full() client.messageProt.onMessage(fwd_message_elt) client._muc_client._onGroupChat(fwd_message_elt) if not count: log.info( _( "No message received while offline in {room_jid}".format( room_jid=room_jid ) ) ) else: log.info( _( "We have received {num_mess} message(s) in {room_jid} while " "offline." ).format(num_mess=count, room_jid=room_jid) ) # for legacy history, the following steps are done in receivedSubject but for MAM # the order is different (we have to join then get MAM archive, so subject # is received before archive), so we change state and add the callbacks here. self.change_room_state(room, ROOM_STATE_LIVE) history_d.addCallbacks( self._history_cb, self._history_eb, [room], errbackArgs=[room] ) # we wait for all callbacks to be processed await history_d async def _join_mam( self, client: SatXMPPEntity, room_jid: jid.JID, nick: str, password: Optional[str] ) -> muc.Room: """Join room and retrieve history using MAM""" room = await super(LiberviaMUCClient, self).join( # we don't want any history from room as we'll get it with MAM room_jid, nick, muc.HistoryOptions(maxStanzas=0), password=password, ) room._history_type = HISTORY_MAM # MAM history retrieval can be very long, and doesn't need to be sync, so we don't # wait for it defer.ensureDeferred(self._get_mam_history(client, room, room_jid)) room.fully_joined.callback(room) return room async def join(self, room_jid, nick, password=None): room_service = jid.JID(room_jid.host) has_mam = await self.host.hasFeature(self.client, mam.NS_MAM, room_service) if not self._mam or not has_mam: return await self._join_legacy(self.client, room_jid, nick, password) else: return await self._join_mam(self.client, room_jid, nick, password) ## presence/roster ## def availableReceived(self, presence): """ Available presence was received. """ # XXX: we override MUCClient.availableReceived to fix bugs # (affiliation and role are not set) room, user = self._getRoomUser(presence) if room is None: return if user is None: nick = presence.sender.resource if not nick: log.warning( _("missing nick in presence: {xml}").format( xml=presence.toElement().toXml() ) ) return user = muc.User(nick, presence.entity) # we want to keep statuses with room # XXX: presence if broadcasted, and we won't have status code # like 110 (REALJID_PUBLIC) after first <presence/> received # so we keep only the initial <presence> (with SELF_PRESENCE), # thus we check if attribute already exists if ( not hasattr(room, "statuses") and muc.STATUS_CODE.SELF_PRESENCE in presence.mucStatuses ): room.statuses = presence.mucStatuses # Update user data user.role = presence.role user.affiliation = presence.affiliation user.status = presence.status user.show = presence.show if room.inRoster(user): self.userUpdatedStatus(room, user, presence.show, presence.status) else: room.addUser(user) self.userJoinedRoom(room, user) def unavailableReceived(self, presence): # XXX: we override this method to manage nickname change """ Unavailable presence was received. If this was received from a MUC room occupant JID, that occupant has left the room. """ room, user = self._getRoomUser(presence) if room is None or user is None: return room.removeUser(user) if muc.STATUS_CODE.NEW_NICK in presence.mucStatuses: self._changing_nicks.add(presence.nick) self.user_changed_nick(room, user, presence.nick) else: self._changing_nicks.discard(presence.nick) self.userLeftRoom(room, user) def userJoinedRoom(self, room, user): if user.nick == room.nick: # we have received our own nick, # this mean that the full room roster was received self.change_room_state(room, ROOM_STATE_SELF_PRESENCE) log.debug( "room {room} joined with nick {nick}".format( room=room.occupantJID.userhost(), nick=user.nick ) ) # we set type so we don't have to use a deferred # with disco to check entity type self.host.memory.update_entity_data( self.client, room.roomJID, C.ENTITY_TYPE, C.ENTITY_TYPE_MUC ) elif room.state not in (ROOM_STATE_OCCUPANTS, ROOM_STATE_LIVE): log.warning( "Received user presence data in a room before its initialisation " "(current state: {state})," "this is not standard! Ignoring it: {room} ({nick})".format( state=room.state, room=room.roomJID.userhost(), nick=user.nick ) ) return else: if not room.fully_joined.called: return for cb in room.on_joined_callbacks: defer.ensureDeferred(cb(room, user)) try: self._changing_nicks.remove(user.nick) except KeyError: # this is a new user log.debug( _("user {nick} has joined room {room_id}").format( nick=user.nick, room_id=room.occupantJID.userhost() ) ) if not self.host.trigger.point( "MUC user joined", room, user, self.client.profile ): return extra = { "info_type": ROOM_USER_JOINED, "user_affiliation": user.affiliation, "user_role": user.role, "user_nick": user.nick, } if user.entity is not None: extra["user_entity"] = user.entity.full() mess_data = { # dict is similar to the one used in client.onMessage "from": room.roomJID, "to": self.client.jid, "uid": str(uuid.uuid4()), "message": {"": D_("=> {} has joined the room").format(user.nick)}, "subject": {}, "type": C.MESS_TYPE_INFO, "extra": extra, "timestamp": time.time(), } # FIXME: we disable presence in history as it's taking a lot of space # while not indispensable. In the future an option may allow # to re-enable it # self.client.message_add_to_history(mess_data) self.client.message_send_to_bridge(mess_data) def userLeftRoom(self, room, user): if not self.host.trigger.point("MUC user left", room, user, self.client.profile): return if user.nick == room.nick: # we left the room room_jid_s = room.roomJID.userhost() log.info( _("Room ({room}) left ({profile})").format( room=room_jid_s, profile=self.client.profile ) ) self.host.memory.del_entity_cache( room.roomJID, profile_key=self.client.profile ) self.host.bridge.muc_room_left(room.roomJID.userhost(), self.client.profile) elif room.state != ROOM_STATE_LIVE: log.warning( "Received user presence data in a room before its initialisation (current state: {state})," "this is not standard! Ignoring it: {room} ({nick})".format( state=room.state, room=room.roomJID.userhost(), nick=user.nick ) ) return else: if not room.fully_joined.called: return log.debug( _("user {nick} left room {room_id}").format( nick=user.nick, room_id=room.occupantJID.userhost() ) ) for cb in room.on_left_callbacks: defer.ensureDeferred(cb(room, user)) extra = { "info_type": ROOM_USER_LEFT, "user_affiliation": user.affiliation, "user_role": user.role, "user_nick": user.nick, } if user.entity is not None: extra["user_entity"] = user.entity.full() mess_data = { # dict is similar to the one used in client.onMessage "from": room.roomJID, "to": self.client.jid, "uid": str(uuid.uuid4()), "message": {"": D_("<= {} has left the room").format(user.nick)}, "subject": {}, "type": C.MESS_TYPE_INFO, "extra": extra, "timestamp": time.time(), } # FIXME: disable history, see userJoinRoom comment # self.client.message_add_to_history(mess_data) self.client.message_send_to_bridge(mess_data) def user_changed_nick(self, room, user, new_nick): self.host.bridge.muc_room_user_changed_nick( room.roomJID.userhost(), user.nick, new_nick, self.client.profile ) def userUpdatedStatus(self, room, user, show, status): entity = jid.JID(tuple=(room.roomJID.user, room.roomJID.host, user.nick)) if hasattr(room, "_cache_presence"): # room has a cache for presence, meaning it has not been fully # joined yet. So we put presence in cache, and stop workflow. # Or delete current presence and continue workflow if it's an # "unavailable" presence cache = room._cache_presence cache[entity] = { "room": room, "user": user, "show": show, "status": status, } return statuses = {C.PRESENCE_STATUSES_DEFAULT: status or ""} self.host.bridge.presence_update( entity.full(), show or "", 0, statuses, self.client.profile ) ## messages ## def receivedGroupChat(self, room, user, body): log.debug( "receivedGroupChat: room=%s user=%s body=%s" % (room.roomJID.full(), user, body) ) ## subject ## def groupChatReceived(self, message): """ A group chat message has been received from a MUC room. There are a few event methods that may get called here. L{receivedGroupChat}, L{receivedSubject} or L{receivedHistory}. """ # We override this method to fix subject handling (empty strings were discarded) # FIXME: remove this once fixed upstream room, user = self._getRoomUser(message) if room is None: log.warning( "No room found for message: {message}".format( message=message.toElement().toXml() ) ) return if message.subject is not None: self.receivedSubject(room, user, message.subject) elif message.delay is None: self.receivedGroupChat(room, user, message) def subject(self, room, subject): return muc.MUCClientProtocol.subject(self, room, subject) def _history_cb(self, __, room): """Called when history have been written to database and subject is received this method will finish joining by: - sending message to bridge - calling fully_joined deferred (for legacy history) - sending stanza put in cache - cleaning variables not needed anymore """ args = self.plugin_parent._get_room_joined_args(room, self.client.profile) self.host.bridge.muc_room_joined(*args) if room._history_type == HISTORY_LEGACY: room.fully_joined.callback(room) del room._history_d del room._history_type cache = room._cache del room._cache cache_presence = room._cache_presence del room._cache_presence for elem in cache: self.client.xmlstream.dispatch(elem) for presence_data in cache_presence.values(): if not presence_data["show"] and not presence_data["status"]: # occupants are already sent in muc_room_joined, so if we don't have # extra information like show or statuses, we can discard the signal continue else: self.userUpdatedStatus(**presence_data) def _history_eb(self, failure_, room): log.error("Error while managing history: {}".format(failure_)) self._history_cb(None, room) def receivedSubject(self, room, user, subject): # when subject is received, we know that we have whole roster and history # cf. http://xmpp.org/extensions/xep-0045.html#enter-subject room.subject = subject # FIXME: subject doesn't handle xml:lang if room.state != ROOM_STATE_LIVE: if room._history_type == HISTORY_LEGACY: self.change_room_state(room, ROOM_STATE_LIVE) room._history_d.addCallbacks( self._history_cb, self._history_eb, [room], errbackArgs=[room] ) else: # the subject has been changed log.debug( _("New subject for room ({room_id}): {subject}").format( room_id=room.roomJID.full(), subject=subject ) ) self.host.bridge.muc_room_new_subject( room.roomJID.userhost(), subject, self.client.profile ) ## disco ## def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_MUC)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): # TODO: manage room queries ? Bad for privacy, must be disabled by default # see XEP-0045 § 6.7 return []