comparison sat/plugins/plugin_xep_0166.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention - move twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify Python version needed - replaced glib2reactor which use deprecated code by gtk3reactor sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/plugins/plugin_xep_0166.py@7ad5f2c4e34a
children 56f94936df1e
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT plugin for Jingle (XEP-0166)
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _, D_
21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger
23 from sat.tools import xml_tools
24 log = getLogger(__name__)
25 from sat.core import exceptions
26 from twisted.words.protocols.jabber import jid
27 from twisted.internet import defer
28 from twisted.internet import reactor
29 from wokkel import disco, iwokkel
30 from twisted.words.protocols.jabber import error
31 from twisted.words.protocols.jabber import xmlstream
32 from twisted.python import failure
33 from collections import namedtuple
34 import uuid
35 import time
36
37 from zope.interface import implements
38
39
40
41 IQ_SET = '/iq[@type="set"]'
42 NS_JINGLE = "urn:xmpp:jingle:1"
43 NS_JINGLE_ERROR = "urn:xmpp:jingle:errors:1"
44 JINGLE_REQUEST = IQ_SET + '/jingle[@xmlns="' + NS_JINGLE + '"]'
45 STATE_PENDING = "PENDING"
46 STATE_ACTIVE = "ACTIVE"
47 STATE_ENDED = "ENDED"
48 CONFIRM_TXT = D_("{entity} want to start a jingle session with you, do you accept ?")
49
50 PLUGIN_INFO = {
51 C.PI_NAME: "Jingle",
52 C.PI_IMPORT_NAME: "XEP-0166",
53 C.PI_TYPE: "XEP",
54 C.PI_MODES: C.PLUG_MODE_BOTH,
55 C.PI_PROTOCOLS: ["XEP-0166"],
56 C.PI_MAIN: "XEP_0166",
57 C.PI_HANDLER: "yes",
58 C.PI_DESCRIPTION: _("""Implementation of Jingle""")
59 }
60
61
62 ApplicationData = namedtuple('ApplicationData', ('namespace', 'handler'))
63 TransportData = namedtuple('TransportData', ('namespace', 'handler', 'priority'))
64
65
66 class XEP_0166(object):
67 ROLE_INITIATOR = "initiator"
68 ROLE_RESPONDER = "responder"
69 TRANSPORT_DATAGRAM='UDP'
70 TRANSPORT_STREAMING='TCP'
71 REASON_SUCCESS='success'
72 REASON_DECLINE='decline'
73 REASON_FAILED_APPLICATION='failed-application'
74 REASON_FAILED_TRANSPORT='failed-transport'
75 REASON_CONNECTIVITY_ERROR='connectivity-error'
76 A_SESSION_INITIATE = "session-initiate"
77 A_SESSION_ACCEPT = "session-accept"
78 A_SESSION_TERMINATE = "session-terminate"
79 A_SESSION_INFO = "session-info"
80 A_TRANSPORT_REPLACE = "transport-replace"
81 A_TRANSPORT_ACCEPT = "transport-accept"
82 A_TRANSPORT_REJECT = "transport-reject"
83 A_TRANSPORT_INFO = "transport-info"
84 # non standard actions
85 A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer
86 A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer
87 A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator
88 A_START = "start" # application can start
89 A_DESTROY = "destroy" # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations
90
91 def __init__(self, host):
92 log.info(_("plugin Jingle initialization"))
93 self.host = host
94 self._applications = {} # key: namespace, value: application data
95 self._transports = {} # key: namespace, value: transport data
96 # we also keep transports by type, they are then sorted by priority
97 self._type_transports = { XEP_0166.TRANSPORT_DATAGRAM: [],
98 XEP_0166.TRANSPORT_STREAMING: [],
99 }
100
101 def profileConnected(self, client):
102 client.jingle_sessions = {} # key = sid, value = session_data
103
104 def getHandler(self, client):
105 return XEP_0166_handler(self)
106
107 def _delSession(self, client, sid):
108 try:
109 del client.jingle_sessions[sid]
110 except KeyError:
111 log.debug(u"Jingle session id [{}] is unknown, nothing to delete".format(sid))
112 else:
113 log.debug(u"Jingle session id [{}] deleted".format(sid))
114
115 ## helpers methods to build stanzas ##
116
117 def _buildJingleElt(self, client, session, action):
118 iq_elt = client.IQ('set')
119 iq_elt['from'] = client.jid.full()
120 iq_elt['to'] = session['peer_jid'].full()
121 jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
122 jingle_elt["sid"] = session['id']
123 jingle_elt['action'] = action
124 return iq_elt, jingle_elt
125
126 def sendError(self, client, error_condition, sid, request, jingle_condition=None):
127 """Send error stanza
128
129 @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
130 @param sid(unicode,None): jingle session id, or None, if session must not be destroyed
131 @param request(domish.Element): original request
132 @param jingle_condition(None, unicode): if not None, additional jingle-specific error information
133 """
134 iq_elt = error.StanzaError(error_condition).toResponse(request)
135 if jingle_condition is not None:
136 iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
137 if error.STANZA_CONDITIONS[error_condition]['type'] == 'cancel' and sid:
138 self._delSession(client, sid)
139 log.warning(u"Error while managing jingle session, cancelling: {condition}".format(error_condition))
140 client.send(iq_elt)
141
142 def _terminateEb(self, failure_):
143 log.warning(_(u"Error while terminating session: {msg}").format(msg=failure_))
144
145 def terminate(self, client, reason, session):
146 """Terminate the session
147
148 send the session-terminate action, and delete the session data
149 @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element
150 if a list of element, add them as children of the <reason/> element
151 @param session(dict): data of the session
152 """
153 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_TERMINATE)
154 reason_elt = jingle_elt.addElement('reason')
155 if isinstance(reason, basestring):
156 reason_elt.addElement(reason)
157 else:
158 for elt in reason:
159 reason_elt.addChild(elt)
160 self._delSession(client, session['id'])
161 d = iq_elt.send()
162 d.addErrback(self._terminateEb)
163 return d
164
165 ## errors which doesn't imply a stanza sending ##
166
167 def _iqError(self, failure_, sid, client):
168 """Called when we got an <iq/> error
169
170 @param failure_(failure.Failure): the exceptions raised
171 @param sid(unicode): jingle session id
172 """
173 log.warning(u"Error while sending jingle <iq/> stanza: {failure_}".format(failure_=failure_.value))
174 self._delSession(client, sid)
175
176 def _jingleErrorCb(self, fail, sid, request, client):
177 """Called when something is going wrong while parsing jingle request
178
179 The error condition depend of the exceptions raised:
180 exceptions.DataError raise a bad-request condition
181 @param fail(failure.Failure): the exceptions raised
182 @param sid(unicode): jingle session id
183 @param request(domsih.Element): jingle request
184 @param client: %(doc_client)s
185 """
186 log.warning("Error while processing jingle request")
187 if isinstance(fail, exceptions.DataError):
188 self.sendError(client, 'bad-request', sid, request)
189 else:
190 log.error("Unmanaged jingle exception")
191 self._delSession(client, sid)
192 raise fail
193
194 ## methods used by other plugins ##
195
196 def registerApplication(self, namespace, handler):
197 """Register an application plugin
198
199 @param namespace(unicode): application namespace managed by the plugin
200 @param handler(object): instance of a class which manage the application.
201 May have the following methods:
202 - requestConfirmation(session, desc_elt, client):
203 - if present, it is called on when session must be accepted.
204 - if it return True the session is accepted, else rejected.
205 A Deferred can be returned
206 - if not present, a generic accept dialog will be used
207 - jingleSessionInit(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content
208 - jingleHandler(client, self, action, session, content_name, transport_elt):
209 called on several action to negociate the application or transport
210 - jingleTerminate: called on session terminate, with reason_elt
211 May be used to clean session
212 """
213 if namespace in self._applications:
214 raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace))
215 self._applications[namespace] = ApplicationData(namespace=namespace, handler=handler)
216 log.debug(u"new jingle application registered")
217
218 def registerTransport(self, namespace, transport_type, handler, priority=0):
219 """Register a transport plugin
220
221 @param namespace(unicode): the XML namespace used for this transport
222 @param transport_type(unicode): type of transport to use (see XEP-0166 §8)
223 @param handler(object): instance of a class which manage the application.
224 Must have the following methods:
225 - jingleSessionInit(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content
226 - jingleHandler(client, self, action, session, content_name, transport_elt):
227 called on several action to negociate the application or transport
228 @param priority(int): priority of this transport
229 """
230 assert transport_type in (XEP_0166.TRANSPORT_DATAGRAM, XEP_0166.TRANSPORT_STREAMING)
231 if namespace in self._transports:
232 raise exceptions.ConflictError(u"Trying to register already registered namespace {}".format(namespace))
233 transport_data = TransportData(namespace=namespace, handler=handler, priority=priority)
234 self._type_transports[transport_type].append(transport_data)
235 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True)
236 self._transports[namespace] = transport_data
237 log.debug(u"new jingle transport registered")
238
239 @defer.inlineCallbacks
240 def transportReplace(self, client, transport_ns, session, content_name):
241 """Replace a transport
242
243 @param transport_ns(unicode): namespace of the new transport to use
244 @param session(dict): jingle session data
245 @param content_name(unicode): name of the content
246 """
247 # XXX: for now we replace the transport before receiving confirmation from other peer
248 # this is acceptable because we terminate the session if transport is rejected.
249 # this behavious may change in the future.
250 content_data= session['contents'][content_name]
251 transport_data = content_data['transport_data']
252 try:
253 transport = self._transports[transport_ns]
254 except KeyError:
255 raise exceptions.InternalError(u"Unkown transport")
256 yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None)
257 content_data['transport'] = transport
258 transport_data.clear()
259
260 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE)
261 content_elt = jingle_elt.addElement('content')
262 content_elt['name'] = content_name
263 content_elt['creator'] = content_data['creator']
264
265 transport_elt = transport.handler.jingleSessionInit(client, session, content_name)
266 content_elt.addChild(transport_elt)
267 iq_elt.send()
268
269 def buildAction(self, client, action, session, content_name):
270 """Build an element according to requested action
271
272 @param action(unicode): a jingle action (see XEP-0166 §7.2),
273 session-* actions are not managed here
274 transport-replace is managed in the dedicated [transportReplace] method
275 @param session(dict): jingle session data
276 @param content_name(unicode): name of the content
277 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action
278 """
279 # we first build iq, jingle and content element which are the same in every cases
280 iq_elt, jingle_elt = self._buildJingleElt(client, session, action)
281 # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked
282 content_data= session['contents'][content_name]
283 content_elt = jingle_elt.addElement('content')
284 content_elt['name'] = content_name
285 content_elt['creator'] = content_data['creator']
286
287 if action == XEP_0166.A_TRANSPORT_INFO:
288 context_elt = transport_elt = content_elt.addElement('transport', content_data['transport'].namespace)
289 transport_elt['sid'] = content_data['transport_data']['sid']
290 else:
291 raise exceptions.InternalError(u"unmanaged action {}".format(action))
292
293 return iq_elt, context_elt
294
295 def buildSessionInfo(self, client, session):
296 """Build a session-info action
297
298 @param session(dict): jingle session data
299 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element
300 """
301 return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO)
302
303 @defer.inlineCallbacks
304 def initiate(self, client, peer_jid, contents):
305 """Send a session initiation request
306
307 @param peer_jid(jid.JID): jid to establith session with
308 @param contents(list[dict]): list of contents to use:
309 The dict must have the following keys:
310 - app_ns(unicode): namespace of the application
311 the following keys are optional:
312 - transport_type(unicode): type of transport to use (see XEP-0166 §8)
313 default to TRANSPORT_STREAMING
314 - name(unicode): name of the content
315 - senders(unicode): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none
316 default to BOTH (see XEP-0166 §7.3)
317 - app_args(list): args to pass to the application plugin
318 - app_kwargs(dict): keyword args to pass to the application plugin
319 @return D(unicode): jingle session id
320 """
321 assert contents # there must be at least one content
322 if peer_jid == client.jid:
323 raise ValueError(_(u"You can't do a jingle session with yourself"))
324 initiator = client.jid
325 sid = unicode(uuid.uuid4())
326 # TODO: session cleaning after timeout ?
327 session = client.jingle_sessions[sid] = {'id': sid,
328 'state': STATE_PENDING,
329 'initiator': initiator,
330 'role': XEP_0166.ROLE_INITIATOR,
331 'peer_jid': peer_jid,
332 'started': time.time(),
333 'contents': {}
334 }
335 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_INITIATE)
336 jingle_elt["initiator"] = initiator.full()
337
338 contents_dict = session['contents']
339
340 for content in contents:
341 # we get the application plugin
342 app_ns = content['app_ns']
343 try:
344 application = self._applications[app_ns]
345 except KeyError:
346 raise exceptions.InternalError(u"No application registered for {}".format(app_ns))
347
348 # and the transport plugin
349 transport_type = content.get('transport_type', XEP_0166.TRANSPORT_STREAMING)
350 try:
351 transport = self._type_transports[transport_type][0]
352 except IndexError:
353 raise exceptions.InternalError(u"No transport registered for {}".format(transport_type))
354
355 # we build the session data
356 content_data = {'application': application,
357 'application_data': {},
358 'transport': transport,
359 'transport_data': {},
360 'creator': XEP_0166.ROLE_INITIATOR,
361 'senders': content.get('senders', 'both'),
362 }
363 try:
364 content_name = content['name']
365 except KeyError:
366 content_name = unicode(uuid.uuid4())
367 else:
368 if content_name in contents_dict:
369 raise exceptions.InternalError('There is already a content with this name')
370 contents_dict[content_name] = content_data
371
372 # we construct the content element
373 content_elt = jingle_elt.addElement('content')
374 content_elt['creator'] = content_data['creator']
375 content_elt['name'] = content_name
376 try:
377 content_elt['senders'] = content['senders']
378 except KeyError:
379 pass
380
381 # then the description element
382 app_args = content.get('app_args', [])
383 app_kwargs = content.get('app_kwargs', {})
384 desc_elt = yield application.handler.jingleSessionInit(client, session, content_name, *app_args, **app_kwargs)
385 content_elt.addChild(desc_elt)
386
387 # and the transport one
388 transport_elt = yield transport.handler.jingleSessionInit(client, session, content_name)
389 content_elt.addChild(transport_elt)
390
391 try:
392 yield iq_elt.send()
393 except Exception as e:
394 failure_ = failure.Failure(e)
395 self._iqError(failure_, sid, client)
396 raise failure_
397
398 def delayedContentTerminate(self, *args, **kwargs):
399 """Put contentTerminate in queue but don't execute immediately
400
401 This is used to terminate a content inside a handler, to avoid modifying contents
402 """
403 reactor.callLater(0, self.contentTerminate, *args, **kwargs)
404
405 def contentTerminate(self, client, session, content_name, reason=REASON_SUCCESS):
406 """Terminate and remove a content
407
408 if there is no more content, then session is terminated
409 @param session(dict): jingle session
410 @param content_name(unicode): name of the content terminated
411 @param reason(unicode): reason of the termination
412 """
413 contents = session['contents']
414 del contents[content_name]
415 if not contents:
416 self.terminate(client, reason, session)
417
418 ## defaults methods called when plugin doesn't have them ##
419
420 def jingleRequestConfirmationDefault(self, client, action, session, content_name, desc_elt):
421 """This method request confirmation for a jingle session"""
422 log.debug(u"Using generic jingle confirmation method")
423 return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['peer_jid'].full()), _('Confirm Jingle session'), profile=client.profile)
424
425 ## jingle events ##
426
427 def _onJingleRequest(self, request, client):
428 """Called when any jingle request is received
429
430 The request will then be dispatched to appropriate method
431 according to current state
432 @param request(domish.Element): received IQ request
433 """
434 request.handled = True
435 jingle_elt = request.elements(NS_JINGLE, 'jingle').next()
436
437 # first we need the session id
438 try:
439 sid = jingle_elt['sid']
440 if not sid:
441 raise KeyError
442 except KeyError:
443 log.warning(u"Received jingle request has no sid attribute")
444 self.sendError(client, 'bad-request', None, request)
445 return
446
447 # then the action
448 try:
449 action = jingle_elt['action']
450 if not action:
451 raise KeyError
452 except KeyError:
453 log.warning(u"Received jingle request has no action")
454 self.sendError(client, 'bad-request', None, request)
455 return
456
457 peer_jid = jid.JID(request['from'])
458
459 # we get or create the session
460 try:
461 session = client.jingle_sessions[sid]
462 except KeyError:
463 if action == XEP_0166.A_SESSION_INITIATE:
464 pass
465 elif action == XEP_0166.A_SESSION_TERMINATE:
466 log.debug(u"ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format(
467 request_id=sid,
468 profile = client.profile))
469 return
470 else:
471 log.warning(u"Received request for an unknown session id: {request_id} [{profile}]".format(
472 request_id=sid,
473 profile = client.profile))
474 self.sendError(client, 'item-not-found', None, request, 'unknown-session')
475 return
476
477 session = client.jingle_sessions[sid] = {'id': sid,
478 'state': STATE_PENDING,
479 'initiator': peer_jid,
480 'role': XEP_0166.ROLE_RESPONDER,
481 'peer_jid': peer_jid,
482 'started': time.time(),
483 }
484 else:
485 if session['peer_jid'] != peer_jid:
486 log.warning(u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(sid))
487 self.sendError(client, 'service-unavailable', sid, request)
488 return
489 if session['id'] != sid:
490 log.error(u"session id doesn't match")
491 self.sendError(client, 'service-unavailable', sid, request)
492 raise exceptions.InternalError
493
494 if action == XEP_0166.A_SESSION_INITIATE:
495 self.onSessionInitiate(client, request, jingle_elt, session)
496 elif action == XEP_0166.A_SESSION_TERMINATE:
497 self.onSessionTerminate(client, request, jingle_elt, session)
498 elif action == XEP_0166.A_SESSION_ACCEPT:
499 self.onSessionAccept(client, request, jingle_elt, session)
500 elif action == XEP_0166.A_SESSION_INFO:
501 self.onSessionInfo(client, request, jingle_elt, session)
502 elif action == XEP_0166.A_TRANSPORT_INFO:
503 self.onTransportInfo(client, request, jingle_elt, session)
504 elif action == XEP_0166.A_TRANSPORT_REPLACE:
505 self.onTransportReplace(client, request, jingle_elt, session)
506 elif action == XEP_0166.A_TRANSPORT_ACCEPT:
507 self.onTransportAccept(client, request, jingle_elt, session)
508 elif action == XEP_0166.A_TRANSPORT_REJECT:
509 self.onTransportReject(client, request, jingle_elt, session)
510 else:
511 raise exceptions.InternalError(u"Unknown action {}".format(action))
512
513 ## Actions callbacks ##
514
515 def _parseElements(self, jingle_elt, session, request, client, new=False, creator=ROLE_INITIATOR, with_application=True, with_transport=True):
516 """Parse contents elements and fill contents_dict accordingly
517
518 after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt"
519 @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content>
520 @param session(dict): session data
521 @param request(domish.Element): the whole request
522 @param client: %(doc_client)s
523 @param new(bool): True if the content is new and must be created,
524 else the content must exists, and session data will be filled
525 @param creator(unicode): only used if new is True: creating pear (see § 7.3)
526 @param with_application(bool): if True, raise an error if there is no <description> element else ignore it
527 @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it
528 @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return)
529 """
530 contents_dict = session['contents']
531 content_elts = jingle_elt.elements(NS_JINGLE, 'content')
532
533 for content_elt in content_elts:
534 name = content_elt['name']
535
536 if new:
537 # the content must not exist, we check it
538 if not name or name in contents_dict:
539 self.sendError(client, 'bad-request', session['id'], request)
540 raise exceptions.CancelError
541 content_data = contents_dict[name] = {'creator': creator,
542 'senders': content_elt.attributes.get('senders', 'both'),
543 }
544 else:
545 # the content must exist, we check it
546 try:
547 content_data = contents_dict[name]
548 except KeyError:
549 log.warning(u"Other peer try to access an unknown content")
550 self.sendError(client, 'bad-request', session['id'], request)
551 raise exceptions.CancelError
552
553 # application
554 if with_application:
555 desc_elt = content_elt.description
556 if not desc_elt:
557 self.sendError(client, 'bad-request', session['id'], request)
558 raise exceptions.CancelError
559
560 if new:
561 # the content is new, we need to check and link the application
562 app_ns = desc_elt.uri
563 if not app_ns or app_ns == NS_JINGLE:
564 self.sendError(client, 'bad-request', session['id'], request)
565 raise exceptions.CancelError
566
567 try:
568 application = self._applications[app_ns]
569 except KeyError:
570 log.warning(u"Unmanaged application namespace [{}]".format(app_ns))
571 self.sendError(client, 'service-unavailable', session['id'], request)
572 raise exceptions.CancelError
573
574 content_data['application'] = application
575 content_data['application_data'] = {}
576 else:
577 # the content exists, we check that we have not a former desc_elt
578 if 'desc_elt' in content_data:
579 raise exceptions.InternalError(u"desc_elt should not exist at this point")
580
581 content_data['desc_elt'] = desc_elt
582
583 # transport
584 if with_transport:
585 transport_elt = content_elt.transport
586 if not transport_elt:
587 self.sendError(client, 'bad-request', session['id'], request)
588 raise exceptions.CancelError
589
590 if new:
591 # the content is new, we need to check and link the transport
592 transport_ns = transport_elt.uri
593 if not app_ns or app_ns == NS_JINGLE:
594 self.sendError(client, 'bad-request', session['id'], request)
595 raise exceptions.CancelError
596
597 try:
598 transport = self._transports[transport_ns]
599 except KeyError:
600 raise exceptions.InternalError(u"No transport registered for namespace {}".format(transport_ns))
601 content_data['transport'] = transport
602 content_data['transport_data'] = {}
603 else:
604 # the content exists, we check that we have not a former transport_elt
605 if 'transport_elt' in content_data:
606 raise exceptions.InternalError(u"transport_elt should not exist at this point")
607
608 content_data['transport_elt'] = transport_elt
609
610 def _ignore(self, client, action, session, content_name, elt):
611 """Dummy method used when not exception must be raised if a method is not implemented in _callPlugins
612
613 must be used as app_default_cb and/or transp_default_cb
614 """
615 return elt
616
617 def _callPlugins(self, client, action, session, app_method_name='jingleHandler',
618 transp_method_name='jingleHandler',
619 app_default_cb=None, transp_default_cb=None, delete=True,
620 elements=True, force_element=None):
621 """Call application and transport plugin methods for all contents
622
623 @param action(unicode): jingle action name
624 @param session(dict): jingle session data
625 @param app_method_name(unicode, None): name of the method to call for applications
626 None to ignore
627 @param transp_method_name(unicode, None): name of the method to call for transports
628 None to ignore
629 @param app_default_cb(callable, None): default callback to use if plugin has not app_method_name
630 None to raise an exception instead
631 @param transp_default_cb(callable, None): default callback to use if plugin has not transp_method_name
632 None to raise an exception instead
633 @param delete(bool): if True, remove desc_elt and transport_elt from session
634 ignored if elements is False
635 @param elements(bool): True if elements(desc_elt and tranport_elt) must be managed
636 must be True if _callPlugins is used in a request, and False if it used after a request
637 (i.e. on <iq> result or error)
638 @param force_element(None, domish.Element, object): if elements is False, it is used as element parameter
639 else it is ignored
640 @return (list[defer.Deferred]): list of launched Deferred
641 @raise exceptions.NotFound: method is not implemented
642 """
643 contents_dict = session['contents']
644 defers_list = []
645 for content_name, content_data in contents_dict.iteritems():
646 for method_name, handler_key, default_cb, elt_name in (
647 (app_method_name, 'application', app_default_cb, 'desc_elt'),
648 (transp_method_name, 'transport', transp_default_cb, 'transport_elt')):
649 if method_name is None:
650 continue
651
652 handler = content_data[handler_key].handler
653 try:
654 method = getattr(handler, method_name)
655 except AttributeError:
656 if default_cb is None:
657 raise exceptions.NotFound(u'{} not implemented !'.format(method_name))
658 else:
659 method = default_cb
660 if elements:
661 elt = content_data.pop(elt_name) if delete else content_data[elt_name]
662 else:
663 elt = force_element
664 d = defer.maybeDeferred(method, client, action, session, content_name, elt)
665 defers_list.append(d)
666
667 return defers_list
668
669 def onSessionInitiate(self, client, request, jingle_elt, session):
670 """Called on session-initiate action
671
672 The "jingleRequestConfirmation" method of each application will be called
673 (or self.jingleRequestConfirmationDefault if the former doesn't exist).
674 The session is only accepted if all application are confirmed.
675 The application must manage itself multiple contents scenari (e.g. audio/video).
676 @param client: %(doc_client)s
677 @param request(domish.Element): full request
678 @param jingle_elt(domish.Element): <jingle> element
679 @param session(dict): session data
680 """
681 if 'contents' in session:
682 raise exceptions.InternalError("Contents dict should not already exist at this point")
683 session['contents'] = contents_dict = {}
684
685 try:
686 self._parseElements(jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR)
687 except exceptions.CancelError:
688 return
689
690 if not contents_dict:
691 # there MUST be at least one content
692 self.sendError(client, 'bad-request', session['id'], request)
693 return
694
695 # at this point we can send the <iq/> result to confirm reception of the request
696 client.send(xmlstream.toResponse(request, 'result'))
697
698 # we now request each application plugin confirmation
699 # and if all are accepted, we can accept the session
700 confirm_defers = self._callPlugins(client, XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False)
701
702 confirm_dlist = defer.gatherResults(confirm_defers)
703 confirm_dlist.addCallback(self._confirmationCb, session, jingle_elt, client)
704 confirm_dlist.addErrback(self._jingleErrorCb, session['id'], request, client)
705
706 def _confirmationCb(self, confirm_results, session, jingle_elt, client):
707 """Method called when confirmation from user has been received
708
709 This method is only called for the responder
710 @param confirm_results(list[bool]): all True if session is accepted
711 @param session(dict): session data
712 @param jingle_elt(domish.Element): jingle data of this session
713 @param client: %(doc_client)s
714 """
715 confirmed = all(confirm_results)
716 if not confirmed:
717 return self.terminate(client, XEP_0166.REASON_DECLINE, session)
718
719 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_ACCEPT)
720 jingle_elt['responder'] = client.jid.full()
721
722 # contents
723
724 def addElement(domish_elt, content_elt):
725 content_elt.addChild(domish_elt)
726
727 defers_list = []
728
729 for content_name, content_data in session['contents'].iteritems():
730 content_elt = jingle_elt.addElement('content')
731 content_elt['creator'] = XEP_0166.ROLE_INITIATOR
732 content_elt['name'] = content_name
733
734 application = content_data['application']
735 app_session_accept_cb = application.handler.jingleHandler
736
737 app_d = defer.maybeDeferred(app_session_accept_cb, client,
738 XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt'))
739 app_d.addCallback(addElement, content_elt)
740 defers_list.append(app_d)
741
742 transport = content_data['transport']
743 transport_session_accept_cb = transport.handler.jingleHandler
744
745 transport_d = defer.maybeDeferred(transport_session_accept_cb, client,
746 XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt'))
747 transport_d.addCallback(addElement, content_elt)
748 defers_list.append(transport_d)
749
750 d_list = defer.DeferredList(defers_list)
751 d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False))
752 d_list.addCallback(lambda dummy: iq_elt.send())
753 def changeState(dummy, session):
754 session['state'] = STATE_ACTIVE
755
756 d_list.addCallback(changeState, session)
757 d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_ACCEPTED_ACK, session, elements=False))
758 d_list.addErrback(self._iqError, session['id'], client)
759 return d_list
760
761 def onSessionTerminate(self, client, request, jingle_elt, session):
762 # TODO: check reason, display a message to user if needed
763 log.debug("Jingle Session {} terminated".format(session['id']))
764 try:
765 reason_elt = jingle_elt.elements(NS_JINGLE, 'reason').next()
766 except StopIteration:
767 log.warning(u"No reason given for session termination")
768 reason_elt = jingle_elt.addElement('reason')
769
770 terminate_defers = self._callPlugins(client, XEP_0166.A_SESSION_TERMINATE, session, 'jingleTerminate', 'jingleTerminate', self._ignore, self._ignore, elements=False, force_element=reason_elt)
771 terminate_dlist = defer.DeferredList(terminate_defers)
772
773 terminate_dlist.addCallback(lambda dummy: self._delSession(client, session['id']))
774 client.send(xmlstream.toResponse(request, 'result'))
775
776 def onSessionAccept(self, client, request, jingle_elt, session):
777 """Method called once session is accepted
778
779 This method is only called for initiator
780 @param client: %(doc_client)s
781 @param request(domish.Element): full <iq> request
782 @param jingle_elt(domish.Element): the <jingle> element
783 @param session(dict): session data
784 """
785 log.debug(u"Jingle session {} has been accepted".format(session['id']))
786
787 try:
788 self._parseElements(jingle_elt, session, request, client)
789 except exceptions.CancelError:
790 return
791
792 # at this point we can send the <iq/> result to confirm reception of the request
793 client.send(xmlstream.toResponse(request, 'result'))
794 # and change the state
795 session['state'] = STATE_ACTIVE
796
797 negociate_defers = []
798 negociate_defers = self._callPlugins(client, XEP_0166.A_SESSION_ACCEPT, session)
799
800 negociate_dlist = defer.DeferredList(negociate_defers)
801
802 # after negociations we start the transfer
803 negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False))
804
805 def _onSessionCb(self, result, client, request, jingle_elt, session):
806 client.send(xmlstream.toResponse(request, 'result'))
807
808 def _onSessionEb(self, failure_, client, request, jingle_elt, session):
809 log.error(u"Error while handling onSessionInfo: {}".format(failure_.value))
810 # XXX: only error managed so far, maybe some applications/transports need more
811 self.sendError(client, 'feature-not-implemented', None, request, 'unsupported-info')
812
813 def onSessionInfo(self, client, request, jingle_elt, session):
814 """Method called when a session-info action is received from other peer
815
816 This method is only called for initiator
817 @param client: %(doc_client)s
818 @param request(domish.Element): full <iq> request
819 @param jingle_elt(domish.Element): the <jingle> element
820 @param session(dict): session data
821 """
822 if not jingle_elt.children:
823 # this is a session ping, see XEP-0166 §6.8
824 client.send(xmlstream.toResponse(request, 'result'))
825 return
826
827 try:
828 # XXX: session-info is most likely only used for application, so we don't call transport plugins
829 # if a future transport use it, this behaviour must be adapted
830 defers = self._callPlugins(client, XEP_0166.A_SESSION_INFO, session, 'jingleSessionInfo', None,
831 elements=False, force_element=jingle_elt)
832 except exceptions.NotFound as e:
833 self._onSessionEb(failure.Failure(e), client, request, jingle_elt, session)
834 return
835
836 dlist = defer.DeferredList(defers, fireOnOneErrback=True)
837 dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session)
838 dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session)
839
840 @defer.inlineCallbacks
841 def onTransportReplace(self, client, request, jingle_elt, session):
842 """A transport change is requested
843
844 The request is parsed, and jingleHandler is called on concerned transport plugin(s)
845 @param client: %(doc_client)s
846 @param request(domish.Element): full <iq> request
847 @param jingle_elt(domish.Element): the <jingle> element
848 @param session(dict): session data
849 """
850 log.debug(u"Other peer wants to replace the transport")
851 try:
852 self._parseElements(jingle_elt, session, request, client, with_application=False)
853 except exceptions.CancelError:
854 defer.returnValue(None)
855
856 client.send(xmlstream.toResponse(request, 'result'))
857
858 content_name = None
859 to_replace = []
860
861 for content_name, content_data in session['contents'].iteritems():
862 try:
863 transport_elt = content_data.pop('transport_elt')
864 except KeyError:
865 continue
866 transport_ns = transport_elt.uri
867 try:
868 transport = self._transports[transport_ns]
869 except KeyError:
870 log.warning(u"Other peer want to replace current transport with an unknown one: {}".format(transport_ns))
871 content_name = None
872 break
873 to_replace.append((content_name, content_data, transport, transport_elt))
874
875 if content_name is None:
876 # wa can't accept the replacement
877 iq_elt, reject_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REJECT)
878 for child in jingle_elt.children:
879 reject_jingle_elt.addChild(child)
880
881 iq_elt.send()
882 defer.returnValue(None)
883
884 # at this point, everything is alright and we can replace the transport(s)
885 # this is similar to an session-accept action, but for transports only
886 iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT)
887 for content_name, content_data, transport, transport_elt in to_replace:
888 # we can now actually replace the transport
889 yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None)
890 content_data['transport'] = transport
891 content_data['transport_data'].clear()
892 # and build the element
893 content_elt = accept_jingle_elt.addElement('content')
894 content_elt['name'] = content_name
895 content_elt['creator'] = content_data['creator']
896 # we notify the transport and insert its <transport/> in the answer
897 accept_transport_elt = yield transport.handler.jingleHandler(client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt)
898 content_elt.addChild(accept_transport_elt)
899 # there is no confirmation needed here, so we can directly prepare it
900 yield transport.handler.jingleHandler(client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None)
901
902 iq_elt.send()
903
904 def onTransportAccept(self, client, request, jingle_elt, session):
905 """Method called once transport replacement is accepted
906
907 @param client: %(doc_client)s
908 @param request(domish.Element): full <iq> request
909 @param jingle_elt(domish.Element): the <jingle> element
910 @param session(dict): session data
911 """
912 log.debug(u"new transport has been accepted")
913
914 try:
915 self._parseElements(jingle_elt, session, request, client, with_application=False)
916 except exceptions.CancelError:
917 return
918
919 # at this point we can send the <iq/> result to confirm reception of the request
920 client.send(xmlstream.toResponse(request, 'result'))
921
922 negociate_defers = []
923 negociate_defers = self._callPlugins(client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None)
924
925 negociate_dlist = defer.DeferredList(negociate_defers)
926
927 # after negociations we start the transfer
928 negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False))
929
930 def onTransportReject(self, client, request, jingle_elt, session):
931 """Method called when a transport replacement is refused
932
933 @param client: %(doc_client)s
934 @param request(domish.Element): full <iq> request
935 @param jingle_elt(domish.Element): the <jingle> element
936 @param session(dict): session data
937 """
938 # XXX: for now, we terminate the session in case of transport-reject
939 # this behaviour may change in the future
940 self.terminate(client, 'failed-transport', session)
941
942 def onTransportInfo(self, client, request, jingle_elt, session):
943 """Method called when a transport-info action is received from other peer
944
945 The request is parsed, and jingleHandler is called on concerned transport plugin(s)
946 @param client: %(doc_client)s
947 @param request(domish.Element): full <iq> request
948 @param jingle_elt(domish.Element): the <jingle> element
949 @param session(dict): session data
950 """
951 log.debug(u"Jingle session {} has been accepted".format(session['id']))
952
953 try:
954 self._parseElements(jingle_elt, session, request, client, with_application=False)
955 except exceptions.CancelError:
956 return
957
958 # The parsing was OK, we send the <iq> result
959 client.send(xmlstream.toResponse(request, 'result'))
960
961 for content_name, content_data in session['contents'].iteritems():
962 try:
963 transport_elt = content_data.pop('transport_elt')
964 except KeyError:
965 continue
966 else:
967 content_data['transport'].handler.jingleHandler(client, XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt)
968
969
970 class XEP_0166_handler(xmlstream.XMPPHandler):
971 implements(iwokkel.IDisco)
972
973 def __init__(self, plugin_parent):
974 self.plugin_parent = plugin_parent
975
976 def connectionInitialized(self):
977 self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent)
978
979 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
980 return [disco.DiscoFeature(NS_JINGLE)]
981
982 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
983 return []