comparison src/plugins/plugin_xep_0047.py @ 941:c6d8fc63b1db

core, plugins: host.getClient now raises an exception instead of returning None when no profile is found; plugins have been adapted accordingly and cleaned up a bit
author Goffi <goffi@goffi.org>
date Fri, 28 Mar 2014 18:07:02 +0100
parents 1fe00f0c9a91
children 301b342c697a
comparison
equal deleted inserted replaced
940:92e41e7c7e00 941:c6d8fc63b1db
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from logging import debug, info, warning, error 21 from logging import debug, info, warning, error
22 from twisted.words.protocols.jabber import client as jabber_client, jid 22 from twisted.words.protocols.jabber import client as jabber_client, jid
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 import twisted.internet.error
25 from twisted.internet import reactor 24 from twisted.internet import reactor
26 from sat.core.exceptions import ProfileNotInCacheError
27 25
28 from wokkel import disco, iwokkel 26 from wokkel import disco, iwokkel
29 27
30 from zope.interface import implements 28 from zope.interface import implements
31 29
67 def getHandler(self, profile): 65 def getHandler(self, profile):
68 return XEP_0047_handler(self) 66 return XEP_0047_handler(self)
69 67
70 def profileConnected(self, profile): 68 def profileConnected(self, profile):
71 client = self.host.getClient(profile) 69 client = self.host.getClient(profile)
72 if not client:
73 raise ProfileNotInCacheError
74 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict) 70 client.xep_0047_current_stream = {} # key: stream_id, value: data(dict)
75 71
76 def _timeOut(self, sid, profile): 72 def _timeOut(self, sid, profile):
77 """Delecte current_stream id, called after timeout 73 """Delecte current_stream id, called after timeout
78 @param id: id of client.xep_0047_current_stream""" 74 @param id: id of client.xep_0047_current_stream"""
83 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): 79 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
84 """Delete an current_stream id, clean up associated observers 80 """Delete an current_stream id, clean up associated observers
85 @param sid: id of client.xep_0047_current_stream""" 81 @param sid: id of client.xep_0047_current_stream"""
86 assert(profile) 82 assert(profile)
87 client = self.host.getClient(profile) 83 client = self.host.getClient(profile)
88 if not client:
89 warning(_("Client no more in cache"))
90 return
91 if sid not in client.xep_0047_current_stream: 84 if sid not in client.xep_0047_current_stream:
92 warning(_("kill id called on a non existant id")) 85 warning(_("kill id called on a non existant id"))
93 return 86 return
94 if "observer_cb" in client.xep_0047_current_stream[sid]: 87 if "observer_cb" in client.xep_0047_current_stream[sid]:
95 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"]) 88 client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"])
110 failure_cb(sid, file_obj, NS_IBB, failure_reason, profile) 103 failure_cb(sid, file_obj, NS_IBB, failure_reason, profile)
111 104
112 def getProgress(self, sid, data, profile): 105 def getProgress(self, sid, data, profile):
113 """Fill data with position of current transfer""" 106 """Fill data with position of current transfer"""
114 client = self.host.getClient(profile) 107 client = self.host.getClient(profile)
115 if not client:
116 raise ProfileNotInCacheError
117 try: 108 try:
118 file_obj = client.xep_0047_current_stream[sid]["file_obj"] 109 file_obj = client.xep_0047_current_stream[sid]["file_obj"]
119 data["position"] = str(file_obj.tell()) 110 data["position"] = str(file_obj.tell())
120 data["size"] = str(client.xep_0047_current_stream[sid]["size"]) 111 data["size"] = str(client.xep_0047_current_stream[sid]["size"])
121 except: 112 except:
129 @param size: full size of the data, or None if unknown 120 @param size: full size of the data, or None if unknown
130 @param success_cb: method to call when successfuly finished 121 @param success_cb: method to call when successfuly finished
131 @param failure_cb: method to call when something goes wrong 122 @param failure_cb: method to call when something goes wrong
132 @param profile: %(doc_profile)s""" 123 @param profile: %(doc_profile)s"""
133 client = self.host.getClient(profile) 124 client = self.host.getClient(profile)
134 if not client:
135 raise ProfileNotInCacheError
136 data = client.xep_0047_current_stream[sid] = {} 125 data = client.xep_0047_current_stream[sid] = {}
137 data["from"] = from_jid 126 data["from"] = from_jid
138 data["file_obj"] = file_obj 127 data["file_obj"] = file_obj
139 data["seq"] = -1 128 data["seq"] = -1
140 if size: 129 if size:
146 135
147 def streamOpening(self, IQ, profile): 136 def streamOpening(self, IQ, profile):
148 debug(_("IBB stream opening")) 137 debug(_("IBB stream opening"))
149 IQ.handled = True 138 IQ.handled = True
150 client = self.host.getClient(profile) 139 client = self.host.getClient(profile)
151 if not client:
152 raise ProfileNotInCacheError
153 open_elt = IQ.firstChildElement() 140 open_elt = IQ.firstChildElement()
154 block_size = open_elt.getAttribute('block-size') 141 block_size = open_elt.getAttribute('block-size')
155 sid = open_elt.getAttribute('sid') 142 sid = open_elt.getAttribute('sid')
156 stanza = open_elt.getAttribute('stanza', 'iq') 143 stanza = open_elt.getAttribute('stanza', 'iq')
157 if not sid or not block_size or int(block_size) > 65535: 144 if not sid or not block_size or int(block_size) > 65535:
188 client.xmlstream.send(result) 175 client.xmlstream.send(result)
189 176
190 def streamClosing(self, IQ, profile): 177 def streamClosing(self, IQ, profile):
191 IQ.handled = True 178 IQ.handled = True
192 client = self.host.getClient(profile) 179 client = self.host.getClient(profile)
193 if not client:
194 raise ProfileNotInCacheError
195 debug(_("IBB stream closing")) 180 debug(_("IBB stream closing"))
196 data_elt = IQ.firstChildElement() 181 data_elt = IQ.firstChildElement()
197 sid = data_elt.getAttribute('sid') 182 sid = data_elt.getAttribute('sid')
198 result = domish.Element((None, 'iq')) 183 result = domish.Element((None, 'iq'))
199 result['type'] = 'result' 184 result['type'] = 'result'
203 self._killId(sid, success=True, profile=profile) 188 self._killId(sid, success=True, profile=profile)
204 189
205 def iqData(self, IQ, profile): 190 def iqData(self, IQ, profile):
206 IQ.handled = True 191 IQ.handled = True
207 client = self.host.getClient(profile) 192 client = self.host.getClient(profile)
208 if not client:
209 raise ProfileNotInCacheError
210 data_elt = IQ.firstChildElement() 193 data_elt = IQ.firstChildElement()
211 194
212 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile): 195 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile):
213 #and send a success answer 196 #and send a success answer
214 result = domish.Element((None, 'iq')) 197 result = domish.Element((None, 'iq'))
217 result['to'] = IQ['from'] 200 result['to'] = IQ['from']
218 201
219 client.xmlstream.send(result) 202 client.xmlstream.send(result)
220 203
221 def messageData(self, message_elt, profile): 204 def messageData(self, message_elt, profile):
222 data_elt = message_elt.firstChildElement()
223 sid = message_elt.getAttribute('id', '') 205 sid = message_elt.getAttribute('id', '')
224 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile) 206 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile)
225 207
226 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile): 208 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile):
227 """Manage the data elelement (check validity and write to the file_obj) 209 """Manage the data elelement (check validity and write to the file_obj)
228 @param data_elt: "data" domish element 210 @param data_elt: "data" domish element
229 @return: True if success""" 211 @return: True if success"""
230 client = self.host.getClient(profile) 212 client = self.host.getClient(profile)
231 if not client:
232 raise ProfileNotInCacheError
233 sid = data_elt.getAttribute('sid') 213 sid = data_elt.getAttribute('sid')
234 if sid not in client.xep_0047_current_stream: 214 if sid not in client.xep_0047_current_stream:
235 error(_("Received data for an unknown session id")) 215 error(_("Received data for an unknown session id"))
236 return False 216 return False
237 217
287 @param length: number of byte to send, or None to send until the end 267 @param length: number of byte to send, or None to send until the end
288 @param successCb: method to call when stream successfuly finished 268 @param successCb: method to call when stream successfuly finished
289 @param failureCb: method to call when something goes wrong 269 @param failureCb: method to call when something goes wrong
290 @param profile: %(doc_profile)s""" 270 @param profile: %(doc_profile)s"""
291 client = self.host.getClient(profile) 271 client = self.host.getClient(profile)
292 if not client:
293 raise ProfileNotInCacheError
294 if length is not None: 272 if length is not None:
295 error(_('stream length not managed yet')) 273 error(_('stream length not managed yet'))
296 return 274 return
297 data = client.xep_0047_current_stream[sid] = {} 275 data = client.xep_0047_current_stream[sid] = {}
298 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 276 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
315 iq_elt.send() 293 iq_elt.send()
316 294
317 def iqResult(self, sid, seq, length, profile, iq_elt): 295 def iqResult(self, sid, seq, length, profile, iq_elt):
318 """Called when the result of open iq is received""" 296 """Called when the result of open iq is received"""
319 client = self.host.getClient(profile) 297 client = self.host.getClient(profile)
320 if not client:
321 raise ProfileNotInCacheError
322 data = client.xep_0047_current_stream[sid] 298 data = client.xep_0047_current_stream[sid]
323 if iq_elt["type"] == "error": 299 if iq_elt["type"] == "error":
324 warning(_("Transfer failed")) 300 warning(_("Transfer failed"))
325 self.terminateStream(sid, "IQ_ERROR") 301 self.terminateStream(sid, "IQ_ERROR")
326 return 302 return
349 @param xmlstream: XML stream used with this session 325 @param xmlstream: XML stream used with this session
350 @param progress_cb: True if we have to remove the progress callback 326 @param progress_cb: True if we have to remove the progress callback
351 @param callback: method to call after finishing 327 @param callback: method to call after finishing
352 @param failure_reason: reason of the failure, or None if steam was successful""" 328 @param failure_reason: reason of the failure, or None if steam was successful"""
353 client = self.host.getClient(profile) 329 client = self.host.getClient(profile)
354 if not client:
355 raise ProfileNotInCacheError
356 data = client.xep_0047_current_stream[sid] 330 data = client.xep_0047_current_stream[sid]
357 iq_elt = jabber_client.IQ(client.xmlstream, 'set') 331 iq_elt = jabber_client.IQ(client.xmlstream, 'set')
358 iq_elt['to'] = data["to"].full() 332 iq_elt['to'] = data["to"].full()
359 close_elt = iq_elt.addElement('close', NS_IBB) 333 close_elt = iq_elt.addElement('close', NS_IBB)
360 close_elt['sid'] = sid 334 close_elt['sid'] = sid