comparison src/core/xmpp.py @ 1955:633b5c21aefd

backend, frontend: messages refactoring (huge commit, not finished): /!\ database schema has been modified, do a backup before updating message have been refactored, here are the main changes: - languages are now handled - all messages have an uid (internal to SàT) - message updating is anticipated - subject is now first class - new naming scheme is used newMessage => messageNew, getHistory => historyGet, sendMessage => messageSend - minimal compatibility refactoring in quick_frontend/Primitivus, better refactoring should follow - threads handling - delayed messages are saved into history - info messages may also be saved in history (e.g. to keep track of people joining/leaving a room) - duplicate messages should be avoided - historyGet return messages in right order, no need to sort again - plugins have been updated to follow new features, some of them need to be reworked (e.g. OTR) - XEP-0203 (Delayed Delivery) is now fully handled in core, the plugin just handle disco and creation of a delay element - /!\ jp and Libervia are currently broken, as some features of Primitivus It has been put in one huge commit to avoid breaking messaging between changes. This is the main part of message refactoring, other commits will follow to take profit of the new features/behaviour.
author Goffi <goffi@goffi.org>
date Tue, 24 May 2016 22:11:04 +0200
parents 2daf7b4c6756
children a2bc5089c2eb
comparison
equal deleted inserted replaced
1943:ccfe45302a5c 1955:633b5c21aefd
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from twisted.internet import task, defer 22 from twisted.internet import task, defer
23 from twisted.words.protocols.jabber import jid, xmlstream 23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
24 from twisted.words.protocols.jabber import xmlstream
24 from twisted.words.protocols.jabber import error 25 from twisted.words.protocols.jabber import error
25 from wokkel import client, disco, xmppim, generic, delay, iwokkel 26 from twisted.words.protocols.jabber import jid
27 from twisted.python import failure
28 from wokkel import client, disco, xmppim, generic, iwokkel
29 from wokkel import delay
26 from sat.core.log import getLogger 30 from sat.core.log import getLogger
27 log = getLogger(__name__) 31 log = getLogger(__name__)
28 from sat.core import exceptions 32 from sat.core import exceptions
29 from zope.interface import implements 33 from zope.interface import implements
30 from twisted.words.protocols.jabber.xmlstream import XMPPHandler 34 import time
35 import calendar
36 import uuid
31 37
32 38
33 class SatXMPPClient(client.XMPPClient): 39 class SatXMPPClient(client.XMPPClient):
34 implements(iwokkel.IDisco) 40 implements(iwokkel.IDisco)
35 41
40 self.factory.clientConnectionLost = self.connectionLost 46 self.factory.clientConnectionLost = self.connectionLost
41 self.factory.maxRetries = max_retries 47 self.factory.maxRetries = max_retries
42 self.__connected = False 48 self.__connected = False
43 self.profile = profile 49 self.profile = profile
44 self.host_app = host_app 50 self.host_app = host_app
51 self._mess_id_uid = {} # map from message id to uid use in history. Key: (full_jid,message_id) Value: uid
45 self.conn_deferred = defer.Deferred() 52 self.conn_deferred = defer.Deferred()
46 self._waiting_conf = {} # callback called when a confirmation is received 53 self._waiting_conf = {} # callback called when a confirmation is received
47 self._progress_cb = {} # callback called when a progress is requested (key = progress id) 54 self._progress_cb = {} # callback called when a progress is requested (key = progress id)
48 self.actions = {} # used to keep track of actions for retrieval (key = action_id) 55 self.actions = {} # used to keep track of actions for retrieval (key = action_id)
49 56
129 136
130 def __init__(self, host): 137 def __init__(self, host):
131 xmppim.MessageProtocol.__init__(self) 138 xmppim.MessageProtocol.__init__(self)
132 self.host = host 139 self.host = host
133 140
134 def onMessage(self, message): 141 def onMessage(self, message_elt):
135 if not message.hasAttribute('from'): 142 # TODO: handle threads
136 message['from'] = self.parent.jid.host 143 client = self.parent
137 log.debug(_(u"got message from: %s") % message["from"]) 144 if not 'from' in message_elt.attributes:
145 message_elt['from'] = client.jid.host
146 log.debug(_(u"got message from: {from_}").format(from_=message_elt['from']))
138 post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred 147 post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred
139 148
140 if not self.host.trigger.point("MessageReceived", message, post_treat, profile=self.parent.profile): 149 if not self.host.trigger.point("MessageReceived", client, message_elt, post_treat):
141 return 150 return
142 151
143 data = {"from": message['from'], 152 message = {}
144 "to": message['to'], 153 subject = {}
145 "body": "", 154 extra = {}
146 "extra": {}} 155 data = {"from": message_elt['from'],
147 156 "to": message_elt['to'],
148 for e in message.elements(): 157 "uid": message_elt.getAttribute('uid', unicode(uuid.uuid4())), # XXX: uid is not a standard attribute but may be added by plugins
149 if e.name == "body": 158 "message": message,
150 data['body'] = e.children[0] if e.children else "" 159 "subject": subject,
151 elif e.name == "subject" and e.children: 160 "type": message_elt.getAttribute('type', 'normal'),
152 data['extra']['subject'] = e.children[0] 161 "extra": extra}
153 162
154 data['type'] = message['type'] if message.hasAttribute('type') else 'normal' 163 try:
164 data['stanza_id'] = message_elt['id']
165 except KeyError:
166 pass
167 else:
168 client._mess_id_uid[(data['from'], data['stanza_id'])] = data['uid']
169
170 # message
171 for e in message_elt.elements(C.NS_CLIENT, 'body'):
172 message[e.getAttribute('xml:lang','')] = unicode(e)
173
174 # subject
175 for e in message_elt.elements(C.NS_CLIENT, 'subject'):
176 subject[e.getAttribute('xml:lang','')] = unicode(e)
177
178 # delay and timestamp
179 try:
180 delay_elt = message_elt.elements(delay.NS_DELAY, 'delay').next()
181 except StopIteration:
182 data['timestamp'] = time.time()
183 else:
184 parsed_delay = delay.Delay.fromElement(delay_elt)
185 data['timestamp'] = calendar.timegm(parsed_delay.stamp.utctimetuple())
186 data['received_timestamp'] = unicode(time.time())
187 if parsed_delay.sender:
188 data['delay_sender'] = parsed_delay.sender.full()
189
190 def skipEmptyMessage(data):
191 if not data['message'] and not data['extra']:
192 raise failure.Failure(exceptions.CancelError())
193 return data
155 194
156 def bridgeSignal(data): 195 def bridgeSignal(data):
196 try:
197 data['extra']['received_timestamp'] = data['received_timestamp']
198 data['extra']['delay_sender'] = data['delay_sender']
199 except KeyError:
200 pass
157 if data is not None: 201 if data is not None:
158 self.host.bridge.newMessage(data['from'], data['body'], data['type'], data['to'], data['extra'], profile=self.parent.profile) 202 self.host.bridge.messageNew(data['uid'], data['timestamp'], data['from'].full(), data['to'].full(), data['message'], data['subject'], data['type'], data['extra'], profile=client.profile)
159 return data 203 return data
160 204
161 def addToHistory(data): 205 def addToHistory(data):
162 try: 206 data['from'] = jid.JID(data['from'])
163 timestamp = data['extra']['timestamp'] # timestamp added by XEP-0203 207 data['to'] = jid.JID(data['to'])
164 except KeyError: 208 self.host.memory.addToHistory(client, data)
165 self.host.memory.addToHistory(jid.JID(data['from']), jid.JID(data['to']), data['body'], data['type'], data['extra'], profile=self.parent.profile)
166 else:
167 if data['type'] != 'groupchat': # XXX: we don't save delayed messages in history for groupchats
168 #TODO: add delayed messages to history if they aren't already in it
169 data['extra']['archive'] = timestamp # FIXME: this "archive" is actually never used
170 self.host.memory.addToHistory(jid.JID(data['from']), jid.JID(data['to']), data['body'], data['type'], data['extra'], timestamp, profile=self.parent.profile)
171 return data 209 return data
172 210
173 def treatmentsEb(failure): 211 def treatmentsEb(failure_):
174 failure.trap(exceptions.SkipHistory) 212 failure_.trap(exceptions.SkipHistory)
175 return data 213 return data
176 214
177 def cancelErrorTrap(failure): 215 def cancelErrorTrap(failure_):
178 """A message sending can be cancelled by a plugin treatment""" 216 """A message sending can be cancelled by a plugin treatment"""
179 failure.trap(exceptions.CancelError) 217 failure_.trap(exceptions.CancelError)
180 218
219 post_treat.addCallback(skipEmptyMessage)
181 post_treat.addCallback(addToHistory) 220 post_treat.addCallback(addToHistory)
182 post_treat.addErrback(treatmentsEb) 221 post_treat.addErrback(treatmentsEb)
183 post_treat.addCallback(bridgeSignal) 222 post_treat.addCallback(bridgeSignal)
184 post_treat.addErrback(cancelErrorTrap) 223 post_treat.addErrback(cancelErrorTrap)
185 post_treat.callback(data) 224 post_treat.callback(data)
504 self.profile = profile 543 self.profile = profile
505 log.debug(_(u"Registration asked for %(user)s@%(host)s") % {'user': user_login, 'host': jabber_host}) 544 log.debug(_(u"Registration asked for %(user)s@%(host)s") % {'user': user_login, 'host': jabber_host})
506 545
507 def connectionMade(self): 546 def connectionMade(self):
508 log.debug(_(u"Connection made with %s" % self.jabber_host)) 547 log.debug(_(u"Connection made with %s" % self.jabber_host))
509 self.xmlstream.namespace = "jabber:client" 548 self.xmlstream.namespace = C.NS_CLIENT
510 self.xmlstream.sendHeader() 549 self.xmlstream.sendHeader()
511 550
512 iq = xmlstream.IQ(self.xmlstream, 'set') 551 iq = xmlstream.IQ(self.xmlstream, 'set')
513 iq["to"] = self.jabber_host 552 iq["to"] = self.jabber_host
514 query = iq.addElement(('jabber:iq:register', 'query')) 553 query = iq.addElement(('jabber:iq:register', 'query'))
524 563
525 def registrationAnswer(self, answer): 564 def registrationAnswer(self, answer):
526 log.debug(_(u"Registration answer: %s") % answer.toXml()) 565 log.debug(_(u"Registration answer: %s") % answer.toXml())
527 self.xmlstream.sendFooter() 566 self.xmlstream.sendFooter()
528 567
529 def registrationFailure(self, failure): 568 def registrationFailure(self, failure_):
530 log.info(_("Registration failure: %s") % unicode(failure.value)) 569 log.info(_("Registration failure: %s") % unicode(failure_.value))
531 self.xmlstream.sendFooter() 570 self.xmlstream.sendFooter()
532 raise failure.value 571 raise failure_.value
533 572
534 573
535 class SatVersionHandler(generic.VersionHandler): 574 class SatVersionHandler(generic.VersionHandler):
536 575
537 def getDiscoInfo(self, requestor, target, node): 576 def getDiscoInfo(self, requestor, target, node):