comparison src/plugins/plugin_xep_0096.py @ 386:deeebf697d9a

plugins: plugin XEP-0096 update, use of XEP-0047 (IBB)
author Goffi <goffi@goffi.org>
date Thu, 29 Sep 2011 12:09:31 +0200
parents f964dcec1611
children c34fd9d6242e
comparison
equal deleted inserted replaced
385:41fdaeb005bc 386:deeebf697d9a
17 17
18 You should have received a copy of the GNU General Public License 18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>. 19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """ 20 """
21 21
22 from logging import debug, info, error 22 from logging import debug, info, warning, error
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 from twisted.internet import protocol 24 from twisted.internet import protocol
25 from twisted.words.protocols.jabber import client, jid 25 from twisted.words.protocols.jabber import client, jid
26 from twisted.words.protocols.jabber import error as jab_error 26 from twisted.words.protocols.jabber import error as jab_error
27 import os.path 27 import os, os.path
28 from twisted.internet import reactor #FIXME best way ??? 28 from twisted.internet import reactor
29 import pdb 29 import pdb
30 30
31 from zope.interface import implements 31 from zope.interface import implements
32 32
33 try: 33 from wokkel import disco, iwokkel, data_form
34 from twisted.words.protocols.xmlstream import XMPPHandler
35 except ImportError:
36 from wokkel.subprotocols import XMPPHandler
37
38 from wokkel import disco, iwokkel
39 34
40 IQ_SET = '/iq[@type="set"]' 35 IQ_SET = '/iq[@type="set"]'
41 NS_SI = 'http://jabber.org/protocol/si' 36 NS_SI = 'http://jabber.org/protocol/si'
42 SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]' 37 SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]'
38 PROFILE_NAME = "file-transfer"
39 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
43 40
44 PLUGIN_INFO = { 41 PLUGIN_INFO = {
45 "name": "XEP 0096 Plugin", 42 "name": "XEP 0096 Plugin",
46 "import_name": "XEP-0096", 43 "import_name": "XEP-0096",
47 "type": "XEP", 44 "type": "XEP",
48 "protocols": ["XEP-0096"], 45 "protocols": ["XEP-0096"],
49 "dependencies": ["XEP-0065"], 46 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"],
50 "main": "XEP_0096", 47 "main": "XEP_0096",
51 "handler": "yes", 48 "handler": "no",
52 "description": _("""Implementation of SI File Transfert""") 49 "description": _("""Implementation of SI File Transfert""")
53 } 50 }
54 51
55 class XEP_0096(): 52 class XEP_0096():
56 53
57 def __init__(self, host): 54 def __init__(self, host):
58 info(_("Plugin XEP_0096 initialization")) 55 info(_("Plugin XEP_0096 initialization"))
59 self.host = host 56 self.host = host
60 self._waiting_for_approval = {} 57 self._waiting_for_approval = {} #key = id, value = [transfert data, IdelayedCall Reactor timeout,
61 host.bridge.addMethod("sendFile", ".plugin", in_sign='sss', out_sign='s', method=self.sendFile) 58 # current stream method, [failed stream methods], profile]
59 self.managed_stream_m = [#self.host.plugins["XEP-0065"].NS_BS,
60 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed
61 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transfertRequest)
62 host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile)
63
64 def _kill_id(self, approval_id):
65 """Delete a waiting_for_approval id, called after timeout
66 @param approval_id: id of _waiting_for_approval"""
67 info(_("SI File Transfert: TimeOut reached for id %s") % approval_id);
68 try:
69 del self._waiting_for_approval[approval_id]
70 except KeyError:
71 warning(_("kill id called on a non existant approval id"))
62 72
63 def getHandler(self, profile): 73 def transfertRequest(self, from_jid, si_id, si_mime_type, si_el, profile):
64 return XEP_0096_handler(self) 74 """Called when a file transfert is requested
65 75 @param from_jid: jid of the sender
66 def xep_96(self, IQ, profile): 76 @param si_id: Stream Initiation session id
67 info (_("XEP-0096 management")) 77 @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown)
68 IQ.handled=True 78 @param si_el: domish.Element of the request
69 SI_elem = IQ.firstChildElement() 79 @param profile: %(doc_profile)s"""
70 debug(SI_elem.toXml()) 80 info (_("XEP-0096 file transfert requested"))
81 debug(si_el.toXml())
71 filename = "" 82 filename = ""
72 file_size = "" 83 file_size = ""
73 for element in SI_elem.elements(): 84 file_date = None
74 if element.name == "file": 85 file_hash = None
75 info (_("File proposed: name=[%(name)s] size=%(size)s") % {'name':element['name'], 'size':element['size']}) 86 file_desc = ""
76 filename = element["name"] 87 can_range = False
77 file_size = element["size"] 88 file_elts = filter(lambda elt: elt.name == 'file', si_el.elements())
78 elif element.name == "feature": 89 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
79 from_jid = IQ["from"] 90
80 self._waiting_for_approval[IQ["id"]] = (element, from_jid, file_size, profile) 91 if file_elts:
81 data={ "filename":filename, "from":from_jid, "size":file_size } 92 file_el = file_elts[0]
82 self.host.askConfirmation(IQ["id"], "FILE_TRANSFERT", data, self.confirmationCB) 93 filename = file_el["name"]
83 94 file_size = file_el["size"]
84 def confirmationCB(self, id, accepted, data): 95 file_date = file_el.getAttribute("date", "")
85 """Called on confirmation answer""" 96 file_hash = file_el.getAttribute("hash", "")
97 info (_("File proposed: name=[%(name)s] size=%(size)s") % {'name':filename, 'size':file_size})
98 for file_child_el in file_el.elements():
99 if file_child_el.name == "desc":
100 file_desc = unicode(file_child_el)
101 elif file_child_el.name == "range":
102 can_range = True
103 else:
104 warning(_("No file element found"))
105 self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile)
106 return
107
108 if feature_elts:
109 feature_el = feature_elts[0]
110 form = data_form.Form.fromElement(feature_el.firstChildElement())
111 try:
112 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m)
113 except KeyError:
114 warning(_("No stream method found"))
115 self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile)
116 return
117 if not stream_method:
118 warning(_("Can't find a valid stream method"))
119 self.host.plugins["XEP-0095"].sendFailedError(si_id, from_jid, profile)
120 return
121 else:
122 warning(_("No feature element found"))
123 self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile)
124 return
125
126 #if we are here, the transfert can start, we just need user's agreement
127 data={ "filename":filename, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) }
128 self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile]
129
130 self.host.askConfirmation(si_id, "FILE_TRANSFERT", data, self.confirmationCB)
131
132
133 def _getFileObject(self, dest_path, can_range = False):
134 """Open file, put file pointer to the end if the file if needed
135 @param dest_path: path of the destination file
136 @param can_range: True if the file pointer can be moved
137 @return: File Object"""
138 return open(dest_path, "ab" if can_range else "wb")
139
140 def confirmationCB(self, id, accepted, frontend_data):
141 """Called on confirmation answer
142 @param id: file transfert session id
143 @param accepted: True if file transfert is accepted
144 @param frontend_data: data sent by frontend"""
145 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[id]
146 can_range = data['can_range'] == "True"
147 range_offset = 0
86 if accepted: 148 if accepted:
87 data['size'] = self._waiting_for_approval[id][2] 149 if timeout.active():
88 self.host.plugins["XEP-0065"].setData(data, id) 150 timeout.cancel()
89 self.approved(id) 151 try:
152 dest_path = frontend_data['dest_path']
153 except KeyError:
154 error(_('dest path not found in frontend_data'))
155 del(self._waiting_for_approval[id])
156 return
157 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
158 self.host.plugins["XEP-0065"].setData(data, id)
159 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
160 file_obj = self._getFileObject(dest_path, can_range)
161 range_offset = file_obj.tell()
162 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed)
163 else:
164 error(_("Unknown stream method, this should not happen at this stage, cancelling transfert"))
165 del(self._waiting_for_approval[id])
166 return
167
168 #we can send the iq result
169 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method})
170 misc_elts = []
171 misc_elts.append(domish.Element((PROFILE, "file")))
172 if can_range:
173 range_elt = domish.Element(('', "range"))
174 range_elt['offset'] = str(range_offset)
175 #TODO: manage range length
176 misc_elts.append(range_elt)
177 self.host.plugins["XEP-0095"].acceptStream(id, data['from'], feature_elt, misc_elts, profile)
90 else: 178 else:
91 debug (_("Transfert [%s] refused"), id) 179 debug (_("Transfert [%s] refused"), id)
180 self.host.plugins["XEP-0095"].sendRejectedError (id, data['from'], profile=profile)
92 del(self._waiting_for_approval[id]) 181 del(self._waiting_for_approval[id])
93 182
94 def approved(self, id): 183 def _transferSucceeded(self, sid, file_obj, stream_method):
95 """must be called when a file transfert has be accepted by client""" 184 """Called by the stream method when transfert successfuly finished
96 debug (_("Transfert [%s] accepted"), id) 185 @param id: stream id"""
97 186 file_obj.close()
98 if ( not self._waiting_for_approval.has_key(id) ): 187 info(_('Transfert %s successfuly finished') % sid)
99 error (_("Approved unknow id !")) 188 del(self._waiting_for_approval[sid])
100 #TODO: manage this (maybe approved by several frontends) 189
101 else: 190 def _transferFailed(self, sid, file_obj, stream_method, reason):
102 element, from_id, size, profile = self._waiting_for_approval[id] 191 """Called when something went wrong with the transfert
103 del(self._waiting_for_approval[id]) 192 @param id: stream id
104 self.negociate(element, id, from_id, profile) 193 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR"""
105 194 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid]
106 def negociate(self, feat_elem, id, to_jid, profile): 195 warning(_('Transfert %(id)s failed with stream method %(s_method)s') % { 'id': sid,
107 #TODO: put this in a plugin 196 's_method': stream_method })
108 #FIXME: over ultra mega ugly, need to be generic 197 filepath = file_obj.name
109 client = self.host.getClient(profile) 198 file_obj.close()
110 assert(client) 199 os.remove(filepath)
111 info (_("Feature negociation")) 200 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
112 data = feat_elem.firstChildElement() 201 warning(_("All stream methods failed, can't transfert the file"))
113 field = data.firstChildElement() 202 self.host.plugins["XEP-0095"].sendFailedError(id, data['from'], profile)
114 #FIXME: several options ! Q&D code for test only 203 del(self._waiting_for_approval[id])
115 option = field.firstChildElement() 204
116 value = option.firstChildElement() 205 def fileCb(self, profile, filepath, sid, size, IQ):
117 if unicode(value) == "http://jabber.org/protocol/bytestreams": 206 if IQ['type'] == "error":
118 #ugly, as usual, need to be entirely rewritten (just for test !) 207 stanza_err = jab_error.exceptionFromStanza(IQ)
119 result = domish.Element(('', 'iq')) 208 if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
120 result['type'] = 'result' 209 debug(_("File transfert refused by %s") % IQ['from'])
121 result['id'] = id 210 self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile)
122 result['to'] = to_jid 211 else:
123 si = result.addElement('si', 'http://jabber.org/protocol/si') 212 warning(_("Error during file transfert with %s") % IQ['from'])
124 file = si.addElement('file', 'http://jabber.org/protocol/si/profile/file-transfer') 213 self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile)
125 feature = si.addElement('feature', 'http://jabber.org/protocol/feature-neg') 214 return
126 x = feature.addElement('x', 'jabber:x:data') 215
127 x['type'] = 'submit' 216 si_elt = IQ.firstChildElement()
128 field = x.addElement('field') 217
129 field['var'] = 'stream-method' 218 if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
130 value = field.addElement('value') 219 error(_("Protocol error during file transfer"))
131 value.addContent('http://jabber.org/protocol/bytestreams') 220 return
132 client.xmlstream.send(result) 221
133 222 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
134 def fileCB(self, answer, xmlstream, current_jid): 223 if not feature_elts:
135 if answer['type']=="result": #FIXME FIXME FIXME ugly ugly ugly ! and temp FIXME FIXME FIXME 224 warning(_("No feature element"))
225 return
226
227 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
228 try:
229 stream_method = choosed_options["stream-method"]
230 except KeyError:
231 warning(_("No stream method choosed"))
232 return
233
234 range_offset = 0
235 range_length = None
236 range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements())
237 if range_elts:
238 range_elt = range_elts[0]
239 range_offset = range_elt.getAttribute("offset", 0)
240 range_length = range_elt.getAttribute("length")
241
242 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
136 info("SENDING UGLY ANSWER") 243 info("SENDING UGLY ANSWER")
137 offer=client.IQ(xmlstream,'set') 244 """offer=client.IQ(xmlstream,'set')
138 offer["from"]=current_jid.full() 245 offer["from"]=current_jid.full()
139 offer["to"]=answer['from'] 246 offer["to"]=answer['from']
140 query=offer.addElement('query', 'http://jabber.org/protocol/bytestreams') 247 query=offer.addElement('query', 'http://jabber.org/protocol/ibb')
248 #query=offer.addElement('query', 'http://jabber.org/protocol/bytestreams')
141 query['mode']='tcp' 249 query['mode']='tcp'
142 streamhost=query.addElement('streamhost') 250 streamhost=query.addElement('streamhost')
143 streamhost['host']=self.host.memory.getParamA("IP", "File Transfert") 251 streamhost['host']=self.host.memory.getParamA("IP", "File Transfert")
144 streamhost['port']=self.host.memory.getParamA("Port", "File Transfert") 252 streamhost['port']=self.host.memory.getParamA("Port", "File Transfert")
145 streamhost['jid']=current_jid.full() 253 streamhost['jid']=current_jid.full()
146 offer.send() 254 offer.send()"""
147 255 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
148 def sendFile(self, to, filepath, profile_key='@DEFAULT@'): 256 file_obj = open(filepath, 'r')
257 if range_offset:
258 file_obj.seek(range_offset)
259 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile)
260 else:
261 warning(_("Invalid stream method received"))
262
263 def sendFile(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'):
149 """send a file using XEP-0096 264 """send a file using XEP-0096
150 Return an unique id to identify the transfert 265 @to_jid: recipient
266 @filepath: absolute path to the file to send
267 @data: dictionnary with the optional following keys:
268 - "description": description of the file
269 @param profile_key: %(doc_profile_key)s
270 @return: an unique id to identify the transfert
151 """ 271 """
152 current_jid, xmlstream = self.host.getJidNStream(profile_key) 272 profile = self.host.memory.getProfileName(profile_key)
153 if not xmlstream: 273 if not profile:
154 error (_('Asking for an non-existant or not connected profile')) 274 warning(_("Trying to send a file from an unknown profile"))
155 return "" 275 return ""
156 debug ("sendfile (%s) to %s", filepath, to ) 276 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})
157 print type(filepath), type(to) 277
278 #self.host.plugins["XEP-0065"].sendFile(offer["id"], filepath, str(statinfo.st_size))
279
280 file_transfer_elts = []
158 281
159 statinfo = os.stat(filepath) 282 statinfo = os.stat(filepath)
160 283 file_elt = domish.Element((PROFILE, 'file'))
161 offer=client.IQ(xmlstream,'set') 284 file_elt['name']=os.path.basename(filepath)
162 debug ("Transfert ID: %s", offer["id"]) 285 size = file_elt['size']=str(statinfo.st_size)
163 286 file_transfer_elts.append(file_elt)
164 self.host.plugins["XEP-0065"].sendFile(offer["id"], filepath, str(statinfo.st_size)) 287
165 288 file_transfer_elts.append(domish.Element((None,'range')))
166 offer["from"]=current_jid.full() 289
167 offer["to"]=jid.JID(to).full() 290 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key = profile)
168 si=offer.addElement('si','http://jabber.org/protocol/si') 291 offer.addCallback(self.fileCb, profile, filepath, sid, size)
169 si["mime-type"]='text/plain' 292 return sid
170 si["profile"]='http://jabber.org/protocol/si/profile/file-transfer' 293
171 file = si.addElement('file', 'http://jabber.org/protocol/si/profile/file-transfer') 294
172 file['name']=os.path.basename(filepath) 295 def sendSuccessCb(self, sid, file_obj, stream_method):
173 file['size']=str(statinfo.st_size) 296 info(_('Transfer %s successfuly finished') % sid)
174 297 file_obj.close()
175 ### 298
176 # FIXME: Ugly temporary hard coded implementation of XEP-0020 & XEP-0004, 299 def sendFailureCb(self, sid, file_obj, stream_method, reason):
177 # Need to be recoded elsewhere in a more generic way 300 file_obj.close()
178 ### 301 warning(_('Transfert %(id)s failed with stream method %(s_method)s') % { 'id': sid, s_method: stream_method })
179
180 feature=si.addElement('feature', "http://jabber.org/protocol/feature-neg")
181 x=feature.addElement('x', "jabber:x:data")
182 x['type']='form'
183 field=x.addElement('field')
184 field['type']='list-single'
185 field['var']='stream-method'
186 option = field.addElement('option')
187 value = option.addElement('value', content='http://jabber.org/protocol/bytestreams')
188
189 offer.addCallback(self.fileCB, current_jid = current_jid, xmlstream = xmlstream)
190 offer.send()
191 return offer["id"] #XXX: using IQ id as file transfert id seems OK as IQ id are required
192
193 class XEP_0096_handler(XMPPHandler):
194 implements(iwokkel.IDisco)
195
196 def __init__(self, plugin_parent):
197 self.plugin_parent = plugin_parent
198 self.host = plugin_parent.host
199
200 def connectionInitialized(self):
201 self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.xep_96, profile = self.parent.profile)
202
203 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
204 return [disco.DiscoFeature(NS_SI)]
205
206 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
207 return []
208