Mercurial > libervia-backend
comparison sat/core/xmpp.py @ 2891:6a0f42e9410a
core (xmpp): implemented roster versioning
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 07 Apr 2019 18:47:17 +0200 |
parents | b06cb71079fa |
children | 82b781c46841 |
comparison
equal
deleted
inserted
replaced
2890:c652d079a9a1 | 2891:6a0f42e9410a |
---|---|
30 from twisted.python import failure | 30 from twisted.python import failure |
31 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel | 31 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel |
32 from wokkel import component | 32 from wokkel import component |
33 from wokkel import delay | 33 from wokkel import delay |
34 from sat.core.log import getLogger | 34 from sat.core.log import getLogger |
35 | |
36 log = getLogger(__name__) | |
37 from sat.core import exceptions | 35 from sat.core import exceptions |
38 from sat.memory import encryption | 36 from sat.memory import encryption |
37 from sat.memory import persistent | |
39 from sat.tools import xml_tools | 38 from sat.tools import xml_tools |
40 from zope.interface import implements | 39 from zope.interface import implements |
41 import time | 40 import time |
42 import calendar | 41 import calendar |
43 import uuid | 42 import uuid |
44 import sys | 43 import sys |
45 | 44 |
45 log = getLogger(__name__) | |
46 | |
46 | 47 |
47 NS_X_DATA = u"jabber:x:data" | 48 NS_X_DATA = u"jabber:x:data" |
48 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" | 49 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" |
49 NS_XML_ELEMENT = u"urn:xmpp:xml-element" | 50 NS_XML_ELEMENT = u"urn:xmpp:xml-element" |
51 NS_ROSTER_VER = u"urn:xmpp:features:rosterver" | |
52 # we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys | |
53 # with roster jids | |
54 ROSTER_VER_KEY = u"@version@" | |
50 | 55 |
51 | 56 |
52 class SatXMPPEntity(object): | 57 class SatXMPPEntity(object): |
53 """Common code for Client and Component""" | 58 """Common code for Client and Component""" |
54 | 59 |
813 extra=extra, | 818 extra=extra, |
814 profile=self.profile, | 819 profile=self.profile, |
815 ) | 820 ) |
816 | 821 |
817 def _finish_connection(self, __): | 822 def _finish_connection(self, __): |
818 self.roster.requestRoster() | 823 d = self.roster.requestRoster() |
819 super(SatXMPPClient, self)._finish_connection(__) | 824 d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__)) |
820 | 825 |
821 | 826 |
822 class SatXMPPComponent(SatXMPPEntity, component.Component): | 827 class SatXMPPComponent(SatXMPPEntity, component.Component): |
823 """XMPP component | 828 """XMPP component |
824 | 829 |
1099 """A message sending can be cancelled by a plugin treatment""" | 1104 """A message sending can be cancelled by a plugin treatment""" |
1100 failure_.trap(exceptions.CancelError) | 1105 failure_.trap(exceptions.CancelError) |
1101 | 1106 |
1102 | 1107 |
1103 class SatRosterProtocol(xmppim.RosterClientProtocol): | 1108 class SatRosterProtocol(xmppim.RosterClientProtocol): |
1109 | |
1104 def __init__(self, host): | 1110 def __init__(self, host): |
1105 xmppim.RosterClientProtocol.__init__(self) | 1111 xmppim.RosterClientProtocol.__init__(self) |
1106 self.host = host | 1112 self.host = host |
1107 self.got_roster = defer.Deferred() # called when roster is received and ready | 1113 self.got_roster = defer.Deferred() # called when roster is received and ready |
1108 # XXX: the two following dicts keep a local copy of the roster | 1114 # XXX: the two following dicts keep a local copy of the roster |
1115 self._jids = {} # map from jids to RosterItem: key=jid value=RosterItem | |
1109 self._groups = {} # map from groups to jids: key=group value=set of jids | 1116 self._groups = {} # map from groups to jids: key=group value=set of jids |
1110 self._jids = None # map from jids to RosterItem: key=jid value=RosterItem | 1117 |
1111 | 1118 @property |
1112 def rosterCb(self, roster): | 1119 def versioning(self): |
1113 assert roster is not None # FIXME: must be managed with roster versioning | 1120 """True if server support roster versioning""" |
1114 self._groups.clear() | 1121 return (NS_ROSTER_VER, u'ver') in self.parent.xmlstream.features |
1115 self._jids = roster | 1122 |
1116 for item in roster.itervalues(): | 1123 @property |
1117 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: | 1124 def roster_cache(self): |
1118 # XXX: current behaviour: we don't want contact in our roster list | 1125 """Cache of roster from storage |
1119 # if there is no presence subscription | 1126 |
1120 # may change in the future | 1127 This property return a new PersistentDict on each call, it must be loaded |
1121 log.info( | 1128 manually if necessary |
1122 u"Removing contact {} from roster because there is no presence " | 1129 """ |
1123 u"subscription".format( | 1130 return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile) |
1124 item.jid | |
1125 ) | |
1126 ) | |
1127 self.removeItem(item.entity) # FIXME: to be checked | |
1128 else: | |
1129 self._registerItem(item) | |
1130 | 1131 |
1131 def _registerItem(self, item): | 1132 def _registerItem(self, item): |
1132 """Register item in local cache | 1133 """Register item in local cache |
1133 | 1134 |
1134 item must be already registered in self._jids before this method is called | 1135 item must be already registered in self._jids before this method is called |
1153 log.info(_(u"[{}] is not subscribed to you!").format(item.entity.full())) | 1154 log.info(_(u"[{}] is not subscribed to you!").format(item.entity.full())) |
1154 | 1155 |
1155 for group in item.groups: | 1156 for group in item.groups: |
1156 self._groups.setdefault(group, set()).add(item.entity) | 1157 self._groups.setdefault(group, set()).add(item.entity) |
1157 | 1158 |
1159 @defer.inlineCallbacks | |
1160 def _cacheRoster(self, version): | |
1161 """Serialise local roster and save it to storage | |
1162 | |
1163 @param version(unicode): version of roster in local cache | |
1164 """ | |
1165 roster_cache = self.roster_cache | |
1166 yield roster_cache.clear() | |
1167 roster_cache[ROSTER_VER_KEY] = version | |
1168 for roster_jid, roster_item in self._jids.iteritems(): | |
1169 roster_jid_s = roster_jid.full() | |
1170 roster_item_elt = roster_item.toElement().toXml() | |
1171 roster_cache[roster_jid_s] = roster_item_elt | |
1172 | |
1173 @defer.inlineCallbacks | |
1158 def requestRoster(self): | 1174 def requestRoster(self): |
1159 """ ask the server for Roster list """ | 1175 """Ask the server for Roster list """ |
1160 log.debug("requestRoster") | 1176 if self.versioning: |
1161 d = self.getRoster().addCallback(self.rosterCb) | 1177 log.info(_(u"our server support roster versioning, we use it")) |
1162 d.chainDeferred(self.got_roster) | 1178 roster_cache = self.roster_cache |
1179 yield roster_cache.load() | |
1180 try: | |
1181 version = roster_cache[ROSTER_VER_KEY] | |
1182 except KeyError: | |
1183 log.info(_(u"no roster in cache, we start fresh")) | |
1184 # u"" means we use versioning without valid roster in cache | |
1185 version = u"" | |
1186 else: | |
1187 log.info(_(u"We have roster v{version} in cache").format(version=version)) | |
1188 # we deserialise cached roster to our local cache | |
1189 for roster_jid_s, roster_item_elt_s in roster_cache.iteritems(): | |
1190 if roster_jid_s == ROSTER_VER_KEY: | |
1191 continue | |
1192 roster_jid = jid.JID(roster_jid_s) | |
1193 roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8')) | |
1194 roster_item = xmppim.RosterItem.fromElement(roster_item_elt) | |
1195 self._jids[roster_jid] = roster_item | |
1196 self._registerItem(roster_item) | |
1197 else: | |
1198 log.warning(_(u"our server doesn't support roster versioning")) | |
1199 version = None | |
1200 | |
1201 log.debug("requesting roster") | |
1202 roster = yield self.getRoster(version=version) | |
1203 if roster is None: | |
1204 log.debug(u"empty roster result received, we'll get roster item with roster " | |
1205 u"pushes") | |
1206 else: | |
1207 # a full roster is received | |
1208 self._groups.clear() | |
1209 self._jids = roster | |
1210 for item in roster.itervalues(): | |
1211 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: | |
1212 # XXX: current behaviour: we don't want contact in our roster list | |
1213 # if there is no presence subscription | |
1214 # may change in the future | |
1215 log.info( | |
1216 u"Removing contact {} from roster because there is no presence " | |
1217 u"subscription".format( | |
1218 item.jid | |
1219 ) | |
1220 ) | |
1221 self.removeItem(item.entity) # FIXME: to be checked | |
1222 else: | |
1223 self._registerItem(item) | |
1224 yield self._cacheRoster(roster.version) | |
1225 | |
1226 if not self.got_roster.called: | |
1227 # got_roster may already be called if we use resync() | |
1228 self.got_roster.callback(None) | |
1163 | 1229 |
1164 def removeItem(self, to_jid): | 1230 def removeItem(self, to_jid): |
1165 """Remove a contact from roster list | 1231 """Remove a contact from roster list |
1166 @param to_jid: a JID instance | 1232 @param to_jid: a JID instance |
1167 @return: Deferred | 1233 @return: Deferred |
1182 if item.name: | 1248 if item.name: |
1183 item_attr["name"] = item.name | 1249 item_attr["name"] = item.name |
1184 return item_attr | 1250 return item_attr |
1185 | 1251 |
1186 def setReceived(self, request): | 1252 def setReceived(self, request): |
1187 # TODO: implement roster versioning (cf RFC 6121 §2.6) | |
1188 item = request.item | 1253 item = request.item |
1254 entity = item.entity | |
1255 log.info(_(u"adding {entity} to roster").format(entity=entity.full())) | |
1256 if request.version is not None: | |
1257 # we update the cache in storage | |
1258 roster_cache = self.roster_cache | |
1259 roster_cache[ROSTER_VER_KEY] = request.version | |
1260 roster_cache[entity.full()] = item.toElement().toXml() | |
1261 | |
1189 try: # update the cache for the groups the contact has been removed from | 1262 try: # update the cache for the groups the contact has been removed from |
1190 left_groups = set(self._jids[item.entity].groups).difference(item.groups) | 1263 left_groups = set(self._jids[entity].groups).difference(item.groups) |
1191 for group in left_groups: | 1264 for group in left_groups: |
1192 jids_set = self._groups[group] | 1265 jids_set = self._groups[group] |
1193 jids_set.remove(item.entity) | 1266 jids_set.remove(entity) |
1194 if not jids_set: | 1267 if not jids_set: |
1195 del self._groups[group] | 1268 del self._groups[group] |
1196 except KeyError: | 1269 except KeyError: |
1197 pass # no previous item registration (or it's been cleared) | 1270 pass # no previous item registration (or it's been cleared) |
1198 self._jids[item.entity] = item | 1271 self._jids[entity] = item |
1199 self._registerItem(item) | 1272 self._registerItem(item) |
1200 self.host.bridge.newContact( | 1273 self.host.bridge.newContact( |
1201 item.entity.full(), self.getAttributes(item), item.groups, self.parent.profile | 1274 entity.full(), self.getAttributes(item), item.groups, self.parent.profile |
1202 ) | 1275 ) |
1203 | 1276 |
1204 def removeReceived(self, request): | 1277 def removeReceived(self, request): |
1205 entity = request.item.entity | 1278 entity = request.item.entity |
1206 log.info(u"removing %s from roster list" % entity.full()) | 1279 log.info(_(u"removing {entity} from roster").format(entity=entity.full())) |
1280 if request.version is not None: | |
1281 # we update the cache in storage | |
1282 roster_cache = self.roster_cache | |
1283 roster_cache[ROSTER_VER_KEY] = request.version | |
1284 del roster_cache[request.item.entity.full()] | |
1207 | 1285 |
1208 # we first remove item from local cache (self._groups and self._jids) | 1286 # we first remove item from local cache (self._groups and self._jids) |
1209 try: | 1287 try: |
1210 item = self._jids.pop(entity) | 1288 item = self._jids.pop(entity) |
1211 except KeyError: | 1289 except KeyError: |