comparison src/tools/stream.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements: this is a big patch, as things had to be changed at the same time. - changed methods using the profile argument to use client instead - moved SatFile to a new tools.stream module, as it should be part of core, not a plugin - new IStreamProducer interface, to handle starting a pull producer - new FileStreamObject which creates a stream producer/consumer from a SatFile - plugin pipe no longer uses a Unix named pipe, as it complicates things: special care needs to be taken not to block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renamed to jingle stream. - bad connections/errors should be better handled in the jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents
children
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 """ interfaces """
21
22 from sat.core import exceptions
23 from sat.core.constants import Const as C
24 from sat.core.log import getLogger
25 from twisted.protocols import basic
26 from twisted.internet import interfaces
27 from zope import interface
28 import uuid
29 import os
30
31 log = getLogger(__name__)
32
33
class IStreamProducer(interface.Interface):
    """Interface of objects which can start producing a stream on request"""

    def startStream(consumer):
        """start producing the stream

        @param consumer: consumer the stream must be produced to
        @return (D): deferred fired when stream is finished
        """
41
42
class SatFile(object):
    """A file-like object to have high level files manipulation

    The wrapped file is opened with the built-in "open". The instance
    registers a progress callback on the host at creation, sends the
    progressStarted signal, and unregisters the callback when closed.
    """
    # TODO: manage "with" statement

    def __init__(self, host, client, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True):
        """
        @param host: host object, used for registerProgressCb/removeProgressCb
            and for the progress* signals of its bridge
        @param client: client object, only its "profile" attribute is used
        @param path(str): path of the file to get
        @param mode(str): same as for built-in "open" function
        @param uid(unicode, None): unique id identifing this progressing element
            This uid will be used with self.host.progressGet
            will be automaticaly generated if None
        @param size(None, int): size of the file (when known in advance)
        @param data_cb(None, callable): method to call on each data read/write
            mainly useful to do things like calculating hash
        @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent
            if False, you'll have to call self.progressFinished and self.progressError yourself
            progressStarted signal is always sent automatically
        @raise exceptions.InternalError: no direction can be deduced from mode
            (see getProgressMetadata)
        """
        self.host = host
        self.profile = client.profile
        self.uid = uid or unicode(uuid.uuid4())
        self._file = open(path, mode)
        self.size = size
        self.data_cb = data_cb
        self.auto_end_signals = auto_end_signals
        try:
            metadata = self.getProgressMetadata()
        except Exception:
            # the instance will never be returned to the caller, so nobody
            # could close the file we have just opened: do it here
            self._file.close()
            raise
        self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=client.profile)
        self.host.bridge.progressStarted(self.uid, metadata, client.profile)

    def checkSize(self):
        """Check that current size correspond to given size

        must be used when the transfer is supposed to be finished
        @return (bool): True if the position is the same as given size
        @raise exceptions.NotFound: size has not be specified
        """
        position = self._file.tell()
        if self.size is None:
            raise exceptions.NotFound
        return position == self.size

    def close(self, progress_metadata=None, error=None):
        """Close the current file

        @param progress_metadata(None, dict): metadata to send with _onProgressFinished message
        @param error(None, unicode): set to an error message if progress was not successful
            mutually exclusive with progress_metadata
            error can happen even if error is None, if current size differ from given size
        """
        if self._file.closed:
            return  # avoid double close (which is allowed) error
        if error is None:
            try:
                size_ok = self.checkSize()
            except exceptions.NotFound:
                # no size declared, nothing to compare with
                size_ok = True
            if not size_ok:
                error = u'declared and actual size mismatch'
                log.warning(error)
                progress_metadata = None

        self._file.close()

        try:
            if self.auto_end_signals:
                if error is None:
                    self.progressFinished(progress_metadata)
                else:
                    assert progress_metadata is None
                    self.progressError(error)
        finally:
            # the file is closed at this point: the progress callback must
            # always be unregistered, even if sending the end signal failed
            self.host.removeProgressCb(self.uid, self.profile)

    def progressFinished(self, metadata=None):
        """Send the progressFinished signal for this file

        @param metadata(None, dict): metadata to send with the signal
            an empty dict is sent if None
        """
        if metadata is None:
            metadata = {}
        self.host.bridge.progressFinished(self.uid, metadata, self.profile)

    def progressError(self, error):
        """Send the progressError signal for this file

        @param error(unicode): error message to send with the signal
        """
        self.host.bridge.progressError(self.uid, error, self.profile)

    def flush(self):
        """Flush the wrapped file"""
        self._file.flush()

    def write(self, buf):
        """Write data to the wrapped file, then give them to data_cb if set

        @param buf: data to write
        @return: result of data_cb if it is set, None otherwise
        """
        self._file.write(buf)
        if self.data_cb is not None:
            return self.data_cb(buf)

    def read(self, size=-1):
        """Read data from the wrapped file

        data_cb (if set) is called with the data, except when nothing was read
        @param size(int): same as for the "read" method of file objects
        @return: data read
        """
        read = self._file.read(size)
        if self.data_cb is not None and read:
            self.data_cb(read)
        return read

    def seek(self, offset, whence=os.SEEK_SET):
        """Move the position in the wrapped file (same arguments as file.seek)"""
        self._file.seek(offset, whence)

    def tell(self):
        """Return the current position in the wrapped file"""
        return self._file.tell()

    def mode(self):
        """Return the mode the wrapped file has been opened with

        @return (str): mode of the wrapped file
        """
        # "mode" is a plain attribute of file objects, it must not be called
        return self._file.mode

    def getProgressMetadata(self):
        """Return progression metadata as given to progressStarted

        @return (dict): metadata (check bridge for documentation)
        @raise exceptions.InternalError: no direction can be deduced from
            the mode of the file
        """
        metadata = {'type': C.META_TYPE_FILE}

        mode = self._file.mode
        if '+' in mode:
            pass  # we have no direction in read/write modes
        elif mode in ('r', 'rb'):
            metadata['direction'] = 'out'
        elif mode in ('w', 'wb'):
            metadata['direction'] = 'in'
        elif 'U' in mode:
            metadata['direction'] = 'out'
        else:
            raise exceptions.InternalError

        metadata['name'] = self._file.name

        return metadata

    def getProgress(self, progress_id, profile):
        """Progress callback registered on the host at creation

        @param progress_id: not used
        @param profile: not used
        @return (dict): current "position" in the file
            plus "size" when a non null size has been declared
        """
        ret = {'position': self._file.tell()}
        if self.size:
            ret['size'] = self.size
        return ret
