comparison src/core/xmpp.py @ 2144:1d3f73e065e1

core, jp: component handling + client handling refactoring: - SàT can now handle components - plugin have now a "modes" key in PLUGIN_INFO where they declare if they can be used with clients and or components. They default to be client only. - components are really similar to clients, but with some changes in behaviour: * component has "entry point", which is a special plugin with a componentStart method, which is called just after component is connected * trigger end with a different suffixes (e.g. profileConnected vs profileConnectedComponent), so a plugin which manage both clients and components can have different workflow * for clients, only triggers of plugins handling client mode are launched * for components, only triggers of plugins needed in dependencies are launched. They all must handle component mode. * component have a sendHistory attribute (False by default) which can be set to True to allow saving sent messages into history * for convenience, "client" is still used in method even if it can now be a component * a new "component" boolean attribute tells if we have a component or a client * components have to add themselve Message protocol * roster and presence protocols are not added for components * component default port is 5347 (which is Prosody's default port) - asyncCreateProfile has been renamed for profileCreate, both to follow new naming convention and to prepare the transition to fully asynchronous bridge - createProfile has a new "component" attribute. When used to create a component, it must be set to a component entry point - jp: added --component argument to profile/create - disconnect bridge method is now asynchronous, this way frontends can know when disconnection is finished - new PI_* constants for PLUGIN_INFO values (not used everywhere yet) - client/component connection workflow has been moved to their classes instead of being a host methods - host.messageSend is now client.sendMessage, and former client.sendMessage is now client.sendMessageData. - identities are now handled in client.identities list, so it can be updated dynamically by plugins (in the future, frontends should be able to update them too through bridge) - profileConnecting* profileConnected* profileDisconnected* and getHandler now all use client instead of profile
author Goffi <goffi@goffi.org>
date Sun, 12 Feb 2017 17:55:43 +0100
parents f8401024ab28
children 545a1261ac3b
comparison
equal deleted inserted replaced
2143:c3cac21157d4 2144:1d3f73e065e1
26 from twisted.words.protocols.jabber import error 26 from twisted.words.protocols.jabber import error
27 from twisted.words.protocols.jabber import jid 27 from twisted.words.protocols.jabber import jid
28 from twisted.words.xish import domish 28 from twisted.words.xish import domish
29 from twisted.python import failure 29 from twisted.python import failure
30 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel 30 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel
31 from wokkel import component
31 from wokkel import delay 32 from wokkel import delay
32 from sat.core.log import getLogger 33 from sat.core.log import getLogger
33 log = getLogger(__name__) 34 log = getLogger(__name__)
34 from sat.core import exceptions 35 from sat.core import exceptions
35 from zope.interface import implements 36 from zope.interface import implements
37 import calendar 38 import calendar
38 import uuid 39 import uuid
39 import sys 40 import sys
40 41
41 42
42 class SatXMPPClient(wokkel_client.XMPPClient): 43 class SatXMPPEntity(object):
44 """Common code for Client and Component"""
45
46 def __init__(self, host_app, profile, max_retries):
47
48 self.factory.clientConnectionLost = self.connectionLost
49 self.factory.maxRetries = max_retries
50 # when self._connected is None, we are not connected
51 # else, it's a deferred which fire on disconnection
52 self._connected = None
53 self.profile = profile
54 self.host_app = host_app
55 self.cache = cache.Cache(host_app, profile)
56 self._mess_id_uid = {} # map from message id to uid used in history. Key: (full_jid,message_id) Value: uid
57 self.conn_deferred = defer.Deferred()
58
59 ## initialisation ##
60
61 @defer.inlineCallbacks
62 def _callConnectionTriggers(self):
63 """Call conneting trigger prepare connected trigger
64
65 @param plugins(iterable): plugins to use
66 @return (list[object, callable]): plugin to trigger tuples with:
67 - plugin instance
68 - profileConnected* triggers (to call after connection)
69 """
70 plugin_conn_cb = []
71 for plugin in self._getPluginsList():
72 # we check if plugin handle client mode
73 if plugin.is_handler:
74 plugin.getHandler(self).setHandlerParent(self)
75
76 # profileConnecting/profileConnected methods handling
77
78 # profile connecting is called right now (before actually starting client)
79 connecting_cb = getattr(plugin, "profileConnecting" + self.trigger_suffix, None)
80 if connecting_cb is not None:
81 yield connecting_cb(self)
82
83 # profile connected is called after client is ready and roster is got
84 connected_cb = getattr(plugin, "profileConnected" + self.trigger_suffix, None)
85 if connected_cb is not None:
86 plugin_conn_cb.append((plugin, connected_cb))
87
88 defer.returnValue(plugin_conn_cb)
89
90 def _getPluginsList(self):
91 """Return list of plugin to use
92
93 need to be implemented by subclasses
94 this list is used to call profileConnect* triggers
95 @return(iterable[object]): plugins to use
96 """
97 raise NotImplementedError
98
99 def _createSubProtocols(self):
100 return
101
102 def entityConnected(self):
103 """Called once connection is done
104
105 may return a Deferred, to perform initialisation tasks
106 """
107 return
108
109 @classmethod
110 @defer.inlineCallbacks
111 def startConnection(cls, host, profile, max_retries):
112 """instantiate the entity and start the connection"""
113 # FIXME: reconnection doesn't seems to be handled correclty (client is deleted then recreated from scrash
114 # most of methods called here should be called once on first connection (e.g. adding subprotocols)
115 # but client should not be deleted except if session is finished (independently of connection/deconnection
116 #
117 try:
118 port = int(host.memory.getParamA(C.FORCE_PORT_PARAM, "Connection", profile_key=profile))
119 except ValueError:
120 log.debug(_("Can't parse port value, using default value"))
121 port = None # will use default value 5222 or be retrieved from a DNS SRV record
122
123 password = yield host.memory.asyncGetParamA("Password", "Connection", profile_key=profile)
124 entity = host.profiles[profile] = cls(host, profile,
125 jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)),
126 password, host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) or None,
127 port, max_retries)
128
129 entity._createSubProtocols()
130
131 entity.fallBack = SatFallbackHandler(host)
132 entity.fallBack.setHandlerParent(entity)
133
134 entity.versionHandler = SatVersionHandler(C.APP_NAME_FULL,
135 host.full_version)
136 entity.versionHandler.setHandlerParent(entity)
137
138 entity.identityHandler = SatIdentityHandler()
139 entity.identityHandler.setHandlerParent(entity)
140
141 log.debug(_("setting plugins parents"))
142
143 plugin_conn_cb = yield entity._callConnectionTriggers()
144
145 entity.startService()
146
147 yield entity.getConnectionDeferred()
148
149 yield defer.maybeDeferred(entity.entityConnected)
150
151 # Call profileConnected callback for all plugins, and print error message if any of them fails
152 conn_cb_list = []
153 for dummy, callback in plugin_conn_cb:
154 conn_cb_list.append(defer.maybeDeferred(callback, entity))
155 list_d = defer.DeferredList(conn_cb_list)
156
157 def logPluginResults(results):
158 all_succeed = all([success for success, result in results])
159 if not all_succeed:
160 log.error(_(u"Plugins initialisation error"))
161 for idx, (success, result) in enumerate(results):
162 if not success:
163 log.error(u"error (plugin %(name)s): %(failure)s" %
164 {'name': plugin_conn_cb[idx][0]._info['import_name'], 'failure': result})
165
166 yield list_d.addCallback(logPluginResults) # FIXME: we should have a timeout here, and a way to know if a plugin freeze
167 # TODO: mesure launch time of each plugin
168
169 def getConnectionDeferred(self):
170 """Return a deferred which fire when the client is connected"""
171 return self.conn_deferred
172
173 def _disconnectionCb(self, dummy):
174 self._connected = None
175
176 def _disconnectionEb(self, failure_):
177 log.error(_(u"Error while disconnecting: {}".format(failure_)))
178
179 def _authd(self, xmlstream):
180 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile):
181 return
182 super(SatXMPPEntity, self)._authd(xmlstream)
183
184 # the following Deferred is used to know when we are connected
185 # so we need to be set it to None when connection is lost
186 self._connected = defer.Deferred()
187 self._connected.addCallback(self._cleanConnection)
188 self._connected.addCallback(self._disconnectionCb)
189 self._connected.addErrback(self._disconnectionEb)
190
191 log.info(_("********** [%s] CONNECTED **********") % self.profile)
192 self.streamInitialized()
193 self.host_app.bridge.connected(self.profile, unicode(self.jid)) # we send the signal to the clients
194
195 def _finish_connection(self, dummy):
196 self.conn_deferred.callback(None)
197
198 def streamInitialized(self):
199 """Called after _authd"""
200 log.debug(_(u"XML stream is initialized"))
201 self.keep_alife = task.LoopingCall(self.xmlstream.send, " ") # Needed to avoid disconnection (specially with openfire)
202 self.keep_alife.start(C.XMPP_KEEP_ALIFE)
203
204 self.disco = SatDiscoProtocol(self)
205 self.disco.setHandlerParent(self)
206 self.discoHandler = disco.DiscoHandler()
207 self.discoHandler.setHandlerParent(self)
208 disco_d = defer.succeed(None)
209
210 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile):
211 return
212
213 disco_d.addCallback(self._finish_connection)
214
215 def initializationFailed(self, reason):
216 log.error(_(u"ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {'profile': self.profile, 'reason': reason}))
217 self.conn_deferred.errback(reason.value)
218 try:
219 super(SatXMPPEntity, self).initializationFailed(reason)
220 except:
221 # we already chained an errback, no need to raise an exception
222 pass
223
224 ## connection ##
225
226 def connectionLost(self, connector, reason):
227 try:
228 self.keep_alife.stop()
229 except AttributeError:
230 log.debug(_("No keep_alife"))
231 if self._connected is not None:
232 self.host_app.bridge.disconnected(self.profile) # we send the signal to the clients
233 self._connected.callback(None)
234 self.host_app.purgeEntity(self.profile) # and we remove references to this client
235 log.info(_("********** [%s] DISCONNECTED **********") % self.profile)
236 if not self.conn_deferred.called:
237 # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody,
238 # we should have the real error)
239 self.conn_deferred.errback(error.StreamError(u"Server unexpectedly closed the connection"))
240
241 @defer.inlineCallbacks
242 def _cleanConnection(self, dummy):
243 """method called on disconnection
244
245 used to call profileDisconnected* triggers
246 """
247 trigger_name = "profileDisconnected" + self.trigger_suffix
248 for plugin in self._getPluginsList():
249 disconnected_cb = getattr(plugin, trigger_name, None)
250 if disconnected_cb is not None:
251 yield disconnected_cb(self)
252
253 def isConnected(self):
254 return self._connected is not None
255
256 def entityDisconnect(self):
257 log.info(_(u"Disconnecting..."))
258 self.stopService()
259 if self._connected is not None:
260 return self._connected
261 else:
262 return defer.succeed(None)
263
264 ## sending ##
265
266 def IQ(self, type_=u'set', timeout=None):
267 """shortcut to create an IQ element managing deferred
268
269 @param type_(unicode): IQ type ('set' or 'get')
270 @param timeout(None, int): timeout in seconds
271 @return((D)domish.Element: result stanza
272 errback is called if and error stanza is returned
273 """
274 iq_elt = xmlstream.IQ(self.xmlstream, type_)
275 iq_elt.timeout = timeout
276 return iq_elt
277
278 def sendError(self, iq_elt, condition):
279 """Send error stanza build from iq_elt
280
281 @param iq_elt(domish.Element): initial IQ element
282 @param condition(unicode): error condition
283 """
284 iq_error_elt = error.StanzaError(condition).toResponse(iq_elt)
285 self.xmlstream.send(iq_error_elt)
286
287 def generateMessageXML(self, data):
288 """Generate <message/> stanza from message data
289
290 @param data(dict): message data
291 domish element will be put in data['xml']
292 following keys are needed:
293 - from
294 - to
295 - uid: can be set to '' if uid attribute is not wanted
296 - message
297 - type
298 - subject
299 - extra
300 @return (dict) message data
301 """
302 data['xml'] = message_elt = domish.Element((None, 'message'))
303 message_elt["to"] = data["to"].full()
304 message_elt["from"] = data['from'].full()
305 message_elt["type"] = data["type"]
306 if data['uid']: # key must be present but can be set to ''
307 # by a plugin to avoid id on purpose
308 message_elt['id'] = data['uid']
309 for lang, subject in data["subject"].iteritems():
310 subject_elt = message_elt.addElement("subject", content=subject)
311 if lang:
312 subject_elt[(C.NS_XML, 'lang')] = lang
313 for lang, message in data["message"].iteritems():
314 body_elt = message_elt.addElement("body", content=message)
315 if lang:
316 body_elt[(C.NS_XML, 'lang')] = lang
317 try:
318 thread = data['extra']['thread']
319 except KeyError:
320 if 'thread_parent' in data['extra']:
321 raise exceptions.InternalError(u"thread_parent found while there is not associated thread")
322 else:
323 thread_elt = message_elt.addElement("thread", content=thread)
324 try:
325 thread_elt["parent"] = data["extra"]["thread_parent"]
326 except KeyError:
327 pass
328 return data
329
330 def addPostXmlCallbacks(self, post_xml_treatments):
331 """Used to add class level callbacks at the end of the workflow
332
333 @param post_xml_treatments(D): the same Deferred as in sendMessage trigger
334 """
335 raise NotImplementedError
336
337 def sendMessage(self, to_jid, message, subject=None, mess_type='auto', extra=None, uid=None, no_trigger=False):
338 """Send a message to an entity
339
340 @param to_jid(jid.JID): destinee of the message
341 @param message(dict): message body, key is the language (use '' when unknown)
342 @param subject(dict): message subject, key is the language (use '' when unknown)
343 @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or:
344 - auto: for automatic type detection
345 - info: for information ("info_type" can be specified in extra)
346 @param extra(dict, None): extra data. Key can be:
347 - info_type: information type, can be
348 TODO
349 @param uid(unicode, None): unique id:
350 should be unique at least in this XMPP session
351 if None, an uuid will be generated
352 @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used
353 useful when a message need to be sent without any modification
354 """
355 if subject is None:
356 subject = {}
357 if extra is None:
358 extra = {}
359 data = { # dict is similar to the one used in client.onMessage
360 "from": self.jid,
361 "to": to_jid,
362 "uid": uid or unicode(uuid.uuid4()),
363 "message": message,
364 "subject": subject,
365 "type": mess_type,
366 "extra": extra,
367 "timestamp": time.time(),
368 }
369 pre_xml_treatments = defer.Deferred() # XXX: plugin can add their pre XML treatments to this deferred
370 post_xml_treatments = defer.Deferred() # XXX: plugin can add their post XML treatments to this deferred
371
372 if data["type"] == "auto":
373 # we try to guess the type
374 if data["subject"]:
375 data["type"] = 'normal'
376 elif not data["to"].resource: # if to JID has a resource, the type is not 'groupchat'
377 # we may have a groupchat message, we check if the we know this jid
378 try:
379 entity_type = self.host_app.memory.getEntityData(data["to"], ['type'], self.profile)["type"]
380 #FIXME: should entity_type manage resources ?
381 except (exceptions.UnknownEntityError, KeyError):
382 entity_type = "contact"
383
384 if entity_type == "chatroom":
385 data["type"] = 'groupchat'
386 else:
387 data["type"] = 'chat'
388 else:
389 data["type"] == 'chat'
390 data["type"] == "chat" if data["subject"] else "normal"
391
392 # FIXME: send_only is used by libervia's OTR plugin to avoid
393 # the triggers from frontend, and no_trigger do the same
394 # thing internally, this could be unified
395 send_only = data['extra'].get('send_only', False)
396
397 if not no_trigger and not send_only:
398 if not self.host_app.trigger.point("sendMessage" + self.trigger_suffix, self, data, pre_xml_treatments, post_xml_treatments):
399 return defer.succeed(None)
400
401 log.debug(_(u"Sending message (type {type}, to {to})").format(type=data["type"], to=to_jid.full()))
402
403 pre_xml_treatments.addCallback(lambda dummy: self.generateMessageXML(data))
404 pre_xml_treatments.chainDeferred(post_xml_treatments)
405 post_xml_treatments.addCallback(self.sendMessageData)
406 if send_only:
407 log.debug(_("Triggers, storage and echo have been inhibited by the 'send_only' parameter"))
408 else:
409 self.addPostXmlCallbacks(post_xml_treatments)
410 post_xml_treatments.addErrback(self._cancelErrorTrap)
411 post_xml_treatments.addErrback(self.host_app.logErrback)
412 pre_xml_treatments.callback(data)
413 return pre_xml_treatments
414
415 def _cancelErrorTrap(self, failure):
416 """A message sending can be cancelled by a plugin treatment"""
417 failure.trap(exceptions.CancelError)
418
419 def messageAddToHistory(self, data):
420 """Store message into database (for local history)
421
422 @param data: message data dictionnary
423 @param client: profile's client
424 """
425 if data[u"type"] != C.MESS_TYPE_GROUPCHAT:
426 # we don't add groupchat message to history, as we get them back
427 # and they will be added then
428 if data[u'message'] or data[u'subject']: # we need a message to store
429 self.host_app.memory.addToHistory(self, data)
430 else:
431 log.warning(u"No message found") # empty body should be managed by plugins before this point
432 return data
433
434 def messageSendToBridge(self, data):
435 """Send message to bridge, so frontends can display it
436
437 @param data: message data dictionnary
438 @param client: profile's client
439 """
440 if data[u"type"] != C.MESS_TYPE_GROUPCHAT:
441 # we don't send groupchat message to bridge, as we get them back
442 # and they will be added the
443 if data[u'message'] or data[u'subject']: # we need a message to send something
444 # We send back the message, so all frontends are aware of it
445 self.host_app.bridge.messageNew(data[u'uid'], data[u'timestamp'], data[u'from'].full(), data[u'to'].full(), data[u'message'], data[u'subject'], data[u'type'], data[u'extra'], profile=self.profile)
446 else:
447 log.warning(_(u"No message found"))
448 return data
449
450
451 class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient):
43 implements(iwokkel.IDisco) 452 implements(iwokkel.IDisco)
453 trigger_suffix = ""
454 component = False
44 455
45 def __init__(self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES): 456 def __init__(self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES):
46 # XXX: DNS SRV records are checked when the host is not specified. 457 # XXX: DNS SRV records are checked when the host is not specified.
47 # If no SRV record is found, the host is directly extracted from the JID. 458 # If no SRV record is found, the host is directly extracted from the JID.
48 self.started = time.time() 459 self.started = time.time()
460
461 # Currently, we use "client/pc/Salut à Toi", but as
462 # SàT is multi-frontends and can be used on mobile devices, as a bot, with a web frontend,
463 # etc., we should implement a way to dynamically update identities through the bridge
464 self.identities = [disco.DiscoIdentity(u"client", u"pc", C.APP_NAME)]
49 if sys.platform == "android": 465 if sys.platform == "android":
50 # FIXME: temporary hack as SRV is not working on android 466 # FIXME: temporary hack as SRV is not working on android
51 # TODO: remove this hack and fix SRV 467 # TODO: remove this hack and fix SRV
52 log.info(u"FIXME: Android hack, ignoring SRV") 468 log.info(u"FIXME: Android hack, ignoring SRV")
53 host = user_jid.host 469 host = user_jid.host
56 log.info(u"using {host_to_use} for host {host_ori} as requested in config".format( 472 log.info(u"using {host_to_use} for host {host_ori} as requested in config".format(
57 host_ori = host, 473 host_ori = host,
58 host_to_use = hosts_map[host])) 474 host_to_use = hosts_map[host]))
59 host = hosts_map[host] 475 host = hosts_map[host]
60 wokkel_client.XMPPClient.__init__(self, user_jid, password, host or None, port or C.XMPP_C2S_PORT) 476 wokkel_client.XMPPClient.__init__(self, user_jid, password, host or None, port or C.XMPP_C2S_PORT)
61 self.factory.clientConnectionLost = self.connectionLost 477 SatXMPPEntity.__init__(self, host_app, profile, max_retries)
62 self.factory.maxRetries = max_retries
63 self.__connected = False
64 self.profile = profile
65 self.host_app = host_app
66 self.cache = cache.Cache(host_app, profile)
67 self._mess_id_uid = {} # map from message id to uid use in history. Key: (full_jid,message_id) Value: uid
68 self.conn_deferred = defer.Deferred()
69 self._progress_cb = {} # callback called when a progress is requested (key = progress id) 478 self._progress_cb = {} # callback called when a progress is requested (key = progress id)
70 self.actions = {} # used to keep track of actions for retrieval (key = action_id) 479 self.actions = {} # used to keep track of actions for retrieval (key = action_id)
71 480
72 def getConnectionDeferred(self): 481 def _getPluginsList(self):
73 """Return a deferred which fire when the client is connected""" 482 for p in self.host_app.plugins.itervalues():
74 return self.conn_deferred 483 if C.PLUG_MODE_CLIENT in p._info[u'modes']:
75 484 yield p
76 def IQ(self, type_=u'set', timeout=None): 485
77 """shortcut to create an IQ element managing deferred 486 def _createSubProtocols(self):
78 487 self.messageProt = SatMessageProtocol(self.host_app)
79 @param type_(unicode): IQ type ('set' or 'get') 488 self.messageProt.setHandlerParent(self)
80 @param timeout(None, int): timeout in seconds 489
81 @return((D)domish.Element: result stanza 490 self.roster = SatRosterProtocol(self.host_app)
82 errback is called if and error stanza is returned 491 self.roster.setHandlerParent(self)
83 """ 492
84 iq_elt = xmlstream.IQ(self.xmlstream, type_) 493 self.presence = SatPresenceProtocol(self.host_app)
85 iq_elt.timeout = timeout 494 self.presence.setHandlerParent(self)
86 return iq_elt 495
87 496 def entityConnected(self):
88 def sendError(self, iq_elt, condition): 497 # we want to be sure that we got the roster
89 """Send error stanza build from iq_elt 498 return self.roster.got_roster
90 499
91 @param iq_elt(domish.Element): initial IQ element 500 def addPostXmlCallbacks(self, post_xml_treatments):
92 @param condition(unicode): error condition 501 post_xml_treatments.addCallback(self.messageAddToHistory)
93 """ 502 post_xml_treatments.addCallback(self.messageSendToBridge)
94 iq_error_elt = error.StanzaError(condition).toResponse(iq_elt)
95 self.xmlstream.send(iq_error_elt)
96 503
97 def send(self, obj): 504 def send(self, obj):
98 # original send method accept string 505 # original send method accept string
99 # but we restrict to domish.Element to make trigger treatments easier 506 # but we restrict to domish.Element to make trigger treatments easier
100 assert isinstance(obj, domish.Element) 507 assert isinstance(obj, domish.Element)
107 # FIXME: trigger not used yet, can be uncommented when e2e full stanza encryption is implemented 514 # FIXME: trigger not used yet, can be uncommented when e2e full stanza encryption is implemented
108 # if not self.host_app.trigger.point("send", self, obj): 515 # if not self.host_app.trigger.point("send", self, obj):
109 #  return 516 #  return
110 super(SatXMPPClient, self).send(obj) 517 super(SatXMPPClient, self).send(obj)
111 518
112 def sendMessage(self, mess_data): 519 def sendMessageData(self, mess_data):
113 """Convenient method to send message data to stream 520 """Convenient method to send message data to stream
114 521
115 This method will send mess_data[u'xml'] to stream, but a trigger is there 522 This method will send mess_data[u'xml'] to stream, but a trigger is there
116 The trigger can't be cancelled, it's a good place for e2e encryption which 523 The trigger can't be cancelled, it's a good place for e2e encryption which
117 don't handle full stanza encryption 524 don't handle full stanza encryption
119 @return (dict): mess_data (so it can be used in a deferred chain) 526 @return (dict): mess_data (so it can be used in a deferred chain)
120 """ 527 """
121 # XXX: This is the last trigger before u"send" (last but one globally) for sending message. 528 # XXX: This is the last trigger before u"send" (last but one globally) for sending message.
122 # This is intented for e2e encryption which doesn't do full stanza encryption (e.g. OTR) 529 # This is intented for e2e encryption which doesn't do full stanza encryption (e.g. OTR)
123 # This trigger point can't cancel the method 530 # This trigger point can't cancel the method
124 self.host_app.trigger.point("sendMessageFinish", self, mess_data) 531 self.host_app.trigger.point("sendMessageData", self, mess_data)
125 self.send(mess_data[u'xml']) 532 self.send(mess_data[u'xml'])
126 return mess_data 533 return mess_data
127 534
128 def feedback(self, to_jid, message): 535 def feedback(self, to_jid, message):
129 """Send message to frontends 536 """Send message to frontends
130 537
131 This message will be an info message, not recorded in history. 538 This message will be an info message, not recorded in history.
132 It can be used to give feedback of a command 539 It can be used to give feedback of a command
133 @param to_jid(jid.Jid): destinee jid 540 @param to_jid(jid.JID): destinee jid
134 @param message(unicode): message to send to frontends 541 @param message(unicode): message to send to frontends
135 """ 542 """
136 self.host_app.bridge.messageNew(uid=unicode(uuid.uuid4()), 543 self.host_app.bridge.messageNew(uid=unicode(uuid.uuid4()),
137 timestamp=time.time(), 544 timestamp=time.time(),
138 from_jid=self.jid.full(), 545 from_jid=self.jid.full(),
141 subject={}, 548 subject={},
142 mess_type=C.MESS_TYPE_INFO, 549 mess_type=C.MESS_TYPE_INFO,
143 extra={}, 550 extra={},
144 profile=self.profile) 551 profile=self.profile)
145 552
146 def _authd(self, xmlstream): 553 def _finish_connection(self, dummy):
147 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): 554 self.roster.requestRoster()
148 return 555 self.presence.available()
149 wokkel_client.XMPPClient._authd(self, xmlstream) 556 super(SatXMPPClient, self)._finish_connection(dummy)
150 self.__connected = True 557
151 log.info(_("********** [%s] CONNECTED **********") % self.profile) 558
152 self.streamInitialized() 559 class SatXMPPComponent(SatXMPPEntity, component.Component):
153 self.host_app.bridge.connected(self.profile, unicode(self.jid)) # we send the signal to the clients 560 """XMPP component
154 561
155 def streamInitialized(self): 562 This component are similar but not identical to clients.
156 """Called after _authd""" 563 An entry point plugin is launched after component is connected.
157 log.debug(_("XML stream is initialized")) 564 Component need to instantiate MessageProtocol itself
158 self.keep_alife = task.LoopingCall(self.xmlstream.send, " ") # Needed to avoid disconnection (specially with openfire) 565 """
159 self.keep_alife.start(C.XMPP_KEEP_ALIFE) 566 implements(iwokkel.IDisco)
160 567 trigger_suffix = "Component" # used for to distinguish some trigger points set in SatXMPPEntity
161 self.disco = SatDiscoProtocol(self) 568 component = True
162 self.disco.setHandlerParent(self) 569 sendHistory = False # XXX: set to True from entry plugin to keep messages in history for received messages
163 self.discoHandler = disco.DiscoHandler() 570
164 self.discoHandler.setHandlerParent(self) 571 def __init__(self, host_app, profile, component_jid, password, host=None, port=None, max_retries=C.XMPP_MAX_RETRIES):
165 disco_d = defer.succeed(None) 572 self.started = time.time()
166 573 if port is None:
167 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile): 574 port = C.XMPP_COMPONENT_PORT
168 return 575
169 576 ## entry point ##
170 def finish_connection(dummy): 577 entry_point = host_app.memory.getEntryPoint(profile)
171 self.roster.requestRoster()
172 self.presence.available()
173 self.conn_deferred.callback(None)
174
175 disco_d.addCallback(finish_connection)
176
177 def initializationFailed(self, reason):
178 log.error(_(u"ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {'profile': self.profile, 'reason': reason}))
179 self.conn_deferred.errback(reason.value)
180 try: 578 try:
181 wokkel_client.XMPPClient.initializationFailed(self, reason) 579 self.entry_plugin = host_app.plugins[entry_point]
182 except: 580 except KeyError:
183 # we already chained an errback, no need to raise an exception 581 raise exceptions.NotFound(_(u"The requested entry point ({entry_point}) is not available").format(
184 pass 582 entry_point = entry_point))
185 583
186 def isConnected(self): 584 self.identities = [disco.DiscoIdentity(u"component", u"generic", C.APP_NAME)]
187 return self.__connected 585 # jid is set automatically on bind by Twisted for Client, but not for Component
188 586 self.jid = component_jid
189 def connectionLost(self, connector, unused_reason): 587 if host is None:
190 try: 588 try:
191 self.keep_alife.stop() 589 host = component_jid.host.split(u'.', 1)[1]
192 except AttributeError: 590 except IndexError:
193 log.debug(_("No keep_alife")) 591 raise ValueError(u"Can't guess host from jid, please specify a host")
194 if self.__connected: 592 # XXX: component.Component expect unicode jid, while Client expect jid.JID.
195 log.info(_("********** [%s] DISCONNECTED **********") % self.profile) 593 # this is not consistent, so we use jid.JID for SatXMPP*
196 self.host_app.bridge.disconnected(self.profile) # we send the signal to the clients 594 component.Component.__init__(self, host, port, component_jid.full(), password)
197 self.host_app.purgeClient(self.profile) # and we remove references to this client 595 SatXMPPEntity.__init__(self, host_app, profile, max_retries)
198 self.__connected = False 596
597 def _buildDependencies(self, current, plugins, required=True):
598 """build recursively dependencies needed for a plugin
599
600 this method build list of plugin needed for a component and raises
601 errors if they are not available or not allowed for components
602 @param current(object): parent plugin to check
603 use entry_point for first call
604 @param plugins(list): list of validated plugins, will be filled by the method
605 give an empty list for first call
606 @param required(bool): True if plugin is mandatory
607 for recursive calls only, should not be modified by inital caller
608 @raise InternalError: one of the plugin is not handling components
609 @raise KeyError: one plugin should be present in self.host_app.plugins but it is not
610 """
611 if C.PLUG_MODE_COMPONENT not in current._info[u'modes']:
612 if not required:
613 return
614 else:
615 log.error(_(u"Plugin {current_name} if needed for {entry_name}, but it doesn't handle component mode").format(
616 current_name = current._info[u'import_name'],
617 entry_name = self.entry_plugin._info[u'import_name']
618 ))
619 raise exceptions.InternalError(_(u"invalid plugin mode"))
620
621 for import_name in current._info.get(C.PI_DEPENDENCIES, []):
622 # plugins are already loaded as dependencies
623 # so we know they are in self.host_app.plugins
624 dep = self.host_app.plugins[import_name]
625 self._checkDependencies(dep, plugins)
626
627 for import_name in current._info.get(C.PI_RECOMMENDATIONS, []):
628 # here plugins are only recommendations,
629 # so they may not exist in self.host_app.plugins
630 try:
631 dep = self.host_app.plugins[import_name]
632 except KeyError:
633 continue
634 self._buildDependencies(dep, plugins, required = False)
635
636 if current not in plugins:
637 # current can be required for several plugins and so
638 # it can already be present in the list
639 plugins.append(current)
640
641 def _getPluginsList(self):
642 # XXX: for component we don't launch all plugins triggers
643 # but only the ones from which there is a dependency
644 plugins = []
645 self._buildDependencies(self.entry_plugin, plugins)
646 return plugins
647
648 def entityConnected(self):
649 # we can now launch entry point
650 return self.entry_plugin.componentStart(self)
651
652 def addPostXmlCallbacks(self, post_xml_treatments):
653 if self.sendHistory:
654 post_xml_treatments.addCallback(self.messageAddToHistory)
199 655
200 656
201 class SatMessageProtocol(xmppim.MessageProtocol): 657 class SatMessageProtocol(xmppim.MessageProtocol):
202 658
203 def __init__(self, host): 659 def __init__(self, host):
430 """Return True if jid is in roster""" 886 """Return True if jid is in roster"""
431 return entity_jid in self._jids 887 return entity_jid in self._jids
432 888
433 def isPresenceAuthorised(self, entity_jid): 889 def isPresenceAuthorised(self, entity_jid):
434 """Return True if entity is authorised to see our presence""" 890 """Return True if entity is authorised to see our presence"""
435 import pudb
436 pudb.set_trace()
437 try: 891 try:
438 item = self._jids[entity_jid.userhostJID()] 892 item = self._jids[entity_jid.userhostJID()]
439 except KeyError: 893 except KeyError:
440 return False 894 return False
441 return item.subscriptionFrom 895 return item.subscriptionFrom
685 # ask for disco info, and not when we generate the key, so the hash is used with different 1139 # ask for disco info, and not when we generate the key, so the hash is used with different
686 # disco features, and when the server (seen on ejabberd) generate its own hash for security check 1140 # disco features, and when the server (seen on ejabberd) generate its own hash for security check
687 # it reject our features (resulting in e.g. no notification on PEP) 1141 # it reject our features (resulting in e.g. no notification on PEP)
688 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None) 1142 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None)
689 1143
1144
690 class SatIdentityHandler(XMPPHandler): 1145 class SatIdentityHandler(XMPPHandler):
691 """ Manage disco Identity of SàT. Currently, we use "client/pc/Salut à Toi", but as 1146 """ Manage disco Identity of SàT.
692 SàT is multi-frontends and can be used on mobile devices, as a bot, with a web frontend, 1147
693 etc, we should implement a way to dynamically update identities through the bridge """ 1148 """
694 #TODO: dynamic identity update (see docstring). Note that a XMPP entity can have several identities 1149 #TODO: dynamic identity update (see docstring). Note that a XMPP entity can have several identities
695 implements(iwokkel.IDisco) 1150 implements(iwokkel.IDisco)
696 1151
697 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 1152 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
698 return [disco.DiscoIdentity(u"client", u"pc", C.APP_NAME)] 1153 return self.parent.identities
699 1154
700 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 1155 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
701 return [] 1156 return []