Mercurial > libervia-web
view libervia/web/pages/photos/album/_browser/__init__.py @ 1598:86c7a3a625d5
server: always start a new session on connection:
The session was kept when a user connected from the service profile (but not from other
profiles); this led to a session fixation vulnerability (an attacker on the same
machine could get the service profile session cookie, and use it when a victim logged in).
This patch fixes it by always starting a new session on connection.
fix 443
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 23 Feb 2024 13:35:24 +0100 |
parents | 106945841fbc |
children |
line wrap: on
line source
"""Browser-side script for the photo album page.

Wires DOM events to file upload, item deletion, album cover selection,
the slideshow, and the invitation manager.
"""

import alt_media_player
from bridge import AsyncBridge, Bridge
from browser import DOMNode, aio, bind, document, html, window
import dialog
from file_uploader import FileUploader
from invitation import InvitationManager
from javascript import JSON
import loading
from slideshow import SlideShow
from template import Template
import tmp_aio


# values exposed by the page on the ``window`` object
cache_path = window.cache_path
files_service = window.files_service
files_path = window.files_path
try:
    affiliations = window.affiliations.to_dict()
except AttributeError:
    # NOTE(review): ``affiliations`` stays undefined in this case, so
    # ``manage_click`` would raise NameError if it were triggered. Presumably
    # the manage button is only rendered when affiliations are available;
    # confirm against the page template before changing this.
    pass

bridge = Bridge()
async_bridge = AsyncBridge()

alt_media_player.install_if_needed()

photo_tpl = Template('photo/item.html')
player_tpl = Template('components/media_player.html')


# file upload

file_uploader = FileUploader(files_service, "photo/item.html", files_path)


@bind("#file_drop", "drop")
def on_file_select(evt):
    """Pass files dropped on ``#file_drop`` to the uploader."""
    evt.stopPropagation()
    evt.preventDefault()
    files = evt.dataTransfer.files
    file_uploader.upload_files(files, document["album_items"])


@bind("#file_drop", "dragover")
def on_drag_over(evt):
    """Mark a drag over ``#file_drop`` as a copy operation."""
    evt.stopPropagation()
    evt.preventDefault()
    evt.dataTransfer.dropEffect = 'copy'


@bind("#file_input", "change")
def on_file_input_change(evt):
    """Pass files chosen through ``#file_input`` to the uploader."""
    files = evt.currentTarget.files
    file_uploader.upload_files(files, document["album_items"])


# delete

def file_delete_cb(item_elt, item):
    """Handle a successful deletion.

    Flags the element with the ``state_deleted`` class and removes it from the
    DOM once its transition ends.

    @param item_elt: element of the deleted item
    @param item: item data (the ``name`` key is used for logging)
    """
    item_elt.classList.add("state_deleted")
    item_elt.bind("transitionend", lambda evt: item_elt.remove())
    print(f"deleted {item['name']}")


def file_delete_eb(failure, item_elt, item):
    """Notify the user that a deletion failed.

    @param failure: error received from the bridge
    @param item_elt: element of the item (unused)
    @param item: item data (the ``name`` key is shown in the message)
    """
    # ``failure`` must be interpolated, otherwise the user only sees the
    # literal word "failure" instead of the actual error
    dialog.notification.show(
        f"error while deleting {item['name']}: {failure}",
        level="error"
    )


def delete_ok(evt, notif_elt, item_elt, item):
    """Request the deletion of ``item`` once the user has confirmed.

    @param evt: event of the confirmation (unused)
    @param notif_elt: element of the confirmation dialog (unused)
    @param item_elt: element of the item to delete
    @param item: item data (``name`` is used to build the file path)
    """
    file_path = f"{files_path.rstrip('/')}/{item['name']}"
    bridge.file_sharing_delete(
        files_service,
        file_path,
        "",
        callback=lambda: file_delete_cb(item_elt, item),
        errback=lambda failure: file_delete_eb(failure, item_elt, item),
    )


def delete_cancel(evt, notif_elt, item_elt, item):
    """Dismiss the confirmation dialog and unmark the item.

    @param evt: event of the cancellation (unused)
    @param notif_elt: element of the confirmation dialog, removed here
    @param item_elt: element of the item which was selected for deletion
    @param item: item data (unused)
    """
    notif_elt.remove()
    item_elt.classList.remove("selected_for_deletion")


def on_delete(evt):
    """Ask for confirmation before deleting the item owning the clicked element."""
    evt.stopPropagation()
    target = evt.currentTarget
    item_elt = DOMNode(target.closest('.item'))
    item_elt.classList.add("selected_for_deletion")
    item = JSON.parse(item_elt.dataset.item)
    dialog.Confirm(
        f"{item['name']!r} will be deleted, are you sure?",
        ok_label="delete",
        ok_color="danger",
    ).show(
        ok_cb=lambda evt, notif_elt: delete_ok(evt, notif_elt, item_elt, item),
        cancel_cb=lambda evt, notif_elt: delete_cancel(evt, notif_elt, item_elt, item),
    )


file_uploader.on_delete_cb = on_delete


# cover

