Mercurial > libervia-backend
comparison src/plugins/plugin_exp_pipe.py @ 538:2c4016921403
core, frontends, bridge, plugins: fixed methods which were improperly managing multi-profiles
- added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress
- core, frontends: fixed calls/signals according to new bridge API
- use of proper profile namespace for progression indicators and dialogs
- memory: getParam* now returns bool when param type is bool
- memory: added getStringParam* to return string instead of typed value
- core, memory, storage, quick_frontend: getHistory now properly manages multi-profiles
- plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 10 Nov 2012 16:38:16 +0100 |
parents | 2a072735e459 |
children | ca13633d3b6b |
comparison
equal
deleted
inserted
replaced
537:28cddc96c4ed | 538:2c4016921403 |
---|---|
19 along with this program. If not, see <http://www.gnu.org/licenses/>. | 19 along with this program. If not, see <http://www.gnu.org/licenses/>. |
20 """ | 20 """ |
21 | 21 |
22 from logging import debug, info, warning, error | 22 from logging import debug, info, warning, error |
23 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
24 from twisted.internet import protocol | 24 from twisted.words.protocols.jabber import jid |
25 from twisted.words.protocols.jabber import client, jid | |
26 from twisted.words.protocols.jabber import error as jab_error | 25 from twisted.words.protocols.jabber import error as jab_error |
27 import os, os.path | 26 import os, os.path |
28 from twisted.internet import reactor | 27 from twisted.internet import reactor |
29 import pdb | 28 from sat.core.exceptions import ProfileNotInCacheError |
30 | 29 |
31 from zope.interface import implements | 30 from wokkel import data_form |
32 | |
33 from wokkel import disco, iwokkel, data_form | |
34 | 31 |
35 IQ_SET = '/iq[@type="set"]' | 32 IQ_SET = '/iq[@type="set"]' |
36 PROFILE_NAME = "pipe-transfer" | 33 PROFILE_NAME = "pipe-transfer" |
37 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME | 34 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME |
38 | 35 |
51 """This is a modified version of XEP-0096 to work with named pipes instead of files""" | 48 """This is a modified version of XEP-0096 to work with named pipes instead of files""" |
52 | 49 |
53 def __init__(self, host): | 50 def __init__(self, host): |
54 info(_("Plugin Pipe initialization")) | 51 info(_("Plugin Pipe initialization")) |
55 self.host = host | 52 self.host = host |
56 self._waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, | |
57 # current stream method, [failed stream methods], profile] | |
58 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, | 53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, |
59 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed | 54 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed |
60 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) | 55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) |
61 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) | 56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) |
62 | 57 |
63 def _kill_id(self, approval_id): | 58 def profileConnected(self, profile): |
59 client = self.host.getClient(profile) | |
60 client._pipe_waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, | |
61 # current stream method, [failed stream methods], profile] | |
62 | |
63 def _kill_id(self, approval_id, profile): | |
64 """Delete a waiting_for_approval id, called after timeout | 64 """Delete a waiting_for_approval id, called after timeout |
65 @param approval_id: id of _waiting_for_approval""" | 65 @param approval_id: id of _pipe_waiting_for_approval""" |
66 info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id); | 66 info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id); |
67 try: | 67 try: |
68 del self._waiting_for_approval[approval_id] | 68 client = self.host.getClient(profile) |
69 del client._pipe_waiting_for_approval[approval_id] | |
69 except KeyError: | 70 except KeyError: |
70 warning(_("kill id called on a non existant approval id")) | 71 warning(_("kill id called on a non existant approval id")) |
71 | 72 |
72 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): | 73 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): |
73 """Called when a pipe transfer is requested | 74 """Called when a pipe transfer is requested |
77 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) | 78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) |
78 @param si_el: domish.Element of the request | 79 @param si_el: domish.Element of the request |
79 @param profile: %(doc_profile)s""" | 80 @param profile: %(doc_profile)s""" |
80 info (_("EXP-PIPE file transfer requested")) | 81 info (_("EXP-PIPE file transfer requested")) |
81 debug(si_el.toXml()) | 82 debug(si_el.toXml()) |
83 client = self.host.getClient(profile) | |
84 if not client: | |
85 raise ProfileNotInCacheError | |
82 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) | 86 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) |
83 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) | 87 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) |
84 | 88 |
85 if not pipe_elts: | 89 if not pipe_elts: |
86 warning(_("No pipe element found")) | 90 warning(_("No pipe element found")) |
105 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | 109 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) |
106 return | 110 return |
107 | 111 |
108 #if we are here, the transfer can start, we just need user's agreement | 112 #if we are here, the transfer can start, we just need user's agreement |
109 data={ "id": iq_id, "from":from_jid } | 113 data={ "id": iq_id, "from":from_jid } |
110 self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] | 114 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] |
111 | 115 |
112 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB) | 116 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) |
113 | 117 |
114 | 118 |
115 def confirmationCB(self, sid, accepted, frontend_data): | 119 def confirmationCB(self, sid, accepted, frontend_data, profile): |
116 """Called on confirmation answer | 120 """Called on confirmation answer |
117 @param sid: file transfer session id | 121 @param sid: file transfer session id |
118 @param accepted: True if file transfer is accepted | 122 @param accepted: True if file transfer is accepted |
119 @param frontend_data: data sent by frontend""" | 123 @param frontend_data: data sent by frontend""" |
120 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] | 124 client = self.host.getClient(profile) |
125 if not client: | |
126 raise ProfileNotInCacheError | |
127 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] | |
121 if accepted: | 128 if accepted: |
122 if timeout.active(): | 129 if timeout.active(): |
123 timeout.cancel() | 130 timeout.cancel() |
124 try: | 131 try: |
125 dest_path = frontend_data['dest_path'] | 132 dest_path = frontend_data['dest_path'] |
126 except KeyError: | 133 except KeyError: |
127 error(_('dest path not found in frontend_data')) | 134 error(_('dest path not found in frontend_data')) |
128 del(self._waiting_for_approval[sid]) | 135 del(client._pipe_waiting_for_approval[sid]) |
129 return | 136 return |
130 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | 137 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: |
131 file_obj = open(dest_path, 'w+') | 138 file_obj = open(dest_path, 'w+') |
132 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) | 139 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) |
133 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | 140 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
134 file_obj = open(dest_path, 'w+') | 141 file_obj = open(dest_path, 'w+') |
135 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) | 142 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) |
136 else: | 143 else: |
137 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) | 144 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) |
138 del(self._waiting_for_approval[sid]) | 145 del(client._pipe_waiting_for_approval[sid]) |
139 return | 146 return |
140 | 147 |
141 #we can send the iq result | 148 #we can send the iq result |
142 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) | 149 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) |
143 misc_elts = [] | 150 misc_elts = [] |
144 misc_elts.append(domish.Element((PROFILE, "file"))) | 151 misc_elts.append(domish.Element((PROFILE, "file"))) |
145 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) | 152 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) |
146 else: | 153 else: |
147 debug (_("Transfer [%s] refused"), sid) | 154 debug (_("Transfer [%s] refused"), sid) |
148 self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) | 155 self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) |
149 del(self._waiting_for_approval[sid]) | 156 del(client._pipe_waiting_for_approval[sid]) |
150 | 157 |
151 def _transferSucceeded(self, sid, file_obj, stream_method): | 158 def _transferSucceeded(self, sid, file_obj, stream_method, profile): |
152 """Called by the stream method when transfer successfuly finished | 159 """Called by the stream method when transfer successfuly finished |
153 @param id: stream id""" | 160 @param id: stream id""" |
161 client = self.host.getClient(profile) | |
162 if not client: | |
163 raise ProfileNotInCacheError | |
154 file_obj.close() | 164 file_obj.close() |
155 info(_('Transfer %s successfuly finished') % sid) | 165 info(_('Transfer %s successfuly finished') % sid) |
156 del(self._waiting_for_approval[sid]) | 166 del(client._pipe_waiting_for_approval[sid]) |
157 | 167 |
158 def _transferFailed(self, sid, file_obj, stream_method, reason): | 168 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): |
159 """Called when something went wrong with the transfer | 169 """Called when something went wrong with the transfer |
160 @param id: stream id | 170 @param id: stream id |
161 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" | 171 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" |
162 data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] | 172 client = self.host.getClient(profile) |
173 if not client: | |
174 raise ProfileNotInCacheError | |
175 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] | |
163 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, | 176 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, |
164 's_method': stream_method }) | 177 's_method': stream_method }) |
165 filepath = file_obj.name | 178 filepath = file_obj.name |
166 file_obj.close() | 179 file_obj.close() |
167 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session | 180 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session |
168 warning(_("All stream methods failed, can't transfer the file")) | 181 warning(_("All stream methods failed, can't transfer the file")) |
169 del(self._waiting_for_approval[sid]) | 182 del(client._pipe_waiting_for_approval[sid]) |
170 | 183 |
171 def pipeCb(self, profile, filepath, sid, IQ): | 184 def pipeCb(self, filepath, sid, profile, IQ): |
172 if IQ['type'] == "error": | 185 if IQ['type'] == "error": |
173 stanza_err = jab_error.exceptionFromStanza(IQ) | 186 stanza_err = jab_error.exceptionFromStanza(IQ) |
174 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': | 187 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': |
175 debug(_("Pipe transfer refused by %s") % IQ['from']) | 188 debug(_("Pipe transfer refused by %s") % IQ['from']) |
176 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) | 189 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) |
228 | 241 |
229 pipe_elt = domish.Element((PROFILE, 'pipe')) | 242 pipe_elt = domish.Element((PROFILE, 'pipe')) |
230 pipe_transfer_elts.append(pipe_elt) | 243 pipe_transfer_elts.append(pipe_elt) |
231 | 244 |
232 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) | 245 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) |
233 offer.addCallback(self.pipeCb, profile, filepath, sid) | 246 offer.addCallback(self.pipeCb, filepath, sid, profile) |
234 return sid | 247 return sid |
235 | 248 |
236 def sendSuccessCb(self, sid, file_obj, stream_method): | 249 def sendSuccessCb(self, sid, file_obj, stream_method, profile): |
237 info(_('Transfer %s successfuly finished') % sid) | 250 info(_('Transfer %s successfuly finished') % sid) |
238 file_obj.close() | 251 file_obj.close() |
239 | 252 |
240 def sendFailureCb(self, sid, file_obj, stream_method, reason): | 253 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): |
241 file_obj.close() | 254 file_obj.close() |
242 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method }) | 255 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % { 'id': sid, "s_method": stream_method, "profile": profile }) |
256 |