changeset 4112:bc60875cb3b8

plugin XEP-0166, XEP-0167, XEP-0234, XEP-0353: call events management to prepare for UI: - XEP-0166: add `jingle_preflight` and `jingle_preflight_cancel` methods to prepare a jingle session, principally used by XEP-0353 to create and cancel a session - XEP-0167: preflight methods implementation, workflow split in more methods/signals to handle UI and call events (e.g.: retract or reject a call) - XEP-0234: implementation of preflight methods as they are now mandatory - XEP-0353: handle various events using the new preflight methods rel 423
author Goffi <goffi@goffi.org>
date Wed, 09 Aug 2023 00:07:37 +0200
parents a8ac5e1e5848
children 3f59a2b141cc
files libervia/backend/plugins/plugin_xep_0166/__init__.py libervia/backend/plugins/plugin_xep_0166/models.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0234.py libervia/backend/plugins/plugin_xep_0353.py
diffstat 5 files changed, 523 insertions(+), 121 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py	Tue Aug 08 23:59:24 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py	Wed Aug 09 00:07:37 2023 +0200
@@ -785,6 +785,8 @@
                 return
 
             try:
+                # session may have been already created in a jingle_preflight, in this
+                # case we re-use it.
                 session = self.get_session(client, sid)
             except exceptions.NotFound:
                 # XXX: we store local_jid using request['to'] because for a component the
--- a/libervia/backend/plugins/plugin_xep_0166/models.py	Tue Aug 08 23:59:24 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0166/models.py	Wed Aug 09 00:07:37 2023 +0200
@@ -24,11 +24,37 @@
 from twisted.internet import defer
 from twisted.words.xish import domish
 
+from libervia.backend.core import exceptions
 from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core.i18n import _
 
+class BaseApplicationHandler(abc.ABC):
 
-class BaseApplicationHandler(abc.ABC):
+    @abc.abstractmethod
+    async def jingle_preflight(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        description_elt: domish.Element
+    ) -> None:
+        """Called when preparation steps are needed by a plugin
+
+        Notably used by XEP-0353 when a initiation message is received
+        """
+        pass
+
+    @abc.abstractmethod
+    async def jingle_preflight_cancel(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        cancel_error: exceptions.CancelError
+    ) -> None:
+        """Called when preflight initiation is cancelled
+
+        Notably used by XEP-0353 when an initiation message is cancelled
+        """
+        pass
 
     @abc.abstractmethod
     def jingle_request_confirmation(
@@ -42,8 +68,7 @@
         Callable[..., Union[bool, defer.Deferred]],
         Callable[..., Awaitable[bool]]
     ]:
-        """
-        If present, it is called on when session must be accepted.
+        """If present, it is called on when session must be accepted.
         If not present, a generic accept dialog will be used.
 
         @param session: Jingle Session
@@ -64,8 +89,7 @@
         Callable[..., domish.Element],
         Callable[..., Awaitable[domish.Element]]
     ]:
-        """
-        Must return the domish.Element used for initial content.
+        """Must return the domish.Element used for initial content.
 
         @param client: SatXMPPEntity instance
         @param session: Jingle Session
@@ -86,8 +110,7 @@
         Callable[..., None],
         Callable[..., Awaitable[None]]
     ]:
-        """
-        Called on several actions to negotiate the application or transport.
+        """Called on several actions to negotiate the application or transport.
 
         @param client: SatXMPPEntity instance
         @param action: Jingle action
@@ -109,8 +132,7 @@
         Callable[..., None],
         Callable[..., Awaitable[None]]
     ]:
-        """
-        Called on session terminate, with reason_elt.
+        """Called on session terminate, with reason_elt.
         May be used to clean session.
 
         @param reason_elt: Reason element
@@ -131,8 +153,7 @@
         Callable[..., domish.Element],
         Callable[..., Awaitable[domish.Element]]
     ]:
-        """
-        Must return the domish.Element used for initial content.
+        """Must return the domish.Element used for initial content.
 
         @param client: SatXMPPEntity instance
         @param session: Jingle Session
@@ -153,8 +174,7 @@
         Callable[..., None],
         Callable[..., Awaitable[None]]
     ]:
