Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0047.py @ 384:785420cd63f7
plugins: In-Band Bytestreams (XEP-0047) implementation
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 29 Sep 2011 12:05:45 +0200 |
parents | |
children | 6ff50d609b16 |
comparison
equal
deleted
inserted
replaced
383:98e1d44d5cd4 | 384:785420cd63f7 |
---|---|
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 """ | |
5 SAT plugin for managing In-Band Bytestreams (xep-0047) | |
6 Copyright (C) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org) | |
7 | |
8 This program is free software: you can redistribute it and/or modify | |
9 it under the terms of the GNU General Public License as published by | |
10 the Free Software Foundation, either version 3 of the License, or | |
11 (at your option) any later version. | |
12 | |
13 This program is distributed in the hope that it will be useful, | |
14 but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 GNU General Public License for more details. | |
17 | |
18 You should have received a copy of the GNU General Public License | |
19 along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 """ | |
21 | |
22 from logging import debug, info, warning, error | |
23 from twisted.internet import protocol | |
24 from twisted.words.protocols.jabber import client, jid | |
25 from twisted.words.protocols.jabber import error as jab_error | |
26 from twisted.words.xish import domish | |
27 import twisted.internet.error | |
28 from twisted.internet import reactor | |
29 | |
30 from wokkel import disco, iwokkel | |
31 | |
32 from zope.interface import implements | |
33 | |
34 import base64 | |
35 | |
36 try: | |
37 from twisted.words.protocols.xmlstream import XMPPHandler | |
38 except ImportError: | |
39 from wokkel.subprotocols import XMPPHandler | |
40 | |
# Namespace of In-Band Bytestreams (XEP-0047)
NS_IBB = 'http://jabber.org/protocol/ibb'

# Base XPath-like queries used to build the stream observers
MESSAGE = '/message'
IQ = '/iq'
IQ_SET = '/iq[@type="set"]'

# Observer events; the ones ending with a "%s" placeholder must be
# formatted with the stream session id (sid) before being registered
IBB_OPEN = '%s/open[@xmlns="%s"]' % (IQ_SET, NS_IBB)
IBB_CLOSE = ''.join((IQ_SET, '/close[@xmlns="', NS_IBB, '" and @sid="%s"]'))
# we use IQ instead of IQ_SET because of a bug in Gajim
IBB_IQ_DATA = ''.join((IQ, '/data[@xmlns="', NS_IBB, '" and @sid="%s"]'))
IBB_MESSAGE_DATA = ''.join((MESSAGE, '/data[@xmlns="', NS_IBB, '" and @sid="%s"]'))

