comparison libervia/backend/plugins/plugin_xep_0444.py @ 4157:04cdcb3fd713

plugin XEP-0444: complete implementation: the "Message Reactions" implementation is now working and updating history. The `message_reactions_set` bridge method can be used by frontends to `replace`, `add`, `remove` or `toggle` reactions. History is properly updated, and signals are sent when necessary.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 15:10:04 +0100
parents 4b842c1fb686
children 3b3cd9453d9b
comparison
equal deleted inserted replaced
4156:2729d424dee7 4157:04cdcb3fd713
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 from typing import List, Iterable 19 from typing import Iterable, List
20 from copy import deepcopy 20
21 21 from twisted.internet import defer
22 from twisted.words.protocols.jabber import jid, xmlstream 22 from twisted.words.protocols.jabber import jid, xmlstream
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 from twisted.internet import defer
25 from wokkel import disco, iwokkel 24 from wokkel import disco, iwokkel
26 from zope.interface import implementer 25 from zope.interface import implementer
27 26
27 import emoji
28 from libervia.backend.core import exceptions
28 from libervia.backend.core.constants import Const as C 29 from libervia.backend.core.constants import Const as C
30 from libervia.backend.core.core_types import SatXMPPEntity
29 from libervia.backend.core.i18n import _ 31 from libervia.backend.core.i18n import _
30 from libervia.backend.core.log import getLogger 32 from libervia.backend.core.log import getLogger
31 from libervia.backend.core import exceptions
32 from libervia.backend.core.core_types import SatXMPPEntity
33 from libervia.backend.memory.sqla_mapping import History 33 from libervia.backend.memory.sqla_mapping import History
34 from libervia.backend.models.core import MessageReactionData
35 from libervia.backend.tools.utils import aio
36 from libervia.backend.memory.sqla import select
37 from sqlalchemy.orm.attributes import flag_modified
34 38
# Module-level logger, named after this module.
log = getLogger(__name__)

# XML namespace of the <reactions/> payload handled by this plugin.
NS_REACTIONS = "urn:xmpp:reactions:0"

