comparison sat/core/xmpp.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention - move twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify Python version needed - replaced glib2reactor which use deprecated code by gtk3reactor sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/core/xmpp.py@60758de1c227
children 59ba387c17ea
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.constants import Const as C
22 from sat.memory import cache
23 from twisted.internet import task, defer
24 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
25 from twisted.words.protocols.jabber import xmlstream
26 from twisted.words.protocols.jabber import error
27 from twisted.words.protocols.jabber import jid
28 from twisted.words.xish import domish
29 from twisted.python import failure
30 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel
31 from wokkel import component
32 from wokkel import delay
33 from sat.core.log import getLogger
34 log = getLogger(__name__)
35 from sat.core import exceptions
36 from zope.interface import implements
37 import time
38 import calendar
39 import uuid
40 import sys
41
42
43 class SatXMPPEntity(object):
44 """Common code for Client and Component"""
45
46 def __init__(self, host_app, profile, max_retries):
47
48 self.factory.clientConnectionLost = self.connectionLost
49 self.factory.maxRetries = max_retries
50 # when self._connected is None, we are not connected
51 # else, it's a deferred which fire on disconnection
52 self._connected = None
53 self.profile = profile
54 self.host_app = host_app
55 self.cache = cache.Cache(host_app, profile)
56 self._mess_id_uid = {} # map from message id to uid used in history. Key: (full_jid,message_id) Value: uid
57 self.conn_deferred = defer.Deferred()
58 self._progress_cb = {} # callback called when a progress is requested (key = progress id)
59 self.actions = {} # used to keep track of actions for retrieval (key = action_id)
60
61 ## initialisation ##
62
63 @defer.inlineCallbacks
64 def _callConnectionTriggers(self):
65 """Call conneting trigger prepare connected trigger
66
67 @param plugins(iterable): plugins to use
68 @return (list[object, callable]): plugin to trigger tuples with:
69 - plugin instance
70 - profileConnected* triggers (to call after connection)
71 """
72 plugin_conn_cb = []
73 for plugin in self._getPluginsList():
74 # we check if plugin handle client mode
75 if plugin.is_handler:
76 plugin.getHandler(self).setHandlerParent(self)
77
78 # profileConnecting/profileConnected methods handling
79
80 # profile connecting is called right now (before actually starting client)
81 connecting_cb = getattr(plugin, "profileConnecting", None)
82 if connecting_cb is not None:
83 yield connecting_cb(self)
84
85 # profile connected is called after client is ready and roster is got
86 connected_cb = getattr(plugin, "profileConnected", None)
87 if connected_cb is not None:
88 plugin_conn_cb.append((plugin, connected_cb))
89
90 defer.returnValue(plugin_conn_cb)
91
92 def _getPluginsList(self):
93 """Return list of plugin to use
94
95 need to be implemented by subclasses
96 this list is used to call profileConnect* triggers
97 @return(iterable[object]): plugins to use
98 """
99 raise NotImplementedError
100
101 def _createSubProtocols(self):
102 return
103
104 def entityConnected(self):
105 """Called once connection is done
106
107 may return a Deferred, to perform initialisation tasks
108 """
109 return
110
111 @classmethod
112 @defer.inlineCallbacks
113 def startConnection(cls, host, profile, max_retries):
114 """instantiate the entity and start the connection"""
115 # FIXME: reconnection doesn't seems to be handled correclty (client is deleted then recreated from scrash
116 # most of methods called here should be called once on first connection (e.g. adding subprotocols)
117 # but client should not be deleted except if session is finished (independently of connection/deconnection
118 #
119 try:
120 port = int(host.memory.getParamA(C.FORCE_PORT_PARAM, "Connection", profile_key=profile))
121 except ValueError:
122 log.debug(_("Can't parse port value, using default value"))
123 port = None # will use default value 5222 or be retrieved from a DNS SRV record
124
125 password = yield host.memory.asyncGetParamA("Password", "Connection", profile_key=profile)
126 entity = host.profiles[profile] = cls(host, profile,
127 jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)),
128 password, host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) or None,
129 port, max_retries)
130
131 entity._createSubProtocols()
132
133 entity.fallBack = SatFallbackHandler(host)
134 entity.fallBack.setHandlerParent(entity)
135
136 entity.versionHandler = SatVersionHandler(C.APP_NAME_FULL,
137 host.full_version)
138 entity.versionHandler.setHandlerParent(entity)
139
140 entity.identityHandler = SatIdentityHandler()
141 entity.identityHandler.setHandlerParent(entity)
142
143 log.debug(_("setting plugins parents"))
144
145 plugin_conn_cb = yield entity._callConnectionTriggers()
146
147 entity.startService()
148
149 yield entity.getConnectionDeferred()
150
151 yield defer.maybeDeferred(entity.entityConnected)
152
153 # Call profileConnected callback for all plugins, and print error message if any of them fails
154 conn_cb_list = []
155 for dummy, callback in plugin_conn_cb:
156 conn_cb_list.append(defer.maybeDeferred(callback, entity))
157 list_d = defer.DeferredList(conn_cb_list)
158
159 def logPluginResults(results):
160 all_succeed = all([success for success, result in results])
161 if not all_succeed:
162 log.error(_(u"Plugins initialisation error"))
163 for idx, (success, result) in enumerate(results):
164 if not success:
165 log.error(u"error (plugin %(name)s): %(failure)s" %
166 {'name': plugin_conn_cb[idx][0]._info['import_name'], 'failure': result})
167
168 yield list_d.addCallback(logPluginResults) # FIXME: we should have a timeout here, and a way to know if a plugin freeze
169 # TODO: mesure launch time of each plugin
170
171 def getConnectionDeferred(self):
172 """Return a deferred which fire when the client is connected"""
173 return self.conn_deferred
174
175 def _disconnectionCb(self, dummy):
176 self._connected = None
177
178 def _disconnectionEb(self, failure_):
179 log.error(_(u"Error while disconnecting: {}".format(failure_)))
180
181 def _authd(self, xmlstream):
182 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile):
183 return
184 super(SatXMPPEntity, self)._authd(xmlstream)
185
186 # the following Deferred is used to know when we are connected
187 # so we need to be set it to None when connection is lost
188 self._connected = defer.Deferred()
189 self._connected.addCallback(self._cleanConnection)
190 self._connected.addCallback(self._disconnectionCb)
191 self._connected.addErrback(self._disconnectionEb)
192
193 log.info(_(u"********** [{profile}] CONNECTED **********").format(profile=self.profile))
194 self.streamInitialized()
195 self.host_app.bridge.connected(self.profile, unicode(self.jid)) # we send the signal to the clients
196
197 def _finish_connection(self, dummy):
198 self.conn_deferred.callback(None)
199
200 def streamInitialized(self):
201 """Called after _authd"""
202 log.debug(_(u"XML stream is initialized"))
203 self.keep_alife = task.LoopingCall(self.xmlstream.send, " ") # Needed to avoid disconnection (specially with openfire)
204 self.keep_alife.start(C.XMPP_KEEP_ALIFE)
205
206 self.disco = SatDiscoProtocol(self)
207 self.disco.setHandlerParent(self)
208 self.discoHandler = disco.DiscoHandler()
209 self.discoHandler.setHandlerParent(self)
210 disco_d = defer.succeed(None)
211
212 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile):
213 return
214
215 disco_d.addCallback(self._finish_connection)
216
217 def initializationFailed(self, reason):
218 log.error(_(u"ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {'profile': self.profile, 'reason': reason}))
219 self.conn_deferred.errback(reason.value)
220 try:
221 super(SatXMPPEntity, self).initializationFailed(reason)
222 except:
223 # we already chained an errback, no need to raise an exception
224 pass
225
226 ## connection ##
227
228 def connectionLost(self, connector, reason):
229 try:
230 self.keep_alife.stop()
231 except AttributeError:
232 log.debug(_("No keep_alife"))
233 if self._connected is not None:
234 self.host_app.bridge.disconnected(self.profile) # we send the signal to the clients
235 self._connected.callback(None)
236 self.host_app.purgeEntity(self.profile) # and we remove references to this client
237 log.info(_(u"********** [{profile}] DISCONNECTED **********").format(profile=self.profile))
238 if not self.conn_deferred.called:
239 # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody,
240 # we should have the real error)
241 self.conn_deferred.errback(error.StreamError(u"Server unexpectedly closed the connection"))
242
243 @defer.inlineCallbacks
244 def _cleanConnection(self, dummy):
245 """method called on disconnection
246
247 used to call profileDisconnected* triggers
248 """
249 trigger_name = "profileDisconnected"
250 for plugin in self._getPluginsList():
251 disconnected_cb = getattr(plugin, trigger_name, None)
252 if disconnected_cb is not None:
253 yield disconnected_cb(self)
254
255 def isConnected(self):
256 return self._connected is not None
257
258 def entityDisconnect(self):
259 log.info(_(u"Disconnecting..."))
260 self.stopService()
261 if self._connected is not None:
262 return self._connected
263 else:
264 return defer.succeed(None)
265
266 ## sending ##
267
268 def IQ(self, type_=u'set', timeout=60):
269 """shortcut to create an IQ element managing deferred
270
271 @param type_(unicode): IQ type ('set' or 'get')
272 @param timeout(None, int): timeout in seconds
273 @return((D)domish.Element: result stanza
274 errback is called if and error stanza is returned
275 """
276 iq_elt = xmlstream.IQ(self.xmlstream, type_)
277 iq_elt.timeout = timeout
278 return iq_elt
279
280 def sendError(self, iq_elt, condition):
281 """Send error stanza build from iq_elt
282
283 @param iq_elt(domish.Element): initial IQ element
284 @param condition(unicode): error condition
285 """
286 iq_error_elt = error.StanzaError(condition).toResponse(iq_elt)
287 self.xmlstream.send(iq_error_elt)
288
289 def generateMessageXML(self, data):
290 """Generate <message/> stanza from message data
291
292 @param data(dict): message data
293 domish element will be put in data['xml']
294 following keys are needed:
295 - from
296 - to
297 - uid: can be set to '' if uid attribute is not wanted
298 - message
299 - type
300 - subject
301 - extra
302 @return (dict) message data
303 """
304 data['xml'] = message_elt = domish.Element((None, 'message'))
305 message_elt["to"] = data["to"].full()
306 message_elt["from"] = data['from'].full()
307 message_elt["type"] = data["type"]
308 if data['uid']: # key must be present but can be set to ''
309 # by a plugin to avoid id on purpose
310 message_elt['id'] = data['uid']
311 for lang, subject in data["subject"].iteritems():
312 subject_elt = message_elt.addElement("subject", content=subject)
313 if lang:
314 subject_elt[(C.NS_XML, 'lang')] = lang
315 for lang, message in data["message"].iteritems():
316 body_elt = message_elt.addElement("body", content=message)
317 if lang:
318 body_elt[(C.NS_XML, 'lang')] = lang
319 try:
320 thread = data['extra']['thread']
321 except KeyError:
322 if 'thread_parent' in data['extra']:
323 raise exceptions.InternalError(u"thread_parent found while there is not associated thread")
324 else:
325 thread_elt = message_elt.addElement("thread", content=thread)
326 try:
327 thread_elt["parent"] = data["extra"]["thread_parent"]
328 except KeyError:
329 pass
330 return data
331
332 def addPostXmlCallbacks(self, post_xml_treatments):
333 """Used to add class level callbacks at the end of the workflow
334
335 @param post_xml_treatments(D): the same Deferred as in sendMessage trigger
336 """
337 raise NotImplementedError
338
339 def sendMessage(self, to_jid, message, subject=None, mess_type='auto', extra=None, uid=None, no_trigger=False):
340 """Send a message to an entity
341
342 @param to_jid(jid.JID): destinee of the message
343 @param message(dict): message body, key is the language (use '' when unknown)
344 @param subject(dict): message subject, key is the language (use '' when unknown)
345 @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or:
346 - auto: for automatic type detection
347 - info: for information ("info_type" can be specified in extra)
348 @param extra(dict, None): extra data. Key can be:
349 - info_type: information type, can be
350 TODO
351 @param uid(unicode, None): unique id:
352 should be unique at least in this XMPP session
353 if None, an uuid will be generated
354 @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used
355 useful when a message need to be sent without any modification
356 """
357 if subject is None:
358 subject = {}
359 if extra is None:
360 extra = {}
361
362 assert mess_type in C.MESS_TYPE_ALL
363
364 data = { # dict is similar to the one used in client.onMessage
365 "from": self.jid,
366 "to": to_jid,
367 "uid": uid or unicode(uuid.uuid4()),
368 "message": message,
369 "subject": subject,
370 "type": mess_type,
371 "extra": extra,
372 "timestamp": time.time(),
373 }
374 pre_xml_treatments = defer.Deferred() # XXX: plugin can add their pre XML treatments to this deferred
375 post_xml_treatments = defer.Deferred() # XXX: plugin can add their post XML treatments to this deferred
376
377 if data["type"] == C.MESS_TYPE_AUTO:
378 # we try to guess the type
379 if data["subject"]:
380 data["type"] = C.MESS_TYPE_NORMAL
381 elif not data["to"].resource: # if to JID has a resource, the type is not 'groupchat'
382 # we may have a groupchat message, we check if the we know this jid
383 try:
384 entity_type = self.host_app.memory.getEntityData(data["to"], ['type'], self.profile)["type"]
385 #FIXME: should entity_type manage resources ?
386 except (exceptions.UnknownEntityError, KeyError):
387 entity_type = "contact"
388
389 if entity_type == "chatroom":
390 data["type"] = C.MESS_TYPE_GROUPCHAT
391 else:
392 data["type"] = C.MESS_TYPE_CHAT
393 else:
394 data["type"] == C.MESS_TYPE_CHAT
395 data["type"] == C.MESS_TYPE_CHAT if data["subject"] else C.MESS_TYPE_NORMAL
396
397 # FIXME: send_only is used by libervia's OTR plugin to avoid
398 # the triggers from frontend, and no_trigger do the same
399 # thing internally, this could be unified
400 send_only = data['extra'].get('send_only', False)
401
402 if not no_trigger and not send_only:
403 if not self.host_app.trigger.point("sendMessage" + self.trigger_suffix, self, data, pre_xml_treatments, post_xml_treatments):
404 return defer.succeed(None)
405
406 log.debug(_(u"Sending message (type {type}, to {to})").format(type=data["type"], to=to_jid.full()))
407
408 pre_xml_treatments.addCallback(lambda dummy: self.generateMessageXML(data))
409 pre_xml_treatments.chainDeferred(post_xml_treatments)
410 post_xml_treatments.addCallback(self.sendMessageData)
411 if send_only:
412 log.debug(_("Triggers, storage and echo have been inhibited by the 'send_only' parameter"))
413 else:
414 self.addPostXmlCallbacks(post_xml_treatments)
415 post_xml_treatments.addErrback(self._cancelErrorTrap)
416 post_xml_treatments.addErrback(self.host_app.logErrback)
417 pre_xml_treatments.callback(data)
418 return pre_xml_treatments
419
420 def _cancelErrorTrap(self, failure):
421 """A message sending can be cancelled by a plugin treatment"""
422 failure.trap(exceptions.CancelError)
423
424 def messageAddToHistory(self, data):
425 """Store message into database (for local history)
426
427 @param data: message data dictionnary
428 @param client: profile's client
429 """
430 if data[u"type"] != C.MESS_TYPE_GROUPCHAT:
431 # we don't add groupchat message to history, as we get them back
432 # and they will be added then
433 if data[u'message'] or data[u'subject']: # we need a message to store
434 self.host_app.memory.addToHistory(self, data)
435 else:
436 log.warning(u"No message found") # empty body should be managed by plugins before this point
437 return data
438
439 def messageSendToBridge(self, data):
440 """Send message to bridge, so frontends can display it
441
442 @param data: message data dictionnary
443 @param client: profile's client
444 """
445 if data[u"type"] != C.MESS_TYPE_GROUPCHAT:
446 # we don't send groupchat message to bridge, as we get them back
447 # and they will be added the
448 if data[u'message'] or data[u'subject']: # we need a message to send something
449 # We send back the message, so all frontends are aware of it
450 self.host_app.bridge.messageNew(data[u'uid'], data[u'timestamp'], data[u'from'].full(), data[u'to'].full(), data[u'message'], data[u'subject'], data[u'type'], data[u'extra'], profile=self.profile)
451 else:
452 log.warning(_(u"No message found"))
453 return data
454
455
456 class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient):
457 implements(iwokkel.IDisco)
458 trigger_suffix = ""
459 is_component = False
460
461 def __init__(self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES):
462 # XXX: DNS SRV records are checked when the host is not specified.
463 # If no SRV record is found, the host is directly extracted from the JID.
464 self.started = time.time()
465
466 # Currently, we use "client/pc/Salut à Toi", but as
467 # SàT is multi-frontends and can be used on mobile devices, as a bot, with a web frontend,
468 # etc., we should implement a way to dynamically update identities through the bridge
469 self.identities = [disco.DiscoIdentity(u"client", u"pc", C.APP_NAME)]
470 if sys.platform == "android":
471 # FIXME: temporary hack as SRV is not working on android
472 # TODO: remove this hack and fix SRV
473 log.info(u"FIXME: Android hack, ignoring SRV")
474 host = user_jid.host
475
476 hosts_map = host_app.memory.getConfig(None, "hosts_dict", {})
477 if host is None and user_jid.host in hosts_map:
478 host_data = hosts_map[user_jid.host]
479 if isinstance(host_data, basestring):
480 host = host_data
481 elif isinstance(host_data, dict):
482 if u'host' in host_data:
483 host = host_data[u'host']
484 if u'port' in host_data:
485 port = host_data[u'port']
486 else:
487 log.warning(_(u"invalid data used for host: {data}").format(data=host_data))
488 host_data = None
489 if host_data is not None:
490 log.info(u"using {host}:{port} for host {host_ori} as requested in config".format(
491 host_ori = user_jid.host,
492 host = host,
493 port = port))
494
495 wokkel_client.XMPPClient.__init__(self, user_jid, password, host or None, port or C.XMPP_C2S_PORT)
496 SatXMPPEntity.__init__(self, host_app, profile, max_retries)
497
498 def _getPluginsList(self):
499 for p in self.host_app.plugins.itervalues():
500 if C.PLUG_MODE_CLIENT in p._info[u'modes']:
501 yield p
502
503 def _createSubProtocols(self):
504 self.messageProt = SatMessageProtocol(self.host_app)
505 self.messageProt.setHandlerParent(self)
506
507 self.roster = SatRosterProtocol(self.host_app)
508 self.roster.setHandlerParent(self)
509
510 self.presence = SatPresenceProtocol(self.host_app)
511 self.presence.setHandlerParent(self)
512
513 def entityConnected(self):
514 # we want to be sure that we got the roster
515 return self.roster.got_roster
516
517 def addPostXmlCallbacks(self, post_xml_treatments):
518 post_xml_treatments.addCallback(self.messageAddToHistory)
519 post_xml_treatments.addCallback(self.messageSendToBridge)
520
521 def send(self, obj):
522 # original send method accept string
523 # but we restrict to domish.Element to make trigger treatments easier
524 assert isinstance(obj, domish.Element)
525 # XXX: this trigger is the last one before sending stanza on wire
526 # it is intended for things like end 2 end encryption.
527 # *DO NOT* cancel (i.e. return False) without very good reason
528 # (out of band transmission for instance).
529 # e2e should have a priority of 0 here, and out of band transmission
530 # a lower priority
531 # FIXME: trigger not used yet, can be uncommented when e2e full stanza encryption is implemented
532 # if not self.host_app.trigger.point("send", self, obj):
533 #  return
534 super(SatXMPPClient, self).send(obj)
535
536 def sendMessageData(self, mess_data):
537 """Convenient method to send message data to stream
538
539 This method will send mess_data[u'xml'] to stream, but a trigger is there
540 The trigger can't be cancelled, it's a good place for e2e encryption which
541 don't handle full stanza encryption
542 @param mess_data(dict): message data as constructed by onMessage workflow
543 @return (dict): mess_data (so it can be used in a deferred chain)
544 """
545 # XXX: This is the last trigger before u"send" (last but one globally) for sending message.
546 # This is intented for e2e encryption which doesn't do full stanza encryption (e.g. OTR)
547 # This trigger point can't cancel the method
548 self.host_app.trigger.point("sendMessageData", self, mess_data)
549 self.send(mess_data[u'xml'])
550 return mess_data
551
552 def feedback(self, to_jid, message):
553 """Send message to frontends
554
555 This message will be an info message, not recorded in history.
556 It can be used to give feedback of a command
557 @param to_jid(jid.JID): destinee jid
558 @param message(unicode): message to send to frontends
559 """
560 self.host_app.bridge.messageNew(uid=unicode(uuid.uuid4()),
561 timestamp=time.time(),
562 from_jid=self.jid.full(),
563 to_jid=to_jid.full(),
564 message={u'': message},
565 subject={},
566 mess_type=C.MESS_TYPE_INFO,
567 extra={},
568 profile=self.profile)
569
570 def _finish_connection(self, dummy):
571 self.roster.requestRoster()
572 self.presence.available()
573 super(SatXMPPClient, self)._finish_connection(dummy)
574
575
576 class SatXMPPComponent(SatXMPPEntity, component.Component):
577 """XMPP component
578
579 This component are similar but not identical to clients.
580 An entry point plugin is launched after component is connected.
581 Component need to instantiate MessageProtocol itself
582 """
583 implements(iwokkel.IDisco)
584 trigger_suffix = "Component" # used for to distinguish some trigger points set in SatXMPPEntity
585 is_component = True
586 sendHistory = False # XXX: set to True from entry plugin to keep messages in history for received messages
587
588 def __init__(self, host_app, profile, component_jid, password, host=None, port=None, max_retries=C.XMPP_MAX_RETRIES):
589 self.started = time.time()
590 if port is None:
591 port = C.XMPP_COMPONENT_PORT
592
593 ## entry point ##
594 entry_point = host_app.memory.getEntryPoint(profile)
595 try:
596 self.entry_plugin = host_app.plugins[entry_point]
597 except KeyError:
598 raise exceptions.NotFound(_(u"The requested entry point ({entry_point}) is not available").format(
599 entry_point = entry_point))
600
601 self.identities = [disco.DiscoIdentity(u"component", u"generic", C.APP_NAME)]
602 # jid is set automatically on bind by Twisted for Client, but not for Component
603 self.jid = component_jid
604 if host is None:
605 try:
606 host = component_jid.host.split(u'.', 1)[1]
607 except IndexError:
608 raise ValueError(u"Can't guess host from jid, please specify a host")
609 # XXX: component.Component expect unicode jid, while Client expect jid.JID.
610 # this is not consistent, so we use jid.JID for SatXMPP*
611 component.Component.__init__(self, host, port, component_jid.full(), password)
612 SatXMPPEntity.__init__(self, host_app, profile, max_retries)
613
614 def _buildDependencies(self, current, plugins, required=True):
615 """build recursively dependencies needed for a plugin
616
617 this method build list of plugin needed for a component and raises
618 errors if they are not available or not allowed for components
619 @param current(object): parent plugin to check
620 use entry_point for first call
621 @param plugins(list): list of validated plugins, will be filled by the method
622 give an empty list for first call
623 @param required(bool): True if plugin is mandatory
624 for recursive calls only, should not be modified by inital caller
625 @raise InternalError: one of the plugin is not handling components
626 @raise KeyError: one plugin should be present in self.host_app.plugins but it is not
627 """
628 if C.PLUG_MODE_COMPONENT not in current._info[u'modes']:
629 if not required:
630 return
631 else:
632 log.error(_(u"Plugin {current_name} is needed for {entry_name}, but it doesn't handle component mode").format(
633 current_name = current._info[u'import_name'],
634 entry_name = self.entry_plugin._info[u'import_name']
635 ))
636 raise exceptions.InternalError(_(u"invalid plugin mode"))
637
638 for import_name in current._info.get(C.PI_DEPENDENCIES, []):
639 # plugins are already loaded as dependencies
640 # so we know they are in self.host_app.plugins
641 dep = self.host_app.plugins[import_name]
642 self._buildDependencies(dep, plugins)
643
644 for import_name in current._info.get(C.PI_RECOMMENDATIONS, []):
645 # here plugins are only recommendations,
646 # so they may not exist in self.host_app.plugins
647 try:
648 dep = self.host_app.plugins[import_name]
649 except KeyError:
650 continue
651 self._buildDependencies(dep, plugins, required = False)
652
653 if current not in plugins:
654 # current can be required for several plugins and so
655 # it can already be present in the list
656 plugins.append(current)
657
658 def _getPluginsList(self):
659 # XXX: for component we don't launch all plugins triggers
660 # but only the ones from which there is a dependency
661 plugins = []
662 self._buildDependencies(self.entry_plugin, plugins)
663 return plugins
664
665 def entityConnected(self):
666 # we can now launch entry point
667 try:
668 start_cb = self.entry_plugin.componentStart
669 except AttributeError:
670 return
671 else:
672 return start_cb(self)
673
674 def addPostXmlCallbacks(self, post_xml_treatments):
675 if self.sendHistory:
676 post_xml_treatments.addCallback(self.messageAddToHistory)
677
678
679 class SatMessageProtocol(xmppim.MessageProtocol):
680
681 def __init__(self, host):
682 xmppim.MessageProtocol.__init__(self)
683 self.host = host
684
685 @staticmethod
686 def parseMessage(message_elt, client=None):
687 """parse a message XML and return message_data
688
689 @param message_elt(domish.Element): raw <message> xml
690 @param client(SatXMPPClient, None): client to map message id to uid
691 if None, mapping will not be done
692 @return(dict): message data
693 """
694 message = {}
695 subject = {}
696 extra = {}
697 data = {"from": jid.JID(message_elt['from']),
698 "to": jid.JID(message_elt['to']),
699 "uid": message_elt.getAttribute('uid', unicode(uuid.uuid4())), # XXX: uid is not a standard attribute but may be added by plugins
700 "message": message,
701 "subject": subject,
702 "type": message_elt.getAttribute('type', 'normal'),
703 "extra": extra}
704
705 if client is not None:
706 try:
707 data['stanza_id'] = message_elt['id']
708 except KeyError:
709 pass
710 else:
711 client._mess_id_uid[(data['from'], data['stanza_id'])] = data['uid']
712
713 # message
714 for e in message_elt.elements(C.NS_CLIENT, 'body'):
715 message[e.getAttribute((C.NS_XML,'lang'),'')] = unicode(e)
716
717 # subject
718 for e in message_elt.elements(C.NS_CLIENT, 'subject'):
719 subject[e.getAttribute((C.NS_XML, 'lang'),'')] = unicode(e)
720
721 # delay and timestamp
722 try:
723 delay_elt = message_elt.elements(delay.NS_DELAY, 'delay').next()
724 except StopIteration:
725 data['timestamp'] = time.time()
726 else:
727 parsed_delay = delay.Delay.fromElement(delay_elt)
728 data['timestamp'] = calendar.timegm(parsed_delay.stamp.utctimetuple())
729 data['received_timestamp'] = unicode(time.time())
730 if parsed_delay.sender:
731 data['delay_sender'] = parsed_delay.sender.full()
732 return data
733
734 def onMessage(self, message_elt):
735 # TODO: handle threads
736 client = self.parent
737 if not 'from' in message_elt.attributes:
738 message_elt['from'] = client.jid.host
739 log.debug(_(u"got message from: {from_}").format(from_=message_elt['from']))
740 post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred
741
742 if not self.host.trigger.point("MessageReceived", client, message_elt, post_treat):
743 return
744
745 data = self.parseMessage(message_elt, client)
746
747 post_treat.addCallback(self.skipEmptyMessage)
748 post_treat.addCallback(self.addToHistory, client)
749 post_treat.addCallback(self.bridgeSignal, client, data)
750 post_treat.addErrback(self.cancelErrorTrap)
751 post_treat.callback(data)
752
753 def skipEmptyMessage(self, data):
754 if not data['message'] and not data['extra'] and not data['subject']:
755 raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
756 return data
757
758 def addToHistory(self, data, client):
759 if data.pop(u'history', None) == C.HISTORY_SKIP:
760 log.info(u'history is skipped as requested')
761 data[u'extra'][u'history'] = C.HISTORY_SKIP
762 else:
763 return self.host.memory.addToHistory(client, data)
764
765 def bridgeSignal(self, dummy, client, data):
766 try:
767 data['extra']['received_timestamp'] = data['received_timestamp']
768 data['extra']['delay_sender'] = data['delay_sender']
769 except KeyError:
770 pass
771 if data is not None:
772 self.host.bridge.messageNew(data['uid'], data['timestamp'], data['from'].full(), data['to'].full(), data['message'], data['subject'], data['type'], data['extra'], profile=client.profile)
773 return data
774
775 def cancelErrorTrap(self, failure_):
776 """A message sending can be cancelled by a plugin treatment"""
777 failure_.trap(exceptions.CancelError)
778
779
780 class SatRosterProtocol(xmppim.RosterClientProtocol):
781
782 def __init__(self, host):
783 xmppim.RosterClientProtocol.__init__(self)
784 self.host = host
785 self.got_roster = defer.Deferred() # called when roster is received and ready
786 #XXX: the two following dicts keep a local copy of the roster
787 self._groups = {} # map from groups to jids: key=group value=set of jids
788 self._jids = None # map from jids to RosterItem: key=jid value=RosterItem
789
790 def rosterCb(self, roster):
791 assert roster is not None # FIXME: must be managed with roster versioning
792 self._groups.clear()
793 self._jids = roster
794 for item in roster.itervalues():
795 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask:
796 #XXX: current behaviour: we don't want contact in our roster list
797 # if there is no presence subscription
798 # may change in the future
799 log.info(u"Removing contact {} from roster because there is no presence subscription".format(item.jid))
800 self.removeItem(item.entity) # FIXME: to be checked
801 else:
802 self._registerItem(item)
803
804 def _registerItem(self, item):
805 """Register item in local cache
806
807 item must be already registered in self._jids before this method is called
808 @param item (RosterIem): item added
809 """
810 log.debug(u"registering item: {}".format(item.entity.full()))
811 if item.entity.resource:
812 log.warning(u"Received a roster item with a resource, this is not common but not restricted by RFC 6121, this case may be not well tested.")
813 if not item.subscriptionTo:
814 if not item.subscriptionFrom:
815 log.info(_(u"There's no subscription between you and [{}]!").format(item.entity.full()))
816 else:
817 log.info(_(u"You are not subscribed to [{}]!").format(item.entity.full()))
818 if not item.subscriptionFrom:
819 log.info(_(u"[{}] is not subscribed to you!").format(item.entity.full()))
820
821 for group in item.groups:
822 self._groups.setdefault(group, set()).add(item.entity)
823
824 def requestRoster(self):
825 """ ask the server for Roster list """
826 log.debug("requestRoster")
827 d = self.getRoster().addCallback(self.rosterCb)
828 d.chainDeferred(self.got_roster)
829
830 def removeItem(self, to_jid):
831 """Remove a contact from roster list
832 @param to_jid: a JID instance
833 @return: Deferred
834 """
835 return xmppim.RosterClientProtocol.removeItem(self, to_jid)
836
837 def getAttributes(self, item):
838 """Return dictionary of attributes as used in bridge from a RosterItem
839
840 @param item: RosterItem
841 @return: dictionary of attributes
842 """
843 item_attr = {'to': unicode(item.subscriptionTo),
844 'from': unicode(item.subscriptionFrom),
845 'ask': unicode(item.ask)
846 }
847 if item.name:
848 item_attr['name'] = item.name
849 return item_attr
850
851 def setReceived(self, request):
852 #TODO: implement roster versioning (cf RFC 6121 §2.6)
853 item = request.item
854 try: # update the cache for the groups the contact has been removed from
855 left_groups = set(self._jids[item.entity].groups).difference(item.groups)
856 for group in left_groups:
857 jids_set = self._groups[group]
858 jids_set.remove(item.entity)
859 if not jids_set:
860 del self._groups[group]
861 except KeyError:
862 pass # no previous item registration (or it's been cleared)
863 self._jids[item.entity] = item
864 self._registerItem(item)
865 self.host.bridge.newContact(item.entity.full(), self.getAttributes(item), item.groups, self.parent.profile)
866
867 def removeReceived(self, request):
868 entity = request.item.entity
869 log.info(u"removing %s from roster list" % entity.full())
870
871 # we first remove item from local cache (self._groups and self._jids)
872 try:
873 item = self._jids.pop(entity)
874 except KeyError:
875 log.error(u"Received a roster remove event for an item not in cache ({})".format(entity))
876 return
877 for group in item.groups:
878 try:
879 jids_set = self._groups[group]
880 jids_set.remove(entity)
881 if not jids_set:
882 del self._groups[group]
883 except KeyError:
884 log.warning(u"there is no cache for the group [%(group)s] of the removed roster item [%(jid)s]" %
885 {"group": group, "jid": entity})
886
887 # then we send the bridge signal
888 self.host.bridge.contactDeleted(entity.full(), self.parent.profile)
889
890 def getGroups(self):
891 """Return a list of groups"""
892 return self._groups.keys()
893
894 def getItem(self, entity_jid):
895 """Return RosterItem for a given jid
896
897 @param entity_jid(jid.JID): jid of the contact
898 @return(RosterItem, None): RosterItem instance
899 None if contact is not in cache
900 """
901 return self._jids.get(entity_jid, None)
902
903 def getJids(self):
904 """Return all jids of the roster"""
905 return self._jids.keys()
906
907 def isJidInRoster(self, entity_jid):
908 """Return True if jid is in roster"""
909 return entity_jid in self._jids
910
911 def isPresenceAuthorised(self, entity_jid):
912 """Return True if entity is authorised to see our presence"""
913 try:
914 item = self._jids[entity_jid.userhostJID()]
915 except KeyError:
916 return False
917 return item.subscriptionFrom
918
919 def getItems(self):
920 """Return all items of the roster"""
921 return self._jids.values()
922
923 def getJidsFromGroup(self, group):
924 try:
925 return self._groups[group]
926 except KeyError:
927 raise exceptions.UnknownGroupError(group)
928
929 def getJidsSet(self, type_, groups=None):
930 """Helper method to get a set of jids
931
932 @param type_(unicode): one of:
933 C.ALL: get all jids from roster
934 C.GROUP: get jids from groups (listed in "groups")
935 @groups(list[unicode]): list of groups used if type_==C.GROUP
936 @return (set(jid.JID)): set of selected jids
937 """
938 if type_ == C.ALL and groups is not None:
939 raise ValueError('groups must not be set for {} type'.format(C.ALL))
940
941 if type_ == C.ALL:
942 return set(self.getJids())
943 elif type_ == C.GROUP:
944 jids = set()
945 for group in groups:
946 jids.update(self.getJidsFromGroup(group))
947 return jids
948 else:
949 raise ValueError(u'Unexpected type_ {}'.format(type_))
950
951 def getNick(self, entity_jid):
952 """Return a nick name for an entity
953
954 return nick choosed by user if available
955 else return user part of entity_jid
956 """
957 item = self.getItem(entity_jid)
958 if item is None:
959 return entity_jid.user
960 else:
961 return item.name or entity_jid.user
962
963
964 class SatPresenceProtocol(xmppim.PresenceClientProtocol):
965
966 def __init__(self, host):
967 xmppim.PresenceClientProtocol.__init__(self)
968 self.host = host
969
970 def send(self, obj):
971 presence_d = defer.succeed(None)
972 if not self.host.trigger.point("Presence send", self.parent, obj, presence_d):
973 return
974 presence_d.addCallback(lambda __: super(SatPresenceProtocol, self).send(obj))
975
976 def availableReceived(self, entity, show=None, statuses=None, priority=0):
977 log.debug(_(u"presence update for [%(entity)s] (available, show=%(show)s statuses=%(statuses)s priority=%(priority)d)") % {'entity': entity, C.PRESENCE_SHOW: show, C.PRESENCE_STATUSES: statuses, C.PRESENCE_PRIORITY: priority})
978
979 if not statuses:
980 statuses = {}
981
982 if None in statuses: # we only want string keys
983 statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)
984
985 if not self.host.trigger.point("presenceReceived", entity, show, priority, statuses, self.parent.profile):
986 return
987
988 self.host.memory.setPresenceStatus(entity, show or "",
989 int(priority), statuses,
990 self.parent.profile)
991
992 # now it's time to notify frontends
993 self.host.bridge.presenceUpdate(entity.full(), show or "",
994 int(priority), statuses,
995 self.parent.profile)
996
997 def unavailableReceived(self, entity, statuses=None):
998 log.debug(_(u"presence update for [%(entity)s] (unavailable, statuses=%(statuses)s)") % {'entity': entity, C.PRESENCE_STATUSES: statuses})
999
1000 if not statuses:
1001 statuses = {}
1002
1003 if None in statuses: # we only want string keys
1004 statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)
1005
1006 if not self.host.trigger.point("presenceReceived", entity, "unavailable", 0, statuses, self.parent.profile):
1007 return
1008
1009 # now it's time to notify frontends
1010 # if the entity is not known yet in this session or is already unavailable, there is no need to send an unavailable signal
1011 try:
1012 presence = self.host.memory.getEntityDatum(entity, "presence", self.parent.profile)
1013 except (KeyError, exceptions.UnknownEntityError):
1014 # the entity has not been seen yet in this session
1015 pass
1016 else:
1017 if presence.show != C.PRESENCE_UNAVAILABLE:
1018 self.host.bridge.presenceUpdate(entity.full(), C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile)
1019
1020 self.host.memory.setPresenceStatus(entity, C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile)
1021
1022 def available(self, entity=None, show=None, statuses=None, priority=None):
1023 """Set a presence and statuses.
1024
1025 @param entity (jid.JID): entity
1026 @param show (unicode): value in ('unavailable', '', 'away', 'xa', 'chat', 'dnd')
1027 @param statuses (dict{unicode: unicode}): multilingual statuses with
1028 the entry key beeing a language code on 2 characters or "default".
1029 """
1030 if priority is None:
1031 try:
1032 priority = int(self.host.memory.getParamA("Priority", "Connection", profile_key=self.parent.profile))
1033 except ValueError:
1034 priority = 0
1035
1036 if statuses is None:
1037 statuses = {}
1038
1039 # default for us is None for wokkel
1040 # so we must temporarily switch to wokkel's convention...
1041 if C.PRESENCE_STATUSES_DEFAULT in statuses:
1042 statuses[None] = statuses.pop(C.PRESENCE_STATUSES_DEFAULT)
1043
1044 presence_elt = xmppim.AvailablePresence(entity, show, statuses, priority)
1045
1046 # ... before switching back
1047 if None in statuses:
1048 statuses['default'] = statuses.pop(None)
1049
1050 if not self.host.trigger.point("presence_available", presence_elt, self.parent):
1051 return
1052 self.send(presence_elt)
1053
1054 @defer.inlineCallbacks
1055 def subscribed(self, entity):
1056 yield self.parent.roster.got_roster
1057 xmppim.PresenceClientProtocol.subscribed(self, entity)
1058 self.host.memory.delWaitingSub(entity.userhost(), self.parent.profile)
1059 item = self.parent.roster.getItem(entity)
1060 if not item or not item.subscriptionTo: # we automatically subscribe to 'to' presence
1061 log.debug(_('sending automatic "from" subscription request'))
1062 self.subscribe(entity)
1063
1064 def unsubscribed(self, entity):
1065 xmppim.PresenceClientProtocol.unsubscribed(self, entity)
1066 self.host.memory.delWaitingSub(entity.userhost(), self.parent.profile)
1067
1068 def subscribedReceived(self, entity):
1069 log.debug(_(u"subscription approved for [%s]") % entity.userhost())
1070 self.host.bridge.subscribe('subscribed', entity.userhost(), self.parent.profile)
1071
1072 def unsubscribedReceived(self, entity):
1073 log.debug(_(u"unsubscription confirmed for [%s]") % entity.userhost())
1074 self.host.bridge.subscribe('unsubscribed', entity.userhost(), self.parent.profile)
1075
1076 @defer.inlineCallbacks
1077 def subscribeReceived(self, entity):
1078 log.debug(_(u"subscription request from [%s]") % entity.userhost())
1079 yield self.parent.roster.got_roster
1080 item = self.parent.roster.getItem(entity)
1081 if item and item.subscriptionTo:
1082 # We automatically accept subscription if we are already subscribed to contact presence
1083 log.debug(_('sending automatic subscription acceptance'))
1084 self.subscribed(entity)
1085 else:
1086 self.host.memory.addWaitingSub('subscribe', entity.userhost(), self.parent.profile)
1087 self.host.bridge.subscribe('subscribe', entity.userhost(), self.parent.profile)
1088
1089 @defer.inlineCallbacks
1090 def unsubscribeReceived(self, entity):
1091 log.debug(_(u"unsubscription asked for [%s]") % entity.userhost())
1092 yield self.parent.roster.got_roster
1093 item = self.parent.roster.getItem(entity)
1094 if item and item.subscriptionFrom: # we automatically remove contact
1095 log.debug(_('automatic contact deletion'))
1096 self.host.delContact(entity, self.parent.profile)
1097 self.host.bridge.subscribe('unsubscribe', entity.userhost(), self.parent.profile)
1098
1099
1100 class SatDiscoProtocol(disco.DiscoClientProtocol):
1101 def __init__(self, host):
1102 disco.DiscoClientProtocol.__init__(self)
1103
1104
1105 class SatFallbackHandler(generic.FallbackHandler):
1106 def __init__(self, host):
1107 generic.FallbackHandler.__init__(self)
1108
1109 def iqFallback(self, iq):
1110 if iq.handled is True:
1111 return
1112 log.debug(u"iqFallback: xml = [%s]" % (iq.toXml()))
1113 generic.FallbackHandler.iqFallback(self, iq)
1114
1115
1116 class SatVersionHandler(generic.VersionHandler):
1117
1118 def getDiscoInfo(self, requestor, target, node):
1119 #XXX: We need to work around wokkel's behaviour (namespace not added if there is a
1120 # node) as it cause issues with XEP-0115 & PEP (XEP-0163): there is a node when server
1121 # ask for disco info, and not when we generate the key, so the hash is used with different
1122 # disco features, and when the server (seen on ejabberd) generate its own hash for security check
1123 # it reject our features (resulting in e.g. no notification on PEP)
1124 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None)
1125
1126
1127 class SatIdentityHandler(XMPPHandler):
1128 """ Manage disco Identity of SàT.
1129
1130 """
1131 #TODO: dynamic identity update (see docstring). Note that a XMPP entity can have several identities
1132 implements(iwokkel.IDisco)
1133
1134 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
1135 return self.parent.identities
1136
1137 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
1138 return []