view libervia/web/server/session_iface.py @ 1616:6bfeb9f0fb84

browser (calls): conferences implementation: - Handle A/V conferences calls creation/joining by entering a conference room JID in the search box. - Group call box has been improved and is used both for group calls (small number of participants) and A/V conferences (larger number of participants). - Fullscreen button for group call is working. - Avatar/user nickname are shown in group call on peer user, as an overlay on video stream. - Use `user` metadata when present to display the right user avatar/name when receiving a stream from SFU (i.e. A/V conference). - Peer user have a new 3 dots menu with a `pin` item to (un)pin it (i.e. display it on full container with on top). - Updated webrtc to handle unidirectional streams correctly and to adapt to A/V conference specification. rel 448
author Goffi <goffi@goffi.org>
date Wed, 07 Aug 2024 00:01:57 +0200
parents 86c7a3a625d5
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Web Frontend
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict, deque
import os.path
import time
from typing import List, Dict, Optional

import shortuuid
from zope.interface import Attribute, Interface
from zope.interface import implementer

from libervia.backend.core.log import getLogger

from libervia.web.server.classes import Notification
from libervia.web.server.constants import Const as C


log = getLogger(__name__)

FLAGS_KEY = "_flags"
NOTIFICATIONS_KEY = "_notifications"
MAX_CACHE_AFFILIATIONS = 100  # number of nodes to keep in cache


class IWebSession(Interface):
    profile = Attribute("Libervia profile")
    jid = Attribute("JID associated with the profile")
    uuid = Attribute("uuid associated with the profile session")
    identities = Attribute("Identities of XMPP entities")


