Mercurial > libervia-backend
changeset 4240:79c8a70e1813
backend, frontend: prepare remote control:
This is a series of changes necessary to prepare the implementation of remote control
feature:
- XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several
applications are working in a same session, to know which one must be handled first.
Will be used to make Remote Control have precedence over Call content.
- XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the
benefit to have methods called in parallels is very low, and it cause a lot of trouble
as we can't predict order. Methods are now called sequentially so workflow can be
predicted.
- XEP-0167: fix `senders` XMPP attribute <=> SDP mapping
- XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so
the same key can be used with other jingle applications.
- XEP-0167, XEP-0343: move some method to XEP-0167
- XEP-0353: use new `priority` feature to call preflight methods of applications according
to it.
- frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism
based on Pydantic models. It is now possible to have has many Data Channel as necessary,
to have them in addition to A/V streams, to specify manually GStreamer sources and
sinks, etc.
- frontend (webrtc): rework of the pipeline to reduce latency.
- frontend: new `portal_desktop` method. Screenshare portal handling has been moved there,
and RemoteDesktop portal has been added.
- frontend (webrtc): fix `extract_ufrag_pwd` method.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:41 +0200 |
parents | a38559e6d6e2 |
children | 898db6daf0d0 |
files | libervia/backend/plugins/plugin_xep_0166/__init__.py libervia/backend/plugins/plugin_xep_0166/models.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0167/mapping.py libervia/backend/plugins/plugin_xep_0234.py libervia/backend/plugins/plugin_xep_0343.py libervia/backend/plugins/plugin_xep_0353.py libervia/backend/tools/common/data_format.py libervia/cli/call_gui.py libervia/cli/call_tui.py libervia/cli/cmd_call.py libervia/cli/cmd_file.py libervia/frontends/tools/portal_desktop.py libervia/frontends/tools/webrtc.py libervia/frontends/tools/webrtc_file.py libervia/frontends/tools/webrtc_models.py libervia/frontends/tools/webrtc_screenshare.py |
diffstat | 17 files changed, 1167 insertions(+), 654 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py Sat May 11 13:52:41 2024 +0200 @@ -327,29 +327,24 @@ ## errors which doesn't imply a stanza sending ## - def _iq_error(self, failure_, sid, client): - """Called when we got an <iq/> error - - @param failure_(failure.Failure): the exceptions raised - @param sid(unicode): jingle session id - """ - log.warning( - "Error while sending jingle <iq/> stanza: {failure_}".format( - failure_=failure_.value - ) - ) - self.delete_session(client, sid) - - def _jingle_error_cb(self, failure_, session, request, client): + def _jingle_error_cb( + self, + failure_: failure.Failure|BaseException, + session: dict, + request: domish.Element, + client: SatXMPPEntity + ) -> defer.Deferred: """Called when something is going wrong while parsing jingle request The error condition depend of the exceptions raised: exceptions.DataError raise a bad-request condition - @param fail(failure.Failure): the exceptions raised - @param session(dict): data of the session - @param request(domsih.Element): jingle request - @param client: %(doc_client)s + @param fail: the exceptions raised + @param session: data of the session + @param request: jingle request + @param client: SatXMPPEntity instance """ + if not isinstance(failure_, failure.Failure): + failure_ = failure.Failure(failure_) del session["jingle_elt"] log.warning(f"Error while processing jingle request [{client.profile}]") if isinstance(failure_.value, defer.FirstError): @@ -369,7 +364,8 @@ def register_application( self, namespace: str, - handler: BaseApplicationHandler + handler: BaseApplicationHandler, + priority: int = 0 ) -> None: """Register an application plugin @@ -390,13 +386,21 @@ called on several action to negociate the application or transport - jingle_terminate: called on session terminate, with reason_elt May be used to clean session + @param priority: Priority of the application. It is used when several contents + from different applications are used to determine in which order methods must + be called. An example use case is for remote control: when using remote + control, contents with call application may be used at the same time, but the + overall session is a remote control one (and so, a remote control confirmation + must be requested to the user, not a call one). + If two applications have the same priority, methods are called in the same + order as the content appears. """ if namespace in self._applications: raise exceptions.ConflictError( f"Trying to register already registered namespace {namespace}" ) self._applications[namespace] = ApplicationData( - namespace=namespace, handler=handler + namespace=namespace, handler=handler, priority=priority ) log.debug("new jingle application registered") @@ -687,10 +691,10 @@ try: await iq_elt.send() - except Exception as e: - failure_ = failure.Failure(e) - self._iq_error(failure_, sid, client) - raise failure_ + except Exception: + log.exception("Error while sending jingle <iq/> stanza") + self.delete_session(client, sid) + raise return sid def delayed_content_terminate(self, *args, **kwargs): @@ -823,13 +827,13 @@ elif action == XEP_0166.A_SESSION_ACCEPT: await self.on_session_accept(client, request, jingle_elt, session) elif action == XEP_0166.A_SESSION_INFO: - self.on_session_info(client, request, jingle_elt, session) + await self.on_session_info(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_INFO: self.on_transport_info(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_REPLACE: await self.on_transport_replace(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_ACCEPT: - self.on_transport_accept(client, request, jingle_elt, session) + await self.on_transport_accept(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_REJECT: self.on_transport_reject(client, request, jingle_elt, session) else: @@ -983,7 +987,7 @@ """ return elt - def _call_plugins( + async def _call_plugins( self, client: SatXMPPEntity, action: str, @@ -995,7 +999,7 @@ delete: bool = True, elements: bool = True, force_element: Optional[domish.Element] = None - ) -> List[defer.Deferred]: + ) -> list[Any]: """Call application and transport plugin methods for all contents @param action: jingle action name @@ -1006,7 +1010,8 @@ None to ignore @param app_default_cb: default callback to use if plugin has not app_method_name None to raise an exception instead - @param transp_default_cb: default callback to use if plugin has not transp_method_name + @param transp_default_cb: default callback to use if plugin has not + transp_method_name None to raise an exception instead @param delete: if True, remove desc_elt and transport_elt from session ignored if elements is False @@ -1015,11 +1020,11 @@ after a request (i.e. on <iq> result or error) @param force_element: if elements is False, it is used as element parameter else it is ignored - @return : list of launched Deferred + @return : list of launched methods results @raise exceptions.NotFound: method is not implemented """ contents_dict = session["contents"] - defers_list = [] + results = [] for content_name, content_data in contents_dict.items(): for method_name, handler_key, default_cb, elt_name in ( (app_method_name, "application", app_default_cb, "desc_elt"), @@ -1042,12 +1047,12 @@ elt = content_data.pop(elt_name) if delete else content_data[elt_name] else: elt = force_element - d = utils.as_deferred( + result = await utils.as_deferred( method, client, action, session, content_name, elt ) - defers_list.append(d) + results.append(result) - return defers_list + return results async def on_session_initiate( self, @@ -1097,42 +1102,50 @@ ): return - await defer.DeferredList(self._call_plugins( + await self._call_plugins( client, XEP_0166.A_PREPARE_CONFIRMATION, session, delete=False - )) + ) # we now request each application plugin confirmation # and if all are accepted, we can accept the session - confirm_defers = self._call_plugins( - client, - XEP_0166.A_SESSION_INITIATE, - session, - "jingle_request_confirmation", - None, - self.jingle_request_confirmation_default, - delete=False, - ) + try: + confirmations = await self._call_plugins( + client, + XEP_0166.A_SESSION_INITIATE, + session, + "jingle_request_confirmation", + None, + self.jingle_request_confirmation_default, + delete=False, + ) + except Exception as e: + await self._jingle_error_cb(e, session, jingle_elt, client) + else: + await self._confirmation_cb(confirmations, session, jingle_elt, client) - confirm_dlist = defer.gatherResults(confirm_defers) - confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client) - confirm_dlist.addErrback(self._jingle_error_cb, session, request, client) - - def _confirmation_cb(self, confirm_results, session, jingle_elt, client): + async def _confirmation_cb( + self, + confirmations: list[bool], + session: dict, + jingle_elt: domish.Element, + client: SatXMPPEntity + ) -> None: """Method called when confirmation from user has been received This method is only called for the responder - @param confirm_results(list[bool]): all True if session is accepted - @param session(dict): session data - @param jingle_elt(domish.Element): jingle data of this session - @param client: %(doc_client)s + @param confirm_results: all True if session is accepted + @param session: session data + @param jingle_elt: jingle data of this session + @param client: SatXMPPEntity """ del session["jingle_elt"] - confirmed = all(confirm_results) + confirmed = all(confirmations) if not confirmed: - return self.terminate(client, XEP_0166.REASON_DECLINE, session) + await self.terminate(client, XEP_0166.REASON_DECLINE, session) + return iq_elt, jingle_elt = self._build_jingle_elt( client, session, XEP_0166.A_SESSION_ACCEPT @@ -1142,68 +1155,56 @@ # contents - def addElement(domish_elt, content_elt): - content_elt.addChild(domish_elt) - - defers_list = [] + try: + for content_name, content_data in session["contents"].items(): + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = XEP_0166.ROLE_INITIATOR + content_elt["name"] = content_name - for content_name, content_data in session["contents"].items(): - content_elt = jingle_elt.addElement("content") - content_elt["creator"] = XEP_0166.ROLE_INITIATOR - content_elt["name"] = content_name - - application = content_data["application"] - app_session_accept_cb = application.handler.jingle_handler + application = content_data["application"] + app_session_accept_cb = application.handler.jingle_handler - app_d = utils.as_deferred( - app_session_accept_cb, - client, - XEP_0166.A_SESSION_INITIATE, - session, - content_name, - content_data.pop("desc_elt"), - ) - app_d.addCallback(addElement, content_elt) - defers_list.append(app_d) - - transport = content_data["transport"] - transport_session_accept_cb = transport.handler.jingle_handler + updated_desc_elt = await utils.as_deferred( + app_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("desc_elt"), + ) + content_elt.addChild(updated_desc_elt) - transport_d = utils.as_deferred( - transport_session_accept_cb, - client, - XEP_0166.A_SESSION_INITIATE, - session, - content_name, - content_data.pop("transport_elt"), - ) - transport_d.addCallback(addElement, content_elt) - defers_list.append(transport_d) + transport = content_data["transport"] + transport_session_accept_cb = transport.handler.jingle_handler - d_list = defer.DeferredList(defers_list) - d_list.addCallback( - lambda __: self._call_plugins( + updated_transport_elt = await utils.as_deferred( + transport_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("transport_elt"), + ) + content_elt.addChild(updated_transport_elt) + + await self._call_plugins( client, XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False, ) - ) - d_list.addCallback(lambda __: session.pop("jingle_elt")) - d_list.addCallback(lambda __: iq_elt.send()) + session.pop("jingle_elt") + await iq_elt.send() - def change_state(__, session): session["state"] = XEP_0166.STATE_ACTIVE - d_list.addCallback(change_state, session) - d_list.addCallback( - lambda __: self._call_plugins( + await self._call_plugins( client, XEP_0166.A_ACCEPTED_ACK, session, elements=False ) - ) - d_list.addErrback(self._iq_error, session["id"], client) - return d_list + except Exception: + log.exception("Error while sending jingle <iq/> stanza") + self.delete_session(client, session["id"]) def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element: """Find a <reason> element in parent_elt @@ -1247,7 +1248,7 @@ log.debug(f"Jingle Session {session['id']} terminated") reason_elt = self.get_reason_elt(jingle_elt) - terminate_defers = self._call_plugins( + await self._call_plugins( client, XEP_0166.A_SESSION_TERMINATE, session, @@ -1258,10 +1259,9 @@ elements=False, force_element=reason_elt, ) - terminate_dlist = defer.DeferredList(terminate_defers) - terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"])) - client.send(xmlstream.toResponse(request, "result")) + self.delete_session(client, session["id"]) + await client.a_send(xmlstream.toResponse(request, "result")) async def on_session_accept(self, client, request, jingle_elt, session): """Method called once session is accepted @@ -1285,37 +1285,22 @@ session["state"] = XEP_0166.STATE_ACTIVE session["jingle_elt"] = jingle_elt - await defer.DeferredList(self._call_plugins( + await self._call_plugins( client, XEP_0166.A_PREPARE_INITIATOR, session, delete=False - )) + ) - negociate_defers = [] - negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) - - negociate_dlist = defer.gatherResults(negociate_defers) + await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) # after negociations we start the transfer - negociate_dlist.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_START, session, app_method_name=None, elements=False - ) + await self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False ) - negociate_dlist.addCallback(lambda __: session.pop("jingle_elt")) + session.pop("jingle_elt") - def _on_session_cb(self, result, client, request, jingle_elt, session): - client.send(xmlstream.toResponse(request, "result")) - - def _on_session_eb(self, failure_, client, request, jingle_elt, session): - log.error("Error while handling on_session_info: {}".format(failure_.value)) - # XXX: only error managed so far, maybe some applications/transports need more - self.sendError( - client, "feature-not-implemented", None, request, "unsupported-info" - ) - - def on_session_info(self, client, request, jingle_elt, session): + async def on_session_info(self, client, request, jingle_elt, session): """Method called when a session-info action is received from other peer This method is only called for initiator @@ -1330,9 +1315,10 @@ return try: - # XXX: session-info is most likely only used for application, so we don't call transport plugins - # if a future transport use it, this behaviour must be adapted - defers = self._call_plugins( + # XXX: session-info is most likely only used for application, so we don't call + # transport plugins if a future transport use it, this behaviour must be + # adapted + await self._call_plugins( client, XEP_0166.A_SESSION_INFO, session, @@ -1342,12 +1328,15 @@ force_element=jingle_elt, ) except exceptions.NotFound as e: - self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) - return - - dlist = defer.DeferredList(defers, fireOnOneErrback=True) - dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) - dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) + log.exception("Error while handling on_session_info") + # XXX: only error managed so far, maybe some applications/transports need more + self.sendError( + client, "feature-not-implemented", None, request, "unsupported-info" + ) + except Exception: + log.exception("Error while managing session info") + else: + client.send(xmlstream.toResponse(request, "result")) async def on_transport_replace(self, client, request, jingle_elt, session): """A transport change is requested @@ -1431,13 +1420,19 @@ iq_elt.send() - def on_transport_accept(self, client, request, jingle_elt, session): + async def on_transport_accept( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: dict + ) -> None: """Method called once transport replacement is accepted - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data + @param client: SatXMPPEntity instance + @param request: full <iq> request + @param jingle_elt: the <jingle> element + @param session: session data """ log.debug("new transport has been accepted") @@ -1449,20 +1444,15 @@ return # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, "result")) + await client.a_send(xmlstream.toResponse(request, "result")) - negociate_defers = [] - negociate_defers = self._call_plugins( + await self._call_plugins( client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None ) - negociate_dlist = defer.DeferredList(negociate_defers) - # after negociations we start the transfer - negociate_dlist.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_START, session, app_method_name=None, elements=False - ) + await self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False ) def on_transport_reject(self, client, request, jingle_elt, session):
--- a/libervia/backend/plugins/plugin_xep_0166/models.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/models.py Sat May 11 13:52:41 2024 +0200 @@ -215,6 +215,7 @@ class ApplicationData: namespace: str handler: BaseApplicationHandler + priority: int @dataclass(frozen=True)
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Sat May 11 13:52:41 2024 +0200 @@ -71,9 +71,12 @@ "unmute", "ringing", ) +ANSWER_SDP_SENT_KEY = "answer_sdp_sent" class XEP_0167(BaseApplicationHandler): + namespace = NS_JINGLE_RTP + def __init__(self, host): log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') self.host = host @@ -147,7 +150,7 @@ metadata = call_data.get("metadata") or {} if "sdp" in call_data: - sdp_data = mapping.parse_sdp(call_data["sdp"]) + sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR) to_delete = set() for media, data in sdp_data.items(): if media not in ("audio", "video", "application"): @@ -186,6 +189,44 @@ return metadata + def get_contents(self, call_data: dict, metadata: dict) -> list[dict]: + """Generate call related contents. + + @param call_data: Call data after being parsed by [parse_call_data] + @param metadata: Metadata as returned by [parse_call_data] + @return: List of contents to be used with [jingle.initiate]. + + """ + contents = [] + seen_names = set() + + for media, media_data in call_data.items(): + if media not in ("audio", "video"): + continue + content = { + "app_ns": NS_JINGLE_RTP, + "senders": media_data["senders"], + "transport_type": self._j.TRANSPORT_DATAGRAM, + "app_kwargs": {"media": media, "media_data": media_data}, + "transport_data": { + "local_ice_data": { + "ufrag": metadata["ice-ufrag"], + "pwd": metadata["ice-pwd"], + "candidates": media_data.pop("ice-candidates"), + "fingerprint": media_data.pop("fingerprint", {}), + } + }, + } + if "id" in media_data: + name = media_data.pop("id") + if name in seen_names: + raise exceptions.DataError( + f"Content name (mid) seen multiple times: {name}" + ) + content["name"] = name + contents.append(content) + return contents + async def call_start( self, client: SatXMPPEntity, @@ -211,36 +252,8 @@ @raises exceptions.DataError: If media data is invalid or duplicate content name (mid) is found. """ - sid = str(uuid.uuid4()) metadata = self.parse_call_data(call_data) - contents = [] - seen_names = set() - - for media, media_data in call_data.items(): - if media not in ("audio", "video"): - continue - content = { - "app_ns": NS_JINGLE_RTP, - "senders": "both", - "transport_type": self._j.TRANSPORT_DATAGRAM, - "app_kwargs": {"media": media, "media_data": media_data}, - "transport_data": { - "local_ice_data": { - "ufrag": metadata["ice-ufrag"], - "pwd": metadata["ice-pwd"], - "candidates": media_data.pop("ice-candidates"), - "fingerprint": media_data.pop("fingerprint", {}), - } - }, - } - if "id" in media_data: - name = media_data.pop("id") - if name in seen_names: - raise exceptions.DataError( - f"Content name (mid) seen multiple times: {name}" - ) - content["name"] = name - contents.append(content) + contents = self.get_contents(call_data, metadata) if not contents: raise exceptions.DataError("no valid media data found: {call_data}") @@ -249,17 +262,14 @@ else C.META_SUBTYPE_CALL_AUDIO ) - defer.ensureDeferred( - self._j.initiate( + sid = await self._j.initiate( client, peer_jid, contents, - sid=sid, call_type=call_type, metadata=metadata, peer_metadata={}, ) - ) return sid def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: @@ -338,7 +348,7 @@ accepted = not resp_data.get("cancelled", False) if accepted: - session["call_accepted"] = True + session["pre_accepted"] = True return accepted @@ -357,7 +367,7 @@ @raises exceptions.CancelError: If the user doesn't accept the incoming call. """ - if session.get("call_accepted", False): + if session.get("pre_accepted", False): # the call is already accepted, nothing to do return @@ -474,7 +484,7 @@ # is requested only once for audio and video contents. return True - if not session.get("call_accepted", False): + if not session.get("pre_accepted", False): if any( c["desc_elt"].getAttribute("media") == "video" for c in session["contents"].values() @@ -505,20 +515,47 @@ answer_sdp = await answer_sdp_d - parsed_answer = mapping.parse_sdp(answer_sdp) + parsed_answer = mapping.parse_sdp(answer_sdp, session["role"]) session["metadata"].update(parsed_answer["metadata"]) - for media in ("audio", "video"): - for content in session["contents"].values(): - if content["desc_elt"].getAttribute("media") == media: - media_data = parsed_answer[media] - application_data = content["application_data"] - application_data["local_data"] = media_data["application_data"] - transport_data = content["transport_data"] - local_ice_data = media_data["transport_data"] - transport_data["local_ice_data"] = local_ice_data + self.propagate_data(session, parsed_answer) return True + def propagate_data(self, session: dict, parsed_answer: dict) -> None: + """Propagate local SDP data to other contents""" + for media in ("audio", "video", "application"): + for content in session["contents"].values(): + try: + application_data = content["application_data"] + content_media = application_data["media"] + except KeyError: + pass + else: + if content_media == media: + media_data = parsed_answer[media] + application_data["local_data"] = media_data["application_data"] + transport_data = content["transport_data"] + local_ice_data = media_data["transport_data"] + transport_data["local_ice_data"] = local_ice_data + + def send_answer_sdp(self, client: SatXMPPEntity, session: dict) -> None: + """Send answer SDP to frontend""" + if not session.get(ANSWER_SDP_SENT_KEY, False): + # we only send the signal once, as it means that the whole session is + # accepted + answer_sdp = mapping.generate_sdp_from_session(session) + self.host.bridge.call_setup( + session["id"], + data_format.serialise( + { + "role": session["role"], + "sdp": answer_sdp, + } + ), + client.profile, + ) + session[ANSWER_SDP_SENT_KEY] = True + async def jingle_handler(self, client, action, session, content_name, desc_elt): content_data = session["contents"][content_name] application_data = content_data["application_data"] @@ -542,20 +579,7 @@ elif action == self._j.A_PREPARE_INITIATOR: application_data["peer_data"] = mapping.parse_description(desc_elt) elif action == self._j.A_SESSION_ACCEPT: - if content_name == next(iter(session["contents"])): - # we only send the signal for first content, as it means that the whole - # session is accepted - answer_sdp = mapping.generate_sdp_from_session(session) - self.host.bridge.call_setup( - session["id"], - data_format.serialise( - { - "role": session["role"], - "sdp": answer_sdp, - } - ), - client.profile, - ) + pass # self.send_answer_sdp(client, session) else: log.warning(f"FIXME: unmanaged action {action}")
--- a/libervia/backend/plugins/plugin_xep_0167/mapping.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/mapping.py Sat May 11 13:52:41 2024 +0200 @@ -37,7 +37,7 @@ return "a=sendrecv" elif senders == "none": return "a=inactive" - elif session["role"] == senders: + elif session["role"] != senders: return "a=sendonly" else: return "a=recvonly" @@ -113,8 +113,9 @@ sdp_lines, triggers_no_cancel=True ) + content_names = sorted(contents) - for content_name, content_data in contents.items(): + for content_name, content_data in [(n, contents[n]) for n in content_names]: # contents.items(): app_data_key = "local_data" if local else "peer_data" application_data = content_data["application_data"] media_data = application_data[app_data_key] @@ -241,11 +242,11 @@ return "\r\n".join(sdp_lines) + "\r\n" -def parse_sdp(sdp: str) -> dict: +def parse_sdp(sdp: str, role: str) -> dict: """Parse SDP string. @param sdp: The SDP string to parse. - + @param role: Role of the entities which produces the SDP. @return: A dictionary containing parsed session data. """ # FIXME: to be removed once host is accessible from global var @@ -263,6 +264,8 @@ ice_pwd: Optional[str] = None ice_ufrag: Optional[str] = None payload_types: Optional[Dict[int, dict]] = None + # default value, will be be modified by SDP + senders: str = "both" for line in lines: try: @@ -295,6 +298,7 @@ media_data = call_data[media_type] = { "application_data": application_data, "transport_data": transport_data, + "senders": senders } elif prefix == "a=": @@ -426,6 +430,24 @@ if transport_data is not None: transport_data["pwd"] = parts[0] + elif attribute in ("sendrecv", "sendonly", "recvonly", "inactive"): + if attribute == "sendrecv": + value = "both" + elif attribute == "sendonly": + value = role + elif attribute == "recvonly": + value = "responder" if role == "initiator" else "initiator" + else: + value = "none" + + if application_data is None: + # this is a global value, we use is as new default value + senders = value + else: + # this is a media specific value, it will replace the one used as + # default for this media + application_data["senders"] = value + host.trigger.point( "XEP-0167_parse_sdp_a", attribute,
--- a/libervia/backend/plugins/plugin_xep_0234.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0234.py Sat May 11 13:52:41 2024 +0200 @@ -548,9 +548,6 @@ """ session_id = session["id"] peer_jid = session["peer_jid"] - # FIXME: has been moved from XEP-0353, but it doesn't handle correctly file - # transfer (metadata are not used). We must check with other clients what is - # actually send, and if XEP-0353 is used, and do a better integration. try: file_elt = next(description_elt.elements(NS_JINGLE_FT, "file")) @@ -564,10 +561,8 @@ ) if is_in_roster: action_type = C.META_TYPE_CONFIRM - action_subtype = C.META_TYPE_FILE else: action_type = C.META_TYPE_NOT_IN_ROSTER_LEAK - action_subtype = None action_extra = { "type": action_type, @@ -575,8 +570,7 @@ "from_jid": peer_jid.full(), "file_data": file_data } - if action_subtype is not None: - action_extra["subtype"] = action_subtype + action_extra["subtype"] = C.META_TYPE_FILE accepted = await xml_tools.defer_confirm( self.host, confirm_msg, @@ -585,7 +579,7 @@ action_extra=action_extra ) if accepted: - session["file_accepted"] = True + session["pre_accepted"] = True return accepted async def jingle_preflight_info( @@ -752,7 +746,7 @@ transport_data = content_data["transport_data"] webrtc = transport_data.get("webrtc", False) # file may have been already accepted in preflight - file_accepted = session.get("file_accepted", False) + pre_accepted = session.get("pre_accepted", False) file_data = await self.parse_file_element(client, file_elt, file_data, given=True) # FIXME: looks redundant with code done in self.parse_file_element try: @@ -773,7 +767,7 @@ ) action_extra = { "webrtc": webrtc, - "file_accepted": file_accepted, + "pre_accepted": pre_accepted, "type": C.META_TYPE_FILE, "session_id": session["id"], "from_jid": peer_jid.full(),
--- a/libervia/backend/plugins/plugin_xep_0343.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0343.py Sat May 11 13:52:41 2024 +0200 @@ -290,10 +290,7 @@ f"Invalid datachannel signaling element: {transport_elt.toXml()}" ) transport_data["webrtc"] = True - elif action in ( - self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR, - self._j.A_TRANSPORT_INFO - ): + elif action in (self._j.A_PREPARE_INITIATOR, self._j.A_TRANSPORT_INFO): await self._call_ice_udp_handler( client, action, session, content_name, transport_elt ) @@ -319,37 +316,19 @@ elif action == self._j.A_SESSION_INITIATE: # responder side - sdp = mapping.generate_sdp_from_session(session) + # answer_sdp_d is a deferred handled in XEP-0167: it is called when the + # frontend answers with its SDP. session["answer_sdp_d"] = answer_sdp_d = defer.Deferred() # we should have the answer long before 2 min answer_sdp_d.addTimeout(2 * 60, reactor) - self.host.bridge.call_setup( - session["id"], - data_format.serialise( - { - "role": session["role"], - "sdp": sdp, - } - ), - client.profile, - ) + self._rtp.send_answer_sdp(client, session) answer_sdp = await answer_sdp_d - parsed_answer = mapping.parse_sdp(answer_sdp) + parsed_answer = mapping.parse_sdp(answer_sdp, session["role"]) session["metadata"].update(parsed_answer["metadata"]) - contents = session["contents"] - if len(contents) != 1: - raise NotImplementedError( - "Only a singlecontent is supported at the moment." - ) - content = next(iter(contents.values())) - media_data = parsed_answer["application"] - application_data = content["application_data"] - application_data["local_data"] = media_data["application_data"] - transport_data = content["transport_data"] - local_ice_data = media_data["transport_data"] - transport_data["local_ice_data"] = local_ice_data + self._rtp.propagate_data(session, parsed_answer) + transport_elt.children.clear() ice_transport_elt = await self._ice_udp.jingle_handler( client, action, session, content_name, transport_elt
--- a/libervia/backend/plugins/plugin_xep_0353.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0353.py Sat May 11 13:52:41 2024 +0200 @@ -16,11 +16,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from typing import cast from twisted.internet import defer from twisted.internet import reactor from twisted.words.protocols.jabber import error, jid from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish +from libervia.backend.plugins.plugin_xep_0166.models import ApplicationData from wokkel import disco, iwokkel from zope.interface import implementer @@ -328,6 +330,12 @@ log.warning("No application specified: {message_elt.toXml()}") return + cast(list[tuple[domish.Element, ApplicationData]], desc_and_apps) + desc_and_apps.sort( + key=lambda desc_and_app: desc_and_app[1].priority, + reverse=True + ) + session = self._j.create_session( client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid )
--- a/libervia/backend/tools/common/data_format.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/tools/common/data_format.py Sat May 11 13:52:41 2024 +0200 @@ -128,14 +128,20 @@ else: yield k[len(name) + 1 :], v -def serialise(data): + +def serialise(data: Any) -> str: """Serialise data so it can be sent to bridge - @return(unicode): serialised data, can be transmitted as string to the bridge + @return: serialised data, can be transmitted as string to the bridge """ return json.dumps(data, ensure_ascii=False, default=str) -def deserialise(serialised_data: str, default: Any = None, type_check: type = dict): + +def deserialise( + serialised_data: str, + default: Any = None, + type_check: type = dict +) -> Any: """Deserialize data from bridge @param serialised_data(unicode): data to deserialise
--- a/libervia/cli/call_gui.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/cli/call_gui.py Sat May 11 13:52:41 2024 +0200 @@ -216,8 +216,7 @@ parent.host.bridge, parent.profile, call_data, - sinks=webrtc.SINKS_APP, - appsink_data=webrtc.AppSinkData( + sinks_data=webrtc.SinksApp( local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"), remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"), ),
--- a/libervia/cli/call_tui.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/cli/call_tui.py Sat May 11 13:52:41 2024 +0200 @@ -80,8 +80,7 @@ self.parent.host.bridge, self.parent.profile, call_data, - sinks=webrtc.SINKS_APP, - appsink_data=webrtc.AppSinkData( + sinks_data=webrtc.SinksApp( local_video_cb=partial(self.on_new_sample, video_stream="local"), remote_video_cb=None, ),
--- a/libervia/cli/cmd_call.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/cli/cmd_call.py Sat May 11 13:52:41 2024 +0200 @@ -69,7 +69,8 @@ """Get relevant keyword arguments for CallData""" kwargs: dict[str, Any] = {} if self.args.sources == "test": - kwargs["sources"] = "test" + from libervia.frontends.tools.webrtc_models import SourcesTest + kwargs["sources_data"] = SourcesTest() return kwargs
--- a/libervia/cli/cmd_file.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/cli/cmd_file.py Sat May 11 13:52:41 2024 +0200 @@ -18,11 +18,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import asyncio -from functools import partial -import importlib import logging -from typing import IO from . import base from . import xmlui_manager import sys @@ -480,9 +476,9 @@ return webrtc = action_data.get("webrtc", False) - file_accepted = action_data.get("file_accepted", False) + pre_accepted = action_data.get("pre_accepted", False) - if file_accepted or not self.bare_jids or from_jid.bare in self.bare_jids: + if pre_accepted or not self.bare_jids or from_jid.bare in self.bare_jids: if self._overwrite_refused: self.disp(_("File refused because overwrite is needed"), error=True) await self.host.bridge.action_launch(
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/frontends/tools/portal_desktop.py Sat May 11 13:52:41 2024 +0200 @@ -0,0 +1,500 @@ +#!/usr/bin/env python3 + +# Libervia freedesktop portal management module +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Callable, Literal, overload +from libervia.backend.core import exceptions + +import asyncio +import logging +from random import randint +import dbus +from dbus.mainloop.glib import DBusGMainLoop + + +log = logging.getLogger(__name__) + + +class PortalError(Exception): + pass + + +class DesktopPortal: + + def __init__(self, on_session_closed_cb: Callable | None = None): + # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes + self.dbus = dbus + self.on_session_closed_cb = on_session_closed_cb + self.sources_type = dbus.UInt32(7) + DBusGMainLoop(set_as_default=True) + self.session_bus = dbus.SessionBus() + portal_object = self.session_bus.get_object( + "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop" + ) + self.screencast_interface = dbus.Interface( + portal_object, "org.freedesktop.portal.ScreenCast" + ) + self.remote_desktop_interface = dbus.Interface( + portal_object, "org.freedesktop.portal.RemoteDesktop" + ) + self.session_interface = None + self.session_signal = None + self.handle_counter = 0 + self.session_handle = None + + @property + def handle_token(self): + self.handle_counter += 1 + return f"libervia{self.handle_counter}" + + def on_session_closed(self, details: dict) -> None: + if self.session_interface is not None: + self.session_interface = None + if self.on_session_closed_cb is not None: + self.on_session_closed_cb() + if self.session_signal is not None: + self.session_signal.remove() + self.session_signal = None + + @overload + async def dbus_call( + self, + interface: dbus.Interface, + method_name: str, + response: Literal[False], + **kwargs, + ) -> None: + ... + + @overload + async def dbus_call( + self, + interface: dbus.Interface, + method_name: str, + response: Literal[True], + **kwargs, + ) -> dict: + ... + + + async def dbus_call( + self, + interface: dbus.Interface, + method_name: str, + response: bool, + **kwargs, + ) -> dict|None: + """Call a portal method + + This method handle the signal response. + @param method_name: method to call + @param response: True if the method expect a response. + If True, the method will await responde from + ``org.freedesktop.portal.Request``'s ``Response`` signal. + @param kwargs: method args. + ``handle_token`` will be automatically added to ``options`` dict. + @return: method result + """ + method = getattr(interface, method_name) + try: + options = kwargs["options"] + except KeyError: + raise exceptions.InternalError('"options" key must be present.') + reply_fut = asyncio.Future() + signal_fut = asyncio.Future() + # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html + handle_token = self.handle_token + sender = self.session_bus.get_unique_name().replace(".", "_")[1:] + path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" + signal_match = None + + def on_signal(response, results): + print(f"on_signal responde {response=}") + assert signal_match is not None + signal_match.remove() + if response == 0: + signal_fut.set_result(results) + elif response == 1: + signal_fut.set_exception(exceptions.CancelError("Cancelled by user.")) + else: + signal_fut.set_exception(PortalError("Can't get signal result")) + + if response: + signal_match = self.session_bus.add_signal_receiver( + on_signal, + signal_name="Response", + dbus_interface="org.freedesktop.portal.Request", + path=path, + ) + + options["handle_token"] = handle_token + + method( + *kwargs.values(), + reply_handler=lambda ret=None: reply_fut.set_result(ret), + error_handler=reply_fut.set_exception, + ) + try: + await reply_fut + except Exception as e: + raise PortalError(f"Can't ask portal permission: {e}") + if response: + return await signal_fut + + async def create_session( + self, + interface: dbus.Interface, + ) -> dict: + """Create a new session and store its handle. + + This method creates a new session using the freedesktop portal's CreateSession + dbus call. It then registers the session handle in this object for further use. + + @param None + @return: A dictionary containing the session data, including the session handle. + @raise PortalError: If there is an error getting the session handle. + """ + if self.session_handle is not None: + self.end_session() + session_data = await self.dbus_call( + interface, + "CreateSession", + response=True, + options={ + "session_handle_token": str(randint(1, 2**32)), + }, + ) + try: + session_handle = session_data["session_handle"] + except KeyError: + raise PortalError("Can't get session handle") + self.session_handle = session_handle + return session_data + + def parse_streams(self, session_handle, screenshare_data: dict) -> dict: + """Fill and returns stream_data from screenshare_data""" + try: + node_id, shared_stream_data = screenshare_data["streams"][0] + source_type = int(shared_stream_data["source_type"]) + except (IndexError, KeyError): + raise exceptions.NotFound("No stream data found.") + stream_data = { + "session_handle": session_handle, + "node_id": node_id, + "source_type": source_type, + } + try: + height = int(shared_stream_data["size"][0]) + weight = int(shared_stream_data["size"][1]) + except (IndexError, KeyError): + pass + else: + stream_data["size"] = (height, weight) + return stream_data + + async def request_screenshare(self) -> dict: + await self.create_session(self.screencast_interface) + session_handle = self.session_handle + + await self.dbus_call( + self.screencast_interface, + "SelectSources", + response=True, + session_handle=session_handle, + options={"multiple": True, "types": self.sources_type}, + ) + screenshare_data = await self.dbus_call( + self.screencast_interface, + "Start", + response=True, + session_handle=session_handle, + parent_window="", + options={}, + ) + + session_object = self.session_bus.get_object( + "org.freedesktop.portal.Desktop", session_handle + ) + self.session_interface = self.dbus.Interface( + session_object, "org.freedesktop.portal.Session" + ) + + self.session_signal = self.session_bus.add_signal_receiver( + self.on_session_closed, + signal_name="Closed", + dbus_interface="org.freedesktop.portal.Session", + path=session_handle, + ) + + try: + return self.parse_streams(session_handle, screenshare_data) + except exceptions.NotFound: + raise PortalError("Can't parse stream data") + + async def request_remote_desktop(self, with_screen_sharing: bool = True) -> dict: + """Request autorisation to remote control desktop. + + @param with_screen_sharing: True if screen must be shared. + """ + await self.create_session(self.remote_desktop_interface) + session_handle = self.session_handle + + if with_screen_sharing: + await self.dbus_call( + self.screencast_interface, + "SelectSources", + response=False, + session_handle=session_handle, + options={ + "multiple": True, + "types": self.sources_type, + # hidden cursor (should be the default, but cursor appears during + # tests)) + "cursor_mode": dbus.UInt32(1) + }, + ) + + await self.dbus_call( + self.remote_desktop_interface, + "SelectDevices", + response=False, + session_handle=session_handle, + options={ + "types": dbus.UInt32(3), + # "persist_mode": dbus.UInt32(1), + }, + ) + + remote_desktop_data = await self.dbus_call( + self.remote_desktop_interface, + "Start", + response=True, + session_handle=session_handle, + parent_window="", + options={}, + ) + try: + stream_data = self.parse_streams(session_handle, remote_desktop_data) + except exceptions.NotFound: + pass + else: + remote_desktop_data["stream_data"] = stream_data + + session_object = self.session_bus.get_object( + "org.freedesktop.portal.Desktop", session_handle + ) + self.session_interface = self.dbus.Interface( + session_object, "org.freedesktop.portal.Session" + ) + + self.session_signal = self.session_bus.add_signal_receiver( + self.on_session_closed, + signal_name="Closed", + dbus_interface="org.freedesktop.portal.Session", + path=session_handle, + ) + + return remote_desktop_data + + def end_session(self) -> None: + """Close a running screenshare session, if any.""" + if self.session_interface is None: + return + self.session_interface.Close() + self.on_session_closed({}) + + async def notify_pointer_motion(self, dx: int, dy: int) -> None: + """ + Notify about a new relative pointer motion event. + + @param dx: Relative movement on the x axis + @param dy: Relative movement on the y axis + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyPointerMotion", + response=False, + session_handle=self.session_handle, + options={}, + dx=dx, + dy=dy, + ) + + async def notify_pointer_motion_absolute( + self, stream: int, x: float, y: float + ) -> None: + """ + Notify about a new absolute pointer motion event. + + @param stream: The PipeWire stream node the coordinate is relative to + @param x: Pointer motion x coordinate + @param y: Pointer motion y coordinate + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyPointerMotionAbsolute", + response=False, + session_handle=self.session_handle, + options={}, + stream=stream, + x=x, + y=y, + ) + + async def notify_pointer_button(self, button: int, state: int) -> None: + """ + Notify about a new pointer button event. + + @param button: The pointer button was pressed or released + @param state: The new state of the button + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyPointerButton", + response=False, + session_handle=self.session_handle, + options={}, + button=button, + state=state, + ) + + async def notify_pointer_axis(self, dx: float, dy: float) -> None: + """ + Notify about a new pointer axis event. + + @param dx: Relative axis movement on the x axis + @param dy: Relative axis movement on the y axis + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyPointerAxis", + response=False, + session_handle=self.session_handle, + options={}, + dx=dx, + dy=dy, + ) + + async def notify_pointer_axis_discrete(self, axis: int, steps: int) -> None: + """ + Notify about a new pointer axis discrete event. + + @param axis: The axis that was scrolled + 0 for vertical + 1 for horizontal + @param steps: The number of steps scrolled + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyPointerAxisDiscrete", + response=False, + session_handle=self.session_handle, + options={}, + axis=axis, + steps=steps, + ) + + async def notify_keyboard_keycode(self, keycode: int, state: int) -> None: + """ + Notify about a new keyboard keycode event. + + @param keycode: Keyboard code that was pressed or released + @param state: New state of keyboard keycode + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyKeyboardKeycode", + response=False, + session_handle=self.session_handle, + options={}, + keycode=keycode, + state=state, + ) + + async def notify_keyboard_keysym(self, keysym: int, state: int) -> None: + """ + Notify about a new keyboard keysym event. + + @param keysym: Keyboard symbol that was pressed or released + @param state: New state of keyboard keysym + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyKeyboardKeysym", + response=False, + session_handle=self.session_handle, + options={}, + keysym=keysym, + state=state, + ) + + async def notify_touch_down(self, stream: int, slot: int, x: int, y: int) -> None: + """ + Notify about a new touch down event. + + @param stream: The PipeWire stream node the coordinate is relative to + @param slot: Touch slot where touch point appeared + @param x: Touch down x coordinate + @param y: Touch down y coordinate + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyTouchDown", + response=False, + session_handle=self.session_handle, + options={}, + stream=stream, + slot=slot, + x=x, + y=y, + ) + + async def notify_touch_motion(self, stream: int, slot: int, x: int, y: int) -> None: + """ + Notify about a new touch motion event. + + @param stream: The PipeWire stream node the coordinate is relative to + @param slot: Touch slot where touch point appeared + @param x: Touch motion x coordinate + @param y: Touch motion y coordinate + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyTouchMotion", + response=False, + session_handle=self.session_handle, + options={}, + stream=stream, + slot=slot, + x=x, + y=y, + ) + + async def notify_touch_up(self, slot: int) -> None: + """ + Notify about a new touch up event. + + @param slot: Touch slot where touch point appeared + """ + await self.dbus_call( + self.remote_desktop_interface, + "NotifyTouchUp", + response=False, + session_handle=self.session_handle, + options={}, + slot=slot, + )
--- a/libervia/frontends/tools/webrtc.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/frontends/tools/webrtc.py Sat May 11 13:52:41 2024 +0200 @@ -35,13 +35,25 @@ from datetime import datetime import logging import re -from typing import Callable +from typing import Callable, Final from urllib.parse import quote_plus from libervia.backend.tools.common import data_format from libervia.frontends.tools import aio, display_servers, jid -from .webrtc_models import AppSinkData, CallData -from .webrtc_screenshare import DesktopPortal +from .webrtc_models import ( + CallData, + SinksApp, + SinksAuto, + SinksData, + SinksDataChannel, + SinksNone, + SourcesAuto, + SourcesData, + SourcesDataChannel, + SourcesNone, + SourcesPipeline, + SourcesTest, +) current_server = display_servers.detect() if current_server == display_servers.X11: @@ -52,17 +64,12 @@ log = logging.getLogger(__name__) +VIDEO_SOURCE_AUTO: Final = "v4l2src" +AUDIO_SOURCE_AUTO: Final = "pulsesrc" +NONE_NOT_IMPLEMENTED_MSG: Final = "None value is not handled yet." Gst.init(None) -SOURCES_AUTO = "auto" -SOURCES_TEST = "test" -SOURCES_DATACHANNEL = "datachannel" -SINKS_APP = "app" -SINKS_AUTO = "auto" -SINKS_TEST = "test" -SINKS_DATACHANNEL = "datachannel" - class WebRTC: """GSTreamer based WebRTC implementation for audio and video communication. @@ -75,44 +82,45 @@ self, bridge, profile: str, - sources: str = SOURCES_AUTO, - sinks: str = SINKS_AUTO, - appsink_data: AppSinkData | None = None, + sources_data: SourcesData|None = None, + sinks_data: SinksData|None = None, reset_cb: Callable | None = None, merge_pip: bool | None = None, target_size: tuple[int, int] | None = None, call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, - dc_open_cb: ( - Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None - ) = None, - dc_on_data_channel: ( - Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None - ) = None, + dc_data_list: list[SourcesDataChannel|SinksDataChannel]|None = None ) -> None: """Initializes a new WebRTC instance. @param bridge: An instance of backend bridge. @param profile: Libervia profile. - @param sources: Which kind of source to use. - @param sinks: Which kind of sinks to use. - @param appsink_data: configuration data for appsink (when SINKS_APP is used). Must - not be used for other sinks. + @param sources_data: Data of the sources. + The model used will determine which sources to use. + SourcesDataChannel can be used here as a convenience. It will then be moved + to ``data_channels`` and ``SourcesNone`` will be used instead for + ``sources_data``. + If None, SourcesAuto will be used. + @param sinks_data: Data of the sinks. + The model used will determine which sinks to use. + SinksDataChannel can be used here as a convenience. It will then be moved + to ``data_channels`` and ``SinksNone`` will be used instead for + ``sinks_data``. + If None, SinksAuto will be used. @param reset_cb: An optional Callable that is triggered on reset events. Can be used to reset UI data on new calls. @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is enabled. When PiP is used, local feedback is merged to remote video stream. Only one video stream is then produced (the local one). If None, PiP mode is selected automatically according to selected sink (it's - used for SINKS_AUTO only for now). + used for SinksAuto only for now). @param target_size: Expected size of the final sink stream. Mainly use by composer when ``merge_pip`` is set. - None to autodetect (not real autodetection implemeted yet, default to + None to autodetect (no real autodetection implemeted yet, default to (1280,720)). @param call_start_cb: Called when call is started. - @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL). - This callback will be run in a GStreamer thread. - @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL). - This callback will be run in a GStreamer thread. + @param dc_data_list: Data Channels to create. + If a SourcesDataChannel is used as ``sources_data``, or a SinksDataChannel is + used as ``sinks_data``, they will be automatically added to this list. """ self.main_loop = asyncio.get_event_loop() self.bridge = bridge @@ -122,38 +130,40 @@ self._video_muted = False self._desktop_sharing = False self.desktop_sharing_data = None - self.sources = sources - self.sinks = sinks + if dc_data_list is None: + dc_data_list = [] + self.dc_data_list = dc_data_list + if sources_data is None: + sources_data = SourcesAuto() + elif isinstance(sources_data, SourcesDataChannel): + dc_data_list.append(sources_data) + sources_data = SourcesNone() + self.sources_data = sources_data + if sinks_data is None: + sinks_data = SinksAuto() + elif isinstance(sinks_data, SinksDataChannel): + dc_data_list.append(sinks_data) + sinks_data = SinksNone() + self.sinks_data = sinks_data if target_size is None: target_size = (1280, 720) self.target_width, self.target_height = target_size if merge_pip is None: - merge_pip = sinks == SINKS_AUTO + merge_pip = isinstance(sinks_data, SinksAuto) self.merge_pip = merge_pip if call_start_cb is None: call_start_cb = self._call_start self.call_start_cb = call_start_cb - if sources == SOURCES_DATACHANNEL: - assert dc_open_cb is not None - self.dc_open_cb = dc_open_cb - if sinks == SINKS_DATACHANNEL: - assert dc_on_data_channel is not None - self.dc_on_data_channel = dc_on_data_channel - if sinks == SINKS_APP: - if ( - merge_pip - and appsink_data is not None - and appsink_data.remote_video_cb is not None - ): + if isinstance(sinks_data, SinksApp): + if merge_pip and sinks_data.remote_video_cb is not None: raise ValueError("Remote_video_cb can't be used when merge_pip is used!") - self.appsink_data = appsink_data - elif appsink_data is not None: - raise exceptions.InternalError( - "appsink_data can only be used for SINKS_APP sinks" - ) self.reset_cb = reset_cb if current_server == display_servers.WAYLAND: - self.desktop_portal = DesktopPortal(self) + from .portal_desktop import DesktopPortal + + self.desktop_portal = DesktopPortal( + on_session_closed_cb=self.on_portal_session_closed + ) else: self.desktop_portal = None self.reset_instance() @@ -370,23 +380,46 @@ return base_format.format(**parsed_candidate) - def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: + def extract_ufrag_pwd(self, sdp: str) -> None: """Retrieves ICE password and user fragment for SDP offer. @param sdp: The Session Description Protocol offer string. - @return: ufrag and pwd - @raise ValueError: Can't extract ufrag and password """ - ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) - pwd_line = re.search(r"ice-pwd:(\S+)", sdp) + lines = sdp.splitlines() + media = '' + mid_media_map = {} + bundle_media = set() + bundle_ufrag = '' + bundle_pwd = '' + in_bundle = False - if ufrag_line and pwd_line: - ufrag = self.ufrag = ufrag_line.group(1) - pwd = self.pwd = pwd_line.group(1) - return ufrag, pwd - else: - log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") - raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") + for line in lines: + if line.startswith('m='): + media = line.split('=')[1].split()[0] + elif line.startswith('a=mid:'): + mid = line.split(':')[1].strip() + mid_media_map[mid] = media + elif line.startswith('a=group:BUNDLE'): + in_bundle = True + bundle_media = set(line.split(':')[1].strip().split()) + elif line.startswith('a=ice-ufrag:'): + if in_bundle: + bundle_ufrag = line.split(':')[1].strip() + else: + self.ufrag[media] = line.split(':')[1].strip() + elif line.startswith('a=ice-pwd:'): + if in_bundle: + bundle_pwd = line.split(':')[1].strip() + else: + self.pwd[media] = line.split(':')[1].strip() + else: + in_bundle = False + + if bundle_ufrag and bundle_pwd: + for mid in bundle_media: + media = mid_media_map[mid] + self.ufrag[media] = bundle_ufrag + self.pwd[media] = bundle_pwd def reset_instance(self): """Inits or resets the instance variables to their default state.""" @@ -398,8 +431,8 @@ self.sid: str | None = None self.offer: str | None = None self.local_candidates_buffer = {} - self.ufrag: str | None = None - self.pwd: str | None = None + self.ufrag: dict[str, str] = {} + self.pwd: dict[str, str] = {} self.callee: jid.JID | None = None self._media_types = None self._media_types_inv = None @@ -412,13 +445,25 @@ self._media_types_inv = None self.audio_valve = None self.video_valve = None + self.video_selector = None if self.desktop_portal is not None: - self.desktop_portal.end_screenshare() + self.desktop_portal.end_session() self.desktop_sharing = False self.desktop_sink_pad = None self.bindings = {} if self.reset_cb is not None: self.reset_cb() + self.data_channels: dict[str, GstWebRTC.WebRTCDataChannel] = {} + + @property + def data_channel(self) -> GstWebRTC.WebRTCDataChannel: + """Convenience method to get WebRTCDataChannel instance when there is only one.""" + if len(self.data_channels) != 1: + raise exceptions.InternalError( + "self.data_channel can only be used in a single Data Channel scenario. " + "Use self.data_channels dict instead." + ) + return next(iter(self.data_channels.values())) async def setup_call( self, @@ -442,83 +487,126 @@ assert role in ("initiator", "responder") self.role = role - if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL: - # Setup pipeline for datachannel only, no media streams. - self.gst_pipe_desc = f""" - webrtcbin name=sendrecv bundle-policy=max-bundle - """ + + if isinstance(self.sources_data, SourcesPipeline): + if self.sources_data.video_pipeline!= "" and video_pt is None: + raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) + if self.sources_data.audio_pipeline!= "" and audio_pt is None: + raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) + elif isinstance(self.sources_data, SourcesNone): + pass else: if audio_pt is None or video_pt is None: - raise NotImplementedError("None value is not handled yet") + raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) - if self.sources == SOURCES_AUTO: - video_source_elt = "v4l2src" - audio_source_elt = "pulsesrc" - elif self.sources == SOURCES_TEST: + match self.sources_data: + case SourcesAuto(): + video_source_elt = VIDEO_SOURCE_AUTO + audio_source_elt = AUDIO_SOURCE_AUTO + case SourcesNone(): + video_source_elt = "" + audio_source_elt = "" + case SourcesPipeline() as source: + if source.video_pipeline is None: + video_source_elt = VIDEO_SOURCE_AUTO + else: + video_source_elt = source.video_pipeline + if source.audio_pipeline is None: + audio_source_elt = AUDIO_SOURCE_AUTO + else: + audio_source_elt = source.audio_pipeline + case SourcesTest(): video_source_elt = "videotestsrc is-live=true pattern=ball" audio_source_elt = "audiotestsrc" - else: + case _: raise exceptions.InternalError( - f'Unknown "sources" value: {self.sources!r}' + f'Unexpected "sources_data" value: {self.sources_data!r}' + ) + + match self.sinks_data: + case SinksApp(): + local_video_sink_elt = ( + "appsink name=local_video_sink emit-signals=true drop=true " + "max-buffers=1 sync=True" + ) + case SinksAuto(): + if isinstance(self.sources_data, SourcesNone): + local_video_sink_elt = "" + else: + local_video_sink_elt = "autovideosink" + case SinksNone(): + local_video_sink_elt = "" + case _: + raise exceptions.InternalError( + f'Unexpected "sinks_data" value {self.sinks_data!r}' ) - if self.sinks == SINKS_APP: - local_video_sink_elt = ( - "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " - "sync=True" - ) - elif self.sinks == SINKS_AUTO: - local_video_sink_elt = "autovideosink" - else: - raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") + gst_pipe_elements = [ + "webrtcbin latency=30 name=sendrecv bundle-policy=max-bundle" + ] + + if self.merge_pip and local_video_sink_elt: + # Compositor is used to merge local video feedback in video sink, useful when + # we have only a single video sink. + gst_pipe_elements.append( + "compositor name=compositor background=black " + f"! video/x-raw,width={self.target_width}," + f"height={self.target_height},framerate=30/1 " + f"! {local_video_sink_elt}" + ) + local_video_sink_elt = "compositor.sink_1" - if self.merge_pip: - extra_elt = ( - "compositor name=compositor background=black " - f"! video/x-raw,width={self.target_width},height={self.target_height}," - "framerate=30/1 " - f"! {local_video_sink_elt}" - ) - local_video_sink_elt = "compositor.sink_1" - else: - extra_elt = "" + if video_source_elt: + # Video source with an input-selector to switch between normal and video mute + # (or desktop sharing). + gst_pipe_elements.append(f""" + input-selector name=video_selector + ! videorate drop-only=1 max-rate=30 + ! video/x-raw,framerate=30/1 + ! tee name=t - self.gst_pipe_desc = f""" - webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle + {video_source_elt} name=video_src + ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream + ! video_selector. - input-selector name=video_selector - ! videorate - ! video/x-raw,framerate=30/1 - ! tee name=t + videotestsrc name=muted_src is-live=true pattern=black + ! queue leaky=downstream + ! video_selector. - {extra_elt} - - {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. - videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. - - t. - ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream - ! videoconvert - ! vp8enc deadline=1 keyframe-max-dist=60 - ! rtpvp8pay picture-id-mode=15-bit - ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} - ! sendrecv. + t. + ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream + ! videoscale + ! videoconvert + ! vp8enc deadline=1 keyframe-max-dist=30 + ! rtpvp8pay picture-id-mode=15-bit + ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} + ! sendrecv. + """) - t. - ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream - ! videoconvert - ! {local_video_sink_elt} + if local_video_sink_elt: + # Local video feedback. + gst_pipe_elements.append(f""" + t. + ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream + ! videoconvert + ! {local_video_sink_elt} + """) - {audio_source_elt} name=audio_src - ! valve - ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream - ! audioconvert - ! audioresample - ! opusenc audio-type=voice - ! rtpopuspay - ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} - ! sendrecv. - """ + if audio_source_elt: + # Audio with a valve for muting. + gst_pipe_elements.append(r""" + {audio_source_elt} name=audio_src + ! valve + ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream + ! audioconvert + ! audioresample + ! opusenc audio-type=voice + ! rtpopuspay + ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} + ! sendrecv. + """) + + self.gst_pipe_desc = "\n\n".join(gst_pipe_elements) log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") @@ -542,18 +630,25 @@ if self.webrtcbin is None: raise exceptions.InternalError("Can't get the pipeline.") - # For datachannel setups, media source, selector, and sink elements are not - # created - if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL: - self.video_src = self.pipeline.get_by_name("video_src") - self.muted_src = self.pipeline.get_by_name("muted_src") - self.video_selector = self.pipeline.get_by_name("video_selector") - self.audio_valve = self.pipeline.get_by_name("audio_valve") + # If video or audio sources are not created, ``get_by_name`` will return None. + self.video_src = self.pipeline.get_by_name("video_src") + self.muted_src = self.pipeline.get_by_name("muted_src") + self.video_selector = self.pipeline.get_by_name("video_selector") + if self.video_src and isinstance(self.sources_data, SourcesPipeline): + for name, value in self.sources_data.video_properties.items(): + self.video_src.set_property(name, value) - if self.video_muted: - self.on_video_mute(True) - if self.audio_muted: - self.on_audio_mute(True) + self.audio_src = self.pipeline.get_by_name("audio_src") + if self.audio_src and isinstance(self.sources_data, SourcesPipeline): + for name, value in self.sources_data.audio_properties.items(): + self.audio_src.set_property(name, value) + + self.audio_valve = self.pipeline.get_by_name("audio_valve") + + if self.video_muted: + self.on_video_mute(True) + if self.audio_muted: + self.on_audio_mute(True) # set STUN and TURN servers external_disco = data_format.deserialise( @@ -583,13 +678,13 @@ log.warning(f"Erreur while adding TURN server {url}") # local video feedback - if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL: - assert self.appsink_data is not None + if isinstance(self.sinks_data, SinksApp): local_video_sink = self.pipeline.get_by_name("local_video_sink") - local_video_sink.set_property("emit-signals", True) - local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) - local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") - local_video_sink.set_property("caps", local_video_sink_caps) + if local_video_sink is not None: + local_video_sink.set_property("emit-signals", True) + local_video_sink.connect("new-sample", self.sinks_data.local_video_cb) + local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") + local_video_sink.set_property("caps", local_video_sink_caps) # Create bus and associate signal watchers self.bus = self.pipeline.get_bus() @@ -610,7 +705,13 @@ "notify::ice-connection-state", self.on_ice_connection_state ) - if self.sources == SOURCES_DATACHANNEL: + for dc_data in self.dc_data_list: + self.create_data_channel(dc_data) + + def create_data_channel(self, dc_data: SourcesDataChannel|SinksDataChannel) -> None: + """Create a Data Channel and connect relevant callbacks.""" + assert self.pipeline is not None + if isinstance(dc_data, SourcesDataChannel): # Data channel configuration for compatibility with browser defaults data_channel_options = Gst.Structure.new_empty("data-channel-options") data_channel_options.set_value("ordered", True) @@ -618,15 +719,19 @@ # Create the data channel self.pipeline.set_state(Gst.State.READY) - self.data_channel = self.webrtcbin.emit( - "create-data-channel", "file", data_channel_options + self.data_channels[dc_data.name] = data_channel = self.webrtcbin.emit( + "create-data-channel", dc_data.name, data_channel_options ) - if self.data_channel is None: + if data_channel is None: log.error("Failed to create data channel") return - self.data_channel.connect("on-open", self.dc_open_cb) - if self.sinks == SINKS_DATACHANNEL: - self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel) + data_channel.connect("on-open", dc_data.dc_open_cb) + elif isinstance(dc_data, SinksDataChannel): + self.webrtcbin.connect("on-data-channel", dc_data.dc_on_data_channel) + else: + raise ValueError( + "Only SourcesDataChannel or SinksDataChannel are allowed." + ) def start_pipeline(self) -> None: """Starts the GStreamer pipeline.""" @@ -790,10 +895,9 @@ remote_video_sink.set_property("width", width) remote_video_sink.set_property("height", height) remote_video_sink.set_property("sizing-policy", 1) - elif self.sinks == SINKS_APP: + elif isinstance(self.sinks_data, SinksApp): # ``app`` sink without ``self.merge_pip`` set, be create the sink and # connect it to the ``remote_video_cb``. - assert self.appsink_data is not None remote_video_sink = Gst.ElementFactory.make("appsink") remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB") @@ -803,15 +907,17 @@ remote_video_sink.set_property("drop", True) remote_video_sink.set_property("max-buffers", 1) remote_video_sink.set_property("sync", True) - remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb) + remote_video_sink.connect("new-sample", self.sinks_data.remote_video_cb) self.pipeline.add(remote_video_sink) - elif self.sinks == SINKS_AUTO: + elif isinstance(self.sinks_data, SinksAuto): # if ``self.merge_pip`` is not set, we create a dedicated # ``autovideosink`` for remote stream. remote_video_sink = Gst.ElementFactory.make("autovideosink") self.pipeline.add(remote_video_sink) else: - raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}') + raise exceptions.InternalError( + f'Unhandled "sinks_data" value: {self.sinks_data!r}' + ) if adjust_resolution: videoscale = Gst.ElementFactory.make("videoscale") @@ -898,14 +1004,14 @@ log.debug( f"sending buffered local ICE candidates: {self.local_candidates_buffer}" ) - if self.pwd is None: + if not self.pwd: sdp = self.webrtcbin.props.local_description.sdp.as_text() self.extract_ufrag_pwd(sdp) ice_data = {} for media_type, candidates in self.local_candidates_buffer.items(): ice_data[media_type] = { - "ufrag": self.ufrag, - "pwd": self.pwd, + "ufrag": self.ufrag[media_type], + "pwd": self.pwd[media_type], "candidates": candidates, } await self.bridge.ice_candidates_add( @@ -949,9 +1055,13 @@ ) assert "VP8" in payload_types assert "OPUS" in payload_types - await self.setup_call( - "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] - ) + try: + await self.setup_call( + "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] + ) + except Exception: + log.exception("Can't setup call") + raise self.start_pipeline() offer = GstWebRTC.WebRTCSessionDescription.new( GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg @@ -986,8 +1096,12 @@ else: sdp = self.webrtcbin.props.local_description.sdp.as_text() assert sdp is not None - ufrag, pwd = self.extract_ufrag_pwd(sdp) - ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} + self.extract_ufrag_pwd(sdp) + ice_data = { + "ufrag": self.ufrag[media_type], + "pwd": self.pwd[media_type], + "candidates": [parsed_candidate] + } self._a_call( self.bridge.ice_candidates_add, self.sid, @@ -1096,6 +1210,9 @@ self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} self.do_desktop_switch(desktop_active) + def on_portal_session_closed(self) -> None: + self.desktop_sharing = False + def do_desktop_switch(self, desktop_active: bool) -> None: if self.video_muted: # Update the active source state but do not switch @@ -1214,7 +1331,7 @@ self.desktop_sink_pad = None if self.desktop_portal is not None: - self.desktop_portal.end_screenshare() + self.desktop_portal.end_session() async def end_call(self) -> None: """Stop streaming and clean instance""" @@ -1249,9 +1366,7 @@ self.webrtc.callee = callee self.on_call_setup_cb = on_call_setup_cb self.on_call_ended_cb = on_call_ended_cb - bridge.register_signal( - "ice_candidates_new", self.on_ice_candidates_new, "plugin" - ) + bridge.register_signal("ice_candidates_new", self.on_ice_candidates_new, "plugin") bridge.register_signal("call_setup", self.on_call_setup, "plugin") bridge.register_signal("call_ended", self.on_call_ended, "plugin") @@ -1328,5 +1443,9 @@ To be used only if we are initiator """ - await self.webrtc.setup_call("initiator") + try: + await self.webrtc.setup_call("initiator") + except Exception: + log.exception("Can't setup call") + raise self.webrtc.start_pipeline()
--- a/libervia/frontends/tools/webrtc_file.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/frontends/tools/webrtc_file.py Sat May 11 13:52:41 2024 +0200 @@ -147,13 +147,14 @@ self.bridge, self.profile, call_data, - sources=webrtc.SOURCES_DATACHANNEL, + sources_data=webrtc.SourcesDataChannel( + dc_open_cb=partial(self._on_dc_open, file_path) + ), call_start_cb=partial( self._on_webrtc_call_start, file_path, file_name, ), - dc_open_cb=partial(self._on_dc_open, file_path), ) @@ -295,6 +296,7 @@ self.bridge, self.profile, call_data, - sinks=webrtc.SINKS_DATACHANNEL, - dc_on_data_channel=self._on_data_channel, + sinks_data=webrtc.SinksDataChannel( + dc_on_data_channel=self._on_data_channel, + ), )
--- a/libervia/frontends/tools/webrtc_models.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/frontends/tools/webrtc_models.py Sat May 11 13:52:41 2024 +0200 @@ -16,8 +16,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from collections.abc import Awaitable from dataclasses import dataclass, field from typing import Any, Callable +import uuid +import gi + +gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) +from gi.repository import GstWebRTC +from pydantic import BaseModel, Field from libervia.frontends.tools import jid @@ -30,7 +37,80 @@ kwargs: dict[str, Any] = field(default_factory=dict) -@dataclass -class AppSinkData: +class SourcesData(BaseModel): + """Data for Sources""" + + +class SourcesNone(SourcesData): + """No source is used. + + This is used when the WebRTC connection will be used for data channels only.""" + + +class SourcesAuto(SourcesData): + """Automatic Sources (webcam/microphone)""" + + +class SourcesTest(SourcesData): + """Test Sources (pattern)""" + + +class SourcesDataChannel(SourcesData): + """Sources for transmitting data over Data Channel + + + @param dc_open_cb: Called when Data Channel is open. + This callback will be run in a GStreamer thread. + """ + name: str = Field(default_factory=lambda: str(uuid.uuid4())) + dc_open_cb: Callable[[GstWebRTC.WebRTCDataChannel], None] + + +class SourcesPipeline(SourcesData): + """Use custom pipeline description as a source. + + @param video_pipeline: pipeline description of video source. + None to use automatic video source (same as SourcesAuto). + Empty string to disable video. + @param audio_pipeline: pipeline description of audio source. + None to use automatic audio source (same as SourcesAuto). + Empty string to disable audio. + @param video_properties: Elements properties to set. + @param audio_properties: Elements properties to set. + + """ + video_pipeline: str|None = None + audio_pipeline: str|None = None + video_properties: dict = Field(default_factory=lambda: {}) + audio_properties: dict = Field(default_factory=lambda: {}) + + +class SinksData(BaseModel): + """Data for Sinks""" + + +class SinksNone(SinksData): + """No sink is used. + + This is used when the WebRTC connection will be used for data channels only.""" + + +class SinksAuto(SinksData): + """Automatic Sinks (create windows/default audio)""" + + +class SinksApp(SinksData): local_video_cb: Callable - remote_video_cb: Callable|None + remote_video_cb: Callable | None + + +class SinksDataChannel(SinksData): + """Sinks for transmitting data over Data Channel + + @param dc_on_data_channel: Called when Data Channel is created. + This callback will be run in a GStreamer thread. + """ + + dc_on_data_channel: ( + Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None + ) = None
--- a/libervia/frontends/tools/webrtc_screenshare.py Sat May 11 13:25:45 2024 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,207 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia WebRTC implementation -# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from libervia.backend.core import exceptions - -import asyncio -import logging -from random import randint - - -log = logging.getLogger(__name__) - - -SOURCES_AUTO = "auto" -SOURCES_TEST = "test" -SOURCES_DATACHANNEL = "datachannel" -SINKS_APP = "app" -SINKS_AUTO = "auto" -SINKS_TEST = "test" -SINKS_DATACHANNEL = "datachannel" - - -class ScreenshareError(Exception): - pass - - -class DesktopPortal: - - def __init__(self, webrtc): - import dbus - from dbus.mainloop.glib import DBusGMainLoop - # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes - self.dbus = dbus - self.webrtc = webrtc - self.sources_type = dbus.UInt32(7) - DBusGMainLoop(set_as_default=True) - self.session_bus = dbus.SessionBus() - portal_object = self.session_bus.get_object( - 'org.freedesktop.portal.Desktop', - '/org/freedesktop/portal/desktop' - ) - self.screencast_interface = dbus.Interface( - portal_object, - 'org.freedesktop.portal.ScreenCast' - ) - self.session_interface = None - self.session_signal = None - self.handle_counter = 0 - self.session_handle = None - self.stream_data: dict|None = None - - @property - def handle_token(self): - self.handle_counter += 1 - return f"libervia{self.handle_counter}" - - def on_session_closed(self, details: dict) -> None: - if self.session_interface is not None: - self.session_interface = None - self.webrtc.desktop_sharing = False - if self.session_signal is not None: - self.session_signal.remove() - self.session_signal = None - - - async def dbus_call(self, method_name: str, *args) -> dict: - """Call a screenshare portal method - - This method handle the signal response. - @param method_name: method to call - @param args: extra args - `handle_token` will be automatically added to the last arg (option dict) - @return: method result - """ - if self.session_handle is not None: - self.end_screenshare() - method = getattr(self.screencast_interface, method_name) - options = args[-1] - reply_fut = asyncio.Future() - signal_fut = asyncio.Future() - # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html - handle_token = self.handle_token - sender = self.session_bus.get_unique_name().replace(".", "_")[1:] - path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" - signal_match = None - - def on_signal(response, results): - assert signal_match is not None - signal_match.remove() - if response == 0: - signal_fut.set_result(results) - elif response == 1: - signal_fut.set_exception( - exceptions.CancelError("Cancelled by user.") - ) - else: - signal_fut.set_exception(ScreenshareError( - f"Can't get signal result" - )) - - signal_match = self.session_bus.add_signal_receiver( - on_signal, - signal_name="Response", - dbus_interface="org.freedesktop.portal.Request", - path=path - ) - - options["handle_token"] = handle_token - - method( - *args, - reply_handler=reply_fut.set_result, - error_handler=reply_fut.set_exception - ) - try: - await reply_fut - except Exception as e: - raise ScreenshareError(f"Can't ask screenshare permission: {e}") - return await signal_fut - - async def request_screenshare(self) -> dict: - session_data = await self.dbus_call( - "CreateSession", - { - "session_handle_token": str(randint(1, 2**32)), - } - ) - try: - session_handle = session_data["session_handle"] - except KeyError: - raise ScreenshareError("Can't get session handle") - self.session_handle = session_handle - - - await self.dbus_call( - "SelectSources", - session_handle, - { - "multiple": True, - "types": self.sources_type, - "modal": True - } - ) - screenshare_data = await self.dbus_call( - "Start", - session_handle, - "", - {} - ) - - session_object = self.session_bus.get_object( - 'org.freedesktop.portal.Desktop', - session_handle - ) - self.session_interface = self.dbus.Interface( - session_object, - 'org.freedesktop.portal.Session' - ) - - self.session_signal = self.session_bus.add_signal_receiver( - self.on_session_closed, - signal_name="Closed", - dbus_interface="org.freedesktop.portal.Session", - path=session_handle - ) - - try: - node_id, stream_data = screenshare_data["streams"][0] - source_type = int(stream_data["source_type"]) - except (IndexError, KeyError): - raise ScreenshareError("Can't parse stream data") - self.stream_data = stream_data = { - "session_handle": session_handle, - "node_id": node_id, - "source_type": source_type - } - try: - height = int(stream_data["size"][0]) - weight = int(stream_data["size"][1]) - except (IndexError, KeyError): - pass - else: - stream_data["size"] = (height, weight) - - return self.stream_data - - def end_screenshare(self) -> None: - """Close a running screenshare session, if any.""" - if self.session_interface is None: - return - self.session_interface.Close() - self.on_session_closed({})