comparison src/plugins/plugin_exp_pipe.py @ 1669:697effba0310

plugin pipe: rewritten plugin as a jingle application. The current implementation can, in some cases, block the backend, and is experimental only. Improvements are needed in the future.
author Goffi <goffi@goffi.org>
date Wed, 25 Nov 2015 02:04:43 +0100
parents 3265a2639182
children dbd7c79aab2b
comparison
equal deleted inserted replaced
1668:a9e86f660653 1669:697effba0310
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _, D_
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23 log = getLogger(__name__) 23 log = getLogger(__name__)
24 from sat.tools import xml_tools
24 from twisted.words.xish import domish 25 from twisted.words.xish import domish
25 from twisted.words.protocols.jabber import jid 26 from twisted.words.protocols.jabber import jid
26 from twisted.words.protocols import jabber 27 from twisted.internet import defer
27 from twisted.internet import reactor
28 28
29 from wokkel import data_form 29 NS_PIPE = 'org.salut-a-toi/pipe'
30 30 SECURITY_LIMIT=30
31 IQ_SET = '/iq[@type="set"]'
32 PROFILE_NAME = "pipe-transfer"
33 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
34 31
35 PLUGIN_INFO = { 32 PLUGIN_INFO = {
36 "name": "Pipe Plugin", 33 "name": "Pipe Plugin",
37 "import_name": "EXP-PIPE", 34 "import_name": "EXP-PIPE",
38 "type": "EXP", 35 "type": "EXP",
39 "protocols": ["EXP-PIPE"], 36 "protocols": ["EXP-PIPE"],
40 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], 37 "dependencies": ["XEP-0166"],
41 "main": "Exp_Pipe", 38 "main": "Exp_Pipe",
42 "handler": "no", 39 "handler": "no",
43 "description": _("""Implementation of SI Pipe Transfer""") 40 "description": _("""Jingle Pipe Transfer experimental plugin""")
44 } 41 }
45 42
43 CONFIRM = D_(u"{peer} wants to send you a pipe stream, do you accept ?")
44 CONFIRM_TITLE = D_(u"Pipe stream")
46 45
47 class Exp_Pipe(object): 46 class Exp_Pipe(object):
48 """This is a modified version of XEP-0096 to work with named pipes instead of files""" 47 """This non standard jingle application works with named pipes"""
49 48
50 def __init__(self, host): 49 def __init__(self, host):
51 log.info(_("Plugin Pipe initialization")) 50 log.info(_("Plugin Pipe initialization"))
52 self.host = host 51 self.host = host
53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, 52 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
54 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed 53 self._j.registerApplication(NS_PIPE, self)
55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) 54 host.bridge.addMethod("pipeOut", ".plugin", in_sign='sss', out_sign='', method=self._pipeOut)
56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut)
57 55
58 def profileConnected(self, profile): 56 # jingle callbacks
59 client = self.host.getClient(profile)
60 client._pipe_waiting_for_approval = {} # key = id, value = [transfer data, IdelayedCall Reactor timeout,
61 # current stream method, [failed stream methods], profile]
62 57
63 def _kill_id(self, approval_id, profile): 58 def _pipeOut(self, peer_jid_s, filepath, profile_key=C.PROF_KEY_NONE):
64 """Delete a waiting_for_approval id, called after timeout 59 profile = self.host.memory.getProfileName(profile_key)
65 @param approval_id: id of _pipe_waiting_for_approval""" 60 self.pipeOut(jid.JID(peer_jid_s), filepath, profile)
66 log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id)
67 try:
68 client = self.host.getClient(profile)
69 del client._pipe_waiting_for_approval[approval_id]
70 except KeyError:
71 log.warning(_(u"kill id called on a non existant approval id"))
72 61
73 def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): 62 def pipeOut(self, peer_jid, filepath, profile):
74 """Called when a pipe transfer is requested 63 """send a file using EXP-PIPE
75 @param iq_id: id of the iq request
76 @param from_jid: jid of the sender
77 @param si_id: Stream Initiation session id
78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown)
79 @param si_el: domish.Element of the request
80 @param profile: %(doc_profile)s"""
81 log.info(_("EXP-PIPE file transfer requested"))
82 log.debug(si_el.toXml())
83 client = self.host.getClient(profile)
84 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements())
85 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
86 64
87 if not pipe_elts: 65 @param peer_jid(jid.JID): recipient
88 log.warning(_(u"No pipe element found")) 66 @param filepath(unicode): absolute path to the named pipe to send
89 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
90 return
91
92 if feature_elts:
93 feature_el = feature_elts[0]
94 data_form.Form.fromElement(feature_el.firstChildElement())
95 try:
96 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
97 except KeyError:
98 log.warning(_(u"No stream method found"))
99 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
100 return
101 if not stream_method:
102 log.warning(_(u"Can't find a valid stream method"))
103 self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile)
104 return
105 else:
106 log.warning(_(u"No feature element found"))
107 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
108 return
109
110 #if we are here, the transfer can start, we just need user's agreement
111 data = {"id": iq_id, "from": from_jid}
112 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile]
113
114 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile)
115
116 def confirmationCB(self, sid, accepted, frontend_data, profile):
117 """Called on confirmation answer
118 @param sid: file transfer session id
119 @param accepted: True if file transfer is accepted
120 @param frontend_data: data sent by frontend"""
121 client = self.host.getClient(profile)
122 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
123 if accepted:
124 if timeout.active():
125 timeout.cancel()
126 try:
127 dest_path = frontend_data['dest_path']
128 except KeyError:
129 log.error(_(u'dest path not found in frontend_data'))
130 del(client._pipe_waiting_for_approval[sid])
131 return
132 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
133 file_obj = open(dest_path, 'w+')
134 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
135 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
136 file_obj = open(dest_path, 'w+')
137 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
138 else:
139 log.error(_(u"Unknown stream method, this should not happen at this stage, cancelling transfer"))
140 del(client._pipe_waiting_for_approval[sid])
141 return
142
143 #we can send the iq result
144 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
145 misc_elts = []
146 misc_elts.append(domish.Element((PROFILE, "file")))
147 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
148 else:
149 log.debug(_(u"Transfer [%s] refused"), sid)
150 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
151 del(client._pipe_waiting_for_approval[sid])
152
153 def _transferSucceeded(self, sid, file_obj, stream_method, profile):
154 """Called by the stream method when transfer successfuly finished
155 @param id: stream id"""
156 client = self.host.getClient(profile)
157 file_obj.close()
158 log.info(_('Transfer %s successfuly finished') % sid)
159 del(client._pipe_waiting_for_approval[sid])
160
161 def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
162 """Called when something went wrong with the transfer
163 @param id: stream id
164 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR"""
165 client = self.host.getClient(profile)
166 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
167 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid,
168 's_method': stream_method})
169 # filepath = file_obj.name
170 file_obj.close()
171 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
172 log.warning(_(u"All stream methods failed, can't transfer the file"))
173 del(client._pipe_waiting_for_approval[sid])
174
175 def pipeCb(self, filepath, sid, profile, IQ):
176 if IQ['type'] == "error":
177 stanza_err = jabber.error.exceptionFromStanza(IQ)
178 if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
179 log.debug(_(u"Pipe transfer refused by %s") % IQ['from'])
180 self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile)
181 else:
182 log.warning(_(u"Error during pipe stream transfer with %s") % IQ['from'])
183 self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile)
184 return
185
186 si_elt = IQ.firstChildElement()
187
188 if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
189 log.error(_(u"Protocol error during file transfer"))
190 return
191
192 feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
193 if not feature_elts:
194 log.warning(_(u"No feature element"))
195 return
196
197 choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
198 try:
199 stream_method = choosed_options["stream-method"]
200 except KeyError:
201 log.warning(_(u"No stream method choosed"))
202 return
203
204 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
205 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
206 #file_obj = os.fdopen(fd, 'r')
207 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it
208 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
209 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
210 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
211 #file_obj = os.fdopen(fd, 'r')
212 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it
213 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
214 else:
215 log.warning(_(u"Invalid stream method received"))
216
217 def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
218 """send a file using EXP-PIPE
219 @to_jid: recipient
220 @filepath: absolute path to the named pipe to send
221 @data: dictionnary with the optional data
222 @param profile_key: %(doc_profile_key)s 67 @param profile_key: %(doc_profile_key)s
223 @return: an unique id to identify the transfer 68 @return: an unique id to identify the transfer
224 """ 69 """
225 profile = self.host.memory.getProfileName(profile_key) 70 self._j.initiate(peer_jid,
226 if not profile: 71 [{'app_ns': NS_PIPE,
227 log.warning(_(u"Trying to send a file from an unknown profile")) 72 'senders': self._j.ROLE_INITIATOR,
228 return "" 73 'app_kwargs': {'filepath': filepath,
229 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) 74 },
75 }],
76 profile=profile)
230 77
231 pipe_transfer_elts = [] 78 def jingleSessionInit(self, session, content_name, filepath, profile=C.PROF_KEY_NONE):
79 content_data = session['contents'][content_name]
80 application_data = content_data['application_data']
81 assert 'file_path' not in application_data
82 application_data['file_path'] = filepath
83 desc_elt = domish.Element((NS_PIPE, 'description'))
84 return desc_elt
232 85
233 pipe_elt = domish.Element((PROFILE, 'pipe')) 86 def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile):
234 pipe_transfer_elts.append(pipe_elt) 87 """This method request confirmation for a jingle session"""
88 content_data = session['contents'][content_name]
89 if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER):
90 log.warning(u"Bad sender, assuming initiator")
91 content_data['senders'] = self._j.ROLE_INITIATOR
235 92
236 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) 93 def gotConfirmation(data):
237 offer.addCallback(self.pipeCb, filepath, sid, profile) 94 if data.get('cancelled', False):
238 return sid 95 return False
96 application_data = content_data['application_data']
97 dest_path = application_data['file_path'] = data['path']
98 content_data['file_obj'] = open(dest_path, 'w+')
99 finished_d = content_data['finished_d'] = defer.Deferred()
100 args = [session, content_name, content_data, profile]
101 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
102 return True
239 103
240 def sendSuccessCb(self, sid, file_obj, stream_method, profile): 104 d = xml_tools.deferDialog(self.host,
241 log.info(_('Transfer %s successfuly finished') % sid) 105 _(CONFIRM).format(peer=session['peer_jid'].full()),
242 file_obj.close() 106 _(CONFIRM_TITLE),
107 type_=C.XMLUI_DIALOG_FILE,
108 options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR},
109 action_extra={'meta_from_jid': session['peer_jid'].full(),
110 'meta_type': "PIPE",
111 },
112 security_limit=SECURITY_LIMIT,
113 profile=profile)
243 114
244 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): 115 d.addCallback(gotConfirmation)
245 file_obj.close() 116 return d
246 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) 117
118 def jingleHandler(self, action, session, content_name, desc_elt, profile):
119 content_data = session['contents'][content_name]
120 application_data = content_data['application_data']
121 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
122 pass
123 elif action == self._j.A_SESSION_ACCEPT:
124 assert not 'file_obj' in content_data
125 filepath = application_data['file_path']
126 content_data['file_obj'] = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it
127 finished_d = content_data['finished_d'] = defer.Deferred()
128 args = [session, content_name, content_data, profile]
129 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
130 else:
131 log.warning(u"FIXME: unmanaged action {}".format(action))
132 return desc_elt
133
134 def _finishedCb(self, dummy, session, content_name, content_data, profile):
135 log.info(u"Pipe transfer completed")
136 self._j.contentTerminate(session, content_name, profile=profile)
137 content_data['file_obj'].close()
138
139 def _finishedEb(self, failure, session, content_name, content_data, profile):
140 log.warning(u"Error while streaming pipe: {}".format(failure))
141 content_data['file_obj'].close()
142 self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)