comparison libervia/frontends/tools/webrtc_file.py @ 4233:d01b8d002619

cli (call, file), frontends: implement WebRTC data channel transfer: - file send/receive commands now support WebRTC transfer. In the `send` command, the `--webrtc` flag is currently used to activate it. - WebRTC-related code has been factorized and moved to `libervia.frontends.tools.webrtc*` modules. rel 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 13:43:09 +0200
parents
children 79c8a70e1813
comparison
equal deleted inserted replaced
4232:0fbe5c605eb6 4233:d01b8d002619
1 #!/usr/bin/env python3
2
3 # Libervia WebRTC implementation
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 import asyncio
21 import atexit
22 from functools import partial
23 import logging
24 from pathlib import Path
25 from typing import Any, Callable, IO
26
27 import gi
28
29 gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
30 from gi.repository import GLib, GstWebRTC
31
32 from libervia.backend.core import exceptions
33 from libervia.backend.core.i18n import _
34 from libervia.backend.tools.common import data_format, utils
35 from libervia.frontends.tools import aio, jid, webrtc
36 from libervia.frontends.tools.webrtc_models import CallData
37
38
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Number of bytes read from the file and sent per data channel message (64 KiB).
WEBRTC_CHUNK_SIZE = 64 * 1024
42
43
class WebRTCFileSender:
    """Send a local file to a peer through a WebRTC data channel.

    A WebRTC call restricted to a data channel is created with
    ``webrtc.WebRTCCall.make_webrtc_call``. When the call starts, the file transfer is
    announced through the bridge (``file_jingle_send``); when the data channel opens,
    the file content is pushed chunk by chunk.
    """

    def __init__(
        self,
        bridge,
        profile: str,
        on_call_start_cb: Callable[[dict], Any] | None = None,
        end_call_cb: Callable[[], Any] | None = None,
    ) -> None:
        """Initializes the File Sender.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_call_start_cb: A blocking or async callable that accepts a dict as its
            only argument. It is called with the deserialised data returned by
            ``file_jingle_send``.
        @param end_call_cb: A callable to be invoked at the end of a call.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_call_start_cb = on_call_start_cb
        self.end_call_cb = end_call_cb
        self.loop = asyncio.get_event_loop()
        # Set by ``send_file_webrtc``. Initialised here so that ``_send_file`` can
        # always read the attribute, even if the data channel opens before
        # ``send_file_webrtc`` has stored the call.
        self.webrtc_call: webrtc.WebRTCCall | None = None

    async def _on_webrtc_call_start(
        self,
        file_path: Path,
        file_name: str | None,
        callee: str,
        call_data: dict,
        profile: str,
    ) -> str:
        """Announce the file transfer to the peer once the WebRTC call starts.

        @param file_path: Local path of the file to send, used for its size and as a
            fallback for the file name.
        @param file_name: Name of the file as sent to the peer.
            If None or empty, the name of ``file_path`` is used.
        @param callee: Entity to send the file to.
        @param call_data: WebRTC call data, forwarded to ``file_jingle_send``.
        @param profile: Unused; ``self.profile`` is the profile given to the bridge.
        @return: The ``session_id`` found in the data returned by the bridge.
        """
        extra = {
            "webrtc": True,
            "call_data": call_data,
            "size": file_path.stat().st_size,
        }
        file_data_s = await self.bridge.file_jingle_send(
            str(callee),
            "",
            file_name or file_path.name,
            "",
            data_format.serialise(extra),
            self.profile,
        )
        file_data = data_format.deserialise(file_data_s)

        if self.on_call_start_cb is not None:
            await aio.maybe_async(self.on_call_start_cb(file_data))
        return file_data["session_id"]

    async def _send_file(
        self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Send file to Data Channel by chunks.

        The data channel is closed once the whole file has been sent, or if an error
        happens while reading or sending.

        @param file_path: Local path of the file to send.
        @param data_channel: Open data channel to send the file content to.
        """
        # NOTE(review): chunks are handed to the data channel as fast as they are read;
        #   the amount of data buffered by the channel is not checked here. Confirm this
        #   is acceptable for large files.
        try:
            with file_path.open("rb") as file:
                while data := file.read(WEBRTC_CHUNK_SIZE):
                    data_channel.send_data(GLib.Bytes(data))
                    # We give control back to the loop to avoid freezing everything.
                    await asyncio.sleep(0)
        finally:
            webrtc_call = self.webrtc_call
            # we connect to the "on-close" signal to wait for the data channel to be
            # actually closed before closing the call and quitting the app.
            data_channel.connect("on-close", partial(self._on_dc_close, webrtc_call))
            data_channel.close()

    def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel):
        """Handler of the data channel "on-close" signal.

        @param webrtc_call: Call to end, as known when the handler was connected.
        @param data_channel: Data channel which has been closed.
        """
        if webrtc_call is None:
            log.warning("Data channel closed but no WebRTC call is known, can't end it.")
            return
        aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop)

    async def _end_call_and_quit(self, webrtc_call):
        """End the WebRTC call, then invoke ``end_call_cb`` if one was given."""
        await webrtc_call.webrtc.end_call()
        if self.end_call_cb is not None:
            await aio.maybe_async(self.end_call_cb())

    def _on_dc_open(
        self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Called when datachannel is open: start sending the file."""
        aio.run_from_thread(self._send_file, file_path, data_channel, loop=self.loop)

    async def send_file_webrtc(
        self,
        file_path: Path | str,
        callee: jid.JID,
        file_name: str | None = None,
    ) -> None:
        """Send a file using WebRTC to the given callee JID.

        @param file_path: The local path to the file to send.
        @param callee: The JID of the recipient to send the file to.
        @param file_name: Name of the file as sent to the peer.
            If None or empty string, name will be retrieved from file path.
        """
        file_path = Path(file_path)
        call_data = CallData(callee=callee)
        self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources=webrtc.SOURCES_DATACHANNEL,
            call_start_cb=partial(
                self._on_webrtc_call_start,
                file_path,
                file_name,
            ),
            dc_open_cb=partial(self._on_dc_open, file_path),
        )
158
159
class WebRTCFileReceiver:
    """Receive a file from a peer through a WebRTC data channel.

    The destination file is opened by ``receive_file_webrtc``; every data message
    received on the data channel is written to it, and it is closed when the data
    channel is closed.
    """

    def __init__(
        self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None
    ) -> None:
        """Initializes the File Receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        self.loop = asyncio.get_event_loop()
        self.file_data: dict | None = None
        self.fd: IO[bytes] | None = None

    @staticmethod
    def format_confirm_msg(
        action_data: dict,
        peer_jid: jid.JID,
        peer_name: str | None = None,
    ) -> str:
        """Format a user-friendly confirmation message.

        File data will be retrieved from ``action_data`` and used to format a
        user-friendly file confirmation message.
        @param action_data: Data as returned by the "FILE" ``action_new`` signal.
        @param peer_jid: JID of the entity which wants to send the file.
        @param peer_name: Name of the peer to show along with its JID.
            If None or empty, only the JID is shown.
        @return: User-friendly confirmation message.
        """
        file_data = action_data.get("file_data", {})

        file_name = file_data.get("name")
        if file_name:
            file_name_msg = f'wants to send you the file "{file_name}"'
        else:
            file_name_msg = "wants to send you an unnamed file"

        file_size = file_data.get("size")
        if file_size is not None:
            file_size_msg = "which has a size of {file_size_human}".format(
                file_size_human=utils.get_human_size(file_size)
            )
        else:
            file_size_msg = "which has an unknown size"

        file_description = file_data.get("desc")
        if file_description:
            description_msg = " Description: {}.".format(file_description)
        else:
            description_msg = ""

        if not peer_name:
            peer_name = str(peer_jid)
        else:
            peer_name = f"{peer_name} ({peer_jid})"

        return _(
            "{peer_name} {file_name_msg} {file_size_msg}.{description_msg} "
            "Do you accept?"
        ).format(
            peer_name=peer_name,
            file_name_msg=file_name_msg,
            file_size_msg=file_size_msg,
            description_msg=description_msg,
        )

    def _on_dc_message_data(self, fd, data_channel, glib_data) -> None:
        """A data chunk of the file has been received: write it to ``fd``."""
        fd.write(glib_data.get_data())

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        The file download should be complete, we close it.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        """Close the destination file, then invoke ``on_close_cb`` if one was given."""
        assert self.fd is not None
        self.fd.close()
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened.

        Connect the handlers writing received data to the destination file and
        handling the closing of the channel.
        """
        data_channel.connect(
            "on-message-data", partial(self._on_dc_message_data, self.fd)
        )
        data_channel.connect("on-close", self._on_dc_close)

    def _on_fd_clean(self, fd) -> None:
        """Close opened file object if not already done.

        This method is registered with ``atexit``. If the file object was not
        closed, an error message is logged.
        """
        if fd is None:
            return
        if not fd.closed:
            log.error(
                f"The file {fd.name!r} was not closed properly, which might "
                "indicate an incomplete download."
            )
            fd.close()

    async def receive_file_webrtc(
        self,
        from_jid: jid.JID,
        session_id: str,
        file_path: Path,
        file_data: dict,
    ) -> None:
        """Receives a file via WebRTC and saves it to the specified path.

        @param from_jid: The JID of the entity sending the file.
        @param session_id: The Jingle FT Session ID.
        @param file_path: The local path where the received file will be saved.
            If a file already exists at this path, it will be overwritten.
        @param file_data: Additional data about the file being transferred.
        @raise exceptions.InternalError: ``file_path`` exists but is not a file.
        """
        if file_path.exists() and not file_path.is_file():
            raise exceptions.InternalError(
                f"{file_path} is not a valid destination path."
            )
        self.fd = file_path.open("wb")
        # Safety net: the file is closed at interpreter exit if the data channel was
        # never closed.
        atexit.register(self._on_fd_clean, self.fd)
        self.file_data = file_data
        call_data = CallData(callee=from_jid, sid=session_id)
        try:
            await webrtc.WebRTCCall.make_webrtc_call(
                self.bridge,
                self.profile,
                call_data,
                sinks=webrtc.SINKS_DATACHANNEL,
                dc_on_data_channel=self._on_data_channel,
            )
        except BaseException:
            # The call set up failed: don't keep the destination file open until
            # interpreter exit.
            self.fd.close()
            raise