diff libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 499:f387992d8e37

plugins: new "call" plugin for A/V calls: this is the base implementation for calls plugin, handling one2one calls. For now, the interface is very basic, call is done by specifying the bare jid of the destinee, then press the "call" button. Incoming calls are automatically accepted. rel 424
author Goffi <goffi@goffi.org>
date Wed, 04 Oct 2023 22:54:36 +0200
parents
children 0480f883f0a6
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/desktop_kivy/plugins/plugin_wid_calls.py	Wed Oct 04 22:54:36 2023 +0200
@@ -0,0 +1,984 @@
+from dataclasses import dataclass
+import re
+from typing import Optional, Callable
+from urllib.parse import quote_plus
+from functools import partial
+
+# from gi.repository import GLib
+from gi.repository import GObject, Gst, GstWebRTC, GstSdp
+
+try:
+    from gi.overrides import Gst as _
+except ImportError:
+    print(
+        "no GStreamer python overrides available, please install relevant pacakges on "
+        "your system."
+    )
+from kivy.clock import Clock
+from kivy.graphics.texture import Texture
+from kivy.properties import BooleanProperty, ObjectProperty
+from kivy.support import install_gobject_iteration
+from kivy.uix.button import Button
+from kivy.uix.image import Image
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core import log as logging
+from libervia.backend.core.i18n import _
+from libervia.backend.core import exceptions
+from libervia.backend.tools.common import data_format
+from libervia.frontends.quick_frontend import quick_widgets
+from libervia.frontends.tools import jid, aio
+
+from libervia.desktop_kivy import G
+
+from ..core import cagou_widget
+
+log = logging.getLogger(__name__)
+
+install_gobject_iteration()
+
+Gst.init(None)
+
+
+PLUGIN_INFO = {
+    "name": _("calls"),
+    "main": "Calls",
+    "description": _("Audio/Video calls"),
+    "icon_symbol": "phone",
+}
+
+
+@dataclass
+class TextureData:
+    texture: Optional[Texture] = None
+    size: Optional[tuple[int, int]] = None
+
+
+class CallButton(Button):
+    parent_widget = ObjectProperty(None)
+
+
+class VideoStreamWidget(Image):
+    pass
+
+
+class WebRTC:
+    """WebRTC implementation for audio and video communication.
+
+    This class encapsulates the WebRTC functionalities required for initiating and
+    handling audio and video calls.
+
+    @attribute test_mode: A flag to indicate whether the WebRTC instance is in test mode.
+        If true, test video and audio sources will be used. Otherwise first webcam and
+        microphone available will be used.
+    """
+    test_mode: bool = False
+
+
+    def __init__(self, parent_calls: "Calls", profile: str) -> None:
+        self.parent_calls = parent_calls
+        self.profile = profile
+        self.pipeline = None
+        self.reset_instance()
+
+    @property
+    def sdp_set(self):
+        return self._sdp_set
+
+    @sdp_set.setter
+    def sdp_set(self, is_set: bool):
+        self._sdp_set = is_set
+        if is_set:
+            self.on_ice_candidates_new(self.remote_candidates_buffer)
+            for data in self.remote_candidates_buffer.values():
+                data["candidates"].clear()
+
+    @property
+    def media_types(self):
+        if self._media_types is None:
+            raise Exception("self._media_types should not be None!")
+        return self._media_types
+
+    @media_types.setter
+    def media_types(self, new_media_types: dict) -> None:
+        self._media_types = new_media_types
+        self._media_types_inv = {v: k for k, v in new_media_types.items()}
+
+    @property
+    def media_types_inv(self) -> dict:
+        if self._media_types_inv is None:
+            raise Exception("self._media_types_inv should not be None!")
+        return self._media_types_inv
+
+    def get_sdp_mline_index(self, media_type: str) -> int:
+        """Gets the sdpMLineIndex for a given media type.
+
+        @param media_type: The type of the media.
+        """
+        for index, m_type in self.media_types.items():
+            if m_type == media_type:
+                return index
+        raise ValueError(f"Media type '{media_type}' not found")
+
+    def _set_media_types(self, offer_sdp: str) -> None:
+        """Sets media types from offer SDP
+
+        @param offer: RTC session description containing the offer
+        """
+        sdp_lines = offer_sdp.splitlines()
+        media_types = {}
+        mline_index = 0
+
+        for line in sdp_lines:
+            if line.startswith("m="):
+                media_types[mline_index] = line[2 : line.find(" ")]
+                mline_index += 1
+
+        self.media_types = media_types
+
+    def _a_call(self, method, *args, **kwargs):
+        """Call an async method in main thread"""
+
+        def wrapper(__):
+            aio.run_async(method(*args, **kwargs))
+            return False
+
+        Clock.schedule_once(wrapper)
+
+    def get_payload_types(
+        self, sdpmsg, video_encoding: str, audio_encoding: str
+    ) -> dict[str, int | None]:
+        """Find the payload types for the specified video and audio encoding.
+
+        Very simplistically finds the first payload type matching the encoding
+        name. More complex applications will want to match caps on
+        profile-level-id, packetization-mode, etc.
+        """
+        # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at
+        # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py
+        video_pt = None
+        audio_pt = None
+        for i in range(0, sdpmsg.medias_len()):
+            media = sdpmsg.get_media(i)
+            for j in range(0, media.formats_len()):
+                fmt = media.get_format(j)
+                if fmt == "webrtc-datachannel":
+                    continue
+                pt = int(fmt)
+                caps = media.get_caps_from_media(pt)
+                s = caps.get_structure(0)
+                encoding_name = s["encoding-name"]
+                if video_pt is None and encoding_name == video_encoding:
+                    video_pt = pt
+                elif audio_pt is None and encoding_name == audio_encoding:
+                    audio_pt = pt
+        return {video_encoding: video_pt, audio_encoding: audio_pt}
+
+    def parse_ice_candidate(self, candidate_string):
+        """Parses the ice candidate string.
+
+        @param candidate_string: The ice candidate string to be parsed.
+        """
+        pattern = re.compile(
+            r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) "
+            r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ "
+            r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport "
+            r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?"
+        )
+        match = pattern.match(candidate_string)
+        if match:
+            candidate_dict = match.groupdict()
+
+            # Apply the correct types to the dictionary values
+            candidate_dict["component_id"] = int(candidate_dict["component_id"])
+            candidate_dict["priority"] = int(candidate_dict["priority"])
+            candidate_dict["port"] = int(candidate_dict["port"])
+
+            if candidate_dict["rel_port"]:
+                candidate_dict["rel_port"] = int(candidate_dict["rel_port"])
+
+            if candidate_dict["generation"]:
+                candidate_dict["generation"] = candidate_dict["generation"]
+
+            # Remove None values
+            return {k: v for k, v in candidate_dict.items() if v is not None}
+        else:
+            log.warning(f"can't parse candidate: {candidate_string!r}")
+            return None
+
+    def build_ice_candidate(self, parsed_candidate):
+        """Builds ICE candidate
+
+        @param parsed_candidate: Dictionary containing parsed ICE candidate
+        """
+        base_format = (
+            "candidate:{foundation} {component_id} {transport} {priority} "
+            "{address} {port} typ {type}"
+        )
+
+        if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"):
+            base_format += " raddr {rel_addr} rport {rel_port}"
+
+        if parsed_candidate.get("generation"):
+            base_format += " generation {generation}"
+
+        return base_format.format(**parsed_candidate)
+
+    def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]:
+        """Retrieves ICE password and user fragment for SDP offer.
+
+        @param sdp: The Session Description Protocol offer string.
+        @return: ufrag and pwd
+        @raise ValueError: Can't extract ufrag and password
+        """
+        ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp)
+        pwd_line = re.search(r"ice-pwd:(\S+)", sdp)
+
+        if ufrag_line and pwd_line:
+            ufrag = self.ufrag = ufrag_line.group(1)
+            pwd = self.pwd = pwd_line.group(1)
+            return ufrag, pwd
+        else:
+            log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}")
+            raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP")
+
+    def reset_instance(self):
+        """Inits or resets the instance variables to their default state."""
+        self.role: str | None = None
+        if self.pipeline is not None:
+            self.pipeline.set_state(Gst.State.NULL)
+        self.pipeline = None
+        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
+        self._remote_video_pad = None
+        self.sid: str | None = None
+        self.offer: str | None = None
+        self.local_candidates_buffer = {}
+        self.ufrag: str | None = None
+        self.pwd: str | None = None
+        self.callee: str | None = None
+        self._media_types = None
+        self._media_types_inv = None
+        self._sdp_set: bool = False
+        self.remote_candidates_buffer: dict[str, dict[str, list]] = {
+            "audio": {"candidates": []},
+            "video": {"candidates": []},
+        }
+        self._media_types = None
+        self._media_types_inv = None
+
+    async def setup_call(
+        self,
+        role: str,
+        audio_pt: int | None = 96,
+        video_pt: int | None = 97,
+    ) -> None:
+        """Sets up the call.
+
+        This method establishes the Gstreamer pipeline for audio and video communication.
+        The method also manages STUN and TURN server configurations, signal watchers, and
+        various connection handlers for the webrtcbin.
+
+        @param role: The role for the call, either 'initiator' or 'responder'.
+        @param audio_pt: The payload type for the audio stream.
+        @param video_pt: The payload type for the video stream
+
+        @raises NotImplementedError: If audio_pt or video_pt is set to None.
+        @raises AssertionError: If the role is not 'initiator' or 'responder'.
+        """
+        assert role in ("initiator", "responder")
+        self.role = role
+        if audio_pt is None or video_pt is None:
+            raise NotImplementedError("None value is not handled yet")
+
+        if self.test_mode:
+            video_source_elt = "videotestsrc is-live=true pattern=ball"
+            audio_source_elt = "audiotestsrc"
+        else:
+            video_source_elt = "v4l2src"
+            audio_source_elt = "pulsesrc"
+
+        self.gst_pipe_desc = f"""
+        webrtcbin latency=100 name=sendrecv bundle-policy=max-compat
+
+        {video_source_elt} name=video_src
+        ! videorate
+        ! video/x-raw,framerate=30/1
+        ! tee name=t
+
+        t.
+        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream
+        ! videoconvert
+        ! vp8enc deadline=1 keyframe-max-dist=60
+        ! rtpvp8pay picture-id-mode=15-bit
+        ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt}
+        ! sendrecv.
+
+        t.
+        ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0
+        ! videoconvert
+        ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True
+
+        {audio_source_elt} name=audio_src
+        ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0
+        ! audioconvert
+        ! audioresample
+        ! opusenc audio-type=voice
+        ! rtpopuspay
+        ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt}
+        ! sendrecv.
+        """
+
+        log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}")
+
+        # Create the pipeline
+        self.pipeline = Gst.parse_launch(self.gst_pipe_desc)
+        if not self.pipeline:
+            raise exceptions.InternalError("Failed to create Gstreamer pipeline.")
+
+        self.webrtc = self.pipeline.get_by_name("sendrecv")
+
+        self.video_src = self.pipeline.get_by_name("video_src")
+        self.audio_src = self.pipeline.get_by_name("audio_src")
+
+        # set STUN and TURN servers
+        external_disco = data_format.deserialise(
+            await G.host.a_bridge.external_disco_get("", self.profile), type_check=list
+        )
+
+        for server in external_disco:
+            if server["type"] == "stun":
+                if server["transport"] == "tcp":
+                    log.info(
+                        "ignoring TCP STUN server, GStreamer only support one STUN server"
+                    )
+                url = f"stun://{server['host']}:{server['port']}"
+                log.debug(f"adding stun server: {url}")
+                self.webrtc.set_property("stun-server", url)
+            elif server["type"] == "turn":
+                url = "{scheme}://{username}:{password}@{host}:{port}".format(
+                    scheme = "turns" if server["transport"] == "tcp" else "turn",
+                    username=quote_plus(server["username"]),
+                    password=quote_plus(server["password"]),
+                    host=server["host"],
+                    port=server["port"],
+                )
+                log.debug(f"adding turn server: {url}")
+
+                if not self.webrtc.emit("add-turn-server", url):
+                    log.warning(f"Erreur while adding TURN server {url}")
+
+        # local video feedback
+        local_video_sink = self.pipeline.get_by_name("local_video_sink")
+        local_video_sink.set_property("emit-signals", True)
+        local_video_sink.connect(
+            "new-sample",
+            self.on_new_sample,
+            self.update_sample,
+            self.parent_calls.local_video,
+        )
+        local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB")
+        local_video_sink.set_property("caps", local_video_sink_caps)
+
+        # Create bus and associate signal watchers
+        self.bus = self.pipeline.get_bus()
+        if not self.bus:
+            log.error("Failed to get bus from pipeline.")
+            return
+
+        self.bus.add_signal_watch()
+        self.webrtc.connect("pad-added", self.on_pad_added)
+        self.bus.connect("message::error", self.on_bus_error)
+        self.bus.connect("message::eos", self.on_bus_eos)
+        self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed)
+        self.webrtc.connect("on-ice-candidate", self.on_ice_candidate)
+        self.webrtc.connect(
+            "notify::ice-gathering-state", self.on_ice_gathering_state_change
+        )
+        self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state)
+
+    def start_pipeline(self) -> None:
+        """Starts the GStreamer pipeline."""
+        log.debug("starting the pipeline")
+        self.pipeline.set_state(Gst.State.PLAYING)
+
+    def on_negotiation_needed(self, webrtc):
+        """Initiate SDP offer when negotiation is needed."""
+        log.debug("Negotiation needed.")
+        if self.role == "initiator":
+            log.debug("Creating offer…")
+            promise = Gst.Promise.new_with_change_func(self.on_offer_created)
+            self.webrtc.emit("create-offer", None, promise)
+
+    def on_offer_created(self, promise):
+        """Callback for when SDP offer is created."""
+        log.info("on_offer_created called")
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        reply = promise.get_reply()
+        if reply is None:
+            log.error("Promise reply is None. Offer creation might have failed.")
+            return
+        offer = reply["offer"]
+        self.offer = offer.sdp.as_text()
+        log.info(f"SDP offer created: \n{self.offer}")
+        self._set_media_types(self.offer)
+        promise = Gst.Promise.new()
+        self.webrtc.emit("set-local-description", offer, promise)
+        promise.interrupt()
+        self._a_call(self._start_call)
+
+    def on_answer_set(self, promise):
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+
+    def on_answer_created(self, promise, _, __):
+        """Callback for when SDP answer is created."""
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        reply = promise.get_reply()
+        answer = reply["answer"]
+        promise = Gst.Promise.new()
+        self.webrtc.emit("set-local-description", answer, promise)
+        promise.interrupt()
+        answer_sdp = answer.sdp.as_text()
+        log.info(f"SDP answer set: \n{answer_sdp}")
+        self.sdp_set = True
+        self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile)
+
+    def on_offer_set(self, promise):
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
+        self.webrtc.emit("create-answer", None, promise)
+
+    def on_new_sample(
+        self,
+        video_sink: Gst.Element,
+        update_sample_method: Callable,
+        video_widget: VideoStreamWidget,
+    ) -> bool:
+        """Handles a new sample from the video sink.
+
+        @param video_sink: The video sink to pull samples from.
+        @param update_sample_method: The method to call for updating the sample.
+        @param video_widget: The video widget (either local_video or remote_video).
+        @return: Always False
+        """
+        sample = video_sink.emit("pull-sample")
+        if sample is None:
+            return False
+
+        try:
+            texture_data = self.texture_map[video_widget]
+        except KeyError:
+            texture_data = self.texture_map[video_widget] = TextureData()
+
+        # Get the current video size
+        video_pad = video_sink.get_static_pad("sink")
+        assert video_pad is not None
+        s = video_pad.get_current_caps().get_structure(0)
+        stream_size = (s.get_value("width"), s.get_value("height"))
+
+        # Compare with the texture size
+        texture_size = texture_data.size
+        if texture_size != stream_size:
+            log.debug(f"sample size update: {texture_size} => {stream_size}")
+            texture_data.size = stream_size
+            # texture needs to be recreated, so we reset the one in texture_data
+            texture_data.texture = None
+
+        Clock.schedule_once(
+            partial(
+                update_sample_method,
+                sample=sample,
+                video_widget=video_widget,
+            )
+        )
+        return False
+
+    def update_sample(
+        self,
+        dt: float,
+        sample: Optional[Gst.Sample],
+        video_widget: VideoStreamWidget,
+    ) -> None:
+        """Updates the video sample.
+
+        This method runs in the main thread.
+
+        @param dt: Time delta since the last call. This is passed by Clock.schedule_once.
+        @param sample: The video sample to update.
+        @param texture_id: identifier of the texture data (e.g. "local" or "remote")
+        @param texture: The texture object.
+        @param texture_size: The texture size.
+        @param video_widget: The video widget.
+        """
+        if sample is None:
+            return
+        try:
+            texture_data = self.texture_map[video_widget]
+        except KeyError:
+            log.warning(f"no texture data found for {video_widget}")
+            return
+
+        if texture_data.texture is None and texture_data.size is not None:
+            log.debug(f"===> creating texture for {video_widget}: {texture_data.size=}")
+            texture = Texture.create(size=texture_data.size, colorfmt="rgb")
+            assert texture is not None
+            texture_data.texture = texture
+            texture.flip_vertical()
+            video_widget.texture = texture
+
+        mapinfo = None
+        buf = None
+        try:
+            buf = sample.get_buffer()
+            _, mapinfo = buf.map(Gst.MapFlags.READ)
+
+            buffer = mapinfo.data.tobytes()
+
+            if texture_data.texture is None:
+                log.debug("can't copy the buffer, texture is None")
+                return
+            texture_data.texture.blit_buffer(buffer, colorfmt="rgb")
+            assert video_widget.canvas is not None
+            video_widget.canvas.ask_update()
+            buf.unmap(mapinfo)
+        finally:
+            if buf is not None and mapinfo is not None:
+                buf.unmap(mapinfo)
+
+    def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None:
+        """Handle the stream from the remote decodebin.
+
+        This method processes the incoming stream from the remote decodebin, determining
+        whether it's video or audio. It then sets up the appropriate GStreamer elements
+        for video/audio processing and adds them to the pipeline.
+
+        @param pad: The Gst.Pad from the remote decodebin producing the stream.
+        """
+        assert self.pipeline is not None
+        if not pad.has_current_caps():
+            log.error(f"{pad} has no caps, ignoring")
+            return
+
+        self.pipeline.set_state(Gst.State.PAUSED)
+        caps = pad.get_current_caps()
+        assert len(caps)
+        s = caps[0]
+        name = s.get_name()
+        log.debug(f"====> NAME START: {name}")
+
+        q = Gst.ElementFactory.make("queue")
+        q.set_property("max-size-time", 0)
+        q.set_property("max-size-bytes", 0)
+        q.set_property("max-size-buffers", 5)
+
+        if name.startswith("video"):
+            log.debug("===> VIDEO OK")
+
+            self._remote_video_pad = pad
+
+            # Check and log the original size of the video
+            width = s.get_int("width").value
+            height = s.get_int("height").value
+            log.info(f"Original video size: {width}x{height}")
+
+            # This is a fix for an issue found with Movim on desktop: a non standard
+            # resolution is used (990x557) resulting in bad alignement and no color in
+            # rendered image
+            adjust_resolution = width % 4 != 0 or height % 4 != 0
+            if adjust_resolution:
+                log.info("non standard resolution, we need to adjust size")
+                width = (width + 3) // 4 * 4
+                height = (height + 3) // 4 * 4
+                log.info(f"Adjusted video size: {width}x{height}")
+
+            conv = Gst.ElementFactory.make("videoconvert")
+            remote_video_sink = Gst.ElementFactory.make("appsink")
+
+            appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB")
+            remote_video_sink.set_property("caps", appsink_caps)
+
+            remote_video_sink.set_property("emit-signals", True)
+            remote_video_sink.set_property("drop", True)
+            remote_video_sink.set_property("max-buffers", 1)
+            remote_video_sink.set_property("sync", True)
+            remote_video_sink.connect(
+                "new-sample",
+                self.on_new_sample,
+                self.update_sample,
+                self.parent_calls.remote_video,
+            )
+
+            if adjust_resolution:
+                videoscale = Gst.ElementFactory.make("videoscale")
+                adjusted_caps = Gst.Caps.from_string(f"video/x-raw,width={width},height={height}")
+                capsfilter = Gst.ElementFactory.make("capsfilter")
+                capsfilter.set_property("caps", adjusted_caps)
+
+                self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink)
+                self.pipeline.sync_children_states()
+                pad.link(q.get_static_pad("sink"))
+                q.link(conv)
+                conv.link(videoscale)
+                videoscale.link(capsfilter)
+                capsfilter.link(remote_video_sink)
+            else:
+                self.pipeline.add(q, conv, remote_video_sink)
+                self.pipeline.sync_children_states()
+                pad.link(q.get_static_pad("sink"))
+                q.link(conv)
+                conv.link(remote_video_sink)
+
+        elif name.startswith("audio"):
+            log.debug("===> Audio OK")
+            conv = Gst.ElementFactory.make("audioconvert")
+            resample = Gst.ElementFactory.make("audioresample")
+            remote_audio_sink = Gst.ElementFactory.make("autoaudiosink")
+            self.pipeline.add(q, conv, resample, remote_audio_sink)
+            self.pipeline.sync_children_states()
+            pad.link(q.get_static_pad("sink"))
+            q.link(conv)
+            conv.link(resample)
+            resample.link(remote_audio_sink)
+        self.pipeline.set_state(Gst.State.PLAYING)
+
+    def on_pad_added(self, __, pad: Gst.Pad) -> None:
+        """Handle the addition of a new pad to the element.
+
+        When a new source pad is added to the element, this method creates a decodebin,
+        connects it to handle the stream, and links the pad to the decodebin.
+
+        @param __: Placeholder for the signal source. Not used in this method.
+        @param pad: The newly added pad.
+        """
+
+        log.debug("on_pad_added")
+        if pad.direction != Gst.PadDirection.SRC:
+            return
+
+        decodebin = Gst.ElementFactory.make("decodebin")
+        decodebin.connect("pad-added", self.on_remote_decodebin_stream)
+        self.pipeline.add(decodebin)
+        decodebin.sync_state_with_parent()
+        pad.link(decodebin.get_static_pad("sink"))
+
+    async def _start_call(self) -> None:
+        """Initiate the call.
+
+        Initiates a call with the callee using the stored offer. If there are any buffered
+        local ICE candidates, they are sent as part of the initiation.
+        """
+        assert self.callee is not None
+        self.sid = await G.host.a_bridge.call_start(
+            str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile
+        )
+        if self.local_candidates_buffer:
+            log.debug(
+                f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
+            )
+            if self.pwd is None:
+                sdp = self.webrtc.props.local_description.sdp.as_text()
+                self.extract_ufrag_pwd(sdp)
+            ice_data = {}
+            for media_type, candidates in self.local_candidates_buffer.items():
+                ice_data[media_type] = {
+                    "ufrag": self.ufrag,
+                    "pwd": self.pwd,
+                    "candidates": candidates,
+                }
+            await G.host.a_bridge.ice_candidates_add(
+                self.sid, data_format.serialise(ice_data), self.profile
+            )
+            self.local_candidates_buffer.clear()
+
+    def _remote_sdp_set(self, promise) -> None:
+        assert promise.wait() == Gst.PromiseResult.REPLIED
+        self.sdp_set = True
+
+    def on_accepted_call(self, sdp: str, profile: str) -> None:
+        """Outgoing call has been accepted.
+
+        @param sdp: The SDP answer string received from the other party.
+        @param profile: Profile used for the call.
+        """
+        log.debug(f"SDP answer received: \n{sdp}")
+
+        __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp)
+        answer = GstWebRTC.WebRTCSessionDescription.new(
+            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg
+        )
+        promise = Gst.Promise.new_with_change_func(self._remote_sdp_set)
+        self.webrtc.emit("set-remote-description", answer, promise)
+
+    async def answer_call(self, sdp: str, profile: str) -> None:
+        """Answer an incoming call
+
+        @param sdp: The SDP offer string received from the initiator.
+        @param profile: Profile used for the call.
+
+        @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in
+            payload types.
+        """
+        log.debug(f"SDP offer received: \n{sdp}")
+        self._set_media_types(sdp)
+        __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp)
+        payload_types = self.get_payload_types(
+            offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS"
+        )
+        assert "VP8" in payload_types
+        assert "OPUS" in payload_types
+        await self.setup_call(
+            "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"]
+        )
+        self.start_pipeline()
+        offer = GstWebRTC.WebRTCSessionDescription.new(
+            GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg
+        )
+        promise = Gst.Promise.new_with_change_func(self.on_offer_set)
+        self.webrtc.emit("set-remote-description", offer, promise)
+
+    def on_ice_candidate(self, webrtc, mline_index, candidate_sdp):
+        """Handles the on-ice-candidate signal of webrtcbin.
+
+        @param webrtc: The webrtcbin element.
+        @param mlineindex: The mline index.
+        @param candidate: The ICE candidate.
+        """
+        log.debug(
+            f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}"
+        )
+        parsed_candidate = self.parse_ice_candidate(candidate_sdp)
+        try:
+            media_type = self.media_types[mline_index]
+        except KeyError:
+            raise exceptions.InternalError("can't find media type")
+
+        if self.sid is None:
+            log.debug("buffering local ICE candidate")
+            self.local_candidates_buffer.setdefault(media_type, []).append(
+                parsed_candidate
+            )
+        else:
+            sdp = self.webrtc.props.local_description.sdp.as_text()
+            assert sdp is not None
+            ufrag, pwd = self.extract_ufrag_pwd(sdp)
+            ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]}
+            self._a_call(
+                G.host.a_bridge.ice_candidates_add,
+                self.sid,
+                data_format.serialise({media_type: ice_data}),
+                self.profile,
+            )
+
+    def on_ice_candidates_new(self, candidates: dict) -> None:
+        """Handle new ICE candidates.
+
+        @param candidates: A dictionary containing media types ("audio" or "video") as
+            keys and corresponding ICE data as values.
+
+        @raise exceptions.InternalError: Raised when sdp mline index is not found.
+        """
+        if not self.sdp_set:
+            log.debug("buffering remote ICE candidate")
+            for media_type in ("audio", "video"):
+                media_candidates = candidates.get(media_type)
+                if media_candidates:
+                    buffer = self.remote_candidates_buffer[media_type]
+                    buffer["candidates"].extend(media_candidates["candidates"])
+            return
+        for media_type, ice_data in candidates.items():
+            for candidate in ice_data["candidates"]:
+                candidate_sdp = self.build_ice_candidate(candidate)
+                try:
+                    mline_index = self.get_sdp_mline_index(media_type)
+                except Exception as e:
+                    raise exceptions.InternalError(f"Can't find sdp mline index: {e}")
+                self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp)
+                log.debug(
+                    f"Remote ICE candidate added. MLine Index: {mline_index}, "
+                    f"{candidate_sdp}"
+                )
+
+    def on_ice_gathering_state_change(self, pspec, __):
+        state = self.webrtc.get_property("ice-gathering-state")
+        log.debug(f"ICE gathering state changed to {state}")
+
+    def on_ice_connection_state(self, pspec, __):
+        state = self.webrtc.props.ice_connection_state
+        if state == GstWebRTC.WebRTCICEConnectionState.FAILED:
+            log.error("ICE connection failed")
+        log.info(f"ICE connection state changed to {state}")
+
+    def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None:
+        """Handles the GStreamer bus error messages.
+
+        @param bus: The GStreamer bus.
+        @param message: The error message.
+        """
+        err, debug = message.parse_error()
+        log.error(f"Error from {message.src.get_name()}: {err.message}")
+        log.error(f"Debugging info: {debug}")
+
+    def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None:
+        """Handles the GStreamer bus eos messages.
+
+        @param bus: The GStreamer bus.
+        @param message: The eos message.
+        """
+        log.info("End of stream")
+
+    async def end_call(self) -> None:
+        """Stop streaming and clean instance"""
+        self.reset_instance()
+
+
+class Calls(quick_widgets.QuickWidget, cagou_widget.LiberviaDesktopKivyWidget):
+    remote_video = ObjectProperty()
+    local_video = ObjectProperty()
+    use_header_input = True
+    signals_registered = False
+    in_call = BooleanProperty(False)
+
+    def __init__(self, host, target, profiles):
+        quick_widgets.QuickWidget.__init__(self, G.host, target, profiles)
+        cagou_widget.LiberviaDesktopKivyWidget.__init__(self)
+        call_btn = CallButton(
+            parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call())
+        )
+        self.header_input_add_extra(call_btn)
+        self.webrtc = WebRTC(self, self.profile)
+
+        if not self.__class__.signals_registered:
+            log.debug("registering signals")
+            G.host.register_signal(
+                "ice_candidates_new",
+                handler=self.__class__.ice_candidates_new_handler,
+                iface="plugin",
+            )
+            G.host.register_signal(
+                "call_setup", handler=self.__class__.call_setup_handler, iface="plugin"
+            )
+            G.host.register_signal(
+                "call_ended", handler=self.__class__.call_ended_handler, iface="plugin"
+            )
+            G.host.register_action_handler(
+                C.META_TYPE_CALL, self.__class__.action_new_handler
+            )
+            self.__class__.signals_registered = True
+
+        self.reset_instance()
+
+    @property
+    def sid(self):
+        return self.webrtc.sid
+
+    async def toggle_call(self):
+        """Toggle between making a call and hanging up.
+
+        This function will initiate or terminate a call based on the call state.
+        """
+        if self.sid is None:
+            # Initiate the call
+            log.info("initiating call")
+            callee = jid.JID(self.header_input.text.strip())
+            self.webrtc.callee = callee
+            await self.webrtc.setup_call("initiator")
+            self.webrtc.start_pipeline()
+            self.in_call = True
+        else:
+            # Terminate the call
+            sid = self.sid
+            await self.end_call({"reason": "terminated"}, self.profile)
+            await G.host.a_bridge.call_end(sid, "", self.profile)
+            self.in_call = False
+
+    async def on_incoming_call(
+        self, action_data: dict, action_id: str, profile: str
+    ) -> None:
+        """Handle incoming call notifications.
+
+        @param action_data: A dictionary containing data related to the incoming call
+        @param action_id: The ID corresponding to the incoming call action.
+        @param profile: The profile associated with the incoming call.
+        """
+        if self.sid is not None:
+            # FIXME: show a double remote call notification
+            log.warning("Ignoring new remote call as we are already in a call.")
+            return
+        sid = action_data["session_id"]
+        self.in_call = True
+        self.webrtc.sid = sid
+        # FIXME: we accept all remote calls for now, will be changed when proper UI/UX
+        #   will be implemented
+        log.warning("auto-accepting remote call")
+        await G.host.a_bridge.action_launch(
+            action_id, data_format.serialise({"cancelled": False}), profile
+        )
+
+    async def on_call_setup(self, setup_data: dict, profile: str) -> None:
+        """Call has been accepted, connection can be established
+
+        @param session_id: Session identifier
+        @param setup_data: Data with following keys:
+            role: initiator or responser
+            sdp: Session Description Protocol data
+        @param profile: Profile associated
+        """
+        try:
+            role = setup_data["role"]
+            sdp = setup_data["sdp"]
+        except KeyError:
+            log.error(f"Invalid setup data received: {setup_data}")
+            return
+        if role == "initiator":
+            self.webrtc.on_accepted_call(sdp, profile)
+        elif role == "responder":
+            await self.webrtc.answer_call(sdp, profile)
+        else:
+            log.error(f"Invalid role received during setup: {setup_data}")
+
+    def reset_instance(self) -> None:
+        self.in_call = False
+        self.local_video.texture = None
+        self.remote_video.texture = None
+
+    async def end_call(self, data: dict, profile: str) -> None:
+        """End current call if any and reset the instance
+
+        @param data: end call data, often includes the reason of the call ending.
+        """
+        await self.webrtc.end_call()
+        self.reset_instance()
+
+    @classmethod
+    def ice_candidates_new_handler(
+        cls, sid: str, candidates_s: str, profile: str
+    ) -> None:
+        for wid in G.host.get_visible_list(cls):
+            if profile not in wid.profiles or sid != wid.sid:
+                continue
+            wid.webrtc.on_ice_candidates_new(
+                data_format.deserialise(candidates_s),
+            )
+
+    @classmethod
+    def call_setup_handler(cls, sid: str, setup_data_s: str, profile: str) -> None:
+        for wid in G.host.get_visible_list(cls):
+            if profile not in wid.profiles or sid != wid.sid:
+                continue
+            aio.run_async(
+                wid.on_call_setup(data_format.deserialise(setup_data_s), profile)
+            )
+
+    @classmethod
+    def call_ended_handler(cls, sid: str, data_s: str, profile: str) -> None:
+        for wid in G.host.get_visible_list(cls):
+            if profile not in wid.profiles or sid != wid.sid:
+                continue
+            aio.run_async(wid.end_call(data_format.deserialise(data_s), profile))
+
+    @classmethod
+    def action_new_handler(
+        cls, action_data: dict, action_id: str, security_limit: int, profile: str
+    ) -> None:
+        for wid in G.host.get_visible_list(cls):
+            if profile not in wid.profiles:
+                continue
+            aio.run_async(wid.on_remote_call(action_data, action_id, profile))