comparison libervia/backend/plugins/plugin_sec_pubsub_signing.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 4b842c1fb686
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
113 # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is 113 # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is
114 # complete. For now serialisation/deserialisation is more secure. 114 # complete. For now serialisation/deserialisation is more secure.
115 # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True) 115 # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True)
116 et_sign_data_elt = etree.fromstring(sign_data_elt.toXml()) 116 et_sign_data_elt = etree.fromstring(sign_data_elt.toXml())
117 to_sign = etree.tostring( 117 to_sign = etree.tostring(
118 et_sign_data_elt, 118 et_sign_data_elt, method="c14n2", with_comments=False, strip_text=True
119 method="c14n2",
120 with_comments=False,
121 strip_text=True
122 ) 119 )
123 # the data to sign is serialised, we cna restore original values 120 # the data to sign is serialised, we cna restore original values
124 item_elt["id"] = item_id 121 item_elt["id"] = item_id
125 if item_publisher is not None: 122 if item_publisher is not None:
126 item_elt["publisher"] = item_publisher 123 item_elt["publisher"] = item_publisher
139 self.check( 136 self.check(
140 self.host.get_client(profile_key), 137 self.host.get_client(profile_key),
141 jid.JID(service), 138 jid.JID(service),
142 node, 139 node,
143 item_id, 140 item_id,
144 data_format.deserialise(signature_data_s) 141 data_format.deserialise(signature_data_s),
145 ) 142 )
146 ) 143 )
147 d.addCallback(data_format.serialise) 144 d.addCallback(data_format.serialise)
148 return d 145 return d
149 146
153 service: jid.JID, 150 service: jid.JID,
154 node: str, 151 node: str,
155 item_id: str, 152 item_id: str,
156 signature_data: Dict[str, Any], 153 signature_data: Dict[str, Any],
157 ) -> Dict[str, Any]: 154 ) -> Dict[str, Any]:
158 items, __ = await self._p.get_items( 155 items, __ = await self._p.get_items(client, service, node, item_ids=[item_id])
159 client, service, node, item_ids=[item_id]
160 )
161 if not items != 1: 156 if not items != 1:
162 raise exceptions.NotFound( 157 raise exceptions.NotFound(
163 f"target item not found for {item_id!r} at {node!r} for {service}" 158 f"target item not found for {item_id!r} at {node!r} for {service}"
164 ) 159 )
165 item_elt = items[0] 160 item_elt = items[0]
170 if len(signers) > 1: 165 if len(signers) > 1:
171 raise NotImplemented("multiple signers are not supported yet") 166 raise NotImplemented("multiple signers are not supported yet")
172 signer = jid.JID(signers[0]) 167 signer = jid.JID(signers[0])
173 signature = base64.b64decode(signature_data["signature"]) 168 signature = base64.b64decode(signature_data["signature"])
174 verification_keys = { 169 verification_keys = {
175 k for k in await self._ox.import_all_public_keys(client, signer) 170 k
171 for k in await self._ox.import_all_public_keys(client, signer)
176 if client.gpg_provider.can_sign(k) 172 if client.gpg_provider.can_sign(k)
177 } 173 }
178 signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full()) 174 signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full())
179 try: 175 try:
180 client.gpg_provider.verify_detached(signed_data, signature, verification_keys) 176 client.gpg_provider.verify_detached(signed_data, signature, verification_keys)
198 client: SatXMPPEntity, 194 client: SatXMPPEntity,
199 attachments_elt: domish.Element, 195 attachments_elt: domish.Element,
200 data: Dict[str, Any], 196 data: Dict[str, Any],
201 ) -> None: 197 ) -> None:
202 try: 198 try:
203 signature_elt = next( 199 signature_elt = next(attachments_elt.elements(NS_PUBSUB_SIGNING, "signature"))
204 attachments_elt.elements(NS_PUBSUB_SIGNING, "signature")
205 )
206 except StopIteration: 200 except StopIteration:
207 pass 201 pass
208 else: 202 else:
209 time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time")) 203 time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time"))
210 if len(time_elts) != 1: 204 if len(time_elts) != 1:
218 212
219 signature_data: Dict[str, Any] = { 213 signature_data: Dict[str, Any] = {
220 "timestamp": timestamp, 214 "timestamp": timestamp,
221 "signers": [ 215 "signers": [
222 str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer") 216 str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer")
223 ] 217 ],
224 } 218 }
225 # FIXME: only OpenPGP signature is available for now, to be updated if and 219 # FIXME: only OpenPGP signature is available for now, to be updated if and
226 # when more algorithms are available. 220 # when more algorithms are available.
227 sign_elt = next( 221 sign_elt = next(
228 signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), 222 signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), None
229 None
230 ) 223 )
231 if sign_elt is None: 224 if sign_elt is None:
232 log.warning( 225 log.warning(
233 "no known signature profile element found, ignoring signature: " 226 "no known signature profile element found, ignoring signature: "
234 f"{signature_elt.toXml()}" 227 f"{signature_elt.toXml()}"
241 234
242 async def signature_set( 235 async def signature_set(
243 self, 236 self,
244 client: SatXMPPEntity, 237 client: SatXMPPEntity,
245 attachments_data: Dict[str, Any], 238 attachments_data: Dict[str, Any],
246 former_elt: Optional[domish.Element] 239 former_elt: Optional[domish.Element],
247 ) -> Optional[domish.Element]: 240 ) -> Optional[domish.Element]:
248 signature_data = attachments_data["extra"].get("signature") 241 signature_data = attachments_data["extra"].get("signature")
249 if signature_data is None: 242 if signature_data is None:
250 return former_elt 243 return former_elt
251 elif signature_data: 244 elif signature_data:
275 time_elt["stamp"] = timestamp_xmpp 268 time_elt["stamp"] = timestamp_xmpp
276 signature_elt.addElement("signer", content=signer) 269 signature_elt.addElement("signer", content=signer)
277 270
278 sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign")) 271 sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign"))
279 signing_keys = { 272 signing_keys = {
280 k for k in self._ox.list_secret_keys(client) 273 k
274 for k in self._ox.list_secret_keys(client)
281 if client.gpg_provider.can_sign(k.public_key) 275 if client.gpg_provider.can_sign(k.public_key)
282 } 276 }
283 # the base64 encoded signature itself 277 # the base64 encoded signature itself
284 sign_elt.addContent( 278 sign_elt.addContent(
285 base64.b64encode( 279 base64.b64encode(
296 service: jid.JID, 290 service: jid.JID,
297 node: str, 291 node: str,
298 items: Optional[List[domish.Element]], 292 items: Optional[List[domish.Element]],
299 options: Optional[dict], 293 options: Optional[dict],
300 sender: jid.JID, 294 sender: jid.JID,
301 extra: Dict[str, Any] 295 extra: Dict[str, Any],
302 ) -> bool: 296 ) -> bool:
303 if not items or not extra.get("signed"): 297 if not items or not extra.get("signed"):
304 return True 298 return True
305 299
306 for item_elt in items: 300 for item_elt in items:
316 "extra": { 310 "extra": {
317 "signature": { 311 "signature": {
318 "item_elt": item_elt, 312 "item_elt": item_elt,
319 "signer": sender.userhost(), 313 "signer": sender.userhost(),
320 } 314 }
321 } 315 },
322 } 316 },
323 ) 317 )
324 318
325 return True 319 return True
326 320
327 321