comparison libervia/backend/plugins/plugin_xep_0131.py @ 4314:6a70fcd93a7a

plugin XEP-0131: Stanza Headers and Internet Metadata implementation: - SHIM is now supported and put in `msg_data["extra"]["headers"]`. - `Keywords` are converted from and to list of string in `msg_data["extra"]["keywords"]` field (if present in headers on message sending, values are merged). - Python minimal version upgraded to 3.11 due to use of `StrEnum`. rel 451
author Goffi <goffi@goffi.org>
date Sat, 28 Sep 2024 15:56:04 +0200
parents
children
comparison
equal deleted inserted replaced
4313:530f86f078cc 4314:6a70fcd93a7a
1 #!/usr/bin/env python3
2
3 # Libervia plugin Stanza Headers and Internet Metadata (XEP-0131)
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from email.utils import quote, unquote
20 from enum import StrEnum, auto
21 import re
22 from typing import Iterator, List, Literal, Optional, Self
23
24 from pydantic import BaseModel, ConfigDict, Field, RootModel
25 from twisted.internet import defer
26 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
27 from twisted.words.xish import domish
28 from wokkel import disco
29 from zope.interface import implementer
30
31 from libervia.backend.core import exceptions
32 from libervia.backend.core.constants import Const as C
33 from libervia.backend.core.core_types import SatXMPPEntity
34 from libervia.backend.core.i18n import _
35 from libervia.backend.core.log import getLogger
36 from libervia.backend.models.core import MessageData
37
38 log = getLogger(__name__)
39
40 PLUGIN_INFO = {
41 C.PI_NAME: "Stanza Headers and Internet Metadata Plugin",
42 C.PI_IMPORT_NAME: "XEP-0131",
43 C.PI_TYPE: "XEP",
44 C.PI_MODES: C.PLUG_MODE_BOTH,
45 C.PI_PROTOCOLS: ["XEP-0131"],
46 C.PI_DEPENDENCIES: [],
47 C.PI_MAIN: "XEP_0131",
48 C.PI_HANDLER: "yes",
49 C.PI_DESCRIPTION: _(
50 "Enables the inclusion of non-addressing header information in XMPP stanzas."
51 ),
52 }
53
54 NS_SHIM = "http://jabber.org/protocol/shim"
55
56 # Regex to match quoted and non-quoted values.
57 RE_QUOTED_VALUES = re.compile(
58 r"""
59 # Match quoted phrases
60 "
61 (?:
62 # Match any escaped character
63 \\.
64 # Match any character that is not a double quote or a backslash
65 |[^"\\]
66 )*
67 "
68 |
69 # Match non-quoted phrases
70 (?:[^,]+)
71 """,
72 re.VERBOSE,
73 )
74
75
76 class Urgency(StrEnum):
77 low = auto()
78 medium = auto()
79 high = auto()
80
81
82 class Priority(StrEnum):
83 non_urgent = "non-urgent"
84 normal = auto()
85 urgent = auto()
86 emergency = auto()
87
88
89 class HeadersData(BaseModel):
90 model_config = ConfigDict(extra="allow")
91
92 __pydantic_extra__: dict[str, str] = Field(init=False) # type: ignore
93
94 keywords: str | None = None
95 urgency: Urgency | None = None
96 priority: Priority | None = None
97
98 def items(self):
99 return self.__pydantic_extra__.items()
100
101 def to_element(self) -> domish.Element:
102 """Build the <headers> element from this instance's data."""
103 headers_elt = domish.Element((NS_SHIM, "headers"))
104 header_names = list(self.model_fields.keys()) + list(
105 self.__pydantic_extra__.keys()
106 )
107 for name in header_names:
108 value = getattr(self, name)
109 if value is None:
110 continue
111 header_elt = headers_elt.addElement("header")
112 header_elt["name"] = name
113 header_elt.addContent(value)
114 return headers_elt
115
116 @classmethod
117 def from_element(cls, headers_elt: domish.Element) -> Self:
118 """Create a HeadersData instance from a <headers> element."""
119 if headers_elt.uri != NS_SHIM or headers_elt.name != "headers":
120 child_headers_elt = next(headers_elt.elements(NS_SHIM, "headers"), None)
121 if child_headers_elt is None:
122 raise exceptions.NotFound("<headers> element not found")
123 else:
124 headers_elt = child_headers_elt
125
126 headers = {}
127 for header_elt in headers_elt.elements(NS_SHIM, "header"):
128 name = header_elt.getAttribute("name")
129 value = str(header_elt)
130 headers[name] = value
131 return cls(**headers)
132
133
134 class Keywords(RootModel):
135 root: list[str]
136
137 def __iter__(self) -> Iterator[str]: # type: ignore
138 return iter(self.root)
139
140 def __getitem__(self, item) -> str:
141 return self.root[item]
142
143 def __len__(self) -> int:
144 return len(self.root)
145
146
147 class XEP_0131:
148 """Implementation for XEP-0131"""
149
150 def __init__(self, host):
151 log.info(_("Stanza Headers and Internet Metadata plugin initialization"))
152 self.host = host
153 host.register_namespace("shim", NS_SHIM)
154 host.trigger.add("sendMessage", self.send_message_trigger)
155 host.trigger.add("sendMessageComponent", self.send_message_trigger)
156 host.trigger.add("message_received", self.message_received_trigger)
157
158 def quote_value(self, value: str) -> str:
159 """Quote a value if it contain special characters
160
161 @param value: Value to quote if necessary.
162 @return: Quoted value.
163 """
164 if any(c in value for c in r" ,\""):
165 value = f'"{quote(value)}"'
166 return value
167
168 def unquote_values(self, raw_header: str) -> list[str]:
169 """Unquote raw list of values header.
170
171 This is raw header for potentially quoted values separated by commas, like in the
172 "keywords" header.
173
174 @param raw_keywords_header: Raw Keywords header.
175 @return: A list of unquoted strings.
176 """
177 unquoted_values = []
178
179 for match in RE_QUOTED_VALUES.finditer(raw_header):
180 value = match.group(0).strip()
181
182 # Unquote the keyword if needed.
183 if value.startswith('"') and value.endswith('"'):
184 value = unquote(value)
185
186 value = value.strip()
187 if value:
188 unquoted_values.append(value)
189
190 return unquoted_values
191
192 def move_keywords_to_headers(self, extra: dict) -> None:
193 """Check if keywords are present in extra, and move them to headers.
194
195 The list of keywords will be converted to a header value and set in the right
196 location.
197
198 @param extra: MessageData's ``extra`` field. Will be modified in place by
199 creating/updating the ``headers`` field.
200 """
201 # Keywords can be in a list of strings in extra's "keywords" field.
202 if "keywords" in extra:
203 keywords = Keywords(extra["keywords"])
204 if keywords:
205 headers = extra.setdefault("headers", {})
206 quoted_kw = ",".join(self.quote_value(kw) for kw in keywords)
207 existing_kw = headers.get("keywords")
208 if existing_kw:
209 # We have also keywords in headers, we merge both.
210 quoted_kw = f"{existing_kw},{quoted_kw}"
211 headers["keywords"] = quoted_kw
212
213 def send_message_trigger(
214 self, client, mess_data, pre_xml_treatments, post_xml_treatments
215 ) -> Literal[True]:
216 """Process the XEP-0131 related data to be sent"""
217
218 def add_headers(mess_data: MessageData) -> MessageData:
219 extra = mess_data["extra"]
220 self.move_keywords_to_headers(extra)
221 # Now we parse headers, if any.
222 if "headers" in extra:
223 headers_data = HeadersData(**extra["headers"])
224 message_elt = mess_data["xml"]
225 message_elt.addChild(headers_data.to_element())
226 return mess_data
227
228 post_xml_treatments.addCallback(add_headers)
229 return True
230
231 def message_received_trigger(
232 self,
233 client: SatXMPPEntity,
234 message_elt: domish.Element,
235 post_treat: defer.Deferred,
236 ) -> Literal[True]:
237 """Parse headers information and add them to message data."""
238 try:
239 headers = HeadersData.from_element(message_elt)
240 except exceptions.NotFound:
241 pass
242 else:
243
244 def post_treat_addr(mess_data: MessageData):
245 """Add the headers metadata to the message data"""
246 if headers.keywords:
247 # We move keywords to a list of string in extra's "keywords" field.
248 mess_data["extra"]["keywords"] = self.unquote_values(headers.keywords)
249 headers.keywords = None
250 mess_data["extra"]["headers"] = headers.model_dump(
251 mode="json", exclude_none=True
252 )
253 return mess_data
254
255 post_treat.addCallback(post_treat_addr)
256 return True
257
258 def get_handler(self, client):
259 return XEP_0131_handler()
260
261
262 @implementer(disco.IDisco)
263 class XEP_0131_handler(XMPPHandler):
264
265 def getDiscoInfo(
266 self, requestor, target, nodeIdentifier: Optional[str] = ""
267 ) -> List[disco.DiscoFeature]:
268 return [disco.DiscoFeature(NS_SHIM)]
269
270 def getDiscoItems(
271 self, requestor, target, nodeIdentifier: Optional[str] = ""
272 ) -> List[disco.DiscoItem]:
273 return []