comparison src/plugins/plugin_xep_0047.py @ 384:785420cd63f7

plugins: In-Band Bytestreams (XEP-0047) implementation
author Goffi <goffi@goffi.org>
date Thu, 29 Sep 2011 12:05:45 +0200
parents
children 6ff50d609b16
comparison
equal deleted inserted replaced
383:98e1d44d5cd4 384:785420cd63f7
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 """
5 SAT plugin for managing gateways (xep-0047)
6 Copyright (C) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org)
7
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """
21
22 from logging import debug, info, warning, error
23 from twisted.internet import protocol
24 from twisted.words.protocols.jabber import client, jid
25 from twisted.words.protocols.jabber import error as jab_error
26 from twisted.words.xish import domish
27 import twisted.internet.error
28 from twisted.internet import reactor
29
30 from wokkel import disco, iwokkel
31
32 from zope.interface import implements
33
34 import base64
35
36 try:
37 from twisted.words.protocols.xmlstream import XMPPHandler
38 except ImportError:
39 from wokkel.subprotocols import XMPPHandler
40
41 MESSAGE = '/message'
42 IQ = '/iq'
43 IQ_SET = '/iq[@type="set"]'
44 NS_IBB = 'http://jabber.org/protocol/ibb'
45 IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]'
46 IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="%s"]'
47 IBB_IQ_DATA = IQ + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]' #we use IQ instead of IQ_SET because of a bug in Gajim
48 IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="%s"]'
49 TIMEOUT = 60 #timeout for workflow
50 BLOCK_SIZE = 4096
51
52 PLUGIN_INFO = {
53 "name": "In-Band Bytestream Plugin",
54 "import_name": "XEP-0047",
55 "type": "XEP",
56 "protocols": ["XEP-0047"],
57 "main": "XEP_0047",
58 "handler": "yes",
59 "description": _("""Implementation of In-Band Bytestreams""")
60 }
61
62 class XEP_0047():
63 NAMESPACE = NS_IBB
64
65 def __init__(self, host):
66 info(_("In-Band Bytestreams plugin initialization"))
67 self.host = host
68 self.current_stream = {} #key: stream_id, value: data(dict)
69
70 def getHandler(self, profile):
71 return XEP_0047_handler(self)
72
73 def _timeOut(self, sid):
74 """Delecte current_stream id, called after timeout
75 @param id: id of self.current_stream"""
76 info(_("In-Band Bytestream: TimeOut reached for id %s") % sid);
77 self._killId(sid, False, "TIMEOUT")
78
79 def _killId(self, sid, success=False, failure_reason="UNKNOWN"):
80 """Delete an current_stream id, clean up associated observers
81 @param sid: id of self.current_stream"""
82 if not self.current_stream.has_key(sid):
83 warning(_("kill id called on a non existant id"))
84 return
85 if self.current_stream[sid].has_key("observer_cb"):
86 xmlstream = self.current_stream[sid]["xmlstream"]
87 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"])
88 self.current_stream[sid]['timer'].cancel()
89 if self.current_stream[sid].has_key("size"):
90 self.host.removeProgressCB(sid)
91
92 file_obj = self.current_stream[sid]['file_obj']
93 success_cb = self.current_stream[sid]['success_cb']
94 failure_cb = self.current_stream[sid]['failure_cb']
95
96 del self.current_stream[sid]
97
98 if success:
99 success_cb(sid, file_obj, NS_IBB)
100 else:
101 failure_cb(sid, file_obj, NS_IBB, failure_reason)
102
103 def getProgress(self, sid, data):
104 """Fill data with position of current transfert"""
105 try:
106 file_obj = self.current_stream[sid]["file_obj"]
107 data["position"] = str(file_obj.tell())
108 data["size"] = str(self.current_stream[sid]["size"])
109 except:
110 pass
111
112 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
113 """Called when a bytestream is imminent
114 @param from_jid: jid of the sender
115 @param id: Stream id
116 @param file_obj: File Object where the data will be written"""
117 data = self.current_stream[sid] = {}
118 data["from"] = from_jid
119 data["file_obj"] = file_obj
120 data["seq"] = -1
121 if size:
122 data["size"] = size
123 self.host.registerProgressCB(sid, self.getProgress)
124 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
125 data["success_cb"] = success_cb
126 data["failure_cb"] = failure_cb
127
128 def streamOpening(self, IQ, profile):
129 debug(_("IBB stream opening"))
130 IQ.handled=True
131 profile_jid, xmlstream = self.host.getJidNStream(profile)
132 open_elt = IQ.firstChildElement()
133 block_size = open_elt.getAttribute('block-size')
134 sid = open_elt.getAttribute('sid')
135 stanza = open_elt.getAttribute('stanza', 'iq')
136 if not sid or not block_size or int(block_size)>65535:
137 warning(_("malformed IBB transfert: %s" % IQ['id']))
138 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream)
139 return
140 if not sid in self.current_stream:
141 warning(_("Ignoring unexpected IBB transfert: %s" % sid))
142 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream)
143 return
144 if self.current_stream[sid]["from"] != jid.JID(IQ['from']):
145 warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
146 self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream)
147 self._killId(sid, False, "PROTOCOL_ERROR")
148 return
149
150 #at this stage, the session looks ok and will be accepted
151
152 #we reset the timeout:
153 self.current_stream[sid]["timer"].reset(TIMEOUT)
154
155 #we save the xmlstream, events and observer data to allow observer removal
156 self.current_stream[sid]["xmlstream"] = xmlstream
157 self.current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid
158 self.current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData
159 event_close = IBB_CLOSE % sid
160 #we now set the stream observer to look after data packet
161 xmlstream.addObserver(event_data, observer_cb, profile = profile)
162 xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile)
163 #finally, we send the accept stanza
164 result = domish.Element(('', 'iq'))
165 result['type'] = 'result'
166 result['id'] = IQ['id']
167 result['to'] = IQ['from']
168 xmlstream.send(result)
169
170 def streamClosing(self, IQ, profile):
171 IQ.handled=True
172 debug(_("IBB stream closing"))
173 data_elt = IQ.firstChildElement()
174 sid = data_elt.getAttribute('sid')
175 result = domish.Element(('', 'iq'))
176 result['type'] = 'result'
177 result['id'] = IQ['id']
178 result['to'] = IQ['from']
179 self.current_stream[sid]["xmlstream"].send(result)
180 self._killId(sid, success=True)
181
182 def iqData(self, IQ, profile):
183 IQ.handled=True
184 data_elt = IQ.firstChildElement()
185
186 if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from'])):
187 #and send a success answer
188 result = domish.Element(('', 'iq'))
189 result['type'] = 'result'
190 result['id'] = IQ['id']
191 result['to'] = IQ['from']
192 _jid, xmlstream = self.host.getJidNStream(profile)
193 xmlstream.send(result)
194
195 def messageData(self, message_elt, profile):
196 data_elt = message_elt.firstChildElement()
197 sid = message_elt.getAttribute('id','')
198 self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']))
199
200 def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid):
201 """Manage the data elelement (check validity and write to the file_obj)
202 @param data_elt: "data" domish element
203 @return: True if success"""
204 sid = data_elt.getAttribute('sid')
205 if sid not in self.current_stream:
206 error(_("Received data for an unknown session id"))
207 return False
208 xmlstream = self.current_stream[sid]["xmlstream"]
209
210 from_jid = self.current_stream[sid]["from"]
211 file_obj = self.current_stream[sid]["file_obj"]
212
213 if stanza_from_jid != from_jid:
214 warning(_("sended jid inconsistency (man in the middle attack attempt ?)"))
215 if stanza=='iq':
216 self.sendNotAcceptableError(sid, from_jid, xmlstream)
217 return False
218
219 self.current_stream[sid]["seq"]+=1
220 if int(data_elt.getAttribute("seq",-1)) != self.current_stream[sid]["seq"]:
221 warning(_("Sequence error"))
222 if stanza=='iq':
223 self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream)
224 return False
225
226 #we reset the timeout:
227 self.current_stream[sid]["timer"].reset(TIMEOUT)
228
229 #we can now decode the data
230 try:
231 file_obj.write(base64.b64decode(str(data_elt)))
232 except TypeError:
233 #The base64 data is invalid
234 warning(_("Invalid base64 data"))
235 if stanza=='iq':
236 self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream)
237 return False
238 return True
239
240 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
241 """Not acceptable error used when the stream is not expected or something is going wrong
242 @param iq_id: IQ id
243 @param to_jid: addressee
244 @param xmlstream: XML stream to use to send the error"""
245 result = domish.Element(('', 'iq'))
246 result['type'] = 'result'
247 result['id'] = iq_id
248 result['to'] = to_jid
249 error_el = result.addElement('error')
250 error_el['type'] = 'cancel'
251 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable'))
252 xmlstream.send(result)
253
254 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
255 """Launch the stream workflow
256 @param file_obj: file_obj to send
257 @param to_jid: JID of the recipient
258 @param sid: Stream session id
259 @param length: number of byte to send, or None to send until the end
260 @param successCb: method to call when stream successfuly finished
261 @param failureCb: method to call when something go wrong
262 @param profile: %(doc_profile)s"""
263 if length != None:
264 error(_('stream length not managed yet'))
265 return;
266 profile_jid, xmlstream = self.host.getJidNStream(profile)
267 data = self.current_stream[sid] = {}
268 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
269 data["file_obj"] = file_obj
270 data["to"] = to_jid
271 data["success_cb"] = successCb
272 data["failure_cb"] = failureCb
273 data["xmlstream"] = xmlstream
274 data["block_size"] = BLOCK_SIZE
275 if size:
276 data["size"] = size
277 self.host.registerProgressCB(sid, self.getProgress)
278 iq_elt = client.IQ(xmlstream,'set')
279 iq_elt['from'] = profile_jid.full()
280 iq_elt['to'] = to_jid.full()
281 open_elt = iq_elt.addElement('open',NS_IBB)
282 open_elt['block-size'] = str(BLOCK_SIZE)
283 open_elt['sid'] = sid
284 open_elt['stanza'] = 'iq'
285 iq_elt.addCallback(self.iqResult, sid, 0, length)
286 iq_elt.send()
287
288 def iqResult(self, sid, seq, length, iq_elt):
289 """Called when the result of open iq is received"""
290 data = self.current_stream[sid]
291 if iq_elt.type == "error":
292 warning(_("Transfer failed"))
293 self.terminateStream(sid, "IQ_ERROR")
294 return
295
296 buffer = data["file_obj"].read(data["block_size"])
297 if buffer:
298 next_iq_elt = client.IQ(data["xmlstream"],'set')
299 next_iq_elt['to'] = data["to"].full()
300 data_elt = next_iq_elt.addElement('data', NS_IBB)
301 data_elt['seq'] = str(seq)
302 data_elt['sid'] = sid
303 data_elt.addContent(base64.b64encode(buffer))
304 next_iq_elt.addCallback(self.iqResult, sid, seq+1, length)
305 next_iq_elt.send()
306 else:
307 self.terminateStream(sid)
308
309 def terminateStream(self, sid, failure_reason = None):
310 """Terminate the stream session
311 @param to_jid: recipient
312 @param sid: Session id
313 @param file_obj: file object used
314 @param xmlstream: XML stream used with this session
315 @param progress_cb: True if we have to remove the progress callback
316 @param callback: method to call after finishing
317 @param failure_reason: reason of the failure, or None if steam was successful"""
318 data = self.current_stream[sid]
319 iq_elt = client.IQ(data["xmlstream"],'set')
320 iq_elt['to'] = data["to"].full()
321 close_elt = iq_elt.addElement('close',NS_IBB)
322 close_elt['sid'] = sid
323 iq_elt.send()
324 self.host.removeProgressCB(sid)
325 if failure_reason:
326 self._killId(sid, False, failure_reason)
327 else:
328 self._killId(sid, True)
329
330 class XEP_0047_handler(XMPPHandler):
331 implements(iwokkel.IDisco)
332
333 def __init__(self,parent):
334 self.plugin_parent = parent
335
336 def connectionInitialized(self):
337 self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent.streamOpening, profile = self.parent.profile)
338
339 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
340 return [disco.DiscoFeature(NS_IBB)]
341
342 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
343 return []