Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0353.py @ 4231:e11b13418ba6
plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation:
Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the
XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@,
Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to
work on another version and provided me with this link:
https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my
implementation.
This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167
plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it
normally does. This is because WebRTC has several implementations (browser for web
interface, GStreamer for others), and file/data must be handled directly by the frontend.
This is particularly important for web frontends, as the file is not sent from the backend
but from the end-user's browser device.
Among the changes, there are:
- XEP-0343 implementation.
- `file_send` bridge method now use serialised dict as output.
- New `BaseTransportHandler.is_usable` method which get content data and returns a boolean
(default to `True`) to tell if this transport can actually be used in this context (when
we are initiator). Used in webRTC case to see if call data are available.
- Support of `application` media type, and everything necessary to handle data channels.
- Better confirmation message, with file name, size and description when available.
- When file is accepted in preflight, it is specified in following `action_new` signal for
actual file transfer. This way, frontend can avoid the display or 2 confirmation
messages.
- XEP-0166: when not specified, default `content` name is now its index number instead of
a UUID. This follows the behaviour of browsers.
- XEP-0353: better handling of events such as call taken by another device.
- various other updates.
rel 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:57:23 +0200 |
parents | 6784d07b99c8 |
children | 79c8a70e1813 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0353.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0353.py Sat Apr 06 12:57:23 2024 +0200 @@ -29,6 +29,12 @@ from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger +from libervia.backend.tools.xml_tools import element_copy + +try: + from .plugin_xep_0167 import NS_JINGLE_RTP +except ImportError: + NS_JINGLE_RTP = None log = getLogger(__name__) @@ -55,6 +61,14 @@ self.reason = reason +class TakenByOtherDeviceException(exceptions.CancelError): + reason: str = "taken_by_other_device" + + def __init__(self, device_jid: jid.JID): + super().__init__(device_jid.full()) + self.device_jid = device_jid + + class RetractException(exceptions.CancelError): pass @@ -70,7 +84,7 @@ "XEP-0166_initiate_elt_built", self, self._on_initiate_trigger, - # this plugin set the resource, we want it to happen first to other trigger + # this plugin set the resource, we want it to happen first so other triggers # can get the full peer JID priority=host.trigger.MAX_PRIORITY, ) @@ -140,13 +154,20 @@ for content_data in session["contents"].values(): # we get the full element build by the application plugin jingle_description_elt = content_data["application_data"]["desc_elt"] - # and copy it to only keep the root <description> element, no children - description_elt = domish.Element( - (jingle_description_elt.uri, jingle_description_elt.name), - defaultUri=jingle_description_elt.defaultUri, - attribs=jingle_description_elt.attributes, - localPrefixes=jingle_description_elt.localPrefixes, - ) + + # we need to copy the element + if jingle_description_elt.uri == NS_JINGLE_RTP: + # for RTP, we only keep the root <description> element, no children + description_elt = domish.Element( + (jingle_description_elt.uri, jingle_description_elt.name), + defaultUri=jingle_description_elt.defaultUri, + attribs=jingle_description_elt.attributes, + localPrefixes=jingle_description_elt.localPrefixes, + ) + else: + # Otherwise we keep the children to have application useful data + description_elt = element_copy(jingle_description_elt, with_parent=False) + message_elt.propose.addChild(description_elt) response_d = defer.Deferred() # we wait for 2 min before cancelling the session init @@ -206,47 +227,82 @@ async def _on_message_received(self, client, message_elt, post_treat): for elt in message_elt.elements(): if elt.uri == NS_JINGLE_MESSAGE: - if elt.name == "propose": - return await self._handle_propose(client, message_elt, elt) - elif elt.name == "retract": - return self._handle_retract(client, message_elt, elt) - elif elt.name == "proceed": - return self._handle_proceed(client, message_elt, elt) - elif elt.name == "accept": - return self._handle_accept(client, message_elt, elt) - elif elt.name == "reject": - return self._handle_reject(client, message_elt, elt) - elif elt.name == "ringing": - return await self._handle_ringing(client, message_elt, elt) - else: - log.warning(f"invalid element: {elt.toXml}") - return True + # We use ensureDeferred to process the message initiation workflow in + # parallel and to avoid blocking the message queue. + defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt)) + return False return True + async def _handle_mess_init( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + mess_init_elt: domish.Element + ) -> None: + if mess_init_elt.name == "propose": + await self._handle_propose(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "retract": + self._handle_retract(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "proceed": + self._handle_proceed(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "accept": + self._handle_accept(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "reject": + self._handle_reject(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "ringing": + await self._handle_ringing(client, message_elt, mess_init_elt) + else: + log.warning(f"invalid element: {mess_init_elt.toXml}") + + def _get_sid_and_session_d( + self, + client: SatXMPPEntity, + elt: domish.Element + ) -> tuple[str, defer.Deferred|list[defer.Deferred]]: + """Retrieve session ID and deferred or list of deferred from response element""" + try: + session_id = elt["id"] + except KeyError as e: + assert elt.parent is not None + log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") + raise e + try: + session_d = client._xep_0353_pending_sessions[session_id] + except KeyError as e: + log.warning( + _( + "no pending session found with id {session_id}, did it timed out?" + ).format(session_id=session_id) + ) + raise e + return session_id, session_d + def _get_sid_and_response_d( self, client: SatXMPPEntity, elt: domish.Element ) -> tuple[str, defer.Deferred]: """Retrieve session ID and response_d from response element""" - try: - session_id = elt["id"] - except KeyError as e: - assert elt.parent is not None - log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") - raise e - try: - response_d = client._xep_0353_pending_sessions[session_id] - except KeyError as e: - log.warning( - _( - "no pending session found with id {session_id}, did it timed out?" - ).format(session_id=session_id) - ) - raise e + session_id, response_d = self._get_sid_and_session_d(client, elt) + assert isinstance(response_d, defer.Deferred) return session_id, response_d - async def _handle_propose(self, client, message_elt, elt): + def _get_sid_and_preflight_d_list( + self, + client: SatXMPPEntity, + elt: domish.Element + ) -> tuple[str, list[defer.Deferred]]: + """Retrieve session ID and list of preflight_d from response element""" + session_id, preflight_d_list = self._get_sid_and_session_d(client, elt) + assert isinstance(preflight_d_list, list) + return session_id, preflight_d_list + + async def _handle_propose( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + elt: domish.Element + ) -> None: peer_jid = jid.JID(message_elt["from"]) local_jid = jid.JID(message_elt["to"]) session_id = elt["id"] @@ -260,17 +316,17 @@ raise AttributeError except AttributeError: log.warning(f"Invalid propose element: {message_elt.toXml()}") - return False + return except exceptions.NotFound: log.warning( f"There is not registered application to handle this " f"proposal: {elt.toXml()}" ) - return False + return if not desc_and_apps: log.warning("No application specified: {message_elt.toXml()}") - return False + return session = self._j.create_session( client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid @@ -284,35 +340,51 @@ mess_data = self.build_message_data(client, peer_jid, "ringing", session_id) await client.send_message_data(mess_data) - for description_elt, application in desc_and_apps: - try: - await application.handler.jingle_preflight( - client, session, description_elt - ) - except exceptions.CancelError as e: - log.info(f"{client.profile} refused the session: {e}") + try: + for description_elt, application in desc_and_apps: + try: + preflight_d = defer.ensureDeferred( + application.handler.jingle_preflight( + client, session, description_elt + ) + ) + client._xep_0353_pending_sessions.setdefault(session_id, []).append( + preflight_d + ) + await preflight_d + except TakenByOtherDeviceException as e: + log.info(f"The call has been takend by {e.device_jid}") + await application.handler.jingle_preflight_cancel(client, session, e) + self._j.delete_session(client, session_id) + return + except exceptions.CancelError as e: + log.info(f"{client.profile} refused the session: {e}") - if is_in_roster: - # peer is in our roster, we send reject to them, ou other devices will - # get carbon copies - reject_dest_jid = peer_jid - else: - # peer is not in our roster, we send the "reject" only to our own - # devices to make them stop ringing/doing notification, and we don't - # send anything to peer to avoid presence leak. - reject_dest_jid = client.jid.userhostJID() + if is_in_roster: + # peer is in our roster, we send reject to them, ou other devices + # will get carbon copies + reject_dest_jid = peer_jid + else: + # peer is not in our roster, we send the "reject" only to our own + # devices to make them stop ringing/doing notification, and we + # don't send anything to peer to avoid presence leak. + reject_dest_jid = client.jid.userhostJID() - mess_data = self.build_message_data( - client, reject_dest_jid, "reject", session_id - ) - await client.send_message_data(mess_data) - self._j.delete_session(client, session_id) - - return False - except defer.CancelledError: - # raised when call is retracted before user can reply - self._j.delete_session(client, session_id) - return False + mess_data = self.build_message_data( + client, reject_dest_jid, "reject", session_id + ) + await client.send_message_data(mess_data) + self._j.delete_session(client, session_id) + return + except defer.CancelledError: + # raised when call is retracted before user can reply + self._j.delete_session(client, session_id) + return + finally: + try: + del client._xep_0353_pending_sessions[session_id] + except KeyError: + pass if peer_jid.userhostJID() not in client.roster: await client.presence.available(peer_jid) @@ -320,8 +392,6 @@ mess_data = self.build_message_data(client, peer_jid, "proceed", session_id) await client.send_message_data(mess_data) - return False - def _handle_retract(self, client, message_elt, retract_elt): try: session = self._j.get_session(client, retract_elt["id"]) @@ -345,14 +415,44 @@ d.cancel() return False - def _handle_proceed(self, client, message_elt, proceed_elt): - try: - __, response_d = self._get_sid_and_response_d(client, proceed_elt) - except KeyError: - return True + def _handle_proceed( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + proceed_elt: domish.Element + ) -> None: + from_jid = jid.JID(message_elt["from"]) + # session_d is the deferred of the session, it can be preflight_d or response_d + if from_jid.userhostJID() == client.jid.userhostJID(): + # an other device took the session + try: + sid, preflight_d_list = self._get_sid_and_preflight_d_list( + client, proceed_elt + ) + except KeyError: + return + for preflight_d in preflight_d_list: + if not preflight_d.called: + preflight_d.errback(TakenByOtherDeviceException(from_jid)) - response_d.callback(jid.JID(message_elt["from"])) - return False + try: + session = self._j.get_session(client, sid) + except exceptions.NotFound: + log.warning("No session found with sid {sid!r}.") + else: + # jingle_preflight_cancel? + pass + + # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always + # cleaned correctly (use a timeout?) + + else: + try: + __, response_d = self._get_sid_and_response_d(client, proceed_elt) + except KeyError: + return + # we have a response deferred + response_d.callback(jid.JID(message_elt["from"])) def _handle_accept(self, client, message_elt, accept_elt): pass