comparison src/plugins/plugin_xep_0047.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 67cc54b01a12
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
124 124
125 session deferred is fired when transfer is finished 125 session deferred is fired when transfer is finished
126 """ 126 """
127 return self._createSession(*args, **kwargs)[DEFER_KEY] 127 return self._createSession(*args, **kwargs)[DEFER_KEY]
128 128
129 def _createSession(self, file_obj, to_jid, sid, profile): 129 def _createSession(self, client, stream_object, to_jid, sid):
130 """Called when a bytestream is imminent 130 """Called when a bytestream is imminent
131 131
132 @param file_obj(file): File object where data will be written 132 @param stream_object(IConsumer): stream object where data will be written
133 @param to_jid(jid.JId): jid of the other peer 133 @param to_jid(jid.JId): jid of the other peer
134 @param sid(unicode): session id 134 @param sid(unicode): session id
135 @param profile: %(doc_profile)s
136 @return (dict): session data 135 @return (dict): session data
137 """ 136 """
138 client = self.host.getClient(profile)
139 if sid in client.xep_0047_current_stream: 137 if sid in client.xep_0047_current_stream:
140 raise exceptions.ConflictError(u'A session with this id already exists !') 138 raise exceptions.ConflictError(u'A session with this id already exists !')
141 session_data = client.xep_0047_current_stream[sid] = \ 139 session_data = client.xep_0047_current_stream[sid] = \
142 {'id': sid, 140 {'id': sid,
143 DEFER_KEY: defer.Deferred(), 141 DEFER_KEY: defer.Deferred(),
144 'to': to_jid, 142 'to': to_jid,
145 'file_obj': file_obj, 143 'stream_object': stream_object,
146 'seq': -1, 144 'seq': -1,
147 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), 145 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
148 } 146 }
149 147
150 return session_data 148 return session_data
151 149
152 def _onIBBOpen(self, iq_elt, profile): 150 def _onIBBOpen(self, iq_elt, client):
153 """"Called when an IBB <open> element is received 151 """"Called when an IBB <open> element is received
154 152
155 @param iq_elt(domish.Element): the whole <iq> stanza 153 @param iq_elt(domish.Element): the whole <iq> stanza
156 @param profile: %(doc_profile)s
157 """ 154 """
158 log.debug(_(u"IBB stream opening")) 155 log.debug(_(u"IBB stream opening"))
159 iq_elt.handled = True 156 iq_elt.handled = True
160 client = self.host.getClient(profile)
161 open_elt = iq_elt.elements(NS_IBB, 'open').next() 157 open_elt = iq_elt.elements(NS_IBB, 'open').next()
162 block_size = open_elt.getAttribute('block-size') 158 block_size = open_elt.getAttribute('block-size')
163 sid = open_elt.getAttribute('sid') 159 sid = open_elt.getAttribute('sid')
164 stanza = open_elt.getAttribute('stanza', 'iq') 160 stanza = open_elt.getAttribute('stanza', 'iq')
165 if not sid or not block_size or int(block_size) > 65535: 161 if not sid or not block_size or int(block_size) > 65535:
182 session_data["observer_cb"] = observer_cb = self._onIBBData 178 session_data["observer_cb"] = observer_cb = self._onIBBData
183 event_close = IBB_CLOSE.format(sid) 179 event_close = IBB_CLOSE.format(sid)
184 # we now set the stream observer to look after data packet 180 # we now set the stream observer to look after data packet
185 # FIXME: if we never get the events, the observers stay. 181 # FIXME: if we never get the events, the observers stay.
186 # would be better to have generic observer and check id once triggered 182 # would be better to have generic observer and check id once triggered
187 client.xmlstream.addObserver(event_data, observer_cb, profile=profile) 183 client.xmlstream.addObserver(event_data, observer_cb, client=client)
188 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, profile=profile) 184 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client)
189 # finally, we send the accept stanza 185 # finally, we send the accept stanza
190 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') 186 iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
191 client.send(iq_result_elt) 187 client.send(iq_result_elt)
192 188
193 def _onIBBClose(self, iq_elt, profile): 189 def _onIBBClose(self, iq_elt, client):
194 """"Called when an IBB <close> element is received 190 """"Called when an IBB <close> element is received
195 191
196 @param iq_elt(domish.Element): the whole <iq> stanza 192 @param iq_elt(domish.Element): the whole <iq> stanza
197 @param profile: %(doc_profile)s
198 """ 193 """
199 iq_elt.handled = True 194 iq_elt.handled = True
200 client = self.host.getClient(profile)
201 log.debug(_("IBB stream closing")) 195 log.debug(_("IBB stream closing"))
202 close_elt = iq_elt.elements(NS_IBB, 'close').next() 196 close_elt = iq_elt.elements(NS_IBB, 'close').next()
203 # XXX: this observer is only triggered on valid sid, so we don't need to check it 197 # XXX: this observer is only triggered on valid sid, so we don't need to check it
204 sid = close_elt['sid'] 198 sid = close_elt['sid']
205 199
206 iq_result_elt = xmlstream.toResponse(iq_elt, 'result') 200 iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
207 client.send(iq_result_elt) 201 client.send(iq_result_elt)
208 self._killSession(sid, client) 202 self._killSession(sid, client)
209 203
210 def _onIBBData(self, element, profile): 204 def _onIBBData(self, element, client):
211 """Observer called on <iq> or <message> stanzas with data element 205 """Observer called on <iq> or <message> stanzas with data element
212 206
213 Manage the data elelement (check validity and write to the file_obj) 207 Manage the data elelement (check validity and write to the stream_object)
214 @param element(domish.Element): <iq> or <message> stanza 208 @param element(domish.Element): <iq> or <message> stanza
215 @param profile: %(doc_profile)s
216 """ 209 """
217 element.handled = True 210 element.handled = True
218 client = self.host.getClient(profile)
219 data_elt = element.elements(NS_IBB, 'data').next() 211 data_elt = element.elements(NS_IBB, 'data').next()
220 sid = data_elt['sid'] 212 sid = data_elt['sid']
221 213
222 try: 214 try:
223 session_data = client.xep_0047_current_stream[sid] 215 session_data = client.xep_0047_current_stream[sid]
224 except KeyError: 216 except KeyError:
225 log.warning(_(u"Received data for an unknown session id")) 217 log.warning(_(u"Received data for an unknown session id"))
226 return self._sendError('item-not-found', None, element, client) 218 return self._sendError('item-not-found', None, element, client)
227 219
228 from_jid = session_data["to"] 220 from_jid = session_data["to"]
229 file_obj = session_data["file_obj"] 221 stream_object = session_data["stream_object"]
230 222
231 if from_jid.full() != element['from']: 223 if from_jid.full() != element['from']:
232 log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) 224 log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from']))
233 if element.name == 'iq': 225 if element.name == 'iq':
234 self._sendError('not-acceptable', sid, element, client) 226 self._sendError('not-acceptable', sid, element, client)
246 # we reset the timeout: 238 # we reset the timeout:
247 session_data["timer"].reset(TIMEOUT) 239 session_data["timer"].reset(TIMEOUT)
248 240
249 # we can now decode the data 241 # we can now decode the data
250 try: 242 try:
251 file_obj.write(base64.b64decode(str(data_elt))) 243 stream_object.write(base64.b64decode(str(data_elt)))
252 except TypeError: 244 except TypeError:
253 # The base64 data is invalid 245 # The base64 data is invalid
254 log.warning(_(u"Invalid base64 data")) 246 log.warning(_(u"Invalid base64 data"))
255 if element.name == 'iq': 247 if element.name == 'iq':
256 self._sendError('not-acceptable', sid, element, client) 248 self._sendError('not-acceptable', sid, element, client)
274 log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition)) 266 log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition))
275 if sid is not None: 267 if sid is not None:
276 self._killSession(sid, client, error_condition) 268 self._killSession(sid, client, error_condition)
277 client.send(iq_elt) 269 client.send(iq_elt)
278 270
279 def startStream(self, file_obj, to_jid, sid, block_size=None, profile=C.PROF_KEY_NONE): 271 def startStream(self, client, stream_object, to_jid, sid, block_size=None):
280 """Launch the stream workflow 272 """Launch the stream workflow
281 273
282 @param file_obj(file): file_obj to send 274 @param stream_object(ifaces.IStreamProducer): stream object to send
283 @param to_jid(jid.JID): JID of the recipient 275 @param to_jid(jid.JID): JID of the recipient
284 @param sid(unicode): Stream session id 276 @param sid(unicode): Stream session id
285 @param block_size(int, None): size of the block (or None for default) 277 @param block_size(int, None): size of the block (or None for default)
286 @param profile: %(doc_profile)s 278 """
287 """ 279 session_data = self._createSession(client, stream_object, to_jid, sid)
288 session_data = self._createSession(file_obj, to_jid, sid, profile)
289 client = self.host.getClient(profile)
290 280
291 if block_size is None: 281 if block_size is None:
292 block_size = XEP_0047.BLOCK_SIZE 282 block_size = XEP_0047.BLOCK_SIZE
293 assert block_size <= 65535 283 assert block_size <= 65535
294 session_data["block_size"] = block_size 284 session_data["block_size"] = block_size
311 @param session_data(dict): data of this streaming session 301 @param session_data(dict): data of this streaming session
312 @param client: %(doc_client)s 302 @param client: %(doc_client)s
313 """ 303 """
314 session_data["timer"].reset(TIMEOUT) 304 session_data["timer"].reset(TIMEOUT)
315 305
316 buffer_ = session_data["file_obj"].read(session_data["block_size"]) 306 buffer_ = session_data["stream_object"].read(session_data["block_size"])
317 if buffer_: 307 if buffer_:
318 next_iq_elt = client.IQ() 308 next_iq_elt = client.IQ()
319 next_iq_elt['to'] = session_data["to"].full() 309 next_iq_elt['to'] = session_data["to"].full()
320 data_elt = next_iq_elt.addElement((NS_IBB, 'data')) 310 data_elt = next_iq_elt.addElement((NS_IBB, 'data'))
321 seq = session_data['seq'] = (session_data['seq'] + 1) % 65535 311 seq = session_data['seq'] = (session_data['seq'] + 1) % 65535
355 345
356 def __init__(self, parent): 346 def __init__(self, parent):
357 self.plugin_parent = parent 347 self.plugin_parent = parent
358 348
359 def connectionInitialized(self): 349 def connectionInitialized(self):
360 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, profile=self.parent.profile) 350 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent)
361 351
362 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 352 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
363 return [disco.DiscoFeature(NS_IBB)] 353 return [disco.DiscoFeature(NS_IBB)]
364 354
365 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 355 def getDiscoItems(self, requestor, target, nodeIdentifier=''):