comparison src/plugins/plugin_xep_0047.py @ 538:2c4016921403

core, frontends, bridge, plugins: fixed methods which were improperly managing multi-profiles - added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress - core, frontends: fixed calls/signals according to new bridge API - use of proper profile namespace for progression indicators and dialogs - memory: getParam* now returns bool when param type is bool - memory: added getStringParam* to return string instead of typed value - core, memory, storage, quick_frontend: getHistory now properly manages multi-profiles - plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096: proper multi-profile handling
author Goffi <goffi@goffi.org>
date Sat, 10 Nov 2012 16:38:16 +0100
parents a31abb97310d
children ca13633d3b6b
comparison
equal deleted inserted replaced
537:28cddc96c4ed 538:2c4016921403
18 You should have received a copy of the GNU Affero General Public License 18 You should have received a copy of the GNU Affero General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>. 19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """ 20 """
21 21
22 from logging import debug, info, warning, error 22 from logging import debug, info, warning, error
23 from twisted.words.protocols.jabber import client, jid 23 from twisted.words.protocols.jabber import client as jabber_client, jid
24 from twisted.words.protocols.jabber import error as jab_error
25 from twisted.words.xish import domish 24 from twisted.words.xish import domish
26 import twisted.internet.error 25 import twisted.internet.error
27 from twisted.internet import reactor 26 from twisted.internet import reactor
27 from sat.core.exceptions import ProfileNotInCacheError
28 28
29 from wokkel import disco, iwokkel 29 from wokkel import disco, iwokkel
30 30
31 from zope.interface import implements 31 from zope.interface import implements
32 32
61 NAMESPACE = NS_IBB 61 NAMESPACE = NS_IBB
62 62
63 def __init__(self, host): 63 def __init__(self, host):
64 info(_("In-Band Bytestreams plugin initialization")) 64 info(_("In-Band Bytestreams plugin initialization"))
65 self.host = host 65 self.host = host
66 self.current_stream = {} #key: stream_id, value: data(dict)
67 66
68 def getHandler(self, profile): 67 def getHandler(self, profile):
69 return XEP_0047_handler(self) 68 return XEP_0047_handler(self)
70 69
71 def _timeOut(self, sid): 70 def profileConnected(self, profile):
71 client = self.host.getClient(profile)
72 if not client:
73 raise ProfileNotInCacheError
74 client.xep_0047_current_stream = {} #key: stream_id, value: data(dict)
75
76 def _timeOut(self, sid, profile):
72 """Delecte current_stream id, called after timeout 77 """Delecte current_stream id, called after timeout
73 @param id: id of self.current_stream""" 78 @param id: id of client.xep_0047_current_stream"""
74 info(_("In-Band Bytestream: TimeOut reached for id %s") % sid); 79 info(_("In-Band Bytestream: TimeOut reached for id %s [%s]") % (sid, profile));
75 self._killId(sid, False, "TIMEOUT") 80 self._killId(sid, False, "TIMEOUT", profile)
76 81
77 def _killId(self, sid, success=False, failure_reason="UNKNOWN"): 82 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
78 """Delete an current_stream id, clean up associated observers 83 """Delete an current_stream id, clean up associated observers
79 @param sid: id of self.current_stream""" 84 @param sid: id of client.xep_0047_current_stream"""
80 if not self.current_stream.has_key(sid): 85 assert(profile)
86 client = self.host.getClient(profile)
87 if not client:
88 warning(_("Client no more in cache"))
89 return
90 if not client.xep_0047_current_stream.has_key(sid):
81 warning(_("kill id called on a non existant id")) 91 warning(_("kill id called on a non existant id"))
82 return 92 return
83 if self.current_stream[sid].has_key("observer_cb"): 93 if client.xep_0047_current_stream[sid].has_key("observer_cb"):
84 xmlstream = self.current_stream[sid]["xmlstream"] 94 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"])
85 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) 95 if client.xep_0047_current_stream[sid]['timer'].active():
86 if self.current_stream[sid]['timer'].active(): 96 client.xep_0047_current_stream[sid]['timer'].cancel()
87 self.current_stream[sid]['timer'].cancel() 97 if client.xep_0047_current_stream[sid].has_key("size"):
88 if self.current_stream[sid].has_key("size"): 98 self.host.removeProgressCB(sid, profile)
89 self.host.removeProgressCB(sid)
90 99
91 file_obj = self.current_stream[sid]['file_obj'] 100 file_obj = client.xep_0047_current_stream[sid]['file_obj']
92 success_cb = self.current_stream[sid]['success_cb'] 101 success_cb = client.xep_0047_current_stream[sid]['success_cb']
93 failure_cb = self.current_stream[sid]['failure_cb'] 102 failure_cb = client.xep_0047_current_stream[sid]['failure_cb']
94 103
95 del self.current_stream[sid] 104 del client.xep_0047_current_stream[sid]
96 105
97 if success: 106 if success:
98 success_cb(sid, file_obj, NS_IBB) 107 success_cb(sid, file_obj, NS_IBB, profile)
99 else: 108 else:
100 failure_cb(sid, file_obj, NS_IBB, failure_reason) 109 failure_cb(sid, file_obj, NS_IBB, failure_reason, profile)
101 110
102 def getProgress(self, sid, data): 111 def getProgress(self, sid, data, profile):
103 """Fill data with position of current transfer""" 112 """Fill data with position of current transfer"""
113 client = self.host.getClient(profile)
114 if not client:
115 raise ProfileNotInCacheError
104 try: 116 try:
105 file_obj = self.current_stream[sid]["file_obj"] 117 file_obj = client.xep_0047_current_stream[sid]["file_obj"]
106 data["position"] = str(file_obj.tell()) 118 data["position"] = str(file_obj.tell())
107 data["size"] = str(self.current_stream[sid]["size"]) 119 data["size"] = str(client.xep_0047_current_stream[sid]["size"])
108 except: 120 except:
109 pass 121 pass
110 122
111 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): 123 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile):
112 """Called when a bytestream is imminent 124 """Called when a bytestream is imminent
113 @param from_jid: jid of the sender 125 @param from_jid: jid of the sender
114 @param sid: Stream id 126 @param sid: Stream id
115 @param file_obj: File object where data will be written 127 @param file_obj: File object where data will be written
116 @param size: full size of the data, or None if unknown 128 @param size: full size of the data, or None if unknown
117 @param success_cb: method to call when successfuly finished 129 @param success_cb: method to call when successfuly finished
118 @param failure_cb: method to call when something goes wrong""" 130 @param failure_cb: method to call when something goes wrong
119 data = self.current_stream[sid] = {} 131 @param profile: %(doc_profile)s"""
132 client = self.host.getClient(profile)
133 if not client:
134 raise ProfileNotInCacheError
135 data = client.xep_0047_current_stream[sid] = {}
120 data["from"] = from_jid 136 data["from"] = from_jid
121 data["file_obj"] = file_obj 137 data["file_obj"] = file_obj
122 data["seq"] = -1 138 data["seq"] = -1
123 if size: 139 if size:
124 data["size"] = size 140 data["size"] = size
125 self.host.registerProgressCB(sid, self.getProgress) 141 self.host.registerProgressCB(sid, self.getProgress, profile)
126 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 142 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
127 data["success_cb"] = success_cb 143 data["success_cb"] = success_cb
128 data["failure_cb"] = failure_cb 144 data["failure_cb"] = failure_cb
129 145
130 def streamOpening(self, IQ, profile): 146 def streamOpening(self, IQ, profile):
131 debug(_("IBB stream opening")) 147 debug(_("IBB stream opening"))
132 IQ.handled=True 148 IQ.handled=True
133 profile_jid, xmlstream = self.host.getJidNStream(profile) 149 client = self.host.getClient(profile)
150 if not client:
151 raise ProfileNotInCacheError
134 open_elt = IQ.firstChildElement() 152 open_elt = IQ.firstChildElement()
135 block_size = open_elt.getAttribute('block-size') 153 block_size = open_elt.getAttribute('block-size')
136 sid = open_elt.getAttribute('sid') 154 sid = open_elt.getAttribute('sid')
137 stanza = open_elt.getAttribute('stanza', 'iq') 155 stanza = open_elt.getAttribute('stanza', 'iq')
138 if not sid or not block_size or int(block_size)>65535: 156 if not sid or not block_size or int(block_size)>65535:
139 warning(_("malformed IBB transfer: %s" % IQ['id'])) 157 warning(_("malformed IBB transfer: %s" % IQ['id']))
140 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) 158 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream)
141 return 159 return
142 if not sid in self.current_stream: 160 if not sid in client.xep_0047_current_stream:
143 warning(_("Ignoring unexpected IBB transfer: %s" % sid)) 161 warning(_("Ignoring unexpected IBB transfer: %s" % sid))
144 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) 162 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream)
145 return 163 return
146 if self.current_stream[sid]["from"] != jid.JID(IQ['from']): 164 if client.xep_0047_current_stream[sid]["from"] != jid.JID(IQ['from']):
147 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) 165 warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
148 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) 166 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream)
149 self._killId(sid, False, "PROTOCOL_ERROR") 167 self._killId(sid, False, "PROTOCOL_ERROR", profile=profile)
150 return 168 return
151 169
152 #at this stage, the session looks ok and will be accepted 170 #at this stage, the session looks ok and will be accepted
153 171
154 #we reset the timeout: 172 #we reset the timeout:
155 self.current_stream[sid]["timer"].reset(TIMEOUT) 173 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT)
156 174
157 #we save the xmlstream, events and observer data to allow observer removal 175 #we save the xmlstream, events and observer data to allow observer removal
158 self.current_stream[sid]["xmlstream"] = xmlstream 176 client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid
159 self.current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid 177 client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData
160 self.current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData
161 event_close = IBB_CLOSE % sid 178 event_close = IBB_CLOSE % sid
162 #we now set the stream observer to look after data packet 179 #we now set the stream observer to look after data packet
163 xmlstream.addObserver(event_data, observer_cb, profile = profile) 180 client.xmlstream.addObserver(event_data, observer_cb, profile = profile)
164 xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) 181 client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile)
165 #finally, we send the accept stanza 182 #finally, we send the accept stanza
166 result = domish.Element((None, 'iq')) 183 result = domish.Element((None, 'iq'))
167 result['type'] = 'result' 184 result['type'] = 'result'
168 result['id'] = IQ['id'] 185 result['id'] = IQ['id']
169 result['to'] = IQ['from'] 186 result['to'] = IQ['from']
170 xmlstream.send(result) 187 client.xmlstream.send(result)
171 188
172 def streamClosing(self, IQ, profile): 189 def streamClosing(self, IQ, profile):
173 IQ.handled=True 190 IQ.handled=True
191 client = self.host.getClient(profile)
192 if not client:
193 raise ProfileNotInCacheError
174 debug(_("IBB stream closing")) 194 debug(_("IBB stream closing"))
175 data_elt = IQ.firstChildElement() 195 data_elt = IQ.firstChildElement()
176 sid = data_elt.getAttribute('sid') 196 sid = data_elt.getAttribute('sid')
177 result = domish.Element((None, 'iq')) 197 result = domish.Element((None, 'iq'))
178 result['type'] = 'result' 198 result['type'] = 'result'
179 result['id'] = IQ['id'] 199 result['id'] = IQ['id']
180 result['to'] = IQ['from'] 200 result['to'] = IQ['from']
181 self.current_stream[sid]["xmlstream"].send(result) 201 client.xmlstream.send(result)
182 self._killId(sid, success=True) 202 self._killId(sid, success=True, profile=profile)
183 203
184 def iqData(self, IQ, profile): 204 def iqData(self, IQ, profile):
185 IQ.handled=True 205 IQ.handled=True
206 client = self.host.getClient(profile)
207 if not client:
208 raise ProfileNotInCacheError
186 data_elt = IQ.firstChildElement() 209 data_elt = IQ.firstChildElement()
187 210
188 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from'])): 211 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile):
189 #and send a success answer 212 #and send a success answer
190 result = domish.Element((None, 'iq')) 213 result = domish.Element((None, 'iq'))
191 result['type'] = 'result' 214 result['type'] = 'result'
192 result['id'] = IQ['id'] 215 result['id'] = IQ['id']
193 result['to'] = IQ['from'] 216 result['to'] = IQ['from']
194 _jid, xmlstream = self.host.getJidNStream(profile) 217
195 xmlstream.send(result) 218 client.xmlstream.send(result)
196 219
197 def messageData(self, message_elt, profile): 220 def messageData(self, message_elt, profile):
198 data_elt = message_elt.firstChildElement() 221 data_elt = message_elt.firstChildElement()
199 sid = message_elt.getAttribute('id','') 222 sid = message_elt.getAttribute('id','')
200 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from'])) 223 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile)
201 224
202 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid): 225 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile):
203 """Manage the data elelement (check validity and write to the file_obj) 226 """Manage the data elelement (check validity and write to the file_obj)
204 @param data_elt: "data" domish element 227 @param data_elt: "data" domish element
205 @return: True if success""" 228 @return: True if success"""
229 client = self.host.getClient(profile)
230 if not client:
231 raise ProfileNotInCacheError
206 sid = data_elt.getAttribute('sid') 232 sid = data_elt.getAttribute('sid')
207 if sid not in self.current_stream: 233 if sid not in client.xep_0047_current_stream:
208 error(_("Received data for an unknown session id")) 234 error(_("Received data for an unknown session id"))
209 return False 235 return False
210 xmlstream = self.current_stream[sid]["xmlstream"] 236
211 237 from_jid = client.xep_0047_current_stream[sid]["from"]
212 from_jid = self.current_stream[sid]["from"] 238 file_obj = client.xep_0047_current_stream[sid]["file_obj"]
213 file_obj = self.current_stream[sid]["file_obj"]
214 239
215 if stanza_from_jid != from_jid: 240 if stanza_from_jid != from_jid:
216 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) 241 warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
217 if stanza=='iq': 242 if stanza=='iq':
218 self.sendNotAcceptableError(sid, from_jid, xmlstream) 243 self.sendNotAcceptableError(sid, from_jid, client.xmlstream)
219 return False 244 return False
220 245
221 self.current_stream[sid]["seq"]+=1 246 client.xep_0047_current_stream[sid]["seq"]+=1
222 if int(data_elt.getAttribute("seq",-1)) != self.current_stream[sid]["seq"]: 247 if int(data_elt.getAttribute("seq",-1)) != client.xep_0047_current_stream[sid]["seq"]:
223 warning(_("Sequence error")) 248 warning(_("Sequence error"))
224 if stanza=='iq': 249 if stanza=='iq':
225 self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) 250 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream)
226 return False 251 return False
227 252
228 #we reset the timeout: 253 #we reset the timeout:
229 self.current_stream[sid]["timer"].reset(TIMEOUT) 254 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT)
230 255
231 #we can now decode the data 256 #we can now decode the data
232 try: 257 try:
233 file_obj.write(base64.b64decode(str(data_elt))) 258 file_obj.write(base64.b64decode(str(data_elt)))
234 except TypeError: 259 except TypeError:
235 #The base64 data is invalid 260 #The base64 data is invalid
236 warning(_("Invalid base64 data")) 261 warning(_("Invalid base64 data"))
237 if stanza=='iq': 262 if stanza=='iq':
238 self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) 263 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream)
239 return False 264 return False
240 return True 265 return True
241 266
242 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): 267 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
243 """Not acceptable error used when the stream is not expected or something is going wrong 268 """Not acceptable error used when the stream is not expected or something is going wrong
251 error_el = result.addElement('error') 276 error_el = result.addElement('error')
252 error_el['type'] = 'cancel' 277 error_el['type'] = 'cancel'
253 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) 278 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable'))
254 xmlstream.send(result) 279 xmlstream.send(result)
255 280
256 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): 281 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None):
257 """Launch the stream workflow 282 """Launch the stream workflow
258 @param file_obj: file_obj to send 283 @param file_obj: file_obj to send
259 @param to_jid: JID of the recipient 284 @param to_jid: JID of the recipient
260 @param sid: Stream session id 285 @param sid: Stream session id
261 @param length: number of byte to send, or None to send until the end 286 @param length: number of byte to send, or None to send until the end
262 @param successCb: method to call when stream successfuly finished 287 @param successCb: method to call when stream successfuly finished
263 @param failureCb: method to call when something goes wrong 288 @param failureCb: method to call when something goes wrong
264 @param profile: %(doc_profile)s""" 289 @param profile: %(doc_profile)s"""
290 client = self.host.getClient(profile)
291 if not client:
292 raise ProfileNotInCacheError
265 if length != None: 293 if length != None:
266 error(_('stream length not managed yet')) 294 error(_('stream length not managed yet'))
267 return; 295 return;
268 profile_jid, xmlstream = self.host.getJidNStream(profile) 296 data = client.xep_0047_current_stream[sid] = {}
269 data = self.current_stream[sid] = {}
270 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 297 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
271 data["file_obj"] = file_obj 298 data["file_obj"] = file_obj
272 data["to"] = to_jid 299 data["to"] = to_jid
273 data["success_cb"] = successCb 300 data["success_cb"] = successCb
274 data["failure_cb"] = failureCb 301 data["failure_cb"] = failureCb
275 data["xmlstream"] = xmlstream
276 data["block_size"] = BLOCK_SIZE 302 data["block_size"] = BLOCK_SIZE
277 if size: 303 if size:
278 data["size"] = size 304 data["size"] = size
279 self.host.registerProgressCB(sid, self.getProgress) 305 self.host.registerProgressCB(sid, self.getProgress, profile)
280 iq_elt = client.IQ(xmlstream,'set') 306 iq_elt = jabber_client.IQ(client.xmlstream,'set')
281 iq_elt['from'] = profile_jid.full() 307 iq_elt['from'] = client.jid.full()
282 iq_elt['to'] = to_jid.full() 308 iq_elt['to'] = to_jid.full()
283 open_elt = iq_elt.addElement('open',NS_IBB) 309 open_elt = iq_elt.addElement('open',NS_IBB)
284 open_elt['block-size'] = str(BLOCK_SIZE) 310 open_elt['block-size'] = str(BLOCK_SIZE)
285 open_elt['sid'] = sid 311 open_elt['sid'] = sid
286 open_elt['stanza'] = 'iq' 312 open_elt['stanza'] = 'iq'
287 iq_elt.addCallback(self.iqResult, sid, 0, length) 313 iq_elt.addCallback(self.iqResult, sid, 0, length, profile)
288 iq_elt.send() 314 iq_elt.send()
289 315
290 def iqResult(self, sid, seq, length, iq_elt): 316 def iqResult(self, sid, seq, length, profile, iq_elt):
291 """Called when the result of open iq is received""" 317 """Called when the result of open iq is received"""
292 data = self.current_stream[sid] 318 client = self.host.getClient(profile)
319 if not client:
320 raise ProfileNotInCacheError
321 data = client.xep_0047_current_stream[sid]
293 if iq_elt["type"] == "error": 322 if iq_elt["type"] == "error":
294 warning(_("Transfer failed")) 323 warning(_("Transfer failed"))
295 self.terminateStream(sid, "IQ_ERROR") 324 self.terminateStream(sid, "IQ_ERROR")
296 return 325 return
297 326
298 if data['timer'].active(): 327 if data['timer'].active():
299 data['timer'].cancel() 328 data['timer'].cancel()
300 329
301 buffer = data["file_obj"].read(data["block_size"]) 330 buffer = data["file_obj"].read(data["block_size"])
302 if buffer: 331 if buffer:
303 next_iq_elt = client.IQ(data["xmlstream"],'set') 332 next_iq_elt = jabber_client.IQ(client.xmlstream,'set')
304 next_iq_elt['to'] = data["to"].full() 333 next_iq_elt['to'] = data["to"].full()
305 data_elt = next_iq_elt.addElement('data', NS_IBB) 334 data_elt = next_iq_elt.addElement('data', NS_IBB)
306 data_elt['seq'] = str(seq) 335 data_elt['seq'] = str(seq)
307 data_elt['sid'] = sid 336 data_elt['sid'] = sid
308 data_elt.addContent(base64.b64encode(buffer)) 337 data_elt.addContent(base64.b64encode(buffer))
309 next_iq_elt.addCallback(self.iqResult, sid, seq+1, length) 338 next_iq_elt.addCallback(self.iqResult, sid, seq+1, length, profile)
310 next_iq_elt.send() 339 next_iq_elt.send()
311 else: 340 else:
312 self.terminateStream(sid) 341 self.terminateStream(sid, profile=profile)
313 342
314 def terminateStream(self, sid, failure_reason = None): 343 def terminateStream(self, sid, failure_reason = None, profile=None):
315 """Terminate the stream session 344 """Terminate the stream session
316 @param to_jid: recipient 345 @param to_jid: recipient
317 @param sid: Session id 346 @param sid: Session id
318 @param file_obj: file object used 347 @param file_obj: file object used
319 @param xmlstream: XML stream used with this session 348 @param xmlstream: XML stream used with this session
320 @param progress_cb: True if we have to remove the progress callback 349 @param progress_cb: True if we have to remove the progress callback
321 @param callback: method to call after finishing 350 @param callback: method to call after finishing
322 @param failure_reason: reason of the failure, or None if steam was successful""" 351 @param failure_reason: reason of the failure, or None if steam was successful"""
323 data = self.current_stream[sid] 352 client = self.host.getClient(profile)
324 iq_elt = client.IQ(data["xmlstream"],'set') 353 if not client:
354 raise ProfileNotInCacheError
355 data = client.xep_0047_current_stream[sid]
356 iq_elt = jabber_client.IQ(client.xmlstream,'set')
325 iq_elt['to'] = data["to"].full() 357 iq_elt['to'] = data["to"].full()
326 close_elt = iq_elt.addElement('close',NS_IBB) 358 close_elt = iq_elt.addElement('close',NS_IBB)
327 close_elt['sid'] = sid 359 close_elt['sid'] = sid
328 iq_elt.send() 360 iq_elt.send()
329 self.host.removeProgressCB(sid) 361 self.host.removeProgressCB(sid, profile)
330 if failure_reason: 362 if failure_reason:
331 self._killId(sid, False, failure_reason) 363 self._killId(sid, False, failure_reason, profile=profile)
332 else: 364 else:
333 self._killId(sid, True) 365 self._killId(sid, True, profile=profile)
334 366
335 class XEP_0047_handler(XMPPHandler): 367 class XEP_0047_handler(XMPPHandler):
336 implements(iwokkel.IDisco) 368 implements(iwokkel.IDisco)
337 369
338 def __init__(self,parent): 370 def __init__(self,parent):