view libervia/web/pages/_browser/invitation.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user was connecting from the service profile (but not from other profiles), which led to a session fixation vulnerability (an attacker on the same machine could get the service profile session cookie, and use it when a victim logged in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents 7228fc3c4744
children
line wrap: on
line source

from bridge import Bridge
from browser import document, timer, window
from cache import cache
import dialog
import javascript
from template import Template

# module-level Bridge instance, used for every bridge call made by this module
bridge = Bridge()
# we use JS RegExp because Python's re is really long to import in Brython
# FIXME: this is a naive JID regex, a more accurate one should be used instead
#   (only the start of the string is anchored, so trailing text is accepted)
jid_re = javascript.RegExp.new(r"^\w+@\w+\.\w+")


class InvitationManager:

    def __init__(self, invitation_type, invitation_data):
        """Store the invitation context and load the templates used by the manager

        @param invitation_type: kind of resource to share, the methods of this class
            handle "photos" and "pubsub"
        @param invitation_data: data describing the resource: "service" is read for
            both types, plus "path" for photos or "node" for pubsub
        """
        self.invitation_type, self.invitation_data = invitation_type, invitation_data
        # the panel element is created once here, and (re-)attached by ``open``
        self.manager_panel_elt = Template('invitation/manager.html').get_elt()
        self.invite_by_email_tpl = Template('invitation/invite_by_email.html')
        self.affiliation_tpl = Template('invitation/affiliation_item.html')
        self.new_item_tpl = Template('invitation/new_item.html')
        # entity jid → element, for the items passing filter when adding a new contact
        self._filtered_new_items = {}
        # item currently selected in the search dialog, None when there is none
        self._active_new_item = None
        self._idx = 0

    async def attach(self, affiliations=None):
        """Show the manager panel, display the affiliations and bind the handlers

        @param affiliations: mapping of entity jid to affiliation to display; an empty
            mapping is used when None
        """
        self.affiliations = {} if affiliations is None else affiliations
        panel_elt = self.manager_panel_elt
        self.side_panel = panel_elt.select_one('.invitation_manager_side_panel')
        self.open()
        for closing_elt in panel_elt.select('.click_to_close'):
            closing_elt.bind("click", self.on_manager_close)
        # clicks inside the side panel are not propagated (presumably to avoid
        # triggering the "click_to_close" handler of an enclosing element)
        self.side_panel.bind("click", lambda evt: evt.stopPropagation())

        await cache.fill_identities(self.affiliations.keys())
        for entity_jid, affiliation in self.affiliations.items():
            self.set_affiliation(entity_jid, affiliation)

        contact_elt = panel_elt.select_one('input[name="contact"]')
        contact_handlers = (
            ("input", self.on_contact_input),
            ("keydown", self.on_contact_keydown),
            ("focus", self.on_contact_focus),
            ("blur", self.on_contact_blur),
        )
        for event_name, handler in contact_handlers:
            contact_elt.bind(event_name, handler)
        document['invite_email'].bind('click', self.on_invite_email_click)

    def open(self):
        """Re-attach and show a closed panel"""
        body_style = document.body.style
        # original values are kept to be restored by ``close``
        self._body_ori_style = (body_style.height, body_style.overflow)
        body_style.height = '100vh'
        body_style.overflow = 'hidden'
        document.body <= self.manager_panel_elt

        def show_side_panel():
            return self.side_panel.classList.add("open")

        # the class is added from a timer callback instead of immediately (presumably
        # to let the CSS transition play once the panel is attached — to confirm)
        timer.set_timeout(show_side_panel, 0)

    def _on_close_transition_end(self, evt):
        """Detach the panel once the transition started by ``close`` is finished

        @param evt: "transitionend" event (unused)
        """
        # FIXME: unbinding this handler only is not working with Brython (to report
        #   upstream), so all "transitionend" handlers of the side panel are unbound
        # self.side_panel.unbind("transitionend", self._on_close_transition_end)
        self.side_panel.unbind("transitionend")
        self.manager_panel_elt.remove()

    def close(self):
        """Hide the panel"""
        # values saved by ``open`` are restored
        saved_height, saved_overflow = self._body_ori_style
        document.body.style.height = saved_height
        document.body.style.overflow = saved_overflow
        # the panel itself is only detached when the transition is finished
        self.side_panel.bind("transitionend", self._on_close_transition_end)
        self.side_panel.classList.remove('open')

    def _invite_jid(self, entity_jid, callback, errback=None):
        if errback is None:
            errback = lambda e: dialog.notification.show(f"invitation failed: {e}", "error")
        if self.invitation_type == 'photos':
            service = self.invitation_data["service"]
            path = self.invitation_data["path"]
            album_name = path.rsplit('/')[-1]
            print(f"inviting {entity_jid}")
            bridge.fis_invite(
                entity_jid,
                service,
                "photos",
                "",
                path,
                album_name,
                '',
                callback=callback,
                errback=errback
            )
        elif self.invitation_type == 'pubsub':
            service = self.invitation_data["service"]
            node = self.invitation_data["node"]
            name = self.invitation_data.get("name")
            namespace = self.invitation_data.get("namespace")
            extra = {}
            if namespace:
                extra["namespace"] = namespace
            print(f"inviting {entity_jid}")
            bridge.ps_invite(
                entity_jid,
                service,
                node,
                '',
                name,
                javascript.JSON.stringify(extra),
                callback=callback,
                errback=errback
            )
        else:
            print(f"error: unknown invitation type: {self.invitation_type}")

    def invite_by_jid(self, entity_jid):
        self._invite_jid(
            entity_jid,
            callback=lambda entity_jid=entity_jid: self._on_jid_invitation_success(entity_jid),
        )

    def on_manager_close(self, evt):
        self.close()

    def _on_jid_invitation_success(self, entity_jid):
        """Empty the contact field, notify the user and display the new affiliation

        @param entity_jid: jid of the entity which has been invited
        """
        contact_elt = document['invitation_form'].select_one('input[name="contact"]')
        contact_elt.value = ""
        # an "input" event is dispatched so the handlers bound to the field run with
        # the emptied value
        contact_elt.dispatchEvent(window.Event.new('input'))
        dialog.notification.show(f"{entity_jid} has been invited", level="success")
        if entity_jid in self.affiliations:
            # the entity is already known, there is nothing to add
            return
        self.set_affiliation(entity_jid, "member")

    def on_contact_invite(self, evt, entity_jid):
        """User is adding a contact

        @param evt: event which triggered the invitation (unused)
        @param entity_jid: jid of the contact to invite
        """
        contact_elt = document['invitation_form'].select_one('input[name="contact"]')
        # the field is emptied, and an "input" event is dispatched so the handlers
        # bound to the field run with the emptied value
        contact_elt.value = ""
        contact_elt.dispatchEvent(window.Event.new('input'))
        self.invite_by_jid(entity_jid)

    def on_contact_keydown(self, evt):
        if evt.key == "Escape":
            evt.target.value = ""
            evt.target.dispatchEvent(window.Event.new('input'))
        elif evt.key == "ArrowDown":
            evt.stopPropagation()
            evt.preventDefault()
            content_elt = document['invitation_contact_search'].select_one(
                ".search_dialog__content")
            if self._active_new_item == None:
                self._active_new_item = content_elt.firstElementChild
                self._active_new_item.classList.add('selected')
            else:
                next_item = self._active_new_item.nextElementSibling
                if next_item is not None:
                    self._active_new_item.classList.remove('selected')
                    self._active_new_item = next_item
                    self._active_new_item.classList.add('selected')
        elif evt.key == "ArrowUp":
            evt.stopPropagation()
            evt.preventDefault()
            content_elt = document['invitation_contact_search'].select_one(
                ".search_dialog__content")
            if self._active_new_item == None:
                self._active_new_item = content_elt.lastElementChild
                self._active_new_item.classList.add('selected')
            else:
                previous_item = self._active_new_item.previousElementSibling
                if previous_item is not None:
                    self._active_new_item.classList.remove('selected')
                    self._active_new_item = previous_item
                    self._active_new_item.classList.add('selected')
        elif evt.key == "Enter":
            evt.stopPropagation()
            evt.preventDefault()
            if self._active_new_item is not None:
                entity_jid = self._active_new_item.dataset.entityJid
                self.invite_by_jid(entity_jid)
            else:
                if jid_re.exec(evt.target.value):
                    self.invite_by_jid(evt.target.value)
                    evt.target.value = ""

    def on_contact_focus(self, evt):
        """Show the search dialog when the contact field gets the focus

        @param evt: "focus" event
        """
        document['invitation_contact_search'].classList.add('open')
        # any previous selection is forgotten
        self._active_new_item = None
        # an "input" event is dispatched on the focused element so the handlers
        # bound to it run with the current value
        evt.target.dispatchEvent(window.Event.new('input'))

    def on_contact_blur(self, evt):
        """Hide the search dialog and discard all the proposed items

        @param evt: "blur" event (unused)
        """
        document['invitation_contact_search'].classList.remove('open')
        filtered_items = self._filtered_new_items
        for item_elt in filtered_items.values():
            item_elt.remove()
        filtered_items.clear()


    def on_contact_input(self, evt):
        """Update the items proposed in the search dialog from the typed text

        An item is displayed for each cached identity matching the text (it is only
        created if not already displayed), items of other identities are removed.
        @param evt: "input" event of the contact field
        """
        text = evt.target.value.strip().lower()
        content_elt = document['invitation_contact_search'].select_one(
            ".search_dialog__content")
        filtered_items = self._filtered_new_items
        for entity_jid, identity in cache.identities.items():
            if cache.match_identity(entity_jid, text, identity):
                if entity_jid in filtered_items:
                    # the item was already there, we don't create a new element
                    continue
                new_item_elt = self.new_item_tpl.get_elt({
                    "entity_jid": entity_jid,
                    "identities": cache.identities,
                })
                content_elt <= new_item_elt
                filtered_items[entity_jid] = new_item_elt
                for ok_elt in new_item_elt.select('.click_to_ok'):
                    # we use mousedown instead of click because otherwise it would be
                    # ignored due to "blur" event manager (see
                    # https://stackoverflow.com/a/9335401)
                    ok_elt.bind(
                        "mousedown",
                        lambda evt, entity_jid=entity_jid: self.on_contact_invite(
                            evt, entity_jid),
                    )
            else:
                # if the entity was present in last pass, we remove it
                stale_item = filtered_items.pop(entity_jid, None)
                if stale_item is not None:
                    stale_item.remove()

        active_item = self._active_new_item
        if active_item is not None and not active_item.parentElement:
            # active item has been filtered out
            self._active_new_item = None

    def _on_email_invitation_success(self, invitee_jid, email, name):
        """Display the invitee as a member and notify the user

        @param invitee_jid: jid of the invitee
        @param email: email address of the invitee (unused)
        @param name: name of the invitee, used in the notification
        """
        self.set_affiliation(invitee_jid, "member")
        message = f"{name} has been invited, he/she has received an email with a link"
        dialog.notification.show(message, level="success")

    def invitation_simple_create_cb(self, invitation_data, email, name):
        """Invite the jid of a freshly created email invitation

        @param invitation_data: data of the created invitation, its "jid" is invited
        @param email: email address of the invitee
        @param name: name of the invitee
        """
        invitee_jid = invitation_data['jid']

        def on_success():
            return self._on_email_invitation_success(invitee_jid, email, name)

        def on_failure(e):
            return dialog.notification.show(
                f"invitation failed for {email}: {e}",
                "error"
            )

        self._invite_jid(invitee_jid, callback=on_success, errback=on_failure)

        # we update identities to have the name instead of the invitation jid in
        # affiliations
        cache.identities[invitee_jid] = {'nicknames': [name]}
        cache.update()

    def invite_by_email(self, email, name):
        """Request the creation of an invitation sent by email

        @param email: email address of the invitee
        @param name: name of the invitee
        """
        guest_base_url = window.URL.new("/g", document.baseURI).href
        # "{uuid}" is left as is in the template (presumably replaced by the backend)
        guest_url_tpl = f'{guest_base_url}/{{uuid}}'

        def on_created(data):
            return self.invitation_simple_create_cb(data, email, name)

        def on_failure(e):
            return window.alert(f"can't send email invitation: {e}")

        bridge.invitation_simple_create(
            email,
            name,
            guest_url_tpl,
            '',
            callback=on_created,
            errback=on_failure
        )

    def on_invite_email_submit(self, evt, invite_email_elt):
        """Send the email invitation if the form is valid, and show the manager back

        @param evt: click event of the submit button
        @param invite_email_elt: element of the "invite by email" dialog, removed
            when the invitation is requested
        """
        evt.stopPropagation()
        evt.preventDefault()
        form_elt = document['email_invitation_form']
        try:
            report_validity = form_elt.reportValidity
        except AttributeError:
            # the form can't be validated, the invitation is requested anyway
            report_validity = None
            print("reportValidity is not supported by this browser!")
        if report_validity is not None and not report_validity():
            return
        email_elt = form_elt.select_one('input[name="email"]')
        name_elt = form_elt.select_one('input[name="name"]')
        self.invite_by_email(email_elt.value, name_elt.value)
        invite_email_elt.remove()
        self.open()

    def on_invite_email_close(self, evt, invite_email_elt):
        evt.stopPropagation()
        evt.preventDefault()
        invite_email_elt.remove()
        self.open()

    def on_invite_email_click(self, evt):
        """Show the "invite by email" dialog in place of the manager panel

        @param evt: click event, neither propagated nor handled by default
        """
        evt.stopPropagation()
        evt.preventDefault()
        invite_email_elt = self.invite_by_email_tpl.get_elt()
        document.body <= invite_email_elt

        def on_submit(evt):
            return self.on_invite_email_submit(evt, invite_email_elt)

        def on_close(evt):
            return self.on_invite_email_close(evt, invite_email_elt)

        document['email_invitation_submit'].bind('click', on_submit)
        for close_elt in invite_email_elt.select('.click_to_close'):
            close_elt.bind("click", on_close)
        # the manager is shown back when the dialog is submitted or closed
        self.close()

    ## affiliations

    def _add_affiliation_bindings(self, entity_jid, affiliation_elt):
        for elt in affiliation_elt.select(".click_to_delete"):
            elt.bind(
                "click",
                lambda evt, entity_jid=entity_jid, affiliation_elt=affiliation_elt:
                self.on_affiliation_remove(entity_jid, affiliation_elt)
            )
        for elt in affiliation_elt.select(".click_to_set_publisher"):
            try:
                name = cache.identities[entity_jid]["nicknames"][0]
            except (KeyError, IndexError):
                name = entity_jid
            elt.bind(
                "click",
                lambda evt, entity_jid=entity_jid, name=name,
                    affiliation_elt=affiliation_elt:
                    self.on_affiliation_set(
                        entity_jid, name, affiliation_elt, "publisher"
                    ),
            )
        for elt in affiliation_elt.select(".click_to_set_member"):
            try:
                name = cache.identities[entity_jid]["nicknames"][0]
            except (KeyError, IndexError):
                name = entity_jid
            elt.bind(
                "click",
                lambda evt, entity_jid=entity_jid, name=name,
                    affiliation_elt=affiliation_elt:
                    self.on_affiliation_set(
                        entity_jid, name, affiliation_elt, "member"
                    ),
            )

    def set_affiliation(self, entity_jid, affiliation):
        if affiliation not in ('owner', 'member', 'publisher'):
            raise NotImplementedError(
                f'{affiliation} affiliation can not be set with this method for the '
                'moment')
        if entity_jid not in self.affiliations:
            self.affiliations[entity_jid] = affiliation
        affiliation_elt = self.affiliation_tpl.get_elt({
            "entity_jid": entity_jid,
            "affiliation": affiliation,
            "identities": cache.identities,
        })
        document['affiliations'] <= affiliation_elt
        self._add_affiliation_bindings(entity_jid, affiliation_elt)

    def _on_affiliation_remove_success(self, affiliation_elt, entity_jid):
        affiliation_elt.remove()
        del self.affiliations[entity_jid]

    def on_affiliation_remove(self, entity_jid, affiliation_elt):
        """Request the removal of the affiliation of an entity

        The affiliation is set to "none" with the bridge method matching the
        invitation type; an error notification is shown for an unknown type.
        @param entity_jid: jid of the entity whose affiliation must be removed
        @param affiliation_elt: element displaying the affiliation, removed on success
        """
        def on_success():
            return self._on_affiliation_remove_success(affiliation_elt, entity_jid)

        def on_failure(e):
            return dialog.notification.show(f"can't remove affiliation: {e}", "error")

        invitation_type = self.invitation_type
        if invitation_type == 'photos':
            path = self.invitation_data["path"]
            service = self.invitation_data["service"]
            bridge.fis_affiliations_set(
                service,
                "",
                path,
                {entity_jid: "none"},
                callback=on_success,
                errback=on_failure
            )
            return
        if invitation_type == 'pubsub':
            service = self.invitation_data["service"]
            node = self.invitation_data["node"]
            bridge.ps_node_affiliations_set(
                service,
                node,
                {entity_jid: "none"},
                callback=on_success,
                errback=on_failure
            )
            return
        dialog.notification.show(
            f"error: unknown invitation type: {self.invitation_type}",
            "error"
        )

    def _on_affiliation_set_success(self, entity_jid, name, affiliation_elt, affiliation):
        """Notify the user and refresh the element of an updated affiliation

        @param entity_jid: jid of the entity whose affiliation has been changed
        @param name: name of the entity, used in the notification
        @param affiliation_elt: element to replace by one showing the new affiliation
        @param affiliation: new affiliation of the entity
        """
        dialog.notification.show(f"permission updated for {name}")
        self.affiliations[entity_jid] = affiliation
        template_data = {
            "entity_jid": entity_jid,
            "affiliation": affiliation,
            "identities": cache.identities,
        }
        updated_elt = self.affiliation_tpl.get_elt(template_data)
        affiliation_elt.replaceWith(updated_elt)
        # the new element has no handler yet
        self._add_affiliation_bindings(entity_jid, updated_elt)

    def _on_affiliation_set_ok(self, entity_jid, name, affiliation_elt, affiliation):
        """Request the affiliation change once the user has confirmed it

        Only the "pubsub" invitation type is handled, an error notification is shown
        for any other type.
        @param entity_jid: jid of the entity whose affiliation must be changed
        @param name: name of the entity
        @param affiliation_elt: element currently displaying the affiliation
        @param affiliation: affiliation to set
        """
        if self.invitation_type != 'pubsub':
            dialog.notification.show(
                f"error: unknown invitation type: {self.invitation_type}",
                "error"
            )
            return

        def on_success():
            return self._on_affiliation_set_success(
                entity_jid, name, affiliation_elt, affiliation
            )

        def on_failure(e):
            return dialog.notification.show(f"can't set affiliation: {e}", "error")

        service = self.invitation_data["service"]
        node = self.invitation_data["node"]
        bridge.ps_node_affiliations_set(
            service,
            node,
            {entity_jid: affiliation},
            callback=on_success,
            errback=on_failure
        )

    def _on_affiliation_set_cancel(self, evt, notif_elt):
        self.open()

    def on_affiliation_set(self, entity_jid, name, affiliation_elt, affiliation):
        """Ask for a confirmation before changing the affiliation of an entity

        The manager is closed while the confirm dialog is shown; an error
        notification is shown (and nothing else is done) for an unmanaged affiliation.
        @param entity_jid: jid of the entity whose affiliation must be changed
        @param name: name of the entity, used in the confirmation message
        @param affiliation_elt: element currently displaying the affiliation
        @param affiliation: "publisher" or "member"
        """
        if affiliation not in ("publisher", "member"):
            dialog.notification.show(f"unmanaged affiliation: {affiliation}", "error")
            return

        # NOTE(review): "autorisation" is misspelled in these user-facing messages
        if affiliation == "publisher":
            message = f"Give autorisation to publish to {name}?"
        else:
            message = f"Remove autorisation to publish from {name}?"

        def on_confirmed(evt, notif_elt):
            return self._on_affiliation_set_ok(
                entity_jid, name, affiliation_elt, affiliation
            )

        dialog.Confirm(message).show(
            ok_cb=on_confirmed,
            cancel_cb=self._on_affiliation_set_cancel
        )
        self.close()