comparison src/plugins/plugin_xep_0166.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - moved SatFile to a new tools.stream module, as it should be part of core, not a plugin - new IStreamProducer interface, to handle starting a pull producer - new FileStreamObject which creates a stream producer/consumer from a SatFile - plugin pipe no longer uses a unix named pipe, as it complicates things: special care needs to be taken not to block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renamed to jingle stream. - bad connections/errors should be better handled in the jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly. Fixes bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents ed1d71b91b29
children 7ad5f2c4e34a
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
120 jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) 120 jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
121 jingle_elt["sid"] = session['id'] 121 jingle_elt["sid"] = session['id']
122 jingle_elt['action'] = action 122 jingle_elt['action'] = action
123 return iq_elt, jingle_elt 123 return iq_elt, jingle_elt
124 124
125 def sendError(self, error_condition, sid, request, jingle_condition=None, profile=C.PROF_KEY_NONE): 125 def sendError(self, client, error_condition, sid, request, jingle_condition=None):
126 """Send error stanza 126 """Send error stanza
127 127
128 @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys 128 @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
129 @param sid(unicode,None): jingle session id, or None, if session must not be destroyed 129 @param sid(unicode,None): jingle session id, or None, if session must not be destroyed
130 @param request(domish.Element): original request 130 @param request(domish.Element): original request
131 @param jingle_condition(None, unicode): if not None, additional jingle-specific error information 131 @param jingle_condition(None, unicode): if not None, additional jingle-specific error information
132 @param profile: %(doc_profile)s 132 """
133 """
134 client = self.host.getClient(profile)
135 iq_elt = error.StanzaError(error_condition).toResponse(request) 133 iq_elt = error.StanzaError(error_condition).toResponse(request)
136 if jingle_condition is not None: 134 if jingle_condition is not None:
137 iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition)) 135 iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
138 if error.STANZA_CONDITIONS[error_condition]['type'] == 'cancel' and sid: 136 if error.STANZA_CONDITIONS[error_condition]['type'] == 'cancel' and sid:
139 self._delSession(client, sid) 137 self._delSession(client, sid)
141 client.send(iq_elt) 139 client.send(iq_elt)
142 140
143 def _terminateEb(self, failure_): 141 def _terminateEb(self, failure_):
144 log.warning(_(u"Error while terminating session: {msg}").format(msg=failure_)) 142 log.warning(_(u"Error while terminating session: {msg}").format(msg=failure_))
145 143
146 def terminate(self, reason, session, profile): 144 def terminate(self, client, reason, session):
147 """Terminate the session 145 """Terminate the session
148 146
149 send the session-terminate action, and delete the session data 147 send the session-terminate action, and delete the session data
150 @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element 148 @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element
151 if a list of element, add them as children of the <reason/> element 149 if a list of element, add them as children of the <reason/> element
152 @param session(dict): data of the session 150 @param session(dict): data of the session
153 @param profile: %(doc_profile)s 151 """
154 """
155 client = self.host.getClient(profile)
156 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_TERMINATE) 152 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_TERMINATE)
157 reason_elt = jingle_elt.addElement('reason') 153 reason_elt = jingle_elt.addElement('reason')
158 if isinstance(reason, basestring): 154 if isinstance(reason, basestring):
159 reason_elt.addElement(reason) 155 reason_elt.addElement(reason)
160 else: 156 else:
170 def _iqError(self, failure_, sid, client): 166 def _iqError(self, failure_, sid, client):
171 """Called when we got an <iq/> error 167 """Called when we got an <iq/> error
172 168
173 @param failure_(failure.Failure): the exceptions raised 169 @param failure_(failure.Failure): the exceptions raised
174 @param sid(unicode): jingle session id 170 @param sid(unicode): jingle session id
175 @param profile: %(doc_client)s
176 """ 171 """
177 log.warning(u"Error while sending jingle <iq/> stanza: {failure_}".format(failure_=failure_.value)) 172 log.warning(u"Error while sending jingle <iq/> stanza: {failure_}".format(failure_=failure_.value))
178 self._delSession(client, sid) 173 self._delSession(client, sid)
179 174
180 def _jingleErrorCb(self, fail, sid, request, client): 175 def _jingleErrorCb(self, fail, sid, request, client):
187 @param request(domsih.Element): jingle request 182 @param request(domsih.Element): jingle request
188 @param client: %(doc_client)s 183 @param client: %(doc_client)s
189 """ 184 """
190 log.warning("Error while processing jingle request") 185 log.warning("Error while processing jingle request")
191 if isinstance(fail, exceptions.DataError): 186 if isinstance(fail, exceptions.DataError):
192 self.sendError('bad-request', sid, request, profile=client.profile) 187 self.sendError(client, 'bad-request', sid, request)
193 else: 188 else:
194 log.error("Unmanaged jingle exception") 189 log.error("Unmanaged jingle exception")
195 self._delSession(client, sid) 190 self._delSession(client, sid)
196 raise fail 191 raise fail
197 192
206 - requestConfirmation(session, desc_elt, client): 201 - requestConfirmation(session, desc_elt, client):
207 - if present, it is called on when session must be accepted. 202 - if present, it is called on when session must be accepted.
208 - if it return True the session is accepted, else rejected. 203 - if it return True the session is accepted, else rejected.
209 A Deferred can be returned 204 A Deferred can be returned
210 - if not present, a generic accept dialog will be used 205 - if not present, a generic accept dialog will be used
211 - jingleSessionInit(self, session, content_name[, *args, **kwargs], profile): must return the domish.Element used for initial content 206 - jingleSessionInit(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content
212 - jingleHandler(self, action, session, content_name, transport_elt, profile): 207 - jingleHandler(client, self, action, session, content_name, transport_elt):
213 called on several action to negociate the application or transport 208 called on several action to negociate the application or transport
214 - jingleTerminate: called on session terminate, with reason_elt 209 - jingleTerminate: called on session terminate, with reason_elt
215 May be used to clean session 210 May be used to clean session
216 """ 211 """
217 if namespace in self._applications: 212 if namespace in self._applications:
224 219
225 @param namespace(unicode): the XML namespace used for this transport 220 @param namespace(unicode): the XML namespace used for this transport
226 @param transport_type(unicode): type of transport to use (see XEP-0166 §8) 221 @param transport_type(unicode): type of transport to use (see XEP-0166 §8)
227 @param handler(object): instance of a class which manage the application. 222 @param handler(object): instance of a class which manage the application.
228 Must have the following methods: 223 Must have the following methods:
229 - jingleSessionInit(self, session, content_name[, *args, **kwargs], profile): must return the domish.Element used for initial content 224 - jingleSessionInit(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content
230 - jingleHandler(self, action, session, content_name, transport_elt, profile): 225 - jingleHandler(client, self, action, session, content_name, transport_elt):
231 called on several action to negociate the application or transport 226 called on several action to negociate the application or transport
232 @param priority(int): priority of this transport 227 @param priority(int): priority of this transport
233 """ 228 """
234 assert transport_type in (XEP_0166.TRANSPORT_DATAGRAM, XEP_0166.TRANSPORT_STREAMING) 229 assert transport_type in (XEP_0166.TRANSPORT_DATAGRAM, XEP_0166.TRANSPORT_STREAMING)
235 if namespace in self._transports: 230 if namespace in self._transports:
239 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True) 234 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True)
240 self._transports[namespace] = transport_data 235 self._transports[namespace] = transport_data
241 log.debug(u"new jingle transport registered") 236 log.debug(u"new jingle transport registered")
242 237
243 @defer.inlineCallbacks 238 @defer.inlineCallbacks
244 def transportReplace(self, transport_ns, session, content_name, profile=C.PROF_KEY_NONE): 239 def transportReplace(self, client, transport_ns, session, content_name):
245 """Replace a transport 240 """Replace a transport
246 241
247 @param transport_ns(unicode): namespace of the new transport to use 242 @param transport_ns(unicode): namespace of the new transport to use
248 @param session(dict): jingle session data 243 @param session(dict): jingle session data
249 @param content_name(unicode): name of the content 244 @param content_name(unicode): name of the content
250 @param profile: %(doc_profile)s
251 """ 245 """
252 # XXX: for now we replace the transport before receiving confirmation from other peer 246 # XXX: for now we replace the transport before receiving confirmation from other peer
253 # this is acceptable because we terminate the session if transport is rejected. 247 # this is acceptable because we terminate the session if transport is rejected.
254 # this behavious may change in the future. 248 # this behavious may change in the future.
255 client = self.host.getClient(profile)
256 content_data= session['contents'][content_name] 249 content_data= session['contents'][content_name]
257 transport_data = content_data['transport_data'] 250 transport_data = content_data['transport_data']
258 try: 251 try:
259 transport = self._transports[transport_ns] 252 transport = self._transports[transport_ns]
260 except KeyError: 253 except KeyError:
261 raise exceptions.InternalError(u"Unkown transport") 254 raise exceptions.InternalError(u"Unkown transport")
262 yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, profile) 255 yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None)
263 content_data['transport'] = transport 256 content_data['transport'] = transport
264 transport_data.clear() 257 transport_data.clear()
265 258
266 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE) 259 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE)
267 content_elt = jingle_elt.addElement('content') 260 content_elt = jingle_elt.addElement('content')
268 content_elt['name'] = content_name 261 content_elt['name'] = content_name
269 content_elt['creator'] = content_data['creator'] 262 content_elt['creator'] = content_data['creator']
270 263
271 transport_elt = transport.handler.jingleSessionInit(session, content_name, profile) 264 transport_elt = transport.handler.jingleSessionInit(client, session, content_name)
272 content_elt.addChild(transport_elt) 265 content_elt.addChild(transport_elt)
273 iq_elt.send() 266 iq_elt.send()
274 267
275 def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): 268 def buildAction(self, client, action, session, content_name):
276 """Build an element according to requested action 269 """Build an element according to requested action
277 270
278 @param action(unicode): a jingle action (see XEP-0166 §7.2), 271 @param action(unicode): a jingle action (see XEP-0166 §7.2),
279 session-* actions are not managed here 272 session-* actions are not managed here
280 transport-replace is managed in the dedicated [transportReplace] method 273 transport-replace is managed in the dedicated [transportReplace] method
281 @param session(dict): jingle session data 274 @param session(dict): jingle session data
282 @param content_name(unicode): name of the content 275 @param content_name(unicode): name of the content
283 @param profile: %(doc_profile)s
284 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action 276 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action
285 """ 277 """
286 client = self.host.getClient(profile)
287
288 # we first build iq, jingle and content element which are the same in every cases 278 # we first build iq, jingle and content element which are the same in every cases
289 iq_elt, jingle_elt = self._buildJingleElt(client, session, action) 279 iq_elt, jingle_elt = self._buildJingleElt(client, session, action)
290 # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked 280 # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked
291 content_data= session['contents'][content_name] 281 content_data= session['contents'][content_name]
292 content_elt = jingle_elt.addElement('content') 282 content_elt = jingle_elt.addElement('content')
299 else: 289 else:
300 raise exceptions.InternalError(u"unmanaged action {}".format(action)) 290 raise exceptions.InternalError(u"unmanaged action {}".format(action))
301 291
302 return iq_elt, context_elt 292 return iq_elt, context_elt
303 293
304 def buildSessionInfo(self, session, profile=C.PROF_KEY_NONE): 294 def buildSessionInfo(self, client, session):
305 """Build a session-info action 295 """Build a session-info action
306 296
307 @param session(dict): jingle session data 297 @param session(dict): jingle session data
308 @param profile: %(doc_profile)s
309 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element 298 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element
310 """ 299 """
311 client = self.host.getClient(profile)
312 return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO) 300 return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO)
313 301
314 @defer.inlineCallbacks 302 @defer.inlineCallbacks
315 def initiate(self, peer_jid, contents, profile=C.PROF_KEY_NONE): 303 def initiate(self, client, peer_jid, contents):
316 """Send a session initiation request 304 """Send a session initiation request
317 305
318 @param peer_jid(jid.JID): jid to establith session with 306 @param peer_jid(jid.JID): jid to establith session with
319 @param contents(list[dict]): list of contents to use: 307 @param contents(list[dict]): list of contents to use:
320 The dict must have the following keys: 308 The dict must have the following keys:
325 - name(unicode): name of the content 313 - name(unicode): name of the content
326 - senders(unicode): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none 314 - senders(unicode): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none
327 default to BOTH (see XEP-0166 §7.3) 315 default to BOTH (see XEP-0166 §7.3)
328 - app_args(list): args to pass to the application plugin 316 - app_args(list): args to pass to the application plugin
329 - app_kwargs(dict): keyword args to pass to the application plugin 317 - app_kwargs(dict): keyword args to pass to the application plugin
330 @param profile: %(doc_profile)s
331 @return D(unicode): jingle session id 318 @return D(unicode): jingle session id
332 """ 319 """
333 assert contents # there must be at least one content 320 assert contents # there must be at least one content
334 client = self.host.getClient(profile)
335 initiator = client.jid 321 initiator = client.jid
336 sid = unicode(uuid.uuid4()) 322 sid = unicode(uuid.uuid4())
337 # TODO: session cleaning after timeout ? 323 # TODO: session cleaning after timeout ?
338 session = client.jingle_sessions[sid] = {'id': sid, 324 session = client.jingle_sessions[sid] = {'id': sid,
339 'state': STATE_PENDING, 325 'state': STATE_PENDING,
390 pass 376 pass
391 377
392 # then the description element 378 # then the description element
393 app_args = content.get('app_args', []) 379 app_args = content.get('app_args', [])
394 app_kwargs = content.get('app_kwargs', {}) 380 app_kwargs = content.get('app_kwargs', {})
395 app_kwargs['profile'] = profile 381 desc_elt = yield application.handler.jingleSessionInit(client, session, content_name, *app_args, **app_kwargs)
396 desc_elt = yield application.handler.jingleSessionInit(session, content_name, *app_args, **app_kwargs)
397 content_elt.addChild(desc_elt) 382 content_elt.addChild(desc_elt)
398 383
399 # and the transport one 384 # and the transport one
400 transport_elt = yield transport.handler.jingleSessionInit(session, content_name, profile) 385 transport_elt = yield transport.handler.jingleSessionInit(client, session, content_name)
401 content_elt.addChild(transport_elt) 386 content_elt.addChild(transport_elt)
402 387
403 d = iq_elt.send() 388 d = iq_elt.send()
404 d.addErrback(self._iqError, sid, client) 389 d.addErrback(self._iqError, sid, client)
405 yield d 390 yield d
409 394
410 This is used to terminate a content inside a handler, to avoid modifying contents 395 This is used to terminate a content inside a handler, to avoid modifying contents
411 """ 396 """
412 reactor.callLater(0, self.contentTerminate, *args, **kwargs) 397 reactor.callLater(0, self.contentTerminate, *args, **kwargs)
413 398
414 def contentTerminate(self, session, content_name, reason=REASON_SUCCESS, profile=C.PROF_KEY_NONE): 399 def contentTerminate(self, client, session, content_name, reason=REASON_SUCCESS):
415 """Terminate and remove a content 400 """Terminate and remove a content
416 401
417 if there is no more content, then session is terminated 402 if there is no more content, then session is terminated
418 @param session(dict): jingle session 403 @param session(dict): jingle session
419 @param content_name(unicode): name of the content terminated 404 @param content_name(unicode): name of the content terminated
420 @param reason(unicode): reason of the termination 405 @param reason(unicode): reason of the termination
421 @param profile: %(doc_profile)s
422 """ 406 """
423 contents = session['contents'] 407 contents = session['contents']
424 del contents[content_name] 408 del contents[content_name]
425 if not contents: 409 if not contents:
426 self.terminate(reason, session, profile) 410 self.terminate(client, reason, session)
427 411
428 ## defaults methods called when plugin doesn't have them ## 412 ## defaults methods called when plugin doesn't have them ##
429 413
430 def jingleRequestConfirmationDefault(self, action, session, content_name, desc_elt, profile): 414 def jingleRequestConfirmationDefault(self, client, action, session, content_name, desc_elt):
431 """This method request confirmation for a jingle session""" 415 """This method request confirmation for a jingle session"""
432 log.debug(u"Using generic jingle confirmation method") 416 log.debug(u"Using generic jingle confirmation method")
433 return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['peer_jid'].full()), _('Confirm Jingle session'), profile=profile) 417 return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['peer_jid'].full()), _('Confirm Jingle session'), profile=client.profile)
434 418
435 ## jingle events ## 419 ## jingle events ##
436 420
437 def _onJingleRequest(self, request, profile): 421 def _onJingleRequest(self, request, client):
438 """Called when any jingle request is received 422 """Called when any jingle request is received
439 423
440 The request will the be dispatched to appropriate method 424 The request will then be dispatched to appropriate method
441 according to current state 425 according to current state
442 @param request(domish.Element): received IQ request 426 @param request(domish.Element): received IQ request
443 @para profile: %(doc_profile)s 427 """
444 """
445 client = self.host.getClient(profile)
446 request.handled = True 428 request.handled = True
447 jingle_elt = request.elements(NS_JINGLE, 'jingle').next() 429 jingle_elt = request.elements(NS_JINGLE, 'jingle').next()
448 430
449 # first we need the session id 431 # first we need the session id
450 try: 432 try:
451 sid = jingle_elt['sid'] 433 sid = jingle_elt['sid']
452 if not sid: 434 if not sid:
453 raise KeyError 435 raise KeyError
454 except KeyError: 436 except KeyError:
455 log.warning(u"Received jingle request has no sid attribute") 437 log.warning(u"Received jingle request has no sid attribute")
456 self.sendError('bad-request', None, request, profile=profile) 438 self.sendError(client, 'bad-request', None, request)
457 return 439 return
458 440
459 # then the action 441 # then the action
460 try: 442 try:
461 action = jingle_elt['action'] 443 action = jingle_elt['action']
462 if not action: 444 if not action:
463 raise KeyError 445 raise KeyError
464 except KeyError: 446 except KeyError:
465 log.warning(u"Received jingle request has no action") 447 log.warning(u"Received jingle request has no action")
466 self.sendError('bad-request', None, request, profile=profile) 448 self.sendError(client, 'bad-request', None, request)
467 return 449 return
468 450
469 peer_jid = jid.JID(request['from']) 451 peer_jid = jid.JID(request['from'])
470 452
471 # we get or create the session 453 # we get or create the session
472 try: 454 try:
473 session = client.jingle_sessions[sid] 455 session = client.jingle_sessions[sid]
474 except KeyError: 456 except KeyError:
475 if action != XEP_0166.A_SESSION_INITIATE: 457 if action == XEP_0166.A_SESSION_INITIATE:
476 log.warning(u"Received request for an unknown session id: {}".format(sid)) 458 pass
477 self.sendError('item-not-found', None, request, 'unknown-session', profile=profile) 459 elif action == XEP_0166.A_SESSION_TERMINATE:
460 log.debug(u"ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format(
461 request_id=sid,
462 profile = client.profile))
463 return
464 else:
465 log.warning(u"Received request for an unknown session id: {request_id} [{profile}]".format(
466 request_id=sid,
467 profile = client.profile))
468 self.sendError(client, 'item-not-found', None, request, 'unknown-session')
478 return 469 return
479 470
480 session = client.jingle_sessions[sid] = {'id': sid, 471 session = client.jingle_sessions[sid] = {'id': sid,
481 'state': STATE_PENDING, 472 'state': STATE_PENDING,
482 'initiator': peer_jid, 473 'initiator': peer_jid,
485 'started': time.time(), 476 'started': time.time(),
486 } 477 }
487 else: 478 else:
488 if session['peer_jid'] != peer_jid: 479 if session['peer_jid'] != peer_jid:
489 log.warning(u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(sid)) 480 log.warning(u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(sid))
490 self.sendError('service-unavailable', sid, request, profile=profile) 481 self.sendError(client, 'service-unavailable', sid, request)
491 return 482 return
492 if session['id'] != sid: 483 if session['id'] != sid:
493 log.error(u"session id doesn't match") 484 log.error(u"session id doesn't match")
494 self.sendError('service-unavailable', sid, request, profile=profile) 485 self.sendError(client, 'service-unavailable', sid, request)
495 raise exceptions.InternalError 486 raise exceptions.InternalError
496 487
497 if action == XEP_0166.A_SESSION_INITIATE: 488 if action == XEP_0166.A_SESSION_INITIATE:
498 self.onSessionInitiate(client, request, jingle_elt, session) 489 self.onSessionInitiate(client, request, jingle_elt, session)
499 elif action == XEP_0166.A_SESSION_TERMINATE: 490 elif action == XEP_0166.A_SESSION_TERMINATE:
537 name = content_elt['name'] 528 name = content_elt['name']
538 529
539 if new: 530 if new:
540 # the content must not exist, we check it 531 # the content must not exist, we check it
541 if not name or name in contents_dict: 532 if not name or name in contents_dict:
542 self.sendError('bad-request', session['id'], request, profile=client.profile) 533 self.sendError(client, 'bad-request', session['id'], request)
543 raise exceptions.CancelError 534 raise exceptions.CancelError
544 content_data = contents_dict[name] = {'creator': creator, 535 content_data = contents_dict[name] = {'creator': creator,
545 'senders': content_elt.attributes.get('senders', 'both'), 536 'senders': content_elt.attributes.get('senders', 'both'),
546 } 537 }
547 else: 538 else:
548 # the content must exist, we check it 539 # the content must exist, we check it
549 try: 540 try:
550 content_data = contents_dict[name] 541 content_data = contents_dict[name]
551 except KeyError: 542 except KeyError:
552 log.warning(u"Other peer try to access an unknown content") 543 log.warning(u"Other peer try to access an unknown content")
553 self.sendError('bad-request', session['id'], request, profile=client.profile) 544 self.sendError(client, 'bad-request', session['id'], request)
554 raise exceptions.CancelError 545 raise exceptions.CancelError
555 546
556 # application 547 # application
557 if with_application: 548 if with_application:
558 desc_elt = content_elt.description 549 desc_elt = content_elt.description
559 if not desc_elt: 550 if not desc_elt:
560 self.sendError('bad-request', session['id'], request, profile=client.profile) 551 self.sendError(client, 'bad-request', session['id'], request)
561 raise exceptions.CancelError 552 raise exceptions.CancelError
562 553
563 if new: 554 if new:
564 # the content is new, we need to check and link the application 555 # the content is new, we need to check and link the application
565 app_ns = desc_elt.uri 556 app_ns = desc_elt.uri
566 if not app_ns or app_ns == NS_JINGLE: 557 if not app_ns or app_ns == NS_JINGLE:
567 self.sendError('bad-request', session['id'], request, profile=client.profile) 558 self.sendError(client, 'bad-request', session['id'], request)
568 raise exceptions.CancelError 559 raise exceptions.CancelError
569 560
570 try: 561 try:
571 application = self._applications[app_ns] 562 application = self._applications[app_ns]
572 except KeyError: 563 except KeyError:
573 log.warning(u"Unmanaged application namespace [{}]".format(app_ns)) 564 log.warning(u"Unmanaged application namespace [{}]".format(app_ns))
574 self.sendError('service-unavailable', session['id'], request, profile=client.profile) 565 self.sendError(client, 'service-unavailable', session['id'], request)
575 raise exceptions.CancelError 566 raise exceptions.CancelError
576 567
577 content_data['application'] = application 568 content_data['application'] = application
578 content_data['application_data'] = {} 569 content_data['application_data'] = {}
579 else: 570 else:
585 576
586 # transport 577 # transport
587 if with_transport: 578 if with_transport:
588 transport_elt = content_elt.transport 579 transport_elt = content_elt.transport
589 if not transport_elt: 580 if not transport_elt:
590 self.sendError('bad-request', session['id'], request, profile=client.profile) 581 self.sendError(client, 'bad-request', session['id'], request)
591 raise exceptions.CancelError 582 raise exceptions.CancelError
592 583
593 if new: 584 if new:
594 # the content is new, we need to check and link the transport 585 # the content is new, we need to check and link the transport
595 transport_ns = transport_elt.uri 586 transport_ns = transport_elt.uri
596 if not app_ns or app_ns == NS_JINGLE: 587 if not app_ns or app_ns == NS_JINGLE:
597 self.sendError('bad-request', session['id'], request, profile=client.profile) 588 self.sendError(client, 'bad-request', session['id'], request)
598 raise exceptions.CancelError 589 raise exceptions.CancelError
599 590
600 try: 591 try:
601 transport = self._transports[transport_ns] 592 transport = self._transports[transport_ns]
602 except KeyError: 593 except KeyError:
608 if 'transport_elt' in content_data: 599 if 'transport_elt' in content_data:
609 raise exceptions.InternalError(u"transport_elt should not exist at this point") 600 raise exceptions.InternalError(u"transport_elt should not exist at this point")
610 601
611 content_data['transport_elt'] = transport_elt 602 content_data['transport_elt'] = transport_elt
612 603
613 def _ignore(self, action, session, content_name, elt, profile): 604 def _ignore(self, client, action, session, content_name, elt):
614 """Dummy method used when not exception must be raised if a method is not implemented in _callPlugins 605 """Dummy method used when not exception must be raised if a method is not implemented in _callPlugins
615 606
616 must be used as app_default_cb and/or transp_default_cb 607 must be used as app_default_cb and/or transp_default_cb
617 """ 608 """
618 return elt 609 return elt
619 610
620 def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', 611 def _callPlugins(self, client, action, session, app_method_name='jingleHandler',
612 transp_method_name='jingleHandler',
621 app_default_cb=None, transp_default_cb=None, delete=True, 613 app_default_cb=None, transp_default_cb=None, delete=True,
622 elements=True, force_element=None, profile=C.PROF_KEY_NONE): 614 elements=True, force_element=None):
623 """Call application and transport plugin methods for all contents 615 """Call application and transport plugin methods for all contents
624 616
625 @param action(unicode): jingle action name 617 @param action(unicode): jingle action name
626 @param session(dict): jingle session data 618 @param session(dict): jingle session data
627 @param app_method_name(unicode, None): name of the method to call for applications 619 @param app_method_name(unicode, None): name of the method to call for applications
633 @param transp_default_cb(callable, None): default callback to use if plugin has not transp_method_name 625 @param transp_default_cb(callable, None): default callback to use if plugin has not transp_method_name
634 None to raise an exception instead 626 None to raise an exception instead
635 @param delete(bool): if True, remove desc_elt and transport_elt from session 627 @param delete(bool): if True, remove desc_elt and transport_elt from session
636 ignored if elements is False 628 ignored if elements is False
637 @param elements(bool): True if elements(desc_elt and tranport_elt) must be managed 629 @param elements(bool): True if elements(desc_elt and tranport_elt) must be managed
638 must be True if _callPlugins is used in a request, and False if it used after a request (i.e. on <iq> result or error) 630 must be True if _callPlugins is used in a request, and False if it used after a request
631 (i.e. on <iq> result or error)
639 @param force_element(None, domish.Element, object): if elements is False, it is used as element parameter 632 @param force_element(None, domish.Element, object): if elements is False, it is used as element parameter
640 else it is ignored 633 else it is ignored
641 @param profile(unicode): %(doc_profile)s
642 @return (list[defer.Deferred]): list of launched Deferred 634 @return (list[defer.Deferred]): list of launched Deferred
643 @raise exceptions.NotFound: method is not implemented 635 @raise exceptions.NotFound: method is not implemented
644 """ 636 """
645 contents_dict = session['contents'] 637 contents_dict = session['contents']
646 defers_list = [] 638 defers_list = []
661 method = default_cb 653 method = default_cb
662 if elements: 654 if elements:
663 elt = content_data.pop(elt_name) if delete else content_data[elt_name] 655 elt = content_data.pop(elt_name) if delete else content_data[elt_name]
664 else: 656 else:
665 elt = force_element 657 elt = force_element
666 d = defer.maybeDeferred(method, action, session, content_name, elt, profile) 658 d = defer.maybeDeferred(method, client, action, session, content_name, elt)
667 defers_list.append(d) 659 defers_list.append(d)
668 660
669 return defers_list 661 return defers_list
670 662
671 def onSessionInitiate(self, client, request, jingle_elt, session): 663 def onSessionInitiate(self, client, request, jingle_elt, session):
689 except exceptions.CancelError: 681 except exceptions.CancelError:
690 return 682 return
691 683
692 if not contents_dict: 684 if not contents_dict:
693 # there MUST be at least one content 685 # there MUST be at least one content
694 self.sendError('bad-request', session['id'], request, profile=client.profile) 686 self.sendError(client, 'bad-request', session['id'], request)
695 return 687 return
696 688
697 # at this point we can send the <iq/> result to confirm reception of the request 689 # at this point we can send the <iq/> result to confirm reception of the request
698 client.send(xmlstream.toResponse(request, 'result')) 690 client.send(xmlstream.toResponse(request, 'result'))
699 691
700 # we now request each application plugin confirmation 692 # we now request each application plugin confirmation
701 # and if all are accepted, we can accept the session 693 # and if all are accepted, we can accept the session
702 confirm_defers = self._callPlugins(XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False, profile=client.profile) 694 confirm_defers = self._callPlugins(client, XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False)
703 695
704 confirm_dlist = defer.gatherResults(confirm_defers) 696 confirm_dlist = defer.gatherResults(confirm_defers)
705 confirm_dlist.addCallback(self._confirmationCb, session, jingle_elt, client) 697 confirm_dlist.addCallback(self._confirmationCb, session, jingle_elt, client)
706 confirm_dlist.addErrback(self._jingleErrorCb, session['id'], request, client) 698 confirm_dlist.addErrback(self._jingleErrorCb, session['id'], request, client)
707 699
714 @param jingle_elt(domish.Element): jingle data of this session 706 @param jingle_elt(domish.Element): jingle data of this session
715 @param client: %(doc_client)s 707 @param client: %(doc_client)s
716 """ 708 """
717 confirmed = all(confirm_results) 709 confirmed = all(confirm_results)
718 if not confirmed: 710 if not confirmed:
719 return self.terminate(XEP_0166.REASON_DECLINE, session, client.profile) 711 return self.terminate(client, XEP_0166.REASON_DECLINE, session)
720 712
721 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_ACCEPT) 713 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_ACCEPT)
722 jingle_elt['responder'] = client.jid.full() 714 jingle_elt['responder'] = client.jid.full()
723 715
724 # contents 716 # contents
734 content_elt['name'] = content_name 726 content_elt['name'] = content_name
735 727
736 application = content_data['application'] 728 application = content_data['application']
737 app_session_accept_cb = application.handler.jingleHandler 729 app_session_accept_cb = application.handler.jingleHandler
738 730
739 app_d = defer.maybeDeferred(app_session_accept_cb, 731 app_d = defer.maybeDeferred(app_session_accept_cb, client,
740 XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt'), client.profile) 732 XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt'))
741 app_d.addCallback(addElement, content_elt) 733 app_d.addCallback(addElement, content_elt)
742 defers_list.append(app_d) 734 defers_list.append(app_d)
743 735
744 transport = content_data['transport'] 736 transport = content_data['transport']
745 transport_session_accept_cb = transport.handler.jingleHandler 737 transport_session_accept_cb = transport.handler.jingleHandler
746 738
747 transport_d = defer.maybeDeferred(transport_session_accept_cb, 739 transport_d = defer.maybeDeferred(transport_session_accept_cb, client,
748 XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt'), client.profile) 740 XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt'))
749 transport_d.addCallback(addElement, content_elt) 741 transport_d.addCallback(addElement, content_elt)
750 defers_list.append(transport_d) 742 defers_list.append(transport_d)
751 743
752 d_list = defer.DeferredList(defers_list) 744 d_list = defer.DeferredList(defers_list)
753 d_list.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False, profile=client.profile)) 745 d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False))
754 d_list.addCallback(lambda dummy: iq_elt.send()) 746 d_list.addCallback(lambda dummy: iq_elt.send())
755 def changeState(dummy, session): 747 def changeState(dummy, session):
756 session['state'] = STATE_ACTIVE 748 session['state'] = STATE_ACTIVE
757 749
758 d_list.addCallback(changeState, session) 750 d_list.addCallback(changeState, session)
759 d_list.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_ACCEPTED_ACK, session, elements=False, profile=client.profile)) 751 d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_ACCEPTED_ACK, session, elements=False))
760 d_list.addErrback(self._iqError, session['id'], client) 752 d_list.addErrback(self._iqError, session['id'], client)
761 return d_list 753 return d_list
762 754
763 def onSessionTerminate(self, client, request, jingle_elt, session): 755 def onSessionTerminate(self, client, request, jingle_elt, session):
764 # TODO: check reason, display a message to user if needed 756 # TODO: check reason, display a message to user if needed
767 reason_elt = jingle_elt.elements(NS_JINGLE, 'reason').next() 759 reason_elt = jingle_elt.elements(NS_JINGLE, 'reason').next()
768 except StopIteration: 760 except StopIteration:
769 log.warning(u"Not reason given for session termination") 761 log.warning(u"Not reason given for session termination")
770 reason_elt = jingle_elt.addElement('reason') 762 reason_elt = jingle_elt.addElement('reason')
771 763
772 terminate_defers = self._callPlugins(XEP_0166.A_SESSION_TERMINATE, session, 'jingleTerminate', 'jingleTerminate', self._ignore, self._ignore, elements=False, force_element=reason_elt, profile=client.profile) 764 terminate_defers = self._callPlugins(client, XEP_0166.A_SESSION_TERMINATE, session, 'jingleTerminate', 'jingleTerminate', self._ignore, self._ignore, elements=False, force_element=reason_elt)
773 terminate_dlist = defer.DeferredList(terminate_defers) 765 terminate_dlist = defer.DeferredList(terminate_defers)
774 766
775 terminate_dlist.addCallback(lambda dummy: self._delSession(client, session['id'])) 767 terminate_dlist.addCallback(lambda dummy: self._delSession(client, session['id']))
776 client.send(xmlstream.toResponse(request, 'result')) 768 client.send(xmlstream.toResponse(request, 'result'))
777 769
795 client.send(xmlstream.toResponse(request, 'result')) 787 client.send(xmlstream.toResponse(request, 'result'))
796 # and change the state 788 # and change the state
797 session['state'] = STATE_ACTIVE 789 session['state'] = STATE_ACTIVE
798 790
799 negociate_defers = [] 791 negociate_defers = []
800 negociate_defers = self._callPlugins(XEP_0166.A_SESSION_ACCEPT, session, profile=client.profile) 792 negociate_defers = self._callPlugins(client, XEP_0166.A_SESSION_ACCEPT, session)
801 793
802 negociate_dlist = defer.DeferredList(negociate_defers) 794 negociate_dlist = defer.DeferredList(negociate_defers)
803 795
804 # after negociations we start the transfer 796 # after negociations we start the transfer
805 negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) 797 negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False))
806 798
807 def _onSessionCb(self, result, client, request, jingle_elt, session): 799 def _onSessionCb(self, result, client, request, jingle_elt, session):
808 client.send(xmlstream.toResponse(request, 'result')) 800 client.send(xmlstream.toResponse(request, 'result'))
809 801
810 def _onSessionEb(self, failure_, client, request, jingle_elt, session): 802 def _onSessionEb(self, failure_, client, request, jingle_elt, session):
811 log.error(u"Error while handling onSessionInfo: {}".format(failure_.value)) 803 log.error(u"Error while handling onSessionInfo: {}".format(failure_.value))
812 # XXX: only error managed so far, maybe some applications/transports need more 804 # XXX: only error managed so far, maybe some applications/transports need more
813 self.sendError('feature-not-implemented', None, request, 'unsupported-info', client.profile) 805 self.sendError(client, 'feature-not-implemented', None, request, 'unsupported-info')
814 806
815 def onSessionInfo(self, client, request, jingle_elt, session): 807 def onSessionInfo(self, client, request, jingle_elt, session):
816 """Method called when a session-info action is received from other peer 808 """Method called when a session-info action is received from other peer
817 809
818 This method is only called for initiator 810 This method is only called for initiator
827 return 819 return
828 820
829 try: 821 try:
830 # XXX: session-info is most likely only used for application, so we don't call transport plugins 822 # XXX: session-info is most likely only used for application, so we don't call transport plugins
831 # if a future transport use it, this behaviour must be adapted 823 # if a future transport use it, this behaviour must be adapted
832 defers = self._callPlugins(XEP_0166.A_SESSION_INFO, session, 'jingleSessionInfo', None, 824 defers = self._callPlugins(client, XEP_0166.A_SESSION_INFO, session, 'jingleSessionInfo', None,
833 elements=False, force_element=jingle_elt, profile=client.profile) 825 elements=False, force_element=jingle_elt)
834 except exceptions.NotFound as e: 826 except exceptions.NotFound as e:
835 self._onSessionEb(failure.Failure(e), client, request, jingle_elt, session) 827 self._onSessionEb(failure.Failure(e), client, request, jingle_elt, session)
836 return 828 return
837 829
838 dlist = defer.DeferredList(defers, fireOnOneErrback=True) 830 dlist = defer.DeferredList(defers, fireOnOneErrback=True)
886 # at this point, everything is alright and we can replace the transport(s) 878 # at this point, everything is alright and we can replace the transport(s)
887 # this is similar to an session-accept action, but for transports only 879 # this is similar to an session-accept action, but for transports only
888 iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT) 880 iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT)
889 for content_name, content_data, transport, transport_elt in to_replace: 881 for content_name, content_data, transport, transport_elt in to_replace:
890 # we can now actually replace the transport 882 # we can now actually replace the transport
891 yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, client.profile) 883 yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None)
892 content_data['transport'] = transport 884 content_data['transport'] = transport
893 content_data['transport_data'].clear() 885 content_data['transport_data'].clear()
894 # and build the element 886 # and build the element
895 content_elt = accept_jingle_elt.addElement('content') 887 content_elt = accept_jingle_elt.addElement('content')
896 content_elt['name'] = content_name 888 content_elt['name'] = content_name
897 content_elt['creator'] = content_data['creator'] 889 content_elt['creator'] = content_data['creator']
898 # we notify the transport and insert its <transport/> in the answer 890 # we notify the transport and insert its <transport/> in the answer
899 accept_transport_elt = yield transport.handler.jingleHandler(XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt, client.profile) 891 accept_transport_elt = yield transport.handler.jingleHandler(client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt)
900 content_elt.addChild(accept_transport_elt) 892 content_elt.addChild(accept_transport_elt)
901 # there is no confirmation needed here, so we can directly prepare it 893 # there is no confirmation needed here, so we can directly prepare it
902 yield transport.handler.jingleHandler(XEP_0166.A_PREPARE_RESPONDER, session, content_name, None, client.profile) 894 yield transport.handler.jingleHandler(client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None)
903 895
904 iq_elt.send() 896 iq_elt.send()
905 897
906 def onTransportAccept(self, client, request, jingle_elt, session): 898 def onTransportAccept(self, client, request, jingle_elt, session):
907 """Method called once transport replacement is accepted 899 """Method called once transport replacement is accepted
920 912
921 # at this point we can send the <iq/> result to confirm reception of the request 913 # at this point we can send the <iq/> result to confirm reception of the request
922 client.send(xmlstream.toResponse(request, 'result')) 914 client.send(xmlstream.toResponse(request, 'result'))
923 915
924 negociate_defers = [] 916 negociate_defers = []
925 negociate_defers = self._callPlugins(XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None, profile=client.profile) 917 negociate_defers = self._callPlugins(client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None)
926 918
927 negociate_dlist = defer.DeferredList(negociate_defers) 919 negociate_dlist = defer.DeferredList(negociate_defers)
928 920
929 # after negociations we start the transfer 921 # after negociations we start the transfer
930 negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) 922 negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False))
931 923
932 def onTransportReject(self, client, request, jingle_elt, session): 924 def onTransportReject(self, client, request, jingle_elt, session):
933 """Method called when a transport replacement is refused 925 """Method called when a transport replacement is refused
934 926
935 @param client: %(doc_client)s 927 @param client: %(doc_client)s
937 @param jingle_elt(domish.Element): the <jingle> element 929 @param jingle_elt(domish.Element): the <jingle> element
938 @param session(dict): session data 930 @param session(dict): session data
939 """ 931 """
940 # XXX: for now, we terminate the session in case of transport-reject 932 # XXX: for now, we terminate the session in case of transport-reject
941 # this behaviour may change in the future 933 # this behaviour may change in the future
942 self.terminate('failed-transport', session, client.profile) 934 self.terminate(client, 'failed-transport', session)
943 935
944 def onTransportInfo(self, client, request, jingle_elt, session): 936 def onTransportInfo(self, client, request, jingle_elt, session):
945 """Method called when a transport-info action is received from other peer 937 """Method called when a transport-info action is received from other peer
946 938
947 The request is parsed, and jingleHandler is called on concerned transport plugin(s) 939 The request is parsed, and jingleHandler is called on concerned transport plugin(s)
964 try: 956 try:
965 transport_elt = content_data.pop('transport_elt') 957 transport_elt = content_data.pop('transport_elt')
966 except KeyError: 958 except KeyError:
967 continue 959 continue
968 else: 960 else:
969 content_data['transport'].handler.jingleHandler(XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt, client.profile) 961 content_data['transport'].handler.jingleHandler(client, XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt)
970 962
971 963
972 class XEP_0166_handler(xmlstream.XMPPHandler): 964 class XEP_0166_handler(xmlstream.XMPPHandler):
973 implements(iwokkel.IDisco) 965 implements(iwokkel.IDisco)
974 966
975 def __init__(self, plugin_parent): 967 def __init__(self, plugin_parent):
976 self.plugin_parent = plugin_parent 968 self.plugin_parent = plugin_parent
977 969
978 def connectionInitialized(self): 970 def connectionInitialized(self):
979 self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, profile=self.parent.profile) 971 self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent)
980 972
981 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 973 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
982 return [disco.DiscoFeature(NS_JINGLE)] 974 return [disco.DiscoFeature(NS_JINGLE)]
983 975
984 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 976 def getDiscoItems(self, requestor, target, nodeIdentifier=''):