Mercurial > libervia-web
view libervia/web/pages/photos/album/_browser/__init__.py @ 1534:49ad8dd210d0
server (restricted_bridge): add `message_send` method
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Jun 2023 16:36:18 +0200 |
parents | 106945841fbc |
children |
line wrap: on
line source
import alt_media_player from bridge import AsyncBridge, Bridge from browser import DOMNode, aio, bind, document, html, window import dialog from file_uploader import FileUploader from invitation import InvitationManager from javascript import JSON import loading from slideshow import SlideShow from template import Template import tmp_aio cache_path = window.cache_path files_service = window.files_service files_path = window.files_path try: affiliations = window.affiliations.to_dict() except AttributeError: pass bridge = Bridge() async_bridge = AsyncBridge() alt_media_player.install_if_needed() photo_tpl = Template('photo/item.html') player_tpl = Template('components/media_player.html') # file upload file_uploader = FileUploader(files_service, "photo/item.html", files_path) @bind("#file_drop", "drop") def on_file_select(evt): evt.stopPropagation() evt.preventDefault() files = evt.dataTransfer.files file_uploader.upload_files(files, document["album_items"]) @bind("#file_drop", "dragover") def on_drag_over(evt): evt.stopPropagation() evt.preventDefault() evt.dataTransfer.dropEffect = 'copy' @bind("#file_input", "change") def on_file_input_change(evt): files = evt.currentTarget.files file_uploader.upload_files(files, document["album_items"]) # delete def file_delete_cb(item_elt, item): item_elt.classList.add("state_deleted") item_elt.bind("transitionend", lambda evt: item_elt.remove()) print(f"deleted {item['name']}") def file_delete_eb(failure, item_elt, item): dialog.notification.show( f"error while deleting {item['name']}: failure", level="error" ) def delete_ok(evt, notif_elt, item_elt, item): file_path = f"{files_path.rstrip('/')}/{item['name']}" bridge.file_sharing_delete( files_service, file_path, "", callback=lambda : file_delete_cb(item_elt, item), errback=lambda failure: file_delete_eb(failure, item_elt, item), ) def delete_cancel(evt, notif_elt, item_elt, item): notif_elt.remove() item_elt.classList.remove("selected_for_deletion") def on_delete(evt): evt.stopPropagation() target = evt.currentTarget item_elt = DOMNode(target.closest('.item')) item_elt.classList.add("selected_for_deletion") item = JSON.parse(item_elt.dataset.item) dialog.Confirm( f"{item['name']!r} will be deleted, are you sure?", ok_label="delete", ok_color="danger", ).show( ok_cb=lambda evt, notif_elt: delete_ok(evt, notif_elt, item_elt, item), cancel_cb=lambda evt, notif_elt: delete_cancel(evt, notif_elt, item_elt, item), ) file_uploader.on_delete_cb = on_delete # cover async def cover_ok(evt, notif_elt, item_elt, item): # we first need to get a blob of the image img_elt = item_elt.select_one("img") # the simplest way is to download it r = await tmp_aio.ajax("GET", img_elt.src, "blob") if r.status != 200: dialog.notification.show( f"can't retrieve cover: {r.status}: {r.statusText}", level="error" ) return img_blob = r.response # now we'll upload it via HTTP Upload, we need a slow img_name = img_elt.src.rsplit('/', 1)[-1] img_size = img_blob.size slot = await async_bridge.file_http_upload_get_slot( img_name, img_size, '', files_service ) get_url, put_url, headers = slot # we have the slot, we can upload image r = await tmp_aio.ajax("PUT", put_url, "", img_blob) if r.status != 201: dialog.notification.show( f"can't upload cover: {r.status}: {r.statusText}", level="error" ) return extra = {"thumb_url": get_url} album_name = files_path.rsplit('/', 1)[-1] await async_bridge.interests_file_sharing_register( files_service, "photos", "", files_path, album_name, JSON.stringify(extra), ) dialog.notification.show("Album cover has been changed") def cover_cancel(evt, notif_elt, item_elt, item): notif_elt.remove() item_elt.classList.remove("selected_for_action") def on_cover(evt): evt.stopPropagation() target = evt.currentTarget item_elt = DOMNode(target.closest('.item')) item_elt.classList.add("selected_for_action") item = JSON.parse(item_elt.dataset.item) dialog.Confirm( f"use {item['name']!r} for this album cover?", ok_label="use as cover", ).show( ok_cb=lambda evt, notif_elt: aio.run(cover_ok(evt, notif_elt, item_elt, item)), cancel_cb=lambda evt, notif_elt: cover_cancel(evt, notif_elt, item_elt, item), ) # slideshow @bind(".photo_thumb_click", "click") def photo_click(evt): evt.stopPropagation() evt.preventDefault() slideshow = SlideShow() target = evt.currentTarget clicked_item_elt = DOMNode(target.closest('.item')) slideshow.attach() for idx, item_elt in enumerate(document.select('.item')): item = JSON.parse(item_elt.dataset.item) try: biggest_thumb = item['extra']['thumbnails'][-1] thumb_url = f"{cache_path}{biggest_thumb['filename']}" except (KeyError, IndexError) as e: print(f"Can't get full screen thumbnail URL: {e}") thumb_url = None if item.get("mime_type", "")[:5] == "video": player = alt_media_player.MediaPlayer( [item['url']], poster = thumb_url, reduce_click_area = True ) elt = player.elt elt.classList.add("slide_video", "no_fullscreen") slideshow.add_slide( elt, item, options={ "flags": (alt_media_player.NO_PAGINATION, alt_media_player.NO_SCROLLBAR), "exit_callback": player.reset, } ) else: slideshow.add_slide(html.IMG(src=thumb_url or item['url'], Class="slide_img"), item) if item_elt == clicked_item_elt: slideshow.index = idx for elt in document.select('.action_delete'): elt.bind("click", on_delete) for elt in document.select('.action_cover'): elt.bind("click", on_cover) # manage @bind("#button_manage", "click") def manage_click(evt): evt.stopPropagation() evt.preventDefault() manager = InvitationManager("photos", {"service": files_service, "path": files_path}) aio.run(manager.attach(affiliations=affiliations)) # hint @bind("#hint .click_to_delete", "click") def remove_hint(evt): document['hint'].remove() loading.remove_loading_screen()