Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0359.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0359.py@524856bd7b19 |
children | 2729d424dee7 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT plugin for Unique and Stable Stanza IDs (XEP-0359) | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) | |
7 | |
8 # This program is free software: you can redistribute it and/or modify | |
9 # it under the terms of the GNU Affero General Public License as published by | |
10 # the Free Software Foundation, either version 3 of the License, or | |
11 # (at your option) any later version. | |
12 | |
13 # This program is distributed in the hope that it will be useful, | |
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 # GNU Affero General Public License for more details. | |
17 | |
18 # You should have received a copy of the GNU Affero General Public License | |
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 | |
21 from typing import Optional | |
22 import uuid | |
23 from zope.interface import implementer | |
24 from twisted.words.protocols.jabber import xmlstream | |
25 from wokkel import disco | |
26 from libervia.backend.core.constants import Const as C | |
27 from libervia.backend.core import exceptions | |
28 from libervia.backend.core.i18n import _ | |
29 from libervia.backend.core.log import getLogger | |
30 from twisted.words.xish import domish | |
31 | |
# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata. Keys are constants taken from
# libervia.backend.core.constants.Const (imported as C).
# NOTE(review): presumably consumed by the backend plugin loader — confirm
# the exact meaning of each key against the loader before changing values.
PLUGIN_INFO = {
    C.PI_NAME: "Unique and Stable Stanza IDs",
    C.PI_IMPORT_NAME: "XEP-0359",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0359"],
    # matches the name of the main class defined in this file
    C.PI_MAIN: "XEP_0359",
    # NOTE(review): looks related to XEP_0359.get_handler() — confirm
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Unique and Stable Stanza IDs"""),
}

# XML namespace of the <stanza-id/> and <origin-id/> elements read and written
# by this plugin; also advertised as a disco feature by the handler.
NS_SID = "urn:xmpp:sid:0"
46 | |
47 | |
class XEP_0359:
    """Plugin for Unique and Stable Stanza IDs (XEP-0359)

    Extracts <stanza-id/> and <origin-id/> from parsed messages, and adds an
    <origin-id/> to messages which are about to be sent.
    """

    def __init__(self, host):
        """Register the namespace and the message triggers on the host

        @param host: backend instance, must provide ``register_namespace`` and
            ``trigger.add``
        """
        log.info(_("Unique and Stable Stanza IDs plugin initialization"))
        self.host = host
        host.register_namespace("stanza_id", NS_SID)
        host.trigger.add("message_parse", self._message_parse_trigger)
        host.trigger.add("send_message_data", self._send_message_data_trigger)

    def _message_parse_trigger(self, client, message_elt, mess_data):
        """Copy stanza-id and origin-id of a parsed message to its extra data

        Only a stanza-id set by the bare jid of the client is retained.

        @param client: client session, ``client.jid`` is used to select the
            relevant <stanza-id/>
        @param message_elt(domish.Element): parsed message stanza
        @param mess_data(dict): message data, ``mess_data["extra"]`` is updated
            in place with "stanza_id" and/or "origin_id" when they are found
        @return (bool): always True
        @raise exceptions.DataError: several <stanza-id/> are set by the bare
            jid of the client
        """
        stanza_id = self.get_stanza_id(message_elt, client.jid.userhostJID())
        if stanza_id is not None:
            mess_data["extra"]["stanza_id"] = stanza_id
        origin_id = self.get_origin_id(message_elt)
        if origin_id is not None:
            mess_data["extra"]["origin_id"] = origin_id
        return True

    def _send_message_data_trigger(self, client, mess_data):
        """Add an <origin-id/> to a message about to be sent

        The id found in ``mess_data["extra"]["origin_id"]`` is used if it is set
        and not empty, otherwise a new UUID4 is generated and stored there.

        @param client: client session (unused here)
        @param mess_data(dict): message data, must contain "extra" (dict) and
            "xml" (the message element to send)
        """
        origin_id = mess_data["extra"].get("origin_id")
        if not origin_id:
            origin_id = str(uuid.uuid4())
            mess_data["extra"]["origin_id"] = origin_id
        message_elt = mess_data["xml"]
        self.add_origin_id(message_elt, origin_id)

    def get_stanza_id(self, element, by):
        """Return stanza-id if found in element

        @param element(domish.Element): element to parse
        @param by(jid.JID): entity which should have set a stanza-id
        @return (str, None): stanza-id if found
        @raise exceptions.DataError: more than one <stanza-id/> has been set by
            the requested entity
        """
        # All candidates are collected (no early exit) to be sure that there is
        # no more than one element with this "by" attribute. Counting elements
        # rather than testing the retrieved id makes the check work even when
        # a matching element has no "id" attribute.
        matching_elts = [
            stanza_elt
            for stanza_elt in element.elements(NS_SID, "stanza-id")
            if stanza_elt.getAttribute("by") == by.full()
        ]
        if len(matching_elts) > 1:
            # we must not have more than one element (§3 #4)
            raise exceptions.DataError(
                "More than one corresponding stanza-id found!")
        if not matching_elts:
            return None
        return matching_elts[0].getAttribute("id")

    def add_stanza_id(self, client, element, stanza_id, by=None):
        """Add a <stanza-id/> to a stanza

        @param client: client session, its jid is used when ``by`` is None
        @param element(domish.Element): stanza where the <stanza-id/> must be added
        @param stanza_id(str): id to use
        @param by(jid.JID, None): jid to use or None to use client.jid
            in both cases the ``userhost()`` form of the jid is written
        """
        sid_elt = element.addElement((NS_SID, "stanza-id"))
        sid_elt["by"] = client.jid.userhost() if by is None else by.userhost()
        sid_elt["id"] = stanza_id

    def get_origin_id(self, element: domish.Element) -> Optional[str]:
        """Return origin-id if found in element

        Only the first <origin-id/> child is considered.

        @param element: element to parse
        @return: origin-id if found
        """
        origin_elt = next(element.elements(NS_SID, "origin-id"), None)
        if origin_elt is None:
            return None
        return origin_elt.getAttribute("id")

    def add_origin_id(self, element, origin_id=None):
        """Add a <origin-id/> to a stanza

        @param element(domish.Element): stanza where the <origin-id/> must be added
        @param origin_id(str): id to use, None to automatically generate
        @return (str): origin_id
        """
        if origin_id is None:
            origin_id = str(uuid.uuid4())
        sid_elt = element.addElement((NS_SID, "origin-id"))
        sid_elt["id"] = origin_id
        return origin_id

    def get_handler(self, client):
        """Return a new protocol handler advertising the feature

        @param client: client session (unused here)
        @return (XEP_0359_handler): new handler instance
        """
        return XEP_0359_handler()
134 | |
135 | |
@implementer(disco.IDisco)
class XEP_0359_handler(xmlstream.XMPPHandler):
    """Protocol handler answering disco requests for this plugin"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features provided by this plugin

        The arguments are not used: the same single feature is always returned.

        @return (list): one disco.DiscoFeature for the NS_SID namespace
        """
        sid_feature = disco.DiscoFeature(NS_SID)
        return [sid_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the disco items provided by this plugin

        @return (list): always a new empty list
        """
        return list()