175
176
@interface.implementer(IStreamProducer)
@interface.implementer(interfaces.IConsumer)
class FileStreamObject(basic.FileSender):
    """Stream producer/consumer working on a SatFile

    As a producer (startStream), the file is sent to a consumer using
    beginFileTransfer, inherited from basic.FileSender.
    As a consumer (write), the data received are written to the file.
    """

    def __init__(self, host, client, path, **kwargs):
        """

        A SatFile will be created and put in self.file_obj
        @param host: host object, given as is to SatFile
        @param client: client object, given as is to SatFile
        @param path(unicode): path to the file
        @param **kwargs: kw arguments to pass to SatFile
        """
        self.file_obj = SatFile(host, client, path, **kwargs)

    def registerProducer(self, producer, streaming):
        # part of IConsumer: the producer is not tracked, write() hands the
        # data directly to the file
        pass

    def unregisterProducer(self):
        # part of IConsumer: producers call it when they have finished, it
        # must exist even if there is nothing to undo (see registerProducer)
        pass

    def startStream(self, consumer):
        """Start sending the file to consumer

        @param consumer: consumer the file must be sent to
        @return: what beginFileTransfer returns
            (a deferred, according to IStreamProducer)
        """
        return self.beginFileTransfer(self.file_obj, consumer)

    def write(self, data):
        """Write received data to the file

        @param data: data to write
        """
        self.file_obj.write(data)

    def close(self, *args, **kwargs):
        """Close the file, arguments are given as is to SatFile.close"""
        self.file_obj.close(*args, **kwargs)