comparison src/plugins/plugin_xep_0047.py @ 1524:7b0fcefd52d4

plugin XEP-0047, XEP-0096: In-Band Bytestream plugin cleaning: - some renaming, comment improvements, etc. - the progress callback is no longer managed here, as it will be managed by the application - no file data is used any more besides file_obj - a proper Deferred is used instead of success and error callbacks - a cleaner error sending method. Plugin XEP-0096 has been updated to handle these changes. It is temporarily partially broken, though.
author Goffi <goffi@goffi.org>
date Fri, 25 Sep 2015 19:19:12 +0200
parents 3265a2639182
children 6a8dd91476f0
comparison
equal deleted inserted replaced
1523:0209f8d35873 1524:7b0fcefd52d4
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.log import getLogger 21 from sat.core.log import getLogger
22 log = getLogger(__name__) 22 log = getLogger(__name__)
23 from twisted.words.protocols.jabber import client as jabber_client, jid 23 from sat.core import exceptions
24 from twisted.words.xish import domish 24 from twisted.words.protocols.jabber import client as jabber_client
25 from twisted.words.protocols.jabber import jid
26 from twisted.words.protocols.jabber import xmlstream
27 from twisted.words.protocols.jabber import error
25 from twisted.internet import reactor 28 from twisted.internet import reactor
29 from twisted.internet import defer
30 from twisted.python import failure
26 31
27 from wokkel import disco, iwokkel 32 from wokkel import disco, iwokkel
28 33
29 from zope.interface import implements 34 from zope.interface import implements
30 35
37 42
38 MESSAGE = '/message' 43 MESSAGE = '/message'
39 IQ_SET = '/iq[@type="set"]' 44 IQ_SET = '/iq[@type="set"]'
40 NS_IBB = 'http://jabber.org/protocol/ibb' 45 NS_IBB = 'http://jabber.org/protocol/ibb'
41 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]' 46 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]'
42 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="%s"]' 47 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="{}"]'
43 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]' 48 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]'
44 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]' 49 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]'
45 TIMEOUT = 60 # timeout for workflow 50 TIMEOUT = 60 # timeout for workflow
46 BLOCK_SIZE = 4096 51 DEFER_KEY = 'finished' # key of the deferred used to track session end
47 52
48 PLUGIN_INFO = { 53 PLUGIN_INFO = {
49 "name": "In-Band Bytestream Plugin", 54 "name": "In-Band Bytestream Plugin",
50 "import_name": "XEP-0047", 55 "import_name": "XEP-0047",
51 "type": "XEP", 56 "type": "XEP",
56 } 61 }
57 62
58 63
59 class XEP_0047(object): 64 class XEP_0047(object):
60 NAMESPACE = NS_IBB 65 NAMESPACE = NS_IBB
66 BLOCK_SIZE = 4096
61 67
62 def __init__(self, host): 68 def __init__(self, host):
63 log.info(_("In-Band Bytestreams plugin initialization")) 69 log.info(_("In-Band Bytestreams plugin initialization"))
64 self.host = host 70 self.host = host
65 71
68 74
69 def profileConnected(self, profile): 75 def profileConnected(self, profile):
70 client = self.host.getClient(profile) 76 client = self.host.getClient(profile)
71 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict) 77 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict)
72 78
73 def _timeOut(self, sid, profile): 79 def _timeOut(self, sid, client):
74 """Delecte current_stream id, called after timeout 80 """Delete current_stream id, called after timeout
75 @param id: id of client.xep_0047_current_stream""" 81
76 log.info(_("In-Band Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") 82 @param sid(unicode): session id of client.xep_0047_current_stream
77 % {"sid": sid, "profile": profile}) 83 @param client: %(doc_client)s
78 self._killId(sid, False, "TIMEOUT", profile) 84 """
79 85 log.info(_("In-Band Bytestream: TimeOut reached for id {sid} [{profile}]")
80 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): 86 .format(sid=sid, profile=client.profile))
81 """Delete an current_stream id, clean up associated observers 87 self._killSession(sid, client, "TIMEOUT")
82 @param sid: id of client.xep_0047_current_stream""" 88
83 assert(profile) 89 def _killSession(self, sid, client, failure_reason=None):
84 client = self.host.getClient(profile) 90 """Delete a current_stream id, clean up associated observers
85 if sid not in client.xep_0047_current_stream: 91
92 @param sid(unicode): session id
93 @param client: %(doc_client)s
94 @param failure_reason(None, unicode): if None the session is successful
95 else, will be used to call failure_cb
96 """
97 try:
98 session = client.xep_0047_current_stream[sid]
99 except KeyError:
86 log.warning(_("kill id called on a non existant id")) 100 log.warning(_("kill id called on a non existant id"))
87 return 101 return
88 if "observer_cb" in client.xep_0047_current_stream[sid]: 102
89 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"]) 103 try:
90 if client.xep_0047_current_stream[sid]['timer'].active(): 104 observer_cb = session['observer_cb']
91 client.xep_0047_current_stream[sid]['timer'].cancel() 105 except KeyError:
92 if "size" in client.xep_0047_current_stream[sid]: 106 pass
93 self.host.removeProgressCB(sid, profile) 107 else:
94 108 client.xmlstream.removeObserver(session["event_data"], observer_cb)
95 file_obj = client.xep_0047_current_stream[sid]['file_obj'] 109
96 success_cb = client.xep_0047_current_stream[sid]['success_cb'] 110 if session['timer'].active():
97 failure_cb = client.xep_0047_current_stream[sid]['failure_cb'] 111 session['timer'].cancel()
98 112
99 del client.xep_0047_current_stream[sid] 113 del client.xep_0047_current_stream[sid]
100 114
115 success = failure_reason is None
116 stream_d = session[DEFER_KEY]
117
101 if success: 118 if success:
102 success_cb(sid, file_obj, NS_IBB, profile) 119 stream_d.callback(None)
103 else: 120 else:
104 failure_cb(sid, file_obj, NS_IBB, failure_reason, profile) 121 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))
105 122
106 def getProgress(self, sid, data, profile): 123 def createSession(self, *args, **kwargs):
107 """Fill data with position of current transfer""" 124 """like [_createSession] but return the session deferred instead of the whole session
108 client = self.host.getClient(profile) 125
109 try: 126 session deferred is fired when transfer is finished
110 file_obj = client.xep_0047_current_stream[sid]["file_obj"] 127 """
111 data["position"] = str(file_obj.tell()) 128 return self._createSession(*args, **kwargs)[DEFER_KEY]
112 data["size"] = str(client.xep_0047_current_stream[sid]["size"]) 129
113 except: 130 def _createSession(self, file_obj, to_jid, sid, profile):
114 pass
115
116 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile):
117 """Called when a bytestream is imminent 131 """Called when a bytestream is imminent
118 @param from_jid: jid of the sender 132
119 @param sid: Stream id 133 @param file_obj(file): File object where data will be written
120 @param file_obj: File object where data will be written 134 @param to_jid(jid.JId): jid of the other peer
121 @param size: full size of the data, or None if unknown 135 @param sid(unicode): session id
122 @param success_cb: method to call when successfuly finished 136 @param profile: %(doc_profile)s
123 @param failure_cb: method to call when something goes wrong 137 @return (dict): session data
124 @param profile: %(doc_profile)s""" 138 """
125 client = self.host.getClient(profile) 139 client = self.host.getClient(profile)
126 data = client.xep_0047_current_stream[sid] = {} 140 if sid in client.xep_0047_current_stream:
127 data["from"] = from_jid 141 raise exceptions.ConflictError(u'A session with this id already exists !')
128 data["file_obj"] = file_obj 142 session_data = client.xep_0047_current_stream[sid] = \
129 data["seq"] = -1 143 {'id': sid,
130 if size: 144 DEFER_KEY: defer.Deferred(),
131 data["size"] = size 145 'to': to_jid,
132 self.host.registerProgressCB(sid, self.getProgress, profile) 146 'file_obj': file_obj,
133 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 147 'seq': -1,
134 data["success_cb"] = success_cb 148 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
135 data["failure_cb"] = failure_cb 149 }
136 150
137 def streamOpening(self, IQ, profile): 151 return session_data
138 log.debug(_("IBB stream opening")) 152
139 IQ.handled = True 153 def _onIBBOpen(self, iq_elt, profile):
140 client = self.host.getClient(profile) 154 """"Called when an IBB <open> element is received
141 open_elt = IQ.firstChildElement() 155
156 @param iq_elt(domish.Element): the whole <iq> stanza
157 @param profile: %(doc_profile)s
158 """
159 log.debug(_(u"IBB stream opening"))
160 iq_elt.handled = True
161 client = self.host.getClient(profile)
162 open_elt = iq_elt.elements(NS_IBB, 'open').next()
142 block_size = open_elt.getAttribute('block-size') 163 block_size = open_elt.getAttribute('block-size')
143 sid = open_elt.getAttribute('sid') 164 sid = open_elt.getAttribute('sid')
144 stanza = open_elt.getAttribute('stanza', 'iq') 165 stanza = open_elt.getAttribute('stanza', 'iq')
145 if not sid or not block_size or int(block_size) > 65535: 166 if not sid or not block_size or int(block_size) > 65535:
146 log.warning(_(u"malformed IBB transfer: %s" % IQ['id'])) 167 return self._sendError('not-acceptable', sid or None, iq_elt, client)
147 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream)
148 return
149 if not sid in client.xep_0047_current_stream: 168 if not sid in client.xep_0047_current_stream:
150 log.warning(_(u"Ignoring unexpected IBB transfer: %s" % sid)) 169 log.warning(_(u"Ignoring unexpected IBB transfer: %s" % sid))
151 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) 170 return self._sendError('not-acceptable', sid or None, iq_elt, client)
152 return 171 session_data = client.xep_0047_current_stream[sid]
153 if client.xep_0047_current_stream[sid]["from"] != jid.JID(IQ['from']): 172 if session_data["to"] != jid.JID(iq_elt['from']):
154 log.warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) 173 log.warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
155 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) 174 return self._sendError('not-acceptable', sid, iq_elt, client)
156 self._killId(sid, False, "PROTOCOL_ERROR", profile=profile) 175
157 return 176 # at this stage, the session looks ok and will be accepted
158 177
159 #at this stage, the session looks ok and will be accepted 178 # we reset the timeout:
160 179 session_data["timer"].reset(TIMEOUT)
161 #we reset the timeout: 180
162 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) 181 # we save the xmlstream, events and observer data to allow observer removal
163 182 session_data["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza == 'message' else IBB_IQ_DATA).format(sid)
164 #we save the xmlstream, events and observer data to allow observer removal 183 session_data["observer_cb"] = observer_cb = self._onIBBData
165 client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza == 'message' else IBB_IQ_DATA) % sid 184 event_close = IBB_CLOSE.format(sid)
166 client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza == 'message' else self.iqData 185 # we now set the stream observer to look after data packet
167 event_close = IBB_CLOSE % sid 186 # FIXME: if we never get the events, the observers stay.
168 #we now set the stream observer to look after data packet 187 # would be better to have generic observer and check id once triggered
169 client.xmlstream.addObserver(event_data, observer_cb, profile=profile) 188 client.xmlstream.addObserver(event_data, observer_cb, profile=profile)
170 client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile=profile) 189 client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, profile=profile)
171 #finally, we send the accept stanza 190 # finally, we send the accept stanza
172 result = domish.Element((None, 'iq')) 191 iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
173 result['type'] = 'result' 192 client.xmlstream.send(iq_result_elt)
174 result['id'] = IQ['id'] 193
175 result['to'] = IQ['from'] 194 def _onIBBClose(self, iq_elt, profile):
176 client.xmlstream.send(result) 195 """"Called when an IBB <close> element is received
177 196
178 def streamClosing(self, IQ, profile): 197 @param iq_elt(domish.Element): the whole <iq> stanza
179 IQ.handled = True 198 @param profile: %(doc_profile)s
199 """
200 iq_elt.handled = True
180 client = self.host.getClient(profile) 201 client = self.host.getClient(profile)
181 log.debug(_("IBB stream closing")) 202 log.debug(_("IBB stream closing"))
182 data_elt = IQ.firstChildElement() 203 close_elt = iq_elt.elements(NS_IBB, 'close').next()
183 sid = data_elt.getAttribute('sid') 204 # XXX: this observer is only triggered on valid sid, so we don't need to check it
184 result = domish.Element((None, 'iq')) 205 sid = close_elt['sid']
185 result['type'] = 'result' 206
186 result['id'] = IQ['id'] 207 iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
187 result['to'] = IQ['from'] 208 client.xmlstream.send(iq_result_elt)
188 client.xmlstream.send(result) 209 self._killSession(sid, client)
189 self._killId(sid, success=True, profile=profile) 210
190 211 def _onIBBData(self, element, profile):
191 def iqData(self, IQ, profile): 212 """Observer called on <iq> or <message> stanzas with data element
192 IQ.handled = True 213
193 client = self.host.getClient(profile) 214 Manage the data elelement (check validity and write to the file_obj)
194 data_elt = IQ.firstChildElement() 215 @param element(domish.Element): <iq> or <message> stanza
195 216 @param profile: %(doc_profile)s
196 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile): 217 """
197 #and send a success answer 218 element.handled = True
198 result = domish.Element((None, 'iq')) 219 client = self.host.getClient(profile)
199 result['type'] = 'result' 220 data_elt = element.elements(NS_IBB, 'data').next()
200 result['id'] = IQ['id'] 221 sid = data_elt['sid']
201 result['to'] = IQ['from'] 222
202 223 try:
203 client.xmlstream.send(result) 224 session_data = client.xep_0047_current_stream[sid]
204 225 except KeyError:
205 def messageData(self, message_elt, profile): 226 log.warning(_(u"Received data for an unknown session id"))
206 sid = message_elt.getAttribute('id', '') 227 return self._sendError('item-not-found', None, element, client)
207 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile) 228
208 229 from_jid = session_data["to"]
209 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile): 230 file_obj = session_data["file_obj"]
210 """Manage the data elelement (check validity and write to the file_obj) 231
211 @param data_elt: "data" domish element 232 if from_jid.full() != element['from']:
212 @return: True if success""" 233 log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from']))
213 client = self.host.getClient(profile) 234 if element.name == 'iq':
214 sid = data_elt.getAttribute('sid') 235 self._sendError('not-acceptable', sid, element, client)
215 if sid not in client.xep_0047_current_stream: 236 return
216 log.error(_("Received data for an unknown session id")) 237
217 return False 238 session_data["seq"] = (session_data["seq"] + 1) % 65535
218 239 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]:
219 from_jid = client.xep_0047_current_stream[sid]["from"] 240 log.warning(_(u"Sequence error"))
220 file_obj = client.xep_0047_current_stream[sid]["file_obj"] 241 if element.name == 'iq':
221 242 reason = 'not-acceptable'
222 if stanza_from_jid != from_jid: 243 self._sendError(reason, sid, element, client)
223 log.warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) 244 self.terminateStream(session_data, client, reason)
224 if stanza == 'iq': 245 return
225 self.sendNotAcceptableError(sid, from_jid, client.xmlstream) 246
226 return False 247 # we reset the timeout:
227 248 session_data["timer"].reset(TIMEOUT)
228 client.xep_0047_current_stream[sid]["seq"] += 1 249
229 if int(data_elt.getAttribute("seq", -1)) != client.xep_0047_current_stream[sid]["seq"]: 250 # we can now decode the data
230 log.warning(_("Sequence error"))
231 if stanza == 'iq':
232 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream)
233 return False
234
235 #we reset the timeout:
236 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT)
237
238 #we can now decode the data
239 try: 251 try:
240 file_obj.write(base64.b64decode(str(data_elt))) 252 file_obj.write(base64.b64decode(str(data_elt)))
241 except TypeError: 253 except TypeError:
242 #The base64 data is invalid 254 # The base64 data is invalid
243 log.warning(_("Invalid base64 data")) 255 log.warning(_(u"Invalid base64 data"))
244 if stanza == 'iq': 256 if element.name == 'iq':
245 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) 257 self._sendError('not-acceptable', sid, element, client)
246 return False 258 self.terminateStream(session_data, client, reason)
247 return True 259 return
248 260
249 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): 261 # we can now ack success
250 """Not acceptable error used when the stream is not expected or something is going wrong 262 if element.name == 'iq':
251 @param iq_id: IQ id 263 iq_result_elt = xmlstream.toResponse(element, 'result')
252 @param to_jid: addressee 264 client.xmlstream.send(iq_result_elt)
253 @param xmlstream: XML stream to use to send the error""" 265
254 result = domish.Element((None, 'iq')) 266 def _sendError(self, error_condition, sid, iq_elt, client):
255 result['type'] = 'result' 267 """Send error stanza
256 result['id'] = iq_id 268
257 result['to'] = to_jid 269 @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
258 error_el = result.addElement('error') 270 @param sid(unicode,None): jingle session id, or None, if session must not be destroyed
259 error_el['type'] = 'cancel' 271 @param iq_elt(domish.Element): full <iq> stanza
260 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable')) 272 @param client: %(doc_client)s
261 xmlstream.send(result) 273 """
262 274 iq_elt = error.StanzaError(error_condition).toResponse(iq_elt)
263 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None): 275 log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition))
276 if sid is not None:
277 self._killSession(sid, client, error_condition)
278 client.xmlstream.send(iq_elt)
279
280 def startStream(self, file_obj, to_jid, sid, block_size=None, profile=None):
264 """Launch the stream workflow 281 """Launch the stream workflow
265 @param file_obj: file_obj to send 282
266 @param to_jid: JID of the recipient 283 @param file_obj(file): file_obj to send
267 @param sid: Stream session id 284 @param to_jid(jid.JID): JID of the recipient
268 @param length: number of byte to send, or None to send until the end 285 @param sid(unicode): Stream session id
269 @param successCb: method to call when stream successfuly finished 286 @param block_size(int, None): size of the block (or None for default)
270 @param failureCb: method to call when something goes wrong 287 @param profile: %(doc_profile)s
271 @param profile: %(doc_profile)s""" 288 """
272 client = self.host.getClient(profile) 289 session_data = self._createSession(file_obj, to_jid, sid, profile)
273 if length is not None: 290 session_defer = session_data[DEFER_KEY]
274 log.error(_('stream length not managed yet')) 291 client = self.host.getClient(profile)
275 return 292
276 data = client.xep_0047_current_stream[sid] = {} 293 if block_size is None:
277 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 294 block_size = XEP_0047.BLOCK_SIZE
278 data["file_obj"] = file_obj 295 assert block_size <= 65535
279 data["to"] = to_jid 296 session_data["block_size"] = block_size
280 data["success_cb"] = successCb 297
281 data["failure_cb"] = failureCb
282 data["block_size"] = BLOCK_SIZE
283 if size:
284 data["size"] = size
285 self.host.registerProgressCB(sid, self.getProgress, profile)
286 iq_elt = jabber_client.IQ(client.xmlstream, 'set') 298 iq_elt = jabber_client.IQ(client.xmlstream, 'set')
287 iq_elt['from'] = client.jid.full() 299 iq_elt['from'] = client.jid.full()
288 iq_elt['to'] = to_jid.full() 300 iq_elt['to'] = to_jid.full()
289 open_elt = iq_elt.addElement('open', NS_IBB) 301 open_elt = iq_elt.addElement('open', NS_IBB)
290 open_elt['block-size'] = str(BLOCK_SIZE) 302 open_elt['block-size'] = str(block_size)
291 open_elt['sid'] = sid 303 open_elt['sid'] = sid
292 open_elt['stanza'] = 'iq' 304 open_elt['stanza'] = 'iq' # TODO: manage <message> stanza ?
293 iq_elt.addCallback(self.iqResult, sid, 0, length, profile) 305 iq_elt.addCallback(self._IQDataStream, session_data, client)
294 iq_elt.send() 306 iq_elt.send()
295 307 return session_defer
296 def iqResult(self, sid, seq, length, profile, iq_elt): 308
297 """Called when the result of open iq is received""" 309 def _IQDataStream(self, session_data, client, iq_elt):
298 client = self.host.getClient(profile) 310 """Called during the whole data streaming
299 data = client.xep_0047_current_stream[sid] 311
300 if iq_elt["type"] == "error": 312 @param session_data(dict): data of this streaming session
301 log.warning(_("Transfer failed")) 313 @param client: %(doc_client)s
302 self.terminateStream(sid, "IQ_ERROR") 314 @param iq_elt(domish.Element): iq result
303 return 315 """
304 316 if iq_elt['type'] == 'error':
305 if data['timer'].active(): 317 log.warning(_(u"IBB transfer failed: {}").format(iq_elt))
306 data['timer'].cancel() 318 self.terminateStream(session_data, client, "IQ_ERROR")
307 319 return
308 buffer = data["file_obj"].read(data["block_size"]) 320
309 if buffer: 321 session_data["timer"].reset(TIMEOUT)
322
323 buffer_ = session_data["file_obj"].read(session_data["block_size"])
324 if buffer_:
310 next_iq_elt = jabber_client.IQ(client.xmlstream, 'set') 325 next_iq_elt = jabber_client.IQ(client.xmlstream, 'set')
311 next_iq_elt['to'] = data["to"].full() 326 next_iq_elt['to'] = session_data["to"].full()
312 data_elt = next_iq_elt.addElement('data', NS_IBB) 327 data_elt = next_iq_elt.addElement('data', NS_IBB)
313 data_elt['seq'] = str(seq) 328 seq = session_data['seq'] = (session_data['seq'] + 1) % 65535
314 data_elt['sid'] = sid 329 data_elt['seq'] = unicode(seq)
315 data_elt.addContent(base64.b64encode(buffer)) 330 data_elt['sid'] = session_data['id']
316 next_iq_elt.addCallback(self.iqResult, sid, seq + 1, length, profile) 331 data_elt.addContent(base64.b64encode(buffer_))
332 next_iq_elt.addCallback(self._IQDataStream, session_data, client)
317 next_iq_elt.send() 333 next_iq_elt.send()
318 else: 334 else:
319 self.terminateStream(sid, profile=profile) 335 self.terminateStream(session_data, client)
320 336
321 def terminateStream(self, sid, failure_reason=None, profile=None): 337 def terminateStream(self, session_data, client, failure_reason=None):
322 """Terminate the stream session 338 """Terminate the stream session
323 @param to_jid: recipient 339
324 @param sid: Session id 340 @param session_data(dict): data of this streaming session
325 @param file_obj: file object used 341 @param client: %(doc_client)s
326 @param xmlstream: XML stream used with this session 342 @param failure_reason(unicode, None): reason of the failure, or None if steam was successful
327 @param progress_cb: True if we have to remove the progress callback 343 """
328 @param callback: method to call after finishing
329 @param failure_reason: reason of the failure, or None if steam was successful"""
330 client = self.host.getClient(profile)
331 data = client.xep_0047_current_stream[sid]
332 iq_elt = jabber_client.IQ(client.xmlstream, 'set') 344 iq_elt = jabber_client.IQ(client.xmlstream, 'set')
333 iq_elt['to'] = data["to"].full() 345 iq_elt['to'] = session_data["to"].full()
334 close_elt = iq_elt.addElement('close', NS_IBB) 346 close_elt = iq_elt.addElement('close', NS_IBB)
335 close_elt['sid'] = sid 347 close_elt['sid'] = session_data['id']
336 iq_elt.send() 348 iq_elt.send()
337 self.host.removeProgressCB(sid, profile) 349 self._killSession(session_data['id'], client, failure_reason)
338 if failure_reason:
339 self._killId(sid, False, failure_reason, profile=profile)
340 else:
341 self._killId(sid, True, profile=profile)
342 350
343 351
344 class XEP_0047_handler(XMPPHandler): 352 class XEP_0047_handler(XMPPHandler):
345 implements(iwokkel.IDisco) 353 implements(iwokkel.IDisco)
346 354
347 def __init__(self, parent): 355 def __init__(self, parent):
348 self.plugin_parent = parent 356 self.plugin_parent = parent
349 357
350 def connectionInitialized(self): 358 def connectionInitialized(self):
351 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent.streamOpening, profile=self.parent.profile) 359 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, profile=self.parent.profile)
352 360
353 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 361 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
354 return [disco.DiscoFeature(NS_IBB)] 362 return [disco.DiscoFeature(NS_IBB)]
355 363
356 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 364 def getDiscoItems(self, requestor, target, nodeIdentifier=''):