Mercurial > libervia-backend
comparison src/plugins/plugin_exp_pipe.py @ 993:301b342c697a
core: use of the new core.log module:
/!\ this is a massive refactoring that was largely automated; it has probably introduced some bugs /!\
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 19 Apr 2014 19:19:19 +0200 |
parents | c6d8fc63b1db |
children | 069ad98b360d |
comparison legend:
- equal
- deleted
- inserted
- replaced
992:f51a1895275c | 993:301b342c697a |
---|---|
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from logging import debug, info, warning, error | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | |
23 from twisted.words.xish import domish | 24 from twisted.words.xish import domish |
24 from twisted.words.protocols.jabber import jid | 25 from twisted.words.protocols.jabber import jid |
25 from twisted.words.protocols.jabber import error as jab_error | 26 from twisted.words.protocols import jabber |
26 from twisted.internet import reactor | 27 from twisted.internet import reactor |
27 | 28 |
28 from wokkel import data_form | 29 from wokkel import data_form |
29 | 30 |
30 IQ_SET = '/iq[@type="set"]' | 31 IQ_SET = '/iq[@type="set"]' |
45 | 46 |
46 class Exp_Pipe(object): | 47 class Exp_Pipe(object): |
47 """This is a modified version of XEP-0096 to work with named pipes instead of files""" | 48 """This is a modified version of XEP-0096 to work with named pipes instead of files""" |
48 | 49 |
49 def __init__(self, host): | 50 def __init__(self, host): |
50 info(_("Plugin Pipe initialization")) | 51 log.info(_("Plugin Pipe initialization")) |
51 self.host = host | 52 self.host = host |
52 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, | 53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, |
53 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed | 54 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed |
54 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) | 55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) |
55 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) | 56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) |
60 # current stream method, [failed stream methods], profile] | 61 # current stream method, [failed stream methods], profile] |
61 | 62 |
62 def _kill_id(self, approval_id, profile): | 63 def _kill_id(self, approval_id, profile): |
63 """Delete a waiting_for_approval id, called after timeout | 64 """Delete a waiting_for_approval id, called after timeout |
64 @param approval_id: id of _pipe_waiting_for_approval""" | 65 @param approval_id: id of _pipe_waiting_for_approval""" |
65 info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id) | 66 log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id) |
66 try: | 67 try: |
67 client = self.host.getClient(profile) | 68 client = self.host.getClient(profile) |
68 del client._pipe_waiting_for_approval[approval_id] | 69 del client._pipe_waiting_for_approval[approval_id] |
69 except KeyError: | 70 except KeyError: |
70 warning(_("kill id called on a non existant approval id")) | 71 log.warning(_("kill id called on a non existant approval id")) |
71 | 72 |
72 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): | 73 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): |
73 """Called when a pipe transfer is requested | 74 """Called when a pipe transfer is requested |
74 @param iq_id: id of the iq request | 75 @param iq_id: id of the iq request |
75 @param from_jid: jid of the sender | 76 @param from_jid: jid of the sender |
76 @param si_id: Stream Initiation session id | 77 @param si_id: Stream Initiation session id |
77 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) | 78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) |
78 @param si_el: domish.Element of the request | 79 @param si_el: domish.Element of the request |
79 @param profile: %(doc_profile)s""" | 80 @param profile: %(doc_profile)s""" |
80 info(_("EXP-PIPE file transfer requested")) | 81 log.info(_("EXP-PIPE file transfer requested")) |
81 debug(si_el.toXml()) | 82 log.debug(si_el.toXml()) |
82 client = self.host.getClient(profile) | 83 client = self.host.getClient(profile) |
83 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) | 84 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) |
84 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) | 85 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) |
85 | 86 |
86 if not pipe_elts: | 87 if not pipe_elts: |
87 warning(_("No pipe element found")) | 88 log.warning(_("No pipe element found")) |
88 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | 89 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) |
89 return | 90 return |
90 | 91 |
91 if feature_elts: | 92 if feature_elts: |
92 feature_el = feature_elts[0] | 93 feature_el = feature_elts[0] |
93 data_form.Form.fromElement(feature_el.firstChildElement()) | 94 data_form.Form.fromElement(feature_el.firstChildElement()) |
94 try: | 95 try: |
95 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) | 96 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) |
96 except KeyError: | 97 except KeyError: |
97 warning(_("No stream method found")) | 98 log.warning(_("No stream method found")) |
98 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | 99 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) |
99 return | 100 return |
100 if not stream_method: | 101 if not stream_method: |
101 warning(_("Can't find a valid stream method")) | 102 log.warning(_("Can't find a valid stream method")) |
102 self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) | 103 self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) |
103 return | 104 return |
104 else: | 105 else: |
105 warning(_("No feature element found")) | 106 log.warning(_("No feature element found")) |
106 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | 107 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) |
107 return | 108 return |
108 | 109 |
109 #if we are here, the transfer can start, we just need user's agreement | 110 #if we are here, the transfer can start, we just need user's agreement |
110 data = {"id": iq_id, "from": from_jid} | 111 data = {"id": iq_id, "from": from_jid} |
123 if timeout.active(): | 124 if timeout.active(): |
124 timeout.cancel() | 125 timeout.cancel() |
125 try: | 126 try: |
126 dest_path = frontend_data['dest_path'] | 127 dest_path = frontend_data['dest_path'] |
127 except KeyError: | 128 except KeyError: |
128 error(_('dest path not found in frontend_data')) | 129 log.error(_('dest path not found in frontend_data')) |
129 del(client._pipe_waiting_for_approval[sid]) | 130 del(client._pipe_waiting_for_approval[sid]) |
130 return | 131 return |
131 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | 132 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: |
132 file_obj = open(dest_path, 'w+') | 133 file_obj = open(dest_path, 'w+') |
133 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) | 134 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) |
134 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | 135 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
135 file_obj = open(dest_path, 'w+') | 136 file_obj = open(dest_path, 'w+') |
136 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) | 137 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) |
137 else: | 138 else: |
138 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) | 139 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) |
139 del(client._pipe_waiting_for_approval[sid]) | 140 del(client._pipe_waiting_for_approval[sid]) |
140 return | 141 return |
141 | 142 |
142 #we can send the iq result | 143 #we can send the iq result |
143 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) | 144 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) |
144 misc_elts = [] | 145 misc_elts = [] |
145 misc_elts.append(domish.Element((PROFILE, "file"))) | 146 misc_elts.append(domish.Element((PROFILE, "file"))) |
146 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) | 147 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) |
147 else: | 148 else: |
148 debug(_("Transfer [%s] refused"), sid) | 149 log.debug(_("Transfer [%s] refused"), sid) |
149 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) | 150 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) |
150 del(client._pipe_waiting_for_approval[sid]) | 151 del(client._pipe_waiting_for_approval[sid]) |
151 | 152 |
152 def _transferSucceeded(self, sid, file_obj, stream_method, profile): | 153 def _transferSucceeded(self, sid, file_obj, stream_method, profile): |
153 """Called by the stream method when transfer successfuly finished | 154 """Called by the stream method when transfer successfuly finished |
154 @param id: stream id""" | 155 @param id: stream id""" |
155 client = self.host.getClient(profile) | 156 client = self.host.getClient(profile) |
156 file_obj.close() | 157 file_obj.close() |
157 info(_('Transfer %s successfuly finished') % sid) | 158 log.info(_('Transfer %s successfuly finished') % sid) |
158 del(client._pipe_waiting_for_approval[sid]) | 159 del(client._pipe_waiting_for_approval[sid]) |
159 | 160 |
160 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): | 161 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): |
161 """Called when something went wrong with the transfer | 162 """Called when something went wrong with the transfer |
162 @param id: stream id | 163 @param id: stream id |
163 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" | 164 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" |
164 client = self.host.getClient(profile) | 165 client = self.host.getClient(profile) |
165 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] | 166 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] |
166 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid, | 167 log.warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid, |
167 's_method': stream_method}) | 168 's_method': stream_method}) |
168 # filepath = file_obj.name | 169 # filepath = file_obj.name |
169 file_obj.close() | 170 file_obj.close() |
170 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session | 171 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session |
171 warning(_("All stream methods failed, can't transfer the file")) | 172 log.warning(_("All stream methods failed, can't transfer the file")) |
172 del(client._pipe_waiting_for_approval[sid]) | 173 del(client._pipe_waiting_for_approval[sid]) |
173 | 174 |
174 def pipeCb(self, filepath, sid, profile, IQ): | 175 def pipeCb(self, filepath, sid, profile, IQ): |
175 if IQ['type'] == "error": | 176 if IQ['type'] == "error": |
176 stanza_err = jab_error.exceptionFromStanza(IQ) | 177 stanza_err = jabber.error.exceptionFromStanza(IQ) |
177 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': | 178 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': |
178 debug(_("Pipe transfer refused by %s") % IQ['from']) | 179 log.debug(_("Pipe transfer refused by %s") % IQ['from']) |
179 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) | 180 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) |
180 else: | 181 else: |
181 warning(_("Error during pipe stream transfer with %s") % IQ['from']) | 182 log.warning(_("Error during pipe stream transfer with %s") % IQ['from']) |
182 self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile) | 183 self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile) |
183 return | 184 return |
184 | 185 |
185 si_elt = IQ.firstChildElement() | 186 si_elt = IQ.firstChildElement() |
186 | 187 |
187 if IQ['type'] != "result" or not si_elt or si_elt.name != "si": | 188 if IQ['type'] != "result" or not si_elt or si_elt.name != "si": |
188 error(_("Protocol error during file transfer")) | 189 log.error(_("Protocol error during file transfer")) |
189 return | 190 return |
190 | 191 |
191 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) | 192 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) |
192 if not feature_elts: | 193 if not feature_elts: |
193 warning(_("No feature element")) | 194 log.warning(_("No feature element")) |
194 return | 195 return |
195 | 196 |
196 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) | 197 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) |
197 try: | 198 try: |
198 stream_method = choosed_options["stream-method"] | 199 stream_method = choosed_options["stream-method"] |
199 except KeyError: | 200 except KeyError: |
200 warning(_("No stream method choosed")) | 201 log.warning(_("No stream method choosed")) |
201 return | 202 return |
202 | 203 |
203 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | 204 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: |
204 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender | 205 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender |
205 #file_obj = os.fdopen(fd, 'r') | 206 #file_obj = os.fdopen(fd, 'r') |
209 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender | 210 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender |
210 #file_obj = os.fdopen(fd, 'r') | 211 #file_obj = os.fdopen(fd, 'r') |
211 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it | 212 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it |
212 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) | 213 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) |
213 else: | 214 else: |
214 warning(_("Invalid stream method received")) | 215 log.warning(_("Invalid stream method received")) |
215 | 216 |
216 def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): | 217 def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): |
217 """send a file using EXP-PIPE | 218 """send a file using EXP-PIPE |
218 @to_jid: recipient | 219 @to_jid: recipient |
219 @filepath: absolute path to the named pipe to send | 220 @filepath: absolute path to the named pipe to send |
221 @param profile_key: %(doc_profile_key)s | 222 @param profile_key: %(doc_profile_key)s |
222 @return: an unique id to identify the transfer | 223 @return: an unique id to identify the transfer |
223 """ | 224 """ |
224 profile = self.host.memory.getProfileName(profile_key) | 225 profile = self.host.memory.getProfileName(profile_key) |
225 if not profile: | 226 if not profile: |
226 warning(_("Trying to send a file from an unknown profile")) | 227 log.warning(_("Trying to send a file from an unknown profile")) |
227 return "" | 228 return "" |
228 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) | 229 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) |
229 | 230 |
230 pipe_transfer_elts = [] | 231 pipe_transfer_elts = [] |
231 | 232 |
235 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) | 236 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) |
236 offer.addCallback(self.pipeCb, filepath, sid, profile) | 237 offer.addCallback(self.pipeCb, filepath, sid, profile) |
237 return sid | 238 return sid |
238 | 239 |
239 def sendSuccessCb(self, sid, file_obj, stream_method, profile): | 240 def sendSuccessCb(self, sid, file_obj, stream_method, profile): |
240 info(_('Transfer %s successfuly finished') % sid) | 241 log.info(_('Transfer %s successfuly finished') % sid) |
241 file_obj.close() | 242 file_obj.close() |
242 | 243 |
243 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): | 244 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): |
244 file_obj.close() | 245 file_obj.close() |
245 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) | 246 log.warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) |