Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_sec_pte.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 4b842c1fb686 |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
70 service: jid.JID, | 70 service: jid.JID, |
71 node: str, | 71 node: str, |
72 items: Optional[List[domish.Element]], | 72 items: Optional[List[domish.Element]], |
73 options: Optional[dict], | 73 options: Optional[dict], |
74 sender: jid.JID, | 74 sender: jid.JID, |
75 extra: Dict[str, Any] | 75 extra: Dict[str, Any], |
76 ) -> bool: | 76 ) -> bool: |
77 if not items or extra.get("encrypted_for") is None: | 77 if not items or extra.get("encrypted_for") is None: |
78 return True | 78 return True |
79 encrypt_data = extra["encrypted_for"] | 79 encrypt_data = extra["encrypted_for"] |
80 try: | 80 try: |
93 client, | 93 client, |
94 self._o.NS_TWOMEMO, | 94 self._o.NS_TWOMEMO, |
95 item, | 95 item, |
96 targets, | 96 targets, |
97 is_muc_message=False, | 97 is_muc_message=False, |
98 stanza_id=None | 98 stanza_id=None, |
99 ) | 99 ) |
100 item_elts = list(item.elements()) | 100 item_elts = list(item.elements()) |
101 if len(item_elts) != 1: | 101 if len(item_elts) != 1: |
102 raise ValueError( | 102 raise ValueError( |
103 f"there should be exactly one item payload: {item.toXml()}" | 103 f"there should be exactly one item payload: {item.toXml()}" |
124 return True | 124 return True |
125 if service is None: | 125 if service is None: |
126 service = client.jid.userhostJID() | 126 service = client.jid.userhostJID() |
127 for item in items: | 127 for item in items: |
128 payload = item.firstChildElement() | 128 payload = item.firstChildElement() |
129 if (payload is not None | 129 if ( |
130 payload is not None | |
130 and payload.name == "encrypted" | 131 and payload.name == "encrypted" |
131 and payload.uri == NS_PTE): | 132 and payload.uri == NS_PTE |
133 ): | |
132 encrypted_elt = payload | 134 encrypted_elt = payload |
133 item.children.clear() | 135 item.children.clear() |
134 try: | 136 try: |
135 encryption_type = encrypted_elt.getAttribute("type") | 137 encryption_type = encrypted_elt.getAttribute("type") |
136 encrypted_by = jid.JID(encrypted_elt["by"]) | 138 encrypted_by = jid.JID(encrypted_elt["by"]) |
137 except (KeyError, RuntimeError): | 139 except (KeyError, RuntimeError): |
138 raise exceptions.DataError( | 140 raise exceptions.DataError( |
139 f"invalid <encrypted> element: {encrypted_elt.toXml()}" | 141 f"invalid <encrypted> element: {encrypted_elt.toXml()}" |
140 ) | 142 ) |
141 if encryption_type!= self._o.NS_TWOMEMO: | 143 if encryption_type != self._o.NS_TWOMEMO: |
142 raise NotImplementedError("only TWOMEMO is supported for now") | 144 raise NotImplementedError("only TWOMEMO is supported for now") |
143 log.debug(f"decrypting item {item.getAttribute('id', '')}") | 145 log.debug(f"decrypting item {item.getAttribute('id', '')}") |
144 | 146 |
145 # FIXME: we do use _message_received_trigger now to decrypt the stanza, a | 147 # FIXME: we do use _message_received_trigger now to decrypt the stanza, a |
146 # cleaner separated decrypt method should be used | 148 # cleaner separated decrypt method should be used |
147 encrypted_elt["from"] = encrypted_by.full() | 149 encrypted_elt["from"] = encrypted_by.full() |
148 if not await self._o._message_received_trigger( | 150 if ( |
149 client, | 151 not await self._o._message_received_trigger( |
150 encrypted_elt, | 152 client, encrypted_elt, defer.Deferred() |
151 defer.Deferred() | 153 ) |
152 ) or not encrypted_elt.children: | 154 or not encrypted_elt.children |
155 ): | |
153 raise exceptions.EncryptionError("can't decrypt the message") | 156 raise exceptions.EncryptionError("can't decrypt the message") |
154 | 157 |
155 item.addChild(encrypted_elt.firstChildElement()) | 158 item.addChild(encrypted_elt.firstChildElement()) |
156 | 159 |
157 extra.setdefault("encrypted", {})[item["id"]] = { | 160 extra.setdefault("encrypted", {})[item["id"]] = { |
158 "type": NS_PTE, | 161 "type": NS_PTE, |
159 "algorithm": encryption_type | 162 "algorithm": encryption_type, |
160 } | 163 } |
161 return True | 164 return True |
162 | 165 |
163 | 166 |
164 @implementer(iwokkel.IDisco) | 167 @implementer(iwokkel.IDisco) |