@implementer(IWebSession)
class WebSession:
    profiles_map: Dict[Optional[str], List["WebSession"]] = {}

    def __init__(self, session):
        self._profile = None
        self.jid = None
        self.started = time.time()
        # time when the backend session was started
        self.backend_started = None
        self.uuid = str(shortuuid.uuid())
        self.identities = {}
        self.csrf_token = str(shortuuid.uuid())
        self.ws_token = str(shortuuid.uuid())
        self.ws_socket = None
        self.ws_buffer = deque(maxlen=200)
        self.locale = None  # i18n of the pages
        self.theme = None
        self.pages_data = {}  # used to keep data accross reloads (key is page instance)
        self.affiliations = OrderedDict()  # cache for node affiliations
        self.profiles_map.setdefault(C.SERVICE_PROFILE, []).append(self)
        log.debug(
            f"session started for {C.SERVICE_PROFILE} "
            f"({len(self.get_profile_sessions(C.SERVICE_PROFILE))} session(s) active)"
        )

    @property
    def profile(self) -> Optional[str]:
        return self._profile

    @profile.setter
    def profile(self, profile: Optional[str]) -> None:
        old_profile = self._profile or C.SERVICE_PROFILE
        new_profile = profile or C.SERVICE_PROFILE
        try:
            self.profiles_map[old_profile].remove(self)
        except (ValueError, KeyError):
            log.warning(f"session was not registered for profile {old_profile}")
        else:
            nb_session_old = len(self.get_profile_sessions(old_profile))
            log.debug(f"{old_profile} has now {nb_session_old} session(s) active")

        self._profile = profile
        self.profiles_map.setdefault(new_profile, []).append(self)
        nb_session_new = len(self.get_profile_sessions(new_profile))
        log.debug(f"{new_profile} has now {nb_session_new} session(s) active")

    @property
    def cache_dir(self):
        if self.profile is None:
            return self.service_cache_url + "/"
        return os.path.join("/", C.CACHE_DIR, self.uuid) + "/"

    @property
    def connected(self):
        return self.profile is not None

    @property
    def guest(self):
        """True if this is a guest session"""
        if self.profile is None:
            return False
        else:
            return self.profile.startswith("guest@@")

    @classmethod
    def send(cls, profile: str, data_type: str, data: dict) -> None:
        """Send a message to all session

        If the session doesn't have an active websocket, the message is buffered until a
        socket is available
        """
        for session in cls.profiles_map.get(profile, []):
            if session.ws_socket is None or not session.ws_socket.init_ok:
                session.ws_buffer.append({"data_type": data_type, "data": data})
            else:
                session.ws_socket.send(data_type, data)

    def on_expire(self) -> None:
        profile = self._profile or C.SERVICE_PROFILE
        try:
            self.profiles_map[profile].remove(self)
        except (ValueError, KeyError):
            log.warning(f"session was not registered for profile {profile}")
        else:
            nb_session = len(self.get_profile_sessions(profile))
            log.debug(
                f"Session for profile {profile} expired. {profile} has now {nb_session} "
                f"session(s) active."
            )

    @classmethod
    def get_profile_sessions(cls, profile: str) -> List["WebSession"]:
        return cls.profiles_map.get(profile, [])

    def get_page_data(self, page, key):
        """get session data for a page

        @param page(LiberviaPage): instance of the page
        @param key(object): data key
        return (None, object): value of the key
            None if not found or page_data doesn't exist
        """
        return self.pages_data.get(page, {}).get(key)

    def pop_page_data(self, page, key, default=None):
        """like get_page_data, but remove key once value is gotten

        @param page(LiberviaPage): instance of the page
        @param key(object): data key
        @param default(object): value to return if key is not found
        @return (object): found value or default
        """
        page_data = self.pages_data.get(page)
        if page_data is None:
            return default
        value = page_data.pop(key, default)
        if not page_data:
            # no need to keep unused page_data
            del self.pages_data[page]
        return value

    def set_page_data(self, page, key, value):
        """set data to persist on reload

        @param page(LiberviaPage): instance of the page
        @param key(object): data key
        @param value(object): value to set
        @return (object): set value
        """
        page_data = self.pages_data.setdefault(page, {})
        page_data[key] = value
        return value

    def set_page_flag(self, page, flag):
        """set a flag for this page

        @param page(LiberviaPage): instance of the page
        @param flag(unicode): flag to set
        """
        flags = self.get_page_data(page, FLAGS_KEY)
        if flags is None:
            flags = self.set_page_data(page, FLAGS_KEY, set())
        flags.add(flag)

    def pop_page_flag(self, page, flag):
        """return True if flag is set

        flag is removed if it was set
        @param page(LiberviaPage): instance of the page
        @param flag(unicode): flag to set
        @return (bool): True if flaag was set
        """
        page_data = self.pages_data.get(page, {})
        flags = page_data.get(FLAGS_KEY)
        if flags is None:
            return False
        if flag in flags:
            flags.remove(flag)
            # we remove data if they are not used anymore
            if not flags:
                del page_data[FLAGS_KEY]
            if not page_data:
                del self.pages_data[page]
            return True
        else:
            return False

    def set_page_notification(self, page, message, level=C.LVL_INFO):
        """set a flag for this page

        @param page(LiberviaPage): instance of the page
        @param flag(unicode): flag to set
        """
        notif = Notification(message, level)
        notifs = self.get_page_data(page, NOTIFICATIONS_KEY)
        if notifs is None:
            notifs = self.set_page_data(page, NOTIFICATIONS_KEY, [])
        notifs.append(notif)

    def pop_page_notifications(self, page):
        """Return and remove last page notification

        @param page(LiberviaPage): instance of the page
        @return (list[Notification]): notifications if any
        """
        page_data = self.pages_data.get(page, {})
        notifs = page_data.get(NOTIFICATIONS_KEY)
        if not notifs:
            return []
        ret = notifs[:]
        del notifs[:]
        return ret

    def get_affiliation(self, service, node):
        """retrieve affiliation for a pubsub node

        @param service(jid.JID): pubsub service
        @param node(unicode): pubsub node
        @return (unicode, None): affiliation, or None if it is not in cache
        """
        if service.resource:
            raise ValueError("Service must not have a resource")
        if not node:
            raise ValueError("node must be set")
        try:
            affiliation = self.affiliations.pop((service, node))
        except KeyError:
            return None
        else:
            # we replace at the top to get the most recently used on top
            # so less recently used will be removed if cache is full
            self.affiliations[(service, node)] = affiliation
            return affiliation

    def set_affiliation(self, service, node, affiliation):
        """cache affiliation for a node

        will empty cache when it become too big
        @param service(jid.JID): pubsub service
        @param node(unicode): pubsub node
        @param affiliation(unicode): affiliation to this node
        """
        if service.resource:
            raise ValueError("Service must not have a resource")
        if not node:
            raise ValueError("node must be set")
        self.affiliations[(service, node)] = affiliation
        while len(self.affiliations) > MAX_CACHE_AFFILIATIONS:
            self.affiliations.popitem(last=False)


class IWebGuestSession(Interface):
    id = Attribute("UUID of the guest")
    data = Attribute("data associated with the guest")


@implementer(IWebGuestSession)
class WebGuestSession(object):

    def __init__(self, session):
        self.id = None
        self.data = None