Mercurial > libervia-backend
comparison src/plugins/plugin_exp_pipe.py @ 1669:697effba0310
plugin pipe: rewritten plugin as a jingle application. The current implementation can, in some cases, block the backend, and is experimental only. Improvements are needed in the future.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Nov 2015 02:04:43 +0100 |
parents | 3265a2639182 |
children | dbd7c79aab2b |
comparison
equal
deleted
inserted
replaced
1668:a9e86f660653 | 1669:697effba0310 |
---|---|
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _, D_ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | 23 log = getLogger(__name__) |
24 from sat.tools import xml_tools | |
24 from twisted.words.xish import domish | 25 from twisted.words.xish import domish |
25 from twisted.words.protocols.jabber import jid | 26 from twisted.words.protocols.jabber import jid |
26 from twisted.words.protocols import jabber | 27 from twisted.internet import defer |
27 from twisted.internet import reactor | |
28 | 28 |
29 from wokkel import data_form | 29 NS_PIPE = 'org.salut-a-toi/pipe' |
30 | 30 SECURITY_LIMIT=30 |
31 IQ_SET = '/iq[@type="set"]' | |
32 PROFILE_NAME = "pipe-transfer" | |
33 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME | |
34 | 31 |
35 PLUGIN_INFO = { | 32 PLUGIN_INFO = { |
36 "name": "Pipe Plugin", | 33 "name": "Pipe Plugin", |
37 "import_name": "EXP-PIPE", | 34 "import_name": "EXP-PIPE", |
38 "type": "EXP", | 35 "type": "EXP", |
39 "protocols": ["EXP-PIPE"], | 36 "protocols": ["EXP-PIPE"], |
40 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], | 37 "dependencies": ["XEP-0166"], |
41 "main": "Exp_Pipe", | 38 "main": "Exp_Pipe", |
42 "handler": "no", | 39 "handler": "no", |
43 "description": _("""Implementation of SI Pipe Transfer""") | 40 "description": _("""Jingle Pipe Transfer experimental plugin""") |
44 } | 41 } |
45 | 42 |
43 CONFIRM = D_(u"{peer} wants to send you a pipe stream, do you accept ?") | |
44 CONFIRM_TITLE = D_(u"Pipe stream") | |
46 | 45 |
47 class Exp_Pipe(object): | 46 class Exp_Pipe(object): |
48 """This is a modified version of XEP-0096 to work with named pipes instead of files""" | 47 """This non standard jingle application works with named pipes""" |
49 | 48 |
50 def __init__(self, host): | 49 def __init__(self, host): |
51 log.info(_("Plugin Pipe initialization")) | 50 log.info(_("Plugin Pipe initialization")) |
52 self.host = host | 51 self.host = host |
53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, | 52 self._j = host.plugins["XEP-0166"] # shortcut to access jingle |
54 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed | 53 self._j.registerApplication(NS_PIPE, self) |
55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) | 54 host.bridge.addMethod("pipeOut", ".plugin", in_sign='sss', out_sign='', method=self._pipeOut) |
56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) | |
57 | 55 |
58 def profileConnected(self, profile): | 56 # jingle callbacks |
59 client = self.host.getClient(profile) | |
60 client._pipe_waiting_for_approval = {} # key = id, value = [transfer data, IdelayedCall Reactor timeout, | |
61 # current stream method, [failed stream methods], profile] | |
62 | 57 |
63 def _kill_id(self, approval_id, profile): | 58 def _pipeOut(self, peer_jid_s, filepath, profile_key=C.PROF_KEY_NONE): |
64 """Delete a waiting_for_approval id, called after timeout | 59 profile = self.host.memory.getProfileName(profile_key) |
65 @param approval_id: id of _pipe_waiting_for_approval""" | 60 self.pipeOut(jid.JID(peer_jid_s), filepath, profile) |
66 log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id) | |
67 try: | |
68 client = self.host.getClient(profile) | |
69 del client._pipe_waiting_for_approval[approval_id] | |
70 except KeyError: | |
71 log.warning(_(u"kill id called on a non existant approval id")) | |
72 | 61 |
73 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): | 62 def pipeOut(self, peer_jid, filepath, profile): |
74 """Called when a pipe transfer is requested | 63 """send a file using EXP-PIPE |
75 @param iq_id: id of the iq request | |
76 @param from_jid: jid of the sender | |
77 @param si_id: Stream Initiation session id | |
78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) | |
79 @param si_el: domish.Element of the request | |
80 @param profile: %(doc_profile)s""" | |
81 log.info(_("EXP-PIPE file transfer requested")) | |
82 log.debug(si_el.toXml()) | |
83 client = self.host.getClient(profile) | |
84 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) | |
85 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) | |
86 | 64 |
87 if not pipe_elts: | 65 @param peer_jid(jid.JID): recipient |
88 log.warning(_(u"No pipe element found")) | 66 @param filepath(unicode): absolute path to the named pipe to send |
89 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | |
90 return | |
91 | |
92 if feature_elts: | |
93 feature_el = feature_elts[0] | |
94 data_form.Form.fromElement(feature_el.firstChildElement()) | |
95 try: | |
96 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) | |
97 except KeyError: | |
98 log.warning(_(u"No stream method found")) | |
99 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | |
100 return | |
101 if not stream_method: | |
102 log.warning(_(u"Can't find a valid stream method")) | |
103 self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) | |
104 return | |
105 else: | |
106 log.warning(_(u"No feature element found")) | |
107 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | |
108 return | |
109 | |
110 #if we are here, the transfer can start, we just need user's agreement | |
111 data = {"id": iq_id, "from": from_jid} | |
112 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] | |
113 | |
114 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) | |
115 | |
116 def confirmationCB(self, sid, accepted, frontend_data, profile): | |
117 """Called on confirmation answer | |
118 @param sid: file transfer session id | |
119 @param accepted: True if file transfer is accepted | |
120 @param frontend_data: data sent by frontend""" | |
121 client = self.host.getClient(profile) | |
122 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] | |
123 if accepted: | |
124 if timeout.active(): | |
125 timeout.cancel() | |
126 try: | |
127 dest_path = frontend_data['dest_path'] | |
128 except KeyError: | |
129 log.error(_(u'dest path not found in frontend_data')) | |
130 del(client._pipe_waiting_for_approval[sid]) | |
131 return | |
132 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | |
133 file_obj = open(dest_path, 'w+') | |
134 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) | |
135 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | |
136 file_obj = open(dest_path, 'w+') | |
137 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) | |
138 else: | |
139 log.error(_(u"Unknown stream method, this should not happen at this stage, cancelling transfer")) | |
140 del(client._pipe_waiting_for_approval[sid]) | |
141 return | |
142 | |
143 #we can send the iq result | |
144 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) | |
145 misc_elts = [] | |
146 misc_elts.append(domish.Element((PROFILE, "file"))) | |
147 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) | |
148 else: | |
149 log.debug(_(u"Transfer [%s] refused"), sid) | |
150 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) | |
151 del(client._pipe_waiting_for_approval[sid]) | |
152 | |
153 def _transferSucceeded(self, sid, file_obj, stream_method, profile): | |
154 """Called by the stream method when transfer successfuly finished | |
155 @param id: stream id""" | |
156 client = self.host.getClient(profile) | |
157 file_obj.close() | |
158 log.info(_('Transfer %s successfuly finished') % sid) | |
159 del(client._pipe_waiting_for_approval[sid]) | |
160 | |
161 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): | |
162 """Called when something went wrong with the transfer | |
163 @param id: stream id | |
164 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" | |
165 client = self.host.getClient(profile) | |
166 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] | |
167 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid, | |
168 's_method': stream_method}) | |
169 # filepath = file_obj.name | |
170 file_obj.close() | |
171 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session | |
172 log.warning(_(u"All stream methods failed, can't transfer the file")) | |
173 del(client._pipe_waiting_for_approval[sid]) | |
174 | |
175 def pipeCb(self, filepath, sid, profile, IQ): | |
176 if IQ['type'] == "error": | |
177 stanza_err = jabber.error.exceptionFromStanza(IQ) | |
178 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': | |
179 log.debug(_(u"Pipe transfer refused by %s") % IQ['from']) | |
180 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) | |
181 else: | |
182 log.warning(_(u"Error during pipe stream transfer with %s") % IQ['from']) | |
183 self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile) | |
184 return | |
185 | |
186 si_elt = IQ.firstChildElement() | |
187 | |
188 if IQ['type'] != "result" or not si_elt or si_elt.name != "si": | |
189 log.error(_(u"Protocol error during file transfer")) | |
190 return | |
191 | |
192 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) | |
193 if not feature_elts: | |
194 log.warning(_(u"No feature element")) | |
195 return | |
196 | |
197 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) | |
198 try: | |
199 stream_method = choosed_options["stream-method"] | |
200 except KeyError: | |
201 log.warning(_(u"No stream method choosed")) | |
202 return | |
203 | |
204 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | |
205 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender | |
206 #file_obj = os.fdopen(fd, 'r') | |
207 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it | |
208 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) | |
209 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | |
210 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender | |
211 #file_obj = os.fdopen(fd, 'r') | |
212 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it | |
213 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) | |
214 else: | |
215 log.warning(_(u"Invalid stream method received")) | |
216 | |
217 def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): | |
218 """send a file using EXP-PIPE | |
219 @to_jid: recipient | |
220 @filepath: absolute path to the named pipe to send | |
221 @data: dictionnary with the optional data | |
222 @param profile_key: %(doc_profile_key)s | 67 @param profile_key: %(doc_profile_key)s |
223 @return: an unique id to identify the transfer | 68 @return: an unique id to identify the transfer |
224 """ | 69 """ |
225 profile = self.host.memory.getProfileName(profile_key) | 70 self._j.initiate(peer_jid, |
226 if not profile: | 71 [{'app_ns': NS_PIPE, |
227 log.warning(_(u"Trying to send a file from an unknown profile")) | 72 'senders': self._j.ROLE_INITIATOR, |
228 return "" | 73 'app_kwargs': {'filepath': filepath, |
229 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) | 74 }, |
75 }], | |
76 profile=profile) | |
230 | 77 |
231 pipe_transfer_elts = [] | 78 def jingleSessionInit(self, session, content_name, filepath, profile=C.PROF_KEY_NONE): |
79 content_data = session['contents'][content_name] | |
80 application_data = content_data['application_data'] | |
81 assert 'file_path' not in application_data | |
82 application_data['file_path'] = filepath | |
83 desc_elt = domish.Element((NS_PIPE, 'description')) | |
84 return desc_elt | |
232 | 85 |
233 pipe_elt = domish.Element((PROFILE, 'pipe')) | 86 def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile): |
234 pipe_transfer_elts.append(pipe_elt) | 87 """This method request confirmation for a jingle session""" |
88 content_data = session['contents'][content_name] | |
89 if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): | |
90 log.warning(u"Bad sender, assuming initiator") | |
91 content_data['senders'] = self._j.ROLE_INITIATOR | |
235 | 92 |
236 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) | 93 def gotConfirmation(data): |
237 offer.addCallback(self.pipeCb, filepath, sid, profile) | 94 if data.get('cancelled', False): |
238 return sid | 95 return False |
96 application_data = content_data['application_data'] | |
97 dest_path = application_data['file_path'] = data['path'] | |
98 content_data['file_obj'] = open(dest_path, 'w+') | |
99 finished_d = content_data['finished_d'] = defer.Deferred() | |
100 args = [session, content_name, content_data, profile] | |
101 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | |
102 return True | |
239 | 103 |
240 def sendSuccessCb(self, sid, file_obj, stream_method, profile): | 104 d = xml_tools.deferDialog(self.host, |
241 log.info(_('Transfer %s successfuly finished') % sid) | 105 _(CONFIRM).format(peer=session['peer_jid'].full()), |
242 file_obj.close() | 106 _(CONFIRM_TITLE), |
107 type_=C.XMLUI_DIALOG_FILE, | |
108 options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR}, | |
109 action_extra={'meta_from_jid': session['peer_jid'].full(), | |
110 'meta_type': "PIPE", | |
111 }, | |
112 security_limit=SECURITY_LIMIT, | |
113 profile=profile) | |
243 | 114 |
244 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): | 115 d.addCallback(gotConfirmation) |
245 file_obj.close() | 116 return d |
246 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) | 117 |
118 def jingleHandler(self, action, session, content_name, desc_elt, profile): | |
119 content_data = session['contents'][content_name] | |
120 application_data = content_data['application_data'] | |
121 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): | |
122 pass | |
123 elif action == self._j.A_SESSION_ACCEPT: | |
124 assert not 'file_obj' in content_data | |
125 filepath = application_data['file_path'] | |
126 content_data['file_obj'] = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it | |
127 finished_d = content_data['finished_d'] = defer.Deferred() | |
128 args = [session, content_name, content_data, profile] | |
129 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | |
130 else: | |
131 log.warning(u"FIXME: unmanaged action {}".format(action)) | |
132 return desc_elt | |
133 | |
134 def _finishedCb(self, dummy, session, content_name, content_data, profile): | |
135 log.info(u"Pipe transfer completed") | |
136 self._j.contentTerminate(session, content_name, profile=profile) | |
137 content_data['file_obj'].close() | |
138 | |
139 def _finishedEb(self, failure, session, content_name, content_data, profile): | |
140 log.warning(u"Error while streaming pipe: {}".format(failure)) | |
141 content_data['file_obj'].close() | |
142 self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile) |