view libervia/web/pages/photos/album/_browser/__init__.py @ 1532:106945841fbc

_browser (album): moved code to upload file to a separate `file_uploader` module: this way, uploading logic can be re-used elsewhere.
author Goffi <goffi@goffi.org>
date Thu, 22 Jun 2023 16:36:01 +0200
parents d7c78722e4f8
children
line wrap: on
line source

import alt_media_player
from bridge import AsyncBridge, Bridge
from browser import DOMNode, aio, bind, document, html, window
import dialog
from file_uploader import FileUploader
from invitation import InvitationManager
from javascript import JSON
import loading
from slideshow import SlideShow
from template import Template
import tmp_aio


cache_path = window.cache_path
files_service = window.files_service
files_path = window.files_path
try:
    affiliations = window.affiliations.to_dict()
except AttributeError:
    pass
bridge = Bridge()
async_bridge = AsyncBridge()

alt_media_player.install_if_needed()

photo_tpl = Template('photo/item.html')
player_tpl = Template('components/media_player.html')

# file upload

file_uploader = FileUploader(files_service, "photo/item.html", files_path)


@bind("#file_drop", "drop")
def on_file_select(evt):
    evt.stopPropagation()
    evt.preventDefault()
    files = evt.dataTransfer.files
    file_uploader.upload_files(files, document["album_items"])


@bind("#file_drop", "dragover")
def on_drag_over(evt):
    evt.stopPropagation()
    evt.preventDefault()
    evt.dataTransfer.dropEffect = 'copy'


@bind("#file_input", "change")
def on_file_input_change(evt):
    files = evt.currentTarget.files
    file_uploader.upload_files(files, document["album_items"])

# delete

def file_delete_cb(item_elt, item):
    item_elt.classList.add("state_deleted")
    item_elt.bind("transitionend", lambda evt: item_elt.remove())
    print(f"deleted {item['name']}")


def file_delete_eb(failure, item_elt, item):
    dialog.notification.show(
        f"error while deleting {item['name']}: failure",
        level="error"
    )


def delete_ok(evt, notif_elt, item_elt, item):
    file_path = f"{files_path.rstrip('/')}/{item['name']}"
    bridge.file_sharing_delete(
        files_service,
        file_path,
        "",
        callback=lambda : file_delete_cb(item_elt, item),
        errback=lambda failure: file_delete_eb(failure, item_elt, item),
    )


def delete_cancel(evt, notif_elt, item_elt, item):
    notif_elt.remove()
    item_elt.classList.remove("selected_for_deletion")


def on_delete(evt):
    evt.stopPropagation()
    target = evt.currentTarget
    item_elt = DOMNode(target.closest('.item'))
    item_elt.classList.add("selected_for_deletion")
    item = JSON.parse(item_elt.dataset.item)
    dialog.Confirm(
        f"{item['name']!r} will be deleted, are you sure?",
        ok_label="delete",
        ok_color="danger",
    ).show(
        ok_cb=lambda evt, notif_elt: delete_ok(evt, notif_elt, item_elt, item),
        cancel_cb=lambda evt, notif_elt: delete_cancel(evt, notif_elt, item_elt, item),
    )

file_uploader.on_delete_cb = on_delete

# cover

async def cover_ok(evt, notif_elt, item_elt, item):
    # we first need to get a blob of the image
    img_elt = item_elt.select_one("img")
    # the simplest way is to download it
    r = await tmp_aio.ajax("GET", img_elt.src, "blob")
    if r.status != 200:
        dialog.notification.show(
            f"can't retrieve cover: {r.status}: {r.statusText}",
            level="error"
        )
        return
    img_blob = r.response
    # now we'll upload it via HTTP Upload, we need a slow
    img_name = img_elt.src.rsplit('/', 1)[-1]
    img_size = img_blob.size

    slot = await async_bridge.file_http_upload_get_slot(
        img_name,
        img_size,
        '',
        files_service
    )
    get_url, put_url, headers = slot
    # we have the slot, we can upload image
    r = await tmp_aio.ajax("PUT", put_url, "", img_blob)
    if r.status != 201:
        dialog.notification.show(
            f"can't upload cover: {r.status}: {r.statusText}",
            level="error"
        )
        return
    extra = {"thumb_url": get_url}
    album_name = files_path.rsplit('/', 1)[-1]
    await async_bridge.interests_file_sharing_register(
        files_service,
        "photos",
        "",
        files_path,
        album_name,
        JSON.stringify(extra),
    )
    dialog.notification.show("Album cover has been changed")


def cover_cancel(evt, notif_elt, item_elt, item):
    notif_elt.remove()
    item_elt.classList.remove("selected_for_action")


def on_cover(evt):
    evt.stopPropagation()
    target = evt.currentTarget
    item_elt = DOMNode(target.closest('.item'))
    item_elt.classList.add("selected_for_action")
    item = JSON.parse(item_elt.dataset.item)
    dialog.Confirm(
        f"use {item['name']!r} for this album cover?",
        ok_label="use as cover",
    ).show(
        ok_cb=lambda evt, notif_elt: aio.run(cover_ok(evt, notif_elt, item_elt, item)),
        cancel_cb=lambda evt, notif_elt: cover_cancel(evt, notif_elt, item_elt, item),
    )


# slideshow

@bind(".photo_thumb_click", "click")
def photo_click(evt):
    evt.stopPropagation()
    evt.preventDefault()
    slideshow = SlideShow()
    target = evt.currentTarget
    clicked_item_elt = DOMNode(target.closest('.item'))

    slideshow.attach()
    for idx, item_elt in enumerate(document.select('.item')):
        item = JSON.parse(item_elt.dataset.item)
        try:
            biggest_thumb = item['extra']['thumbnails'][-1]
            thumb_url = f"{cache_path}{biggest_thumb['filename']}"
        except (KeyError, IndexError) as e:
            print(f"Can't get full screen thumbnail URL: {e}")
            thumb_url = None
        if item.get("mime_type", "")[:5] == "video":
            player = alt_media_player.MediaPlayer(
                [item['url']],
                poster = thumb_url,
                reduce_click_area =  True
            )
            elt = player.elt
            elt.classList.add("slide_video", "no_fullscreen")
            slideshow.add_slide(
                elt,
                item,
                options={
                    "flags": (alt_media_player.NO_PAGINATION, alt_media_player.NO_SCROLLBAR),
                    "exit_callback": player.reset,
                }
            )
        else:
            slideshow.add_slide(html.IMG(src=thumb_url or item['url'], Class="slide_img"), item)
        if item_elt == clicked_item_elt:
            slideshow.index = idx


for elt in document.select('.action_delete'):
    elt.bind("click", on_delete)
for elt in document.select('.action_cover'):
    elt.bind("click", on_cover)

# manage


@bind("#button_manage", "click")
def manage_click(evt):
    evt.stopPropagation()
    evt.preventDefault()
    manager = InvitationManager("photos", {"service": files_service, "path": files_path})
    aio.run(manager.attach(affiliations=affiliations))


# hint
@bind("#hint .click_to_delete", "click")
def remove_hint(evt):
    document['hint'].remove()


loading.remove_loading_screen()