comparison sat/plugins/plugin_xep_0363.py @ 3089:e75024e41f81

plugin upload, XEP-0363: code modernisation + preparation for extension:
- use of async/await syntax
- fileUpload's options are now serialised, allowing non-string values
- (XEP-0363) Slot is now a dataclass, so it can be modified by other plugins
- (XEP-0363) Moved SSL related code to the new tools.web module
- (XEP-0363) added `XEP-0363_upload_size` and `XEP-0363_upload` trigger points
- a Deferred is not used anymore for `progress_id`; the value is directly returned
author Goffi <goffi@goffi.org>
date Fri, 20 Dec 2019 12:28:04 +0100
parents fee60f17ebac
children 9d0df638c8b4
comparison
equal deleted inserted replaced
3088:d1464548055a 3089:e75024e41f81
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 import os.path
21 from sat.core.constants import Const as C 21 import mimetypes
22 from sat.core.log import getLogger 22 from dataclasses import dataclass
23
24 log = getLogger(__name__)
25 from sat.core import exceptions
26 from wokkel import disco, iwokkel 23 from wokkel import disco, iwokkel
27 from zope.interface import implementer 24 from zope.interface import implementer
28 from twisted.words.protocols.jabber import jid 25 from twisted.words.protocols.jabber import jid
29 from twisted.words.protocols.jabber.xmlstream import XMPPHandler 26 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
30 from twisted.internet import reactor 27 from twisted.internet import reactor
31 from twisted.internet import defer 28 from twisted.internet import defer
32 from twisted.internet import ssl
33 from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
34 from twisted.web import client as http_client 29 from twisted.web import client as http_client
35 from twisted.web import http_headers 30 from twisted.web import http_headers
36 from twisted.web import iweb 31 from sat.core.i18n import _
37 from twisted.python import failure 32 from sat.core.constants import Const as C
38 from collections import namedtuple 33 from sat.core.log import getLogger
39 from OpenSSL import SSL 34 from sat.core import exceptions
40 import os.path 35 from sat.tools import web as sat_web
41 import mimetypes 36
42 37
38 log = getLogger(__name__)
43 39
44 PLUGIN_INFO = { 40 PLUGIN_INFO = {
45 C.PI_NAME: "HTTP File Upload", 41 C.PI_NAME: "HTTP File Upload",
46 C.PI_IMPORT_NAME: "XEP-0363", 42 C.PI_IMPORT_NAME: "XEP-0363",
47 C.PI_TYPE: "XEP", 43 C.PI_TYPE: "XEP",
54 50
55 NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0" 51 NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0"
56 ALLOWED_HEADERS = ('authorization', 'cookie', 'expires') 52 ALLOWED_HEADERS = ('authorization', 'cookie', 'expires')
57 53
58 54
59 Slot = namedtuple("Slot", ["put", "get", "headers"]) 55 @dataclass
60 56 class Slot:
61 57 """Upload slot"""
62 @implementer(IOpenSSLClientConnectionCreator) 58 put: str
63 class NoCheckConnectionCreator(object): 59 get: str
64 def __init__(self, hostname, ctx): 60 headers: list
65 self._ctx = ctx
66
67 def clientConnectionForTLS(self, tlsProtocol):
68 context = self._ctx
69 connection = SSL.Connection(context, None)
70 connection.set_app_data(tlsProtocol)
71 return connection
72
73
74 @implementer(iweb.IPolicyForHTTPS)
75 class NoCheckContextFactory(ssl.ClientContextFactory):
76 """Context factory which doesn't do TLS certificate check
77
78 /!\\ it's obvisously a security flaw to use this class,
79 and it should be used only with explicite agreement from the end used
80 """
81
82 def creatorForNetloc(self, hostname, port):
83 log.warning(
84 "TLS check disabled for {host} on port {port}".format(
85 host=hostname, port=port
86 )
87 )
88 certificateOptions = ssl.CertificateOptions(trustRoot=None)
89 return NoCheckConnectionCreator(hostname, certificateOptions.getContext())
90 61
91 62
92 class XEP_0363(object): 63 class XEP_0363(object):
93 def __init__(self, host): 64 def __init__(self, host):
94 log.info(_("plugin HTTP File Upload initialization")) 65 log.info(_("plugin HTTP File Upload initialization"))
113 ) 84 )
114 85
115 def getHandler(self, client): 86 def getHandler(self, client):
116 return XEP_0363_handler() 87 return XEP_0363_handler()
117 88
118 @defer.inlineCallbacks 89 async def getHTTPUploadEntity(self, client, upload_jid=None):
119 def getHTTPUploadEntity(self, upload_jid=None, profile=C.PROF_KEY_NONE):
120 """Get HTTP upload capable entity 90 """Get HTTP upload capable entity
121 91
122 upload_jid is checked, then its components 92 upload_jid is checked, then its components
123 @param upload_jid(None, jid.JID): entity to check 93 @param upload_jid(None, jid.JID): entity to check
124 @return(D(jid.JID)): first HTTP upload capable entity 94 @return(D(jid.JID)): first HTTP upload capable entity
125 @raise exceptions.NotFound: no entity found 95 @raise exceptions.NotFound: no entity found
126 """ 96 """
127 client = self.host.getClient(profile)
128 try: 97 try:
129 entity = client.http_upload_service 98 entity = client.http_upload_service
130 except AttributeError: 99 except AttributeError:
131 found_entities = yield self.host.findFeaturesSet(client, (NS_HTTP_UPLOAD,)) 100 found_entities = await self.host.findFeaturesSet(client, (NS_HTTP_UPLOAD,))
132 try: 101 try:
133 entity = client.http_upload_service = next(iter(found_entities)) 102 entity = client.http_upload_service = next(iter(found_entities))
134 except StopIteration: 103 except StopIteration:
135 entity = client.http_upload_service = None 104 entity = client.http_upload_service = None
136 105
137 if entity is None: 106 if entity is None:
138 raise failure.Failure(exceptions.NotFound("No HTTP upload entity found")) 107 raise exceptions.NotFound("No HTTP upload entity found")
139 108
140 defer.returnValue(entity) 109 return entity
141 110
142 def _fileHTTPUpload(self, filepath, filename="", upload_jid="", 111 def _fileHTTPUpload(self, filepath, filename="", upload_jid="",
143 ignore_tls_errors=False, profile=C.PROF_KEY_NONE): 112 ignore_tls_errors=False, profile=C.PROF_KEY_NONE):
144 assert os.path.isabs(filepath) and os.path.isfile(filepath) 113 assert os.path.isabs(filepath) and os.path.isfile(filepath)
145 progress_id_d, __ = self.fileHTTPUpload( 114 client = self.host.getClient(profile)
115 progress_id_d, __ = defer.ensureDeferred(self.fileHTTPUpload(
116 client,
146 filepath, 117 filepath,
147 filename or None, 118 filename or None,
148 jid.JID(upload_jid) if upload_jid else None, 119 jid.JID(upload_jid) if upload_jid else None,
149 {"ignore_tls_errors": ignore_tls_errors}, 120 {"ignore_tls_errors": ignore_tls_errors},
150 profile, 121 ))
151 )
152 return progress_id_d 122 return progress_id_d
153 123
154 def fileHTTPUpload(self, filepath, filename=None, upload_jid=None, options=None, 124 async def fileHTTPUpload(
155 profile=C.PROF_KEY_NONE): 125 self, client, filepath, filename=None, upload_jid=None, options=None):
156 """Upload a file through HTTP 126 """Upload a file through HTTP
157 127
158 @param filepath(str): absolute path of the file 128 @param filepath(str): absolute path of the file
159 @param filename(None, unicode): name to use for the upload 129 @param filename(None, unicode): name to use for the upload
160 None to use basename of the path 130 None to use basename of the path
167 download URL 137 download URL
168 """ 138 """
169 if options is None: 139 if options is None:
170 options = {} 140 options = {}
171 ignore_tls_errors = options.get("ignore_tls_errors", False) 141 ignore_tls_errors = options.get("ignore_tls_errors", False)
172 client = self.host.getClient(profile)
173 filename = filename or os.path.basename(filepath) 142 filename = filename or os.path.basename(filepath)
174 size = os.path.getsize(filepath) 143 size = os.path.getsize(filepath)
175 progress_id_d = defer.Deferred() 144
176 download_d = defer.Deferred() 145
177 d = self.getSlot(client, filename, size, upload_jid=upload_jid) 146 size_adjust = []
178 d.addCallbacks( 147 #: this trigger can be used to modify the requested size, it is notably useful
179 self._getSlotCb, 148 #: with encryption. The size_adjust is a list which can be filled by int to add
180 self._getSlotEb, 149 #: to the initial size
181 (client, progress_id_d, download_d, filepath, size, ignore_tls_errors), 150 self.host.trigger.point(
182 None, 151 "XEP-0363_upload_size", client, options, filepath, size, size_adjust,
183 (client, progress_id_d, download_d), 152 triggers_no_cancel=True)
184 ) 153 if size_adjust:
185 return progress_id_d, download_d 154 size = sum([size, *size_adjust])
186 155 try:
187 def _getSlotEb(self, fail, client, progress_id_d, download_d): 156 slot = await self.getSlot(client, filename, size, upload_jid=upload_jid)
188 """an error happened while trying to get slot""" 157 except Exception as e:
189 log.warning("Can't get upload slot: {reason}".format(reason=fail.value)) 158 log.warning(_("Can't get upload slot: {reason}").format(reason=e))
190 progress_id_d.errback(fail) 159 raise e
191 download_d.errback(fail)
192
193 def _getSlotCb(self, slot, client, progress_id_d, download_d, path, size,
194 ignore_tls_errors=False):
195 """Called when slot is received, try to do the upload
196
197 @param slot(Slot): slot instance with the get and put urls
198 @param progress_id_d(defer.Deferred): Deferred to call when progress_id is known
199 @param progress_id_d(defer.Deferred): Deferred to call with URL when upload is
200 done
201 @param path(str): path to the file to upload
202 @param size(int): size of the file to upload
203 @param ignore_tls_errors(bool): ignore TLS certificate is True
204 @return (tuple
205 """
206 log.debug(f"Got upload slot: {slot}")
207 sat_file = self.host.plugins["FILE"].File(
208 self.host, client, path, size=size, auto_end_signals=False
209 )
210 progress_id_d.callback(sat_file.uid)
211 file_producer = http_client.FileBodyProducer(sat_file)
212 if ignore_tls_errors:
213 agent = http_client.Agent(reactor, NoCheckContextFactory())
214 else: 160 else:
215 agent = http_client.Agent(reactor) 161 log.debug(f"Got upload slot: {slot}")
216 162 sat_file = self.host.plugins["FILE"].File(
217 headers = {"User-Agent": [C.APP_NAME.encode("utf-8")]} 163 self.host, client, filepath, size=size, auto_end_signals=False
218 for name, value in slot.headers: 164 )
219 name = name.encode('utf-8') 165 progress_id = sat_file.uid
220 value = value.encode('utf-8') 166
221 headers[name] = value 167 file_producer = http_client.FileBodyProducer(sat_file)
222 168
223 d = agent.request( 169 if ignore_tls_errors:
224 b"PUT", 170 agent = http_client.Agent(reactor, sat_web.NoCheckContextFactory())
225 slot.put.encode("utf-8"), 171 else:
226 http_headers.Headers(headers), 172 agent = http_client.Agent(reactor)
227 file_producer, 173
228 ) 174 headers = {"User-Agent": [C.APP_NAME.encode("utf-8")]}
229 d.addCallbacks( 175
230 self._uploadCb, 176 for name, value in slot.headers:
231 self._uploadEb, 177 name = name.encode('utf-8')
232 (sat_file, slot, download_d), 178 value = value.encode('utf-8')
233 None, 179 headers[name] = value
234 (sat_file, download_d), 180
235 ) 181
236 return d 182 await self.host.trigger.asyncPoint(
237 183 "XEP-0363_upload", client, options, sat_file, file_producer, slot,
238 def _uploadCb(self, __, sat_file, slot, download_d): 184 triggers_no_cancel=True)
185
186 download_d = agent.request(
187 b"PUT",
188 slot.put.encode("utf-8"),
189 http_headers.Headers(headers),
190 file_producer,
191 )
192 download_d.addCallbacks(
193 self._uploadCb,
194 self._uploadEb,
195 (sat_file, slot),
196 None,
197 (sat_file),
198 )
199
200 return progress_id, download_d
201
202 def _uploadCb(self, __, sat_file, slot):
239 """Called once file is successfully uploaded 203 """Called once file is successfully uploaded
240 204
241 @param sat_file(SatFile): file used for the upload 205 @param sat_file(SatFile): file used for the upload
242 should be closed, be is needed to send the progressFinished signal 206 should be closed, but it is needed to send the progressFinished signal
243 @param slot(Slot): put/get urls 207 @param slot(Slot): put/get urls
244 """ 208 """
245 log.info("HTTP upload finished") 209 log.info("HTTP upload finished")
246 sat_file.progressFinished({"url": slot.get}) 210 sat_file.progressFinished({"url": slot.get})
247 download_d.callback(slot.get) 211 return slot.get
248 212
249 def _uploadEb(self, fail, sat_file, download_d): 213 def _uploadEb(self, failure_, sat_file):
250 """Called on unsuccessful upload 214 """Called on unsuccessful upload
251 215
252 @param sat_file(SatFile): file used for the upload 216 @param sat_file(SatFile): file used for the upload
253 should be closed, be is needed to send the progressError signal 217 should be closed, be is needed to send the progressError signal
254 """ 218 """
255 download_d.errback(fail)
256 try: 219 try:
257 wrapped_fail = fail.value.reasons[0] 220 wrapped_fail = failure_.value.reasons[0]
258 except (AttributeError, IndexError) as e: 221 except (AttributeError, IndexError) as e:
259 log.warning(_("upload failed: {reason}").format(reason=e)) 222 log.warning(_("upload failed: {reason}").format(reason=e))
260 sat_file.progressError(str(fail)) 223 sat_file.progressError(str(failure_))
261 raise fail
262 else: 224 else:
263 if wrapped_fail.check(SSL.Error): 225 if wrapped_fail.check(sat_web.SSLError):
264 msg = "TLS validation error, can't connect to HTTPS server" 226 msg = "TLS validation error, can't connect to HTTPS server"
265 else: 227 else:
266 msg = "can't upload file" 228 msg = "can't upload file"
267 log.warning(msg + ": " + str(wrapped_fail.value)) 229 log.warning(msg + ": " + str(wrapped_fail.value))
268 sat_file.progressError(msg) 230 sat_file.progressError(msg)
269 231 raise failure_
270 def _gotSlot(self, iq_elt, client):
271 """Slot have been received
272
273 This method convert the iq_elt result to a Slot instance
274 @param iq_elt(domish.Element): <IQ/> result as specified in XEP-0363
275 """
276 try:
277 slot_elt = next(iq_elt.elements(NS_HTTP_UPLOAD, "slot"))
278 put_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "put"))
279 put_url = put_elt['url']
280 get_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "get"))
281 get_url = get_elt['url']
282 except (StopIteration, KeyError):
283 raise exceptions.DataError("Incorrect stanza received from server")
284 headers = []
285 for header_elt in put_elt.elements(NS_HTTP_UPLOAD, "header"):
286 try:
287 name = header_elt["name"]
288 value = str(header_elt)
289 except KeyError:
290 log.warning(_("Invalid header element: {xml}").format(
291 iq_elt.toXml()))
292 continue
293 name = name.replace('\n', '')
294 value = value.replace('\n', '')
295 if name.lower() not in ALLOWED_HEADERS:
296 log.warning(_('Ignoring unauthorised header "{name}": {xml}')
297 .format(name=name, xml = iq_elt.toXml()))
298 continue
299 headers.append((name, value))
300
301 slot = Slot(put=put_url, get=get_url, headers=tuple(headers))
302 return slot
303 232
304 def _getSlot(self, filename, size, content_type, upload_jid, 233 def _getSlot(self, filename, size, content_type, upload_jid,
305 profile_key=C.PROF_KEY_NONE): 234 profile_key=C.PROF_KEY_NONE):
306 """Get an upload slot 235 """Get an upload slot
307 236
310 @param size(int): size of the file (must be non null) 239 @param size(int): size of the file (must be non null)
311 @param upload_jid(jid.JID(), None, ''): HTTP upload capable entity 240 @param upload_jid(jid.JID(), None, ''): HTTP upload capable entity
312 @param content_type(unicode, None): MIME type of the content 241 @param content_type(unicode, None): MIME type of the content
313 empty string or None to guess automatically 242 empty string or None to guess automatically
314 """ 243 """
244 client = self.host.getClient(profile_key)
315 filename = filename.replace("/", "_") 245 filename = filename.replace("/", "_")
316 client = self.host.getClient(profile_key) 246 return defer.ensureDeferred(self.getSlot(
317 return self.getSlot(
318 client, filename, size, content_type or None, upload_jid or None 247 client, filename, size, content_type or None, upload_jid or None
319 ) 248 ))
320 249
321 def getSlot(self, client, filename, size, content_type=None, upload_jid=None): 250 async def getSlot(self, client, filename, size, content_type=None, upload_jid=None):
322 """Get a slot (i.e. download/upload links) 251 """Get a slot (i.e. download/upload links)
323 252
324 @param filename(unicode): name to use for the upload 253 @param filename(unicode): name to use for the upload
325 @param size(int): size of the file to upload (must be >0) 254 @param size(int): size of the file to upload (must be >0)
326 @param content_type(None, unicode): MIME type of the content 255 @param content_type(None, unicode): MIME type of the content
338 267
339 if upload_jid is None: 268 if upload_jid is None:
340 try: 269 try:
341 upload_jid = client.http_upload_service 270 upload_jid = client.http_upload_service
342 except AttributeError: 271 except AttributeError:
343 d = self.getHTTPUploadEntity(profile=client.profile) 272 found_entity = await self.getHTTPUploadEntity(profile=client.profile)
344 d.addCallback( 273 return await self.getSlot(
345 lambda found_entity: self.getSlot( 274 client, filename, size, content_type, found_entity)
346 client, filename, size, content_type, found_entity
347 )
348 )
349 return d
350 else: 275 else:
351 if upload_jid is None: 276 if upload_jid is None:
352 raise failure.Failure( 277 raise exceptions.NotFound("No HTTP upload entity found")
353 exceptions.NotFound("No HTTP upload entity found")
354 )
355 278
356 iq_elt = client.IQ("get") 279 iq_elt = client.IQ("get")
357 iq_elt["to"] = upload_jid.full() 280 iq_elt["to"] = upload_jid.full()
358 request_elt = iq_elt.addElement((NS_HTTP_UPLOAD, "request")) 281 request_elt = iq_elt.addElement((NS_HTTP_UPLOAD, "request"))
359 request_elt["filename"] = filename 282 request_elt["filename"] = filename
360 request_elt["size"] = str(size) 283 request_elt["size"] = str(size)
361 if content_type is not None: 284 if content_type is not None:
362 request_elt["content-type"] = content_type 285 request_elt["content-type"] = content_type
363 286
364 d = iq_elt.send() 287 iq_result_elt = await iq_elt.send()
365 d.addCallback(self._gotSlot, client) 288
366 289 try:
367 return d 290 slot_elt = next(iq_result_elt.elements(NS_HTTP_UPLOAD, "slot"))
291 put_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "put"))
292 put_url = put_elt['url']
293 get_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "get"))
294 get_url = get_elt['url']
295 except (StopIteration, KeyError):
296 raise exceptions.DataError("Incorrect stanza received from server")
297
298 headers = []
299 for header_elt in put_elt.elements(NS_HTTP_UPLOAD, "header"):
300 try:
301 name = header_elt["name"]
302 value = str(header_elt)
303 except KeyError:
304 log.warning(_("Invalid header element: {xml}").format(
305 iq_result_elt.toXml()))
306 continue
307 name = name.replace('\n', '')
308 value = value.replace('\n', '')
309 if name.lower() not in ALLOWED_HEADERS:
310 log.warning(_('Ignoring unauthorised header "{name}": {xml}')
311 .format(name=name, xml = iq_result_elt.toXml()))
312 continue
313 headers.append((name, value))
314
315 return Slot(put=put_url, get=get_url, headers=headers)
368 316
369 317
370 @implementer(iwokkel.IDisco) 318 @implementer(iwokkel.IDisco)
371 class XEP_0363_handler(XMPPHandler): 319 class XEP_0363_handler(XMPPHandler):
372 320