Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0096.py @ 386:deeebf697d9a
plugins: plugin XEP-0096 update, use of XEP-0047 (IBB)
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 29 Sep 2011 12:09:31 +0200 |
parents | f964dcec1611 |
children | c34fd9d6242e |
comparison
equal
deleted
inserted
replaced
385:41fdaeb005bc | 386:deeebf697d9a |
---|---|
17 | 17 |
18 You should have received a copy of the GNU General Public License | 18 You should have received a copy of the GNU General Public License |
19 along with this program. If not, see <http://www.gnu.org/licenses/>. | 19 along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 """ | 20 """ |
21 | 21 |
22 from logging import debug, info, error | 22 from logging import debug, info, warning, error |
23 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
24 from twisted.internet import protocol | 24 from twisted.internet import protocol |
25 from twisted.words.protocols.jabber import client, jid | 25 from twisted.words.protocols.jabber import client, jid |
26 from twisted.words.protocols.jabber import error as jab_error | 26 from twisted.words.protocols.jabber import error as jab_error |
27 import os.path | 27 import os, os.path |
28 from twisted.internet import reactor #FIXME best way ??? | 28 from twisted.internet import reactor |
29 import pdb | 29 import pdb |
30 | 30 |
31 from zope.interface import implements | 31 from zope.interface import implements |
32 | 32 |
33 try: | 33 from wokkel import disco, iwokkel, data_form |
34 from twisted.words.protocols.xmlstream import XMPPHandler | |
35 except ImportError: | |
36 from wokkel.subprotocols import XMPPHandler | |
37 | |
38 from wokkel import disco, iwokkel | |
39 | 34 |
40 IQ_SET = '/iq[@type="set"]' | 35 IQ_SET = '/iq[@type="set"]' |
41 NS_SI = 'http://jabber.org/protocol/si' | 36 NS_SI = 'http://jabber.org/protocol/si' |
42 SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]' | 37 SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]' |
38 PROFILE_NAME = "file-transfer" | |
39 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME | |
43 | 40 |
44 PLUGIN_INFO = { | 41 PLUGIN_INFO = { |
45 "name": "XEP 0096 Plugin", | 42 "name": "XEP 0096 Plugin", |
46 "import_name": "XEP-0096", | 43 "import_name": "XEP-0096", |
47 "type": "XEP", | 44 "type": "XEP", |
48 "protocols": ["XEP-0096"], | 45 "protocols": ["XEP-0096"], |
49 "dependencies": ["XEP-0065"], | 46 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], |
50 "main": "XEP_0096", | 47 "main": "XEP_0096", |
51 "handler": "yes", | 48 "handler": "no", |
52 "description": _("""Implementation of SI File Transfert""") | 49 "description": _("""Implementation of SI File Transfert""") |
53 } | 50 } |
54 | 51 |
55 class XEP_0096(): | 52 class XEP_0096(): |
56 | 53 |
57 def __init__(self, host): | 54 def __init__(self, host): |
58 info(_("Plugin XEP_0096 initialization")) | 55 info(_("Plugin XEP_0096 initialization")) |
59 self.host = host | 56 self.host = host |
60 self._waiting_for_approval = {} | 57 self._waiting_for_approval = {} #key = id, value = [transfert data, IdelayedCall Reactor timeout, |
61 host.bridge.addMethod("sendFile", ".plugin", in_sign='sss', out_sign='s', method=self.sendFile) | 58 # current stream method, [failed stream methods], profile] |
59 self.managed_stream_m = [#self.host.plugins["XEP-0065"].NS_BS, | |
60 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed | |
61 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transfertRequest) | |
62 host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile) | |
63 | |
64 def _kill_id(self, approval_id): | |
65 """Delete a waiting_for_approval id, called after timeout | |
66 @param approval_id: id of _waiting_for_approval""" | |
67 info(_("SI File Transfert: TimeOut reached for id %s") % approval_id); | |
68 try: | |
69 del self._waiting_for_approval[approval_id] | |
70 except KeyError: | |
71 warning(_("kill id called on a non existant approval id")) | |
62 | 72 |
63 def getHandler(self, profile): | 73 def transfertRequest(self, from_jid, si_id, si_mime_type, si_el, profile): |
64 return XEP_0096_handler(self) | 74 """Called when a file transfert is requested |
65 | 75 @param from_jid: jid of the sender |
66 def xep_96(self, IQ, profile): | 76 @param si_id: Stream Initiation session id |
67 info (_("XEP-0096 management")) | 77 @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown) |
68 IQ.handled=True | 78 @param si_el: domish.Element of the request |
69 SI_elem = IQ.firstChildElement() | 79 @param profile: %(doc_profile)s""" |
70 debug(SI_elem.toXml()) | 80 info (_("XEP-0096 file transfert requested")) |
81 debug(si_el.toXml()) | |
71 filename = "" | 82 filename = "" |
72 file_size = "" | 83 file_size = "" |
73 for element in SI_elem.elements(): | 84 file_date = None |
74 if element.name == "file": | 85 file_hash = None |
75 info (_("File proposed: name=[%(name)s] size=%(size)s") % {'name':element['name'], 'size':element['size']}) | 86 file_desc = "" |
76 filename = element["name"] | 87 can_range = False |
77 file_size = element["size"] | 88 file_elts = filter(lambda elt: elt.name == 'file', si_el.elements()) |
78 elif element.name == "feature": | 89 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) |
79 from_jid = IQ["from"] | 90 |
80 self._waiting_for_approval[IQ["id"]] = (element, from_jid, file_size, profile) | 91 if file_elts: |
81 data={ "filename":filename, "from":from_jid, "size":file_size } | 92 file_el = file_elts[0] |
82 self.host.askConfirmation(IQ["id"], "FILE_TRANSFERT", data, self.confirmationCB) | 93 filename = file_el["name"] |
83 | 94 file_size = file_el["size"] |
84 def confirmationCB(self, id, accepted, data): | 95 file_date = file_el.getAttribute("date", "") |
85 """Called on confirmation answer""" | 96 file_hash = file_el.getAttribute("hash", "") |
97 info (_("File proposed: name=[%(name)s] size=%(size)s") % {'name':filename, 'size':file_size}) | |
98 for file_child_el in file_el.elements(): | |
99 if file_child_el.name == "desc": | |
100 file_desc = unicode(file_child_el) | |
101 elif file_child_el.name == "range": | |
102 can_range = True | |
103 else: | |
104 warning(_("No file element found")) | |
105 self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) | |
106 return | |
107 | |
108 if feature_elts: | |
109 feature_el = feature_elts[0] | |
110 form = data_form.Form.fromElement(feature_el.firstChildElement()) | |
111 try: | |
112 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m) | |
113 except KeyError: | |
114 warning(_("No stream method found")) | |
115 self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) | |
116 return | |
117 if not stream_method: | |
118 warning(_("Can't find a valid stream method")) | |
119 self.host.plugins["XEP-0095"].sendFailedError(si_id, from_jid, profile) | |
120 return | |
121 else: | |
122 warning(_("No feature element found")) | |
123 self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) | |
124 return | |
125 | |
126 #if we are here, the transfert can start, we just need user's agreement | |
127 data={ "filename":filename, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) } | |
128 self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] | |
129 | |
130 self.host.askConfirmation(si_id, "FILE_TRANSFERT", data, self.confirmationCB) | |
131 | |
132 | |
133 def _getFileObject(self, dest_path, can_range = False): | |
134 """Open file, put file pointer to the end if the file if needed | |
135 @param dest_path: path of the destination file | |
136 @param can_range: True if the file pointer can be moved | |
137 @return: File Object""" | |
138 return open(dest_path, "ab" if can_range else "wb") | |
139 | |
140 def confirmationCB(self, id, accepted, frontend_data): | |
141 """Called on confirmation answer | |
142 @param id: file transfert session id | |
143 @param accepted: True if file transfert is accepted | |
144 @param frontend_data: data sent by frontend""" | |
145 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[id] | |
146 can_range = data['can_range'] == "True" | |
147 range_offset = 0 | |
86 if accepted: | 148 if accepted: |
87 data['size'] = self._waiting_for_approval[id][2] | 149 if timeout.active(): |
88 self.host.plugins["XEP-0065"].setData(data, id) | 150 timeout.cancel() |
89 self.approved(id) | 151 try: |
152 dest_path = frontend_data['dest_path'] | |
153 except KeyError: | |
154 error(_('dest path not found in frontend_data')) | |
155 del(self._waiting_for_approval[id]) | |
156 return | |
157 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | |
158 self.host.plugins["XEP-0065"].setData(data, id) | |
159 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | |
160 file_obj = self._getFileObject(dest_path, can_range) | |
161 range_offset = file_obj.tell() | |
162 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) | |
163 else: | |
164 error(_("Unknown stream method, this should not happen at this stage, cancelling transfert")) | |
165 del(self._waiting_for_approval[id]) | |
166 return | |
167 | |
168 #we can send the iq result | |
169 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) | |
170 misc_elts = [] | |
171 misc_elts.append(domish.Element((PROFILE, "file"))) | |
172 if can_range: | |
173 range_elt = domish.Element(('', "range")) | |
174 range_elt['offset'] = str(range_offset) | |
175 #TODO: manage range length | |
176 misc_elts.append(range_elt) | |
177 self.host.plugins["XEP-0095"].acceptStream(id, data['from'], feature_elt, misc_elts, profile) | |
90 else: | 178 else: |
91 debug (_("Transfert [%s] refused"), id) | 179 debug (_("Transfert [%s] refused"), id) |
180 self.host.plugins["XEP-0095"].sendRejectedError (id, data['from'], profile=profile) | |
92 del(self._waiting_for_approval[id]) | 181 del(self._waiting_for_approval[id]) |
93 | 182 |
94 def approved(self, id): | 183 def _transferSucceeded(self, sid, file_obj, stream_method): |
95 """must be called when a file transfert has be accepted by client""" | 184 """Called by the stream method when transfert successfuly finished |
96 debug (_("Transfert [%s] accepted"), id) | 185 @param id: stream id""" |
97 | 186 file_obj.close() |
98 if ( not self._waiting_for_approval.has_key(id) ): | 187 info(_('Transfert %s successfuly finished') % sid) |
99 error (_("Approved unknow id !")) | 188 del(self._waiting_for_approval[sid]) |
100 #TODO: manage this (maybe approved by several frontends) | 189 |
101 else: | 190 def _transferFailed(self, sid, file_obj, stream_method, reason): |
102 element, from_id, size, profile = self._waiting_for_approval[id] | 191 """Called when something went wrong with the transfert |
103 del(self._waiting_for_approval[id]) | 192 @param id: stream id |
104 self.negociate(element, id, from_id, profile) | 193 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" |
105 | 194 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] |
106 def negociate(self, feat_elem, id, to_jid, profile): | 195 warning(_('Transfert %(id)s failed with stream method %(s_method)s') % { 'id': sid, |
107 #TODO: put this in a plugin | 196 's_method': stream_method }) |
108 #FIXME: over ultra mega ugly, need to be generic | 197 filepath = file_obj.name |
109 client = self.host.getClient(profile) | 198 file_obj.close() |
110 assert(client) | 199 os.remove(filepath) |
111 info (_("Feature negociation")) | 200 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session |
112 data = feat_elem.firstChildElement() | 201 warning(_("All stream methods failed, can't transfert the file")) |
113 field = data.firstChildElement() | 202 self.host.plugins["XEP-0095"].sendFailedError(id, data['from'], profile) |
114 #FIXME: several options ! Q&D code for test only | 203 del(self._waiting_for_approval[id]) |
115 option = field.firstChildElement() | 204 |
116 value = option.firstChildElement() | 205 def fileCb(self, profile, filepath, sid, size, IQ): |
117 if unicode(value) == "http://jabber.org/protocol/bytestreams": | 206 if IQ['type'] == "error": |
118 #ugly, as usual, need to be entirely rewritten (just for test !) | 207 stanza_err = jab_error.exceptionFromStanza(IQ) |
119 result = domish.Element(('', 'iq')) | 208 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': |
120 result['type'] = 'result' | 209 debug(_("File transfert refused by %s") % IQ['from']) |
121 result['id'] = id | 210 self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile) |
122 result['to'] = to_jid | 211 else: |
123 si = result.addElement('si', 'http://jabber.org/protocol/si') | 212 warning(_("Error during file transfert with %s") % IQ['from']) |
124 file = si.addElement('file', 'http://jabber.org/protocol/si/profile/file-transfer') | 213 self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile) |
125 feature = si.addElement('feature', 'http://jabber.org/protocol/feature-neg') | 214 return |
126 x = feature.addElement('x', 'jabber:x:data') | 215 |
127 x['type'] = 'submit' | 216 si_elt = IQ.firstChildElement() |
128 field = x.addElement('field') | 217 |
129 field['var'] = 'stream-method' | 218 if IQ['type'] != "result" or not si_elt or si_elt.name != "si": |
130 value = field.addElement('value') | 219 error(_("Protocol error during file transfer")) |
131 value.addContent('http://jabber.org/protocol/bytestreams') | 220 return |
132 client.xmlstream.send(result) | 221 |
133 | 222 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) |
134 def fileCB(self, answer, xmlstream, current_jid): | 223 if not feature_elts: |
135 if answer['type']=="result": #FIXME FIXME FIXME ugly ugly ugly ! and temp FIXME FIXME FIXME | 224 warning(_("No feature element")) |
225 return | |
226 | |
227 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) | |
228 try: | |
229 stream_method = choosed_options["stream-method"] | |
230 except KeyError: | |
231 warning(_("No stream method choosed")) | |
232 return | |
233 | |
234 range_offset = 0 | |
235 range_length = None | |
236 range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements()) | |
237 if range_elts: | |
238 range_elt = range_elts[0] | |
239 range_offset = range_elt.getAttribute("offset", 0) | |
240 range_length = range_elt.getAttribute("length") | |
241 | |
242 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | |
136 info("SENDING UGLY ANSWER") | 243 info("SENDING UGLY ANSWER") |
137 offer=client.IQ(xmlstream,'set') | 244 """offer=client.IQ(xmlstream,'set') |
138 offer["from"]=current_jid.full() | 245 offer["from"]=current_jid.full() |
139 offer["to"]=answer['from'] | 246 offer["to"]=answer['from'] |
140 query=offer.addElement('query', 'http://jabber.org/protocol/bytestreams') | 247 query=offer.addElement('query', 'http://jabber.org/protocol/ibb') |
248 #query=offer.addElement('query', 'http://jabber.org/protocol/bytestreams') | |
141 query['mode']='tcp' | 249 query['mode']='tcp' |
142 streamhost=query.addElement('streamhost') | 250 streamhost=query.addElement('streamhost') |
143 streamhost['host']=self.host.memory.getParamA("IP", "File Transfert") | 251 streamhost['host']=self.host.memory.getParamA("IP", "File Transfert") |
144 streamhost['port']=self.host.memory.getParamA("Port", "File Transfert") | 252 streamhost['port']=self.host.memory.getParamA("Port", "File Transfert") |
145 streamhost['jid']=current_jid.full() | 253 streamhost['jid']=current_jid.full() |
146 offer.send() | 254 offer.send()""" |
147 | 255 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
148 def sendFile(self, to, filepath, profile_key='@DEFAULT@'): | 256 file_obj = open(filepath, 'r') |
257 if range_offset: | |
258 file_obj.seek(range_offset) | |
259 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile) | |
260 else: | |
261 warning(_("Invalid stream method received")) | |
262 | |
263 def sendFile(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): | |
149 """send a file using XEP-0096 | 264 """send a file using XEP-0096 |
150 Return an unique id to identify the transfert | 265 @to_jid: recipient |
266 @filepath: absolute path to the file to send | |
267 @data: dictionnary with the optional following keys: | |
268 - "description": description of the file | |
269 @param profile_key: %(doc_profile_key)s | |
270 @return: an unique id to identify the transfert | |
151 """ | 271 """ |
152 current_jid, xmlstream = self.host.getJidNStream(profile_key) | 272 profile = self.host.memory.getProfileName(profile_key) |
153 if not xmlstream: | 273 if not profile: |
154 error (_('Asking for an non-existant or not connected profile')) | 274 warning(_("Trying to send a file from an unknown profile")) |
155 return "" | 275 return "" |
156 debug ("sendfile (%s) to %s", filepath, to ) | 276 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) |
157 print type(filepath), type(to) | 277 |
278 #self.host.plugins["XEP-0065"].sendFile(offer["id"], filepath, str(statinfo.st_size)) | |
279 | |
280 file_transfer_elts = [] | |
158 | 281 |
159 statinfo = os.stat(filepath) | 282 statinfo = os.stat(filepath) |
160 | 283 file_elt = domish.Element((PROFILE, 'file')) |
161 offer=client.IQ(xmlstream,'set') | 284 file_elt['name']=os.path.basename(filepath) |
162 debug ("Transfert ID: %s", offer["id"]) | 285 size = file_elt['size']=str(statinfo.st_size) |
163 | 286 file_transfer_elts.append(file_elt) |
164 self.host.plugins["XEP-0065"].sendFile(offer["id"], filepath, str(statinfo.st_size)) | 287 |
165 | 288 file_transfer_elts.append(domish.Element((None,'range'))) |
166 offer["from"]=current_jid.full() | 289 |
167 offer["to"]=jid.JID(to).full() | 290 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key = profile) |
168 si=offer.addElement('si','http://jabber.org/protocol/si') | 291 offer.addCallback(self.fileCb, profile, filepath, sid, size) |
169 si["mime-type"]='text/plain' | 292 return sid |
170 si["profile"]='http://jabber.org/protocol/si/profile/file-transfer' | 293 |
171 file = si.addElement('file', 'http://jabber.org/protocol/si/profile/file-transfer') | 294 |
172 file['name']=os.path.basename(filepath) | 295 def sendSuccessCb(self, sid, file_obj, stream_method): |
173 file['size']=str(statinfo.st_size) | 296 info(_('Transfer %s successfuly finished') % sid) |
174 | 297 file_obj.close() |
175 ### | 298 |
176 # FIXME: Ugly temporary hard coded implementation of XEP-0020 & XEP-0004, | 299 def sendFailureCb(self, sid, file_obj, stream_method, reason): |
177 # Need to be recoded elsewhere in a more generic way | 300 file_obj.close() |
178 ### | 301 warning(_('Transfert %(id)s failed with stream method %(s_method)s') % { 'id': sid, s_method: stream_method }) |
179 | |
180 feature=si.addElement('feature', "http://jabber.org/protocol/feature-neg") | |
181 x=feature.addElement('x', "jabber:x:data") | |
182 x['type']='form' | |
183 field=x.addElement('field') | |
184 field['type']='list-single' | |
185 field['var']='stream-method' | |
186 option = field.addElement('option') | |
187 value = option.addElement('value', content='http://jabber.org/protocol/bytestreams') | |
188 | |
189 offer.addCallback(self.fileCB, current_jid = current_jid, xmlstream = xmlstream) | |
190 offer.send() | |
191 return offer["id"] #XXX: using IQ id as file transfert id seems OK as IQ id are required | |
192 | |
193 class XEP_0096_handler(XMPPHandler): | |
194 implements(iwokkel.IDisco) | |
195 | |
196 def __init__(self, plugin_parent): | |
197 self.plugin_parent = plugin_parent | |
198 self.host = plugin_parent.host | |
199 | |
200 def connectionInitialized(self): | |
201 self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.xep_96, profile = self.parent.profile) | |
202 | |
203 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | |
204 return [disco.DiscoFeature(NS_SI)] | |
205 | |
206 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | |
207 return [] | |
208 |