comparison src/plugins/plugin_exp_pipe.py @ 993:301b342c697a

core: use of the new core.log module: /!\ this is a massive refactoring and was largely automated, it probably did bring some bugs /!\
author Goffi <goffi@goffi.org>
date Sat, 19 Apr 2014 19:19:19 +0200
parents c6d8fc63b1db
children 069ad98b360d
comparison
equal deleted inserted replaced
992:f51a1895275c 993:301b342c697a
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from logging import debug, info, warning, error 22 from sat.core.log import getLogger
23 log = getLogger(__name__)
23 from twisted.words.xish import domish 24 from twisted.words.xish import domish
24 from twisted.words.protocols.jabber import jid 25 from twisted.words.protocols.jabber import jid
25 from twisted.words.protocols.jabber import error as jab_error 26 from twisted.words.protocols import jabber
26 from twisted.internet import reactor 27 from twisted.internet import reactor
27 28
28 from wokkel import data_form 29 from wokkel import data_form
29 30
30 IQ_SET = '/iq[@type="set"]' 31 IQ_SET = '/iq[@type="set"]'
45 46
46 class Exp_Pipe(object): 47 class Exp_Pipe(object):
47 """This is a modified version of XEP-0096 to work with named pipes instead of files""" 48 """This is a modified version of XEP-0096 to work with named pipes instead of files"""
48 49
49 def __init__(self, host): 50 def __init__(self, host):
50 info(_("Plugin Pipe initialization")) 51 log.info(_("Plugin Pipe initialization"))
51 self.host = host 52 self.host = host
52 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, 53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
53 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed 54 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed
54 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) 55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
55 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) 56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut)
60 # current stream method, [failed stream methods], profile] 61 # current stream method, [failed stream methods], profile]
61 62
62 def _kill_id(self, approval_id, profile): 63 def _kill_id(self, approval_id, profile):
63 """Delete a waiting_for_approval id, called after timeout 64 """Delete a waiting_for_approval id, called after timeout
64 @param approval_id: id of _pipe_waiting_for_approval""" 65 @param approval_id: id of _pipe_waiting_for_approval"""
65 info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id) 66 log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id)
66 try: 67 try:
67 client = self.host.getClient(profile) 68 client = self.host.getClient(profile)
68 del client._pipe_waiting_for_approval[approval_id] 69 del client._pipe_waiting_for_approval[approval_id]
69 except KeyError: 70 except KeyError:
70 warning(_("kill id called on a non existant approval id")) 71 log.warning(_("kill id called on a non existant approval id"))
71 72
72 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): 73 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
73 """Called when a pipe transfer is requested 74 """Called when a pipe transfer is requested
74 @param iq_id: id of the iq request 75 @param iq_id: id of the iq request
75 @param from_jid: jid of the sender 76 @param from_jid: jid of the sender
76 @param si_id: Stream Initiation session id 77 @param si_id: Stream Initiation session id
77 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) 78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown)
78 @param si_el: domish.Element of the request 79 @param si_el: domish.Element of the request
79 @param profile: %(doc_profile)s""" 80 @param profile: %(doc_profile)s"""
80 info(_("EXP-PIPE file transfer requested")) 81 log.info(_("EXP-PIPE file transfer requested"))
81 debug(si_el.toXml()) 82 log.debug(si_el.toXml())
82 client = self.host.getClient(profile) 83 client = self.host.getClient(profile)
83 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) 84 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements())
84 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) 85 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
85 86
86 if not pipe_elts: 87 if not pipe_elts:
87 warning(_("No pipe element found")) 88 log.warning(_("No pipe element found"))
88 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) 89 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
89 return 90 return
90 91
91 if feature_elts: 92 if feature_elts:
92 feature_el = feature_elts[0] 93 feature_el = feature_elts[0]
93 data_form.Form.fromElement(feature_el.firstChildElement()) 94 data_form.Form.fromElement(feature_el.firstChildElement())
94 try: 95 try:
95 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) 96 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
96 except KeyError: 97 except KeyError:
97 warning(_("No stream method found")) 98 log.warning(_("No stream method found"))
98 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) 99 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
99 return 100 return
100 if not stream_method: 101 if not stream_method:
101 warning(_("Can't find a valid stream method")) 102 log.warning(_("Can't find a valid stream method"))
102 self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) 103 self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile)
103 return 104 return
104 else: 105 else:
105 warning(_("No feature element found")) 106 log.warning(_("No feature element found"))
106 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) 107 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
107 return 108 return
108 109
109 #if we are here, the transfer can start, we just need user's agreement 110 #if we are here, the transfer can start, we just need user's agreement
110 data = {"id": iq_id, "from": from_jid} 111 data = {"id": iq_id, "from": from_jid}
123 if timeout.active(): 124 if timeout.active():
124 timeout.cancel() 125 timeout.cancel()
125 try: 126 try:
126 dest_path = frontend_data['dest_path'] 127 dest_path = frontend_data['dest_path']
127 except KeyError: 128 except KeyError:
128 error(_('dest path not found in frontend_data')) 129 log.error(_('dest path not found in frontend_data'))
129 del(client._pipe_waiting_for_approval[sid]) 130 del(client._pipe_waiting_for_approval[sid])
130 return 131 return
131 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: 132 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
132 file_obj = open(dest_path, 'w+') 133 file_obj = open(dest_path, 'w+')
133 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) 134 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
134 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: 135 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
135 file_obj = open(dest_path, 'w+') 136 file_obj = open(dest_path, 'w+')
136 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) 137 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
137 else: 138 else:
138 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) 139 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
139 del(client._pipe_waiting_for_approval[sid]) 140 del(client._pipe_waiting_for_approval[sid])
140 return 141 return
141 142
142 #we can send the iq result 143 #we can send the iq result
143 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) 144 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
144 misc_elts = [] 145 misc_elts = []
145 misc_elts.append(domish.Element((PROFILE, "file"))) 146 misc_elts.append(domish.Element((PROFILE, "file")))
146 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) 147 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
147 else: 148 else:
148 debug(_("Transfer [%s] refused"), sid) 149 log.debug(_("Transfer [%s] refused"), sid)
149 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) 150 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
150 del(client._pipe_waiting_for_approval[sid]) 151 del(client._pipe_waiting_for_approval[sid])
151 152
152 def _transferSucceeded(self, sid, file_obj, stream_method, profile): 153 def _transferSucceeded(self, sid, file_obj, stream_method, profile):
153 """Called by the stream method when transfer successfuly finished 154 """Called by the stream method when transfer successfuly finished
154 @param id: stream id""" 155 @param id: stream id"""
155 client = self.host.getClient(profile) 156 client = self.host.getClient(profile)
156 file_obj.close() 157 file_obj.close()
157 info(_('Transfer %s successfuly finished') % sid) 158 log.info(_('Transfer %s successfuly finished') % sid)
158 del(client._pipe_waiting_for_approval[sid]) 159 del(client._pipe_waiting_for_approval[sid])
159 160
160 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): 161 def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
161 """Called when something went wrong with the transfer 162 """Called when something went wrong with the transfer
162 @param id: stream id 163 @param id: stream id
163 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" 164 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR"""
164 client = self.host.getClient(profile) 165 client = self.host.getClient(profile)
165 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] 166 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
166 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid, 167 log.warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid,
167 's_method': stream_method}) 168 's_method': stream_method})
168 # filepath = file_obj.name 169 # filepath = file_obj.name
169 file_obj.close() 170 file_obj.close()
170 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session 171 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
171 warning(_("All stream methods failed, can't transfer the file")) 172 log.warning(_("All stream methods failed, can't transfer the file"))
172 del(client._pipe_waiting_for_approval[sid]) 173 del(client._pipe_waiting_for_approval[sid])
173 174
174 def pipeCb(self, filepath, sid, profile, IQ): 175 def pipeCb(self, filepath, sid, profile, IQ):
175 if IQ['type'] == "error": 176 if IQ['type'] == "error":
176 stanza_err = jab_error.exceptionFromStanza(IQ) 177 stanza_err = jabber.error.exceptionFromStanza(IQ)
177 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': 178 if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
178 debug(_("Pipe transfer refused by %s") % IQ['from']) 179 log.debug(_("Pipe transfer refused by %s") % IQ['from'])
179 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) 180 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile)
180 else: 181 else:
181 warning(_("Error during pipe stream transfer with %s") % IQ['from']) 182 log.warning(_("Error during pipe stream transfer with %s") % IQ['from'])
182 self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile) 183 self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile)
183 return 184 return
184 185
185 si_elt = IQ.firstChildElement() 186 si_elt = IQ.firstChildElement()
186 187
187 if IQ['type'] != "result" or not si_elt or si_elt.name != "si": 188 if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
188 error(_("Protocol error during file transfer")) 189 log.error(_("Protocol error during file transfer"))
189 return 190 return
190 191
191 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) 192 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
192 if not feature_elts: 193 if not feature_elts:
193 warning(_("No feature element")) 194 log.warning(_("No feature element"))
194 return 195 return
195 196
196 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) 197 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
197 try: 198 try:
198 stream_method = choosed_options["stream-method"] 199 stream_method = choosed_options["stream-method"]
199 except KeyError: 200 except KeyError:
200 warning(_("No stream method choosed")) 201 log.warning(_("No stream method choosed"))
201 return 202 return
202 203
203 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: 204 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
204 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender 205 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
205 #file_obj = os.fdopen(fd, 'r') 206 #file_obj = os.fdopen(fd, 'r')
209 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender 210 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
210 #file_obj = os.fdopen(fd, 'r') 211 #file_obj = os.fdopen(fd, 'r')
211 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it 212 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it
212 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) 213 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
213 else: 214 else:
214 warning(_("Invalid stream method received")) 215 log.warning(_("Invalid stream method received"))
215 216
216 def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): 217 def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
217 """send a file using EXP-PIPE 218 """send a file using EXP-PIPE
218 @to_jid: recipient 219 @to_jid: recipient
219 @filepath: absolute path to the named pipe to send 220 @filepath: absolute path to the named pipe to send
221 @param profile_key: %(doc_profile_key)s 222 @param profile_key: %(doc_profile_key)s
222 @return: an unique id to identify the transfer 223 @return: an unique id to identify the transfer
223 """ 224 """
224 profile = self.host.memory.getProfileName(profile_key) 225 profile = self.host.memory.getProfileName(profile_key)
225 if not profile: 226 if not profile:
226 warning(_("Trying to send a file from an unknown profile")) 227 log.warning(_("Trying to send a file from an unknown profile"))
227 return "" 228 return ""
228 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) 229 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})
229 230
230 pipe_transfer_elts = [] 231 pipe_transfer_elts = []
231 232
235 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) 236 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile)
236 offer.addCallback(self.pipeCb, filepath, sid, profile) 237 offer.addCallback(self.pipeCb, filepath, sid, profile)
237 return sid 238 return sid
238 239
239 def sendSuccessCb(self, sid, file_obj, stream_method, profile): 240 def sendSuccessCb(self, sid, file_obj, stream_method, profile):
240 info(_('Transfer %s successfuly finished') % sid) 241 log.info(_('Transfer %s successfuly finished') % sid)
241 file_obj.close() 242 file_obj.close()
242 243
243 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): 244 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
244 file_obj.close() 245 file_obj.close()
245 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) 246 log.warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile})