Mercurial > libervia-backend
comparison src/tools/stream.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfer: refactoring and improvements:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- moved SatFile to a new tools.stream module, as it should be part of core, not a plugin
- new IStreamProducer interface, to handle starting a pull producer
- new FileStreamObject which creates a stream producer/consumer from a SatFile
- plugin pipe no longer uses a unix named pipe, as it complicates things:
special care needs to be taken not to block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renamed to jingle stream.
- bad connections/errors should be better handled in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 """ interfaces """ | |
21 | |
22 from sat.core import exceptions | |
23 from sat.core.constants import Const as C | |
24 from sat.core.log import getLogger | |
25 from twisted.protocols import basic | |
26 from twisted.internet import interfaces | |
27 from zope import interface | |
28 import uuid | |
29 import os | |
30 | |
31 log = getLogger(__name__) | |
32 | |
33 | |
class IStreamProducer(interface.Interface):
    """Interface for objects which can start producing a stream on request"""

    def startStream(consumer):
        """Start producing the stream

        @param consumer: consumer the stream is produced to
        @return (D): deferred fired when stream is finished
        """
41 | |
42 | |
class SatFile(object):
    """A file-like object to have high level files manipulation

    The wrapped file is registered on the host as a progressing element:
    a progress callback is registered and progressStarted is sent on creation,
    and the callback is removed when the file is closed.
    """
    # TODO: manage "with" statement

    def __init__(self, host, client, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True):
        """
        @param host: %(doc_host)s
        @param client: client session, its profile is used for progress signals
        @param path(str): path of the file to get
        @param mode(str): same as for built-in "open" function
        @param uid(unicode, None): unique id identifing this progressing element
            This uid will be used with self.host.progressGet
            will be automaticaly generated if None
        @param size(None, int): size of the file (when known in advance)
        @param data_cb(None, callable): method to call on each data read/write
            mainly useful to do things like calculating hash
        @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent
            if False, you'll have to call self.progressFinished and self.progressError yourself
            progressStarted signal is always sent automatically
        @raise exceptions.InternalError: the mode is not handled by getProgressMetadata
        """
        self.host = host
        self.profile = client.profile
        self.uid = uid or unicode(uuid.uuid4())
        self._file = open(path, mode)
        self.size = size
        self.data_cb = data_cb
        self.auto_end_signals = auto_end_signals
        try:
            metadata = self.getProgressMetadata()
        except exceptions.InternalError:
            # unhandled mode: the file is already opened at this point,
            # we close it to avoid leaking it before propagating the error
            self._file.close()
            raise
        self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=client.profile)
        self.host.bridge.progressStarted(self.uid, metadata, client.profile)

    def checkSize(self):
        """Check that current size correspond to given size

        must be used when the transfer is supposed to be finished
        @return (bool): True if the position is the same as given size
        @raise exceptions.NotFound: size has not be specified
        """
        position = self._file.tell()
        if self.size is None:
            raise exceptions.NotFound
        return position == self.size

    def close(self, progress_metadata=None, error=None):
        """Close the current file

        Nothing is done if the file is already closed.
        The progress callback is always removed, even if sending the end signal fails.
        @param progress_metadata(None, dict): metadata to send with _onProgressFinished message
        @param error(None, unicode): set to an error message if progress was not successful
            mutually exclusive with progress_metadata
            error can happen even if error is None, if current size differ from given size
        """
        if self._file.closed:
            return  # avoid double close (which is allowed) error
        if error is None:
            try:
                size_ok = self.checkSize()
            except exceptions.NotFound:
                # no size has been declared, there is nothing to check
                size_ok = True
            if not size_ok:
                error = u'declared and actual size mismatch'
                log.warning(error)
                progress_metadata = None

        self._file.close()

        try:
            if self.auto_end_signals:
                if error is None:
                    self.progressFinished(progress_metadata)
                else:
                    assert progress_metadata is None
                    self.progressError(error)
        finally:
            # the file is closed whatever happened above, so the callback
            # must not stay registered (it would call tell() on a closed file)
            self.host.removeProgressCb(self.uid, self.profile)

    def progressFinished(self, metadata=None):
        """Send the progressFinished signal for this file

        @param metadata(None, dict): metadata to send with the signal
            an empty dict is sent if None
        """
        if metadata is None:
            metadata = {}
        self.host.bridge.progressFinished(self.uid, metadata, self.profile)

    def progressError(self, error):
        """Send the progressError signal for this file

        @param error(unicode): error message to send with the signal
        """
        self.host.bridge.progressError(self.uid, error, self.profile)

    def flush(self):
        """Flush the wrapped file"""
        self._file.flush()

    def write(self, buf):
        """Write data to the wrapped file

        @param buf: data to write
        @return: result of data_cb(buf) if data_cb is set, None otherwise
        """
        self._file.write(buf)
        if self.data_cb is not None:
            return self.data_cb(buf)

    def read(self, size=-1):
        """Read data from the wrapped file

        data_cb (if set) is called with the data, except when nothing has been read
        @param size(int): same as for file.read
        @return: data read
        """
        read = self._file.read(size)
        if self.data_cb is not None and read:
            self.data_cb(read)
        return read

    def seek(self, offset, whence=os.SEEK_SET):
        """Move the position in the wrapped file (same arguments as file.seek)"""
        self._file.seek(offset, whence)

    def tell(self):
        """Return the current position in the wrapped file"""
        return self._file.tell()

    def mode(self):
        """Return the mode of the wrapped file

        @return (str): mode, as exposed by the "mode" attribute of the wrapped file
        """
        # "mode" is a plain attribute of file objects, not a method
        return self._file.mode

    def getProgressMetadata(self):
        """Return progression metadata as given to progressStarted

        @return (dict): metadata (check bridge for documentation)
        @raise exceptions.InternalError: direction can't be deduced from file mode
        """
        metadata = {'type': C.META_TYPE_FILE}

        mode = self._file.mode
        if '+' in mode:
            pass  # we have no direction in read/write modes
        elif mode in ('r', 'rb'):
            metadata['direction'] = 'out'
        elif mode in ('w', 'wb'):
            metadata['direction'] = 'in'
        elif 'U' in mode:
            metadata['direction'] = 'out'
        else:
            raise exceptions.InternalError

        metadata['name'] = self._file.name

        return metadata

    def getProgress(self, progress_id, profile):
        """Progress callback registered on the host

        @param progress_id: not used here
        @param profile: not used here
        @return (dict): current "position", and "size" when it is set and not 0
        """
        ret = {'position': self._file.tell()}
        if self.size:
            ret['size'] = self.size
        return ret
