Mercurial > libervia-backend
comparison libervia/frontends/tools/webrtc_file.py @ 4233:d01b8d002619
cli (call, file), frontends: implement webRTC data channel transfer:
- file send/receive commands now support WebRTC transfer. In the `send` command, the
`--webrtc` flag is currently used to activate it.
- WebRTC-related code has been factored out and moved to `libervia.frontends.tools.webrtc*`
modules.
rel 442
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 13:43:09 +0200 |
parents | |
children | 79c8a70e1813 |
comparison
equal
deleted
inserted
replaced
4232:0fbe5c605eb6 | 4233:d01b8d002619 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia WebRTC implementation | |
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 | |
20 import asyncio | |
21 import atexit | |
22 from functools import partial | |
23 import logging | |
24 from pathlib import Path | |
25 from typing import Any, Callable, IO | |
26 | |
27 import gi | |
28 | |
29 gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) | |
30 from gi.repository import GLib, GstWebRTC | |
31 | |
32 from libervia.backend.core import exceptions | |
33 from libervia.backend.core.i18n import _ | |
34 from libervia.backend.tools.common import data_format, utils | |
35 from libervia.frontends.tools import aio, jid, webrtc | |
36 from libervia.frontends.tools.webrtc_models import CallData | |
37 | |
38 | |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Size in bytes (64 KiB) of each chunk read from the file and pushed to the data
# channel when sending.
WEBRTC_CHUNK_SIZE = 64 * 1024
42 | |
43 | |
44 class WebRTCFileSender: | |
45 | |
46 def __init__( | |
47 self, | |
48 bridge, | |
49 profile: str, | |
50 on_call_start_cb: Callable[[dict], Any] | None = None, | |
51 end_call_cb: Callable[[], Any] | None = None, | |
52 ) -> None: | |
53 """Initializes the File Sender. | |
54 | |
55 @param bridge: An async Bridge instance. | |
56 @param profile: The profile name to be used. | |
57 @param on_call_start_cb: A blocking or async callable that accepts a dict as its | |
58 only argument. | |
59 @param end_call_cb: A callable to be invoked at the end of a call. | |
60 """ | |
61 | |
62 self.bridge = bridge | |
63 self.profile = profile | |
64 self.on_call_start_cb = on_call_start_cb | |
65 self.end_call_cb = end_call_cb | |
66 self.loop = asyncio.get_event_loop() | |
67 | |
68 async def _on_webrtc_call_start( | |
69 self, | |
70 file_path: Path, | |
71 file_name: str | None, | |
72 callee: str, | |
73 call_data: dict, | |
74 profile: str, | |
75 ) -> str: | |
76 file_data_s = await self.bridge.file_jingle_send( | |
77 str(callee), | |
78 "", | |
79 file_name or file_path.name, | |
80 "", | |
81 data_format.serialise( | |
82 { | |
83 "webrtc": True, | |
84 "call_data": call_data, | |
85 "size": file_path.stat().st_size, | |
86 } | |
87 ), | |
88 self.profile, | |
89 ) | |
90 file_data = data_format.deserialise(file_data_s) | |
91 | |
92 if self.on_call_start_cb is not None: | |
93 await aio.maybe_async(self.on_call_start_cb(file_data)) | |
94 return file_data["session_id"] | |
95 | |
96 async def _send_file( | |
97 self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel | |
98 ) -> None: | |
99 """Send file to Data Channel by chunks""" | |
100 try: | |
101 with file_path.open("rb") as file: | |
102 while True: | |
103 data = file.read(WEBRTC_CHUNK_SIZE) | |
104 if not data: | |
105 break | |
106 data_channel.send_data(GLib.Bytes(data)) | |
107 # We give control back to the loop to avoid freezing everything. | |
108 await asyncio.sleep(0) | |
109 finally: | |
110 webrtc_call = self.webrtc_call | |
111 # we connect to the "on-close" signal to wait for the data channel to be | |
112 # actually closed before closing the call and quitting the app. | |
113 data_channel.connect("on-close", partial(self._on_dc_close, webrtc_call)) | |
114 data_channel.close() | |
115 | |
116 def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel): | |
117 if webrtc_call is not None: | |
118 aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop) | |
119 | |
120 async def _end_call_and_quit(self, webrtc_call): | |
121 await webrtc_call.webrtc.end_call() | |
122 if self.end_call_cb is not None: | |
123 await aio.maybe_async(self.end_call_cb()) | |
124 | |
125 def _on_dc_open( | |
126 self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel | |
127 ) -> None: | |
128 """Called when datachannel is open""" | |
129 aio.run_from_thread(self._send_file, file_path, data_channel, loop=self.loop) | |
130 | |
131 async def send_file_webrtc( | |
132 self, | |
133 file_path: Path|str, | |
134 callee: jid.JID, | |
135 file_name: str | None = None, | |
136 ) -> None: | |
137 """Send a file using WebRTC to the given callee JID. | |
138 | |
139 @param file_path: The local path to the file to send. | |
140 @param callee: The JID of the recipient to send the file to. | |
141 @param file_name: Name of the file as sent to the peer. | |
142 If None or empty string, name will be retrieved from file path. | |
143 """ | |
144 file_path = Path(file_path) | |
145 call_data = CallData(callee=callee) | |
146 self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call( | |
147 self.bridge, | |
148 self.profile, | |
149 call_data, | |
150 sources=webrtc.SOURCES_DATACHANNEL, | |
151 call_start_cb=partial( | |
152 self._on_webrtc_call_start, | |
153 file_path, | |
154 file_name, | |
155 ), | |
156 dc_open_cb=partial(self._on_dc_open, file_path), | |
157 ) | |
158 | |
159 | |
class WebRTCFileReceiver:
    """Receive a file from a peer through a WebRTC data channel.

    The destination file is opened by ``receive_file_webrtc``, filled with every
    chunk received on the data channel, and closed when the channel is closed.
    """

    def __init__(
        self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None
    ) -> None:
        """Initializes the File Receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        self.loop = asyncio.get_event_loop()
        self.file_data: dict | None = None
        self.fd: IO[bytes] | None = None

    @staticmethod
    def format_confirm_msg(
        action_data: dict,
        peer_jid: jid.JID,
        peer_name: str | None = None,
    ) -> str:
        """Format a user-friendly confirmation message.

        File data will be retrieved from ``action_data`` and used to format a
        user-friendly file confirmation message.
        @param action_data: Data as returned by the "FILE" ``action_new`` signal.
            The ``name``, ``size`` and ``desc`` keys of its ``file_data`` mapping are
            used when present.
        @param peer_jid: JID of the entity which wants to send the file.
        @param peer_name: Human readable name of the peer. If set, it is displayed
            followed by the JID in parentheses, otherwise only the JID is shown.
        @return: User-friendly confirmation message.
        """
        file_data = action_data.get("file_data", {})
        file_name = file_data.get("name")
        file_size = file_data.get("size")
        file_description = file_data.get("desc")

        if file_name:
            file_name_msg = f'wants to send you the file "{file_name}"'
        else:
            file_name_msg = "wants to send you an unnamed file"

        if file_size is not None:
            file_size_msg = (
                f"which has a size of {utils.get_human_size(file_size)}"
            )
        else:
            file_size_msg = "which has an unknown size"

        if file_description:
            description_msg = f" Description: {file_description}."
        else:
            description_msg = ""

        if not peer_name:
            peer_name = str(peer_jid)
        else:
            peer_name = f"{peer_name} ({peer_jid})"

        return _(
            "{peer_name} {file_name_msg} {file_size_msg}.{description_msg} "
            "Do you accept?"
        ).format(
            peer_name=peer_name,
            file_name_msg=file_name_msg,
            file_size_msg=file_size_msg,
            description_msg=description_msg,
        )

    def _on_dc_message_data(self, fd, data_channel, glib_data) -> None:
        """A data chunk of the file has been received.

        @param fd: File object to write the chunk to (bound with ``partial``).
        @param data_channel: Data channel which received the chunk.
        @param glib_data: Received chunk; its ``get_data()`` result is written to
            ``fd``.
        """
        fd.write(glib_data.get_data())

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        The file download should be complete, we close it.
        The actual closing is scheduled on ``self.loop``.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        """Close the destination file, then invoke ``on_close_cb`` if it is set."""
        assert self.fd is not None
        self.fd.close()
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened.

        Connects the handlers writing received data to the destination file and
        reacting to the closing of the channel.
        """
        data_channel.connect(
            "on-message-data", partial(self._on_dc_message_data, self.fd)
        )
        data_channel.connect("on-close", self._on_dc_close)

    def _on_fd_clean(self, fd) -> None:
        """Close opened file object if not already closed.

        If the file object was not closed, an error message is logged.

        @param fd: File object to check, or None in which case nothing is done.
        """
        if fd is None or fd.closed:
            return
        log.error(
            f"The file {fd.name!r} was not closed properly, which might "
            "indicate an incomplete download."
        )
        fd.close()

    async def receive_file_webrtc(
        self,
        from_jid: jid.JID,
        session_id: str,
        file_path: Path | str,
        file_data: dict,
    ) -> None:
        """Receives a file via WebRTC and saves it to the specified path.

        @param from_jid: The JID of the entity sending the file.
        @param session_id: The Jingle FT Session ID.
        @param file_path: The local path where the received file will be saved.
            If a file already exists at this path, it will be overwritten.
        @param file_data: Additional data about the file being transferred.
        @raise exceptions.InternalError: ``file_path`` exists but is not a regular
            file.
        """
        file_path = Path(file_path)
        if file_path.exists() and not file_path.is_file():
            raise exceptions.InternalError(
                f"{file_path} is not a valid destination path."
            )
        fd = self.fd = file_path.open("wb")
        # Safety net: report and close the file at interpreter exit if the data
        # channel never got the chance to close it.
        atexit.register(self._on_fd_clean, fd)
        self.file_data = file_data
        try:
            call_data = CallData(callee=from_jid, sid=session_id)
            await webrtc.WebRTCCall.make_webrtc_call(
                self.bridge,
                self.profile,
                call_data,
                sinks=webrtc.SINKS_DATACHANNEL,
                dc_on_data_channel=self._on_data_channel,
            )
        except BaseException:
            # The call could not be set up, so no data channel will ever close the
            # file: release it now instead of leaking it until interpreter exit.
            fd.close()
            raise