comparison libervia/backend/plugins/plugin_xep_0300.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0300.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for Hash functions (XEP-0300)
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from typing import Tuple
21 import base64
22 from collections import OrderedDict
23 import hashlib
24
25 from twisted.internet import threads
26 from twisted.internet import defer
27 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
28 from twisted.words.xish import domish
29 from wokkel import disco, iwokkel
30 from zope.interface import implementer
31
32 from libervia.backend.core import exceptions
33 from libervia.backend.core.constants import Const as C
34 from libervia.backend.core.i18n import _
35 from libervia.backend.core.log import getLogger
36
37 log = getLogger(__name__)
38
39
40 PLUGIN_INFO = {
41 C.PI_NAME: "Cryptographic Hash Functions",
42 C.PI_IMPORT_NAME: "XEP-0300",
43 C.PI_TYPE: "XEP",
44 C.PI_MODES: C.PLUG_MODE_BOTH,
45 C.PI_PROTOCOLS: ["XEP-0300"],
46 C.PI_MAIN: "XEP_0300",
47 C.PI_HANDLER: "yes",
48 C.PI_DESCRIPTION: _("""Management of cryptographic hashes"""),
49 }
50
51 NS_HASHES = "urn:xmpp:hashes:2"
52 NS_HASHES_FUNCTIONS = "urn:xmpp:hash-function-text-names:{}"
53 BUFFER_SIZE = 2 ** 12
54 ALGO_DEFAULT = "sha-256"
55
56
57 class XEP_0300(object):
58 # TODO: add blake after moving to Python 3
59 ALGOS = OrderedDict(
60 (
61 ("md5", hashlib.md5),
62 ("sha-1", hashlib.sha1),
63 ("sha-256", hashlib.sha256),
64 ("sha-512", hashlib.sha512),
65 )
66 )
67 ALGO_DEFAULT = ALGO_DEFAULT
68
69 def __init__(self, host):
70 log.info(_("plugin Hashes initialization"))
71 host.register_namespace("hashes", NS_HASHES)
72
73 def get_handler(self, client):
74 return XEP_0300_handler()
75
76 def get_hasher(self, algo=ALGO_DEFAULT):
77 """Return hasher instance
78
79 @param algo(unicode): one of the XEP_300.ALGOS keys
80 @return (hash object): same object s in hashlib.
81 update method need to be called for each chunh
82 diget or hexdigest can be used at the end
83 """
84 return self.ALGOS[algo]()
85
86 def get_default_algo(self):
87 return ALGO_DEFAULT
88
89 @defer.inlineCallbacks
90 def get_best_peer_algo(self, to_jid, profile):
91 """Return the best available hashing algorith of other peer
92
93 @param to_jid(jid.JID): peer jid
94 @parm profile: %(doc_profile)s
95 @return (D(unicode, None)): best available algorithm,
96 or None if hashing is not possible
97 """
98 client = self.host.get_client(profile)
99 for algo in reversed(XEP_0300.ALGOS):
100 has_feature = yield self.host.hasFeature(
101 client, NS_HASHES_FUNCTIONS.format(algo), to_jid
102 )
103 if has_feature:
104 log.debug(
105 "Best hashing algorithm found for {jid}: {algo}".format(
106 jid=to_jid.full(), algo=algo
107 )
108 )
109 defer.returnValue(algo)
110
111 def _calculate_hash_blocking(self, file_obj, hasher):
112 """Calculate hash in a blocking way
113
114 /!\\ blocking method, please use calculate_hash instead
115 @param file_obj(file): a file-like object
116 @param hasher(hash object): the method to call to initialise hash object
117 @return (str): the hex digest of the hash
118 """
119 while True:
120 buf = file_obj.read(BUFFER_SIZE)
121 if not buf:
122 break
123 hasher.update(buf)
124 return hasher.hexdigest()
125
126 def calculate_hash(self, file_obj, hasher):
127 return threads.deferToThread(self._calculate_hash_blocking, file_obj, hasher)
128
129 def calculate_hash_elt(self, file_obj=None, algo=ALGO_DEFAULT):
130 """Compute hash and build hash element
131
132 @param file_obj(file, None): file-like object to use to calculate the hash
133 @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS
134 @return (D(domish.Element)): hash element
135 """
136
137 def hash_calculated(hash_):
138 return self.build_hash_elt(hash_, algo)
139
140 hasher = self.get_hasher(algo)
141 hash_d = self.calculate_hash(file_obj, hasher)
142 hash_d.addCallback(hash_calculated)
143 return hash_d
144
145 def build_hash_used_elt(self, algo=ALGO_DEFAULT):
146 hash_used_elt = domish.Element((NS_HASHES, "hash-used"))
147 hash_used_elt["algo"] = algo
148 return hash_used_elt
149
150 def parse_hash_used_elt(self, parent):
151 """Find and parse a hash-used element
152
153 @param (domish.Element): parent of <hash/> element
154 @return (unicode): hash algorithm used
155 @raise exceptions.NotFound: the element is not present
156 @raise exceptions.DataError: the element is invalid
157 """
158 try:
159 hash_used_elt = next(parent.elements(NS_HASHES, "hash-used"))
160 except StopIteration:
161 raise exceptions.NotFound
162 algo = hash_used_elt["algo"]
163 if not algo:
164 raise exceptions.DataError
165 return algo
166
167 def build_hash_elt(self, hash_, algo=ALGO_DEFAULT):
168 """Compute hash and build hash element
169
170 @param hash_(str): hash to use
171 @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS
172 @return (domish.Element): computed hash
173 """
174 assert hash_
175 assert algo
176 hash_elt = domish.Element((NS_HASHES, "hash"))
177 if hash_ is not None:
178 b64_hash = base64.b64encode(hash_.encode('utf-8')).decode('utf-8')
179 hash_elt.addContent(b64_hash)
180 hash_elt["algo"] = algo
181 return hash_elt
182
183 def parse_hash_elt(self, parent: domish.Element) -> Tuple[str, bytes]:
184 """Find and parse a hash element
185
186 if multiple elements are found, the strongest managed one is returned
187 @param parent: parent of <hash/> element
188 @return: (algo, hash) tuple
189 both values can be None if <hash/> is empty
190 @raise exceptions.NotFound: the element is not present
191 @raise exceptions.DataError: the element is invalid
192 """
193 algos = list(XEP_0300.ALGOS.keys())
194 hash_elt = None
195 best_algo = None
196 best_value = None
197 for hash_elt in parent.elements(NS_HASHES, "hash"):
198 algo = hash_elt.getAttribute("algo")
199 try:
200 idx = algos.index(algo)
201 except ValueError:
202 log.warning(f"Proposed {algo} algorithm is not managed")
203 algo = None
204 continue
205
206 if best_algo is None or algos.index(best_algo) < idx:
207 best_algo = algo
208 best_value = base64.b64decode(str(hash_elt)).decode('utf-8')
209
210 if not hash_elt:
211 raise exceptions.NotFound
212 if not best_algo or not best_value:
213 raise exceptions.DataError
214 return best_algo, best_value
215
216
217 @implementer(iwokkel.IDisco)
218 class XEP_0300_handler(XMPPHandler):
219
220 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
221 hash_functions_names = [
222 disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo))
223 for algo in XEP_0300.ALGOS
224 ]
225 return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names
226
227 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
228 return []