-        """
-        Called on several actions to negotiate the application or transport.
+        """Called on several actions to negotiate the application or transport.
 
         @param client: SatXMPPEntity instance
         @param action: Jingle action
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py	Tue Aug 08 23:59:24 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py	Wed Aug 09 00:07:37 2023 +0200
@@ -17,7 +17,9 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 from typing import Optional
+import uuid
 
+from twisted.internet import reactor
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid
 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
@@ -37,8 +39,8 @@
 from ..plugin_xep_0166 import BaseApplicationHandler
 from .constants import (
     NS_JINGLE_RTP,
+    NS_JINGLE_RTP_AUDIO,
     NS_JINGLE_RTP_INFO,
-    NS_JINGLE_RTP_AUDIO,
     NS_JINGLE_RTP_VIDEO,
 )
 
@@ -88,11 +90,11 @@
             async_=True,
         )
         host.bridge.add_method(
-            "call_end",
+            "call_answer_sdp",
             ".plugin",
             in_sign="sss",
             out_sign="",
-            method=self._call_end,
+            method=self._call_answer_sdp,
             async_=True,
         )
         host.bridge.add_method(
@@ -100,17 +102,32 @@
             ".plugin",
             in_sign="ssss",
             out_sign="",
-            method=self._call_start,
+            method=self._call_info,
         )
+        host.bridge.add_method(
+            "call_end",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._call_end,
+            async_=True,
+        )
+
+        # args: session_id, serialised setup data (dict with keys "role" and "sdp"),
+        #   profile
         host.bridge.add_signal(
-            "call_accepted", ".plugin", signature="sss"
-        )  # args: session_id, answer_sdp, profile
+            "call_setup", ".plugin", signature="sss"
+        )
+
+        # args: session_id, data, profile
         host.bridge.add_signal(
             "call_ended", ".plugin", signature="sss"
-        )  # args: session_id, data, profile
+        )
+
+        # args: session_id, info_type, extra, profile
         host.bridge.add_signal(
             "call_info", ".plugin", signature="ssss"
-        )  # args: session_id, info_type, extra, profile
+        )
 
     def get_handler(self, client):
         return XEP_0167_handler()
@@ -135,8 +152,26 @@
         client: SatXMPPEntity,
         peer_jid: jid.JID,
         call_data: dict,
-    ) -> None:
-        """Temporary method to test RTP session"""
+    ) -> str:
+        """Initiate a call session with the given peer.
+
+        @param peer_jid: JID of the peer to initiate a call session with.
+        @param call_data: Dictionary containing data for the call. Must include SDP information.
+            The dict can have the following keys:
+                - sdp (str): SDP data for the call.
+                - metadata (dict): Additional metadata for the call (optional).
+            Each media type ("audio" and "video") in the SDP should have:
+                - application_data (dict): Data about the media.
+                - fingerprint (str): Security fingerprint data (optional).
+                - id (str): Identifier for the media (optional).
+                - ice-candidates: ICE candidates for media transport.
+                - And other transport specific data.
+
+        @return: Session ID (SID) for the initiated call session.
+
+        @raises exceptions.DataError: If media data is invalid or duplicate content name
+            (mid) is found.
+        """
         contents = []
         metadata = call_data.get("metadata") or {}
 
@@ -201,14 +236,35 @@
                 contents.append(content)
         if not contents:
             raise exceptions.DataError("no valid media data found: {call_data}")
