comparison src/plugins/plugin_xep_0047.py @ 594:e629371a28d3

Fix pep8 support in src/plugins.
author Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
date Fri, 18 Jan 2013 17:55:35 +0100
parents beaf6bec2fcd
children 84a6e83157c2
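Side-by-side comparison (legend: equal, deleted, inserted, replaced)
Left column: revision 593:70bae685d05c (before); right column: revision 594:e629371a28d3 (after)
Each row below: old line number and text, followed by new line number and text.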
42 NS_IBB = 'http://jabber.org/protocol/ibb' 42 NS_IBB = 'http://jabber.org/protocol/ibb'
43 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]' 43 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]'
44 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="%s"]' 44 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="%s"]'
45 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]' 45 IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]'
46 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]' 46 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]'
47 TIMEOUT = 60 #timeout for workflow 47 TIMEOUT = 60 # timeout for workflow
48 BLOCK_SIZE = 4096 48 BLOCK_SIZE = 4096
49 49
50 PLUGIN_INFO = { 50 PLUGIN_INFO = {
51 "name": "In-Band Bytestream Plugin", 51 "name": "In-Band Bytestream Plugin",
52 "import_name": "XEP-0047", 52 "import_name": "XEP-0047",
53 "type": "XEP", 53 "type": "XEP",
54 "protocols": ["XEP-0047"], 54 "protocols": ["XEP-0047"],
55 "main": "XEP_0047", 55 "main": "XEP_0047",
56 "handler": "yes", 56 "handler": "yes",
57 "description": _("""Implementation of In-Band Bytestreams""") 57 "description": _("""Implementation of In-Band Bytestreams""")
58 } 58 }
59
59 60
60 class XEP_0047(object): 61 class XEP_0047(object):
61 NAMESPACE = NS_IBB 62 NAMESPACE = NS_IBB
62 63
63 def __init__(self, host): 64 def __init__(self, host):
69 70
70 def profileConnected(self, profile): 71 def profileConnected(self, profile):
71 client = self.host.getClient(profile) 72 client = self.host.getClient(profile)
72 if not client: 73 if not client:
73 raise ProfileNotInCacheError 74 raise ProfileNotInCacheError
74 client.xep_0047_current_stream = {} #key: stream_id, value: data(dict) 75 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict)
75 76
76 def _timeOut(self, sid, profile): 77 def _timeOut(self, sid, profile):
77 """Delecte current_stream id, called after timeout 78 """Delecte current_stream id, called after timeout
78 @param id: id of client.xep_0047_current_stream""" 79 @param id: id of client.xep_0047_current_stream"""
79 info(_("In-Band Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)) 80 info(_("In-Band Bytestream: TimeOut reached for id %s [%s]") % (sid, profile))
85 assert(profile) 86 assert(profile)
86 client = self.host.getClient(profile) 87 client = self.host.getClient(profile)
87 if not client: 88 if not client:
88 warning(_("Client no more in cache")) 89 warning(_("Client no more in cache"))
89 return 90 return
90 if not client.xep_0047_current_stream.has_key(sid): 91 if sid not in client.xep_0047_current_stream:
91 warning(_("kill id called on a non existant id")) 92 warning(_("kill id called on a non existant id"))
92 return 93 return
93 if client.xep_0047_current_stream[sid].has_key("observer_cb"): 94 if "observer_cb" in client.xep_0047_current_stream[sid]:
94 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"]) 95 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"])
95 if client.xep_0047_current_stream[sid]['timer'].active(): 96 if client.xep_0047_current_stream[sid]['timer'].active():
96 client.xep_0047_current_stream[sid]['timer'].cancel() 97 client.xep_0047_current_stream[sid]['timer'].cancel()
97 if client.xep_0047_current_stream[sid].has_key("size"): 98 if "size" in client.xep_0047_current_stream[sid]:
98 self.host.removeProgressCB(sid, profile) 99 self.host.removeProgressCB(sid, profile)
99 100
100 file_obj = client.xep_0047_current_stream[sid]['file_obj'] 101 file_obj = client.xep_0047_current_stream[sid]['file_obj']
101 success_cb = client.xep_0047_current_stream[sid]['success_cb'] 102 success_cb = client.xep_0047_current_stream[sid]['success_cb']
102 failure_cb = client.xep_0047_current_stream[sid]['failure_cb'] 103 failure_cb = client.xep_0047_current_stream[sid]['failure_cb']
143 data["success_cb"] = success_cb 144 data["success_cb"] = success_cb
144 data["failure_cb"] = failure_cb 145 data["failure_cb"] = failure_cb
145 146
146 def streamOpening(self, IQ, profile): 147 def streamOpening(self, IQ, profile):
147 debug(_("IBB stream opening")) 148 debug(_("IBB stream opening"))
148 IQ.handled=True 149 IQ.handled = True
149 client = self.host.getClient(profile) 150 client = self.host.getClient(profile)
150 if not client: 151 if not client:
151 raise ProfileNotInCacheError 152 raise ProfileNotInCacheError
152 open_elt = IQ.firstChildElement() 153 open_elt = IQ.firstChildElement()
153 block_size = open_elt.getAttribute('block-size') 154 block_size = open_elt.getAttribute('block-size')
154 sid = open_elt.getAttribute('sid') 155 sid = open_elt.getAttribute('sid')
155 stanza = open_elt.getAttribute('stanza', 'iq') 156 stanza = open_elt.getAttribute('stanza', 'iq')
156 if not sid or not block_size or int(block_size)>65535: 157 if not sid or not block_size or int(block_size) > 65535:
157 warning(_("malformed IBB transfer: %s" % IQ['id'])) 158 warning(_("malformed IBB transfer: %s" % IQ['id']))
158 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) 159 self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream)
159 return 160 return
160 if not sid in client.xep_0047_current_stream: 161 if not sid in client.xep_0047_current_stream:
161 warning(_("Ignoring unexpected IBB transfer: %s" % sid)) 162 warning(_("Ignoring unexpected IBB transfer: %s" % sid))
171 172
172 #we reset the timeout: 173 #we reset the timeout:
173 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) 174 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT)
174 175
175 #we save the xmlstream, events and observer data to allow observer removal 176 #we save the xmlstream, events and observer data to allow observer removal
176 client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid 177 client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza == 'message' else IBB_IQ_DATA) % sid
177 client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData 178 client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza == 'message' else self.iqData
178 event_close = IBB_CLOSE % sid 179 event_close = IBB_CLOSE % sid
179 #we now set the stream observer to look after data packet 180 #we now set the stream observer to look after data packet
180 client.xmlstream.addObserver(event_data, observer_cb, profile = profile) 181 client.xmlstream.addObserver(event_data, observer_cb, profile=profile)
181 client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) 182 client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile=profile)
182 #finally, we send the accept stanza 183 #finally, we send the accept stanza
183 result = domish.Element((None, 'iq')) 184 result = domish.Element((None, 'iq'))
184 result['type'] = 'result' 185 result['type'] = 'result'
185 result['id'] = IQ['id'] 186 result['id'] = IQ['id']
186 result['to'] = IQ['from'] 187 result['to'] = IQ['from']
187 client.xmlstream.send(result) 188 client.xmlstream.send(result)
188 189
189 def streamClosing(self, IQ, profile): 190 def streamClosing(self, IQ, profile):
190 IQ.handled=True 191 IQ.handled = True
191 client = self.host.getClient(profile) 192 client = self.host.getClient(profile)
192 if not client: 193 if not client:
193 raise ProfileNotInCacheError 194 raise ProfileNotInCacheError
194 debug(_("IBB stream closing")) 195 debug(_("IBB stream closing"))
195 data_elt = IQ.firstChildElement() 196 data_elt = IQ.firstChildElement()
200 result['to'] = IQ['from'] 201 result['to'] = IQ['from']
201 client.xmlstream.send(result) 202 client.xmlstream.send(result)
202 self._killId(sid, success=True, profile=profile) 203 self._killId(sid, success=True, profile=profile)
203 204
204 def iqData(self, IQ, profile): 205 def iqData(self, IQ, profile):
205 IQ.handled=True 206 IQ.handled = True
206 client = self.host.getClient(profile) 207 client = self.host.getClient(profile)
207 if not client: 208 if not client:
208 raise ProfileNotInCacheError 209 raise ProfileNotInCacheError
209 data_elt = IQ.firstChildElement() 210 data_elt = IQ.firstChildElement()
210 211
217 218
218 client.xmlstream.send(result) 219 client.xmlstream.send(result)
219 220
220 def messageData(self, message_elt, profile): 221 def messageData(self, message_elt, profile):
221 data_elt = message_elt.firstChildElement() 222 data_elt = message_elt.firstChildElement()
222 sid = message_elt.getAttribute('id','') 223 sid = message_elt.getAttribute('id', '')
223 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile) 224 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile)
224 225
225 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile): 226 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile):
226 """Manage the data elelement (check validity and write to the file_obj) 227 """Manage the data elelement (check validity and write to the file_obj)
227 @param data_elt: "data" domish element 228 @param data_elt: "data" domish element
237 from_jid = client.xep_0047_current_stream[sid]["from"] 238 from_jid = client.xep_0047_current_stream[sid]["from"]
238 file_obj = client.xep_0047_current_stream[sid]["file_obj"] 239 file_obj = client.xep_0047_current_stream[sid]["file_obj"]
239 240
240 if stanza_from_jid != from_jid: 241 if stanza_from_jid != from_jid:
241 warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) 242 warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
242 if stanza=='iq': 243 if stanza == 'iq':
243 self.sendNotAcceptableError(sid, from_jid, client.xmlstream) 244 self.sendNotAcceptableError(sid, from_jid, client.xmlstream)
244 return False 245 return False
245 246
246 client.xep_0047_current_stream[sid]["seq"]+=1 247 client.xep_0047_current_stream[sid]["seq"] += 1
247 if int(data_elt.getAttribute("seq",-1)) != client.xep_0047_current_stream[sid]["seq"]: 248 if int(data_elt.getAttribute("seq", -1)) != client.xep_0047_current_stream[sid]["seq"]:
248 warning(_("Sequence error")) 249 warning(_("Sequence error"))
249 if stanza=='iq': 250 if stanza == 'iq':
250 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) 251 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream)
251 return False 252 return False
252 253
253 #we reset the timeout: 254 #we reset the timeout:
254 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) 255 client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT)
257 try: 258 try:
258 file_obj.write(base64.b64decode(str(data_elt))) 259 file_obj.write(base64.b64decode(str(data_elt)))
259 except TypeError: 260 except TypeError:
260 #The base64 data is invalid 261 #The base64 data is invalid
261 warning(_("Invalid base64 data")) 262 warning(_("Invalid base64 data"))
262 if stanza=='iq': 263 if stanza == 'iq':
263 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) 264 self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream)
264 return False 265 return False
265 return True 266 return True
266 267
267 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): 268 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
273 result['type'] = 'result' 274 result['type'] = 'result'
274 result['id'] = iq_id 275 result['id'] = iq_id
275 result['to'] = to_jid 276 result['to'] = to_jid
276 error_el = result.addElement('error') 277 error_el = result.addElement('error')
277 error_el['type'] = 'cancel' 278 error_el['type'] = 'cancel'
278 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) 279 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable'))
279 xmlstream.send(result) 280 xmlstream.send(result)
280 281
281 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): 282 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None):
282 """Launch the stream workflow 283 """Launch the stream workflow
283 @param file_obj: file_obj to send 284 @param file_obj: file_obj to send
284 @param to_jid: JID of the recipient 285 @param to_jid: JID of the recipient
285 @param sid: Stream session id 286 @param sid: Stream session id
286 @param length: number of byte to send, or None to send until the end 287 @param length: number of byte to send, or None to send until the end
288 @param failureCb: method to call when something goes wrong 289 @param failureCb: method to call when something goes wrong
289 @param profile: %(doc_profile)s""" 290 @param profile: %(doc_profile)s"""
290 client = self.host.getClient(profile) 291 client = self.host.getClient(profile)
291 if not client: 292 if not client:
292 raise ProfileNotInCacheError 293 raise ProfileNotInCacheError
293 if length != None: 294 if length is not None:
294 error(_('stream length not managed yet')) 295 error(_('stream length not managed yet'))
295 return 296 return
296 data = client.xep_0047_current_stream[sid] = {} 297 data = client.xep_0047_current_stream[sid] = {}
297 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 298 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
298 data["file_obj"] = file_obj 299 data["file_obj"] = file_obj
301 data["failure_cb"] = failureCb 302 data["failure_cb"] = failureCb
302 data["block_size"] = BLOCK_SIZE 303 data["block_size"] = BLOCK_SIZE
303 if size: 304 if size:
304 data["size"] = size 305 data["size"] = size
305 self.host.registerProgressCB(sid, self.getProgress, profile) 306 self.host.registerProgressCB(sid, self.getProgress, profile)
306 iq_elt = jabber_client.IQ(client.xmlstream,'set') 307 iq_elt = jabber_client.IQ(client.xmlstream, 'set')
307 iq_elt['from'] = client.jid.full() 308 iq_elt['from'] = client.jid.full()
308 iq_elt['to'] = to_jid.full() 309 iq_elt['to'] = to_jid.full()
309 open_elt = iq_elt.addElement('open',NS_IBB) 310 open_elt = iq_elt.addElement('open', NS_IBB)
310 open_elt['block-size'] = str(BLOCK_SIZE) 311 open_elt['block-size'] = str(BLOCK_SIZE)
311 open_elt['sid'] = sid 312 open_elt['sid'] = sid
312 open_elt['stanza'] = 'iq' 313 open_elt['stanza'] = 'iq'
313 iq_elt.addCallback(self.iqResult, sid, 0, length, profile) 314 iq_elt.addCallback(self.iqResult, sid, 0, length, profile)
314 iq_elt.send() 315 iq_elt.send()
327 if data['timer'].active(): 328 if data['timer'].active():
328 data['timer'].cancel() 329 data['timer'].cancel()
329 330
330 buffer = data["file_obj"].read(data["block_size"]) 331 buffer = data["file_obj"].read(data["block_size"])
331 if buffer: 332 if buffer:
332 next_iq_elt = jabber_client.IQ(client.xmlstream,'set') 333 next_iq_elt = jabber_client.IQ(client.xmlstream, 'set')
333 next_iq_elt['to'] = data["to"].full() 334 next_iq_elt['to'] = data["to"].full()
334 data_elt = next_iq_elt.addElement('data', NS_IBB) 335 data_elt = next_iq_elt.addElement('data', NS_IBB)
335 data_elt['seq'] = str(seq) 336 data_elt['seq'] = str(seq)
336 data_elt['sid'] = sid 337 data_elt['sid'] = sid
337 data_elt.addContent(base64.b64encode(buffer)) 338 data_elt.addContent(base64.b64encode(buffer))
338 next_iq_elt.addCallback(self.iqResult, sid, seq+1, length, profile) 339 next_iq_elt.addCallback(self.iqResult, sid, seq + 1, length, profile)
339 next_iq_elt.send() 340 next_iq_elt.send()
340 else: 341 else:
341 self.terminateStream(sid, profile=profile) 342 self.terminateStream(sid, profile=profile)
342 343
343 def terminateStream(self, sid, failure_reason = None, profile=None): 344 def terminateStream(self, sid, failure_reason=None, profile=None):
344 """Terminate the stream session 345 """Terminate the stream session
345 @param to_jid: recipient 346 @param to_jid: recipient
346 @param sid: Session id 347 @param sid: Session id
347 @param file_obj: file object used 348 @param file_obj: file object used
348 @param xmlstream: XML stream used with this session 349 @param xmlstream: XML stream used with this session
351 @param failure_reason: reason of the failure, or None if steam was successful""" 352 @param failure_reason: reason of the failure, or None if steam was successful"""
352 client = self.host.getClient(profile) 353 client = self.host.getClient(profile)
353 if not client: 354 if not client:
354 raise ProfileNotInCacheError 355 raise ProfileNotInCacheError
355 data = client.xep_0047_current_stream[sid] 356 data = client.xep_0047_current_stream[sid]
356 iq_elt = jabber_client.IQ(client.xmlstream,'set') 357 iq_elt = jabber_client.IQ(client.xmlstream, 'set')
357 iq_elt['to'] = data["to"].full() 358 iq_elt['to'] = data["to"].full()
358 close_elt = iq_elt.addElement('close',NS_IBB) 359 close_elt = iq_elt.addElement('close', NS_IBB)
359 close_elt['sid'] = sid 360 close_elt['sid'] = sid
360 iq_elt.send() 361 iq_elt.send()
361 self.host.removeProgressCB(sid, profile) 362 self.host.removeProgressCB(sid, profile)
362 if failure_reason: 363 if failure_reason:
363 self._killId(sid, False, failure_reason, profile=profile) 364 self._killId(sid, False, failure_reason, profile=profile)
364 else: 365 else:
365 self._killId(sid, True, profile=profile) 366 self._killId(sid, True, profile=profile)
366 367
368
367 class XEP_0047_handler(XMPPHandler): 369 class XEP_0047_handler(XMPPHandler):
368 implements(iwokkel.IDisco) 370 implements(iwokkel.IDisco)
369 371
370 def __init__(self,parent): 372 def __init__(self, parent):
371 self.plugin_parent = parent 373 self.plugin_parent = parent
372 374
373 def connectionInitialized(self): 375 def connectionInitialized(self):
374 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent.streamOpening, profile = self.parent.profile) 376 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent.streamOpening, profile=self.parent.profile)
375 377
376 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 378 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
377 return [disco.DiscoFeature(NS_IBB)] 379 return [disco.DiscoFeature(NS_IBB)]
378 380
379 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 381 def getDiscoItems(self, requestor, target, nodeIdentifier=''):