changeset 4056:1c4f4aa36d98

plugin XEP-0167: Jingle RTP Sessions implementation: rel 420
author Goffi <goffi@goffi.org>
date Mon, 29 May 2023 13:38:10 +0200
parents 38819c69aa39
children e807a5434f82
files sat/core/constants.py sat/plugins/plugin_xep_0167/__init__.py sat/plugins/plugin_xep_0167/constants.py sat/plugins/plugin_xep_0167/mapping.py
diffstat 4 files changed, 1077 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/constants.py	Mon May 29 13:32:40 2023 +0200
+++ b/sat/core/constants.py	Mon May 29 13:38:10 2023 +0200
@@ -22,7 +22,7 @@
 except ImportError:
     BaseDirectory = None
 from os.path import dirname
-from typing_extensions import Final
+from typing import Final
 import sat
 
 
@@ -327,8 +327,11 @@
 
     ## action constants ##
     META_TYPE_FILE = "file"
+    META_TYPE_CALL = "call"
     META_TYPE_OVERWRITE = "overwrite"
     META_TYPE_NOT_IN_ROSTER_LEAK = "not_in_roster_leak"
+    META_SUBTYPE_CALL_AUDIO = "audio"
+    META_SUBTYPE_CALL_VIDEO = "video"
 
     ## HARD-CODED ACTIONS IDS (generated with uuid.uuid4) ##
     AUTHENTICATE_PROFILE_ID = "b03bbfa8-a4ae-4734-a248-06ce6c7cf562"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0167/__init__.py	Mon May 29 13:38:10 2023 +0200
