Mercurial > libervia-web
comparison libervia/server/server.py @ 1483:595e7fef41f3
merge bookmark @
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 12 Nov 2021 17:48:30 +0100 |
parents | fc91b78b71db 83cd4862b134 |
children | 6643855770a5 |
comparison
equal
deleted
inserted
replaced
1454:fc91b78b71db | 1483:595e7fef41f3 |
---|---|
21 import sys | 21 import sys |
22 import urllib.parse | 22 import urllib.parse |
23 import urllib.request, urllib.error | 23 import urllib.request, urllib.error |
24 import time | 24 import time |
25 import copy | 25 import copy |
26 from typing import Optional | |
26 from pathlib import Path | 27 from pathlib import Path |
27 from twisted.application import service | 28 from twisted.application import service |
28 from twisted.internet import reactor, defer, inotify | 29 from twisted.internet import reactor, defer, inotify |
29 from twisted.web import server | 30 from twisted.web import server |
30 from twisted.web import static | 31 from twisted.web import static |
176 def getChildWithDefault(self, path, request): | 177 def getChildWithDefault(self, path, request): |
177 return super().getChildWithDefault(path, request) | 178 return super().getChildWithDefault(path, request) |
178 | 179 |
179 def getChildForRequest(self, request): | 180 def getChildForRequest(self, request): |
180 return super().getChildForRequest(request) | 181 return super().getChildForRequest(request) |
182 | |
181 | 183 |
182 class LiberviaRootResource(ProtectedFile): | 184 class LiberviaRootResource(ProtectedFile): |
183 """Specialized resource for Libervia root | 185 """Specialized resource for Libervia root |
184 | 186 |
185 handle redirections declared in sat.conf | 187 handle redirections declared in sat.conf |
232 | 234 |
233 self.uri_callbacks = {} | 235 self.uri_callbacks = {} |
234 self.pages_redirects = {} | 236 self.pages_redirects = {} |
235 self.cached_urls = {} | 237 self.cached_urls = {} |
236 self.main_menu = None | 238 self.main_menu = None |
237 # map SàT application names => data | 239 # map Libervia application names => data |
238 self.sat_apps = {} | 240 self.libervia_apps = {} |
239 self.build_path = host.getBuildPath(site_name) | 241 self.build_path = host.getBuildPath(site_name) |
240 self.build_path.mkdir(parents=True, exist_ok=True) | 242 self.build_path.mkdir(parents=True, exist_ok=True) |
241 self.dev_build_path = host.getBuildPath(site_name, dev=True) | 243 self.dev_build_path = host.getBuildPath(site_name, dev=True) |
242 self.dev_build_path.mkdir(parents=True, exist_ok=True) | 244 self.dev_build_path.mkdir(parents=True, exist_ok=True) |
243 self.putChild( | 245 self.putChild( |
293 log.info(_( | 295 log.info(_( |
294 "starting application {app_name}").format(app_name=app_name)) | 296 "starting application {app_name}").format(app_name=app_name)) |
295 await self.host.bridgeCall( | 297 await self.host.bridgeCall( |
296 "applicationStart", app_name, data_format.serialise(extra) | 298 "applicationStart", app_name, data_format.serialise(extra) |
297 ) | 299 ) |
298 app_data = self.sat_apps[app_name] = data_format.deserialise( | 300 app_data = self.libervia_apps[app_name] = data_format.deserialise( |
299 await self.host.bridgeCall( | 301 await self.host.bridgeCall( |
300 "applicationExposedGet", app_name, "", "")) | 302 "applicationExposedGet", app_name, "", "")) |
301 | 303 |
302 try: | 304 try: |
303 web_port = int(app_data['ports']['web'].split(':')[1]) | 305 web_port = int(app_data['ports']['web'].split(':')[1]) |
335 | 337 |
336 ## redirections | 338 ## redirections |
337 self.redirections = {} | 339 self.redirections = {} |
338 self.inv_redirections = {} # new URL to old URL map | 340 self.inv_redirections = {} # new URL to old URL map |
339 | 341 |
340 for old, new_data in url_redirections.items(): | 342 for old, new_data_list in url_redirections.items(): |
341 # new_data can be a dictionary or a unicode url | 343 # several redirections can be used for one path by using a list. |
342 if isinstance(new_data, dict): | 344 # The redirection will be done using first item of the list, and all items |
343 # new_data dict must contain either "url", "page" or "path" key | 345 # will be used for inverse redirection. |
344 # (exclusive) | 346 # e.g. if a => [b, c], a will redirect to b, and b and c will both be |
345 # if "path" is used, a file url is constructed with it | 347 # equivalent to a |
346 if len({"path", "url", "page"}.intersection(list(new_data.keys()))) != 1: | 348 if not isinstance(new_data_list, list): |
347 raise ValueError( | 349 new_data_list = [new_data_list] |
348 'You must have one and only one of "url", "page" or "path" key ' | 350 for new_data in new_data_list: |
349 'in your url_redirections_dict data') | 351 # new_data can be a dictionary or a unicode url |
350 if "url" in new_data: | 352 if isinstance(new_data, dict): |
351 new = new_data["url"] | 353 # new_data dict must contain either "url", "page" or "path" key |
352 elif "page" in new_data: | 354 # (exclusive) |
355 # if "path" is used, a file url is constructed with it | |
356 if (( | |
357 len( | |
358 {"path", "url", "page"}.intersection(list(new_data.keys())) | |
359 ) != 1 | |
360 )): | |
361 raise ValueError( | |
362 'You must have one and only one of "url", "page" or "path" ' | |
363 'key in your url_redirections_dict data' | |
364 ) | |
365 if "url" in new_data: | |
366 new = new_data["url"] | |
367 elif "page" in new_data: | |
368 new = new_data | |
369 new["type"] = "page" | |
370 new.setdefault("path_args", []) | |
371 if not isinstance(new["path_args"], list): | |
372 log.error( | |
373 _('"path_args" in redirection of {old} must be a list. ' | |
374 'Ignoring the redirection'.format(old=old))) | |
375 continue | |
376 new.setdefault("query_args", {}) | |
377 if not isinstance(new["query_args"], dict): | |
378 log.error( | |
379 _( | |
380 '"query_args" in redirection of {old} must be a ' | |
381 'dictionary. Ignoring the redirection' | |
382 ).format(old=old) | |
383 ) | |
384 continue | |
385 new["path_args"] = [quote(a) for a in new["path_args"]] | |
386 # we keep an inversed dict of page redirection | |
387 # (page/path_args => redirecting URL) | |
388 # so getURL can return the redirecting URL if the same arguments | |
389 # are used # making the URL consistent | |
390 args_hash = tuple(new["path_args"]) | |
391 self.pages_redirects.setdefault(new_data["page"], {}).setdefault( | |
392 args_hash, | |
393 old | |
394 ) | |
395 | |
396 # we need lists in query_args because it will be used | |
397 # as is in request.path_args | |
398 for k, v in new["query_args"].items(): | |
399 if isinstance(v, str): | |
400 new["query_args"][k] = [v] | |
401 elif "path" in new_data: | |
402 new = "file:{}".format(urllib.parse.quote(new_data["path"])) | |
403 elif isinstance(new_data, str): | |
353 new = new_data | 404 new = new_data |
354 new["type"] = "page" | 405 new_data = {} |
355 new.setdefault("path_args", []) | 406 else: |
356 if not isinstance(new["path_args"], list): | 407 log.error( |
357 log.error( | 408 _("ignoring invalid redirection value: {new_data}").format( |
358 _('"path_args" in redirection of {old} must be a list. ' | 409 new_data=new_data |
359 'Ignoring the redirection'.format(old=old))) | 410 ) |
411 ) | |
412 continue | |
413 | |
414 # some normalization | |
415 if not old.strip(): | |
416 # root URL special case | |
417 old = "" | |
418 elif not old.startswith("/"): | |
419 log.error( | |
420 _("redirected url must start with '/', got {value}. Ignoring") | |
421 .format(value=old) | |
422 ) | |
423 continue | |
424 else: | |
425 old = self._normalizeURL(old) | |
426 | |
427 if isinstance(new, dict): | |
428 # dict are handled differently, they contain data | |
429 # which are used dynamically when the request is done | |
430 self.redirections.setdefault(old, new) | |
431 if not old: | |
432 if new["type"] == "page": | |
433 log.info( | |
434 _("Root URL redirected to page {name}").format( | |
435 name=new["page"] | |
436 ) | |
437 ) | |
438 else: | |
439 if new["type"] == "page": | |
440 page = self.getPageByName(new["page"]) | |
441 url = page.getURL(*new.get("path_args", [])) | |
442 self.inv_redirections[url] = old | |
443 continue | |
444 | |
445 # at this point we have a redirection URL in new, we can parse it | |
446 new_url = urllib.parse.urlsplit(new) | |
447 | |
448 # we handle the known URL schemes | |
449 if new_url.scheme == "xmpp": | |
450 location = self.getPagePathFromURI(new) | |
451 if location is None: | |
452 log.warning( | |
453 _("ignoring redirection, no page found to handle this URI: " | |
454 "{uri}").format(uri=new)) | |
360 continue | 455 continue |
361 new.setdefault("query_args", {}) | 456 request_data = self._getRequestData(location) |
362 if not isinstance(new["query_args"], dict): | 457 self.inv_redirections[location] = old |
363 log.error( | 458 |
364 _( | 459 elif new_url.scheme in ("", "http", "https"): |
365 '"query_args" in redirection of {old} must be a ' | 460 # direct redirection |
366 'dictionary. Ignoring the redirection'.format(old=old))) | 461 if new_url.netloc: |
462 raise NotImplementedError( | |
463 "netloc ({netloc}) is not implemented yet for " | |
464 "url_redirections_dict, it is not possible to redirect to an " | |
465 "external website".format(netloc=new_url.netloc)) | |
466 location = urllib.parse.urlunsplit( | |
467 ("", "", new_url.path, new_url.query, new_url.fragment) | |
468 ) | |
469 request_data = self._getRequestData(location) | |
470 self.inv_redirections[location] = old | |
471 | |
472 elif new_url.scheme == "file": | |
473 # file or directory | |
474 if new_url.netloc: | |
475 raise NotImplementedError( | |
476 "netloc ({netloc}) is not implemented for url redirection to " | |
477 "file system, it is not possible to redirect to an external " | |
478 "host".format( | |
479 netloc=new_url.netloc)) | |
480 path = urllib.parse.unquote(new_url.path) | |
481 if not os.path.isabs(path): | |
482 raise ValueError( | |
483 "file redirection must have an absolute path: e.g. " | |
484 "file:/path/to/my/file") | |
485 # for file redirection, we directly put child here | |
486 resource_class = ( | |
487 ProtectedFile if new_data.get("protected", True) else static.File | |
488 ) | |
489 res = resource_class(path, defaultType="application/octet-stream") | |
490 self.addResourceToPath(old, res) | |
491 log.info("[{host_name}] Added redirection from /{old} to file system " | |
492 "path {path}".format(host_name=self.host_name, | |
493 old=old, | |
494 path=path)) | |
495 | |
496 # we don't want to use redirection system, so we continue here | |
497 continue | |
498 | |
499 elif new_url.scheme == "libervia-app": | |
500 # a Libervia application | |
501 | |
502 app_name = urllib.parse.unquote(new_url.path).lower().strip() | |
503 extra = {"url_prefix": f"/{old}"} | |
504 try: | |
505 await self._startApp(app_name, extra) | |
506 except Exception as e: | |
507 log.warning(_( | |
508 "Can't launch {app_name!r} for path /{old}: {e}").format( | |
509 app_name=app_name, old=old, e=e)) | |
367 continue | 510 continue |
368 new["path_args"] = [quote(a) for a in new["path_args"]] | 511 |
369 # we keep an inversed dict of page redirection | 512 log.info("[{host_name}] Added redirection from /{old} to application " |
370 # (page/path_args => redirecting URL) | 513 "{app_name}".format( |
371 # so getURL can return the redirecting URL if the same arguments | 514 host_name=self.host_name, |
372 # are used # making the URL consistent | 515 old=old, |
373 args_hash = tuple(new["path_args"]) | 516 app_name=app_name)) |
374 self.pages_redirects.setdefault(new_data["page"], {})[ | 517 |
375 args_hash | 518 # normal redirection system is not used here |
376 ] = old | |
377 | |
378 # we need lists in query_args because it will be used | |
379 # as is in request.path_args | |
380 for k, v in new["query_args"].items(): | |
381 if isinstance(v, str): | |
382 new["query_args"][k] = [v] | |
383 elif "path" in new_data: | |
384 new = "file:{}".format(urllib.parse.quote(new_data["path"])) | |
385 elif isinstance(new_data, str): | |
386 new = new_data | |
387 new_data = {} | |
388 else: | |
389 log.error( | |
390 _("ignoring invalid redirection value: {new_data}").format( | |
391 new_data=new_data | |
392 ) | |
393 ) | |
394 continue | |
395 | |
396 # some normalization | |
397 if not old.strip(): | |
398 # root URL special case | |
399 old = "" | |
400 elif not old.startswith("/"): | |
401 log.error(_("redirected url must start with '/', got {value}. Ignoring") | |
402 .format(value=old)) | |
403 continue | |
404 else: | |
405 old = self._normalizeURL(old) | |
406 | |
407 if isinstance(new, dict): | |
408 # dict are handled differently, they contain data | |
409 # which are used dynamically when the request is done | |
410 self.redirections[old] = new | |
411 if not old: | |
412 if new["type"] == "page": | |
413 log.info( | |
414 _("Root URL redirected to page {name}").format( | |
415 name=new["page"] | |
416 ) | |
417 ) | |
418 else: | |
419 if new["type"] == "page": | |
420 page = self.getPageByName(new["page"]) | |
421 url = page.getURL(*new.get("path_args", [])) | |
422 self.inv_redirections[url] = old | |
423 continue | |
424 | |
425 # at this point we have a redirection URL in new, we can parse it | |
426 new_url = urllib.parse.urlsplit(new) | |
427 | |
428 # we handle the known URL schemes | |
429 if new_url.scheme == "xmpp": | |
430 location = self.getPagePathFromURI(new) | |
431 if location is None: | |
432 log.warning( | |
433 _("ignoring redirection, no page found to handle this URI: " | |
434 "{uri}").format(uri=new)) | |
435 continue | 519 continue |
436 request_data = self._getRequestData(location) | |
437 if old: | |
438 self.inv_redirections[location] = old | |
439 | |
440 elif new_url.scheme in ("", "http", "https"): | |
441 # direct redirection | |
442 if new_url.netloc: | |
443 raise NotImplementedError( | |
444 "netloc ({netloc}) is not implemented yet for " | |
445 "url_redirections_dict, it is not possible to redirect to an " | |
446 "external website".format(netloc=new_url.netloc)) | |
447 location = urllib.parse.urlunsplit( | |
448 ("", "", new_url.path, new_url.query, new_url.fragment) | |
449 ) | |
450 request_data = self._getRequestData(location) | |
451 if old: | |
452 self.inv_redirections[location] = old | |
453 | |
454 elif new_url.scheme == "file": | |
455 # file or directory | |
456 if new_url.netloc: | |
457 raise NotImplementedError( | |
458 "netloc ({netloc}) is not implemented for url redirection to " | |
459 "file system, it is not possible to redirect to an external " | |
460 "host".format( | |
461 netloc=new_url.netloc)) | |
462 path = urllib.parse.unquote(new_url.path) | |
463 if not os.path.isabs(path): | |
464 raise ValueError( | |
465 "file redirection must have an absolute path: e.g. " | |
466 "file:/path/to/my/file") | |
467 # for file redirection, we directly put child here | |
468 resource_class = ( | |
469 ProtectedFile if new_data.get("protected", True) else static.File | |
470 ) | |
471 res = resource_class(path, defaultType="application/octet-stream") | |
472 self.addResourceToPath(old, res) | |
473 log.info("[{host_name}] Added redirection from /{old} to file system " | |
474 "path {path}".format(host_name=self.host_name, | |
475 old=old, | |
476 path=path)) | |
477 | |
478 # we don't want to use redirection system, so we continue here | |
479 continue | |
480 | |
481 elif new_url.scheme == "sat-app": | |
482 # a SàT application | |
483 | |
484 app_name = urllib.parse.unquote(new_url.path).lower().strip() | |
485 extra = {"url_prefix": f"/{old}"} | |
486 try: | |
487 await self._startApp(app_name, extra) | |
488 except Exception as e: | |
489 log.warning(_( | |
490 "Can't launch {app_name!r} for path /{old}: {e}").format( | |
491 app_name=app_name, old=old, e=e)) | |
492 continue | |
493 | |
494 log.info("[{host_name}] Added redirection from /{old} to application " | |
495 "{app_name}".format( | |
496 host_name=self.host_name, | |
497 old=old, | |
498 app_name=app_name)) | |
499 | |
500 # normal redirection system is not used here | |
501 continue | |
502 elif new_url.scheme == "proxy": | 520 elif new_url.scheme == "proxy": |
503 # a reverse proxy | 521 # a reverse proxy |
504 host, port = new_url.hostname, new_url.port | 522 host, port = new_url.hostname, new_url.port |
505 if host is None or port is None: | 523 if host is None or port is None: |
506 raise ValueError( | 524 raise ValueError( |
519 f"{new_url.netloc} with URL prefix {url_prefix}/" | 537 f"{new_url.netloc} with URL prefix {url_prefix}/" |
520 ) | 538 ) |
521 | 539 |
522 # normal redirection system is not used here | 540 # normal redirection system is not used here |
523 continue | 541 continue |
524 else: | 542 else: |
525 raise NotImplementedError( | 543 raise NotImplementedError( |
526 "{scheme}: scheme is not managed for url_redirections_dict".format( | 544 "{scheme}: scheme is not managed for url_redirections_dict".format( |
527 scheme=new_url.scheme | 545 scheme=new_url.scheme |
546 ) | |
528 ) | 547 ) |
529 ) | 548 |
530 | 549 self.redirections.setdefault(old, request_data) |
531 self.redirections[old] = request_data | 550 if not old: |
532 if not old: | 551 log.info(_("[{host_name}] Root URL redirected to {uri}") |
533 log.info(_("[{host_name}] Root URL redirected to {uri}") | 552 .format(host_name=self.host_name, |
534 .format(host_name=self.host_name, | 553 uri=request_data[1])) |
535 uri=request_data[1])) | |
536 | 554 |
537 # the default root URL, if not redirected | 555 # the default root URL, if not redirected |
538 if not "" in self.redirections: | 556 if not "" in self.redirections: |
539 self.redirections[""] = self._getRequestData(C.LIBERVIA_PAGE_START) | 557 self.redirections[""] = self._getRequestData(C.LIBERVIA_PAGE_START) |
540 | 558 |
552 "menu item as list must be in the form [page_name, absolue URL]" | 570 "menu item as list must be in the form [page_name, absolue URL]" |
553 ) | 571 ) |
554 log.error(msg) | 572 log.error(msg) |
555 raise ValueError(msg) | 573 raise ValueError(msg) |
556 page_name, url = menu | 574 page_name, url = menu |
557 elif menu.startswith("sat-app:"): | 575 elif menu.startswith("libervia-app:"): |
558 app_name = menu[8:].strip().lower() | 576 app_name = menu[13:].strip().lower() |
559 app_data = await self._startApp(app_name) | 577 app_data = await self._startApp(app_name) |
560 front_url = app_data['front_url'] | 578 front_url = app_data['front_url'] |
561 options = self.host.options | 579 options = self.host.options |
562 url_redirections = options["url_redirections_dict"].setdefault( | 580 url_redirections = options["url_redirections_dict"].setdefault( |
563 self.site_name, {}) | 581 self.site_name, {}) |
744 | 762 |
745 if isinstance(resource, web_resource.NoResource): | 763 if isinstance(resource, web_resource.NoResource): |
746 # if nothing was found, we try our luck with redirections | 764 # if nothing was found, we try our luck with redirections |
747 # XXX: we want redirections to happen only if everything else failed | 765 # XXX: we want redirections to happen only if everything else failed |
748 path_elt = request.prepath + request.postpath | 766 path_elt = request.prepath + request.postpath |
749 for idx in range(len(path_elt), 0, -1): | 767 for idx in range(len(path_elt), -1, -1): |
750 test_url = b"/".join(path_elt[:idx]).decode('utf-8').lower() | 768 test_url = b"/".join(path_elt[:idx]).decode('utf-8').lower() |
751 if test_url in self.redirections: | 769 if test_url in self.redirections: |
752 request_data = self.redirections[test_url] | 770 request_data = self.redirections[test_url] |
753 request.postpath = path_elt[idx:] | 771 request.postpath = path_elt[idx:] |
754 return self._redirect(request, request_data) | 772 return self._redirect(request, request_data) |
1658 @return (urlparse.SplitResult): SplitResult instance with only scheme and | 1676 @return (urlparse.SplitResult): SplitResult instance with only scheme and |
1659 netloc filled | 1677 netloc filled |
1660 """ | 1678 """ |
1661 ext_data = self.base_url_ext_data | 1679 ext_data = self.base_url_ext_data |
1662 url_path = request.URLPath() | 1680 url_path = request.URLPath() |
1663 if not ext_data.scheme or not ext_data.netloc: | 1681 |
1664 # ext_data is not specified, we check headers | 1682 try: |
1665 if request.requestHeaders.hasHeader("x-forwarded-host"): | 1683 forwarded = request.requestHeaders.getRawHeaders( |
1666 # we are behind a proxy | 1684 "forwarded" |
1667 # we fill proxy_scheme and proxy_netloc value | 1685 )[0] |
1668 proxy_host = request.requestHeaders.getRawHeaders("x-forwarded-host")[0] | 1686 except TypeError: |
1669 try: | 1687 # we try deprecated headers |
1670 proxy_server = request.requestHeaders.getRawHeaders( | 1688 try: |
1671 "x-forwarded-server" | 1689 proxy_netloc = request.requestHeaders.getRawHeaders( |
1672 )[0] | 1690 "x-forwarded-host" |
1673 except TypeError: | 1691 )[0] |
1674 # no x-forwarded-server found, we use proxy_host | 1692 except TypeError: |
1675 proxy_netloc = proxy_host | 1693 proxy_netloc = None |
1676 else: | 1694 try: |
1677 # if the proxy host has a port, we use it with server name | 1695 proxy_scheme = request.requestHeaders.getRawHeaders( |
1678 proxy_port = urllib.parse.urlsplit("//{}".format(proxy_host)).port | 1696 "x-forwarded-proto" |
1679 proxy_netloc = ( | 1697 )[0] |
1680 "{}:{}".format(proxy_server, proxy_port) | 1698 except TypeError: |
1681 if proxy_port is not None | 1699 proxy_scheme = None |
1682 else proxy_server | |
1683 ) | |
1684 try: | |
1685 proxy_scheme = request.requestHeaders.getRawHeaders( | |
1686 "x-forwarded-proto" | |
1687 )[0] | |
1688 except TypeError: | |
1689 proxy_scheme = None | |
1690 else: | |
1691 proxy_scheme, proxy_netloc = None, None | |
1692 else: | 1700 else: |
1693 proxy_scheme, proxy_netloc = None, None | 1701 fwd_data = { |
1702 k.strip(): v.strip() | |
1703 for k,v in (d.split("=") for d in forwarded.split(";")) | |
1704 } | |
1705 proxy_netloc = fwd_data.get("host") | |
1706 proxy_scheme = fwd_data.get("proto") | |
1694 | 1707 |
1695 return urllib.parse.SplitResult( | 1708 return urllib.parse.SplitResult( |
1696 ext_data.scheme or proxy_scheme or url_path.scheme.decode("utf-8"), | 1709 ext_data.scheme or proxy_scheme or url_path.scheme.decode(), |
1697 ext_data.netloc or proxy_netloc or url_path.netloc.decode("utf-8"), | 1710 ext_data.netloc or proxy_netloc or url_path.netloc.decode(), |
1698 ext_data.path or "/", | 1711 ext_data.path or "/", |
1699 "", | 1712 "", |
1700 "", | 1713 "", |
1701 ) | 1714 ) |
1702 | 1715 |
1703 def getExtBaseURL(self, request, path="", query="", fragment="", scheme=None): | 1716 def getExtBaseURL( |
1717 self, | |
1718 request: server.Request, | |
1719 path: str = "", | |
1720 query: str = "", | |
1721 fragment: str = "", | |
1722 scheme: Optional[str] = None, | |
1723 ) -> str: | |
1704 """Get external URL according to given elements | 1724 """Get external URL according to given elements |
1705 | 1725 |
1706 external URL is the URL seen by external user | 1726 external URL is the URL seen by external user |
1707 @param path(unicode): same as for urlsplit.urlsplit | 1727 @param path: same as for urlsplit.urlsplit |
1708 path will be prefixed to follow found external URL if suitable | 1728 path will be prefixed to follow found external URL if suitable |
1709 @param params(unicode): same as for urlsplit.urlsplit | 1729 @param params: same as for urlsplit.urlsplit |
1710 @param query(unicode): same as for urlsplit.urlsplit | 1730 @param query: same as for urlsplit.urlsplit |
1711 @param fragment(unicode): same as for urlsplit.urlsplit | 1731 @param fragment: same as for urlsplit.urlsplit |
1712 @param scheme(unicode, None): if not None, will override scheme from base URL | 1732 @param scheme: if not None, will override scheme from base URL |
1713 @return (unicode): external URL | 1733 @return: external URL |
1714 """ | 1734 """ |
1715 split_result = self.getExtBaseURLData(request) | 1735 split_result = self.getExtBaseURLData(request) |
1716 return urllib.parse.urlunsplit( | 1736 return urllib.parse.urlunsplit( |
1717 ( | 1737 ( |
1718 split_result.scheme if scheme is None else scheme, | 1738 split_result.scheme if scheme is None else scheme, |
1721 query, | 1741 query, |
1722 fragment, | 1742 fragment, |
1723 ) | 1743 ) |
1724 ) | 1744 ) |
1725 | 1745 |
1726 def checkRedirection(self, vhost_root, url): | 1746 def checkRedirection(self, vhost_root: LiberviaRootResource, url_path: str) -> str: |
1727 """check if a part of the URL prefix is redirected then replace it | 1747 """check if a part of the URL prefix is redirected then replace it |
1728 | 1748 |
1729 @param vhost_root(web_resource.Resource): root of this virtual host | 1749 @param vhost_root: root of this virtual host |
1730 @param url(unicode): url to check | 1750 @param url_path: path of the url to check |
1731 @return (unicode): possibly redirected URL which should link to the same location | 1751 @return: possibly redirected URL which should link to the same location |
1732 """ | 1752 """ |
1733 inv_redirections = vhost_root.inv_redirections | 1753 inv_redirections = vhost_root.inv_redirections |
1734 url_parts = url.strip("/").split("/") | 1754 url_parts = url_path.strip("/").split("/") |
1735 for idx in range(len(url), 0, -1): | 1755 for idx in range(len(url_parts), -1, -1): |
1736 test_url = "/" + "/".join(url_parts[:idx]) | 1756 test_url = "/" + "/".join(url_parts[:idx]) |
1737 if test_url in inv_redirections: | 1757 if test_url in inv_redirections: |
1738 rem_url = url_parts[idx:] | 1758 rem_url = url_parts[idx:] |
1739 return os.path.join( | 1759 return os.path.join( |
1740 "/", "/".join([inv_redirections[test_url]] + rem_url) | 1760 "/", "/".join([inv_redirections[test_url]] + rem_url) |
1741 ) | 1761 ) |
1742 return url | 1762 return url_path |
1743 | 1763 |
1744 ## Sessions ## | 1764 ## Sessions ## |
1745 | 1765 |
1746 def purgeSession(self, request): | 1766 def purgeSession(self, request): |
1747 """helper method to purge a session during request handling""" | 1767 """helper method to purge a session during request handling""" |