comparison src/plugins/plugin_exp_pipe.py @ 538:2c4016921403

core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles - added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress - core, frontends: fixed calls/signals according to new bridge API - user of proper profile namespace for progression indicators and dialogs - memory: getParam* now return bool when param type is bool - memory: added getStringParam* to return string instead of typed value - core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles - plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author Goffi <goffi@goffi.org>
date Sat, 10 Nov 2012 16:38:16 +0100
parents 2a072735e459
children ca13633d3b6b
comparison
equal deleted inserted replaced
537:28cddc96c4ed 538:2c4016921403
19 along with this program. If not, see <http://www.gnu.org/licenses/>. 19 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """ 20 """
21 21
22 from logging import debug, info, warning, error 22 from logging import debug, info, warning, error
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 from twisted.internet import protocol 24 from twisted.words.protocols.jabber import jid
25 from twisted.words.protocols.jabber import client, jid
26 from twisted.words.protocols.jabber import error as jab_error 25 from twisted.words.protocols.jabber import error as jab_error
27 import os, os.path 26 import os, os.path
28 from twisted.internet import reactor 27 from twisted.internet import reactor
29 import pdb 28 from sat.core.exceptions import ProfileNotInCacheError
30 29
31 from zope.interface import implements 30 from wokkel import data_form
32
33 from wokkel import disco, iwokkel, data_form
34 31
35 IQ_SET = '/iq[@type="set"]' 32 IQ_SET = '/iq[@type="set"]'
36 PROFILE_NAME = "pipe-transfer" 33 PROFILE_NAME = "pipe-transfer"
37 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME 34 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
38 35
51 """This is a modified version of XEP-0096 to work with named pipes instead of files""" 48 """This is a modified version of XEP-0096 to work with named pipes instead of files"""
52 49
53 def __init__(self, host): 50 def __init__(self, host):
54 info(_("Plugin Pipe initialization")) 51 info(_("Plugin Pipe initialization"))
55 self.host = host 52 self.host = host
56 self._waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout,
57 # current stream method, [failed stream methods], profile]
58 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, 53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
59 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed 54 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed
60 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) 55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
61 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) 56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut)
62 57
63 def _kill_id(self, approval_id): 58 def profileConnected(self, profile):
59 client = self.host.getClient(profile)
60 client._pipe_waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout,
61 # current stream method, [failed stream methods], profile]
62
63 def _kill_id(self, approval_id, profile):
64 """Delete a waiting_for_approval id, called after timeout 64 """Delete a waiting_for_approval id, called after timeout
65 @param approval_id: id of _waiting_for_approval""" 65 @param approval_id: id of _pipe_waiting_for_approval"""
66 info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id); 66 info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id);
67 try: 67 try:
68 del self._waiting_for_approval[approval_id] 68 client = self.host.getClient(profile)
69 del client._pipe_waiting_for_approval[approval_id]
69 except KeyError: 70 except KeyError:
70 warning(_("kill id called on a non existant approval id")) 71 warning(_("kill id called on a non existant approval id"))
71 72
72 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): 73 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
73 """Called when a pipe transfer is requested 74 """Called when a pipe transfer is requested
77 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) 78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown)
78 @param si_el: domish.Element of the request 79 @param si_el: domish.Element of the request
79 @param profile: %(doc_profile)s""" 80 @param profile: %(doc_profile)s"""
80 info (_("EXP-PIPE file transfer requested")) 81 info (_("EXP-PIPE file transfer requested"))
81 debug(si_el.toXml()) 82 debug(si_el.toXml())
83 client = self.host.getClient(profile)
84 if not client:
85 raise ProfileNotInCacheError
82 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) 86 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements())
83 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) 87 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
84 88
85 if not pipe_elts: 89 if not pipe_elts:
86 warning(_("No pipe element found")) 90 warning(_("No pipe element found"))
105 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) 109 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
106 return 110 return
107 111
108 #if we are here, the transfer can start, we just need user's agreement 112 #if we are here, the transfer can start, we just need user's agreement
109 data={ "id": iq_id, "from":from_jid } 113 data={ "id": iq_id, "from":from_jid }
110 self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] 114 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile]
111 115
112 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB) 116 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile)
113 117
114 118
115 def confirmationCB(self, sid, accepted, frontend_data): 119 def confirmationCB(self, sid, accepted, frontend_data, profile):
116 """Called on confirmation answer 120 """Called on confirmation answer
117 @param sid: file transfer session id 121 @param sid: file transfer session id
118 @param accepted: True if file transfer is accepted 122 @param accepted: True if file transfer is accepted
119 @param frontend_data: data sent by frontend""" 123 @param frontend_data: data sent by frontend"""
120 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] 124 client = self.host.getClient(profile)
125 if not client:
126 raise ProfileNotInCacheError
127 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
121 if accepted: 128 if accepted:
122 if timeout.active(): 129 if timeout.active():
123 timeout.cancel() 130 timeout.cancel()
124 try: 131 try:
125 dest_path = frontend_data['dest_path'] 132 dest_path = frontend_data['dest_path']
126 except KeyError: 133 except KeyError:
127 error(_('dest path not found in frontend_data')) 134 error(_('dest path not found in frontend_data'))
128 del(self._waiting_for_approval[sid]) 135 del(client._pipe_waiting_for_approval[sid])
129 return 136 return
130 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: 137 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
131 file_obj = open(dest_path, 'w+') 138 file_obj = open(dest_path, 'w+')
132 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) 139 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
133 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: 140 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
134 file_obj = open(dest_path, 'w+') 141 file_obj = open(dest_path, 'w+')
135 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) 142 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
136 else: 143 else:
137 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) 144 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
138 del(self._waiting_for_approval[sid]) 145 del(client._pipe_waiting_for_approval[sid])
139 return 146 return
140 147
141 #we can send the iq result 148 #we can send the iq result
142 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) 149 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method})
143 misc_elts = [] 150 misc_elts = []
144 misc_elts.append(domish.Element((PROFILE, "file"))) 151 misc_elts.append(domish.Element((PROFILE, "file")))
145 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) 152 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
146 else: 153 else:
147 debug (_("Transfer [%s] refused"), sid) 154 debug (_("Transfer [%s] refused"), sid)
148 self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) 155 self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile)
149 del(self._waiting_for_approval[sid]) 156 del(client._pipe_waiting_for_approval[sid])
150 157
151 def _transferSucceeded(self, sid, file_obj, stream_method): 158 def _transferSucceeded(self, sid, file_obj, stream_method, profile):
152 """Called by the stream method when transfer successfuly finished 159 """Called by the stream method when transfer successfuly finished
153 @param id: stream id""" 160 @param id: stream id"""
161 client = self.host.getClient(profile)
162 if not client:
163 raise ProfileNotInCacheError
154 file_obj.close() 164 file_obj.close()
155 info(_('Transfer %s successfuly finished') % sid) 165 info(_('Transfer %s successfuly finished') % sid)
156 del(self._waiting_for_approval[sid]) 166 del(client._pipe_waiting_for_approval[sid])
157 167
158 def _transferFailed(self, sid, file_obj, stream_method, reason): 168 def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
159 """Called when something went wrong with the transfer 169 """Called when something went wrong with the transfer
160 @param id: stream id 170 @param id: stream id
161 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" 171 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR"""
162 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] 172 client = self.host.getClient(profile)
173 if not client:
174 raise ProfileNotInCacheError
175 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
163 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, 176 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid,
164 's_method': stream_method }) 177 's_method': stream_method })
165 filepath = file_obj.name 178 filepath = file_obj.name
166 file_obj.close() 179 file_obj.close()
167 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session 180 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
168 warning(_("All stream methods failed, can't transfer the file")) 181 warning(_("All stream methods failed, can't transfer the file"))
169 del(self._waiting_for_approval[sid]) 182 del(client._pipe_waiting_for_approval[sid])
170 183
171 def pipeCb(self, profile, filepath, sid, IQ): 184 def pipeCb(self, filepath, sid, profile, IQ):
172 if IQ['type'] == "error": 185 if IQ['type'] == "error":
173 stanza_err = jab_error.exceptionFromStanza(IQ) 186 stanza_err = jab_error.exceptionFromStanza(IQ)
174 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': 187 if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
175 debug(_("Pipe transfer refused by %s") % IQ['from']) 188 debug(_("Pipe transfer refused by %s") % IQ['from'])
176 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) 189 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile)
228 241
229 pipe_elt = domish.Element((PROFILE, 'pipe')) 242 pipe_elt = domish.Element((PROFILE, 'pipe'))
230 pipe_transfer_elts.append(pipe_elt) 243 pipe_transfer_elts.append(pipe_elt)
231 244
232 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) 245 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile)
233 offer.addCallback(self.pipeCb, profile, filepath, sid) 246 offer.addCallback(self.pipeCb, filepath, sid, profile)
234 return sid 247 return sid
235 248
236 def sendSuccessCb(self, sid, file_obj, stream_method): 249 def sendSuccessCb(self, sid, file_obj, stream_method, profile):
237 info(_('Transfer %s successfuly finished') % sid) 250 info(_('Transfer %s successfuly finished') % sid)
238 file_obj.close() 251 file_obj.close()
239 252
240 def sendFailureCb(self, sid, file_obj, stream_method, reason): 253 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
241 file_obj.close() 254 file_obj.close()
242 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method }) 255 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % { 'id': sid, "s_method": stream_method, "profile": profile })
256