Mercurial > libervia-backend
comparison src/core/xmpp.py @ 1955:633b5c21aefd
backend, frontend: messages refactoring (huge commit, not finished):
/!\ database schema has been modified, do a backup before updating
messages have been refactored; here are the main changes:
- languages are now handled
- all messages have an uid (internal to SàT)
- message updating is anticipated
- subject is now first class
- a new naming scheme is used: newMessage => messageNew, getHistory => historyGet, sendMessage => messageSend
- minimal compatibility refactoring in quick_frontend/Primitivus, better refactoring should follow
- threads handling
- delayed messages are saved into history
- info messages may also be saved in history (e.g. to keep track of people joining/leaving a room)
- duplicate messages should be avoided
- historyGet returns messages in the right order, no need to sort again
- plugins have been updated to follow new features, some of them need to be reworked (e.g. OTR)
- XEP-0203 (Delayed Delivery) is now fully handled in core, the plugin just handles disco and creation of a delay element
- /!\ jp and Libervia are currently broken, as are some features of Primitivus
It has been put in one huge commit to avoid breaking messaging between changes.
This is the main part of message refactoring, other commits will follow to take advantage of the new features/behaviour.
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 24 May 2016 22:11:04 +0200 |
parents | 2daf7b4c6756 |
children | a2bc5089c2eb |
comparison
equal
deleted
inserted
replaced
1943:ccfe45302a5c | 1955:633b5c21aefd |
---|---|
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from twisted.internet import task, defer | 22 from twisted.internet import task, defer |
23 from twisted.words.protocols.jabber import jid, xmlstream | 23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler |
24 from twisted.words.protocols.jabber import xmlstream | |
24 from twisted.words.protocols.jabber import error | 25 from twisted.words.protocols.jabber import error |
25 from wokkel import client, disco, xmppim, generic, delay, iwokkel | 26 from twisted.words.protocols.jabber import jid |
27 from twisted.python import failure | |
28 from wokkel import client, disco, xmppim, generic, iwokkel | |
29 from wokkel import delay | |
26 from sat.core.log import getLogger | 30 from sat.core.log import getLogger |
27 log = getLogger(__name__) | 31 log = getLogger(__name__) |
28 from sat.core import exceptions | 32 from sat.core import exceptions |
29 from zope.interface import implements | 33 from zope.interface import implements |
30 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | 34 import time |
35 import calendar | |
36 import uuid | |
31 | 37 |
32 | 38 |
33 class SatXMPPClient(client.XMPPClient): | 39 class SatXMPPClient(client.XMPPClient): |
34 implements(iwokkel.IDisco) | 40 implements(iwokkel.IDisco) |
35 | 41 |
40 self.factory.clientConnectionLost = self.connectionLost | 46 self.factory.clientConnectionLost = self.connectionLost |
41 self.factory.maxRetries = max_retries | 47 self.factory.maxRetries = max_retries |
42 self.__connected = False | 48 self.__connected = False |
43 self.profile = profile | 49 self.profile = profile |
44 self.host_app = host_app | 50 self.host_app = host_app |
51 self._mess_id_uid = {} # map from message id to uid use in history. Key: (full_jid,message_id) Value: uid | |
45 self.conn_deferred = defer.Deferred() | 52 self.conn_deferred = defer.Deferred() |
46 self._waiting_conf = {} # callback called when a confirmation is received | 53 self._waiting_conf = {} # callback called when a confirmation is received |
47 self._progress_cb = {} # callback called when a progress is requested (key = progress id) | 54 self._progress_cb = {} # callback called when a progress is requested (key = progress id) |
48 self.actions = {} # used to keep track of actions for retrieval (key = action_id) | 55 self.actions = {} # used to keep track of actions for retrieval (key = action_id) |
49 | 56 |
129 | 136 |
130 def __init__(self, host): | 137 def __init__(self, host): |
131 xmppim.MessageProtocol.__init__(self) | 138 xmppim.MessageProtocol.__init__(self) |
132 self.host = host | 139 self.host = host |
133 | 140 |
134 def onMessage(self, message): | 141 def onMessage(self, message_elt): |
135 if not message.hasAttribute('from'): | 142 # TODO: handle threads |
136 message['from'] = self.parent.jid.host | 143 client = self.parent |
137 log.debug(_(u"got message from: %s") % message["from"]) | 144 if not 'from' in message_elt.attributes: |
145 message_elt['from'] = client.jid.host | |
146 log.debug(_(u"got message from: {from_}").format(from_=message_elt['from'])) | |
138 post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred | 147 post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred |
139 | 148 |
140 if not self.host.trigger.point("MessageReceived", message, post_treat, profile=self.parent.profile): | 149 if not self.host.trigger.point("MessageReceived", client, message_elt, post_treat): |
141 return | 150 return |
142 | 151 |
143 data = {"from": message['from'], | 152 message = {} |
144 "to": message['to'], | 153 subject = {} |
145 "body": "", | 154 extra = {} |
146 "extra": {}} | 155 data = {"from": message_elt['from'], |
147 | 156 "to": message_elt['to'], |
148 for e in message.elements(): | 157 "uid": message_elt.getAttribute('uid', unicode(uuid.uuid4())), # XXX: uid is not a standard attribute but may be added by plugins |
149 if e.name == "body": | 158 "message": message, |
150 data['body'] = e.children[0] if e.children else "" | 159 "subject": subject, |
151 elif e.name == "subject" and e.children: | 160 "type": message_elt.getAttribute('type', 'normal'), |
152 data['extra']['subject'] = e.children[0] | 161 "extra": extra} |
153 | 162 |
154 data['type'] = message['type'] if message.hasAttribute('type') else 'normal' | 163 try: |
164 data['stanza_id'] = message_elt['id'] | |
165 except KeyError: | |
166 pass | |
167 else: | |
168 client._mess_id_uid[(data['from'], data['stanza_id'])] = data['uid'] | |
169 | |
170 # message | |
171 for e in message_elt.elements(C.NS_CLIENT, 'body'): | |
172 message[e.getAttribute('xml:lang','')] = unicode(e) | |
173 | |
174 # subject | |
175 for e in message_elt.elements(C.NS_CLIENT, 'subject'): | |
176 subject[e.getAttribute('xml:lang','')] = unicode(e) | |
177 | |
178 # delay and timestamp | |
179 try: | |
180 delay_elt = message_elt.elements(delay.NS_DELAY, 'delay').next() | |
181 except StopIteration: | |
182 data['timestamp'] = time.time() | |
183 else: | |
184 parsed_delay = delay.Delay.fromElement(delay_elt) | |
185 data['timestamp'] = calendar.timegm(parsed_delay.stamp.utctimetuple()) | |
186 data['received_timestamp'] = unicode(time.time()) | |
187 if parsed_delay.sender: | |
188 data['delay_sender'] = parsed_delay.sender.full() | |
189 | |
190 def skipEmptyMessage(data): | |
191 if not data['message'] and not data['extra']: | |
192 raise failure.Failure(exceptions.CancelError()) | |
193 return data | |
155 | 194 |
156 def bridgeSignal(data): | 195 def bridgeSignal(data): |
196 try: | |
197 data['extra']['received_timestamp'] = data['received_timestamp'] | |
198 data['extra']['delay_sender'] = data['delay_sender'] | |
199 except KeyError: | |
200 pass | |
157 if data is not None: | 201 if data is not None: |
158 self.host.bridge.newMessage(data['from'], data['body'], data['type'], data['to'], data['extra'], profile=self.parent.profile) | 202 self.host.bridge.messageNew(data['uid'], data['timestamp'], data['from'].full(), data['to'].full(), data['message'], data['subject'], data['type'], data['extra'], profile=client.profile) |
159 return data | 203 return data |
160 | 204 |
161 def addToHistory(data): | 205 def addToHistory(data): |
162 try: | 206 data['from'] = jid.JID(data['from']) |
163 timestamp = data['extra']['timestamp'] # timestamp added by XEP-0203 | 207 data['to'] = jid.JID(data['to']) |
164 except KeyError: | 208 self.host.memory.addToHistory(client, data) |
165 self.host.memory.addToHistory(jid.JID(data['from']), jid.JID(data['to']), data['body'], data['type'], data['extra'], profile=self.parent.profile) | |
166 else: | |
167 if data['type'] != 'groupchat': # XXX: we don't save delayed messages in history for groupchats | |
168 #TODO: add delayed messages to history if they aren't already in it | |
169 data['extra']['archive'] = timestamp # FIXME: this "archive" is actually never used | |
170 self.host.memory.addToHistory(jid.JID(data['from']), jid.JID(data['to']), data['body'], data['type'], data['extra'], timestamp, profile=self.parent.profile) | |
171 return data | 209 return data |
172 | 210 |
173 def treatmentsEb(failure): | 211 def treatmentsEb(failure_): |
174 failure.trap(exceptions.SkipHistory) | 212 failure_.trap(exceptions.SkipHistory) |
175 return data | 213 return data |
176 | 214 |
177 def cancelErrorTrap(failure): | 215 def cancelErrorTrap(failure_): |
178 """A message sending can be cancelled by a plugin treatment""" | 216 """A message sending can be cancelled by a plugin treatment""" |
179 failure.trap(exceptions.CancelError) | 217 failure_.trap(exceptions.CancelError) |
180 | 218 |
219 post_treat.addCallback(skipEmptyMessage) | |
181 post_treat.addCallback(addToHistory) | 220 post_treat.addCallback(addToHistory) |
182 post_treat.addErrback(treatmentsEb) | 221 post_treat.addErrback(treatmentsEb) |
183 post_treat.addCallback(bridgeSignal) | 222 post_treat.addCallback(bridgeSignal) |
184 post_treat.addErrback(cancelErrorTrap) | 223 post_treat.addErrback(cancelErrorTrap) |
185 post_treat.callback(data) | 224 post_treat.callback(data) |
504 self.profile = profile | 543 self.profile = profile |
505 log.debug(_(u"Registration asked for %(user)s@%(host)s") % {'user': user_login, 'host': jabber_host}) | 544 log.debug(_(u"Registration asked for %(user)s@%(host)s") % {'user': user_login, 'host': jabber_host}) |
506 | 545 |
507 def connectionMade(self): | 546 def connectionMade(self): |
508 log.debug(_(u"Connection made with %s" % self.jabber_host)) | 547 log.debug(_(u"Connection made with %s" % self.jabber_host)) |
509 self.xmlstream.namespace = "jabber:client" | 548 self.xmlstream.namespace = C.NS_CLIENT |
510 self.xmlstream.sendHeader() | 549 self.xmlstream.sendHeader() |
511 | 550 |
512 iq = xmlstream.IQ(self.xmlstream, 'set') | 551 iq = xmlstream.IQ(self.xmlstream, 'set') |
513 iq["to"] = self.jabber_host | 552 iq["to"] = self.jabber_host |
514 query = iq.addElement(('jabber:iq:register', 'query')) | 553 query = iq.addElement(('jabber:iq:register', 'query')) |
524 | 563 |
525 def registrationAnswer(self, answer): | 564 def registrationAnswer(self, answer): |
526 log.debug(_(u"Registration answer: %s") % answer.toXml()) | 565 log.debug(_(u"Registration answer: %s") % answer.toXml()) |
527 self.xmlstream.sendFooter() | 566 self.xmlstream.sendFooter() |
528 | 567 |
529 def registrationFailure(self, failure): | 568 def registrationFailure(self, failure_): |
530 log.info(_("Registration failure: %s") % unicode(failure.value)) | 569 log.info(_("Registration failure: %s") % unicode(failure_.value)) |
531 self.xmlstream.sendFooter() | 570 self.xmlstream.sendFooter() |
532 raise failure.value | 571 raise failure_.value |
533 | 572 |
534 | 573 |
535 class SatVersionHandler(generic.VersionHandler): | 574 class SatVersionHandler(generic.VersionHandler): |
536 | 575 |
537 def getDiscoInfo(self, requestor, target, node): | 576 def getDiscoInfo(self, requestor, target, node): |