-        return await self._j.initiate(
-            client,
-            peer_jid,
-            contents,
-            call_type=call_type,
-            metadata=metadata,
-            peer_metadata={},
+        sid = str(uuid.uuid4())
+        defer.ensureDeferred(
+            self._j.initiate(
+                client,
+                peer_jid,
+                contents,
+                sid=sid,
+                call_type=call_type,
+                metadata=metadata,
+                peer_metadata={},
+            )
         )
+        return sid
+
+    def _call_answer_sdp(
+        self,
+        session_id: str,
+        answer_sdp: str,
+        profile: str
+    ) -> None:
+        client = self.host.get_client(profile)
+        session = self._j.get_session(client, session_id)
+        try:
+            answer_sdp_d = session.pop("answer_sdp_d")
+        except KeyError:
+            raise exceptions.NotFound(
+                f"No answer SDP expected for session {session_id!r}"
+            )
+        answer_sdp_d.callback(answer_sdp)
 
     def _call_end(
         self,
@@ -240,6 +296,123 @@
 
     # jingle callbacks
 
+    async def confirm_incoming_call(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        call_type: str
+    ) -> bool:
+        """Prompt the user for a call confirmation.
+
+        @param client: The client entity.
+        @param session: The Jingle session.
+        @param media_type: Type of media (audio or video).
+
+        @return: True if the call has been accepted
+        """
+        peer_jid = session["peer_jid"]
+
+        session["call_type"] = call_type
+        cancellable_deferred = session.setdefault("cancellable_deferred", [])
+
+        dialog_d = xml_tools.defer_dialog(
+            self.host,
+            _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type),
+            _(CONFIRM_TITLE),
+            action_extra={
+                "session_id": session["id"],
+                "from_jid": peer_jid.full(),
+                "type": C.META_TYPE_CALL,
+                "sub_type": call_type,
+            },
+            security_limit=SECURITY_LIMIT,
+            profile=client.profile,
+        )
+
+        cancellable_deferred.append(dialog_d)
+
+        resp_data = await dialog_d
+
+        accepted = not resp_data.get("cancelled", False)
+
+        if accepted:
+            session["call_accepted"] = True
+
+        return accepted
+
+    async def jingle_preflight(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        description_elt: domish.Element
+    ) -> None:
+        """Perform preflight checks for an incoming call session.
+
+        Check if the calls is audio only or audio/video, then, prompts the user for
+        confirmation.
+
+        @param client: The client instance.
+        @param session: Jingle session.
+        @param description_elt: The description element. It's parent attribute is used to
+            determine check siblings to see if it's an audio only or audio/video call.
+
+        @raises exceptions.CancelError: If the user doesn't accept the incoming call.
+        """
+        if session.get("call_accepted", False):
+            # the call is already accepted, nothing to do
+            return
+
+        parent_elt = description_elt.parent
+        assert parent_elt is not None
+
+        assert description_elt.parent is not None
+        for desc_elt in parent_elt.elements(NS_JINGLE_RTP, "description"):
+            if desc_elt.getAttribute("media") == "video":
+                call_type = C.META_SUBTYPE_CALL_VIDEO
+                break
+        else:
+            call_type = C.META_SUBTYPE_CALL_AUDIO
+
+        try:
+            accepted = await self.confirm_incoming_call(client, session, call_type)
+        except defer.CancelledError as e:
+            # raised when call is retracted before user has answered or rejected
+            self.host.bridge.call_ended(
+                session["id"],
+                data_format.serialise({"reason": "retracted"}),
+                client.profile
+            )
+            raise e
+
+        if not accepted:
+            raise exceptions.CancelError("User declined the incoming call.")
+
+    async def jingle_preflight_cancel(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        cancel_error: exceptions.CancelError
+    ) -> None:
+        """The call has been rejected"""
+        # call_ended is use to send the signal only once even if there are audio and video
+        # contents
+        call_ended = session.get("call_ended", False)
+        if call_ended:
+            return
+        data = {
+            "reason": getattr(cancel_error, "reason", "cancelled")
+        }
+        text = getattr(cancel_error, "text", None)
+        if text:
+            data["text"] = text
+        self.host.bridge.call_ended(
+            session["id"],
+            data_format.serialise(data),
+            client.profile
+        )
+        session["call_ended"] = True
+
+
     def jingle_session_init(
         self,
         client: SatXMPPEntity,
@@ -275,42 +448,56 @@
         content_name: str,
         desc_elt: domish.Element,
     ) -> bool:
+        """Requests confirmation from the user for a Jingle session's incoming call.
+
+        This method checks the content type of the Jingle session (audio or video)
+        based on the session's contents. Confirmation is requested only for the first
+        content; subsequent contents are automatically accepted. This means, in practice,
+        that the call confirmation is prompted only once for both audio and video contents.
+
+        @param client: The client instance.
+        @param action: The action type associated with the Jingle session.
+        @param session: Jingle session.
+        @param content_name: Name of the content being checked.
+        @param desc_elt: The description element associated with the content.
+
+        @return: True if the call is accepted by the user, False otherwise.
+        """
         if content_name != next(iter(session["contents"])):
             # we request confirmation only for the first content, all others are
             # automatically accepted. In practice, that means that the call confirmation
             # is requested only once for audio and video contents.
             return True
-        peer_jid = session["peer_jid"]
 
-        if any(
-            c["desc_elt"].getAttribute("media") == "video"
-            for c in session["contents"].values()
-        ):
-            call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
-        else:
-            call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO
+        if not session.get("call_accepted", False):
+            if any(
+                c["desc_elt"].getAttribute("media") == "video"
+                for c in session["contents"].values()
+            ):
+                call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
+            else:
+                call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO
+
+            accepted = await self.confirm_incoming_call(client, session, call_type)
+            if not accepted:
+                return False
 
         sdp = mapping.generate_sdp_from_session(session)
+        session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
+        # we should have the answer long before 2 min
+        answer_sdp_d.addTimeout(2 * 60, reactor)
 
-        resp_data = await xml_tools.defer_dialog(
-            self.host,
-            _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type),
-            _(CONFIRM_TITLE),
-            action_extra={
-                "session_id": session["id"],
-                "from_jid": peer_jid.full(),
-                "type": C.META_TYPE_CALL,
-                "sub_type": call_type,
+        self.host.bridge.call_setup(
+            session["id"],
+            data_format.serialise({
+                "role": session["role"],
                 "sdp": sdp,
-            },
-            security_limit=SECURITY_LIMIT,
-            profile=client.profile,
+            }),
+            client.profile
         )
 
-        if resp_data.get("cancelled", False):
-            return False
+        answer_sdp = await answer_sdp_d
 
-        answer_sdp = resp_data["sdp"]
         parsed_answer = mapping.parse_sdp(answer_sdp)
         session["peer_metadata"].update(parsed_answer["metadata"])
         for media in ("audio", "video"):
@@ -352,7 +539,14 @@
                 # we only send the signal for first content, as it means that the whole
                 # session is accepted
                 answer_sdp = mapping.generate_sdp_from_session(session)
-                self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile)
+                self.host.bridge.call_setup(
+                    session["id"],
+                    data_format.serialise({
+                        "role": session["role"],
+                        "sdp": answer_sdp,
+                    }),
+                    client.profile
+                )
         else:
             log.warning(f"FIXME: unmanaged action {action}")
 
@@ -397,7 +591,6 @@
         extra = data_format.deserialise(extra_s)
         return self.send_info(client, session_id, info_type, extra)
 
-
     def send_info(
         self,
         client: SatXMPPEntity,
@@ -423,7 +616,15 @@
         content_name: str,
         reason_elt: domish.Element,
     ) -> None:
-        self.host.bridge.call_ended(session["id"], "", client.profile)
+        reason, text = self._j.parse_reason_elt(reason_elt)
+        data = {
+            "reason": reason
+        }
+        if text:
+            data["text"] = text
+        self.host.bridge.call_ended(
+            session["id"], data_format.serialise(data), client.profile
+        )
 
 
 @implementer(iwokkel.IDisco)
--- a/libervia/backend/plugins/plugin_xep_0234.py	Tue Aug 08 23:59:24 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0234.py	Wed Aug 09 00:07:37 2023 +0200
@@ -32,13 +32,17 @@
 
 from libervia.backend.core import exceptions
 from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core.i18n import D_, _
 from libervia.backend.core.log import getLogger
+from libervia.backend.tools import xml_tools
 from libervia.backend.tools import utils
 from libervia.backend.tools import stream
 from libervia.backend.tools.common import date_utils
 from libervia.backend.tools.common import regex
 
+from .plugin_xep_0166 import BaseApplicationHandler
+
 
 log = getLogger(__name__)
 
@@ -60,7 +64,7 @@
 Range = namedtuple("Range", ("offset", "length"))
 
 
-class XEP_0234:
+class XEP_0234(BaseApplicationHandler):
     # TODO: assure everything is closed when file is sent or session terminate is received
     # TODO: call self._f.unregister when unloading order will be managing (i.e. when
     #   dependencies will be unloaded at the end)
@@ -407,6 +411,67 @@
 
     # jingle callbacks
 
+    async def jingle_preflight(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        description_elt: domish.Element
+    ) -> None:
+        """Perform preflight checks for an incoming call session.
+
+        Check if the calls is audio only or audio/video, then, prompts the user for
+        confirmation.
+
+        @param client: The client instance.
+        @param session: Jingle session.
+        @param description_elt: The description element. It's parent attribute is used to
+            determine check siblings to see if it's an audio only or audio/video call.
+
+        @raises exceptions.CancelError: If the user doesn't accept the incoming call.
+        """
+        session_id = session["id"]
+        peer_jid = session["peer_jid"]
+        # FIXME: has been moved from XEP-0353, but it doesn't handle correctly file
+        #   transfer (metadata are not used). We must check with other clients what is
+        #   actually send, and if XEP-0353 is used, and do a better integration.
+
+        if client.roster and peer_jid.userhostJID() not in client.roster:
+            confirm_msg = D_(
+                "Somebody not in your contact list ({peer_jid}) wants to do a "
+                '"{human_name}" session with you, this would leak your presence and '
+                "possibly you IP (internet localisation), do you accept?"
+            ).format(peer_jid=peer_jid, human_name=self.human_name)
+            confirm_title = D_("File sent from an unknown contact")
+            action_type = C.META_TYPE_NOT_IN_ROSTER_LEAK
+        else:
+            confirm_msg = D_(
+                "{peer_jid} wants to send a file ro you, do you accept?"
+            ).format(peer_jid=peer_jid)
+            confirm_title = D_("File Proposed")
+            action_type = C.META_TYPE_FILE
+        accepted = await xml_tools.defer_confirm(
+            self.host,
+            confirm_msg,
+            confirm_title,
+            profile=client.profile,
+            action_extra={
+                "type": action_type,
+                "session_id": session_id,
+                "from_jid": peer_jid.full(),
+            },
+        )
+        if accepted:
+            session["file_accepted"] = True
+        return accepted
+
+    async def jingle_preflight_cancel(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        cancel_error: exceptions.CancelError
+    ) -> None:
+        pass
+
     def jingle_description_elt(
         self, client, session, content_name, filepath, name, extra, progress_id_d
     ):
--- a/libervia/backend/plugins/plugin_xep_0353.py	Tue Aug 08 23:59:24 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0353.py	Wed Aug 09 00:07:37 2023 +0200
@@ -49,6 +49,17 @@
 }
 
 
+class RejectException(exceptions.CancelError):
+
+    def __init__(self, reason: str, text: str|None = None):
+        super().__init__(text)
+        self.reason = reason
+
+
+class RetractException(exceptions.CancelError):
+    pass
+
+
 class XEP_0353:
     def __init__(self, host):
         log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME]))
@@ -63,6 +74,11 @@
             # can get the full peer JID
             priority=host.trigger.MAX_PRIORITY,
         )
+        host.trigger.add(
+            "XEP-0166_terminate",
+            self._terminate_trigger,
+            priority=host.trigger.MAX_PRIORITY,
+        )
         host.trigger.add("message_received", self._on_message_received)
 
     def get_handler(self, client):
@@ -136,8 +152,8 @@
         # FIXME: let's application decide timeout?
         response_d.addTimeout(2 * 60, reactor)
         client._xep_0353_pending_sessions[session["id"]] = response_d
-        await client.send_message_data(mess_data)
         try:
+            await client.send_message_data(mess_data)
             accepting_jid = await response_d
         except defer.TimeoutError:
             log.warning(
@@ -145,6 +161,14 @@
                     peer_jid=peer_jid
                 )
             )
+        except exceptions.CancelError as e:
+            for content in session["contents"].values():
+                await content["application"].handler.jingle_preflight_cancel(
+                    client, session, e
+                )
+
+            self._j.delete_session(client, session["id"])
+            return False
         else:
             if iq_elt["to"] != accepting_jid.userhost():
                 raise exceptions.InternalError(
@@ -154,9 +178,30 @@
                 )
             iq_elt["to"] = accepting_jid.full()
             session["peer_jid"] = accepting_jid
-        del client._xep_0353_pending_sessions[session["id"]]
+        finally:
+            del client._xep_0353_pending_sessions[session["id"]]
         return True
 
+    def _terminate_trigger(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        reason_elt: domish.Element
+    ) -> bool:
+        session_id = session["id"]
+        try:
+            response_d = client._xep_0353_pending_sessions[session_id]
+        except KeyError:
+            return True
+        # we have a XEP-0353 session, that means that we are retracting a proposed session
+        mess_data = self.build_message_data(
+            client, session["peer_jid"], "retract", session_id
+        )
+        defer.ensureDeferred(client.send_message_data(mess_data))
+        response_d.errback(RetractException())
+
+        return False
+
     async def _on_message_received(self, client, message_elt, post_treat):
         for elt in message_elt.elements():
             if elt.uri == NS_JINGLE_MESSAGE:
@@ -169,79 +214,138 @@
                 elif elt.name == "accept":
                     return self._handle_accept(client, message_elt, elt)
                 elif elt.name == "reject":
-                    return self._handle_accept(client, message_elt, elt)
+                    return self._handle_reject(client, message_elt, elt)
                 else:
                     log.warning(f"invalid element: {elt.toXml}")
                     return True
         return True
 
-    async def _handle_propose(self, client, message_elt, elt):
-        peer_jid = jid.JID(message_elt["from"])
-        session_id = elt["id"]
-        if peer_jid.userhostJID() not in client.roster:
-            app_ns = elt.description.uri
-            try:
-                application = self._j.get_application(app_ns)
-                human_name = getattr(application.handler, "human_name", application.name)
-            except (exceptions.NotFound, AttributeError):
-                if app_ns.startswith("urn:xmpp:jingle:apps:"):
-                    human_name = app_ns[21:].split(":", 1)[0].replace("-", " ").title()
-                else:
-                    splitted_ns = app_ns.split(":")
-                    if len(splitted_ns) > 1:
-                        human_name = splitted_ns[-2].replace("- ", " ").title()
-                    else:
-                        human_name = app_ns
-
-            confirm_msg = D_(
-                "Somebody not in your contact list ({peer_jid}) wants to do a "
-                '"{human_name}" session with you, this would leak your presence and '
-                "possibly you IP (internet localisation), do you accept?"
-            ).format(peer_jid=peer_jid, human_name=human_name)
-            confirm_title = D_("Invitation from an unknown contact")
-            accept = await xml_tools.defer_confirm(
-                self.host,
-                confirm_msg,
-                confirm_title,
-                profile=client.profile,
-                action_extra={
-                    "type": C.META_TYPE_NOT_IN_ROSTER_LEAK,
-                    "session_id": session_id,
-                    "from_jid": peer_jid.full(),
-                },
-            )
-            if not accept:
-                mess_data = self.build_message_data(
-                    client, client.jid.userhostJID(), "reject", session_id
-                )
-                await client.send_message_data(mess_data)
-                # we don't sent anything to sender, to avoid leaking presence
-                return False
-            else:
-                await client.presence.available(peer_jid)
-        session_id = elt["id"]
-        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
-        await client.send_message_data(mess_data)
-        return False
-
-    def _handle_retract(self, client, message_elt, proceed_elt):
-        log.warning("retract is not implemented yet")
-        return False
-
-    def _handle_proceed(self, client, message_elt, proceed_elt):
+    def _get_sid_and_response_d(
+        self,
+        client: SatXMPPEntity,
+        elt: domish.Element
+    ) -> tuple[str, defer.Deferred]:
+        """Retrieve session ID and response_d from response element"""
         try:
-            session_id = proceed_elt["id"]
-        except KeyError:
-            log.warning(f"invalid proceed element in message_elt: {message_elt}")
-            return True
+            session_id = elt["id"]
+        except KeyError as e:
+            assert elt.parent is not None
+            log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
+            raise e
         try:
             response_d = client._xep_0353_pending_sessions[session_id]
-        except KeyError:
+        except KeyError as e:
             log.warning(
                 _(
                     "no pending session found with id {session_id}, did it timed out?"
                 ).format(session_id=session_id)
             )