# Plugin metadata: identity, type, supported modes, implemented protocol,
# required plugins, main class name and whether an XMPP handler is provided.
PLUGIN_INFO = {
    C.PI_NAME: "Message Reactions",
    C.PI_IMPORT_NAME: "XEP-0444",
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0444"],
    # Both plugins are looked up in ``XEP_0444.__init__``.
    C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0334"],
    C.PI_MAIN: "XEP_0444",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Message Reactions implementation"""),
}
50 54
51 55
52 class XEP_0444: 56 class XEP_0444:
53 57 # TODO: implement and use occupant-ID (XEP-0421), and check sender (see
58 # https://xmpp.org/extensions/xep-0444.html#acceptable-reactions).
def __init__(self, host):
    """Register the namespace, the bridge method and the reception trigger.

    @param host: application host; its ``plugins``, ``bridge`` and ``trigger``
        attributes are used here.
    """
    log.info(_("Message Reactions initialization"))
    host.register_namespace("reactions", NS_REACTIONS)
    self.host = host

    # Plugins declared in PLUGIN_INFO's dependencies.
    loaded_plugins = host.plugins
    self._m = loaded_plugins["XEP-0045"]
    self._h = loaded_plugins["XEP-0334"]

    # Signature "sasss" mirrors ``_reactions_set``: message_id, reactions,
    # update_type, profile.
    host.bridge.add_method(
        "message_reactions_set",
        ".plugin",
        in_sign="sasss",
        out_sign="",
        method=self._reactions_set,
        async_=True,
    )
    host.trigger.add("message_received", self._message_received_trigger)
68 74
def get_handler(self, client):
    """Return a new XMPP handler advertising the reactions feature.

    @param client: unused here.
    """
    handler = XEP_0444_Handler()
    return handler
71 77
@aio
async def get_history_from_reaction_id(
    self, client: SatXMPPEntity, message_elt: domish.Element, reaction_id: str
) -> History | None:
    """Retrieve the history row targeted by a reaction.

    The retrieval criteria vary based on the message type, according to XEP-0444:
    stanza-id for groupchat messages, origin-id otherwise.

    @param client: client whose profile owns the history rows.
    @param message_elt: The message element, used to determine the type of message.
    @param reaction_id: The reaction ID to match in the history.
    @return: the first matching History row, or None if nothing matches (or if
        ``reaction_id`` is empty).
    """
    if not reaction_id:
        # A missing ID (None) would be rendered by SQLAlchemy as "IS NULL" and
        # could match an unrelated row that simply lacks the ID.
        return None

    profile_id = self.host.memory.storage.profiles[client.profile]

    # The message type is an XML *attribute*: ``message_elt.type`` would instead
    # look for a <type/> child element and never equal "groupchat".
    if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT:
        # stanza-id is the expected reference for groupchat. origin-id is kept as
        # a fallback because ``set_reactions`` in this plugin sends the origin-id
        # whenever the target message has one, whatever the message type.
        id_columns = (History.stanza_id, History.origin_id)
    else:
        id_columns = (History.origin_id,)

    async with self.host.memory.storage.session() as session:
        for id_column in id_columns:
            query = select(History).where(
                History.profile_id == profile_id, id_column == reaction_id
            )
            result = await session.execute(query)
            history = result.scalars().first()
            if history is not None:
                return history

    return None
104
async def _message_received_trigger(
    self,
    client: SatXMPPEntity,
    message_elt: domish.Element,
    post_treat: defer.Deferred,
) -> bool:
    """Handle an incoming message carrying a <reactions/> element.

    Valid reactions (single emojis) are applied to the targeted history row.

    @param client: client which received the message.
    @param message_elt: received <message/> stanza.
    @param post_treat: unused here.
    @return: True when the message carries no reactions (presumably letting the
        normal message workflow continue), False when it is a reaction message,
        whether or not it could be applied.
    """
    reactions_elt = next(message_elt.elements(NS_REACTIONS, "reactions"), None)
    if reactions_elt is None:
        return True

    # Validate before hitting the database: without an ID there is nothing to
    # look up.
    reaction_id = reactions_elt.getAttribute("id")
    if not reaction_id:
        log.warning(f"Invalid reaction: {reactions_elt.toXml()}")
        return False

    history = await self.get_history_from_reaction_id(
        client, message_elt, reaction_id
    )
    if history is None:
        log.warning(
            f"Can't find matching message for reaction: {reactions_elt.toXml()}"
        )
        return False

    from_jid = jid.JID(message_elt["from"])
    reactions = set()
    # ``elements`` takes (uri, name): passing only "reaction" would set the URI
    # and iterate over every child, so the qualified name is given explicitly.
    for reaction_elt in reactions_elt.elements(NS_REACTIONS, "reaction"):
        reaction = str(reaction_elt)
        if not emoji.is_emoji(reaction):
            log.warning(f"ignoring invalide reaction: {reaction_elt.toXml()}")
            continue
        reactions.add(reaction)
    await self.update_history_reactions(client, history, from_jid, reactions)

    return False
79 138
def _reactions_set(
    self,
    message_id: str,
    reactions: List[str],
    update_type: str = "replace",
    profile: str = C.PROF_KEY_NONE,
) -> defer.Deferred[None]:
    """Bridge entry point for ``message_reactions_set``.

    Resolves the client from the profile and delegates to ``set_reactions``.

    @param message_id: internal ID of the message to react to.
    @param reactions: reactions to apply.
    @param update_type: "replace", "add", "remove" or "toggle".
    @param profile: profile key used to get the client.
    @return: Deferred wrapping the ``set_reactions`` coroutine.
    """
    client = self.host.get_client(profile)
    pending = self.set_reactions(client, message_id, reactions, update_type)
    return defer.ensureDeferred(pending)
150
async def get_history_from_uid(
    self, client: SatXMPPEntity, message_id: str
) -> History:
    """Retrieve the history row associated with a given message ID.

    @param client: client used for the storage lookup.
    @param message_id: The Libervia specific identifier of the message

    @return: the History instance whose ``uid`` is ``message_id``.

    @raises exceptions.NotFound: The history corresponding to the given message ID is
        not found in the database.
    """
    history = await self.host.memory.storage.get(
        client,
        History,
        History.uid,
        message_id,
    )
    if history is None:
        # This lookup serves reactions, so the message says so.
        raise exceptions.NotFound(
            f"message to react to not found in database ({message_id})"
        )
    return history
85 175
def send_reactions(
    self,
    client: SatXMPPEntity,
    dest_jid: jid.JID,
    message_type: str,
    message_id: str,
    reactions: Iterable[str],
) -> None:
    """Build and send the <message> stanza containing the reactions.

    @param client: client sending the stanza; its full JID is used as sender.
    @param dest_jid: recipient of the reaction
    @param message_type: value of the stanza's "type" attribute.
    @param message_id: either <origin-id> or message's ID
        see https://xmpp.org/extensions/xep-0444.html#business-id
    @param reactions: reactions to send; duplicates are sent only once.
    """
    stanza = domish.Element((None, "message"))
    stanza["from"] = client.jid.full()
    stanza["to"] = dest_jid.full()
    stanza["type"] = message_type

    container = stanza.addElement((NS_REACTIONS, "reactions"))
    container["id"] = message_id
    for unique_reaction in set(reactions):
        container.addElement("reaction", content=unique_reaction)

    # A "store" hint is attached through the XEP-0334 plugin.
    hints = [self._h.HINT_STORE]
    self._h.add_hint_elements(stanza, hints)
    client.send(stanza)
108 200
109 async def add_reactions_to_history( 201 def convert_to_replace_update(
110 self, 202 self,
203 current_reactions: set[str],
204 reactions_new: Iterable[str],
205 update_type: str,
206 ) -> set[str]:
207 """Convert the given update to a replace update.
208
209 @param current_reactions: reaction of reacting JID before update
210 @param reactions_new: New reactions to be updated.
211 @param update_type: Original type of update (add, remove, toggle, replace).
212 @return: reactions for a replace operation.
213
214 @raise ValueError: invalid ``update_type``.
215 """
216 new_reactions_set = set(reactions_new)
217
218 if update_type == "replace":
219 return new_reactions_set
220
221 if update_type == "add":
222 replace_reactions = current_reactions | new_reactions_set
223 elif update_type == "remove":
224 replace_reactions = current_reactions - new_reactions_set
225 elif update_type == "toggle":
226 replace_reactions = current_reactions ^ new_reactions_set
227 else:
228 raise ValueError(f"Invalid update type: {update_type!r}")
229
230 return replace_reactions
231
async def update_history_reactions(
    self,
    client: SatXMPPEntity,
    history: History,
    reacting_jid: jid.JID,
    reactions_new: Iterable[str],
    update_type: str = "replace",
    store: bool = True,
) -> set[str]:
    """Apply a reaction update to a History instance, optionally persisting it.

    ``history.extra["reactions"]`` maps each reaction to the list of JIDs (as
    strings) which used it; it is kept sorted by reaction.

    @param client: client whose profile receives the update signal.
    @param history: storage History instance to be updated
    @param reacting_jid: author of the reactions
    @param reactions_new: new reactions to update
    @param update_type: Original type of update (add, remove, toggle, replace).
    @param store: if True, update history in storage, and send a `message_update`
        signal with the new reactions.
    @return: set of reactions for this JID for a "replace" update
    """
    # FIXME: handle race conditions
    # Full JID for groupchat messages, bare JID otherwise.
    is_groupchat = history.type == C.MESS_TYPE_GROUPCHAT
    entity_jid_s = reacting_jid.full() if is_groupchat else reacting_jid.userhost()

    if history.extra is None:
        history.extra = {}
    extra = history.extra
    by_reaction = extra.get("reactions", {})

    previous = {
        name for name, jids in by_reaction.items() if entity_jid_s in jids
    }
    wanted = self.convert_to_replace_update(previous, reactions_new, update_type)

    # Withdraw the JID from reactions it no longer uses, dropping emptied entries.
    for dropped in previous - wanted:
        users = by_reaction[dropped]
        users.remove(entity_jid_s)
        if not users:
            del by_reaction[dropped]

    # Register the JID on reactions it did not use before.
    for added in wanted - previous:
        by_reaction.setdefault(added, []).append(entity_jid_s)

    # we want to have a constant order in reactions
    extra["reactions"] = dict(sorted(by_reaction.items()))

    if not store:
        return wanted

    # FIXME: this is not a clean way to flag "extra" as modified, but a deepcopy
    #   is for some reason not working here.
    flag_modified(history, "extra")
    await self.host.memory.storage.add(history)
    # we send the signal for frontends update
    data = MessageReactionData(reactions=extra["reactions"])
    self.host.bridge.message_update(
        history.uid,
        C.MESS_UPDATE_REACTION,
        data.model_dump_json(),
        client.profile,
    )

    return wanted
131 295
async def set_reactions(
    self,
    client: SatXMPPEntity,
    message_id: str,
    reactions: Iterable[str],
    update_type: str = "replace",
) -> None:
    """Update our reactions to a message and send them to the other party.

    @param client: client reacting to the message.
    @param message_id: internal ID of the message
    @param reactions: list of emojis used to react to the message
        use empty list to remove all reactions
    @param update_type: how to use the reaction to make the update, can be "replace",
        "add", "remove" or "toggle".
    @raise ValueError: ``message_id`` is empty.
    @raise exceptions.DataError: the target message has no usable ID.
    """
    if not message_id:
        raise ValueError("message_id can't be empty")

    history = await self.get_history_from_uid(client, message_id)

    # NOTE(review): origin-id is preferred whatever the message type, while the
    #   reception side looks at stanza-id for groupchat. Confirm against XEP-0444
    #   before changing either side.
    mess_id = history.stanza_id if history.origin_id is None else history.origin_id
    if mess_id is None:
        raise exceptions.DataError(
            "target message has neither origin-id nor message-id, we can't send a "
            "reaction"
        )

    # The reaction goes to the other party of the original message.
    we_are_author = history.source == client.jid.userhost()
    dest_jid = history.dest_jid if we_are_author else history.source_jid

    if history.type == C.MESS_TYPE_GROUPCHAT:
        # the reaction is for the chat room itself
        dest_jid = dest_jid.userhostJID()
        reacting_jid = self._m.get_room_user_jid(client, dest_jid)
        # we don't store reactions for MUC, as this will be done when we receive
        # back the reaction
        store = False
    elif self._m.is_room(client, dest_jid):
        # this is a private message in a room, we need to use the MUC JID
        reacting_jid = self._m.get_room_user_jid(client, dest_jid.userhostJID())
        store = True
    else:
        # this is a classic one2one message, we need the bare JID
        reacting_jid = client.jid.userhostJID()
        dest_jid = dest_jid.userhostJID()
        store = True

    reaction_replace_set = await self.update_history_reactions(
        client, history, reacting_jid, reactions, update_type, store
    )

    self.send_reactions(client, dest_jid, history.type, mess_id, reaction_replace_set)
162 354
163 355
@implementer(iwokkel.IDisco)
class XEP_0444_Handler(xmlstream.XMPPHandler):
    """Service discovery handler advertising the reactions namespace."""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the single disco feature for the reactions namespace."""
        reactions_feature = disco.DiscoFeature(NS_REACTIONS)
        return [reactions_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return no disco item."""
        return list()