@@ -0,0 +1,401 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for managing pipes (experimental)
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import D_, _
+from sat.core.log import getLogger
+from sat.tools import xml_tools
+from sat.tools.common import data_format
+
+from . import mapping
+from ..plugin_xep_0166 import BaseApplicationHandler
+from .constants import (
+    NS_JINGLE_RTP,
+    NS_JINGLE_RTP_INFO,
+    NS_JINGLE_RTP_AUDIO,
+    NS_JINGLE_RTP_VIDEO,
+)
+
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle RTP Sessions",
+    C.PI_IMPORT_NAME: "XEP-0167",
+    C.PI_TYPE: "XEP",
+    C.PI_PROTOCOLS: ["XEP-0167"],
+    C.PI_DEPENDENCIES: ["XEP-0166"],
+    C.PI_MAIN: "XEP_0167",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""),
+}
+
+CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?")
+CONFIRM_TITLE = D_("Incoming Call")
+SECURITY_LIMIT = 0
+
+ALLOWED_ACTIONS = (
+    "active",
+    "hold",
+    "unhold",
+    "mute",
+    "unmute",
+    "ringing",
+)
+
+
+class XEP_0167(BaseApplicationHandler):
+    def __init__(self, host):
+        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
+        self.host = host
+        # FIXME: to be removed once host is accessible from global var
+        mapping.host = host
+        self._j = host.plugins["XEP-0166"]
+        self._j.register_application(NS_JINGLE_RTP, self)
+        host.bridge.add_method(
+            "call_start",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._call_start,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "call_info",
+            ".plugin",
+            in_sign="ssss",
+            out_sign="",
+            method=self._call_start,
+        )
+        host.bridge.add_signal(
+            "call_accepted", ".plugin", signature="sss"
+        )  # args: session_id, answer_sdp, profile
+        host.bridge.add_signal(
+            "call_info", ".plugin", signature="ssss"
+        )  # args: session_id, info_type, extra, profile
+
+    def get_handler(self, client):
+        return XEP_0167_handler()
+
+    # bridge methods
+
+    def _call_start(
+        self,
+        entity_s: str,
+        call_data_s: str,
+        profile_key: str,
+    ):
+        client = self.host.get_client(profile_key)
+        return defer.ensureDeferred(
+            self.call_start(
+                client, jid.JID(entity_s), data_format.deserialise(call_data_s)
+            )
+        )
+
+    async def call_start(
+        self,
+        client: SatXMPPEntity,
+        peer_jid: jid.JID,
+        call_data: dict,
+        media: str = "video",
+    ) -> None:
+        """Temporary method to test RTP session"""
+        contents = []
+        metadata = call_data.get("metadata") or {}
+
+        if "sdp" in call_data:
+            sdp_data = mapping.parse_sdp(call_data["sdp"])
+            for media_type in ("audio", "video"):
+                try:
+                    media_data = sdp_data.pop(media_type)
+                except KeyError:
+                    continue
+                call_data[media_type] = media_data["application_data"]
+                transport_data = media_data["transport_data"]
+                try:
+                    call_data[media_type]["fingerprint"] = transport_data["fingerprint"]
+                except KeyError:
+                    log.warning("fingerprint is missing")
+                    pass
+                try:
+                    call_data[media_type]["id"] = media_data["id"]
+                except KeyError:
+                    log.warning(f"no media ID found for {media_type}: {media_data}")
+                try:
+                    call_data[media_type]["ice-candidates"] = transport_data["candidates"]
+                    metadata["ice-ufrag"] = transport_data["ufrag"]
+                    metadata["ice-pwd"] = transport_data["pwd"]
+                except KeyError:
+                    log.warning("ICE data are missing from SDP")
+                    continue
+            metadata.update(sdp_data.get("metadata", {}))
+
+        call_type = (
+            C.META_SUBTYPE_CALL_VIDEO
+            if "video" in call_data
+            else C.META_SUBTYPE_CALL_AUDIO
+        )
+        seen_names = set()
+
+        for media in ("audio", "video"):
+            media_data = call_data.get(media)
+            if media_data is not None:
+                content = {
+                    "app_ns": NS_JINGLE_RTP,
+                    "senders": "both",
+                    "transport_type": self._j.TRANSPORT_DATAGRAM,
+                    "app_kwargs": {"media": media, "media_data": media_data},
+                    "transport_data": {
+                        "local_ice_data": {
+                            "ufrag": metadata["ice-ufrag"],
+                            "pwd": metadata["ice-pwd"],
+                            "candidates": media_data.pop("ice-candidates"),
+                            "fingerprint": media_data.pop("fingerprint", {}),
+                        }
+                    },
+                }
+                if "id" in media_data:
+                    name = media_data.pop("id")
+                    if name in seen_names:
+                        raise exceptions.DataError(
+                            f"Content name (mid) seen multiple times: {name}"
+                        )
+                    content["name"] = name
+                contents.append(content)
+        if not contents:
+            raise exceptions.DataError("no valid media data found: {call_data}")
+        return await self._j.initiate(
+            client,
+            peer_jid,
+            contents,
+            call_type=call_type,
+            metadata=metadata,
+            peer_metadata={},
+        )
+
+    # jingle callbacks
+
+    def jingle_session_init(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        content_name: str,
+        media: str,
+        media_data: dict,
+    ) -> domish.Element:
+        if media not in ("audio", "video"):
+            raise ValueError('only "audio" and "video" media types are supported')
+        content_data = session["contents"][content_name]
+        application_data = content_data["application_data"]
+        application_data["media"] = media
+        application_data["local_data"] = media_data
+        desc_elt = mapping.build_description(media, media_data, session)
+        self.host.trigger.point(
+            "XEP-0167_jingle_session_init",
+            client,
+            session,
+            content_name,
+            media,
+            media_data,
+            desc_elt,
+            triggers_no_cancel=True,
+        )
+        return desc_elt
+
+    async def jingle_request_confirmation(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        desc_elt: domish.Element,
+    ) -> bool:
+        if content_name != next(iter(session["contents"])):
+            # we request confirmation only for the first content, all others are
+            # automatically accepted. In practice, that means that the call confirmation
+            # is requested only once for audio and video contents.
+            return True
+        peer_jid = session["peer_jid"]
+
+        if any(
+            c["desc_elt"].getAttribute("media") == "video"
+            for c in session["contents"].values()
+        ):
+            call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
+        else:
+            call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO
+
+        sdp = mapping.generate_sdp_from_session(session)
+
+        resp_data = await xml_tools.defer_dialog(
+            self.host,
+            _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type),
+            _(CONFIRM_TITLE),
+            action_extra={
+                "session_id": session["id"],
+                "from_jid": peer_jid.full(),
+                "type": C.META_TYPE_CALL,
+                "sub_type": call_type,
+                "sdp": sdp,
+            },
+            security_limit=SECURITY_LIMIT,
+            profile=client.profile,
+        )
+
+        if resp_data.get("cancelled", False):
+            return False
+
+        answer_sdp = resp_data["sdp"]
+        parsed_answer = mapping.parse_sdp(answer_sdp)
+        session["peer_metadata"].update(parsed_answer["metadata"])
+        for media in ("audio", "video"):
+            for content in session["contents"].values():
+                if content["desc_elt"].getAttribute("media") == media:
+                    media_data = parsed_answer[media]
+                    application_data = content["application_data"]
+                    application_data["local_data"] = media_data["application_data"]
+                    transport_data = content["transport_data"]
+                    local_ice_data = media_data["transport_data"]
+                    transport_data["local_ice_data"] = local_ice_data
+
+        return True
+
+    async def jingle_handler(self, client, action, session, content_name, desc_elt):
+        content_data = session["contents"][content_name]
+        application_data = content_data["application_data"]
+        if action == self._j.A_PREPARE_CONFIRMATION:
+            session["metadata"] = {}
+            session["peer_metadata"] = {}
+            try:
+                media = application_data["media"] = desc_elt["media"]
+            except KeyError:
+                raise exceptions.DataError('"media" key is missing in {desc_elt.toXml()}')
+            if media not in ("audio", "video"):
+                raise exceptions.DataError(f"invalid media: {media!r}")
+            application_data["peer_data"] = mapping.parse_description(desc_elt)
+        elif action == self._j.A_SESSION_INITIATE:
+            application_data["peer_data"] = mapping.parse_description(desc_elt)
+            desc_elt = mapping.build_description(
+                application_data["media"], application_data["local_data"], session
+            )
+        elif action == self._j.A_ACCEPTED_ACK:
+            pass
+        elif action == self._j.A_PREPARE_INITIATOR:
+            application_data["peer_data"] = mapping.parse_description(desc_elt)
+        elif action == self._j.A_SESSION_ACCEPT:
+            if content_name == next(iter(session["contents"])):
+                # we only send the signal for first content, as it means that the whole
+                # session is accepted
+                answer_sdp = mapping.generate_sdp_from_session(session)
+                self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile)
+        else:
+            log.warning(f"FIXME: unmanaged action {action}")
+
+        self.host.trigger.point(
+            "XEP-0167_jingle_handler",
+            client,
+            action,
+            session,
+            content_name,
+            desc_elt,
+            triggers_no_cancel=True,
+        )
+        return desc_elt
+
+    def jingle_session_info(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        jingle_elt: domish.Element,
+    ) -> None:
+        """Informational messages"""
+        for elt in jingle_elt.elements():
+            if elt.uri == NS_JINGLE_RTP_INFO:
+                info_type = elt.name
+                if info_type not in ALLOWED_ACTIONS:
+                    log.warning("ignoring unknow info type: {info_type!r}")
+                    continue
+                extra = {}
+                if info_type in ("mute", "unmute"):
+                    name = elt.getAttribute("name")
+                    if name:
+                        extra["name"] = name
+                log.debug(f"{info_type} call info received (extra: {extra})")
+                self.host.bridge.call_info(
+                    session["id"], info_type, data_format.serialise(extra), client.profile
+                )
+
+    def _call_info(self, session_id, info_type, extra_s, profile_key):
+        client = self.host.get_client(profile_key)
+        extra = data_format.deserialise(extra_s)
+        return self.send_info(client, session_id, info_type, extra)
+
+
+    def send_info(
+        self,
+        client: SatXMPPEntity,
+        session_id: str,
+        info_type: str,
+        extra: Optional[dict],
+    ) -> None:
+        """Send information on the call"""
+        if info_type not in ALLOWED_ACTIONS:
+            raise ValueError(f"Unkown info type {info_type!r}")
+        session = self._j.get_session(client, session_id)
+        iq_elt, jingle_elt = self._j.build_session_info(client, session)
+        info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type))
+        if extra and info_type in ("mute", "unmute") and "name" in extra:
+            info_elt["name"] = extra["name"]
+        iq_elt.send()
+
+    def jingle_terminate(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        reason_elt: domish.Element,
+    ) -> None:
+        pass
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0167_handler(XMPPHandler):
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [
+            disco.DiscoFeature(NS_JINGLE_RTP),
+            disco.DiscoFeature(NS_JINGLE_RTP_AUDIO),
+            disco.DiscoFeature(NS_JINGLE_RTP_VIDEO),
+        ]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0167/constants.py	Mon May 29 13:38:10 2023 +0200
@@ -0,0 +1,27 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for managing pipes (experimental)
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Final
+
+
+NS_JINGLE_RTP_BASE: Final = "urn:xmpp:jingle:apps:rtp"
+NS_JINGLE_RTP: Final = f"{NS_JINGLE_RTP_BASE}:1"
+NS_JINGLE_RTP_AUDIO: Final = f"{NS_JINGLE_RTP_BASE}:audio"
+NS_JINGLE_RTP_VIDEO: Final = f"{NS_JINGLE_RTP_BASE}:video"
+NS_JINGLE_RTP_ERRORS: Final = f"{NS_JINGLE_RTP_BASE}:errors:1"
+NS_JINGLE_RTP_INFO: Final = f"{NS_JINGLE_RTP_BASE}:info:1"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0167/mapping.py	Mon May 29 13:38:10 2023 +0200
@@ -0,0 +1,645 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for managing pipes (experimental)
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+from typing import Any, Dict, Optional
+
+from twisted.words.xish import domish
+
+from sat.core.constants import Const as C
+from sat.core.log import getLogger
+
+from .constants import NS_JINGLE_RTP
+
+log = getLogger(__name__)
+
+host = None
+
+
+def senders_to_sdp(senders: str, session: dict) -> str:
+    """Returns appropriate SDP attribute corresponding to Jingle senders attribute"""
+    if senders == "both":
+        return "a=sendrecv"
+    elif senders == "none":
+        return "a=inactive"
+    elif session["role"] == senders:
+        return "a=sendonly"
+    else:
+        return "a=recvonly"
+
+
+def generate_sdp_from_session(
+    session: dict, local: bool = False, port: int = 9999
+) -> str:
+    """Generate an SDP string from session data.
+
+    @param session: A dictionary containing the session data. It should have the
+        following structure:
+
+        {
+            "contents": {
+                "<content_id>": {
+                    "application_data": {
+                        "media": <str: "audio" or "video">,
+                        "local_data": <media_data dict>,
+                        "peer_data": <media_data dict>,
+                        ...
+                    },
+                    "transport_data": {
+                        "local_ice_data": <ice_data dict>,
+                        "peer_ice_data": <ice_data dict>,
+                        ...
+                    },
+                    ...
+                },
+                ...
+            }
+        }
+    @param local: A boolean value indicating whether to generate SDP for the local or
+        peer entity. If True, the method will generate SDP for the local entity,
+        otherwise for the peer entity. Generally the local SDP is received from frontends
+        and not needed in backend, except for debugging purpose.
+    @param port: The preferred port for communications.
+
+    @return: The generated SDP string.
+    """
+    sdp_lines = ["v=0"]
+
+    # Add originator (o=) line after the version (v=) line
+    username = base64.b64encode(session["local_jid"].full().encode()).decode()
+    session_id = "1"  # Increment this for each session
+    session_version = "1"  # Increment this when the session is updated
+    network_type = "IN"
+    address_type = "IP4"
+    connection_address = "0.0.0.0"
+    o_line = (
+        f"o={username} {session_id} {session_version} {network_type} {address_type} "
+        f"{connection_address}"
+    )
+    sdp_lines.append(o_line)
+
+    # Add the mandatory "s=" and t=" lines
+    sdp_lines.append("s=-")
+    sdp_lines.append("t=0 0")
+
+    # stream direction
+    all_senders = {c["senders"] for c in session["contents"].values()}
+    # if we don't have a common senders for all contents, we set them at media level
+    senders = all_senders.pop() if len(all_senders) == 1 else None
+    if senders is not None:
+        sdp_lines.append(senders_to_sdp(senders, session))
+
+    sdp_lines.append("a=msid-semantic:WMS *")
+
+    host.trigger.point(
+        "XEP-0167_generate_sdp_session",
+        session,
+        local,
+        sdp_lines,
+        triggers_no_cancel=True
+    )
+
+    contents = session["contents"]
+    for content_name, content_data in contents.items():
+        app_data_key = "local_data" if local else "peer_data"
+        application_data = content_data["application_data"]
+        media_data = application_data[app_data_key]
+        media = application_data["media"]
+        payload_types = media_data.get("payload_types", {})
+
+        # Generate m= line
+        transport = "UDP/TLS/RTP/SAVPF"
+        payload_type_ids = [str(pt_id) for pt_id in payload_types]
+        m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}"
+        sdp_lines.append(m_line)
+
+        sdp_lines.append(f"c={network_type} {address_type} {connection_address}")
+
+        sdp_lines.append(f"a=mid:{content_name}")
+
+        # stream direction
+        if senders is None:
+            sdp_lines.append(senders_to_sdp(content_data["senders"], session))
+
+        # Generate a= lines for rtpmap and fmtp
+        for pt_id, pt in payload_types.items():
+            name = pt["name"]
+            clockrate = pt.get("clockrate", "")
+            sdp_lines.append(f"a=rtpmap:{pt_id} {name}/{clockrate}")
+
+            if "ptime" in pt:
+                sdp_lines.append(f"a=ptime:{pt['ptime']}")
+
+            if "parameters" in pt:
+                fmtp_params = ";".join([f"{k}={v}" for k, v in pt["parameters"].items()])
+                sdp_lines.append(f"a=fmtp:{pt_id} {fmtp_params}")
+
+        if "bandwidth" in media_data:
+            sdp_lines.append(f"a=b:{media_data['bandwidth']}")
+
+        if media_data.get("rtcp-mux"):
+            sdp_lines.append("a=rtcp-mux")
+
+        # Generate a= lines for fingerprint, ICE ufrag, pwd and candidates
+        ice_data_key = "local_ice_data" if local else "peer_ice_data"
+        ice_data = content_data["transport_data"][ice_data_key]
+
+        if "fingerprint" in ice_data:
+            fingerprint_data = ice_data["fingerprint"]
+            sdp_lines.append(
+                f"a=fingerprint:{fingerprint_data['hash']} "
+                f"{fingerprint_data['fingerprint']}"
+            )
+            sdp_lines.append(f"a=setup:{fingerprint_data['setup']}")
+
+        sdp_lines.append(f"a=ice-ufrag:{ice_data['ufrag']}")
+        sdp_lines.append(f"a=ice-pwd:{ice_data['pwd']}")
+
+        for candidate in ice_data["candidates"]:
+            foundation = candidate["foundation"]
+            component_id = candidate["component_id"]
+            transport = candidate["transport"]
+            priority = candidate["priority"]
+            address = candidate["address"]
+            candidate_port = candidate["port"]
+            candidate_type = candidate["type"]
+
+            candidate_line = (
+                f"a=candidate:{foundation} {component_id} {transport} {priority} "
+                f"{address} {candidate_port} typ {candidate_type}"
+            )
+
+            if "rel_addr" in candidate and "rel_port" in candidate:
+                candidate_line += (
+                    f" raddr {candidate['rel_addr']} rport {candidate['rel_port']}"
+                )
+
+            if "generation" in candidate:
+                candidate_line += f" generation {candidate['generation']}"
+
+            if "network" in candidate:
+                candidate_line += f" network {candidate['network']}"
+
+            sdp_lines.append(candidate_line)
+
+        # Generate a= lines for encryption
+        if "encryption" in media_data:
+            for enc_data in media_data["encryption"]:
+                crypto_suite = enc_data["crypto-suite"]
+                key_params = enc_data["key-params"]
+                session_params = enc_data.get("session-params", "")
+                tag = enc_data["tag"]
+
+                crypto_line = f"a=crypto:{tag} {crypto_suite} {key_params}"
+                if session_params:
+                    crypto_line += f" {session_params}"
+                sdp_lines.append(crypto_line)
+
+
+        host.trigger.point(
+            "XEP-0167_generate_sdp_content",
+            session,
+            local,
+            content_name,
+            content_data,
+            sdp_lines,
+            application_data,
+            app_data_key,
+            media_data,
+            media,
+            triggers_no_cancel=True
+        )
+
+    # Combine SDP lines and return the result
+    return "\r\n".join(sdp_lines) + "\r\n"
+
+
+def parse_sdp(sdp: str) -> dict:
+    """Parse SDP string.
+
+    @param sdp: The SDP string to parse.
+
+    @return: A dictionary containing parsed session data.
+    """
+    # FIXME: to be removed once host is accessible from global var
+    assert host is not None
+    lines = sdp.strip().split("\r\n")
+    # session metadata
+    metadata: Dict[str, Any] = {}
+    call_data = {"metadata": metadata}
+
+    media_type = None
+    media_data: Optional[Dict[str, Any]] = None
+    application_data: Optional[Dict[str, Any]] = None
+    transport_data: Optional[Dict[str, Any]] = None
+    fingerprint_data: Optional[Dict[str, str]] = None
+    ice_pwd: Optional[str] = None
+    ice_ufrag: Optional[str] = None
+    payload_types: Optional[Dict[int, dict]] = None
+
+    for line in lines:
+        try:
+            parts = line.split()
+            prefix = parts[0][:2]  # Extract the 'a=', 'm=', etc., prefix
+            parts[0] = parts[0][2:]  # Remove the prefix from the first element
+
+            if prefix == "m=":
+                media_type = parts[0]
+                port = int(parts[1])
+                payload_types = {}
+                for payload_type_id in [int(pt_id) for pt_id in parts[3:]]:
+                    payload_type = {"id": payload_type_id}
+                    payload_types[payload_type_id] = payload_type
+
+                application_data = {"media": media_type, "payload_types": payload_types}
+                transport_data = {"port": port}
+                if fingerprint_data is not None:
+                    transport_data["fingerprint"] = fingerprint_data
+                if ice_pwd is not None:
+                    transport_data["pwd"] = ice_pwd
+                if ice_ufrag is not None:
+                    transport_data["ufrag"] = ice_ufrag
+                media_data = call_data[media_type] = {
+                    "application_data": application_data,
+                    "transport_data": transport_data,
+                }
+
+            elif prefix == "a=":
+                if ":" in parts[0]:
+                    attribute, parts[0] = parts[0].split(":", 1)
+                else:
+                    attribute = parts[0]
+
+                if (
+                    media_type is None
+                    or application_data is None
+                    or transport_data is None
+                ) and not (
+                    attribute
+                    in (
+                        "sendrecv",
+                        "sendonly",
+                        "recvonly",
+                        "inactive",
+                        "fingerprint",
+                        "group",
+                        "ice-options",
+                        "msid-semantic",
+                        "ice-pwd",
+                        "ice-ufrag",
+                    )
+                ):
+                    log.warning(
+                        "Received attribute before media description, this is "
+                        f"invalid: {line}"
+                    )
+                    continue
+
+                if attribute == "mid":
+                    assert media_data is not None
+                    try:
+                        media_data["id"] = parts[0]
+                    except IndexError:
+                        log.warning(f"invalid media ID: {line}")
+
+                elif attribute == "rtpmap":
+                    assert application_data is not None
+                    assert payload_types is not None
+                    pt_id = int(parts[0])
+                    codec_info = parts[1].split("/")
+                    codec = codec_info[0]
+                    clockrate = int(codec_info[1])
+                    payload_type = {
+                        "id": pt_id,
+                        "name": codec,
+                        "clockrate": clockrate,
+                    }
+                    # Handle optional channel count
+                    if len(codec_info) > 2:
+                        channels = int(codec_info[2])
+                        payload_type["channels"] = channels
+
+                    payload_types.setdefault(pt_id, {}).update(payload_type)
+
+                elif attribute == "fmtp":
+                    assert payload_types is not None
+                    pt_id = int(parts[0])
+                    params = parts[1].split(";")
+                    try:
+                        payload_type = payload_types[pt_id]
+                    except KeyError:
+                        raise ValueError(
+                            f"Can find content type {pt_id}, ignoring: {line}"
+                        )
+
+                    try:
+                        payload_type["parameters"] = {
+                            name: value
+                            for name, value in (param.split("=") for param in params)
+                        }
+                    except ValueError:
+                        payload_type.setdefault("exra-parameters", []).extend(params)
+
+                elif attribute == "candidate":
+                    assert transport_data is not None
+                    candidate = {
+                        "foundation": parts[0],
+                        "component_id": int(parts[1]),
+                        "transport": parts[2],
+                        "priority": int(parts[3]),
+                        "address": parts[4],
+                        "port": int(parts[5]),
+                        "type": parts[7],
+                    }
+
+                    for part in parts[8:]:
+                        if part == "raddr":
+                            candidate["rel_addr"] = parts[parts.index(part) + 1]
+                        elif part == "rport":
+                            candidate["rel_port"] = int(parts[parts.index(part) + 1])
+                        elif part == "generation":
+                            candidate["generation"] = parts[parts.index(part) + 1]
+                        elif part == "network":
+                            candidate["network"] = parts[parts.index(part) + 1]
+
+                    transport_data.setdefault("candidates", []).append(candidate)
+
+                elif attribute == "fingerprint":
+                    algorithm, fingerprint = parts[0], parts[1]
+                    fingerprint_data = {"hash": algorithm, "fingerprint": fingerprint}
+                    if transport_data is not None:
+                        transport_data["fingerprint"] = fingerprint_data
+                elif attribute == "setup":
+                    assert transport_data is not None
+                    setup = parts[0]
+                    transport_data.setdefault("fingerprint", {})["setup"] = setup
+
+                elif attribute == "b":
+                    assert application_data is not None
+                    bandwidth = int(parts[0])
+                    application_data["bandwidth"] = bandwidth
+
+                elif attribute == "rtcp-mux":
+                    assert application_data is not None
+                    application_data["rtcp-mux"] = True
+
+                elif attribute == "ice-ufrag":
+                    if transport_data is not None:
+                        transport_data["ufrag"] = parts[0]
+
+                elif attribute == "ice-pwd":
+                    if transport_data is not None:
+                        transport_data["pwd"] = parts[0]
+
+                host.trigger.point(
+                    "XEP-0167_parse_sdp_a",
+                    attribute,
+                    parts,
+                    call_data,
+                    metadata,
+                    media_type,
+                    application_data,
+                    transport_data,
+                    triggers_no_cancel=True
+                )
+
+        except ValueError as e:
+            raise ValueError(f"Could not parse line. Invalid format ({e}): {line}") from e
+        except IndexError as e:
+            raise IndexError(f"Incomplete line. Missing data: {line}") from e
+
+    # we remove private data (data starting with _, used by some plugins (e.g. XEP-0294)
+    # to handle session data at media level))
+    for key in [k for k in call_data if k.startswith("_")]:
+        log.debug(f"cleaning remaining private data {key!r}")
+        del call_data[key]
+
+    # ICE candidates may only be specified for the first media, this
+    # duplicate the candidate for the other in this case
+    all_media = {k:v for k,v in call_data.items() if k in ("audio", "video")}
+    if len(all_media) > 1 and not all(
+        "candidates" in c["transport_data"] for c in all_media.values()
+    ):
+        first_content = next(iter(all_media.values()))
+        try:
+            ice_candidates = first_content["transport_data"]["candidates"]
+        except KeyError:
+            log.warning("missing candidates in SDP")
+        else:
+            for idx, content in enumerate(all_media.values()):
+                if idx == 0:
+                    continue
+                content["transport_data"].setdefault("candidates", ice_candidates)
+
+    return call_data
+
+
+def build_description(media: str, media_data: dict, session: dict) -> domish.Element:
+    """Generate <description> element from media data
+
+    @param media: media type ("audio" or "video")
+
+    @param media_data: A dictionary containing the media description data.
+        The keys and values are described below:
+
+        - ssrc (str, optional): The synchronization source identifier.
+        - payload_types (list): A list of dictionaries, each representing a payload
+          type.
+          Each dictionary may contain the following keys:
+            - channels (str, optional): Number of audio channels.
+            - clockrate (str, optional): Clock rate of the media.
+            - id (str): The unique identifier of the payload type.
+            - maxptime (str, optional): Maximum packet time.
+            - name (str, optional): Name of the codec.
+            - ptime (str, optional): Preferred packet time.
+            - parameters (dict, optional): A dictionary of codec-specific parameters.
+              Key-value pairs represent the parameter name and value, respectively.
+        - bandwidth (str, optional): The bandwidth type.
+        - rtcp-mux (bool, optional): Indicates whether RTCP multiplexing is enabled or
+          not.
+        - encryption (list, optional): A list of dictionaries, each representing an
+          encryption method.
+          Each dictionary may contain the following keys:
+            - tag (str): The unique identifier of the encryption method.
+            - crypto-suite (str): The encryption suite in use.
+            - key-params (str): Key parameters for the encryption suite.
+            - session-params (str, optional): Session parameters for the encryption
+              suite.
+
+    @return: A <description> element.
+    """
+    # FIXME: to be removed once host is accessible from global var
+    assert host is not None
+
+    desc_elt = domish.Element((NS_JINGLE_RTP, "description"), attribs={"media": media})
+
+    for pt_id, pt_data in media_data.get("payload_types", {}).items():
+        payload_type_elt = desc_elt.addElement("payload-type")
+        payload_type_elt["id"] = str(pt_id)
+        for attr in ["channels", "clockrate", "maxptime", "name", "ptime"]:
+            if attr in pt_data:
+                payload_type_elt[attr] = str(pt_data[attr])
+
+        if "parameters" in pt_data:
+            for param_name, param_value in pt_data["parameters"].items():
+                param_elt = payload_type_elt.addElement("parameter")
+                param_elt["name"] = param_name
+                param_elt["value"] = param_value
+        host.trigger.point(
+            "XEP-0167_build_description_payload_type",
+            desc_elt,
+            media_data,
+            pt_data,
+            payload_type_elt,
+            triggers_no_cancel=True
+        )
+
+    if "bandwidth" in media_data:
+        bandwidth_elt = desc_elt.addElement("bandwidth")
+        bandwidth_elt["type"] = media_data["bandwidth"]
+
+    if media_data.get("rtcp-mux"):
+        desc_elt.addElement("rtcp-mux")
+
+    # Add encryption element
+    if "encryption" in media_data:
+        encryption_elt = desc_elt.addElement("encryption")
+        # we always want require encryption if the `encryption` data is present
+        encryption_elt["required"] = "1"
+        for enc_data in media_data["encryption"]:
+            crypto_elt = encryption_elt.addElement("crypto")
+            for attr in ["tag", "crypto-suite", "key-params", "session-params"]:
+                if attr in enc_data:
+                    crypto_elt[attr] = enc_data[attr]
+
+    host.trigger.point(
+        "XEP-0167_build_description",
+        desc_elt,
+        media_data,
+        session,
+        triggers_no_cancel=True
+    )
+
+    return desc_elt
+
+
+def parse_description(desc_elt: domish.Element) -> dict:
+    """Parse <desciption> to a dict
+
+    @param desc_elt: <description> element
+    @return: media data as in [build_description]
+    """
+    # FIXME: to be removed once host is accessible from global var
+    assert host is not None
+
+    media_data = {}
+    if desc_elt.hasAttribute("ssrc"):
+        media_data.setdefault("ssrc", {})[desc_elt["ssrc"]] = {}
+
+    payload_types = {}
+    for payload_type_elt in desc_elt.elements(NS_JINGLE_RTP, "payload-type"):
+        payload_type_data = {
+            attr: payload_type_elt[attr]
+            for attr in [
+                "channels",
+                "clockrate",
+                "maxptime",
+                "name",
+                "ptime",
+            ]
+            if payload_type_elt.hasAttribute(attr)
+        }
+        try:
+            pt_id = int(payload_type_elt["id"])
+        except KeyError:
+            log.warning(
+                f"missing ID in payload type, ignoring: {payload_type_elt.toXml()}"
+            )
+            continue
+
+        parameters = {}
+        for param_elt in payload_type_elt.elements(NS_JINGLE_RTP, "parameter"):
+            param_name = param_elt.getAttribute("name")
+            param_value = param_elt.getAttribute("value")
+            if not param_name or param_value is None:
+                log.warning(f"invalid parameter: {param_elt.toXml()}")
+                continue
+            parameters[param_name] = param_value
+
+        if parameters:
+            payload_type_data["parameters"] = parameters
+
+        host.trigger.point(
+            "XEP-0167_parse_description_payload_type",
+            desc_elt,
+            media_data,
+            payload_type_elt,
+            payload_type_data,
+            triggers_no_cancel=True
+        )
+        payload_types[pt_id] = payload_type_data
+
+    # bandwidth
+    media_data["payload_types"] = payload_types
+    try:
+        bandwidth_elt = next(desc_elt.elements(NS_JINGLE_RTP, "bandwidth"))
+    except StopIteration:
+        pass
+    else:
+        bandwidth = bandwidth_elt.getAttribute("type")
+        if not bandwidth:
+            log.warning(f"invalid bandwidth: {bandwidth_elt.toXml}")
+        else:
+            media_data["bandwidth"] = bandwidth
+
+    # rtcp-mux
+    rtcp_mux_elt = next(desc_elt.elements(NS_JINGLE_RTP, "rtcp-mux"), None)
+    media_data["rtcp-mux"] = rtcp_mux_elt is not None
+
+    # Encryption
+    encryption_data = []
+    encryption_elt = next(desc_elt.elements(NS_JINGLE_RTP, "encryption"), None)
+    if encryption_elt:
+        media_data["encryption_required"] = C.bool(
+            encryption_elt.getAttribute("required", C.BOOL_FALSE)
+        )
+
+        for crypto_elt in encryption_elt.elements(NS_JINGLE_RTP, "crypto"):
+            crypto_data = {
+                attr: crypto_elt[attr]
+                for attr in [
+                    "crypto-suite",
+                    "key-params",
+                    "session-params",
+                    "tag",
+                ]
+                if crypto_elt.hasAttribute(attr)
+            }
+            encryption_data.append(crypto_data)
+
+    if encryption_data:
+        media_data["encryption"] = encryption_data
+
+    host.trigger.point(
+        "XEP-0167_parse_description",
+        desc_elt,
+        media_data,
+        triggers_no_cancel=True
+    )
+
+    return media_data