Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0047.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 67cc54b01a12 |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
124 | 124 |
125 session deferred is fired when transfer is finished | 125 session deferred is fired when transfer is finished |
126 """ | 126 """ |
127 return self._createSession(*args, **kwargs)[DEFER_KEY] | 127 return self._createSession(*args, **kwargs)[DEFER_KEY] |
128 | 128 |
129 def _createSession(self, file_obj, to_jid, sid, profile): | 129 def _createSession(self, client, stream_object, to_jid, sid): |
130 """Called when a bytestream is imminent | 130 """Called when a bytestream is imminent |
131 | 131 |
132 @param file_obj(file): File object where data will be written | 132 @param stream_object(IConsumer): stream object where data will be written |
133 @param to_jid(jid.JId): jid of the other peer | 133 @param to_jid(jid.JId): jid of the other peer |
134 @param sid(unicode): session id | 134 @param sid(unicode): session id |
135 @param profile: %(doc_profile)s | |
136 @return (dict): session data | 135 @return (dict): session data |
137 """ | 136 """ |
138 client = self.host.getClient(profile) | |
139 if sid in client.xep_0047_current_stream: | 137 if sid in client.xep_0047_current_stream: |
140 raise exceptions.ConflictError(u'A session with this id already exists !') | 138 raise exceptions.ConflictError(u'A session with this id already exists !') |
141 session_data = client.xep_0047_current_stream[sid] = \ | 139 session_data = client.xep_0047_current_stream[sid] = \ |
142 {'id': sid, | 140 {'id': sid, |
143 DEFER_KEY: defer.Deferred(), | 141 DEFER_KEY: defer.Deferred(), |
144 'to': to_jid, | 142 'to': to_jid, |
145 'file_obj': file_obj, | 143 'stream_object': stream_object, |
146 'seq': -1, | 144 'seq': -1, |
147 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), | 145 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), |
148 } | 146 } |
149 | 147 |
150 return session_data | 148 return session_data |
151 | 149 |
152 def _onIBBOpen(self, iq_elt, profile): | 150 def _onIBBOpen(self, iq_elt, client): |
153 """"Called when an IBB <open> element is received | 151 """"Called when an IBB <open> element is received |
154 | 152 |
155 @param iq_elt(domish.Element): the whole <iq> stanza | 153 @param iq_elt(domish.Element): the whole <iq> stanza |
156 @param profile: %(doc_profile)s | |
157 """ | 154 """ |
158 log.debug(_(u"IBB stream opening")) | 155 log.debug(_(u"IBB stream opening")) |
159 iq_elt.handled = True | 156 iq_elt.handled = True |
160 client = self.host.getClient(profile) | |
161 open_elt = iq_elt.elements(NS_IBB, 'open').next() | 157 open_elt = iq_elt.elements(NS_IBB, 'open').next() |
162 block_size = open_elt.getAttribute('block-size') | 158 block_size = open_elt.getAttribute('block-size') |
163 sid = open_elt.getAttribute('sid') | 159 sid = open_elt.getAttribute('sid') |
164 stanza = open_elt.getAttribute('stanza', 'iq') | 160 stanza = open_elt.getAttribute('stanza', 'iq') |
165 if not sid or not block_size or int(block_size) > 65535: | 161 if not sid or not block_size or int(block_size) > 65535: |
182 session_data["observer_cb"] = observer_cb = self._onIBBData | 178 session_data["observer_cb"] = observer_cb = self._onIBBData |
183 event_close = IBB_CLOSE.format(sid) | 179 event_close = IBB_CLOSE.format(sid) |
184 # we now set the stream observer to look after data packet | 180 # we now set the stream observer to look after data packet |
185 # FIXME: if we never get the events, the observers stay. | 181 # FIXME: if we never get the events, the observers stay. |
186 # would be better to have generic observer and check id once triggered | 182 # would be better to have generic observer and check id once triggered |
187 client.xmlstream.addObserver(event_data, observer_cb, profile=profile) | 183 client.xmlstream.addObserver(event_data, observer_cb, client=client) |
188 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, profile=profile) | 184 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) |
189 # finally, we send the accept stanza | 185 # finally, we send the accept stanza |
190 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') | 186 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') |
191 client.send(iq_result_elt) | 187 client.send(iq_result_elt) |
192 | 188 |
193 def _onIBBClose(self, iq_elt, profile): | 189 def _onIBBClose(self, iq_elt, client): |
194 """"Called when an IBB <close> element is received | 190 """"Called when an IBB <close> element is received |
195 | 191 |
196 @param iq_elt(domish.Element): the whole <iq> stanza | 192 @param iq_elt(domish.Element): the whole <iq> stanza |
197 @param profile: %(doc_profile)s | |
198 """ | 193 """ |
199 iq_elt.handled = True | 194 iq_elt.handled = True |
200 client = self.host.getClient(profile) | |
201 log.debug(_("IBB stream closing")) | 195 log.debug(_("IBB stream closing")) |
202 close_elt = iq_elt.elements(NS_IBB, 'close').next() | 196 close_elt = iq_elt.elements(NS_IBB, 'close').next() |
203 # XXX: this observer is only triggered on valid sid, so we don't need to check it | 197 # XXX: this observer is only triggered on valid sid, so we don't need to check it |
204 sid = close_elt['sid'] | 198 sid = close_elt['sid'] |
205 | 199 |
206 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') | 200 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') |
207 client.send(iq_result_elt) | 201 client.send(iq_result_elt) |
208 self._killSession(sid, client) | 202 self._killSession(sid, client) |
209 | 203 |
210 def _onIBBData(self, element, profile): | 204 def _onIBBData(self, element, client): |
211 """Observer called on <iq> or <message> stanzas with data element | 205 """Observer called on <iq> or <message> stanzas with data element |
212 | 206 |
213 Manage the data elelement (check validity and write to the file_obj) | 207 Manage the data elelement (check validity and write to the stream_object) |
214 @param element(domish.Element): <iq> or <message> stanza | 208 @param element(domish.Element): <iq> or <message> stanza |
215 @param profile: %(doc_profile)s | |
216 """ | 209 """ |
217 element.handled = True | 210 element.handled = True |
218 client = self.host.getClient(profile) | |
219 data_elt = element.elements(NS_IBB, 'data').next() | 211 data_elt = element.elements(NS_IBB, 'data').next() |
220 sid = data_elt['sid'] | 212 sid = data_elt['sid'] |
221 | 213 |
222 try: | 214 try: |
223 session_data = client.xep_0047_current_stream[sid] | 215 session_data = client.xep_0047_current_stream[sid] |
224 except KeyError: | 216 except KeyError: |
225 log.warning(_(u"Received data for an unknown session id")) | 217 log.warning(_(u"Received data for an unknown session id")) |
226 return self._sendError('item-not-found', None, element, client) | 218 return self._sendError('item-not-found', None, element, client) |
227 | 219 |
228 from_jid = session_data["to"] | 220 from_jid = session_data["to"] |
229 file_obj = session_data["file_obj"] | 221 stream_object = session_data["stream_object"] |
230 | 222 |
231 if from_jid.full() != element['from']: | 223 if from_jid.full() != element['from']: |
232 log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) | 224 log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) |
233 if element.name == 'iq': | 225 if element.name == 'iq': |
234 self._sendError('not-acceptable', sid, element, client) | 226 self._sendError('not-acceptable', sid, element, client) |
246 # we reset the timeout: | 238 # we reset the timeout: |
247 session_data["timer"].reset(TIMEOUT) | 239 session_data["timer"].reset(TIMEOUT) |
248 | 240 |
249 # we can now decode the data | 241 # we can now decode the data |
250 try: | 242 try: |
251 file_obj.write(base64.b64decode(str(data_elt))) | 243 stream_object.write(base64.b64decode(str(data_elt))) |
252 except TypeError: | 244 except TypeError: |
253 # The base64 data is invalid | 245 # The base64 data is invalid |
254 log.warning(_(u"Invalid base64 data")) | 246 log.warning(_(u"Invalid base64 data")) |
255 if element.name == 'iq': | 247 if element.name == 'iq': |
256 self._sendError('not-acceptable', sid, element, client) | 248 self._sendError('not-acceptable', sid, element, client) |
274 log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition)) | 266 log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition)) |
275 if sid is not None: | 267 if sid is not None: |
276 self._killSession(sid, client, error_condition) | 268 self._killSession(sid, client, error_condition) |
277 client.send(iq_elt) | 269 client.send(iq_elt) |
278 | 270 |
279 def startStream(self, file_obj, to_jid, sid, block_size=None, profile=C.PROF_KEY_NONE): | 271 def startStream(self, client, stream_object, to_jid, sid, block_size=None): |
280 """Launch the stream workflow | 272 """Launch the stream workflow |
281 | 273 |
282 @param file_obj(file): file_obj to send | 274 @param stream_object(ifaces.IStreamProducer): stream object to send |
283 @param to_jid(jid.JID): JID of the recipient | 275 @param to_jid(jid.JID): JID of the recipient |
284 @param sid(unicode): Stream session id | 276 @param sid(unicode): Stream session id |
285 @param block_size(int, None): size of the block (or None for default) | 277 @param block_size(int, None): size of the block (or None for default) |
286 @param profile: %(doc_profile)s | 278 """ |
287 """ | 279 session_data = self._createSession(client, stream_object, to_jid, sid) |
288 session_data = self._createSession(file_obj, to_jid, sid, profile) | |
289 client = self.host.getClient(profile) | |
290 | 280 |
291 if block_size is None: | 281 if block_size is None: |
292 block_size = XEP_0047.BLOCK_SIZE | 282 block_size = XEP_0047.BLOCK_SIZE |
293 assert block_size <= 65535 | 283 assert block_size <= 65535 |
294 session_data["block_size"] = block_size | 284 session_data["block_size"] = block_size |
311 @param session_data(dict): data of this streaming session | 301 @param session_data(dict): data of this streaming session |
312 @param client: %(doc_client)s | 302 @param client: %(doc_client)s |
313 """ | 303 """ |
314 session_data["timer"].reset(TIMEOUT) | 304 session_data["timer"].reset(TIMEOUT) |
315 | 305 |
316 buffer_ = session_data["file_obj"].read(session_data["block_size"]) | 306 buffer_ = session_data["stream_object"].read(session_data["block_size"]) |
317 if buffer_: | 307 if buffer_: |
318 next_iq_elt = client.IQ() | 308 next_iq_elt = client.IQ() |
319 next_iq_elt['to'] = session_data["to"].full() | 309 next_iq_elt['to'] = session_data["to"].full() |
320 data_elt = next_iq_elt.addElement((NS_IBB, 'data')) | 310 data_elt = next_iq_elt.addElement((NS_IBB, 'data')) |
321 seq = session_data['seq'] = (session_data['seq'] + 1) % 65535 | 311 seq = session_data['seq'] = (session_data['seq'] + 1) % 65535 |
355 | 345 |
356 def __init__(self, parent): | 346 def __init__(self, parent): |
357 self.plugin_parent = parent | 347 self.plugin_parent = parent |
358 | 348 |
359 def connectionInitialized(self): | 349 def connectionInitialized(self): |
360 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, profile=self.parent.profile) | 350 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent) |
361 | 351 |
362 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 352 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
363 return [disco.DiscoFeature(NS_IBB)] | 353 return [disco.DiscoFeature(NS_IBB)] |
364 | 354 |
365 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | 355 def getDiscoItems(self, requestor, target, nodeIdentifier=''): |