view libervia/web/pages/photos/album/_browser/__init__.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user was connecting from service profile (but not from other profiles), this was leading to session fixation vulnerability (an attacker on the same machine could get service profile session cookie, and use it when a victim would log-in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents 106945841fbc
children
line wrap: on
line source

import alt_media_player
from bridge import AsyncBridge, Bridge
from browser import DOMNode, aio, bind, document, html, window
import dialog
from file_uploader import FileUploader
from invitation import InvitationManager
from javascript import JSON
import loading
from slideshow import SlideShow
from template import Template
import tmp_aio


cache_path = window.cache_path
files_service = window.files_service
files_path = window.files_path
try:
    affiliations = window.affiliations.to_dict()
except AttributeError:
    pass
bridge = Bridge()
async_bridge = AsyncBridge()

alt_media_player.install_if_needed()

photo_tpl = Template('photo/item.html')
player_tpl = Template('components/media_player.html')

# file upload

file_uploader = FileUploader(files_service, "photo/item.html", files_path)


@bind("#file_drop", "drop")
def on_file_select(evt):
    evt.stopPropagation()
    evt.preventDefault()
    files = evt.dataTransfer.files
    file_uploader.upload_files(files, document["album_items"])


@bind("#file_drop", "dragover")
def on_drag_over(evt):
    evt.stopPropagation()
    evt.preventDefault()
    evt.dataTransfer.dropEffect = 'copy'


@bind("#file_input", "change")
def on_file_input_change(evt):
    files = evt.currentTarget.files
    file_uploader.upload_files(files, document["album_items"])

# delete

def file_delete_cb(item_elt, item):
    item_elt.classList.add("state_deleted")
    item_elt.bind("transitionend", lambda evt: item_elt.remove())
    print(f"deleted {item['name']}")


def file_delete_eb(failure, item_elt, item):
    dialog.notification.show(
        f"error while deleting {item['name']}: failure",
        level="error"
    )


def delete_ok(evt, notif_elt, item_elt, item):
    file_path = f"{files_path.rstrip('/')}/{item['name']}"
    bridge.file_sharing_delete(
        files_service,
        file_path,
        "",
        callback=lambda : file_delete_cb(item_elt, item),
        errback=lambda failure: file_delete_eb(failure, item_elt, item),
    )


def delete_cancel(evt, notif_elt, item_elt, item):
    notif_elt.remove()
    item_elt.classList.remove("selected_for_deletion")


def on_delete(evt):
    evt.stopPropagation()
    target = evt.currentTarget
    item_elt = DOMNode(target.closest('.item'))
    item_elt.classList.add("selected_for_deletion")
    item = JSON.parse(item_elt.dataset.item)
    dialog.Confirm(
        f"{item['name']!r} will be deleted, are you sure?",
        ok_label="delete",
        ok_color="danger",
    ).show(
        ok_cb=lambda evt, notif_elt: delete_ok(evt, notif_elt, item_elt, item),
        cancel_cb=lambda evt, notif_elt: delete_cancel(evt, notif_elt, item_elt, item),
    )

file_uploader.on_delete_cb = on_delete

# cover

async def cover_ok(evt, notif_elt, item_elt, item):
    # we first need to get a blob of the image
    img_elt = item_elt.select_one("img")
    # the simplest way is to download it
    r = await tmp_aio.ajax("GET", img_elt.src, "blob")
    if r.status != 200:
        dialog.notification.show(
            f"can't retrieve cover: {r.status}: {r.statusText}",
            level="error"
        )
        return
    img_blob = r.response
    # now we'll upload it via HTTP Upload, we need a slow
    img_name = img_elt.src.rsplit('/', 1)[-1]
    img_size = img_blob.size

    slot = await async_bridge.file_http_upload_get_slot(
        img_name,
        img_size,
        '',
        files_service
    )
    get_url, put_url, headers = slot
    # we have the slot, we can upload image
    r = await tmp_aio.ajax("PUT", put_url, "", img_blob)
    if r.status != 201:
        dialog.notification.show(
            f"can't upload cover: {r.status}: {r.statusText}",
            level="error"
        )
        return
    extra = {"thumb_url": get_url}
    album_name = files_path.rsplit('/', 1)[-1]
    await async_bridge.interests_file_sharing_register(
        files_service,
        "photos",
        "",
        files_path,
        album_name,
        JSON.stringify(extra),
    )
    dialog.notification.show("Album cover has been changed")


def cover_cancel(evt, notif_elt, item_elt, item):
    notif_elt.remove()
    item_elt.classList.remove("selected_for_action")


def on_cover(evt):
    evt.stopPropagation()
    target = evt.currentTarget
    item_elt = DOMNode(target.closest('.item'))
    item_elt.classList.add("selected_for_action")
    item = JSON.parse(item_elt.dataset.item)
    dialog.Confirm(
        f"use {item['name']!r} for this album cover?",
        ok_label="use as cover",
    ).show(
        ok_cb=lambda evt, notif_elt: aio.run(cover_ok(evt, notif_elt, item_elt, item)),
        cancel_cb=lambda evt, notif_elt: cover_cancel(evt, notif_elt, item_elt, item),
    )


# slideshow

@bind(".photo_thumb_click", "click")
def photo_click(evt):
    evt.stopPropagation()
    evt.preventDefault()
    slideshow = SlideShow()
    target = evt.currentTarget
    clicked_item_elt = DOMNode(target.closest('.item'))

    slideshow.attach()
    for idx, item_elt in enumerate(document.select('.item')):
        item = JSON.parse(item_elt.dataset.item)
        try:
            biggest_thumb = item['extra']['thumbnails'][-1]
            thumb_url = f"{cache_path}{biggest_thumb['filename']}"
        except (KeyError, IndexError) as e:
            print(f"Can't get full screen thumbnail URL: {e}")
            thumb_url = None
        if item.get("mime_type", "")[:5] == "video":
            player = alt_media_player.MediaPlayer(
                [item['url']],
                poster = thumb_url,
                reduce_click_area =  True
            )
            elt = player.elt
            elt.classList.add("slide_video", "no_fullscreen")
            slideshow.add_slide(
                elt,
                item,
                options={
                    "flags": (alt_media_player.NO_PAGINATION, alt_media_player.NO_SCROLLBAR),
                    "exit_callback": player.reset,
                }
            )
        else:
            slideshow.add_slide(html.IMG(src=thumb_url or item['url'], Class="slide_img"), item)
        if item_elt == clicked_item_elt:
            slideshow.index = idx


for elt in document.select('.action_delete'):
    elt.bind("click", on_delete)
for elt in document.select('.action_cover'):
    elt.bind("click", on_cover)

# manage


@bind("#button_manage", "click")
def manage_click(evt):
    evt.stopPropagation()
    evt.preventDefault()
    manager = InvitationManager("photos", {"service": files_service, "path": files_path})
    aio.run(manager.attach(affiliations=affiliations))


# hint
@bind("#hint .click_to_delete", "click")
def remove_hint(evt):
    document['hint'].remove()


loading.remove_loading_screen()