Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0047.py @ 538:2c4016921403
core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles
- added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress
- core, frontends: fixed calls/signals according to new bridge API
- user of proper profile namespace for progression indicators and dialogs
- memory: getParam* now return bool when param type is bool
- memory: added getStringParam* to return string instead of typed value
- core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles
- plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 10 Nov 2012 16:38:16 +0100 |
parents | a31abb97310d |
children | ca13633d3b6b |
comparison
equal
deleted
inserted
replaced
537:28cddc96c4ed | 538:2c4016921403 |
---|---|
18 You should have received a copy of the GNU Affero General Public License | 18 You should have received a copy of the GNU Affero General Public License |
19 along with this program. If not, see <http://www.gnu.org/licenses/>. | 19 along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 """ | 20 """ |
21 | 21 |
22 from logging import debug, info, warning, error | 22 from logging import debug, info, warning, error |
23 from twisted.words.protocols.jabber import client, jid | 23 from twisted.words.protocols.jabber import client as jabber_client, jid |
24 from twisted.words.protocols.jabber import error as jab_error | |
25 from twisted.words.xish import domish | 24 from twisted.words.xish import domish |
26 import twisted.internet.error | 25 import twisted.internet.error |
27 from twisted.internet import reactor | 26 from twisted.internet import reactor |
27 from sat.core.exceptions import ProfileNotInCacheError | |
28 | 28 |
29 from wokkel import disco, iwokkel | 29 from wokkel import disco, iwokkel |
30 | 30 |
31 from zope.interface import implements | 31 from zope.interface import implements |
32 | 32 |
61 NAMESPACE = NS_IBB | 61 NAMESPACE = NS_IBB |
62 | 62 |
63 def __init__(self, host): | 63 def __init__(self, host): |
64 info(_("In-Band Bytestreams plugin initialization")) | 64 info(_("In-Band Bytestreams plugin initialization")) |
65 self.host = host | 65 self.host = host |
66 self.current_stream = {} #key: stream_id, value: data(dict) | |
67 | 66 |
68 def getHandler(self, profile): | 67 def getHandler(self, profile): |
69 return XEP_0047_handler(self) | 68 return XEP_0047_handler(self) |
70 | 69 |
71 def _timeOut(self, sid): | 70 def profileConnected(self, profile): |
71 client = self.host.getClient(profile) | |
72 if not client: | |
73 raise ProfileNotInCacheError | |
74 client.xep_0047_current_stream = {} #key: stream_id, value: data(dict) | |
75 | |
76 def _timeOut(self, sid, profile): | |
72 """Delecte current_stream id, called after timeout | 77 """Delecte current_stream id, called after timeout |
73 @param id: id of self.current_stream""" | 78 @param id: id of client.xep_0047_current_stream""" |
74 info(_("In-Band Bytestream: TimeOut reached for id %s") % sid); | 79 info(_("In-Band Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)); |
75 self._killId(sid, False, "TIMEOUT") | 80 self._killId(sid, False, "TIMEOUT", profile) |
76 | 81 |
77 def _killId(self, sid, success=False, failure_reason="UNKNOWN"): | 82 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): |
78 """Delete an current_stream id, clean up associated observers | 83 """Delete an current_stream id, clean up associated observers |
79 @param sid: id of self.current_stream""" | 84 @param sid: id of client.xep_0047_current_stream""" |
80 if not self.current_stream.has_key(sid): | 85 assert(profile) |
86 client = self.host.getClient(profile) | |
87 if not client: | |
88 warning(_("Client no more in cache")) | |
89 return | |
90 if not client.xep_0047_current_stream.has_key(sid): | |
81 warning(_("kill id called on a non existant id")) | 91 warning(_("kill id called on a non existant id")) |
82 return | 92 return |
83 if self.current_stream[sid].has_key("observer_cb"): | 93 if client.xep_0047_current_stream[sid].has_key("observer_cb"): |
84 xmlstream = self.current_stream[sid]["xmlstream"] | 94 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"]) |
85 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) | 95 if client.xep_0047_current_stream[sid]['timer'].active(): |
86 if self.current_stream[sid]['timer'].active(): | 96 client.xep_0047_current_stream[sid]['timer'].cancel() |
87 self.current_stream[sid]['timer'].cancel() | 97 if client.xep_0047_current_stream[sid].has_key("size"): |
88 if self.current_stream[sid].has_key("size"): | 98 self.host.removeProgressCB(sid, profile) |
89 self.host.removeProgressCB(sid) | |
90 | 99 |
91 file_obj = self.current_stream[sid]['file_obj'] | 100 file_obj = client.xep_0047_current_stream[sid]['file_obj'] |
92 success_cb = self.current_stream[sid]['success_cb'] | 101 success_cb = client.xep_0047_current_stream[sid]['success_cb'] |
93 failure_cb = self.current_stream[sid]['failure_cb'] | 102 failure_cb = client.xep_0047_current_stream[sid]['failure_cb'] |
94 | 103 |
95 del self.current_stream[sid] | 104 del client.xep_0047_current_stream[sid] |
96 | 105 |
97 if success: | 106 if success: |
98 success_cb(sid, file_obj, NS_IBB) | 107 success_cb(sid, file_obj, NS_IBB, profile) |
99 else: | 108 else: |
100 failure_cb(sid, file_obj, NS_IBB, failure_reason) | 109 failure_cb(sid, file_obj, NS_IBB, failure_reason, profile) |
101 | 110 |
102 def getProgress(self, sid, data): | 111 def getProgress(self, sid, data, profile): |
103 """Fill data with position of current transfer""" | 112 """Fill data with position of current transfer""" |
113 client = self.host.getClient(profile) | |
114 if not client: | |
115 raise ProfileNotInCacheError | |
104 try: | 116 try: |
105 file_obj = self.current_stream[sid]["file_obj"] | 117 file_obj = client.xep_0047_current_stream[sid]["file_obj"] |
106 data["position"] = str(file_obj.tell()) | 118 data["position"] = str(file_obj.tell()) |
107 data["size"] = str(self.current_stream[sid]["size"]) | 119 data["size"] = str(client.xep_0047_current_stream[sid]["size"]) |
108 except: | 120 except: |
109 pass | 121 pass |
110 | 122 |
111 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): | 123 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): |
112 """Called when a bytestream is imminent | 124 """Called when a bytestream is imminent |
113 @param from_jid: jid of the sender | 125 @param from_jid: jid of the sender |
114 @param sid: Stream id | 126 @param sid: Stream id |
115 @param file_obj: File object where data will be written | 127 @param file_obj: File object where data will be written |
116 @param size: full size of the data, or None if unknown | 128 @param size: full size of the data, or None if unknown |
117 @param success_cb: method to call when successfuly finished | 129 @param success_cb: method to call when successfuly finished |
118 @param failure_cb: method to call when something goes wrong""" | 130 @param failure_cb: method to call when something goes wrong |
119 data = self.current_stream[sid] = {} | 131 @param profile: %(doc_profile)s""" |
132 client = self.host.getClient(profile) | |
133 if not client: | |
134 raise ProfileNotInCacheError | |
135 data = client.xep_0047_current_stream[sid] = {} | |
120 data["from"] = from_jid | 136 data["from"] = from_jid |
121 data["file_obj"] = file_obj | 137 data["file_obj"] = file_obj |
122 data["seq"] = -1 | 138 data["seq"] = -1 |
123 if size: | 139 if size: |
124 data["size"] = size | 140 data["size"] = size |
125 self.host.registerProgressCB(sid, self.getProgress) | 141 self.host.registerProgressCB(sid, self.getProgress, profile) |
126 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | 142 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) |
127 data["success_cb"] = success_cb | 143 data["success_cb"] = success_cb |
128 data["failure_cb"] = failure_cb | 144 data["failure_cb"] = failure_cb |
129 | 145 |
130 def streamOpening(self, IQ, profile): | 146 def streamOpening(self, IQ, profile): |
131 debug(_("IBB stream opening")) | 147 debug(_("IBB stream opening")) |
132 IQ.handled=True | 148 IQ.handled=True |
133 profile_jid, xmlstream = self.host.getJidNStream(profile) | 149 client = self.host.getClient(profile) |
150 if not client: | |
151 raise ProfileNotInCacheError | |
134 open_elt = IQ.firstChildElement() | 152 open_elt = IQ.firstChildElement() |
135 block_size = open_elt.getAttribute('block-size') | 153 block_size = open_elt.getAttribute('block-size') |
136 sid = open_elt.getAttribute('sid') | 154 sid = open_elt.getAttribute('sid') |
137 stanza = open_elt.getAttribute('stanza', 'iq') | 155 stanza = open_elt.getAttribute('stanza', 'iq') |
138 if not sid or not block_size or int(block_size)>65535: | 156 if not sid or not block_size or int(block_size)>65535: |
139 warning(_("malformed IBB transfer: %s" % IQ['id'])) | 157 warning(_("malformed IBB transfer: %s" % IQ['id'])) |
140 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) | 158 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) |
141 return | 159 return |
142 if not sid in self.current_stream: | 160 if not sid in client.xep_0047_current_stream: |
143 warning(_("Ignoring unexpected IBB transfer: %s" % sid)) | 161 warning(_("Ignoring unexpected IBB transfer: %s" % sid)) |
144 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) | 162 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) |
145 return | 163 return |
146 if self.current_stream[sid]["from"] != jid.JID(IQ['from']): | 164 if client.xep_0047_current_stream[sid]["from"] != jid.JID(IQ['from']): |
147 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) | 165 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) |
148 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) | 166 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) |
149 self._killId(sid, False, "PROTOCOL_ERROR") | 167 self._killId(sid, False, "PROTOCOL_ERROR", profile=profile) |
150 return | 168 return |
151 | 169 |
152 #at this stage, the session looks ok and will be accepted | 170 #at this stage, the session looks ok and will be accepted |
153 | 171 |
154 #we reset the timeout: | 172 #we reset the timeout: |
155 self.current_stream[sid]["timer"].reset(TIMEOUT) | 173 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) |
156 | 174 |
157 #we save the xmlstream, events and observer data to allow observer removal | 175 #we save the xmlstream, events and observer data to allow observer removal |
158 self.current_stream[sid]["xmlstream"] = xmlstream | 176 client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid |
159 self.current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid | 177 client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData |
160 self.current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData | |
161 event_close = IBB_CLOSE % sid | 178 event_close = IBB_CLOSE % sid |
162 #we now set the stream observer to look after data packet | 179 #we now set the stream observer to look after data packet |
163 xmlstream.addObserver(event_data, observer_cb, profile = profile) | 180 client.xmlstream.addObserver(event_data, observer_cb, profile = profile) |
164 xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) | 181 client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) |
165 #finally, we send the accept stanza | 182 #finally, we send the accept stanza |
166 result = domish.Element((None, 'iq')) | 183 result = domish.Element((None, 'iq')) |
167 result['type'] = 'result' | 184 result['type'] = 'result' |
168 result['id'] = IQ['id'] | 185 result['id'] = IQ['id'] |
169 result['to'] = IQ['from'] | 186 result['to'] = IQ['from'] |
170 xmlstream.send(result) | 187 client.xmlstream.send(result) |
171 | 188 |
172 def streamClosing(self, IQ, profile): | 189 def streamClosing(self, IQ, profile): |
173 IQ.handled=True | 190 IQ.handled=True |
191 client = self.host.getClient(profile) | |
192 if not client: | |
193 raise ProfileNotInCacheError | |
174 debug(_("IBB stream closing")) | 194 debug(_("IBB stream closing")) |
175 data_elt = IQ.firstChildElement() | 195 data_elt = IQ.firstChildElement() |
176 sid = data_elt.getAttribute('sid') | 196 sid = data_elt.getAttribute('sid') |
177 result = domish.Element((None, 'iq')) | 197 result = domish.Element((None, 'iq')) |
178 result['type'] = 'result' | 198 result['type'] = 'result' |
179 result['id'] = IQ['id'] | 199 result['id'] = IQ['id'] |
180 result['to'] = IQ['from'] | 200 result['to'] = IQ['from'] |
181 self.current_stream[sid]["xmlstream"].send(result) | 201 client.xmlstream.send(result) |
182 self._killId(sid, success=True) | 202 self._killId(sid, success=True, profile=profile) |
183 | 203 |
184 def iqData(self, IQ, profile): | 204 def iqData(self, IQ, profile): |
185 IQ.handled=True | 205 IQ.handled=True |
206 client = self.host.getClient(profile) | |
207 if not client: | |
208 raise ProfileNotInCacheError | |
186 data_elt = IQ.firstChildElement() | 209 data_elt = IQ.firstChildElement() |
187 | 210 |
188 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from'])): | 211 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile): |
189 #and send a success answer | 212 #and send a success answer |
190 result = domish.Element((None, 'iq')) | 213 result = domish.Element((None, 'iq')) |
191 result['type'] = 'result' | 214 result['type'] = 'result' |
192 result['id'] = IQ['id'] | 215 result['id'] = IQ['id'] |
193 result['to'] = IQ['from'] | 216 result['to'] = IQ['from'] |
194 _jid, xmlstream = self.host.getJidNStream(profile) | 217 |
195 xmlstream.send(result) | 218 client.xmlstream.send(result) |
196 | 219 |
197 def messageData(self, message_elt, profile): | 220 def messageData(self, message_elt, profile): |
198 data_elt = message_elt.firstChildElement() | 221 data_elt = message_elt.firstChildElement() |
199 sid = message_elt.getAttribute('id','') | 222 sid = message_elt.getAttribute('id','') |
200 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from'])) | 223 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile) |
201 | 224 |
202 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid): | 225 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile): |
203 """Manage the data elelement (check validity and write to the file_obj) | 226 """Manage the data elelement (check validity and write to the file_obj) |
204 @param data_elt: "data" domish element | 227 @param data_elt: "data" domish element |
205 @return: True if success""" | 228 @return: True if success""" |
229 client = self.host.getClient(profile) | |
230 if not client: | |
231 raise ProfileNotInCacheError | |
206 sid = data_elt.getAttribute('sid') | 232 sid = data_elt.getAttribute('sid') |
207 if sid not in self.current_stream: | 233 if sid not in client.xep_0047_current_stream: |
208 error(_("Received data for an unknown session id")) | 234 error(_("Received data for an unknown session id")) |
209 return False | 235 return False |
210 xmlstream = self.current_stream[sid]["xmlstream"] | 236 |
211 | 237 from_jid = client.xep_0047_current_stream[sid]["from"] |
212 from_jid = self.current_stream[sid]["from"] | 238 file_obj = client.xep_0047_current_stream[sid]["file_obj"] |
213 file_obj = self.current_stream[sid]["file_obj"] | |
214 | 239 |
215 if stanza_from_jid != from_jid: | 240 if stanza_from_jid != from_jid: |
216 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) | 241 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) |
217 if stanza=='iq': | 242 if stanza=='iq': |
218 self.sendNotAcceptableError(sid, from_jid, xmlstream) | 243 self.sendNotAcceptableError(sid, from_jid, client.xmlstream) |
219 return False | 244 return False |
220 | 245 |
221 self.current_stream[sid]["seq"]+=1 | 246 client.xep_0047_current_stream[sid]["seq"]+=1 |
222 if int(data_elt.getAttribute("seq",-1)) != self.current_stream[sid]["seq"]: | 247 if int(data_elt.getAttribute("seq",-1)) != client.xep_0047_current_stream[sid]["seq"]: |
223 warning(_("Sequence error")) | 248 warning(_("Sequence error")) |
224 if stanza=='iq': | 249 if stanza=='iq': |
225 self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) | 250 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) |
226 return False | 251 return False |
227 | 252 |
228 #we reset the timeout: | 253 #we reset the timeout: |
229 self.current_stream[sid]["timer"].reset(TIMEOUT) | 254 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) |
230 | 255 |
231 #we can now decode the data | 256 #we can now decode the data |
232 try: | 257 try: |
233 file_obj.write(base64.b64decode(str(data_elt))) | 258 file_obj.write(base64.b64decode(str(data_elt))) |
234 except TypeError: | 259 except TypeError: |
235 #The base64 data is invalid | 260 #The base64 data is invalid |
236 warning(_("Invalid base64 data")) | 261 warning(_("Invalid base64 data")) |
237 if stanza=='iq': | 262 if stanza=='iq': |
238 self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) | 263 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) |
239 return False | 264 return False |
240 return True | 265 return True |
241 | 266 |
242 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): | 267 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): |
243 """Not acceptable error used when the stream is not expected or something is going wrong | 268 """Not acceptable error used when the stream is not expected or something is going wrong |
251 error_el = result.addElement('error') | 276 error_el = result.addElement('error') |
252 error_el['type'] = 'cancel' | 277 error_el['type'] = 'cancel' |
253 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) | 278 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) |
254 xmlstream.send(result) | 279 xmlstream.send(result) |
255 | 280 |
256 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): | 281 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): |
257 """Launch the stream workflow | 282 """Launch the stream workflow |
258 @param file_obj: file_obj to send | 283 @param file_obj: file_obj to send |
259 @param to_jid: JID of the recipient | 284 @param to_jid: JID of the recipient |
260 @param sid: Stream session id | 285 @param sid: Stream session id |
261 @param length: number of byte to send, or None to send until the end | 286 @param length: number of byte to send, or None to send until the end |
262 @param successCb: method to call when stream successfuly finished | 287 @param successCb: method to call when stream successfuly finished |
263 @param failureCb: method to call when something goes wrong | 288 @param failureCb: method to call when something goes wrong |
264 @param profile: %(doc_profile)s""" | 289 @param profile: %(doc_profile)s""" |
290 client = self.host.getClient(profile) | |
291 if not client: | |
292 raise ProfileNotInCacheError | |
265 if length != None: | 293 if length != None: |
266 error(_('stream length not managed yet')) | 294 error(_('stream length not managed yet')) |
267 return; | 295 return; |
268 profile_jid, xmlstream = self.host.getJidNStream(profile) | 296 data = client.xep_0047_current_stream[sid] = {} |
269 data = self.current_stream[sid] = {} | |
270 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | 297 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) |
271 data["file_obj"] = file_obj | 298 data["file_obj"] = file_obj |
272 data["to"] = to_jid | 299 data["to"] = to_jid |
273 data["success_cb"] = successCb | 300 data["success_cb"] = successCb |
274 data["failure_cb"] = failureCb | 301 data["failure_cb"] = failureCb |
275 data["xmlstream"] = xmlstream | |
276 data["block_size"] = BLOCK_SIZE | 302 data["block_size"] = BLOCK_SIZE |
277 if size: | 303 if size: |
278 data["size"] = size | 304 data["size"] = size |
279 self.host.registerProgressCB(sid, self.getProgress) | 305 self.host.registerProgressCB(sid, self.getProgress, profile) |
280 iq_elt = client.IQ(xmlstream,'set') | 306 iq_elt = jabber_client.IQ(client.xmlstream,'set') |
281 iq_elt['from'] = profile_jid.full() | 307 iq_elt['from'] = client.jid.full() |
282 iq_elt['to'] = to_jid.full() | 308 iq_elt['to'] = to_jid.full() |
283 open_elt = iq_elt.addElement('open',NS_IBB) | 309 open_elt = iq_elt.addElement('open',NS_IBB) |
284 open_elt['block-size'] = str(BLOCK_SIZE) | 310 open_elt['block-size'] = str(BLOCK_SIZE) |
285 open_elt['sid'] = sid | 311 open_elt['sid'] = sid |
286 open_elt['stanza'] = 'iq' | 312 open_elt['stanza'] = 'iq' |
287 iq_elt.addCallback(self.iqResult, sid, 0, length) | 313 iq_elt.addCallback(self.iqResult, sid, 0, length, profile) |
288 iq_elt.send() | 314 iq_elt.send() |
289 | 315 |
290 def iqResult(self, sid, seq, length, iq_elt): | 316 def iqResult(self, sid, seq, length, profile, iq_elt): |
291 """Called when the result of open iq is received""" | 317 """Called when the result of open iq is received""" |
292 data = self.current_stream[sid] | 318 client = self.host.getClient(profile) |
319 if not client: | |
320 raise ProfileNotInCacheError | |
321 data = client.xep_0047_current_stream[sid] | |
293 if iq_elt["type"] == "error": | 322 if iq_elt["type"] == "error": |
294 warning(_("Transfer failed")) | 323 warning(_("Transfer failed")) |
295 self.terminateStream(sid, "IQ_ERROR") | 324 self.terminateStream(sid, "IQ_ERROR") |
296 return | 325 return |
297 | 326 |
298 if data['timer'].active(): | 327 if data['timer'].active(): |
299 data['timer'].cancel() | 328 data['timer'].cancel() |
300 | 329 |
301 buffer = data["file_obj"].read(data["block_size"]) | 330 buffer = data["file_obj"].read(data["block_size"]) |
302 if buffer: | 331 if buffer: |
303 next_iq_elt = client.IQ(data["xmlstream"],'set') | 332 next_iq_elt = jabber_client.IQ(client.xmlstream,'set') |
304 next_iq_elt['to'] = data["to"].full() | 333 next_iq_elt['to'] = data["to"].full() |
305 data_elt = next_iq_elt.addElement('data', NS_IBB) | 334 data_elt = next_iq_elt.addElement('data', NS_IBB) |
306 data_elt['seq'] = str(seq) | 335 data_elt['seq'] = str(seq) |
307 data_elt['sid'] = sid | 336 data_elt['sid'] = sid |
308 data_elt.addContent(base64.b64encode(buffer)) | 337 data_elt.addContent(base64.b64encode(buffer)) |
309 next_iq_elt.addCallback(self.iqResult, sid, seq+1, length) | 338 next_iq_elt.addCallback(self.iqResult, sid, seq+1, length, profile) |
310 next_iq_elt.send() | 339 next_iq_elt.send() |
311 else: | 340 else: |
312 self.terminateStream(sid) | 341 self.terminateStream(sid, profile=profile) |
313 | 342 |
314 def terminateStream(self, sid, failure_reason = None): | 343 def terminateStream(self, sid, failure_reason = None, profile=None): |
315 """Terminate the stream session | 344 """Terminate the stream session |
316 @param to_jid: recipient | 345 @param to_jid: recipient |
317 @param sid: Session id | 346 @param sid: Session id |
318 @param file_obj: file object used | 347 @param file_obj: file object used |
319 @param xmlstream: XML stream used with this session | 348 @param xmlstream: XML stream used with this session |
320 @param progress_cb: True if we have to remove the progress callback | 349 @param progress_cb: True if we have to remove the progress callback |
321 @param callback: method to call after finishing | 350 @param callback: method to call after finishing |
322 @param failure_reason: reason of the failure, or None if steam was successful""" | 351 @param failure_reason: reason of the failure, or None if steam was successful""" |
323 data = self.current_stream[sid] | 352 client = self.host.getClient(profile) |
324 iq_elt = client.IQ(data["xmlstream"],'set') | 353 if not client: |
354 raise ProfileNotInCacheError | |
355 data = client.xep_0047_current_stream[sid] | |
356 iq_elt = jabber_client.IQ(client.xmlstream,'set') | |
325 iq_elt['to'] = data["to"].full() | 357 iq_elt['to'] = data["to"].full() |
326 close_elt = iq_elt.addElement('close',NS_IBB) | 358 close_elt = iq_elt.addElement('close',NS_IBB) |
327 close_elt['sid'] = sid | 359 close_elt['sid'] = sid |
328 iq_elt.send() | 360 iq_elt.send() |
329 self.host.removeProgressCB(sid) | 361 self.host.removeProgressCB(sid, profile) |
330 if failure_reason: | 362 if failure_reason: |
331 self._killId(sid, False, failure_reason) | 363 self._killId(sid, False, failure_reason, profile=profile) |
332 else: | 364 else: |
333 self._killId(sid, True) | 365 self._killId(sid, True, profile=profile) |
334 | 366 |
335 class XEP_0047_handler(XMPPHandler): | 367 class XEP_0047_handler(XMPPHandler): |
336 implements(iwokkel.IDisco) | 368 implements(iwokkel.IDisco) |
337 | 369 |
338 def __init__(self,parent): | 370 def __init__(self,parent): |