Mercurial > libervia-backend
comparison src/core/xmpp.py @ 2144:1d3f73e065e1
core, jp: component handling + client handling refactoring:
- SàT can now handle components
- plugin have now a "modes" key in PLUGIN_INFO where they declare if they can be used with clients and or components. They default to be client only.
- components are really similar to clients, but with some changes in behaviour:
* component has "entry point", which is a special plugin with a componentStart method, which is called just after component is connected
* trigger end with a different suffixes (e.g. profileConnected vs profileConnectedComponent), so a plugin which manage both clients and components can have different workflow
* for clients, only triggers of plugins handling client mode are launched
* for components, only triggers of plugins needed in dependencies are launched. They all must handle component mode.
* component have a sendHistory attribute (False by default) which can be set to True to allow saving sent messages into history
* for convenience, "client" is still used in method even if it can now be a component
* a new "component" boolean attribute tells if we have a component or a client
* components have to add themselve Message protocol
* roster and presence protocols are not added for components
* component default port is 5347 (which is Prosody's default port)
- asyncCreateProfile has been renamed for profileCreate, both to follow new naming convention and to prepare the transition to fully asynchronous bridge
- createProfile has a new "component" attribute. When used to create a component, it must be set to a component entry point
- jp: added --component argument to profile/create
- disconnect bridge method is now asynchronous, this way frontends can know when disconnection is finished
- new PI_* constants for PLUGIN_INFO values (not used everywhere yet)
- client/component connection workflow has been moved to their classes instead of being a host methods
- host.messageSend is now client.sendMessage, and former client.sendMessage is now client.sendMessageData.
- identities are now handled in client.identities list, so it can be updated dynamically by plugins (in the future, frontends should be able to update them too through bridge)
- profileConnecting* profileConnected* profileDisconnected* and getHandler now all use client instead of profile
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 12 Feb 2017 17:55:43 +0100 |
parents | f8401024ab28 |
children | 545a1261ac3b |
comparison
equal
deleted
inserted
replaced
2143:c3cac21157d4 | 2144:1d3f73e065e1 |
---|---|
26 from twisted.words.protocols.jabber import error | 26 from twisted.words.protocols.jabber import error |
27 from twisted.words.protocols.jabber import jid | 27 from twisted.words.protocols.jabber import jid |
28 from twisted.words.xish import domish | 28 from twisted.words.xish import domish |
29 from twisted.python import failure | 29 from twisted.python import failure |
30 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel | 30 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel |
31 from wokkel import component | |
31 from wokkel import delay | 32 from wokkel import delay |
32 from sat.core.log import getLogger | 33 from sat.core.log import getLogger |
33 log = getLogger(__name__) | 34 log = getLogger(__name__) |
34 from sat.core import exceptions | 35 from sat.core import exceptions |
35 from zope.interface import implements | 36 from zope.interface import implements |
37 import calendar | 38 import calendar |
38 import uuid | 39 import uuid |
39 import sys | 40 import sys |
40 | 41 |
41 | 42 |
42 class SatXMPPClient(wokkel_client.XMPPClient): | 43 class SatXMPPEntity(object): |
44 """Common code for Client and Component""" | |
45 | |
46 def __init__(self, host_app, profile, max_retries): | |
47 | |
48 self.factory.clientConnectionLost = self.connectionLost | |
49 self.factory.maxRetries = max_retries | |
50 # when self._connected is None, we are not connected | |
51 # else, it's a deferred which fire on disconnection | |
52 self._connected = None | |
53 self.profile = profile | |
54 self.host_app = host_app | |
55 self.cache = cache.Cache(host_app, profile) | |
56 self._mess_id_uid = {} # map from message id to uid used in history. Key: (full_jid,message_id) Value: uid | |
57 self.conn_deferred = defer.Deferred() | |
58 | |
59 ## initialisation ## | |
60 | |
61 @defer.inlineCallbacks | |
62 def _callConnectionTriggers(self): | |
63 """Call conneting trigger prepare connected trigger | |
64 | |
65 @param plugins(iterable): plugins to use | |
66 @return (list[object, callable]): plugin to trigger tuples with: | |
67 - plugin instance | |
68 - profileConnected* triggers (to call after connection) | |
69 """ | |
70 plugin_conn_cb = [] | |
71 for plugin in self._getPluginsList(): | |
72 # we check if plugin handle client mode | |
73 if plugin.is_handler: | |
74 plugin.getHandler(self).setHandlerParent(self) | |
75 | |
76 # profileConnecting/profileConnected methods handling | |
77 | |
78 # profile connecting is called right now (before actually starting client) | |
79 connecting_cb = getattr(plugin, "profileConnecting" + self.trigger_suffix, None) | |
80 if connecting_cb is not None: | |
81 yield connecting_cb(self) | |
82 | |
83 # profile connected is called after client is ready and roster is got | |
84 connected_cb = getattr(plugin, "profileConnected" + self.trigger_suffix, None) | |
85 if connected_cb is not None: | |
86 plugin_conn_cb.append((plugin, connected_cb)) | |
87 | |
88 defer.returnValue(plugin_conn_cb) | |
89 | |
90 def _getPluginsList(self): | |
91 """Return list of plugin to use | |
92 | |
93 need to be implemented by subclasses | |
94 this list is used to call profileConnect* triggers | |
95 @return(iterable[object]): plugins to use | |
96 """ | |
97 raise NotImplementedError | |
98 | |
99 def _createSubProtocols(self): | |
100 return | |
101 | |
102 def entityConnected(self): | |
103 """Called once connection is done | |
104 | |
105 may return a Deferred, to perform initialisation tasks | |
106 """ | |
107 return | |
108 | |
109 @classmethod | |
110 @defer.inlineCallbacks | |
111 def startConnection(cls, host, profile, max_retries): | |
112 """instantiate the entity and start the connection""" | |
113 # FIXME: reconnection doesn't seems to be handled correclty (client is deleted then recreated from scrash | |
114 # most of methods called here should be called once on first connection (e.g. adding subprotocols) | |
115 # but client should not be deleted except if session is finished (independently of connection/deconnection | |
116 # | |
117 try: | |
118 port = int(host.memory.getParamA(C.FORCE_PORT_PARAM, "Connection", profile_key=profile)) | |
119 except ValueError: | |
120 log.debug(_("Can't parse port value, using default value")) | |
121 port = None # will use default value 5222 or be retrieved from a DNS SRV record | |
122 | |
123 password = yield host.memory.asyncGetParamA("Password", "Connection", profile_key=profile) | |
124 entity = host.profiles[profile] = cls(host, profile, | |
125 jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)), | |
126 password, host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) or None, | |
127 port, max_retries) | |
128 | |
129 entity._createSubProtocols() | |
130 | |
131 entity.fallBack = SatFallbackHandler(host) | |
132 entity.fallBack.setHandlerParent(entity) | |
133 | |
134 entity.versionHandler = SatVersionHandler(C.APP_NAME_FULL, | |
135 host.full_version) | |
136 entity.versionHandler.setHandlerParent(entity) | |
137 | |
138 entity.identityHandler = SatIdentityHandler() | |
139 entity.identityHandler.setHandlerParent(entity) | |
140 | |
141 log.debug(_("setting plugins parents")) | |
142 | |
143 plugin_conn_cb = yield entity._callConnectionTriggers() | |
144 | |
145 entity.startService() | |
146 | |
147 yield entity.getConnectionDeferred() | |
148 | |
149 yield defer.maybeDeferred(entity.entityConnected) | |
150 | |
151 # Call profileConnected callback for all plugins, and print error message if any of them fails | |
152 conn_cb_list = [] | |
153 for dummy, callback in plugin_conn_cb: | |
154 conn_cb_list.append(defer.maybeDeferred(callback, entity)) | |
155 list_d = defer.DeferredList(conn_cb_list) | |
156 | |
157 def logPluginResults(results): | |
158 all_succeed = all([success for success, result in results]) | |
159 if not all_succeed: | |
160 log.error(_(u"Plugins initialisation error")) | |
161 for idx, (success, result) in enumerate(results): | |
162 if not success: | |
163 log.error(u"error (plugin %(name)s): %(failure)s" % | |
164 {'name': plugin_conn_cb[idx][0]._info['import_name'], 'failure': result}) | |
165 | |
166 yield list_d.addCallback(logPluginResults) # FIXME: we should have a timeout here, and a way to know if a plugin freeze | |
167 # TODO: mesure launch time of each plugin | |
168 | |
169 def getConnectionDeferred(self): | |
170 """Return a deferred which fire when the client is connected""" | |
171 return self.conn_deferred | |
172 | |
173 def _disconnectionCb(self, dummy): | |
174 self._connected = None | |
175 | |
176 def _disconnectionEb(self, failure_): | |
177 log.error(_(u"Error while disconnecting: {}".format(failure_))) | |
178 | |
179 def _authd(self, xmlstream): | |
180 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): | |
181 return | |
182 super(SatXMPPEntity, self)._authd(xmlstream) | |
183 | |
184 # the following Deferred is used to know when we are connected | |
185 # so we need to be set it to None when connection is lost | |
186 self._connected = defer.Deferred() | |
187 self._connected.addCallback(self._cleanConnection) | |
188 self._connected.addCallback(self._disconnectionCb) | |
189 self._connected.addErrback(self._disconnectionEb) | |
190 | |
191 log.info(_("********** [%s] CONNECTED **********") % self.profile) | |
192 self.streamInitialized() | |
193 self.host_app.bridge.connected(self.profile, unicode(self.jid)) # we send the signal to the clients | |
194 | |
195 def _finish_connection(self, dummy): | |
196 self.conn_deferred.callback(None) | |
197 | |
198 def streamInitialized(self): | |
199 """Called after _authd""" | |
200 log.debug(_(u"XML stream is initialized")) | |
201 self.keep_alife = task.LoopingCall(self.xmlstream.send, " ") # Needed to avoid disconnection (specially with openfire) | |
202 self.keep_alife.start(C.XMPP_KEEP_ALIFE) | |
203 | |
204 self.disco = SatDiscoProtocol(self) | |
205 self.disco.setHandlerParent(self) | |
206 self.discoHandler = disco.DiscoHandler() | |
207 self.discoHandler.setHandlerParent(self) | |
208 disco_d = defer.succeed(None) | |
209 | |
210 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile): | |
211 return | |
212 | |
213 disco_d.addCallback(self._finish_connection) | |
214 | |
215 def initializationFailed(self, reason): | |
216 log.error(_(u"ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {'profile': self.profile, 'reason': reason})) | |
217 self.conn_deferred.errback(reason.value) | |
218 try: | |
219 super(SatXMPPEntity, self).initializationFailed(reason) | |
220 except: | |
221 # we already chained an errback, no need to raise an exception | |
222 pass | |
223 | |
224 ## connection ## | |
225 | |
226 def connectionLost(self, connector, reason): | |
227 try: | |
228 self.keep_alife.stop() | |
229 except AttributeError: | |
230 log.debug(_("No keep_alife")) | |
231 if self._connected is not None: | |
232 self.host_app.bridge.disconnected(self.profile) # we send the signal to the clients | |
233 self._connected.callback(None) | |
234 self.host_app.purgeEntity(self.profile) # and we remove references to this client | |
235 log.info(_("********** [%s] DISCONNECTED **********") % self.profile) | |
236 if not self.conn_deferred.called: | |
237 # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody, | |
238 # we should have the real error) | |
239 self.conn_deferred.errback(error.StreamError(u"Server unexpectedly closed the connection")) | |
240 | |
241 @defer.inlineCallbacks | |
242 def _cleanConnection(self, dummy): | |
243 """method called on disconnection | |
244 | |
245 used to call profileDisconnected* triggers | |
246 """ | |
247 trigger_name = "profileDisconnected" + self.trigger_suffix | |
248 for plugin in self._getPluginsList(): | |
249 disconnected_cb = getattr(plugin, trigger_name, None) | |
250 if disconnected_cb is not None: | |
251 yield disconnected_cb(self) | |
252 | |
253 def isConnected(self): | |
254 return self._connected is not None | |
255 | |
256 def entityDisconnect(self): | |
257 log.info(_(u"Disconnecting...")) | |
258 self.stopService() | |
259 if self._connected is not None: | |
260 return self._connected | |
261 else: | |
262 return defer.succeed(None) | |
263 | |
264 ## sending ## | |
265 | |
266 def IQ(self, type_=u'set', timeout=None): | |
267 """shortcut to create an IQ element managing deferred | |
268 | |
269 @param type_(unicode): IQ type ('set' or 'get') | |
270 @param timeout(None, int): timeout in seconds | |
271 @return((D)domish.Element: result stanza | |
272 errback is called if and error stanza is returned | |
273 """ | |
274 iq_elt = xmlstream.IQ(self.xmlstream, type_) | |
275 iq_elt.timeout = timeout | |
276 return iq_elt | |
277 | |
278 def sendError(self, iq_elt, condition): | |
279 """Send error stanza build from iq_elt | |
280 | |
281 @param iq_elt(domish.Element): initial IQ element | |
282 @param condition(unicode): error condition | |
283 """ | |
284 iq_error_elt = error.StanzaError(condition).toResponse(iq_elt) | |
285 self.xmlstream.send(iq_error_elt) | |
286 | |
287 def generateMessageXML(self, data): | |
288 """Generate <message/> stanza from message data | |
289 | |
290 @param data(dict): message data | |
291 domish element will be put in data['xml'] | |
292 following keys are needed: | |
293 - from | |
294 - to | |
295 - uid: can be set to '' if uid attribute is not wanted | |
296 - message | |
297 - type | |
298 - subject | |
299 - extra | |
300 @return (dict) message data | |
301 """ | |
302 data['xml'] = message_elt = domish.Element((None, 'message')) | |
303 message_elt["to"] = data["to"].full() | |
304 message_elt["from"] = data['from'].full() | |
305 message_elt["type"] = data["type"] | |
306 if data['uid']: # key must be present but can be set to '' | |
307 # by a plugin to avoid id on purpose | |
308 message_elt['id'] = data['uid'] | |
309 for lang, subject in data["subject"].iteritems(): | |
310 subject_elt = message_elt.addElement("subject", content=subject) | |
311 if lang: | |
312 subject_elt[(C.NS_XML, 'lang')] = lang | |
313 for lang, message in data["message"].iteritems(): | |
314 body_elt = message_elt.addElement("body", content=message) | |
315 if lang: | |
316 body_elt[(C.NS_XML, 'lang')] = lang | |
317 try: | |
318 thread = data['extra']['thread'] | |
319 except KeyError: | |
320 if 'thread_parent' in data['extra']: | |
321 raise exceptions.InternalError(u"thread_parent found while there is not associated thread") | |
322 else: | |
323 thread_elt = message_elt.addElement("thread", content=thread) | |
324 try: | |
325 thread_elt["parent"] = data["extra"]["thread_parent"] | |
326 except KeyError: | |
327 pass | |
328 return data | |
329 | |
330 def addPostXmlCallbacks(self, post_xml_treatments): | |
331 """Used to add class level callbacks at the end of the workflow | |
332 | |
333 @param post_xml_treatments(D): the same Deferred as in sendMessage trigger | |
334 """ | |
335 raise NotImplementedError | |
336 | |
337 def sendMessage(self, to_jid, message, subject=None, mess_type='auto', extra=None, uid=None, no_trigger=False): | |
338 """Send a message to an entity | |
339 | |
340 @param to_jid(jid.JID): destinee of the message | |
341 @param message(dict): message body, key is the language (use '' when unknown) | |
342 @param subject(dict): message subject, key is the language (use '' when unknown) | |
343 @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or: | |
344 - auto: for automatic type detection | |
345 - info: for information ("info_type" can be specified in extra) | |
346 @param extra(dict, None): extra data. Key can be: | |
347 - info_type: information type, can be | |
348 TODO | |
349 @param uid(unicode, None): unique id: | |
350 should be unique at least in this XMPP session | |
351 if None, an uuid will be generated | |
352 @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used | |
353 useful when a message need to be sent without any modification | |
354 """ | |
355 if subject is None: | |
356 subject = {} | |
357 if extra is None: | |
358 extra = {} | |
359 data = { # dict is similar to the one used in client.onMessage | |
360 "from": self.jid, | |
361 "to": to_jid, | |
362 "uid": uid or unicode(uuid.uuid4()), | |
363 "message": message, | |
364 "subject": subject, | |
365 "type": mess_type, | |
366 "extra": extra, | |
367 "timestamp": time.time(), | |
368 } | |
369 pre_xml_treatments = defer.Deferred() # XXX: plugin can add their pre XML treatments to this deferred | |
370 post_xml_treatments = defer.Deferred() # XXX: plugin can add their post XML treatments to this deferred | |
371 | |
372 if data["type"] == "auto": | |
373 # we try to guess the type | |
374 if data["subject"]: | |
375 data["type"] = 'normal' | |
376 elif not data["to"].resource: # if to JID has a resource, the type is not 'groupchat' | |
377 # we may have a groupchat message, we check if the we know this jid | |
378 try: | |
379 entity_type = self.host_app.memory.getEntityData(data["to"], ['type'], self.profile)["type"] | |
380 #FIXME: should entity_type manage resources ? | |
381 except (exceptions.UnknownEntityError, KeyError): | |
382 entity_type = "contact" | |
383 | |
384 if entity_type == "chatroom": | |
385 data["type"] = 'groupchat' | |
386 else: | |
387 data["type"] = 'chat' | |
388 else: | |
389 data["type"] == 'chat' | |
390 data["type"] == "chat" if data["subject"] else "normal" | |
391 | |
392 # FIXME: send_only is used by libervia's OTR plugin to avoid | |
393 # the triggers from frontend, and no_trigger do the same | |
394 # thing internally, this could be unified | |
395 send_only = data['extra'].get('send_only', False) | |
396 | |
397 if not no_trigger and not send_only: | |
398 if not self.host_app.trigger.point("sendMessage" + self.trigger_suffix, self, data, pre_xml_treatments, post_xml_treatments): | |
399 return defer.succeed(None) | |
400 | |
401 log.debug(_(u"Sending message (type {type}, to {to})").format(type=data["type"], to=to_jid.full())) | |
402 | |
403 pre_xml_treatments.addCallback(lambda dummy: self.generateMessageXML(data)) | |
404 pre_xml_treatments.chainDeferred(post_xml_treatments) | |
405 post_xml_treatments.addCallback(self.sendMessageData) | |
406 if send_only: | |
407 log.debug(_("Triggers, storage and echo have been inhibited by the 'send_only' parameter")) | |
408 else: | |
409 self.addPostXmlCallbacks(post_xml_treatments) | |
410 post_xml_treatments.addErrback(self._cancelErrorTrap) | |
411 post_xml_treatments.addErrback(self.host_app.logErrback) | |
412 pre_xml_treatments.callback(data) | |
413 return pre_xml_treatments | |
414 | |
415 def _cancelErrorTrap(self, failure): | |
416 """A message sending can be cancelled by a plugin treatment""" | |
417 failure.trap(exceptions.CancelError) | |
418 | |
419 def messageAddToHistory(self, data): | |
420 """Store message into database (for local history) | |
421 | |
422 @param data: message data dictionnary | |
423 @param client: profile's client | |
424 """ | |
425 if data[u"type"] != C.MESS_TYPE_GROUPCHAT: | |
426 # we don't add groupchat message to history, as we get them back | |
427 # and they will be added then | |
428 if data[u'message'] or data[u'subject']: # we need a message to store | |
429 self.host_app.memory.addToHistory(self, data) | |
430 else: | |
431 log.warning(u"No message found") # empty body should be managed by plugins before this point | |
432 return data | |
433 | |
434 def messageSendToBridge(self, data): | |
435 """Send message to bridge, so frontends can display it | |
436 | |
437 @param data: message data dictionnary | |
438 @param client: profile's client | |
439 """ | |
440 if data[u"type"] != C.MESS_TYPE_GROUPCHAT: | |
441 # we don't send groupchat message to bridge, as we get them back | |
442 # and they will be added the | |
443 if data[u'message'] or data[u'subject']: # we need a message to send something | |
444 # We send back the message, so all frontends are aware of it | |
445 self.host_app.bridge.messageNew(data[u'uid'], data[u'timestamp'], data[u'from'].full(), data[u'to'].full(), data[u'message'], data[u'subject'], data[u'type'], data[u'extra'], profile=self.profile) | |
446 else: | |
447 log.warning(_(u"No message found")) | |
448 return data | |
449 | |
450 | |
451 class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient): | |
43 implements(iwokkel.IDisco) | 452 implements(iwokkel.IDisco) |
453 trigger_suffix = "" | |
454 component = False | |
44 | 455 |
45 def __init__(self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES): | 456 def __init__(self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES): |
46 # XXX: DNS SRV records are checked when the host is not specified. | 457 # XXX: DNS SRV records are checked when the host is not specified. |
47 # If no SRV record is found, the host is directly extracted from the JID. | 458 # If no SRV record is found, the host is directly extracted from the JID. |
48 self.started = time.time() | 459 self.started = time.time() |
460 | |
461 # Currently, we use "client/pc/Salut à Toi", but as | |
462 # SàT is multi-frontends and can be used on mobile devices, as a bot, with a web frontend, | |
463 # etc., we should implement a way to dynamically update identities through the bridge | |
464 self.identities = [disco.DiscoIdentity(u"client", u"pc", C.APP_NAME)] | |
49 if sys.platform == "android": | 465 if sys.platform == "android": |
50 # FIXME: temporary hack as SRV is not working on android | 466 # FIXME: temporary hack as SRV is not working on android |
51 # TODO: remove this hack and fix SRV | 467 # TODO: remove this hack and fix SRV |
52 log.info(u"FIXME: Android hack, ignoring SRV") | 468 log.info(u"FIXME: Android hack, ignoring SRV") |
53 host = user_jid.host | 469 host = user_jid.host |
56 log.info(u"using {host_to_use} for host {host_ori} as requested in config".format( | 472 log.info(u"using {host_to_use} for host {host_ori} as requested in config".format( |
57 host_ori = host, | 473 host_ori = host, |
58 host_to_use = hosts_map[host])) | 474 host_to_use = hosts_map[host])) |
59 host = hosts_map[host] | 475 host = hosts_map[host] |
60 wokkel_client.XMPPClient.__init__(self, user_jid, password, host or None, port or C.XMPP_C2S_PORT) | 476 wokkel_client.XMPPClient.__init__(self, user_jid, password, host or None, port or C.XMPP_C2S_PORT) |
61 self.factory.clientConnectionLost = self.connectionLost | 477 SatXMPPEntity.__init__(self, host_app, profile, max_retries) |
62 self.factory.maxRetries = max_retries | |
63 self.__connected = False | |
64 self.profile = profile | |
65 self.host_app = host_app | |
66 self.cache = cache.Cache(host_app, profile) | |
67 self._mess_id_uid = {} # map from message id to uid use in history. Key: (full_jid,message_id) Value: uid | |
68 self.conn_deferred = defer.Deferred() | |
69 self._progress_cb = {} # callback called when a progress is requested (key = progress id) | 478 self._progress_cb = {} # callback called when a progress is requested (key = progress id) |
70 self.actions = {} # used to keep track of actions for retrieval (key = action_id) | 479 self.actions = {} # used to keep track of actions for retrieval (key = action_id) |
71 | 480 |
72 def getConnectionDeferred(self): | 481 def _getPluginsList(self): |
73 """Return a deferred which fire when the client is connected""" | 482 for p in self.host_app.plugins.itervalues(): |
74 return self.conn_deferred | 483 if C.PLUG_MODE_CLIENT in p._info[u'modes']: |
75 | 484 yield p |
76 def IQ(self, type_=u'set', timeout=None): | 485 |
77 """shortcut to create an IQ element managing deferred | 486 def _createSubProtocols(self): |
78 | 487 self.messageProt = SatMessageProtocol(self.host_app) |
79 @param type_(unicode): IQ type ('set' or 'get') | 488 self.messageProt.setHandlerParent(self) |
80 @param timeout(None, int): timeout in seconds | 489 |
81 @return((D)domish.Element: result stanza | 490 self.roster = SatRosterProtocol(self.host_app) |
82 errback is called if and error stanza is returned | 491 self.roster.setHandlerParent(self) |
83 """ | 492 |
84 iq_elt = xmlstream.IQ(self.xmlstream, type_) | 493 self.presence = SatPresenceProtocol(self.host_app) |
85 iq_elt.timeout = timeout | 494 self.presence.setHandlerParent(self) |
86 return iq_elt | 495 |
87 | 496 def entityConnected(self): |
88 def sendError(self, iq_elt, condition): | 497 # we want to be sure that we got the roster |
89 """Send error stanza build from iq_elt | 498 return self.roster.got_roster |
90 | 499 |
91 @param iq_elt(domish.Element): initial IQ element | 500 def addPostXmlCallbacks(self, post_xml_treatments): |
92 @param condition(unicode): error condition | 501 post_xml_treatments.addCallback(self.messageAddToHistory) |
93 """ | 502 post_xml_treatments.addCallback(self.messageSendToBridge) |
94 iq_error_elt = error.StanzaError(condition).toResponse(iq_elt) | |
95 self.xmlstream.send(iq_error_elt) | |
96 | 503 |
97 def send(self, obj): | 504 def send(self, obj): |
98 # original send method accept string | 505 # original send method accept string |
99 # but we restrict to domish.Element to make trigger treatments easier | 506 # but we restrict to domish.Element to make trigger treatments easier |
100 assert isinstance(obj, domish.Element) | 507 assert isinstance(obj, domish.Element) |
107 # FIXME: trigger not used yet, can be uncommented when e2e full stanza encryption is implemented | 514 # FIXME: trigger not used yet, can be uncommented when e2e full stanza encryption is implemented |
108 # if not self.host_app.trigger.point("send", self, obj): | 515 # if not self.host_app.trigger.point("send", self, obj): |
109 # return | 516 # return |
110 super(SatXMPPClient, self).send(obj) | 517 super(SatXMPPClient, self).send(obj) |
111 | 518 |
112 def sendMessage(self, mess_data): | 519 def sendMessageData(self, mess_data): |
113 """Convenient method to send message data to stream | 520 """Convenient method to send message data to stream |
114 | 521 |
115 This method will send mess_data[u'xml'] to stream, but a trigger is there | 522 This method will send mess_data[u'xml'] to stream, but a trigger is there |
116 The trigger can't be cancelled, it's a good place for e2e encryption which | 523 The trigger can't be cancelled, it's a good place for e2e encryption which |
117 don't handle full stanza encryption | 524 don't handle full stanza encryption |
119 @return (dict): mess_data (so it can be used in a deferred chain) | 526 @return (dict): mess_data (so it can be used in a deferred chain) |
120 """ | 527 """ |
121 # XXX: This is the last trigger before u"send" (last but one globally) for sending message. | 528 # XXX: This is the last trigger before u"send" (last but one globally) for sending message. |
122 # This is intented for e2e encryption which doesn't do full stanza encryption (e.g. OTR) | 529 # This is intented for e2e encryption which doesn't do full stanza encryption (e.g. OTR) |
123 # This trigger point can't cancel the method | 530 # This trigger point can't cancel the method |
124 self.host_app.trigger.point("sendMessageFinish", self, mess_data) | 531 self.host_app.trigger.point("sendMessageData", self, mess_data) |
125 self.send(mess_data[u'xml']) | 532 self.send(mess_data[u'xml']) |
126 return mess_data | 533 return mess_data |
127 | 534 |
128 def feedback(self, to_jid, message): | 535 def feedback(self, to_jid, message): |
129 """Send message to frontends | 536 """Send message to frontends |
130 | 537 |
131 This message will be an info message, not recorded in history. | 538 This message will be an info message, not recorded in history. |
132 It can be used to give feedback of a command | 539 It can be used to give feedback of a command |
133 @param to_jid(jid.Jid): destinee jid | 540 @param to_jid(jid.JID): destinee jid |
134 @param message(unicode): message to send to frontends | 541 @param message(unicode): message to send to frontends |
135 """ | 542 """ |
136 self.host_app.bridge.messageNew(uid=unicode(uuid.uuid4()), | 543 self.host_app.bridge.messageNew(uid=unicode(uuid.uuid4()), |
137 timestamp=time.time(), | 544 timestamp=time.time(), |
138 from_jid=self.jid.full(), | 545 from_jid=self.jid.full(), |
141 subject={}, | 548 subject={}, |
142 mess_type=C.MESS_TYPE_INFO, | 549 mess_type=C.MESS_TYPE_INFO, |
143 extra={}, | 550 extra={}, |
144 profile=self.profile) | 551 profile=self.profile) |
145 | 552 |
146 def _authd(self, xmlstream): | 553 def _finish_connection(self, dummy): |
147 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): | 554 self.roster.requestRoster() |
148 return | 555 self.presence.available() |
149 wokkel_client.XMPPClient._authd(self, xmlstream) | 556 super(SatXMPPClient, self)._finish_connection(dummy) |
150 self.__connected = True | 557 |
151 log.info(_("********** [%s] CONNECTED **********") % self.profile) | 558 |
152 self.streamInitialized() | 559 class SatXMPPComponent(SatXMPPEntity, component.Component): |
153 self.host_app.bridge.connected(self.profile, unicode(self.jid)) # we send the signal to the clients | 560 """XMPP component |
154 | 561 |
155 def streamInitialized(self): | 562 This component are similar but not identical to clients. |
156 """Called after _authd""" | 563 An entry point plugin is launched after component is connected. |
157 log.debug(_("XML stream is initialized")) | 564 Component need to instantiate MessageProtocol itself |
158 self.keep_alife = task.LoopingCall(self.xmlstream.send, " ") # Needed to avoid disconnection (specially with openfire) | 565 """ |
159 self.keep_alife.start(C.XMPP_KEEP_ALIFE) | 566 implements(iwokkel.IDisco) |
160 | 567 trigger_suffix = "Component" # used for to distinguish some trigger points set in SatXMPPEntity |
161 self.disco = SatDiscoProtocol(self) | 568 component = True |
162 self.disco.setHandlerParent(self) | 569 sendHistory = False # XXX: set to True from entry plugin to keep messages in history for received messages |
163 self.discoHandler = disco.DiscoHandler() | 570 |
164 self.discoHandler.setHandlerParent(self) | 571 def __init__(self, host_app, profile, component_jid, password, host=None, port=None, max_retries=C.XMPP_MAX_RETRIES): |
165 disco_d = defer.succeed(None) | 572 self.started = time.time() |
166 | 573 if port is None: |
167 if not self.host_app.trigger.point("Disco handled", disco_d, self.profile): | 574 port = C.XMPP_COMPONENT_PORT |
168 return | 575 |
169 | 576 ## entry point ## |
170 def finish_connection(dummy): | 577 entry_point = host_app.memory.getEntryPoint(profile) |
171 self.roster.requestRoster() | |
172 self.presence.available() | |
173 self.conn_deferred.callback(None) | |
174 | |
175 disco_d.addCallback(finish_connection) | |
176 | |
177 def initializationFailed(self, reason): | |
178 log.error(_(u"ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {'profile': self.profile, 'reason': reason})) | |
179 self.conn_deferred.errback(reason.value) | |
180 try: | 578 try: |
181 wokkel_client.XMPPClient.initializationFailed(self, reason) | 579 self.entry_plugin = host_app.plugins[entry_point] |
182 except: | 580 except KeyError: |
183 # we already chained an errback, no need to raise an exception | 581 raise exceptions.NotFound(_(u"The requested entry point ({entry_point}) is not available").format( |
184 pass | 582 entry_point = entry_point)) |
185 | 583 |
186 def isConnected(self): | 584 self.identities = [disco.DiscoIdentity(u"component", u"generic", C.APP_NAME)] |
187 return self.__connected | 585 # jid is set automatically on bind by Twisted for Client, but not for Component |
188 | 586 self.jid = component_jid |
189 def connectionLost(self, connector, unused_reason): | 587 if host is None: |
190 try: | 588 try: |
191 self.keep_alife.stop() | 589 host = component_jid.host.split(u'.', 1)[1] |
192 except AttributeError: | 590 except IndexError: |
193 log.debug(_("No keep_alife")) | 591 raise ValueError(u"Can't guess host from jid, please specify a host") |
194 if self.__connected: | 592 # XXX: component.Component expect unicode jid, while Client expect jid.JID. |
195 log.info(_("********** [%s] DISCONNECTED **********") % self.profile) | 593 # this is not consistent, so we use jid.JID for SatXMPP* |
196 self.host_app.bridge.disconnected(self.profile) # we send the signal to the clients | 594 component.Component.__init__(self, host, port, component_jid.full(), password) |
197 self.host_app.purgeClient(self.profile) # and we remove references to this client | 595 SatXMPPEntity.__init__(self, host_app, profile, max_retries) |
198 self.__connected = False | 596 |
597 def _buildDependencies(self, current, plugins, required=True): | |
598 """build recursively dependencies needed for a plugin | |
599 | |
600 this method build list of plugin needed for a component and raises | |
601 errors if they are not available or not allowed for components | |
602 @param current(object): parent plugin to check | |
603 use entry_point for first call | |
604 @param plugins(list): list of validated plugins, will be filled by the method | |
605 give an empty list for first call | |
606 @param required(bool): True if plugin is mandatory | |
607 for recursive calls only, should not be modified by inital caller | |
608 @raise InternalError: one of the plugin is not handling components | |
609 @raise KeyError: one plugin should be present in self.host_app.plugins but it is not | |
610 """ | |
611 if C.PLUG_MODE_COMPONENT not in current._info[u'modes']: | |
612 if not required: | |
613 return | |
614 else: | |
615 log.error(_(u"Plugin {current_name} if needed for {entry_name}, but it doesn't handle component mode").format( | |
616 current_name = current._info[u'import_name'], | |
617 entry_name = self.entry_plugin._info[u'import_name'] | |
618 )) | |
619 raise exceptions.InternalError(_(u"invalid plugin mode")) | |
620 | |
621 for import_name in current._info.get(C.PI_DEPENDENCIES, []): | |
622 # plugins are already loaded as dependencies | |
623 # so we know they are in self.host_app.plugins | |
624 dep = self.host_app.plugins[import_name] | |
625 self._checkDependencies(dep, plugins) | |
626 | |
627 for import_name in current._info.get(C.PI_RECOMMENDATIONS, []): | |
628 # here plugins are only recommendations, | |
629 # so they may not exist in self.host_app.plugins | |
630 try: | |
631 dep = self.host_app.plugins[import_name] | |
632 except KeyError: | |
633 continue | |
634 self._buildDependencies(dep, plugins, required = False) | |
635 | |
636 if current not in plugins: | |
637 # current can be required for several plugins and so | |
638 # it can already be present in the list | |
639 plugins.append(current) | |
640 | |
641 def _getPluginsList(self): | |
642 # XXX: for component we don't launch all plugins triggers | |
643 # but only the ones from which there is a dependency | |
644 plugins = [] | |
645 self._buildDependencies(self.entry_plugin, plugins) | |
646 return plugins | |
647 | |
648 def entityConnected(self): | |
649 # we can now launch entry point | |
650 return self.entry_plugin.componentStart(self) | |
651 | |
652 def addPostXmlCallbacks(self, post_xml_treatments): | |
653 if self.sendHistory: | |
654 post_xml_treatments.addCallback(self.messageAddToHistory) | |
199 | 655 |
200 | 656 |
201 class SatMessageProtocol(xmppim.MessageProtocol): | 657 class SatMessageProtocol(xmppim.MessageProtocol): |
202 | 658 |
203 def __init__(self, host): | 659 def __init__(self, host): |
430 """Return True if jid is in roster""" | 886 """Return True if jid is in roster""" |
431 return entity_jid in self._jids | 887 return entity_jid in self._jids |
432 | 888 |
433 def isPresenceAuthorised(self, entity_jid): | 889 def isPresenceAuthorised(self, entity_jid): |
434 """Return True if entity is authorised to see our presence""" | 890 """Return True if entity is authorised to see our presence""" |
435 import pudb | |
436 pudb.set_trace() | |
437 try: | 891 try: |
438 item = self._jids[entity_jid.userhostJID()] | 892 item = self._jids[entity_jid.userhostJID()] |
439 except KeyError: | 893 except KeyError: |
440 return False | 894 return False |
441 return item.subscriptionFrom | 895 return item.subscriptionFrom |
685 # ask for disco info, and not when we generate the key, so the hash is used with different | 1139 # ask for disco info, and not when we generate the key, so the hash is used with different |
686 # disco features, and when the server (seen on ejabberd) generate its own hash for security check | 1140 # disco features, and when the server (seen on ejabberd) generate its own hash for security check |
687 # it reject our features (resulting in e.g. no notification on PEP) | 1141 # it reject our features (resulting in e.g. no notification on PEP) |
688 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None) | 1142 return generic.VersionHandler.getDiscoInfo(self, requestor, target, None) |
689 | 1143 |
1144 | |
690 class SatIdentityHandler(XMPPHandler): | 1145 class SatIdentityHandler(XMPPHandler): |
691 """ Manage disco Identity of SàT. Currently, we use "client/pc/Salut à Toi", but as | 1146 """ Manage disco Identity of SàT. |
692 SàT is multi-frontends and can be used on mobile devices, as a bot, with a web frontend, | 1147 |
693 etc, we should implement a way to dynamically update identities through the bridge """ | 1148 """ |
694 #TODO: dynamic identity update (see docstring). Note that a XMPP entity can have several identities | 1149 #TODO: dynamic identity update (see docstring). Note that a XMPP entity can have several identities |
695 implements(iwokkel.IDisco) | 1150 implements(iwokkel.IDisco) |
696 | 1151 |
697 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 1152 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
698 return [disco.DiscoIdentity(u"client", u"pc", C.APP_NAME)] | 1153 return self.parent.identities |
699 | 1154 |
700 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | 1155 def getDiscoItems(self, requestor, target, nodeIdentifier=''): |
701 return [] | 1156 return [] |