TIMEOUT = 60  # timeout for workflow (passed to reactor.callLater / timer.reset)
BLOCK_SIZE = 4096  # value advertised in the "block-size" attribute and used to read the file
51 | |
# Plugin metadata; "main" names the plugin class defined in this module
PLUGIN_INFO = dict(
    name="In-Band Bytestream Plugin",
    import_name="XEP-0047",
    type="XEP",
    protocols=["XEP-0047"],
    main="XEP_0047",
    handler="yes",
    description=_("""Implementation of In-Band Bytestreams"""),
)
61 | |
class XEP_0047():
    """In-Band Bytestreams (XEP-0047) plugin.

    Every session is stored in self.current_stream, a dict mapping a stream
    session id (sid) to a dict holding the session state: file object, timer,
    success/failure callbacks, and, once known, the xmlstream and observers.
    """
    NAMESPACE = NS_IBB

    def __init__(self, host):
        """@param host: main application object; this class uses its
        getJidNStream, registerProgressCB and removeProgressCB methods"""
        info(_("In-Band Bytestreams plugin initialization"))
        self.host = host
        self.current_stream = {}  # key: stream_id, value: data(dict)

    def getHandler(self, profile):
        """Return a new XEP_0047_handler bound to this plugin instance
        @param profile: unused here"""
        return XEP_0047_handler(self)

    def _timeOut(self, sid):
        """Delete current_stream id, called after timeout
        @param sid: id of self.current_stream"""
        info(_("In-Band Bytestream: TimeOut reached for id %s") % sid)
        self._killId(sid, False, "TIMEOUT")

    def _killId(self, sid, success=False, failure_reason="UNKNOWN"):
        """Delete a current_stream id, clean up associated observers and
        call the success or failure callback of the session
        @param sid: id of self.current_stream
        @param success: True to call success_cb, False to call failure_cb
        @param failure_reason: reason given to failure_cb"""
        if sid not in self.current_stream:
            warning(_("kill id called on a non existant id"))
            return
        session = self.current_stream[sid]
        if "observer_cb" in session:
            session["xmlstream"].removeObserver(session["event_data"], session["observer_cb"])
        # when we are called from _timeOut the timer has already fired, and
        # cancelling it again would raise AlreadyCalled before the failure
        # callback could run
        timer = session['timer']
        if timer.active():
            timer.cancel()
        if "size" in session:
            self.host.removeProgressCB(sid)

        file_obj = session['file_obj']
        success_cb = session['success_cb']
        failure_cb = session['failure_cb']

        del self.current_stream[sid]

        if success:
            success_cb(sid, file_obj, NS_IBB)
        else:
            failure_cb(sid, file_obj, NS_IBB, failure_reason)

    def getProgress(self, sid, data):
        """Fill data with position of current transfert
        @param sid: id of self.current_stream
        @param data: dict where "position" and "size" are set (as strings)"""
        try:
            session = self.current_stream[sid]
            data["position"] = str(session["file_obj"].tell())
            data["size"] = str(session["size"])
        except Exception:
            # best effort: data is left untouched if the session is gone or
            # the file object can not report its position
            pass

    def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
        """Called when a bytestream is imminent
        @param from_jid: jid of the sender
        @param sid: Stream id
        @param file_obj: File Object where the data will be written
        @param size: expected size; when true a progress callback is registered
        @param success_cb: called as success_cb(sid, file_obj, NS_IBB)
        @param failure_cb: called as failure_cb(sid, file_obj, NS_IBB, reason)"""
        data = self.current_stream[sid] = {}
        data["from"] = from_jid
        data["file_obj"] = file_obj
        data["seq"] = -1  # last sequence number received, -1 before the first packet
        if size:
            data["size"] = size
            self.host.registerProgressCB(sid, self.getProgress)
        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
        data["success_cb"] = success_cb
        data["failure_cb"] = failure_cb

    def _sendIqResult(self, iq_elt, xmlstream):
        """Acknowledge iq_elt with an empty result IQ sent through xmlstream"""
        result = domish.Element(('', 'iq'))
        result['type'] = 'result'
        result['id'] = iq_elt['id']
        result['to'] = iq_elt['from']
        xmlstream.send(result)

    def streamOpening(self, IQ, profile):
        """Observer called when an IBB <open/> request is received
        @param IQ: the received iq stanza
        @param profile: profile used to retrieve the xmlstream"""
        debug(_("IBB stream opening"))
        IQ.handled = True
        profile_jid, xmlstream = self.host.getJidNStream(profile)
        open_elt = IQ.firstChildElement()
        block_size = open_elt.getAttribute('block-size')
        sid = open_elt.getAttribute('sid')
        stanza = open_elt.getAttribute('stanza', 'iq')
        try:
            # a missing (None) or non numeric block-size is malformed too
            block_size_ok = 0 <= int(block_size) <= 65535
        except (TypeError, ValueError):
            block_size_ok = False
        if not sid or not block_size_ok:
            warning(_("malformed IBB transfert: %s") % IQ['id'])
            self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream)
            return
        if sid not in self.current_stream:
            warning(_("Ignoring unexpected IBB transfert: %s") % sid)
            self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream)
            return
        session = self.current_stream[sid]
        if session["from"] != jid.JID(IQ['from']):
            warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
            self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream)
            self._killId(sid, False, "PROTOCOL_ERROR")
            return

        #at this stage, the session looks ok and will be accepted

        #we reset the timeout:
        session["timer"].reset(TIMEOUT)

        #we save the xmlstream, events and observer data to allow observer removal
        if stanza == 'message':
            event_data = IBB_MESSAGE_DATA % sid
            observer_cb = self.messageData
        else:
            event_data = IBB_IQ_DATA % sid
            observer_cb = self.iqData
        session["xmlstream"] = xmlstream
        session["event_data"] = event_data
        session["observer_cb"] = observer_cb
        event_close = IBB_CLOSE % sid
        #we now set the stream observer to look after data packet
        xmlstream.addObserver(event_data, observer_cb, profile = profile)
        xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile)
        #finally, we send the accept stanza
        self._sendIqResult(IQ, xmlstream)

    def streamClosing(self, IQ, profile):
        """Observer called when the <close/> request of a session is received
        @param IQ: the received iq stanza
        @param profile: unused here"""
        IQ.handled = True
        debug(_("IBB stream closing"))
        close_elt = IQ.firstChildElement()
        sid = close_elt.getAttribute('sid')
        if sid not in self.current_stream:
            # the close observer is not removed by _killId, so the session may
            # already be gone (e.g. after a timeout) when the close arrives
            warning(_("Received close for an unknown session id"))
            return
        self._sendIqResult(IQ, self.current_stream[sid]["xmlstream"])
        self._killId(sid, success=True)

    def iqData(self, IQ, profile):
        """Observer called when a data packet is received in an iq stanza
        @param IQ: the received iq stanza
        @param profile: profile used to retrieve the xmlstream"""
        IQ.handled = True
        data_elt = IQ.firstChildElement()

        if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from'])):
            #and send a success answer
            _jid, xmlstream = self.host.getJidNStream(profile)
            self._sendIqResult(IQ, xmlstream)

    def messageData(self, message_elt, profile):
        """Observer called when a data packet is received in a message stanza
        @param message_elt: the received message stanza
        @param profile: unused here"""
        data_elt = message_elt.firstChildElement()
        stanza_id = message_elt.getAttribute('id', '')
        # the <data/> child must be given, not the enclosing <message/>
        self._manageDataElt(data_elt, 'message', stanza_id, jid.JID(message_elt['from']))

    def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid):
        """Manage the data element (check validity and write to the file_obj)
        @param data_elt: "data" domish element
        @param stanza: 'iq' or 'message'; errors are only answered for 'iq'
        @param sid: id of the stanza carrying the data (callers give the
                    iq/message "id" attribute, not the stream id)
        @param stanza_from_jid: jid.JID of the sender of the stanza
        @return: True if success"""
        stanza_id = sid  # kept for error replies, "sid" is the stream id below
        sid = data_elt.getAttribute('sid')
        if sid not in self.current_stream:
            error(_("Received data for an unknown session id"))
            return False
        session = self.current_stream[sid]
        xmlstream = session["xmlstream"]
        from_jid = session["from"]
        file_obj = session["file_obj"]

        if stanza_from_jid != from_jid:
            warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
            if stanza == 'iq':
                # the error answers the received iq, so it goes to its sender
                self.sendNotAcceptableError(stanza_id, stanza_from_jid, xmlstream)
            return False

        # seq is a 16 bits counter, it wraps around to 0 after 65535
        expected_seq = (session["seq"] + 1) % 65536
        try:
            received_seq = int(data_elt.getAttribute("seq", -1))
        except (TypeError, ValueError):
            received_seq = -1
        if received_seq != expected_seq:
            warning(_("Sequence error"))
            if stanza == 'iq':
                self.sendNotAcceptableError(stanza_id, from_jid, xmlstream)
            return False
        session["seq"] = expected_seq

        #we reset the timeout:
        session["timer"].reset(TIMEOUT)

        #we can now decode the data
        try:
            file_obj.write(base64.b64decode(str(data_elt)))
        except (TypeError, ValueError):
            #The base64 data is invalid
            warning(_("Invalid base64 data"))
            if stanza == 'iq':
                self.sendNotAcceptableError(stanza_id, from_jid, xmlstream)
            return False
        return True

    def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
        """Not acceptable error used when the stream is not expected or something is going wrong
        @param iq_id: IQ id
        @param to_jid: addressee (string or jid.JID)
        @param xmlstream: XML stream to use to send the error"""
        result = domish.Element(('', 'iq'))
        result['type'] = 'error'  # a stanza carrying an <error/> must be of type "error"
        result['id'] = iq_id
        # attributes must be strings, and some callers give a jid.JID
        result['to'] = to_jid.full() if isinstance(to_jid, jid.JID) else to_jid
        error_el = result.addElement('error')
        error_el['type'] = 'cancel'
        error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable'))
        xmlstream.send(result)

    def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
        """Launch the stream workflow
        @param file_obj: file_obj to send
        @param to_jid: JID of the recipient
        @param sid: Stream session id
        @param length: number of byte to send, or None to send until the end
                       (only None is managed for now)
        @param successCb: method to call when stream successfuly finished
        @param failureCb: method to call when something go wrong
        @param size: total size; when true a progress callback is registered
        @param profile: %(doc_profile)s"""
        if length is not None:
            error(_('stream length not managed yet'))
            return
        profile_jid, xmlstream = self.host.getJidNStream(profile)
        data = self.current_stream[sid] = {}
        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
        data["file_obj"] = file_obj
        data["to"] = to_jid
        data["success_cb"] = successCb
        data["failure_cb"] = failureCb
        data["xmlstream"] = xmlstream
        data["block_size"] = BLOCK_SIZE
        if size:
            data["size"] = size
            self.host.registerProgressCB(sid, self.getProgress)
        iq_elt = client.IQ(xmlstream,'set')
        iq_elt['from'] = profile_jid.full()
        iq_elt['to'] = to_jid.full()
        open_elt = iq_elt.addElement('open',NS_IBB)
        open_elt['block-size'] = str(BLOCK_SIZE)
        open_elt['sid'] = sid
        open_elt['stanza'] = 'iq'
        iq_elt.addCallback(self.iqResult, sid, 0, length)
        iq_elt.send()

    def iqResult(self, sid, seq, length, iq_elt):
        """Called when the answer to the open iq or to a data iq is received;
        send the next block, or close the stream when the file is exhausted
        @param sid: Stream session id
        @param seq: sequence number to use for the next data packet
        @param length: forwarded unchanged to the next callback
        @param iq_elt: the received answer"""
        if sid not in self.current_stream:
            # the session has been killed (e.g. timeout) before the answer
            warning(_("Received an IQ result for an unknown session id"))
            return
        data = self.current_stream[sid]
        # the stanza type is an XML attribute: "iq_elt.type" would look for a
        # <type/> child element and never match
        if iq_elt.getAttribute('type') == "error":
            warning(_("Transfer failed"))
            self.terminateStream(sid, "IQ_ERROR")
            return

        # the peer is answering, so the workflow timeout starts again;
        # without this any transfer longer than TIMEOUT would be killed
        if data["timer"].active():
            data["timer"].reset(TIMEOUT)

        chunk = data["file_obj"].read(data["block_size"])
        if chunk:
            next_iq_elt = client.IQ(data["xmlstream"],'set')
            next_iq_elt['to'] = data["to"].full()
            data_elt = next_iq_elt.addElement('data', NS_IBB)
            data_elt['seq'] = str(seq)
            data_elt['sid'] = sid
            data_elt.addContent(base64.b64encode(chunk))
            # seq is a 16 bits counter, it wraps around to 0 after 65535
            next_iq_elt.addCallback(self.iqResult, sid, (seq + 1) % 65536, length)
            next_iq_elt.send()
        else:
            self.terminateStream(sid)

    def terminateStream(self, sid, failure_reason = None):
        """Terminate the stream session: send the <close/> request, then
        clean up the session and call its success or failure callback
        @param sid: Session id
        @param failure_reason: reason of the failure, or None if steam was successful"""
        data = self.current_stream[sid]
        iq_elt = client.IQ(data["xmlstream"],'set')
        iq_elt['to'] = data["to"].full()
        close_elt = iq_elt.addElement('close',NS_IBB)
        close_elt['sid'] = sid
        iq_elt.send()
        # _killId unregisters the progress callback when one was registered,
        # so it is not removed here (it would be removed twice)
        if failure_reason:
            self._killId(sid, False, failure_reason)
        else:
            self._killId(sid, True)
329 | |
class XEP_0047_handler(XMPPHandler):
    """XMPP handler forwarding IBB <open/> requests to the plugin and
    advertising the IBB namespace through service discovery"""
    implements(iwokkel.IDisco)

    def __init__(self,parent):
        """@param parent: plugin instance whose streamOpening method is
        registered as observer"""
        self.plugin_parent = parent

    def connectionInitialized(self):
        """Register the observer for incoming IBB <open/> requests"""
        # NOTE(review): self.parent and self.xmlstream are presumably set by
        # the handler framework before this is called -- confirm
        opening_cb = self.plugin_parent.streamOpening
        self.xmlstream.addObserver(IBB_OPEN, opening_cb, profile = self.parent.profile)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Return the IBB namespace as the only disco feature"""
        ibb_feature = disco.DiscoFeature(NS_IBB)
        return [ibb_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """Return an empty list: no disco item is exposed"""
        return list()