Mercurial > libervia-backend
view libervia/frontends/tools/webrtc_models.py @ 4338:7c0b7ecb816f
component email gateway: Add a pubsub service:
a pubsub service is implemented to retrieve and manage attachments using XEP-0498.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia WebRTC implementation
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from collections.abc import Awaitable
from dataclasses import dataclass, field
from typing import Any, Callable
import uuid

import gi

# The GStreamer bindings versions must be pinned before anything is imported from
# ``gi.repository``.
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
from gi.repository import GstWebRTC
from pydantic import BaseModel, Field

from libervia.frontends.tools import jid


@dataclass
class CallData:
    """Plain container grouping the values attached to a call.

    @param callee: JID of the callee (mandatory, the only field without a default).
    @param sid: optional string, None by default.
        NOTE(review): presumably the session identifier, confirm against callers.
    @param action_id: optional string, None by default.
        NOTE(review): presumably the identifier of a pending action, confirm against
        callers.
    @param kwargs: free-form extra values; each instance gets its own empty dict.
    """

    callee: jid.JID
    sid: str | None = None
    action_id: str | None = None
    kwargs: dict[str, Any] = field(default_factory=dict)


class SourcesData(BaseModel):
    """Base model for every kind of Sources description."""


class SourcesNone(SourcesData):
    """No source at all.

    To be used when the WebRTC connection only carries data channels.
    """


class SourcesAuto(SourcesData):
    """Sources selected automatically (webcam/microphone)."""


class SourcesTest(SourcesData):
    """Test sources (pattern)."""


class SourcesDataChannel(SourcesData):
    """Sources used to transmit data over a Data Channel.

    @param name: name of the source. A random UUID4 string is generated when it is
        not specified.
    @param dc_open_cb: Called when the Data Channel is open.
        This callback will be run in a GStreamer thread.
    """

    name: str = Field(default_factory=lambda: str(uuid.uuid4()))
    dc_open_cb: Callable[[GstWebRTC.WebRTCDataChannel], None]


class SourcesPipeline(SourcesData):
    """Sources built from custom pipeline descriptions.

    @param video_pipeline: pipeline description of the video source.
        None to use the automatic video source (same as SourcesAuto).
        Empty string to disable video.
    @param audio_pipeline: pipeline description of the audio source.
        None to use the automatic audio source (same as SourcesAuto).
        Empty string to disable audio.
    @param video_properties: Elements properties to set for video (empty by default).
    @param audio_properties: Elements properties to set for audio (empty by default).
    """

    video_pipeline: str | None = None
    audio_pipeline: str | None = None
    video_properties: dict = Field(default_factory=dict)
    audio_properties: dict = Field(default_factory=dict)


class SinksData(BaseModel):
    """Base model for every kind of Sinks description."""


class SinksNone(SinksData):
    """No sink at all.

    To be used when the WebRTC connection only carries data channels.
    """


class SinksAuto(SinksData):
    """Sinks created automatically (create windows/default audio)."""


class SinksApp(SinksData):
    """Sinks described by a pair of video callbacks.

    NOTE(review): presumably the application receives the video through these
    callbacks; their expected signature is not declared here, confirm against
    the code using this model.

    @param local_video_cb: callback associated with the local video.
    @param remote_video_cb: callback associated with the remote video. None is an
        accepted value, but no default is declared for this field.
    """

    local_video_cb: Callable
    remote_video_cb: Callable | None


class SinksDataChannel(SinksData):
    """Sinks used to transmit data over a Data Channel.

    @param dc_on_data_channel: Called when the Data Channel is created.
        This callback will be run in a GStreamer thread.
        None (the default) when no callback is set.
    """

    dc_on_data_channel: (
        Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None
    ) = None