Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | a0ed5c976bf8 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Optional from twisted.internet import reactor from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger from libervia.backend.tools import xml_tools from libervia.backend.tools.common import data_format from . import mapping from ..plugin_xep_0166 import BaseApplicationHandler from .constants import ( NS_JINGLE_RTP, NS_JINGLE_RTP_AUDIO, NS_JINGLE_RTP_INFO, NS_JINGLE_RTP_VIDEO, NS_AV_CONFERENCES, ) log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "Jingle RTP Sessions", C.PI_IMPORT_NAME: "XEP-0167", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0167"], C.PI_DEPENDENCIES: ["XEP-0166"], C.PI_MAIN: "XEP_0167", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""), } CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?") CONFIRM_TITLE = D_("Incoming Call") SECURITY_LIMIT = 0 ALLOWED_ACTIONS = ( "active", "hold", "unhold", "mute", "unmute", "ringing", ) ANSWER_SDP_SENT_KEY = "answer_sdp_sent" class XEP_0167(BaseApplicationHandler): namespace = NS_JINGLE_RTP def __init__(self, host): log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') self.host = host # FIXME: to be removed once host is accessible from global var mapping.host = host self._j = host.plugins["XEP-0166"] self._j.register_application(NS_JINGLE_RTP, self) host.register_namespace("jingle-rtp", NS_JINGLE_RTP) host.bridge.add_method( "call_start", ".plugin", in_sign="sss", out_sign="s", method=self._call_start, async_=True, ) host.bridge.add_method( "call_update", ".plugin", in_sign="sss", out_sign="", method=self._call_update, async_=True, ) host.bridge.add_method( "call_answer_sdp", ".plugin", in_sign="sss", out_sign="", method=self._call_answer_sdp, async_=False, ) host.bridge.add_method( "call_info", ".plugin", in_sign="ssss", out_sign="", method=self._call_info, ) host.bridge.add_method( "call_end", ".plugin", in_sign="sss", out_sign="", method=self._call_end, async_=True, ) # args: session_id, serialised setup data (dict with keys "role" and "sdp"), # profile host.bridge.add_signal("call_setup", ".plugin", signature="sss") # args: session_id, serialised update data, profile host.bridge.add_signal("call_update", ".plugin", signature="sss") # args: session_id, data, profile host.bridge.add_signal("call_ended", ".plugin", signature="sss") # args: session_id, info_type, extra, profile host.bridge.add_signal("call_info", ".plugin", signature="ssss") def get_handler(self, client): return XEP_0167_handler() # bridge methods def _call_start( self, entity_s: str, call_data_s: str, profile_key: str, ): client = self.host.get_client(profile_key) return defer.ensureDeferred( self.call_start( client, jid.JID(entity_s), data_format.deserialise(call_data_s) ) ) def _call_update( self, session_id: str, call_data_s: str, profile_key: str, ): client = self.host.get_client(profile_key) return defer.ensureDeferred( self.call_update(client, session_id, data_format.deserialise(call_data_s)) ) def parse_call_data(self, call_data: dict) -> dict: """Parse ``call_data`` and return corresponding contents end metadata""" metadata = call_data.get("metadata") or {} if "sdp" in call_data: sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR) to_delete = set() for media, data in sdp_data.items(): if media not in ("audio", "video", "application"): continue to_delete.add(media) media_type, media_data = media, data call_data[media_type] = media_data["application_data"] transport_data = media_data["transport_data"] try: call_data[media_type]["fingerprint"] = transport_data["fingerprint"] except KeyError: log.warning("fingerprint is missing") pass try: call_data[media_type]["id"] = media_data["id"] except KeyError: log.warning(f"no media ID found for {media_type}: {media_data}") # FIXME: the 2 values below are linked to XEP-0343, they should be added # there instead, maybe with some new trigger? for key in ("sctp-port", "max-message-size"): value = transport_data.get(key) if value is not None: metadata[key] = value try: call_data[media_type]["ice-candidates"] = transport_data.get( "candidates", [] ) metadata["ice-ufrag"] = transport_data["ufrag"] metadata["ice-pwd"] = transport_data["pwd"] except KeyError: log.warning("ICE data are missing from SDP") continue for media in to_delete: del sdp_data[media] metadata.update(sdp_data.get("metadata", {})) return metadata def get_contents(self, call_data: dict, metadata: dict) -> list[dict]: """Generate call related contents. @param call_data: Call data after being parsed by [parse_call_data] @param metadata: Metadata as returned by [parse_call_data] @return: List of contents to be used with [jingle.initiate]. """ contents = [] seen_names = set() for media, media_data in call_data.items(): if media not in ("audio", "video"): continue content = { "app_ns": NS_JINGLE_RTP, "senders": media_data["senders"], "transport_type": self._j.TRANSPORT_DATAGRAM, "app_kwargs": {"media": media, "media_data": media_data}, "transport_data": { "local_ice_data": { "ufrag": metadata["ice-ufrag"], "pwd": metadata["ice-pwd"], "candidates": media_data.pop("ice-candidates"), "fingerprint": media_data.pop("fingerprint", {}), } }, } if "id" in media_data: name = media_data.pop("id") if name in seen_names: raise exceptions.DataError( f"Content name (mid) seen multiple times: {name}" ) content["name"] = name contents.append(content) return contents async def call_start( self, client: SatXMPPEntity, peer_jid: jid.JID, call_data: dict, session_id: str | None = None, ) -> str: """Initiate a call session with the given peer. @param peer_jid: JID of the peer to initiate a call session with. @param call_data: Dictionary containing data for the call. Must include SDP information. The dict can have the following keys: - sdp (str): SDP data for the call. - metadata (dict): Additional metadata for the call (optional). Each media type ("audio" and "video") in the SDP should have: - application_data (dict): Data about the media. - fingerprint (str): Security fingerprint data (optional). - id (str): Identifier for the media (optional). - ice-candidates: ICE candidates for media transport. - And other transport specific data. @param session_id: ID of the Jingle session. If None, an ID will be automatically generated. @return: Session ID (SID) for the initiated call session. @raises exceptions.DataError: If media data is invalid or duplicate content name (mid) is found. """ metadata = self.parse_call_data(call_data) contents = self.get_contents(call_data, metadata) if not contents: raise exceptions.DataError("no valid media data found: {call_data}") call_type = ( C.META_SUBTYPE_CALL_VIDEO if "video" in call_data else C.META_SUBTYPE_CALL_AUDIO ) sid = await self._j.initiate( client, peer_jid, contents, call_type=call_type, metadata=metadata, peer_metadata={}, sid=session_id, ) return sid async def call_update( self, client: SatXMPPEntity, session_id: str, call_data: dict, ) -> None: """Update a running call session. @param session_id: ID of the Jingle session to update. @param call_data: Dictionary containing updated data for the call. Must include SDP information. The dict can have the following keys: - sdp (str): SDP data for the call. - metadata (dict): Additional metadata for the call (optional). Each media type ("audio" and "video") in the SDP should have: - application_data (dict): Data about the media. - fingerprint (str): Security fingerprint data (optional). - id (str): Identifier for the media (optional). - ice-candidates: ICE candidates for media transport. - And other transport specific data. @raises exceptions.DataError: If media data is invalid or duplicate content name (mid) is found. """ session = self._j.get_session(client, session_id) try: new_offer_sdp = call_data["sdp"] except KeyError: raise exceptions.DataError(f"New SDP offer is missing: {call_data}") metadata = self.parse_call_data(call_data) contents = self.get_contents(call_data, metadata) if not contents: raise exceptions.DataError("no valid media data found: {call_data}") call_type = ( C.META_SUBTYPE_CALL_VIDEO if "video" in call_data else C.META_SUBTYPE_CALL_AUDIO ) for content_args in contents: content = content_args["app_kwargs"] content["app_ns"] = NS_JINGLE_RTP content["name"] = (content_args["name"],) content["transport_type"] = self._j.TRANSPORT_DATAGRAM media = content["media"] media_data = content["media_data"].copy() media_data["transport_data"] = content_args["transport_data"][ "local_ice_data" ] desc_elt = mapping.build_description(media, media_data, {}) iq_elt, __ = self._j.build_action( client, self._j.A_CONTENT_ADD, session, content_args["name"], context_elt=desc_elt, ) content_data = self._j.get_content_data(content) transport = self._j.get_transport(client, content, content_data) transport_elt = transport.handler.build_transport( media_data["transport_data"] ) iq_elt.jingle.content.addChild(transport_elt) await iq_elt.send() def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: client = self.host.get_client(profile) session = self._j.get_session(client, session_id) try: answer_sdp_d = session.pop("answer_sdp_d") except KeyError: raise exceptions.NotFound( f"No answer SDP expected for session {session_id!r}" ) answer_sdp_d.callback(answer_sdp) def _call_end( self, session_id: str, data_s: str, profile_key: str, ): client = self.host.get_client(profile_key) return defer.ensureDeferred( self.call_end(client, session_id, data_format.deserialise(data_s)) ) async def call_end( self, client: SatXMPPEntity, session_id: str, data: dict, ) -> None: """End a call @param session_id: Jingle session ID of the call @param data: optional extra data, may be used to indicate the reason to end the call """ session = self._j.get_session(client, session_id) await self._j.terminate(client, self._j.REASON_SUCCESS, session) # jingle callbacks async def confirm_incoming_call( self, client: SatXMPPEntity, session: dict, call_type: str ) -> bool: """Prompt the user for a call confirmation. @param client: The client entity. @param session: The Jingle session. @param call_type: Type of media (audio or video). @return: True if the call has been accepted """ peer_jid = session["peer_jid"] session["call_type"] = call_type cancellable_deferred = session.setdefault("cancellable_deferred", []) action_extra = { "session_id": session["id"], "from_jid": peer_jid.full(), "type": C.META_TYPE_CALL, "sub_type": call_type, } try: action_extra["metadata"] = {"user": session["metadata"]["peer_user"].full()} except KeyError: pass dialog_d = xml_tools.defer_dialog( self.host, _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type), _(CONFIRM_TITLE), action_extra=action_extra, security_limit=SECURITY_LIMIT, profile=client.profile, ) cancellable_deferred.append(dialog_d) resp_data = await dialog_d accepted = not resp_data.get("cancelled", False) if accepted: session["pre_accepted"] = True return accepted async def jingle_preflight( self, client: SatXMPPEntity, session: dict, description_elt: domish.Element ) -> None: """Perform preflight checks for an incoming call session. Check if the calls is audio only or audio/video, then, prompts the user for confirmation. @param client: The client instance. @param session: Jingle session. @param description_elt: The description element. It's parent attribute is used to determine check siblings to see if it's an audio only or audio/video call. @raises exceptions.CancelError: If the user doesn't accept the incoming call. """ if session.get("pre_accepted", False): # the call is already accepted, nothing to do return parent_elt = description_elt.parent assert parent_elt is not None assert description_elt.parent is not None for desc_elt in parent_elt.elements(NS_JINGLE_RTP, "description"): if desc_elt.getAttribute("media") == "video": call_type = C.META_SUBTYPE_CALL_VIDEO break else: call_type = C.META_SUBTYPE_CALL_AUDIO try: accepted = await self.confirm_incoming_call(client, session, call_type) except defer.CancelledError as e: # raised when call is retracted before user has answered or rejected self.host.bridge.call_ended( session["id"], data_format.serialise({"reason": "retracted"}), client.profile, ) raise e if not accepted: raise exceptions.CancelError("User declined the incoming call.") async def jingle_preflight_info( self, client: SatXMPPEntity, session: dict, info_type: str, info_data: dict | None = None, ) -> None: if info_type == "ringing": if not session.get("ringing", False): self.host.bridge.call_info(session["id"], "ringing", "", client.profile) # we indicate that the ringing has started, to avoid sending several times # the signal session["ringing"] = True else: log.warning(f"Unknown preflight info type: {info_type!r}") async def jingle_preflight_cancel( self, client: SatXMPPEntity, session: dict, cancel_error: exceptions.CancelError ) -> None: """The call has been rejected""" # call_ended is used to send the signal only once even if there are audio and # video contents call_ended = session.get("call_ended", False) if call_ended: return data = {"reason": getattr(cancel_error, "reason", None) or "cancelled"} data["text"] = str(cancel_error) self.host.bridge.call_ended( session["id"], data_format.serialise(data), client.profile ) session["call_ended"] = True def jingle_session_init( self, client: SatXMPPEntity, session: dict, content_name: str, media: str, media_data: dict, ) -> domish.Element: if media not in ("audio", "video"): raise ValueError('only "audio" and "video" media types are supported') content_data = session["contents"][content_name] application_data = content_data["application_data"] application_data["media"] = media application_data["local_data"] = media_data desc_elt = mapping.build_description(media, media_data, session) self.host.trigger.point( "XEP-0167_jingle_session_init", client, session, content_name, media, media_data, desc_elt, triggers_no_cancel=True, ) return desc_elt async def jingle_request_confirmation( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, desc_elt: domish.Element, ) -> bool: """Requests confirmation from the user for a Jingle session's incoming call. This method checks the content type of the Jingle session (audio or video) based on the session's contents. Confirmation is requested only for the first content; subsequent contents are automatically accepted. This means, in practice, that the call confirmation is prompted only once for both audio and video contents. @param client: The client instance. @param action: The action type associated with the Jingle session. @param session: Jingle session. @param content_name: Name of the content being checked. @param desc_elt: The description element associated with the content. @return: True if the call is accepted by the user, False otherwise. """ if content_name != next(iter(session["contents"])): # we request confirmation only for the first content, all others are # automatically accepted. In practice, that means that the call confirmation # is requested only once for audio and video contents. return True if not session.get("pre_accepted", False): if any( c["desc_elt"].getAttribute("media") == "video" for c in session["contents"].values() ): call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO else: call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO accepted = await self.confirm_incoming_call(client, session, call_type) if not accepted: return False sdp = mapping.generate_sdp_from_session(session) session["answer_sdp_d"] = answer_sdp_d = defer.Deferred() # we should have the answer long before 2 min answer_sdp_d.addTimeout(2 * 60, reactor) call_setup = session.get("call_setup_cb") call_data = { "role": session["role"], "sdp": sdp, } if call_setup is None: self.host.bridge.call_setup( session["id"], data_format.serialise(call_data), client.profile, ) else: await call_setup( client, session, call_data, ) answer_sdp = await answer_sdp_d parsed_answer = mapping.parse_sdp(answer_sdp, session["role"]) session["metadata"].update(parsed_answer["metadata"]) self.propagate_data(session, parsed_answer) return True def propagate_data(self, session: dict, parsed_answer: dict) -> None: """Propagate local SDP data to other contents""" for media in ("audio", "video", "application"): for content in session["contents"].values(): try: application_data = content["application_data"] content_media = application_data["media"] except KeyError: pass else: if content_media == media: media_data = parsed_answer[media] application_data["local_data"] = media_data["application_data"] transport_data = content["transport_data"] local_ice_data = media_data["transport_data"] transport_data["local_ice_data"] = local_ice_data async def send_answer_sdp(self, client: SatXMPPEntity, session: dict) -> None: """Send answer SDP to frontend""" if not session.get(ANSWER_SDP_SENT_KEY, False): # we only send the signal once, as it means that the whole session is # accepted answer_sdp = mapping.generate_sdp_from_session(session) call_setup = session.get("call_setup_cb") if call_setup is None: self.host.bridge.call_setup( session["id"], data_format.serialise( { "role": session["role"], "sdp": answer_sdp, } ), client.profile, ) else: await call_setup( client, session, { "role": session["role"], "sdp": answer_sdp, }, ) session[ANSWER_SDP_SENT_KEY] = True async def jingle_handler(self, client, action, session, content_name, desc_elt): if action == self._j.A_CONTENT_ADD: content_data = session["contents_new"][content_name] else: content_data = session["contents"][content_name] application_data = content_data["application_data"] if action == self._j.A_PREPARE_CONFIRMATION: session.setdefault("metadata", {}) session.setdefault("peer_metadata", {}) try: media = application_data["media"] = desc_elt["media"] except KeyError: raise exceptions.DataError('"media" key is missing in {desc_elt.toXml()}') if media not in ("audio", "video"): raise exceptions.DataError(f"invalid media: {media!r}") application_data["peer_data"] = mapping.parse_description(desc_elt) elif action == self._j.A_SESSION_INITIATE: application_data["peer_data"] = mapping.parse_description(desc_elt) desc_elt = mapping.build_description( application_data["media"], application_data["local_data"], session ) elif action == self._j.A_ACCEPTED_ACK: pass elif action == self._j.A_PREPARE_INITIATOR: application_data["peer_data"] = mapping.parse_description(desc_elt) elif action == self._j.A_SESSION_ACCEPT: await self.send_answer_sdp(client, session) elif action == self._j.A_CONTENT_ADD: current_contents = session["contents"] if content_name in current_contents: raise exceptions.ConflictError( f"There is already a {content_name!r} content." ) current_contents[content_name] = content_data application_data["media"] = desc_elt["media"] application_data["peer_data"] = mapping.parse_description(desc_elt) else: log.warning(f"FIXME: unmanaged action {action}") await self.host.trigger.async_point( "XEP-0167_jingle_handler", client, action, session, content_name, desc_elt, triggers_no_cancel=True, ) return desc_elt def jingle_session_info( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, jingle_elt: domish.Element, ) -> None: """Informational messages""" for elt in jingle_elt.elements(): if elt.uri == NS_JINGLE_RTP_INFO: info_type = elt.name if info_type not in ALLOWED_ACTIONS: log.warning("ignoring unknow info type: {info_type!r}") continue extra = {} if info_type in ("mute", "unmute"): name = elt.getAttribute("name") if name: extra["name"] = name log.debug(f"{info_type} call info received (extra: {extra})") self.host.bridge.call_info( session["id"], info_type, data_format.serialise(extra), client.profile ) def _call_info(self, session_id, info_type, extra_s, profile_key): client = self.host.get_client(profile_key) extra = data_format.deserialise(extra_s) return self.send_info(client, session_id, info_type, extra) def send_info( self, client: SatXMPPEntity, session_id: str, info_type: str, extra: Optional[dict], ) -> None: """Send information on the call""" if info_type not in ALLOWED_ACTIONS: raise ValueError(f"Unkown info type {info_type!r}") session = self._j.get_session(client, session_id) iq_elt, jingle_elt = self._j.build_session_info(client, session) info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type)) if extra and info_type in ("mute", "unmute") and "name" in extra: info_elt["name"] = extra["name"] iq_elt.send() def jingle_terminate( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, reason_elt: domish.Element, ) -> None: reason, text = self._j.parse_reason_elt(reason_elt) data = {"reason": reason} if text: data["text"] = text self.host.bridge.call_ended( session["id"], data_format.serialise(data), client.profile ) @implementer(iwokkel.IDisco) class XEP_0167_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [ disco.DiscoFeature(NS_JINGLE_RTP), disco.DiscoFeature(NS_JINGLE_RTP_AUDIO), disco.DiscoFeature(NS_JINGLE_RTP_VIDEO), disco.DiscoFeature(NS_AV_CONFERENCES), ] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []