comparison libervia/backend/plugins/plugin_xep_0422.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0422.py@524856bd7b19
children
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
4
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 from typing import Optional, List, Tuple, Union, NamedTuple
19 from collections import namedtuple
20
21 from twisted.words.protocols.jabber import xmlstream
22 from twisted.words.xish import domish
23 from wokkel import disco
24 from zope.interface import implementer
25
26 from libervia.backend.core.constants import Const as C
27 from libervia.backend.core.i18n import _
28 from libervia.backend.core.log import getLogger
29 from libervia.backend.core.core_types import SatXMPPEntity
30 from libervia.backend.memory.sqla_mapping import History
31 from libervia.backend.tools.common.async_utils import async_lru
32
33 log = getLogger(__name__)
34
35
36 PLUGIN_INFO = {
37 C.PI_NAME: "Message Fastening",
38 C.PI_IMPORT_NAME: "XEP-0422",
39 C.PI_TYPE: "XEP",
40 C.PI_MODES: C.PLUG_MODE_BOTH,
41 C.PI_PROTOCOLS: ["XEP-0359", "XEP-0422"],
42 C.PI_MAIN: "XEP_0422",
43 C.PI_HANDLER: "yes",
44 C.PI_DESCRIPTION: _("""Implementation Message Fastening"""),
45 }
46
47 NS_FASTEN = "urn:xmpp:fasten:0"
48
49
50 class FastenMetadata(NamedTuple):
51 elements: List[domish.Element]
52 id: str
53 history: Optional[History]
54 clear: bool
55 shell: bool
56
57
58 class XEP_0422(object):
59
60 def __init__(self, host):
61 log.info(_("XEP-0422 (Message Fastening) plugin initialization"))
62 self.host = host
63 host.register_namespace("fasten", NS_FASTEN)
64
65 def get_handler(self, __):
66 return XEP_0422_handler()
67
68 def apply_to_elt(
69 self,
70 message_elt: domish.Element,
71 origin_id: str,
72 clear: Optional[bool] = None,
73 shell: Optional[bool] = None,
74 children: Optional[List[domish.Element]] = None,
75 external: Optional[List[Union[str, Tuple[str, str]]]] = None
76 ) -> domish.Element:
77 """Generate, add and return <apply-to> element
78
79 @param message_elt: wrapping <message> element
80 @param origin_id: origin ID of the target message
81 @param clear: set to True to remove a fastening
82 @param shell: set to True when using e2ee shell
83 cf. https://xmpp.org/extensions/xep-0422.html#encryption
84 @param children: element to fasten to the target message
85 <apply-to> element is returned, thus children can also easily be added
86 afterwards
87 @param external: <external> element to add
88 cf. https://xmpp.org/extensions/xep-0422.html#external-payloads
89 the list items can either be a str with only the element name,
90 or a tuple which must then be (namespace, name)
91 @return: <apply-to> element, which is already added to the wrapping message_elt
92 """
93 apply_to_elt = message_elt.addElement((NS_FASTEN, "apply-to"))
94 apply_to_elt["id"] = origin_id
95 if clear is not None:
96 apply_to_elt["clear"] = C.bool_const(clear)
97 if shell is not None:
98 apply_to_elt["shell"] = C.bool_const(shell)
99 if children is not None:
100 for child in children:
101 apply_to_elt.addChild(child)
102 if external is not None:
103 for ext in external:
104 external_elt = apply_to_elt.addElement("external")
105 if isinstance(ext, str):
106 external_elt["name"] = ext
107 else:
108 ns, name = ext
109 external_elt["name"] = name
110 external_elt["element-namespace"] = ns
111 return apply_to_elt
112
113 @async_lru(maxsize=5)
114 async def get_fastened_elts(
115 self,
116 client: SatXMPPEntity,
117 message_elt: domish.Element
118 ) -> Optional[FastenMetadata]:
119 """Get fastened elements
120
121 if the message contains no <apply-to> element, None is returned
122 """
123 try:
124 apply_to_elt = next(message_elt.elements(NS_FASTEN, "apply-to"))
125 except StopIteration:
126 return None
127 else:
128 origin_id = apply_to_elt.getAttribute("id")
129 if not origin_id:
130 log.warning(
131 f"Received invalid fastening message: {message_elt.toXml()}"
132 )
133 return None
134 elements = apply_to_elt.children
135 if not elements:
136 log.warning(f"No element to fasten: {message_elt.toXml()}")
137 return None
138 history = await self.host.memory.storage.get(
139 client,
140 History,
141 History.origin_id,
142 origin_id,
143 (History.messages, History.subjects, History.thread)
144 )
145 return FastenMetadata(
146 elements,
147 origin_id,
148 history,
149 C.bool(apply_to_elt.getAttribute("clear", C.BOOL_FALSE)),
150 C.bool(apply_to_elt.getAttribute("shell", C.BOOL_FALSE)),
151 )
152
153
154 @implementer(disco.IDisco)
155 class XEP_0422_handler(xmlstream.XMPPHandler):
156
157 def getDiscoInfo(self, __, target, nodeIdentifier=""):
158 return [disco.DiscoFeature(NS_FASTEN)]
159
160 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
161 return []