view libervia/cli/call_webrtc.py @ 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200
parents 9218d4331bb2
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from dataclasses import dataclass

from libervia.backend.tools.common import data_format
from libervia.frontends.tools import aio, jid


@dataclass
class CallData:
    """Data describing the call handled by the command.

    It is consumed by ``WebRTCCall.make_webrtc_call``, which uses ``sid`` to tell
    an outgoing call (``sid`` is None) from an incoming one (``sid`` is set).
    """

    # jid of the peer, given as ``callee`` when the WebRTCCall is instantiated
    callee: jid.JID
    # session id of the call; None means that we are making the call ourselves
    sid: str | None = None
    # id of the action to launch (with ``cancelled`` set to False) when we are
    # receiving the call; nothing is launched if it is None
    action_id: str | None = None


class WebRTCCall:
    """Handle a WebRTC call for the CLI.

    A ``webrtc.WebRTC`` instance is wrapped, and the ``ice_candidates_new``,
    ``call_setup`` and ``call_ended`` bridge signals are relayed to it when they
    match the session id and the profile of this call.
    """

    def __init__(self, host, profile: str, callee: jid.JID, **kwargs):
        """Create and setup a webRTC instance

        @param host: CLI host, its ``bridge`` is used to instantiate WebRTC and to
            register the signal handlers
        @param profile: profile making or receiving the call
        @param callee: peer jid
        @param kwargs: extra kw args to use when instantiating WebRTC
        """
        # NOTE(review): this import is kept local to the constructor as it was
        #   originally; presumably to only load it when a call is made — confirm.
        from libervia.frontends.tools import webrtc

        aio.install_glib_asyncio_iteration()
        self.host = host
        self.profile = profile
        rtc = webrtc.WebRTC(host.bridge, profile, **kwargs)
        rtc.callee = callee
        self.webrtc = rtc

        # signals are registered in this order: ICE candidates, setup, end of call
        for signal_name, handler in (
            ("ice_candidates_new", self.on_ice_candidates_new),
            ("call_setup", self.on_call_setup),
            ("call_ended", self.on_call_ended),
        ):
            host.bridge.register_signal(signal_name, handler, "plugin")

    @classmethod
    async def make_webrtc_call(
        cls, host, profile: str, call_data: CallData, **kwargs
    ) -> "WebRTCCall":
        """Create the webrtc_call instance

        If ``call_data.sid`` is None the call is started by us, otherwise the
        given session id is used and the pending action (if any) is launched with
        ``cancelled`` set to False.

        @param host: CLI host
        @param profile: profile making or receiving the call
        @param call_data: Call data of the command
        @param kwargs: extra args used to instantiate WebRTCCall
        @return: the new WebRTCCall instance
        """
        instance = cls(host, profile, call_data.callee, **kwargs)

        if call_data.sid is None:
            # no session yet: we are making the call
            await instance.start()
            return instance

        # a session already exists: we are receiving the call
        instance.sid = call_data.sid
        if call_data.action_id is not None:
            answer = data_format.serialise({"cancelled": False})
            await host.bridge.action_launch(call_data.action_id, answer, profile)
        return instance

    @property
    def sid(self) -> str | None:
        """Session id of the call, stored on the wrapped WebRTC instance"""
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid: str | None) -> None:
        self.webrtc.sid = new_sid

    def _is_for_this_call(self, sid: str, profile: str) -> bool:
        """Tell if a signal is about the session and the profile of this call"""
        return sid == self.webrtc.sid and profile == self.profile

    async def on_ice_candidates_new(
        self, sid: str, candidates_s: str, profile: str
    ) -> None:
        """Relay new ICE candidates to the WebRTC instance

        @param sid: session id the candidates belong to
        @param candidates_s: serialised candidates
        @param profile: profile the signal is addressed to
        """
        if self._is_for_this_call(sid, profile):
            candidates = data_format.deserialise(candidates_s)
            self.webrtc.on_ice_candidates_new(candidates)

    async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None:
        """Handle the setup data of the call according to the received role

        @param sid: session id of the call being set up
        @param setup_data_s: serialised setup data, "role" and "sdp" keys are
            expected
        @param profile: profile the signal is addressed to
        """
        if not self._is_for_this_call(sid, profile):
            return

        setup_data = data_format.deserialise(setup_data_s)
        try:
            role, sdp = setup_data["role"], setup_data["sdp"]
        except KeyError:
            self.host.disp(f"Invalid setup data received: {setup_data}", error=True)
            return

        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(sdp, profile)
        else:
            self.host.disp(
                f"Invalid role received during setup: {setup_data}", error=True
            )

        def end_call_on_quit():
            return self.host.bridge.call_end(sid, "", profile)

        # we want to be sure that call is ended if user presses `Ctrl + c` or anything
        # else stops the session. This is done for any role value, including an
        # invalid one.
        self.host.add_on_quit_callback(end_call_on_quit)

    async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None:
        """End the WebRTC call then quit when our call is terminated

        @param sid: session id of the ended call
        @param data_s: serialised data of the signal (unused here)
        @param profile: profile the signal is addressed to
        """
        if not self._is_for_this_call(sid, profile):
            return
        await self.webrtc.end_call()
        await self.host.a_quit()

    async def start(self):
        """Start a call.

        The call is set up with the "initiator" role, then the pipeline is started.
        To be used only if we are initiator
        """
        await self.webrtc.setup_call("initiator")
        self.webrtc.start_pipeline()