async def cover_ok(evt, notif_elt, item_elt, item):
    """Use the image of ``item_elt`` as album cover.

    The image is downloaded as a blob, uploaded again through an HTTP Upload
    slot, and the resulting URL is registered as ``thumb_url`` of the album.
    An error notification is shown and the function returns early if the
    download doesn't answer 200 or the upload doesn't answer 201.

    @param evt: event of the confirmation (unused)
    @param notif_elt: element of the confirmation dialog (unused)
    @param item_elt: element of the item, its ``img`` child is the cover source
    @param item: item data (unused)
    """
    # we first need to get a blob of the image
    img_elt = item_elt.select_one("img")
    # the simplest way is to download it
    r = await tmp_aio.ajax("GET", img_elt.src, "blob")
    if r.status != 200:
        dialog.notification.show(
            f"can't retrieve cover: {r.status}: {r.statusText}",
            level="error"
        )
        return
    img_blob = r.response
    # now we'll upload it via HTTP Upload, we need a slot
    img_name = img_elt.src.rsplit('/', 1)[-1]
    img_size = img_blob.size

    slot = await async_bridge.file_http_upload_get_slot(
        img_name,
        img_size,
        '',
        files_service
    )
    get_url, put_url, headers = slot
    # we have the slot, we can upload image
    r = await tmp_aio.ajax("PUT", put_url, "", img_blob)
    if r.status != 201:
        dialog.notification.show(
            f"can't upload cover: {r.status}: {r.statusText}",
            level="error"
        )
        return
    extra = {"thumb_url": get_url}
    album_name = files_path.rsplit('/', 1)[-1]
    await async_bridge.interests_file_sharing_register(
        files_service,
        "photos",
        "",
        files_path,
        album_name,
        JSON.stringify(extra),
    )
    dialog.notification.show("Album cover has been changed")


def cover_cancel(evt, notif_elt, item_elt, item):
    """Dismiss the confirmation dialog and unmark the item.

    @param evt: event of the cancellation (unused)
    @param notif_elt: element of the confirmation dialog, removed here
    @param item_elt: element of the item which was selected as cover
    @param item: item data (unused)
    """
    notif_elt.remove()
    item_elt.classList.remove("selected_for_action")


def on_cover(evt):
    """Ask for confirmation before using the clicked item as album cover."""
    evt.stopPropagation()
    target = evt.currentTarget
    item_elt = DOMNode(target.closest('.item'))
    item_elt.classList.add("selected_for_action")
    item = JSON.parse(item_elt.dataset.item)
    dialog.Confirm(
        f"use {item['name']!r} for this album cover?",
        ok_label="use as cover",
    ).show(
        ok_cb=lambda evt, notif_elt: aio.run(cover_ok(evt, notif_elt, item_elt, item)),
        cancel_cb=lambda evt, notif_elt: cover_cancel(evt, notif_elt, item_elt, item),
    )


# slideshow

@bind(".photo_thumb_click", "click")
def photo_click(evt):
    """Open a slideshow of every ``.item`` element, starting at the clicked one.

    Items whose ``mime_type`` starts with "video" get a media player slide,
    all other items get an image slide.
    """
    evt.stopPropagation()
    evt.preventDefault()
    slideshow = SlideShow()
    target = evt.currentTarget
    clicked_item_elt = DOMNode(target.closest('.item'))

    slideshow.attach()
    for idx, item_elt in enumerate(document.select('.item')):
        item = JSON.parse(item_elt.dataset.item)
        try:
            # the last thumbnail of the list is used as full screen picture
            biggest_thumb = item['extra']['thumbnails'][-1]
            thumb_url = f"{cache_path}{biggest_thumb['filename']}"
        except (KeyError, IndexError) as e:
            print(f"Can't get full screen thumbnail URL: {e}")
            thumb_url = None
        if item.get("mime_type", "")[:5] == "video":
            player = alt_media_player.MediaPlayer(
                [item['url']],
                poster=thumb_url,
                reduce_click_area=True
            )
            elt = player.elt
            elt.classList.add("slide_video", "no_fullscreen")
            slideshow.add_slide(
                elt,
                item,
                options={
                    "flags": (alt_media_player.NO_PAGINATION, alt_media_player.NO_SCROLLBAR),
                    "exit_callback": player.reset,
                }
            )
        else:
            # fall back to the item URL when no thumbnail is available
            slideshow.add_slide(html.IMG(src=thumb_url or item['url'], Class="slide_img"), item)
        if item_elt == clicked_item_elt:
            slideshow.index = idx


for elt in document.select('.action_delete'):
    elt.bind("click", on_delete)

for elt in document.select('.action_cover'):
    elt.bind("click", on_cover)


# manage

@bind("#button_manage", "click")
def manage_click(evt):
    """Attach an invitation manager for this album."""
    evt.stopPropagation()
    evt.preventDefault()
    manager = InvitationManager("photos", {"service": files_service, "path": files_path})
    aio.run(manager.attach(affiliations=affiliations))


# hint

@bind("#hint .click_to_delete", "click")
def remove_hint(evt):
    """Remove the ``hint`` element from the document."""
    document['hint'].remove()


loading.remove_loading_screen()