175 | |
176 | |
@interface.implementer(IStreamProducer)
@interface.implementer(interfaces.IConsumer)
class FileStreamObject(basic.FileSender):
    """Stream producer/consumer working on a SatFile

    As a producer, the file is sent to a consumer with startStream.
    As a consumer, data received with write are written to the file.
    """

    def __init__(self, host, client, path, **kwargs):
        """

        A SatFile will be created and put in self.file_obj
        @param host: passed as is to SatFile
        @param client: passed as is to SatFile
        @param path(unicode): path to the file
        @param **kwargs: kw arguments to pass to SatFile
        """
        self.file_obj = SatFile(host, client, path, **kwargs)

    def registerProducer(self, producer, streaming):
        """IConsumer method, nothing is done with the producer"""
        pass

    def unregisterProducer(self):
        """IConsumer method, nothing to do as the producer is not kept

        This method is needed to fully provide IConsumer: producers call it
        once they have no more data to write.
        """
        pass

    def startStream(self, consumer):
        """Start sending the file to the consumer

        @param consumer: consumer which will receive the file data
        @return (D): deferred returned by beginFileTransfer
        """
        return self.beginFileTransfer(self.file_obj, consumer)

    def write(self, data):
        """IConsumer method, write received data to the file

        @param data: data to write to self.file_obj
        """
        self.file_obj.write(data)

    def close(self, *args, **kwargs):
        """Close the underlying SatFile

        arguments are passed as is to SatFile.close
        """
        self.file_obj.close(*args, **kwargs)