Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0363.py @ 3089:e75024e41f81
plugin upload, XEP-0363: code modernisation + preparation for extension:
- use of async/await syntax
- fileUpload's options are now serialised, allowing non-string values
- (XEP-0363) Slot is now a dataclass, so it can be modified by other plugins
- (XEP-0363) Moved SSL related code to the new tools.web module
- (XEP-0363) added `XEP-0363_upload_size` and `XEP-0363_upload` trigger points
- a Deferred is no longer used for `progress_id`; the value is returned directly
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 20 Dec 2019 12:28:04 +0100 |
parents | fee60f17ebac |
children | 9d0df638c8b4 |
comparison
equal
deleted
inserted
replaced
3088:d1464548055a | 3089:e75024e41f81 |
---|---|
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _ | 20 import os.path |
21 from sat.core.constants import Const as C | 21 import mimetypes |
22 from sat.core.log import getLogger | 22 from dataclasses import dataclass |
23 | |
24 log = getLogger(__name__) | |
25 from sat.core import exceptions | |
26 from wokkel import disco, iwokkel | 23 from wokkel import disco, iwokkel |
27 from zope.interface import implementer | 24 from zope.interface import implementer |
28 from twisted.words.protocols.jabber import jid | 25 from twisted.words.protocols.jabber import jid |
29 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | 26 from twisted.words.protocols.jabber.xmlstream import XMPPHandler |
30 from twisted.internet import reactor | 27 from twisted.internet import reactor |
31 from twisted.internet import defer | 28 from twisted.internet import defer |
32 from twisted.internet import ssl | |
33 from twisted.internet.interfaces import IOpenSSLClientConnectionCreator | |
34 from twisted.web import client as http_client | 29 from twisted.web import client as http_client |
35 from twisted.web import http_headers | 30 from twisted.web import http_headers |
36 from twisted.web import iweb | 31 from sat.core.i18n import _ |
37 from twisted.python import failure | 32 from sat.core.constants import Const as C |
38 from collections import namedtuple | 33 from sat.core.log import getLogger |
39 from OpenSSL import SSL | 34 from sat.core import exceptions |
40 import os.path | 35 from sat.tools import web as sat_web |
41 import mimetypes | 36 |
42 | 37 |
38 log = getLogger(__name__) | |
43 | 39 |
44 PLUGIN_INFO = { | 40 PLUGIN_INFO = { |
45 C.PI_NAME: "HTTP File Upload", | 41 C.PI_NAME: "HTTP File Upload", |
46 C.PI_IMPORT_NAME: "XEP-0363", | 42 C.PI_IMPORT_NAME: "XEP-0363", |
47 C.PI_TYPE: "XEP", | 43 C.PI_TYPE: "XEP", |
54 | 50 |
55 NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0" | 51 NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0" |
56 ALLOWED_HEADERS = ('authorization', 'cookie', 'expires') | 52 ALLOWED_HEADERS = ('authorization', 'cookie', 'expires') |
57 | 53 |
58 | 54 |
59 Slot = namedtuple("Slot", ["put", "get", "headers"]) | 55 @dataclass |
60 | 56 class Slot: |
61 | 57 """Upload slot""" |
62 @implementer(IOpenSSLClientConnectionCreator) | 58 put: str |
63 class NoCheckConnectionCreator(object): | 59 get: str |
64 def __init__(self, hostname, ctx): | 60 headers: list |
65 self._ctx = ctx | |
66 | |
67 def clientConnectionForTLS(self, tlsProtocol): | |
68 context = self._ctx | |
69 connection = SSL.Connection(context, None) | |
70 connection.set_app_data(tlsProtocol) | |
71 return connection | |
72 | |
73 | |
74 @implementer(iweb.IPolicyForHTTPS) | |
75 class NoCheckContextFactory(ssl.ClientContextFactory): | |
76 """Context factory which doesn't do TLS certificate check | |
77 | |
78 /!\\ it's obvisously a security flaw to use this class, | |
79 and it should be used only with explicite agreement from the end used | |
80 """ | |
81 | |
82 def creatorForNetloc(self, hostname, port): | |
83 log.warning( | |
84 "TLS check disabled for {host} on port {port}".format( | |
85 host=hostname, port=port | |
86 ) | |
87 ) | |
88 certificateOptions = ssl.CertificateOptions(trustRoot=None) | |
89 return NoCheckConnectionCreator(hostname, certificateOptions.getContext()) | |
90 | 61 |
91 | 62 |
92 class XEP_0363(object): | 63 class XEP_0363(object): |
93 def __init__(self, host): | 64 def __init__(self, host): |
94 log.info(_("plugin HTTP File Upload initialization")) | 65 log.info(_("plugin HTTP File Upload initialization")) |
113 ) | 84 ) |
114 | 85 |
115 def getHandler(self, client): | 86 def getHandler(self, client): |
116 return XEP_0363_handler() | 87 return XEP_0363_handler() |
117 | 88 |
118 @defer.inlineCallbacks | 89 async def getHTTPUploadEntity(self, client, upload_jid=None): |
119 def getHTTPUploadEntity(self, upload_jid=None, profile=C.PROF_KEY_NONE): | |
120 """Get HTTP upload capable entity | 90 """Get HTTP upload capable entity |
121 | 91 |
122 upload_jid is checked, then its components | 92 upload_jid is checked, then its components |
123 @param upload_jid(None, jid.JID): entity to check | 93 @param upload_jid(None, jid.JID): entity to check |
124 @return(D(jid.JID)): first HTTP upload capable entity | 94 @return(D(jid.JID)): first HTTP upload capable entity |
125 @raise exceptions.NotFound: no entity found | 95 @raise exceptions.NotFound: no entity found |
126 """ | 96 """ |
127 client = self.host.getClient(profile) | |
128 try: | 97 try: |
129 entity = client.http_upload_service | 98 entity = client.http_upload_service |
130 except AttributeError: | 99 except AttributeError: |
131 found_entities = yield self.host.findFeaturesSet(client, (NS_HTTP_UPLOAD,)) | 100 found_entities = await self.host.findFeaturesSet(client, (NS_HTTP_UPLOAD,)) |
132 try: | 101 try: |
133 entity = client.http_upload_service = next(iter(found_entities)) | 102 entity = client.http_upload_service = next(iter(found_entities)) |
134 except StopIteration: | 103 except StopIteration: |
135 entity = client.http_upload_service = None | 104 entity = client.http_upload_service = None |
136 | 105 |
137 if entity is None: | 106 if entity is None: |
138 raise failure.Failure(exceptions.NotFound("No HTTP upload entity found")) | 107 raise exceptions.NotFound("No HTTP upload entity found") |
139 | 108 |
140 defer.returnValue(entity) | 109 return entity |
141 | 110 |
142 def _fileHTTPUpload(self, filepath, filename="", upload_jid="", | 111 def _fileHTTPUpload(self, filepath, filename="", upload_jid="", |
143 ignore_tls_errors=False, profile=C.PROF_KEY_NONE): | 112 ignore_tls_errors=False, profile=C.PROF_KEY_NONE): |
144 assert os.path.isabs(filepath) and os.path.isfile(filepath) | 113 assert os.path.isabs(filepath) and os.path.isfile(filepath) |
145 progress_id_d, __ = self.fileHTTPUpload( | 114 client = self.host.getClient(profile) |
115 progress_id_d, __ = defer.ensureDeferred(self.fileHTTPUpload( | |
116 client, | |
146 filepath, | 117 filepath, |
147 filename or None, | 118 filename or None, |
148 jid.JID(upload_jid) if upload_jid else None, | 119 jid.JID(upload_jid) if upload_jid else None, |
149 {"ignore_tls_errors": ignore_tls_errors}, | 120 {"ignore_tls_errors": ignore_tls_errors}, |
150 profile, | 121 )) |
151 ) | |
152 return progress_id_d | 122 return progress_id_d |
153 | 123 |
154 def fileHTTPUpload(self, filepath, filename=None, upload_jid=None, options=None, | 124 async def fileHTTPUpload( |
155 profile=C.PROF_KEY_NONE): | 125 self, client, filepath, filename=None, upload_jid=None, options=None): |
156 """Upload a file through HTTP | 126 """Upload a file through HTTP |
157 | 127 |
158 @param filepath(str): absolute path of the file | 128 @param filepath(str): absolute path of the file |
159 @param filename(None, unicode): name to use for the upload | 129 @param filename(None, unicode): name to use for the upload |
160 None to use basename of the path | 130 None to use basename of the path |
167 download URL | 137 download URL |
168 """ | 138 """ |
169 if options is None: | 139 if options is None: |
170 options = {} | 140 options = {} |
171 ignore_tls_errors = options.get("ignore_tls_errors", False) | 141 ignore_tls_errors = options.get("ignore_tls_errors", False) |
172 client = self.host.getClient(profile) | |
173 filename = filename or os.path.basename(filepath) | 142 filename = filename or os.path.basename(filepath) |
174 size = os.path.getsize(filepath) | 143 size = os.path.getsize(filepath) |
175 progress_id_d = defer.Deferred() | 144 |
176 download_d = defer.Deferred() | 145 |
177 d = self.getSlot(client, filename, size, upload_jid=upload_jid) | 146 size_adjust = [] |
178 d.addCallbacks( | 147 #: this trigger can be used to modify the requested size, it is notably useful |
179 self._getSlotCb, | 148 #: with encryption. The size_adjust is a list which can be filled by int to add |
180 self._getSlotEb, | 149 #: to the initial size |
181 (client, progress_id_d, download_d, filepath, size, ignore_tls_errors), | 150 self.host.trigger.point( |
182 None, | 151 "XEP-0363_upload_size", client, options, filepath, size, size_adjust, |
183 (client, progress_id_d, download_d), | 152 triggers_no_cancel=True) |
184 ) | 153 if size_adjust: |
185 return progress_id_d, download_d | 154 size = sum([size, *size_adjust]) |
186 | 155 try: |
187 def _getSlotEb(self, fail, client, progress_id_d, download_d): | 156 slot = await self.getSlot(client, filename, size, upload_jid=upload_jid) |
188 """an error happened while trying to get slot""" | 157 except Exception as e: |
189 log.warning("Can't get upload slot: {reason}".format(reason=fail.value)) | 158 log.warning(_("Can't get upload slot: {reason}").format(reason=e)) |
190 progress_id_d.errback(fail) | 159 raise e |
191 download_d.errback(fail) | |
192 | |
193 def _getSlotCb(self, slot, client, progress_id_d, download_d, path, size, | |
194 ignore_tls_errors=False): | |
195 """Called when slot is received, try to do the upload | |
196 | |
197 @param slot(Slot): slot instance with the get and put urls | |
198 @param progress_id_d(defer.Deferred): Deferred to call when progress_id is known | |
199 @param progress_id_d(defer.Deferred): Deferred to call with URL when upload is | |
200 done | |
201 @param path(str): path to the file to upload | |
202 @param size(int): size of the file to upload | |
203 @param ignore_tls_errors(bool): ignore TLS certificate is True | |
204 @return (tuple | |
205 """ | |
206 log.debug(f"Got upload slot: {slot}") | |
207 sat_file = self.host.plugins["FILE"].File( | |
208 self.host, client, path, size=size, auto_end_signals=False | |
209 ) | |
210 progress_id_d.callback(sat_file.uid) | |
211 file_producer = http_client.FileBodyProducer(sat_file) | |
212 if ignore_tls_errors: | |
213 agent = http_client.Agent(reactor, NoCheckContextFactory()) | |
214 else: | 160 else: |
215 agent = http_client.Agent(reactor) | 161 log.debug(f"Got upload slot: {slot}") |
216 | 162 sat_file = self.host.plugins["FILE"].File( |
217 headers = {"User-Agent": [C.APP_NAME.encode("utf-8")]} | 163 self.host, client, filepath, size=size, auto_end_signals=False |
218 for name, value in slot.headers: | 164 ) |
219 name = name.encode('utf-8') | 165 progress_id = sat_file.uid |
220 value = value.encode('utf-8') | 166 |
221 headers[name] = value | 167 file_producer = http_client.FileBodyProducer(sat_file) |
222 | 168 |
223 d = agent.request( | 169 if ignore_tls_errors: |
224 b"PUT", | 170 agent = http_client.Agent(reactor, sat_web.NoCheckContextFactory()) |
225 slot.put.encode("utf-8"), | 171 else: |
226 http_headers.Headers(headers), | 172 agent = http_client.Agent(reactor) |
227 file_producer, | 173 |
228 ) | 174 headers = {"User-Agent": [C.APP_NAME.encode("utf-8")]} |
229 d.addCallbacks( | 175 |
230 self._uploadCb, | 176 for name, value in slot.headers: |
231 self._uploadEb, | 177 name = name.encode('utf-8') |
232 (sat_file, slot, download_d), | 178 value = value.encode('utf-8') |
233 None, | 179 headers[name] = value |
234 (sat_file, download_d), | 180 |
235 ) | 181 |
236 return d | 182 await self.host.trigger.asyncPoint( |
237 | 183 "XEP-0363_upload", client, options, sat_file, file_producer, slot, |
238 def _uploadCb(self, __, sat_file, slot, download_d): | 184 triggers_no_cancel=True) |
185 | |
186 download_d = agent.request( | |
187 b"PUT", | |
188 slot.put.encode("utf-8"), | |
189 http_headers.Headers(headers), | |
190 file_producer, | |
191 ) | |
192 download_d.addCallbacks( | |
193 self._uploadCb, | |
194 self._uploadEb, | |
195 (sat_file, slot), | |
196 None, | |
197 (sat_file), | |
198 ) | |
199 | |
200 return progress_id, download_d | |
201 | |
202 def _uploadCb(self, __, sat_file, slot): | |
239 """Called once file is successfully uploaded | 203 """Called once file is successfully uploaded |
240 | 204 |
241 @param sat_file(SatFile): file used for the upload | 205 @param sat_file(SatFile): file used for the upload |
242 should be closed, be is needed to send the progressFinished signal | 206 should be closed, but it is needed to send the progressFinished signal |
243 @param slot(Slot): put/get urls | 207 @param slot(Slot): put/get urls |
244 """ | 208 """ |
245 log.info("HTTP upload finished") | 209 log.info("HTTP upload finished") |
246 sat_file.progressFinished({"url": slot.get}) | 210 sat_file.progressFinished({"url": slot.get}) |
247 download_d.callback(slot.get) | 211 return slot.get |
248 | 212 |
249 def _uploadEb(self, fail, sat_file, download_d): | 213 def _uploadEb(self, failure_, sat_file): |
250 """Called on unsuccessful upload | 214 """Called on unsuccessful upload |
251 | 215 |
252 @param sat_file(SatFile): file used for the upload | 216 @param sat_file(SatFile): file used for the upload |
253 should be closed, be is needed to send the progressError signal | 217 should be closed, be is needed to send the progressError signal |
254 """ | 218 """ |
255 download_d.errback(fail) | |
256 try: | 219 try: |
257 wrapped_fail = fail.value.reasons[0] | 220 wrapped_fail = failure_.value.reasons[0] |
258 except (AttributeError, IndexError) as e: | 221 except (AttributeError, IndexError) as e: |
259 log.warning(_("upload failed: {reason}").format(reason=e)) | 222 log.warning(_("upload failed: {reason}").format(reason=e)) |
260 sat_file.progressError(str(fail)) | 223 sat_file.progressError(str(failure_)) |
261 raise fail | |
262 else: | 224 else: |
263 if wrapped_fail.check(SSL.Error): | 225 if wrapped_fail.check(sat_web.SSLError): |
264 msg = "TLS validation error, can't connect to HTTPS server" | 226 msg = "TLS validation error, can't connect to HTTPS server" |
265 else: | 227 else: |
266 msg = "can't upload file" | 228 msg = "can't upload file" |
267 log.warning(msg + ": " + str(wrapped_fail.value)) | 229 log.warning(msg + ": " + str(wrapped_fail.value)) |
268 sat_file.progressError(msg) | 230 sat_file.progressError(msg) |
269 | 231 raise failure_ |
270 def _gotSlot(self, iq_elt, client): | |
271 """Slot have been received | |
272 | |
273 This method convert the iq_elt result to a Slot instance | |
274 @param iq_elt(domish.Element): <IQ/> result as specified in XEP-0363 | |
275 """ | |
276 try: | |
277 slot_elt = next(iq_elt.elements(NS_HTTP_UPLOAD, "slot")) | |
278 put_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "put")) | |
279 put_url = put_elt['url'] | |
280 get_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "get")) | |
281 get_url = get_elt['url'] | |
282 except (StopIteration, KeyError): | |
283 raise exceptions.DataError("Incorrect stanza received from server") | |
284 headers = [] | |
285 for header_elt in put_elt.elements(NS_HTTP_UPLOAD, "header"): | |
286 try: | |
287 name = header_elt["name"] | |
288 value = str(header_elt) | |
289 except KeyError: | |
290 log.warning(_("Invalid header element: {xml}").format( | |
291 iq_elt.toXml())) | |
292 continue | |
293 name = name.replace('\n', '') | |
294 value = value.replace('\n', '') | |
295 if name.lower() not in ALLOWED_HEADERS: | |
296 log.warning(_('Ignoring unauthorised header "{name}": {xml}') | |
297 .format(name=name, xml = iq_elt.toXml())) | |
298 continue | |
299 headers.append((name, value)) | |
300 | |
301 slot = Slot(put=put_url, get=get_url, headers=tuple(headers)) | |
302 return slot | |
303 | 232 |
304 def _getSlot(self, filename, size, content_type, upload_jid, | 233 def _getSlot(self, filename, size, content_type, upload_jid, |
305 profile_key=C.PROF_KEY_NONE): | 234 profile_key=C.PROF_KEY_NONE): |
306 """Get an upload slot | 235 """Get an upload slot |
307 | 236 |
310 @param size(int): size of the file (must be non null) | 239 @param size(int): size of the file (must be non null) |
311 @param upload_jid(jid.JID(), None, ''): HTTP upload capable entity | 240 @param upload_jid(jid.JID(), None, ''): HTTP upload capable entity |
312 @param content_type(unicode, None): MIME type of the content | 241 @param content_type(unicode, None): MIME type of the content |
313 empty string or None to guess automatically | 242 empty string or None to guess automatically |
314 """ | 243 """ |
244 client = self.host.getClient(profile_key) | |
315 filename = filename.replace("/", "_") | 245 filename = filename.replace("/", "_") |
316 client = self.host.getClient(profile_key) | 246 return defer.ensureDeferred(self.getSlot( |
317 return self.getSlot( | |
318 client, filename, size, content_type or None, upload_jid or None | 247 client, filename, size, content_type or None, upload_jid or None |
319 ) | 248 )) |
320 | 249 |
321 def getSlot(self, client, filename, size, content_type=None, upload_jid=None): | 250 async def getSlot(self, client, filename, size, content_type=None, upload_jid=None): |
322 """Get a slot (i.e. download/upload links) | 251 """Get a slot (i.e. download/upload links) |
323 | 252 |
324 @param filename(unicode): name to use for the upload | 253 @param filename(unicode): name to use for the upload |
325 @param size(int): size of the file to upload (must be >0) | 254 @param size(int): size of the file to upload (must be >0) |
326 @param content_type(None, unicode): MIME type of the content | 255 @param content_type(None, unicode): MIME type of the content |
338 | 267 |
339 if upload_jid is None: | 268 if upload_jid is None: |
340 try: | 269 try: |
341 upload_jid = client.http_upload_service | 270 upload_jid = client.http_upload_service |
342 except AttributeError: | 271 except AttributeError: |
343 d = self.getHTTPUploadEntity(profile=client.profile) | 272 found_entity = await self.getHTTPUploadEntity(profile=client.profile) |
344 d.addCallback( | 273 return await self.getSlot( |
345 lambda found_entity: self.getSlot( | 274 client, filename, size, content_type, found_entity) |
346 client, filename, size, content_type, found_entity | |
347 ) | |
348 ) | |
349 return d | |
350 else: | 275 else: |
351 if upload_jid is None: | 276 if upload_jid is None: |
352 raise failure.Failure( | 277 raise exceptions.NotFound("No HTTP upload entity found") |
353 exceptions.NotFound("No HTTP upload entity found") | |
354 ) | |
355 | 278 |
356 iq_elt = client.IQ("get") | 279 iq_elt = client.IQ("get") |
357 iq_elt["to"] = upload_jid.full() | 280 iq_elt["to"] = upload_jid.full() |
358 request_elt = iq_elt.addElement((NS_HTTP_UPLOAD, "request")) | 281 request_elt = iq_elt.addElement((NS_HTTP_UPLOAD, "request")) |
359 request_elt["filename"] = filename | 282 request_elt["filename"] = filename |
360 request_elt["size"] = str(size) | 283 request_elt["size"] = str(size) |
361 if content_type is not None: | 284 if content_type is not None: |
362 request_elt["content-type"] = content_type | 285 request_elt["content-type"] = content_type |
363 | 286 |
364 d = iq_elt.send() | 287 iq_result_elt = await iq_elt.send() |
365 d.addCallback(self._gotSlot, client) | 288 |
366 | 289 try: |
367 return d | 290 slot_elt = next(iq_result_elt.elements(NS_HTTP_UPLOAD, "slot")) |
291 put_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "put")) | |
292 put_url = put_elt['url'] | |
293 get_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "get")) | |
294 get_url = get_elt['url'] | |
295 except (StopIteration, KeyError): | |
296 raise exceptions.DataError("Incorrect stanza received from server") | |
297 | |
298 headers = [] | |
299 for header_elt in put_elt.elements(NS_HTTP_UPLOAD, "header"): | |
300 try: | |
301 name = header_elt["name"] | |
302 value = str(header_elt) | |
303 except KeyError: | |
304 log.warning(_("Invalid header element: {xml}").format( | |
305 iq_result_elt.toXml())) | |
306 continue | |
307 name = name.replace('\n', '') | |
308 value = value.replace('\n', '') | |
309 if name.lower() not in ALLOWED_HEADERS: | |
310 log.warning(_('Ignoring unauthorised header "{name}": {xml}') | |
311 .format(name=name, xml = iq_result_elt.toXml())) | |
312 continue | |
313 headers.append((name, value)) | |
314 | |
315 return Slot(put=put_url, get=get_url, headers=headers) | |
368 | 316 |
369 | 317 |
370 @implementer(iwokkel.IDisco) | 318 @implementer(iwokkel.IDisco) |
371 class XEP_0363_handler(XMPPHandler): | 319 class XEP_0363_handler(XMPPHandler): |
372 | 320 |