comparison src/plugins/plugin_exp_pipe.py @ 594:e629371a28d3

Fix pep8 support in src/plugins.
author Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
date Fri, 18 Jan 2013 17:55:35 +0100
parents beaf6bec2fcd
children 84a6e83157c2
comparison
legend: equal | deleted | inserted | replaced
left column: 593:70bae685d05c (old) | right column: 594:e629371a28d3 (new)
21 21
22 from logging import debug, info, warning, error 22 from logging import debug, info, warning, error
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 from twisted.words.protocols.jabber import jid 24 from twisted.words.protocols.jabber import jid
25 from twisted.words.protocols.jabber import error as jab_error 25 from twisted.words.protocols.jabber import error as jab_error
26 import os, os.path 26 import os
27 from twisted.internet import reactor 27 from twisted.internet import reactor
28 from sat.core.exceptions import ProfileNotInCacheError 28 from sat.core.exceptions import ProfileNotInCacheError
29 29
30 from wokkel import data_form 30 from wokkel import data_form
31 31
32 IQ_SET = '/iq[@type="set"]' 32 IQ_SET = '/iq[@type="set"]'
33 PROFILE_NAME = "pipe-transfer" 33 PROFILE_NAME = "pipe-transfer"
34 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME 34 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
35 35
36 PLUGIN_INFO = { 36 PLUGIN_INFO = {
37 "name": "Pipe Plugin", 37 "name": "Pipe Plugin",
38 "import_name": "EXP-PIPE", 38 "import_name": "EXP-PIPE",
39 "type": "EXP", 39 "type": "EXP",
40 "protocols": ["EXP-PIPE"], 40 "protocols": ["EXP-PIPE"],
41 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], 41 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"],
42 "main": "Exp_Pipe", 42 "main": "Exp_Pipe",
43 "handler": "no", 43 "handler": "no",
44 "description": _("""Implementation of SI Pipe Transfer""") 44 "description": _("""Implementation of SI Pipe Transfer""")
45 } 45 }
46
46 47
47 class Exp_Pipe(object): 48 class Exp_Pipe(object):
48 """This is a modified version of XEP-0096 to work with named pipes instead of files""" 49 """This is a modified version of XEP-0096 to work with named pipes instead of files"""
49 50
50 def __init__(self, host): 51 def __init__(self, host):
51 info(_("Plugin Pipe initialization")) 52 info(_("Plugin Pipe initialization"))
52 self.host = host 53 self.host = host
53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, 54 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
54 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed 55 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed
55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) 56 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) 57 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut)
57 58
58 def profileConnected(self, profile): 59 def profileConnected(self, profile):
59 client = self.host.getClient(profile) 60 client = self.host.getClient(profile)
60 client._pipe_waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, 61 client._pipe_waiting_for_approval = {} # key = id, value = [transfer data, IdelayedCall Reactor timeout,
61 # current stream method, [failed stream methods], profile] 62 # current stream method, [failed stream methods], profile]
62 63
63 def _kill_id(self, approval_id, profile): 64 def _kill_id(self, approval_id, profile):
64 """Delete a waiting_for_approval id, called after timeout 65 """Delete a waiting_for_approval id, called after timeout
65 @param approval_id: id of _pipe_waiting_for_approval""" 66 @param approval_id: id of _pipe_waiting_for_approval"""
76 @param from_jid: jid of the sender 77 @param from_jid: jid of the sender
77 @param si_id: Stream Initiation session id 78 @param si_id: Stream Initiation session id
78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) 79 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown)
79 @param si_el: domish.Element of the request 80 @param si_el: domish.Element of the request
80 @param profile: %(doc_profile)s""" 81 @param profile: %(doc_profile)s"""
81 info (_("EXP-PIPE file transfer requested")) 82 info(_("EXP-PIPE file transfer requested"))
82 debug(si_el.toXml()) 83 debug(si_el.toXml())
83 client = self.host.getClient(profile) 84 client = self.host.getClient(profile)
84 if not client: 85 if not client:
85 raise ProfileNotInCacheError 86 raise ProfileNotInCacheError
86 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) 87 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements())
93 94
94 if feature_elts: 95 if feature_elts:
95 feature_el = feature_elts[0] 96 feature_el = feature_elts[0]
96 form = data_form.Form.fromElement(feature_el.firstChildElement()) 97 form = data_form.Form.fromElement(feature_el.firstChildElement())
97 try: 98 try:
98 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m) 99 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
99 except KeyError: 100 except KeyError:
100 warning(_("No stream method found")) 101 warning(_("No stream method found"))
101 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) 102 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
102 return 103 return
103 if not stream_method: 104 if not stream_method:
108 warning(_("No feature element found")) 109 warning(_("No feature element found"))
109 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) 110 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
110 return 111 return
111 112
112 #if we are here, the transfer can start, we just need user's agreement 113 #if we are here, the transfer can start, we just need user's agreement
113 data={ "id": iq_id, "from":from_jid } 114 data = {"id": iq_id, "from": from_jid}
114 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] 115 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile]
115 116
116 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) 117 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile)
117
118 118
119 def confirmationCB(self, sid, accepted, frontend_data, profile): 119 def confirmationCB(self, sid, accepted, frontend_data, profile):
120 """Called on confirmation answer 120 """Called on confirmation answer
121 @param sid: file transfer session id 121 @param sid: file transfer session id
122 @param accepted: True if file transfer is accepted 122 @param accepted: True if file transfer is accepted
144 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) 144 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
145 del(client._pipe_waiting_for_approval[sid]) 145 del(client._pipe_waiting_for_approval[sid])
146 return 146 return
147 147
148 #we can send the iq result 148 #we can send the iq result
149 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) 149 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
150 misc_elts = [] 150 misc_elts = []
151 misc_elts.append(domish.Element((PROFILE, "file"))) 151 misc_elts.append(domish.Element((PROFILE, "file")))
152 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) 152 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
153 else: 153 else:
154 debug (_("Transfer [%s] refused"), sid) 154 debug(_("Transfer [%s] refused"), sid)
155 self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) 155 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
156 del(client._pipe_waiting_for_approval[sid]) 156 del(client._pipe_waiting_for_approval[sid])
157 157
158 def _transferSucceeded(self, sid, file_obj, stream_method, profile): 158 def _transferSucceeded(self, sid, file_obj, stream_method, profile):
159 """Called by the stream method when transfer successfuly finished 159 """Called by the stream method when transfer successfuly finished
160 @param id: stream id""" 160 @param id: stream id"""
171 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" 171 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR"""
172 client = self.host.getClient(profile) 172 client = self.host.getClient(profile)
173 if not client: 173 if not client:
174 raise ProfileNotInCacheError 174 raise ProfileNotInCacheError
175 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] 175 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
176 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, 176 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid,
177 's_method': stream_method }) 177 's_method': stream_method})
178 filepath = file_obj.name 178 filepath = file_obj.name
179 file_obj.close() 179 file_obj.close()
180 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session 180 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
181 warning(_("All stream methods failed, can't transfer the file")) 181 warning(_("All stream methods failed, can't transfer the file"))
182 del(client._pipe_waiting_for_approval[sid]) 182 del(client._pipe_waiting_for_approval[sid])
209 except KeyError: 209 except KeyError:
210 warning(_("No stream method choosed")) 210 warning(_("No stream method choosed"))
211 return 211 return
212 212
213 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: 213 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
214 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender 214 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
215 #file_obj = os.fdopen(fd, 'r') 215 #file_obj = os.fdopen(fd, 'r')
216 file_obj = open(filepath, 'r') #XXX: we have to be sure that filepath is well opened, as reading can block it 216 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it
217 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) 217 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
218 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: 218 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
219 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender 219 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
220 #file_obj = os.fdopen(fd, 'r') 220 #file_obj = os.fdopen(fd, 'r')
221 file_obj = open(filepath, 'r') #XXX: we have to be sure that filepath is well opened, as reading can block it 221 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it
222 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) 222 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
223 else: 223 else:
224 warning(_("Invalid stream method received")) 224 warning(_("Invalid stream method received"))
225 225
226 def pipeOut(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): 226 def pipeOut(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'):
227 """send a file using EXP-PIPE 227 """send a file using EXP-PIPE
240 pipe_transfer_elts = [] 240 pipe_transfer_elts = []
241 241
242 pipe_elt = domish.Element((PROFILE, 'pipe')) 242 pipe_elt = domish.Element((PROFILE, 'pipe'))
243 pipe_transfer_elts.append(pipe_elt) 243 pipe_transfer_elts.append(pipe_elt)
244 244
245 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) 245 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile)
246 offer.addCallback(self.pipeCb, filepath, sid, profile) 246 offer.addCallback(self.pipeCb, filepath, sid, profile)
247 return sid 247 return sid
248 248
249 def sendSuccessCb(self, sid, file_obj, stream_method, profile): 249 def sendSuccessCb(self, sid, file_obj, stream_method, profile):
250 info(_('Transfer %s successfuly finished') % sid) 250 info(_('Transfer %s successfuly finished') % sid)
251 file_obj.close() 251 file_obj.close()
252 252
253 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): 253 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
254 file_obj.close() 254 file_obj.close()
255 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % { 'id': sid, "s_method": stream_method, "profile": profile }) 255 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile})
256