Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_sec_pubsub_signing.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 4b842c1fb686 |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
113 # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is | 113 # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is |
114 # complete. For now serialisation/deserialisation is more secure. | 114 # complete. For now serialisation/deserialisation is more secure. |
115 # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True) | 115 # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True) |
116 et_sign_data_elt = etree.fromstring(sign_data_elt.toXml()) | 116 et_sign_data_elt = etree.fromstring(sign_data_elt.toXml()) |
117 to_sign = etree.tostring( | 117 to_sign = etree.tostring( |
118 et_sign_data_elt, | 118 et_sign_data_elt, method="c14n2", with_comments=False, strip_text=True |
119 method="c14n2", | |
120 with_comments=False, | |
121 strip_text=True | |
122 ) | 119 ) |
123 # the data to sign is serialised, we can restore original values | 120 # the data to sign is serialised, we can restore original values |
124 item_elt["id"] = item_id | 121 item_elt["id"] = item_id |
125 if item_publisher is not None: | 122 if item_publisher is not None: |
126 item_elt["publisher"] = item_publisher | 123 item_elt["publisher"] = item_publisher |
139 self.check( | 136 self.check( |
140 self.host.get_client(profile_key), | 137 self.host.get_client(profile_key), |
141 jid.JID(service), | 138 jid.JID(service), |
142 node, | 139 node, |
143 item_id, | 140 item_id, |
144 data_format.deserialise(signature_data_s) | 141 data_format.deserialise(signature_data_s), |
145 ) | 142 ) |
146 ) | 143 ) |
147 d.addCallback(data_format.serialise) | 144 d.addCallback(data_format.serialise) |
148 return d | 145 return d |
149 | 146 |
153 service: jid.JID, | 150 service: jid.JID, |
154 node: str, | 151 node: str, |
155 item_id: str, | 152 item_id: str, |
156 signature_data: Dict[str, Any], | 153 signature_data: Dict[str, Any], |
157 ) -> Dict[str, Any]: | 154 ) -> Dict[str, Any]: |
158 items, __ = await self._p.get_items( | 155 items, __ = await self._p.get_items(client, service, node, item_ids=[item_id]) |
159 client, service, node, item_ids=[item_id] | |
160 ) | |
161 if not items != 1: | 156 if not items != 1: |
162 raise exceptions.NotFound( | 157 raise exceptions.NotFound( |
163 f"target item not found for {item_id!r} at {node!r} for {service}" | 158 f"target item not found for {item_id!r} at {node!r} for {service}" |
164 ) | 159 ) |
165 item_elt = items[0] | 160 item_elt = items[0] |
170 if len(signers) > 1: | 165 if len(signers) > 1: |
171 raise NotImplemented("multiple signers are not supported yet") | 166 raise NotImplemented("multiple signers are not supported yet") |
172 signer = jid.JID(signers[0]) | 167 signer = jid.JID(signers[0]) |
173 signature = base64.b64decode(signature_data["signature"]) | 168 signature = base64.b64decode(signature_data["signature"]) |
174 verification_keys = { | 169 verification_keys = { |
175 k for k in await self._ox.import_all_public_keys(client, signer) | 170 k |
171 for k in await self._ox.import_all_public_keys(client, signer) | |
176 if client.gpg_provider.can_sign(k) | 172 if client.gpg_provider.can_sign(k) |
177 } | 173 } |
178 signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full()) | 174 signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full()) |
179 try: | 175 try: |
180 client.gpg_provider.verify_detached(signed_data, signature, verification_keys) | 176 client.gpg_provider.verify_detached(signed_data, signature, verification_keys) |
198 client: SatXMPPEntity, | 194 client: SatXMPPEntity, |
199 attachments_elt: domish.Element, | 195 attachments_elt: domish.Element, |
200 data: Dict[str, Any], | 196 data: Dict[str, Any], |
201 ) -> None: | 197 ) -> None: |
202 try: | 198 try: |
203 signature_elt = next( | 199 signature_elt = next(attachments_elt.elements(NS_PUBSUB_SIGNING, "signature")) |
204 attachments_elt.elements(NS_PUBSUB_SIGNING, "signature") | |
205 ) | |
206 except StopIteration: | 200 except StopIteration: |
207 pass | 201 pass |
208 else: | 202 else: |
209 time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time")) | 203 time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time")) |
210 if len(time_elts) != 1: | 204 if len(time_elts) != 1: |
218 | 212 |
219 signature_data: Dict[str, Any] = { | 213 signature_data: Dict[str, Any] = { |
220 "timestamp": timestamp, | 214 "timestamp": timestamp, |
221 "signers": [ | 215 "signers": [ |
222 str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer") | 216 str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer") |
223 ] | 217 ], |
224 } | 218 } |
225 # FIXME: only OpenPGP signature is available for now, to be updated if and | 219 # FIXME: only OpenPGP signature is available for now, to be updated if and |
226 # when more algorithms are available. | 220 # when more algorithms are available. |
227 sign_elt = next( | 221 sign_elt = next( |
228 signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), | 222 signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), None |
229 None | |
230 ) | 223 ) |
231 if sign_elt is None: | 224 if sign_elt is None: |
232 log.warning( | 225 log.warning( |
233 "no known signature profile element found, ignoring signature: " | 226 "no known signature profile element found, ignoring signature: " |
234 f"{signature_elt.toXml()}" | 227 f"{signature_elt.toXml()}" |
241 | 234 |
242 async def signature_set( | 235 async def signature_set( |
243 self, | 236 self, |
244 client: SatXMPPEntity, | 237 client: SatXMPPEntity, |
245 attachments_data: Dict[str, Any], | 238 attachments_data: Dict[str, Any], |
246 former_elt: Optional[domish.Element] | 239 former_elt: Optional[domish.Element], |
247 ) -> Optional[domish.Element]: | 240 ) -> Optional[domish.Element]: |
248 signature_data = attachments_data["extra"].get("signature") | 241 signature_data = attachments_data["extra"].get("signature") |
249 if signature_data is None: | 242 if signature_data is None: |
250 return former_elt | 243 return former_elt |
251 elif signature_data: | 244 elif signature_data: |
275 time_elt["stamp"] = timestamp_xmpp | 268 time_elt["stamp"] = timestamp_xmpp |
276 signature_elt.addElement("signer", content=signer) | 269 signature_elt.addElement("signer", content=signer) |
277 | 270 |
278 sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign")) | 271 sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign")) |
279 signing_keys = { | 272 signing_keys = { |
280 k for k in self._ox.list_secret_keys(client) | 273 k |
274 for k in self._ox.list_secret_keys(client) | |
281 if client.gpg_provider.can_sign(k.public_key) | 275 if client.gpg_provider.can_sign(k.public_key) |
282 } | 276 } |
283 # the base64 encoded signature itself | 277 # the base64 encoded signature itself |
284 sign_elt.addContent( | 278 sign_elt.addContent( |
285 base64.b64encode( | 279 base64.b64encode( |
296 service: jid.JID, | 290 service: jid.JID, |
297 node: str, | 291 node: str, |
298 items: Optional[List[domish.Element]], | 292 items: Optional[List[domish.Element]], |
299 options: Optional[dict], | 293 options: Optional[dict], |
300 sender: jid.JID, | 294 sender: jid.JID, |
301 extra: Dict[str, Any] | 295 extra: Dict[str, Any], |
302 ) -> bool: | 296 ) -> bool: |
303 if not items or not extra.get("signed"): | 297 if not items or not extra.get("signed"): |
304 return True | 298 return True |
305 | 299 |
306 for item_elt in items: | 300 for item_elt in items: |
316 "extra": { | 310 "extra": { |
317 "signature": { | 311 "signature": { |
318 "item_elt": item_elt, | 312 "item_elt": item_elt, |
319 "signer": sender.userhost(), | 313 "signer": sender.userhost(), |
320 } | 314 } |
321 } | 315 }, |
322 } | 316 }, |
323 ) | 317 ) |
324 | 318 |
325 return True | 319 return True |
326 | 320 |
327 | 321 |