+            raise e
+        return session_id, response_d
+
+    async def _handle_propose(self, client, message_elt, elt):
+        peer_jid = jid.JID(message_elt["from"])
+        local_jid = jid.JID(message_elt["to"])
+        session_id = elt["id"]
+        try:
+            desc_and_apps = [
+                (description_elt, self._j.get_application(description_elt.uri))
+                for description_elt in elt.elements()
+                if description_elt.name == "description"
+            ]
+            if not desc_and_apps:
+                raise AttributeError
+        except AttributeError:
+            log.warning(f"Invalid propose element: {message_elt.toXml()}")
+            return False
+        except exceptions.NotFound:
+            log.warning(
+                f"There is not registered application to handle this "
+                f"proposal: {elt.toXml()}"
+            )
+            return False
+
+        if not desc_and_apps:
+            log.warning("No application specified: {message_elt.toXml()}")
+            return False
+
+        session = self._j.create_session(
+            client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
+        )
+
+        is_in_roster = peer_jid.userhostJID() in client.roster
+        if is_in_roster:
+            # we indicate that device is ringing as explained in
+            # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user
+            # is in roster to avoid presence leak of all our devices.
+            mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
+            await client.send_message_data(mess_data)
+
+        for description_elt, application in desc_and_apps:
+            try:
+                await application.handler.jingle_preflight(
+                    client, session, description_elt
+                )
+            except exceptions.CancelError as e:
+                log.info(f"{client.profile} refused the session: {e}")
+
+                if is_in_roster:
+                    # peer is in our roster, we send reject to them, ou other devices will
+                    # get carbon copies
+                    reject_dest_jid = peer_jid
+                else:
+                    # peer is not in our roster, we send the "reject" only to our own
+                    # devices to make them stop ringing/doing notification, and we don't
+                    # send anything to peer to avoid presence leak.
+                    reject_dest_jid = client.jid.userhostJID()
+
+                mess_data = self.build_message_data(
+                    client, reject_dest_jid, "reject", session_id
+                )
+                await client.send_message_data(mess_data)
+                self._j.delete_session(client, session_id)
+
+                return False
+            except defer.CancelledError:
+                # raised when call is retracted before user can reply
+                self._j.delete_session(client, session_id)
+                return False
+
+        if peer_jid.userhostJID() not in client.roster:
+            await client.presence.available(peer_jid)
+
+        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
+        await client.send_message_data(mess_data)
+
+        return False
+
+    def _handle_retract(self, client, message_elt, retract_elt):
+        try:
+            session = self._j.get_session(client, retract_elt["id"])
+        except KeyError:
+            log.warning(f"invalid retract element: {message_elt.toXml()}")
+            return False
+        except exceptions.NotFound:
+            log.warning(f"no session found with ID {retract_elt['id']}")
+            return False
+        log.debug(
+            f"{message_elt['from']} are retracting their proposal {retract_elt['id']}"
+        )
+        try:
+            cancellable_deferred = session["cancellable_deferred"]
+            if not cancellable_deferred:
+                raise KeyError
+        except KeyError:
+            self._j.delete_session(client, session["id"])
+        else:
+            for d in cancellable_deferred:
+                d.cancel()
+        return False
+
+    def _handle_proceed(self, client, message_elt, proceed_elt):
+        try:
+            __, response_d = self._get_sid_and_response_d(client, proceed_elt)
+        except KeyError:
             return True
 
         response_d.callback(jid.JID(message_elt["from"]))
@@ -250,8 +354,18 @@
     def _handle_accept(self, client, message_elt, accept_elt):
         pass
 
-    def _handle_reject(self, client, message_elt, accept_elt):
-        pass
+    def _handle_reject(self, client, message_elt, reject_elt):
+        try:
+            __, response_d = self._get_sid_and_response_d(client, reject_elt)
+        except KeyError:
+            return True
+        reason_elt = self._j.get_reason_elt(reject_elt)
+        reason, text = self._j.parse_reason_elt(reason_elt)
+        if reason is None:
+            reason = "busy"
+
+        response_d.errback(RejectException(reason, text))
+        return False
 
 
 @implementer(iwokkel.IDisco)