Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0444.py @ 4157:04cdcb3fd713
plugin XEP-0444: complete implementation:
"Message Reactions" implementation is now working and updating history. The
`message_reactions_set` bridge method can be used by frontend to `replace`, `add`,
`remove` or `toggle` reactions. History is properly updated, and signal are sent when
necessary.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 15:10:04 +0100 |
parents | 4b842c1fb686 |
children | 3b3cd9453d9b |
comparison
equal
deleted
inserted
replaced
4156:2729d424dee7 | 4157:04cdcb3fd713 |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 from typing import List, Iterable | 19 from typing import Iterable, List |
20 from copy import deepcopy | 20 |
21 | 21 from twisted.internet import defer |
22 from twisted.words.protocols.jabber import jid, xmlstream | 22 from twisted.words.protocols.jabber import jid, xmlstream |
23 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
24 from twisted.internet import defer | |
25 from wokkel import disco, iwokkel | 24 from wokkel import disco, iwokkel |
26 from zope.interface import implementer | 25 from zope.interface import implementer |
27 | 26 |
27 import emoji | |
28 from libervia.backend.core import exceptions | |
28 from libervia.backend.core.constants import Const as C | 29 from libervia.backend.core.constants import Const as C |
30 from libervia.backend.core.core_types import SatXMPPEntity | |
29 from libervia.backend.core.i18n import _ | 31 from libervia.backend.core.i18n import _ |
30 from libervia.backend.core.log import getLogger | 32 from libervia.backend.core.log import getLogger |
31 from libervia.backend.core import exceptions | |
32 from libervia.backend.core.core_types import SatXMPPEntity | |
33 from libervia.backend.memory.sqla_mapping import History | 33 from libervia.backend.memory.sqla_mapping import History |
34 from libervia.backend.models.core import MessageReactionData | |
35 from libervia.backend.tools.utils import aio | |
36 from libervia.backend.memory.sqla import select | |
37 from sqlalchemy.orm.attributes import flag_modified | |
34 | 38 |
35 log = getLogger(__name__) | 39 log = getLogger(__name__) |
36 | 40 |
37 PLUGIN_INFO = { | 41 PLUGIN_INFO = { |
38 C.PI_NAME: "Message Reactions", | 42 C.PI_NAME: "Message Reactions", |
39 C.PI_IMPORT_NAME: "XEP-0444", | 43 C.PI_IMPORT_NAME: "XEP-0444", |
40 C.PI_TYPE: C.PLUG_TYPE_XEP, | 44 C.PI_TYPE: C.PLUG_TYPE_XEP, |
41 C.PI_MODES: C.PLUG_MODE_BOTH, | 45 C.PI_MODES: C.PLUG_MODE_BOTH, |
42 C.PI_PROTOCOLS: ["XEP-0444"], | 46 C.PI_PROTOCOLS: ["XEP-0444"], |
43 C.PI_DEPENDENCIES: ["XEP-0334"], | 47 C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0334"], |
44 C.PI_MAIN: "XEP_0444", | 48 C.PI_MAIN: "XEP_0444", |
45 C.PI_HANDLER: "yes", | 49 C.PI_HANDLER: "yes", |
46 C.PI_DESCRIPTION: _("""Message Reactions implementation"""), | 50 C.PI_DESCRIPTION: _("""Message Reactions implementation"""), |
47 } | 51 } |
48 | 52 |
49 NS_REACTIONS = "urn:xmpp:reactions:0" | 53 NS_REACTIONS = "urn:xmpp:reactions:0" |
50 | 54 |
51 | 55 |
52 class XEP_0444: | 56 class XEP_0444: |
53 | 57 # TODO: implement and use occupant-ID (XEP-0421), and check sender (see |
58 # https://xmpp.org/extensions/xep-0444.html#acceptable-reactions). | |
54 def __init__(self, host): | 59 def __init__(self, host): |
55 log.info(_("Message Reactions initialization")) | 60 log.info(_("Message Reactions initialization")) |
56 host.register_namespace("reactions", NS_REACTIONS) | 61 host.register_namespace("reactions", NS_REACTIONS) |
57 self.host = host | 62 self.host = host |
63 self._m = host.plugins["XEP-0045"] | |
58 self._h = host.plugins["XEP-0334"] | 64 self._h = host.plugins["XEP-0334"] |
59 host.bridge.add_method( | 65 host.bridge.add_method( |
60 "message_reactions_set", | 66 "message_reactions_set", |
61 ".plugin", | 67 ".plugin", |
62 in_sign="ssas", | 68 in_sign="sasss", |
63 out_sign="", | 69 out_sign="", |
64 method=self._reactions_set, | 70 method=self._reactions_set, |
65 async_=True, | 71 async_=True, |
66 ) | 72 ) |
67 host.trigger.add("message_received", self._message_received_trigger) | 73 host.trigger.add("message_received", self._message_received_trigger) |
68 | 74 |
69 def get_handler(self, client): | 75 def get_handler(self, client): |
70 return XEP_0444_Handler() | 76 return XEP_0444_Handler() |
71 | 77 |
78 @aio | |
79 async def get_history_from_reaction_id( | |
80 self, client: SatXMPPEntity, message_elt: domish.Element, reaction_id: str | |
81 ) -> History | None: | |
82 """Retrieves history rows that match a specific reaction_id. | |
83 | |
84 The retrieval criteria vary based on the message type, according to XEP-0444. | |
85 | |
86 @param message_elt: The message element, used to determine the type of message. | |
87 @param reaction_id: The reaction ID to match in the history. | |
88 """ | |
89 profile_id = self.host.memory.storage.profiles[client.profile] | |
90 async with self.host.memory.storage.session() as session: | |
91 if message_elt.type == C.MESS_TYPE_GROUPCHAT: | |
92 query = select(History).where( | |
93 History.profile_id == profile_id, History.stanza_id == reaction_id | |
94 ) | |
95 else: | |
96 query = select(History).where( | |
97 History.profile_id == profile_id, History.origin_id == reaction_id | |
98 ) | |
99 | |
100 result = await session.execute(query) | |
101 history = result.scalars().first() | |
102 | |
103 return history | |
104 | |
72 async def _message_received_trigger( | 105 async def _message_received_trigger( |
73 self, | 106 self, |
74 client: SatXMPPEntity, | 107 client: SatXMPPEntity, |
75 message_elt: domish.Element, | 108 message_elt: domish.Element, |
76 post_treat: defer.Deferred | 109 post_treat: defer.Deferred, |
77 ) -> bool: | 110 ) -> bool: |
111 reactions_elt = next(message_elt.elements(NS_REACTIONS, "reactions"), None) | |
112 if reactions_elt is not None: | |
113 reaction_id = reactions_elt.getAttribute("id") | |
114 history = await self.get_history_from_reaction_id( | |
115 client, message_elt, reaction_id | |
116 ) | |
117 if history is None: | |
118 log.warning( | |
119 f"Can't find matching message for reaction: {reactions_elt.toXml()}" | |
120 ) | |
121 else: | |
122 if not reaction_id: | |
123 log.warning(f"Invalid reaction: {reactions_elt.toXml()}") | |
124 return False | |
125 from_jid = jid.JID(message_elt["from"]) | |
126 reactions = set() | |
127 for reaction_elt in reactions_elt.elements("reaction"): | |
128 reaction = str(reaction_elt) | |
129 if not emoji.is_emoji(reaction): | |
130 log.warning(f"ignoring invalide reaction: {reaction_elt.toXml()}") | |
131 continue | |
132 reactions.add(reaction) | |
133 await self.update_history_reactions(client, history, from_jid, reactions) | |
134 | |
135 return False | |
136 | |
78 return True | 137 return True |
79 | 138 |
80 def _reactions_set(self, message_id: str, profile: str, reactions: List[str]) -> None: | 139 def _reactions_set( |
140 self, | |
141 message_id: str, | |
142 reactions: List[str], | |
143 update_type: str = "replace", | |
144 profile: str = C.PROF_KEY_NONE, | |
145 ) -> defer.Deferred[None]: | |
81 client = self.host.get_client(profile) | 146 client = self.host.get_client(profile) |
82 return defer.ensureDeferred( | 147 return defer.ensureDeferred( |
83 self.set_reactions(client, message_id) | 148 self.set_reactions(client, message_id, reactions, update_type) |
84 ) | 149 ) |
150 | |
151 async def get_history_from_uid( | |
152 self, client: SatXMPPEntity, message_id: str | |
153 ) -> History: | |
154 """Retrieve the chat history associated with a given message ID. | |
155 | |
156 @param message_id: The Libervia specific identifier of the message | |
157 | |
158 @return: An instance of History containing the chat history, messages, and | |
159 subjects related to the specified message ID. | |
160 | |
161 @raises exceptions.NotFound: The history corresponding to the given message ID is | |
162 not found in the database. | |
163 """ | |
164 history = await self.host.memory.storage.get( | |
165 client, | |
166 History, | |
167 History.uid, | |
168 message_id, | |
169 ) | |
170 if history is None: | |
171 raise exceptions.NotFound( | |
172 f"message to retract not found in database ({message_id})" | |
173 ) | |
174 return history | |
85 | 175 |
86 def send_reactions( | 176 def send_reactions( |
87 self, | 177 self, |
88 client: SatXMPPEntity, | 178 client: SatXMPPEntity, |
89 dest_jid: jid.JID, | 179 dest_jid: jid.JID, |
180 message_type: str, | |
90 message_id: str, | 181 message_id: str, |
91 reactions: Iterable[str] | 182 reactions: Iterable[str], |
92 ) -> None: | 183 ) -> None: |
93 """Send the <message> stanza containing the reactions | 184 """Send the <message> stanza containing the reactions |
94 | 185 |
95 @param dest_jid: recipient of the reaction | 186 @param dest_jid: recipient of the reaction |
96 @param message_id: either <origin-id> or message's ID | 187 @param message_id: either <origin-id> or message's ID |
97 see https://xmpp.org/extensions/xep-0444.html#business-id | 188 see https://xmpp.org/extensions/xep-0444.html#business-id |
98 """ | 189 """ |
99 message_elt = domish.Element((None, "message")) | 190 message_elt = domish.Element((None, "message")) |
100 message_elt["from"] = client.jid.full() | 191 message_elt["from"] = client.jid.full() |
101 message_elt["to"] = dest_jid.full() | 192 message_elt["to"] = dest_jid.full() |
193 message_elt["type"] = message_type | |
102 reactions_elt = message_elt.addElement((NS_REACTIONS, "reactions")) | 194 reactions_elt = message_elt.addElement((NS_REACTIONS, "reactions")) |
103 reactions_elt["id"] = message_id | 195 reactions_elt["id"] = message_id |
104 for r in set(reactions): | 196 for r in set(reactions): |
105 reactions_elt.addElement("reaction", content=r) | 197 reactions_elt.addElement("reaction", content=r) |
106 self._h.add_hint_elements(message_elt, [self._h.HINT_STORE]) | 198 self._h.add_hint_elements(message_elt, [self._h.HINT_STORE]) |
107 client.send(message_elt) | 199 client.send(message_elt) |
108 | 200 |
109 async def add_reactions_to_history( | 201 def convert_to_replace_update( |
110 self, | 202 self, |
203 current_reactions: set[str], | |
204 reactions_new: Iterable[str], | |
205 update_type: str, | |
206 ) -> set[str]: | |
207 """Convert the given update to a replace update. | |
208 | |
209 @param current_reactions: reaction of reacting JID before update | |
210 @param reactions_new: New reactions to be updated. | |
211 @param update_type: Original type of update (add, remove, toggle, replace). | |
212 @return: reactions for a replace operation. | |
213 | |
214 @raise ValueError: invalid ``update_type``. | |
215 """ | |
216 new_reactions_set = set(reactions_new) | |
217 | |
218 if update_type == "replace": | |
219 return new_reactions_set | |
220 | |
221 if update_type == "add": | |
222 replace_reactions = current_reactions | new_reactions_set | |
223 elif update_type == "remove": | |
224 replace_reactions = current_reactions - new_reactions_set | |
225 elif update_type == "toggle": | |
226 replace_reactions = current_reactions ^ new_reactions_set | |
227 else: | |
228 raise ValueError(f"Invalid update type: {update_type!r}") | |
229 | |
230 return replace_reactions | |
231 | |
232 async def update_history_reactions( | |
233 self, | |
234 client: SatXMPPEntity, | |
111 history: History, | 235 history: History, |
112 from_jid: jid.JID, | 236 reacting_jid: jid.JID, |
113 reactions: Iterable[str] | 237 reactions_new: Iterable[str], |
114 ) -> None: | 238 update_type: str = "replace", |
115 """Update History instance with given reactions | 239 store: bool = True, |
116 | 240 ) -> set[str]: |
117 @param history: storage History instance | 241 """Update reactions in History instance and optionally store and signal it. |
118 will be updated in DB | 242 |
119 "summary" field of history.extra["reactions"] will also be updated | 243 @param history: storage History instance to be updated |
120 @param from_jid: author of the reactions | 244 @param reacting_jid: author of the reactions |
121 @param reactions: list of reactions | 245 @param reactions_new: new reactions to update |
122 """ | 246 @param update_type: Original type of update (add, remove, toggle, replace). |
123 history.extra = deepcopy(history.extra) if history.extra else {} | 247 @param store: if True, update history in storage, and send a `message_update` |
124 h_reactions = history.extra.setdefault("reactions", {}) | 248 signal with the new reactions. |
125 # reactions mapped by originating JID | 249 @return: set of reactions for this JID for a "replace" update |
126 by_jid = h_reactions.setdefault("by_jid", {}) | 250 """ |
127 # reactions are sorted to in summary to keep a consistent order | 251 # FIXME: handle race conditions |
128 h_reactions["by_jid"][from_jid.userhost()] = sorted(list(set(reactions))) | 252 if history.type == C.MESS_TYPE_GROUPCHAT: |
129 h_reactions["summary"] = sorted(list(set().union(*by_jid.values()))) | 253 entity_jid_s = reacting_jid.full() |
130 await self.host.memory.storage.session_add(history) | 254 else: |
255 entity_jid_s = reacting_jid.userhost() | |
256 if history.extra is None: | |
257 history.extra = {} | |
258 extra = history.extra | |
259 reactions = extra.get("reactions", {}) | |
260 | |
261 current_reactions = { | |
262 reaction for reaction, jids in reactions.items() if entity_jid_s in jids | |
263 } | |
264 reactions_replace_set = self.convert_to_replace_update( | |
265 current_reactions, reactions_new, update_type | |
266 ) | |
267 | |
268 for reaction in current_reactions - reactions_replace_set: | |
269 reaction_jids = reactions[reaction] | |
270 reaction_jids.remove(entity_jid_s) | |
271 if not reaction_jids: | |
272 del reactions[reaction] | |
273 | |
274 for reaction in reactions_replace_set - current_reactions: | |
275 reactions.setdefault(reaction, []).append(entity_jid_s) | |
276 | |
277 # we want to have a constant order in reactions | |
278 extra["reactions"] = dict(sorted(reactions.items())) | |
279 | |
280 if store: | |
281 # FIXME: this is not a clean way to flag "extra" as modified, but a deepcopy | |
282 # is for some reason not working here. | |
283 flag_modified(history, "extra") | |
284 await self.host.memory.storage.add(history) | |
285 # we send the signal for frontends update | |
286 data = MessageReactionData(reactions=extra["reactions"]) | |
287 self.host.bridge.message_update( | |
288 history.uid, | |
289 C.MESS_UPDATE_REACTION, | |
290 data.model_dump_json(), | |
291 client.profile, | |
292 ) | |
293 | |
294 return reactions_replace_set | |
131 | 295 |
132 async def set_reactions( | 296 async def set_reactions( |
133 self, | 297 self, |
134 client: SatXMPPEntity, | 298 client: SatXMPPEntity, |
135 message_id: str, | 299 message_id: str, |
136 reactions: Iterable[str] | 300 reactions: Iterable[str], |
301 update_type: str = "replace", | |
137 ) -> None: | 302 ) -> None: |
138 """Set and replace reactions to a message | 303 """Set and replace reactions to a message |
139 | 304 |
140 @param message_id: internal ID of the message | 305 @param message_id: internal ID of the message |
141 @param rections: lsit of emojis to used to react to the message | 306 @param rections: lsit of emojis to used to react to the message |
142 use empty list to remove all reactions | 307 use empty list to remove all reactions |
308 @param update_type: how to use the reaction to make the update, can be "replace", | |
309 "add", "remove" or "toggle". | |
143 """ | 310 """ |
144 if not message_id: | 311 if not message_id: |
145 raise ValueError("message_id can't be empty") | 312 raise ValueError("message_id can't be empty") |
146 history = await self.host.memory.storage.get( | 313 |
147 client, History, History.uid, message_id, | 314 history = await self.get_history_from_uid(client, message_id) |
148 joined_loads=[History.messages, History.subjects] | 315 if history.origin_id is not None: |
149 ) | 316 mess_id = history.origin_id |
150 if history is None: | 317 else: |
151 raise exceptions.NotFound( | 318 mess_id = history.stanza_id |
152 f"message to retract not found in database ({message_id})" | 319 |
153 ) | 320 if mess_id is None: |
154 mess_id = history.origin_id or history.stanza_id | |
155 if not mess_id: | |
156 raise exceptions.DataError( | 321 raise exceptions.DataError( |
157 "target message has neither origin-id nor message-id, we can't send a " | 322 "target message has neither origin-id nor message-id, we can't send a " |
158 "reaction" | 323 "reaction" |
159 ) | 324 ) |
160 await self.add_reactions_to_history(history, client.jid, reactions) | 325 |
161 self.send_reactions(client, history.dest_jid, mess_id, reactions) | 326 if history.source == client.jid.userhost(): |
327 dest_jid = history.dest_jid | |
328 else: | |
329 dest_jid = history.source_jid | |
330 | |
331 if history.type == C.MESS_TYPE_GROUPCHAT: | |
332 # the reaction is for the chat room itself | |
333 dest_jid = dest_jid.userhostJID() | |
334 reacting_jid = self._m.get_room_user_jid(client, dest_jid) | |
335 # we don't store reactions for MUC, at this will be done when we receive back | |
336 # the reaction | |
337 store = False | |
338 else: | |
339 # we use bare JIDs for one2one message, except if the recipient is from a MUC | |
340 if self._m.is_room(client, dest_jid): | |
341 # this is a private message in a room, we need to use the MUC JID | |
342 reacting_jid = self._m.get_room_user_jid(client, dest_jid.userhostJID()) | |
343 else: | |
344 # this is a classic one2one message, we need the bare JID | |
345 reacting_jid = client.jid.userhostJID() | |
346 dest_jid = dest_jid.userhostJID() | |
347 store = True | |
348 | |
349 reaction_replace_set = await self.update_history_reactions( | |
350 client, history, reacting_jid, reactions, update_type, store | |
351 ) | |
352 | |
353 self.send_reactions(client, dest_jid, history.type, mess_id, reaction_replace_set) | |
162 | 354 |
163 | 355 |
164 @implementer(iwokkel.IDisco) | 356 @implementer(iwokkel.IDisco) |
165 class XEP_0444_Handler(xmlstream.XMPPHandler): | 357 class XEP_0444_Handler(xmlstream.XMPPHandler): |
166 | |
167 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): | 358 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): |
168 return [disco.DiscoFeature(NS_REACTIONS)] | 359 return [disco.DiscoFeature(NS_REACTIONS)] |
169 | 360 |
170 def getDiscoItems(self, requestor, service, nodeIdentifier=""): | 361 def getDiscoItems(self, requestor, service, nodeIdentifier=""): |
171 return [] | 362 return [] |