Mercurial > libervia-backend
view libervia/frontends/tools/webrtc_models.py @ 4268:51d004e50786
docker (backend): set `+use_local_shared_tmp` in conf.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 13 Jun 2024 13:22:41 +0200 |
parents | 79c8a70e1813 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia WebRTC implementation # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from collections.abc import Awaitable from dataclasses import dataclass, field from typing import Any, Callable import uuid import gi gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import GstWebRTC from pydantic import BaseModel, Field from libervia.frontends.tools import jid @dataclass class CallData: callee: jid.JID sid: str | None = None action_id: str | None = None kwargs: dict[str, Any] = field(default_factory=dict) class SourcesData(BaseModel): """Data for Sources""" class SourcesNone(SourcesData): """No source is used. This is used when the WebRTC connection will be used for data channels only.""" class SourcesAuto(SourcesData): """Automatic Sources (webcam/microphone)""" class SourcesTest(SourcesData): """Test Sources (pattern)""" class SourcesDataChannel(SourcesData): """Sources for transmitting data over Data Channel @param dc_open_cb: Called when Data Channel is open. This callback will be run in a GStreamer thread. """ name: str = Field(default_factory=lambda: str(uuid.uuid4())) dc_open_cb: Callable[[GstWebRTC.WebRTCDataChannel], None] class SourcesPipeline(SourcesData): """Use custom pipeline description as a source. @param video_pipeline: pipeline description of video source. None to use automatic video source (same as SourcesAuto). Empty string to disable video. @param audio_pipeline: pipeline description of audio source. None to use automatic audio source (same as SourcesAuto). Empty string to disable audio. @param video_properties: Elements properties to set. @param audio_properties: Elements properties to set. """ video_pipeline: str|None = None audio_pipeline: str|None = None video_properties: dict = Field(default_factory=lambda: {}) audio_properties: dict = Field(default_factory=lambda: {}) class SinksData(BaseModel): """Data for Sinks""" class SinksNone(SinksData): """No sink is used. This is used when the WebRTC connection will be used for data channels only.""" class SinksAuto(SinksData): """Automatic Sinks (create windows/default audio)""" class SinksApp(SinksData): local_video_cb: Callable remote_video_cb: Callable | None class SinksDataChannel(SinksData): """Sinks for transmitting data over Data Channel @param dc_on_data_channel: Called when Data Channel is created. This callback will be run in a GStreamer thread. """ dc_on_data_channel: ( Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None ) = None