Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0131.py @ 4314:6a70fcd93a7a
plugin XEP-0131: Stanza Headers and Internet Metadata implementation:
- SHIM is now supported and put in `msg_data["extra"]["headers"]`.
- `Keywords` are converted to and from a list of strings in the `msg_data["extra"]["keywords"]`
field (if keywords are also present in headers when sending a message, both sets of values are merged).
- Python minimal version upgraded to 3.11 due to use of `StrEnum`.
rel 451
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Sep 2024 15:56:04 +0200 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4313:530f86f078cc | 4314:6a70fcd93a7a |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin Stanza Headers and Internet Metadata (XEP-0131) | |
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from email.utils import quote, unquote | |
20 from enum import StrEnum, auto | |
21 import re | |
22 from typing import Iterator, List, Literal, Optional, Self | |
23 | |
24 from pydantic import BaseModel, ConfigDict, Field, RootModel | |
25 from twisted.internet import defer | |
26 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
27 from twisted.words.xish import domish | |
28 from wokkel import disco | |
29 from zope.interface import implementer | |
30 | |
31 from libervia.backend.core import exceptions | |
32 from libervia.backend.core.constants import Const as C | |
33 from libervia.backend.core.core_types import SatXMPPEntity | |
34 from libervia.backend.core.i18n import _ | |
35 from libervia.backend.core.log import getLogger | |
36 from libervia.backend.models.core import MessageData | |
37 | |
# Module-level logger, named after this module.
log = getLogger(__name__)

# Plugin metadata (name, import name, type, modes, protocols, dependencies, main class,
# handler flag and description).
# NOTE(review): presumably consumed by the plugin loader — confirm against the host.
PLUGIN_INFO = {
    C.PI_NAME: "Stanza Headers and Internet Metadata Plugin",
    C.PI_IMPORT_NAME: "XEP-0131",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0131"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0131",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Enables the inclusion of non-addressing header information in XMPP stanzas."
    ),
}

# XML namespace used for the <headers> and <header> elements.
NS_SHIM = "http://jabber.org/protocol/shim"
55 | |
# Regex to match quoted and non-quoted values in a comma-separated header value.
# Each match is one raw value; callers are expected to strip surrounding whitespace
# and to unquote values which start and end with a double quote.
RE_QUOTED_VALUES = re.compile(
    r"""
    # Optional whitespace before a quoted phrase: separators are commonly written
    # as a comma followed by a space, and without this the quoted alternative can
    # not match there, so the phrase would be split on its inner commas.
    \s*
    # Match quoted phrases
    "
    (?:
        # Match any escaped character
        \\.
        # Match any character that is not a double quote or a backslash
        |[^"\\]
    )*
    "
    |
    # Match non-quoted phrases
    (?:[^,]+)
    """,
    re.VERBOSE,
)
74 | |
75 | |
76 class Urgency(StrEnum): | |
77 low = auto() | |
78 medium = auto() | |
79 high = auto() | |
80 | |
81 | |
82 class Priority(StrEnum): | |
83 non_urgent = "non-urgent" | |
84 normal = auto() | |
85 urgent = auto() | |
86 emergency = auto() | |
87 | |
88 | |
class HeadersData(BaseModel):
    """Headers carried by a SHIM ``<headers>`` element.

    ``keywords``, ``urgency`` and ``priority`` are declared fields. Any other header is
    accepted thanks to ``extra="allow"`` and kept in ``__pydantic_extra__``.
    """

    model_config = ConfigDict(extra="allow")

    __pydantic_extra__: dict[str, str] = Field(init=False)  # type: ignore

    keywords: str | None = None
    urgency: Urgency | None = None
    priority: Priority | None = None

    def items(self):
        """Return the ``(name, value)`` pairs of the extra (non declared) headers."""
        return self.__pydantic_extra__.items()

    def to_element(self) -> domish.Element:
        """Build the <headers> element from this instance's data.

        Declared fields come first, followed by extra headers. Headers whose value is
        ``None`` are skipped.

        @return: The ``<headers>`` element, with one ``<header>`` child per header.
        """
        headers_elt = domish.Element((NS_SHIM, "headers"))
        # ``model_fields`` is read from the class: accessing it on the instance is
        # deprecated in recent Pydantic versions.
        headers = [(name, getattr(self, name)) for name in type(self).model_fields]
        # Extra headers are read from the dict itself: ``getattr`` would return the
        # model's own attribute (e.g. the ``items`` method) for a header with the same
        # name.
        headers.extend(self.__pydantic_extra__.items())
        for name, value in headers:
            if value is None:
                continue
            header_elt = headers_elt.addElement("header")
            header_elt["name"] = name
            header_elt.addContent(value)
        return headers_elt

    @classmethod
    def from_element(cls, headers_elt: domish.Element) -> Self:
        """Create a HeadersData instance from a <headers> element.

        @param headers_elt: Either the ``<headers>`` element itself, or an element
            with a ``<headers>`` child.
        @return: Instance built from the ``<header>`` children.
        @raise exceptions.NotFound: No ``<headers>`` element has been found.
        """
        if headers_elt.uri != NS_SHIM or headers_elt.name != "headers":
            child_headers_elt = next(headers_elt.elements(NS_SHIM, "headers"), None)
            if child_headers_elt is None:
                raise exceptions.NotFound("<headers> element not found")
            headers_elt = child_headers_elt

        headers = {}
        for header_elt in headers_elt.elements(NS_SHIM, "header"):
            name = header_elt.getAttribute("name")
            if not name:
                # A missing name would end up as a ``None`` keyword below, and raise a
                # TypeError for the whole message.
                log.warning(f"Ignoring SHIM header without name: {header_elt.toXml()}")
                continue
            headers[name] = str(header_elt)
        return cls(**headers)
