Mercurial > libervia-backend
comparison src/plugins/plugin_exp_pipe.py @ 594:e629371a28d3
Fix pep8 support in src/plugins.
author | Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> |
---|---|
date | Fri, 18 Jan 2013 17:55:35 +0100 |
parents | beaf6bec2fcd |
children | 84a6e83157c2 |
comparison
equal
deleted
inserted
replaced
593:70bae685d05c | 594:e629371a28d3 |
---|---|
21 | 21 |
22 from logging import debug, info, warning, error | 22 from logging import debug, info, warning, error |
23 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
24 from twisted.words.protocols.jabber import jid | 24 from twisted.words.protocols.jabber import jid |
25 from twisted.words.protocols.jabber import error as jab_error | 25 from twisted.words.protocols.jabber import error as jab_error |
26 import os, os.path | 26 import os |
27 from twisted.internet import reactor | 27 from twisted.internet import reactor |
28 from sat.core.exceptions import ProfileNotInCacheError | 28 from sat.core.exceptions import ProfileNotInCacheError |
29 | 29 |
30 from wokkel import data_form | 30 from wokkel import data_form |
31 | 31 |
32 IQ_SET = '/iq[@type="set"]' | 32 IQ_SET = '/iq[@type="set"]' |
33 PROFILE_NAME = "pipe-transfer" | 33 PROFILE_NAME = "pipe-transfer" |
34 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME | 34 PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME |
35 | 35 |
36 PLUGIN_INFO = { | 36 PLUGIN_INFO = { |
37 "name": "Pipe Plugin", | 37 "name": "Pipe Plugin", |
38 "import_name": "EXP-PIPE", | 38 "import_name": "EXP-PIPE", |
39 "type": "EXP", | 39 "type": "EXP", |
40 "protocols": ["EXP-PIPE"], | 40 "protocols": ["EXP-PIPE"], |
41 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], | 41 "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], |
42 "main": "Exp_Pipe", | 42 "main": "Exp_Pipe", |
43 "handler": "no", | 43 "handler": "no", |
44 "description": _("""Implementation of SI Pipe Transfer""") | 44 "description": _("""Implementation of SI Pipe Transfer""") |
45 } | 45 } |
46 | |
46 | 47 |
47 class Exp_Pipe(object): | 48 class Exp_Pipe(object): |
48 """This is a modified version of XEP-0096 to work with named pipes instead of files""" | 49 """This is a modified version of XEP-0096 to work with named pipes instead of files""" |
49 | 50 |
50 def __init__(self, host): | 51 def __init__(self, host): |
51 info(_("Plugin Pipe initialization")) | 52 info(_("Plugin Pipe initialization")) |
52 self.host = host | 53 self.host = host |
53 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, | 54 self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, |
54 self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed | 55 self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed |
55 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) | 56 self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) |
56 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) | 57 host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) |
57 | 58 |
58 def profileConnected(self, profile): | 59 def profileConnected(self, profile): |
59 client = self.host.getClient(profile) | 60 client = self.host.getClient(profile) |
60 client._pipe_waiting_for_approval = {} #key = id, value = [transfer data, IDelayedCall Reactor timeout, | 61 client._pipe_waiting_for_approval = {} # key = id, value = [transfer data, IDelayedCall Reactor timeout, |
61 # current stream method, [failed stream methods], profile] | 62 # current stream method, [failed stream methods], profile] |
62 | 63 |
63 def _kill_id(self, approval_id, profile): | 64 def _kill_id(self, approval_id, profile): |
64 """Delete a waiting_for_approval id, called after timeout | 65 """Delete a waiting_for_approval id, called after timeout |
65 @param approval_id: id of _pipe_waiting_for_approval""" | 66 @param approval_id: id of _pipe_waiting_for_approval""" |
76 @param from_jid: jid of the sender | 77 @param from_jid: jid of the sender |
77 @param si_id: Stream Initiation session id | 78 @param si_id: Stream Initiation session id |
78 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) | 79 @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) |
79 @param si_el: domish.Element of the request | 80 @param si_el: domish.Element of the request |
80 @param profile: %(doc_profile)s""" | 81 @param profile: %(doc_profile)s""" |
81 info (_("EXP-PIPE file transfer requested")) | 82 info(_("EXP-PIPE file transfer requested")) |
82 debug(si_el.toXml()) | 83 debug(si_el.toXml()) |
83 client = self.host.getClient(profile) | 84 client = self.host.getClient(profile) |
84 if not client: | 85 if not client: |
85 raise ProfileNotInCacheError | 86 raise ProfileNotInCacheError |
86 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) | 87 pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) |
93 | 94 |
94 if feature_elts: | 95 if feature_elts: |
95 feature_el = feature_elts[0] | 96 feature_el = feature_elts[0] |
96 form = data_form.Form.fromElement(feature_el.firstChildElement()) | 97 form = data_form.Form.fromElement(feature_el.firstChildElement()) |
97 try: | 98 try: |
98 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m) | 99 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) |
99 except KeyError: | 100 except KeyError: |
100 warning(_("No stream method found")) | 101 warning(_("No stream method found")) |
101 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | 102 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) |
102 return | 103 return |
103 if not stream_method: | 104 if not stream_method: |
108 warning(_("No feature element found")) | 109 warning(_("No feature element found")) |
109 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) | 110 self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) |
110 return | 111 return |
111 | 112 |
112 #if we are here, the transfer can start, we just need user's agreement | 113 #if we are here, the transfer can start, we just need user's agreement |
113 data={ "id": iq_id, "from":from_jid } | 114 data = {"id": iq_id, "from": from_jid} |
114 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] | 115 client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] |
115 | 116 |
116 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) | 117 self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) |
117 | |
118 | 118 |
119 def confirmationCB(self, sid, accepted, frontend_data, profile): | 119 def confirmationCB(self, sid, accepted, frontend_data, profile): |
120 """Called on confirmation answer | 120 """Called on confirmation answer |
121 @param sid: file transfer session id | 121 @param sid: file transfer session id |
122 @param accepted: True if file transfer is accepted | 122 @param accepted: True if file transfer is accepted |
144 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) | 144 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) |
145 del(client._pipe_waiting_for_approval[sid]) | 145 del(client._pipe_waiting_for_approval[sid]) |
146 return | 146 return |
147 | 147 |
148 #we can send the iq result | 148 #we can send the iq result |
149 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) | 149 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) |
150 misc_elts = [] | 150 misc_elts = [] |
151 misc_elts.append(domish.Element((PROFILE, "file"))) | 151 misc_elts.append(domish.Element((PROFILE, "file"))) |
152 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) | 152 self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) |
153 else: | 153 else: |
154 debug (_("Transfer [%s] refused"), sid) | 154 debug(_("Transfer [%s] refused"), sid) |
155 self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) | 155 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) |
156 del(client._pipe_waiting_for_approval[sid]) | 156 del(client._pipe_waiting_for_approval[sid]) |
157 | 157 |
158 def _transferSucceeded(self, sid, file_obj, stream_method, profile): | 158 def _transferSucceeded(self, sid, file_obj, stream_method, profile): |
159 """Called by the stream method when the transfer has successfully finished | 159 """Called by the stream method when the transfer has successfully finished |
160 @param sid: stream id""" | 160 @param sid: stream id""" |
171 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" | 171 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" |
172 client = self.host.getClient(profile) | 172 client = self.host.getClient(profile) |
173 if not client: | 173 if not client: |
174 raise ProfileNotInCacheError | 174 raise ProfileNotInCacheError |
175 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] | 175 data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] |
176 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, | 176 warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid, |
177 's_method': stream_method }) | 177 's_method': stream_method}) |
178 filepath = file_obj.name | 178 filepath = file_obj.name |
179 file_obj.close() | 179 file_obj.close() |
180 #TODO: remember per session (within a time limit) when a stream method fails, and avoid that stream method with the full jid for the rest of the session | 180 #TODO: remember per session (within a time limit) when a stream method fails, and avoid that stream method with the full jid for the rest of the session |
181 warning(_("All stream methods failed, can't transfer the file")) | 181 warning(_("All stream methods failed, can't transfer the file")) |
182 del(client._pipe_waiting_for_approval[sid]) | 182 del(client._pipe_waiting_for_approval[sid]) |
209 except KeyError: | 209 except KeyError: |
210 warning(_("No stream method choosed")) | 210 warning(_("No stream method choosed")) |
211 return | 211 return |
212 | 212 |
213 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | 213 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: |
214 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non-blocking opening causes issues with XEP-0065's FileSender | 214 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non-blocking opening causes issues with XEP-0065's FileSender |
215 #file_obj = os.fdopen(fd, 'r') | 215 #file_obj = os.fdopen(fd, 'r') |
216 file_obj = open(filepath, 'r') #XXX: we have to be sure that filepath is well opened, as reading can block it | 216 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it |
217 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) | 217 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) |
218 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | 218 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
219 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non-blocking opening causes issues with XEP-0065's FileSender | 219 #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non-blocking opening causes issues with XEP-0065's FileSender |
220 #file_obj = os.fdopen(fd, 'r') | 220 #file_obj = os.fdopen(fd, 'r') |
221 file_obj = open(filepath, 'r') #XXX: we have to be sure that filepath is well opened, as reading can block it | 221 file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it |
222 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) | 222 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) |
223 else: | 223 else: |
224 warning(_("Invalid stream method received")) | 224 warning(_("Invalid stream method received")) |
225 | 225 |
226 def pipeOut(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): | 226 def pipeOut(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): |
227 """send a file using EXP-PIPE | 227 """send a file using EXP-PIPE |
240 pipe_transfer_elts = [] | 240 pipe_transfer_elts = [] |
241 | 241 |
242 pipe_elt = domish.Element((PROFILE, 'pipe')) | 242 pipe_elt = domish.Element((PROFILE, 'pipe')) |
243 pipe_transfer_elts.append(pipe_elt) | 243 pipe_transfer_elts.append(pipe_elt) |
244 | 244 |
245 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) | 245 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) |
246 offer.addCallback(self.pipeCb, filepath, sid, profile) | 246 offer.addCallback(self.pipeCb, filepath, sid, profile) |
247 return sid | 247 return sid |
248 | 248 |
249 def sendSuccessCb(self, sid, file_obj, stream_method, profile): | 249 def sendSuccessCb(self, sid, file_obj, stream_method, profile): |
250 info(_('Transfer %s successfuly finished') % sid) | 250 info(_('Transfer %s successfuly finished') % sid) |
251 file_obj.close() | 251 file_obj.close() |
252 | 252 |
253 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): | 253 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): |
254 file_obj.close() | 254 file_obj.close() |
255 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % { 'id': sid, "s_method": stream_method, "profile": profile }) | 255 warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) |
256 |