Mercurial > libervia-web
comparison libervia/server/server.py @ 1457:792a2e902ee9
server: fix inverse URL redirection for root path + allow multiple inverse redirections:
- inverse redirection is now working for root path
- a list can now be used in `url_redirections_dict`: the first item only will be used for
redirection, but all items will be used for inverse redirection.
e.g.: if in `url_redirections_dict` we have `"/": ["/u/some_user/blog",
"/blog/view/some_user_jid@example.org"]`, root will redirect to "/u/some_user/blog", but
both "/u/some_user/blog" and "/blog/view/some_user_jid@example.org" will have an inverse
redirection to the root path
fix 395
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 29 Sep 2021 17:39:06 +0200 |
parents | 396d5606477f |
children | db13f5c768a0 |
comparison
equal
deleted
inserted
replaced
1456:284522d8af44 | 1457:792a2e902ee9 |
---|---|
335 | 335 |
336 ## redirections | 336 ## redirections |
337 self.redirections = {} | 337 self.redirections = {} |
338 self.inv_redirections = {} # new URL to old URL map | 338 self.inv_redirections = {} # new URL to old URL map |
339 | 339 |
340 for old, new_data in url_redirections.items(): | 340 for old, new_data_list in url_redirections.items(): |
341 # new_data can be a dictionary or a unicode url | 341 # several redirections can be used for one path by using a list. |
342 if isinstance(new_data, dict): | 342 # The redirection will be done using first item of the list, and all items |
343 # new_data dict must contain either "url", "page" or "path" key | 343 # will be used for inverse redirection. |
344 # (exclusive) | 344 # e.g. if a => [b, c], a will redirect to c, and b and c will both be |
345 # if "path" is used, a file url is constructed with it | 345 # equivalent to a |
346 if len({"path", "url", "page"}.intersection(list(new_data.keys()))) != 1: | 346 if not isinstance(new_data_list, list): |
347 raise ValueError( | 347 new_data_list = [new_data_list] |
348 'You must have one and only one of "url", "page" or "path" key ' | 348 for new_data in new_data_list: |
349 'in your url_redirections_dict data') | 349 # new_data can be a dictionary or a unicode url |
350 if "url" in new_data: | 350 if isinstance(new_data, dict): |
351 new = new_data["url"] | 351 # new_data dict must contain either "url", "page" or "path" key |
352 elif "page" in new_data: | 352 # (exclusive) |
353 # if "path" is used, a file url is constructed with it | |
354 if (( | |
355 len( | |
356 {"path", "url", "page"}.intersection(list(new_data.keys())) | |
357 ) != 1 | |
358 )): | |
359 raise ValueError( | |
360 'You must have one and only one of "url", "page" or "path" ' | |
361 'key in your url_redirections_dict data' | |
362 ) | |
363 if "url" in new_data: | |
364 new = new_data["url"] | |
365 elif "page" in new_data: | |
366 new = new_data | |
367 new["type"] = "page" | |
368 new.setdefault("path_args", []) | |
369 if not isinstance(new["path_args"], list): | |
370 log.error( | |
371 _('"path_args" in redirection of {old} must be a list. ' | |
372 'Ignoring the redirection'.format(old=old))) | |
373 continue | |
374 new.setdefault("query_args", {}) | |
375 if not isinstance(new["query_args"], dict): | |
376 log.error( | |
377 _( | |
378 '"query_args" in redirection of {old} must be a ' | |
379 'dictionary. Ignoring the redirection' | |
380 ).format(old=old) | |
381 ) | |
382 continue | |
383 new["path_args"] = [quote(a) for a in new["path_args"]] | |
384 # we keep an inversed dict of page redirection | |
385 # (page/path_args => redirecting URL) | |
386 # so getURL can return the redirecting URL if the same arguments | |
387 # are used # making the URL consistent | |
388 args_hash = tuple(new["path_args"]) | |
389 self.pages_redirects.setdefault(new_data["page"], {}).setdefault( | |
390 args_hash, | |
391 old | |
392 ) | |
393 | |
394 # we need lists in query_args because it will be used | |
395 # as it in request.path_args | |
396 for k, v in new["query_args"].items(): | |
397 if isinstance(v, str): | |
398 new["query_args"][k] = [v] | |
399 elif "path" in new_data: | |
400 new = "file:{}".format(urllib.parse.quote(new_data["path"])) | |
401 elif isinstance(new_data, str): | |
353 new = new_data | 402 new = new_data |
354 new["type"] = "page" | 403 new_data = {} |
355 new.setdefault("path_args", []) | 404 else: |
356 if not isinstance(new["path_args"], list): | 405 log.error( |
357 log.error( | 406 _("ignoring invalid redirection value: {new_data}").format( |
358 _('"path_args" in redirection of {old} must be a list. ' | 407 new_data=new_data |
359 'Ignoring the redirection'.format(old=old))) | 408 ) |
409 ) | |
410 continue | |
411 | |
412 # some normalization | |
413 if not old.strip(): | |
414 # root URL special case | |
415 old = "" | |
416 elif not old.startswith("/"): | |
417 log.error( | |
418 _("redirected url must start with '/', got {value}. Ignoring") | |
419 .format(value=old) | |
420 ) | |
421 continue | |
422 else: | |
423 old = self._normalizeURL(old) | |
424 | |
425 if isinstance(new, dict): | |
426 # dict are handled differently, they contain data | |
427 # which ared use dynamically when the request is done | |
428 self.redirections.setdefault(old, new) | |
429 if not old: | |
430 if new["type"] == "page": | |
431 log.info( | |
432 _("Root URL redirected to page {name}").format( | |
433 name=new["page"] | |
434 ) | |
435 ) | |
436 else: | |
437 if new["type"] == "page": | |
438 page = self.getPageByName(new["page"]) | |
439 url = page.getURL(*new.get("path_args", [])) | |
440 self.inv_redirections[url] = old | |
441 continue | |
442 | |
443 # at this point we have a redirection URL in new, we can parse it | |
444 new_url = urllib.parse.urlsplit(new) | |
445 | |
446 # we handle the known URL schemes | |
447 if new_url.scheme == "xmpp": | |
448 location = self.getPagePathFromURI(new) | |
449 if location is None: | |
450 log.warning( | |
451 _("ignoring redirection, no page found to handle this URI: " | |
452 "{uri}").format(uri=new)) | |
360 continue | 453 continue |
361 new.setdefault("query_args", {}) | 454 request_data = self._getRequestData(location) |
362 if not isinstance(new["query_args"], dict): | 455 self.inv_redirections[location] = old |
363 log.error( | 456 |
364 _( | 457 elif new_url.scheme in ("", "http", "https"): |
365 '"query_args" in redirection of {old} must be a ' | 458 # direct redirection |
366 'dictionary. Ignoring the redirection'.format(old=old))) | 459 if new_url.netloc: |
460 raise NotImplementedError( | |
461 "netloc ({netloc}) is not implemented yet for " | |
462 "url_redirections_dict, it is not possible to redirect to an " | |
463 "external website".format(netloc=new_url.netloc)) | |
464 location = urllib.parse.urlunsplit( | |
465 ("", "", new_url.path, new_url.query, new_url.fragment) | |
466 ) | |
467 request_data = self._getRequestData(location) | |
468 self.inv_redirections[location] = old | |
469 | |
470 elif new_url.scheme == "file": | |
471 # file or directory | |
472 if new_url.netloc: | |
473 raise NotImplementedError( | |
474 "netloc ({netloc}) is not implemented for url redirection to " | |
475 "file system, it is not possible to redirect to an external " | |
476 "host".format( | |
477 netloc=new_url.netloc)) | |
478 path = urllib.parse.unquote(new_url.path) | |
479 if not os.path.isabs(path): | |
480 raise ValueError( | |
481 "file redirection must have an absolute path: e.g. " | |
482 "file:/path/to/my/file") | |
483 # for file redirection, we directly put child here | |
484 resource_class = ( | |
485 ProtectedFile if new_data.get("protected", True) else static.File | |
486 ) | |
487 res = resource_class(path, defaultType="application/octet-stream") | |
488 self.addResourceToPath(old, res) | |
489 log.info("[{host_name}] Added redirection from /{old} to file system " | |
490 "path {path}".format(host_name=self.host_name, | |
491 old=old, | |
492 path=path)) | |
493 | |
494 # we don't want to use redirection system, so we continue here | |
495 continue | |
496 | |
497 elif new_url.scheme == "libervia-app": | |
498 # a Libervia application | |
499 | |
500 app_name = urllib.parse.unquote(new_url.path).lower().strip() | |
501 extra = {"url_prefix": f"/{old}"} | |
502 try: | |
503 await self._startApp(app_name, extra) | |
504 except Exception as e: | |
505 log.warning(_( | |
506 "Can't launch {app_name!r} for path /{old}: {e}").format( | |
507 app_name=app_name, old=old, e=e)) | |
367 continue | 508 continue |
368 new["path_args"] = [quote(a) for a in new["path_args"]] | 509 |
369 # we keep an inversed dict of page redirection | 510 log.info("[{host_name}] Added redirection from /{old} to application " |
370 # (page/path_args => redirecting URL) | 511 "{app_name}".format( |
371 # so getURL can return the redirecting URL if the same arguments | 512 host_name=self.host_name, |
372 # are used # making the URL consistent | 513 old=old, |
373 args_hash = tuple(new["path_args"]) | 514 app_name=app_name)) |
374 self.pages_redirects.setdefault(new_data["page"], {})[ | 515 |
375 args_hash | 516 # normal redirection system is not used here |
376 ] = old | 517 continue |
377 | 518 else: |
378 # we need lists in query_args because it will be used | 519 raise NotImplementedError( |
379 # as it in request.path_args | 520 "{scheme}: scheme is not managed for url_redirections_dict".format( |
380 for k, v in new["query_args"].items(): | 521 scheme=new_url.scheme |
381 if isinstance(v, str): | 522 ) |
382 new["query_args"][k] = [v] | |
383 elif "path" in new_data: | |
384 new = "file:{}".format(urllib.parse.quote(new_data["path"])) | |
385 elif isinstance(new_data, str): | |
386 new = new_data | |
387 new_data = {} | |
388 else: | |
389 log.error( | |
390 _("ignoring invalid redirection value: {new_data}").format( | |
391 new_data=new_data | |
392 ) | 523 ) |
393 ) | 524 |
394 continue | 525 self.redirections.setdefault(old, request_data) |
395 | |
396 # some normalization | |
397 if not old.strip(): | |
398 # root URL special case | |
399 old = "" | |
400 elif not old.startswith("/"): | |
401 log.error(_("redirected url must start with '/', got {value}. Ignoring") | |
402 .format(value=old)) | |
403 continue | |
404 else: | |
405 old = self._normalizeURL(old) | |
406 | |
407 if isinstance(new, dict): | |
408 # dict are handled differently, they contain data | |
409 # which ared use dynamically when the request is done | |
410 self.redirections[old] = new | |
411 if not old: | 526 if not old: |
412 if new["type"] == "page": | 527 log.info(_("[{host_name}] Root URL redirected to {uri}") |
413 log.info( | 528 .format(host_name=self.host_name, |
414 _("Root URL redirected to page {name}").format( | 529 uri=request_data[1])) |
415 name=new["page"] | |
416 ) | |
417 ) | |
418 else: | |
419 if new["type"] == "page": | |
420 page = self.getPageByName(new["page"]) | |
421 url = page.getURL(*new.get("path_args", [])) | |
422 self.inv_redirections[url] = old | |
423 continue | |
424 | |
425 # at this point we have a redirection URL in new, we can parse it | |
426 new_url = urllib.parse.urlsplit(new) | |
427 | |
428 # we handle the known URL schemes | |
429 if new_url.scheme == "xmpp": | |
430 location = self.getPagePathFromURI(new) | |
431 if location is None: | |
432 log.warning( | |
433 _("ignoring redirection, no page found to handle this URI: " | |
434 "{uri}").format(uri=new)) | |
435 continue | |
436 request_data = self._getRequestData(location) | |
437 if old: | |
438 self.inv_redirections[location] = old | |
439 | |
440 elif new_url.scheme in ("", "http", "https"): | |
441 # direct redirection | |
442 if new_url.netloc: | |
443 raise NotImplementedError( | |
444 "netloc ({netloc}) is not implemented yet for " | |
445 "url_redirections_dict, it is not possible to redirect to an " | |
446 "external website".format(netloc=new_url.netloc)) | |
447 location = urllib.parse.urlunsplit( | |
448 ("", "", new_url.path, new_url.query, new_url.fragment) | |
449 ) | |
450 request_data = self._getRequestData(location) | |
451 if old: | |
452 self.inv_redirections[location] = old | |
453 | |
454 elif new_url.scheme == "file": | |
455 # file or directory | |
456 if new_url.netloc: | |
457 raise NotImplementedError( | |
458 "netloc ({netloc}) is not implemented for url redirection to " | |
459 "file system, it is not possible to redirect to an external " | |
460 "host".format( | |
461 netloc=new_url.netloc)) | |
462 path = urllib.parse.unquote(new_url.path) | |
463 if not os.path.isabs(path): | |
464 raise ValueError( | |
465 "file redirection must have an absolute path: e.g. " | |
466 "file:/path/to/my/file") | |
467 # for file redirection, we directly put child here | |
468 resource_class = ( | |
469 ProtectedFile if new_data.get("protected", True) else static.File | |
470 ) | |
471 res = resource_class(path, defaultType="application/octet-stream") | |
472 self.addResourceToPath(old, res) | |
473 log.info("[{host_name}] Added redirection from /{old} to file system " | |
474 "path {path}".format(host_name=self.host_name, | |
475 old=old, | |
476 path=path)) | |
477 | |
478 # we don't want to use redirection system, so we continue here | |
479 continue | |
480 | |
481 elif new_url.scheme == "sat-app": | |
482 # a SàT application | |
483 | |
484 app_name = urllib.parse.unquote(new_url.path).lower().strip() | |
485 extra = {"url_prefix": f"/{old}"} | |
486 try: | |
487 await self._startApp(app_name, extra) | |
488 except Exception as e: | |
489 log.warning(_( | |
490 "Can't launch {app_name!r} for path /{old}: {e}").format( | |
491 app_name=app_name, old=old, e=e)) | |
492 continue | |
493 | |
494 log.info("[{host_name}] Added redirection from /{old} to application " | |
495 "{app_name}".format( | |
496 host_name=self.host_name, | |
497 old=old, | |
498 app_name=app_name)) | |
499 | |
500 # normal redirection system is not used here | |
501 continue | |
502 else: | |
503 raise NotImplementedError( | |
504 "{scheme}: scheme is not managed for url_redirections_dict".format( | |
505 scheme=new_url.scheme | |
506 ) | |
507 ) | |
508 | |
509 self.redirections[old] = request_data | |
510 if not old: | |
511 log.info(_("[{host_name}] Root URL redirected to {uri}") | |
512 .format(host_name=self.host_name, | |
513 uri=request_data[1])) | |
514 | 530 |
515 # the default root URL, if not redirected | 531 # the default root URL, if not redirected |
516 if not "" in self.redirections: | 532 if not "" in self.redirections: |
517 self.redirections[""] = self._getRequestData(C.LIBERVIA_PAGE_START) | 533 self.redirections[""] = self._getRequestData(C.LIBERVIA_PAGE_START) |
518 | 534 |
722 | 738 |
723 if isinstance(resource, web_resource.NoResource): | 739 if isinstance(resource, web_resource.NoResource): |
724 # if nothing was found, we try our luck with redirections | 740 # if nothing was found, we try our luck with redirections |
725 # XXX: we want redirections to happen only if everything else failed | 741 # XXX: we want redirections to happen only if everything else failed |
726 path_elt = request.prepath + request.postpath | 742 path_elt = request.prepath + request.postpath |
727 for idx in range(len(path_elt), 0, -1): | 743 for idx in range(len(path_elt), -1, -1): |
728 test_url = b"/".join(path_elt[:idx]).decode('utf-8').lower() | 744 test_url = b"/".join(path_elt[:idx]).decode('utf-8').lower() |
729 if test_url in self.redirections: | 745 if test_url in self.redirections: |
730 request_data = self.redirections[test_url] | 746 request_data = self.redirections[test_url] |
731 request.postpath = path_elt[idx:] | 747 request.postpath = path_elt[idx:] |
732 return self._redirect(request, request_data) | 748 return self._redirect(request, request_data) |