132 | |
133 | |
class Keywords(RootModel):
    """List of keywords, validated as a list of strings.

    The sequence methods below delegate to the wrapped ``root`` list.
    """

    root: list[str]

    def __len__(self) -> int:
        """Return the number of keywords."""
        return len(self.root)

    def __iter__(self) -> Iterator[str]:  # type: ignore
        """Iterate over the keywords."""
        return iter(self.root)

    def __getitem__(self, item) -> str:
        """Return the keyword(s) at ``item`` in the wrapped list."""
        return self.root[item]
145 | |
146 | |
class XEP_0131:
    """Implementation for XEP-0131"""

    def __init__(self, host):
        """Register the SHIM namespace and the message triggers.

        @param host: Host instance, used to register the namespace and the triggers.
        """
        log.info(_("Stanza Headers and Internet Metadata plugin initialization"))
        self.host = host
        host.register_namespace("shim", NS_SHIM)
        host.trigger.add("sendMessage", self.send_message_trigger)
        host.trigger.add("sendMessageComponent", self.send_message_trigger)
        host.trigger.add("message_received", self.message_received_trigger)

    def quote_value(self, value: str) -> str:
        """Quote a value if it contains special characters.

        Special characters are space, comma, backslash and double quote.

        @param value: Value to quote if necessary.
        @return: Quoted value, or the value unchanged if no quoting is needed.
        """
        if any(char in value for char in ' ,\\"'):
            value = f'"{quote(value)}"'
        return value

    def unquote_values(self, raw_header: str) -> list[str]:
        """Unquote raw list of values header.

        This is raw header for potentially quoted values separated by commas, like in the
        "keywords" header.

        @param raw_header: Raw header value.
        @return: A list of unquoted strings. Empty values are discarded.
        """
        unquoted_values = []

        for match in RE_QUOTED_VALUES.finditer(raw_header):
            value = match.group(0).strip()

            # Unquote the value if needed.
            if value.startswith('"') and value.endswith('"'):
                value = unquote(value)

            value = value.strip()
            if value:
                unquoted_values.append(value)

        return unquoted_values

    def move_keywords_to_headers(self, extra: dict) -> None:
        """Check if keywords are present in extra, and set them in headers.

        The list of keywords will be converted to a header value and set in the right
        location. If the headers already have a ``keywords`` value, the new keywords
        are appended to it. The ``keywords`` field of ``extra`` itself is left untouched.

        @param extra: MessageData's ``extra`` field. Will be modified in place by
            creating/updating the ``headers`` field.
        """
        # Keywords can be in a list of strings in extra's "keywords" field.
        if "keywords" not in extra:
            return
        keywords = Keywords(extra["keywords"])
        if not keywords:
            return
        headers = extra.setdefault("headers", {})
        quoted_kw = ",".join(self.quote_value(kw) for kw in keywords)
        existing_kw = headers.get("keywords")
        if existing_kw:
            # We have also keywords in headers, we merge both.
            quoted_kw = f"{existing_kw},{quoted_kw}"
        headers["keywords"] = quoted_kw

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Process the XEP-0131 related data to be sent.

        A callback adding the ``<headers>`` element to the message is added to
        ``post_xml_treatments``.

        @return: Always True.
        """

        def add_headers(mess_data: MessageData) -> MessageData:
            extra = mess_data["extra"]
            self.move_keywords_to_headers(extra)
            # Now we parse headers, if any.
            if "headers" in extra:
                headers_data = HeadersData(**extra["headers"])
                message_elt = mess_data["xml"]
                message_elt.addChild(headers_data.to_element())
            return mess_data

        post_xml_treatments.addCallback(add_headers)
        return True

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Parse headers information and add them to message data.

        Messages without ``<headers>`` element are left untouched. Headers which can't
        be parsed are logged and ignored, so the message itself is still handled.

        @param message_elt: Received ``<message>`` element.
        @param post_treat: Deferred to which the callback filling message data is added.
        @return: Always True.
        """
        try:
            headers = HeadersData.from_element(message_elt)
        except exceptions.NotFound:
            # No SHIM headers in this message.
            return True
        except (TypeError, ValueError) as e:
            # Headers come from a remote entity and may be invalid (e.g. unknown
            # "urgency" value: Pydantic's ValidationError is a ValueError, or header
            # without name). This must not prevent the message from being received.
            log.warning(f"Ignoring invalid SHIM headers: {e}")
            return True

        def add_headers_to_data(mess_data: MessageData):
            """Add the headers metadata to the message data"""
            if headers.keywords:
                # We move keywords to a list of string in extra's "keywords" field.
                mess_data["extra"]["keywords"] = self.unquote_values(headers.keywords)
                headers.keywords = None
            mess_data["extra"]["headers"] = headers.model_dump(
                mode="json", exclude_none=True
            )
            return mess_data

        post_treat.addCallback(add_headers_to_data)
        return True

    def get_handler(self, client):
        """Return a new disco handler for this plugin."""
        return XEP_0131_handler()
260 | |
261 | |
@implementer(disco.IDisco)
class XEP_0131_handler(XMPPHandler):
    """Disco handler advertising the SHIM namespace."""

    def getDiscoInfo(
        self, requestor, target, nodeIdentifier: Optional[str] = ""
    ) -> List[disco.DiscoFeature]:
        """Return the SHIM namespace as the only disco feature."""
        shim_feature = disco.DiscoFeature(NS_SHIM)
        return [shim_feature]

    def getDiscoItems(
        self, requestor, target, nodeIdentifier: Optional[str] = ""
    ) -> List[disco.DiscoItem]:
        """Return no disco item."""
        return []