comparison sat/core/xmpp.py @ 2891:6a0f42e9410a

core (xmpp): implemented roster versioning
author Goffi <goffi@goffi.org>
date Sun, 07 Apr 2019 18:47:17 +0200
parents b06cb71079fa
children 82b781c46841
comparison
equal deleted inserted replaced
2890:c652d079a9a1 2891:6a0f42e9410a
30 from twisted.python import failure 30 from twisted.python import failure
31 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel 31 from wokkel import client as wokkel_client, disco, xmppim, generic, iwokkel
32 from wokkel import component 32 from wokkel import component
33 from wokkel import delay 33 from wokkel import delay
34 from sat.core.log import getLogger 34 from sat.core.log import getLogger
35
36 log = getLogger(__name__)
37 from sat.core import exceptions 35 from sat.core import exceptions
38 from sat.memory import encryption 36 from sat.memory import encryption
37 from sat.memory import persistent
39 from sat.tools import xml_tools 38 from sat.tools import xml_tools
40 from zope.interface import implements 39 from zope.interface import implements
41 import time 40 import time
42 import calendar 41 import calendar
43 import uuid 42 import uuid
44 import sys 43 import sys
45 44
45 log = getLogger(__name__)
46
46 47
47 NS_X_DATA = u"jabber:x:data" 48 NS_X_DATA = u"jabber:x:data"
48 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" 49 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info"
49 NS_XML_ELEMENT = u"urn:xmpp:xml-element" 50 NS_XML_ELEMENT = u"urn:xmpp:xml-element"
51 NS_ROSTER_VER = u"urn:xmpp:features:rosterver"
52 # we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys
53 # with roster jids
54 ROSTER_VER_KEY = u"@version@"
50 55
51 56
52 class SatXMPPEntity(object): 57 class SatXMPPEntity(object):
53 """Common code for Client and Component""" 58 """Common code for Client and Component"""
54 59
813 extra=extra, 818 extra=extra,
814 profile=self.profile, 819 profile=self.profile,
815 ) 820 )
816 821
817 def _finish_connection(self, __): 822 def _finish_connection(self, __):
818 self.roster.requestRoster() 823 d = self.roster.requestRoster()
819 super(SatXMPPClient, self)._finish_connection(__) 824 d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__))
820 825
821 826
822 class SatXMPPComponent(SatXMPPEntity, component.Component): 827 class SatXMPPComponent(SatXMPPEntity, component.Component):
823 """XMPP component 828 """XMPP component
824 829
1099 """A message sending can be cancelled by a plugin treatment""" 1104 """A message sending can be cancelled by a plugin treatment"""
1100 failure_.trap(exceptions.CancelError) 1105 failure_.trap(exceptions.CancelError)
1101 1106
1102 1107
1103 class SatRosterProtocol(xmppim.RosterClientProtocol): 1108 class SatRosterProtocol(xmppim.RosterClientProtocol):
1109
1104 def __init__(self, host): 1110 def __init__(self, host):
1105 xmppim.RosterClientProtocol.__init__(self) 1111 xmppim.RosterClientProtocol.__init__(self)
1106 self.host = host 1112 self.host = host
1107 self.got_roster = defer.Deferred() # called when roster is received and ready 1113 self.got_roster = defer.Deferred() # called when roster is received and ready
1108 # XXX: the two following dicts keep a local copy of the roster 1114 # XXX: the two following dicts keep a local copy of the roster
1115 self._jids = {} # map from jids to RosterItem: key=jid value=RosterItem
1109 self._groups = {} # map from groups to jids: key=group value=set of jids 1116 self._groups = {} # map from groups to jids: key=group value=set of jids
1110 self._jids = None # map from jids to RosterItem: key=jid value=RosterItem 1117
1111 1118 @property
1112 def rosterCb(self, roster): 1119 def versioning(self):
1113 assert roster is not None # FIXME: must be managed with roster versioning 1120 """True if server support roster versioning"""
1114 self._groups.clear() 1121 return (NS_ROSTER_VER, u'ver') in self.parent.xmlstream.features
1115 self._jids = roster 1122
1116 for item in roster.itervalues(): 1123 @property
1117 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: 1124 def roster_cache(self):
1118 # XXX: current behaviour: we don't want contact in our roster list 1125 """Cache of roster from storage
1119 # if there is no presence subscription 1126
1120 # may change in the future 1127 This property return a new PersistentDict on each call, it must be loaded
1121 log.info( 1128 manually if necessary
1122 u"Removing contact {} from roster because there is no presence " 1129 """
1123 u"subscription".format( 1130 return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile)
1124 item.jid
1125 )
1126 )
1127 self.removeItem(item.entity) # FIXME: to be checked
1128 else:
1129 self._registerItem(item)
1130 1131
1131 def _registerItem(self, item): 1132 def _registerItem(self, item):
1132 """Register item in local cache 1133 """Register item in local cache
1133 1134
1134 item must be already registered in self._jids before this method is called 1135 item must be already registered in self._jids before this method is called
1153 log.info(_(u"[{}] is not subscribed to you!").format(item.entity.full())) 1154 log.info(_(u"[{}] is not subscribed to you!").format(item.entity.full()))
1154 1155
1155 for group in item.groups: 1156 for group in item.groups:
1156 self._groups.setdefault(group, set()).add(item.entity) 1157 self._groups.setdefault(group, set()).add(item.entity)
1157 1158
1159 @defer.inlineCallbacks
1160 def _cacheRoster(self, version):
1161 """Serialise local roster and save it to storage
1162
1163 @param version(unicode): version of roster in local cache
1164 """
1165 roster_cache = self.roster_cache
1166 yield roster_cache.clear()
1167 roster_cache[ROSTER_VER_KEY] = version
1168 for roster_jid, roster_item in self._jids.iteritems():
1169 roster_jid_s = roster_jid.full()
1170 roster_item_elt = roster_item.toElement().toXml()
1171 roster_cache[roster_jid_s] = roster_item_elt
1172
1173 @defer.inlineCallbacks
1158 def requestRoster(self): 1174 def requestRoster(self):
1159 """ ask the server for Roster list """ 1175 """Ask the server for Roster list """
1160 log.debug("requestRoster") 1176 if self.versioning:
1161 d = self.getRoster().addCallback(self.rosterCb) 1177 log.info(_(u"our server support roster versioning, we use it"))
1162 d.chainDeferred(self.got_roster) 1178 roster_cache = self.roster_cache
1179 yield roster_cache.load()
1180 try:
1181 version = roster_cache[ROSTER_VER_KEY]
1182 except KeyError:
1183 log.info(_(u"no roster in cache, we start fresh"))
1184 # u"" means we use versioning without valid roster in cache
1185 version = u""
1186 else:
1187 log.info(_(u"We have roster v{version} in cache").format(version=version))
1188 # we deserialise cached roster to our local cache
1189 for roster_jid_s, roster_item_elt_s in roster_cache.iteritems():
1190 if roster_jid_s == ROSTER_VER_KEY:
1191 continue
1192 roster_jid = jid.JID(roster_jid_s)
1193 roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8'))
1194 roster_item = xmppim.RosterItem.fromElement(roster_item_elt)
1195 self._jids[roster_jid] = roster_item
1196 self._registerItem(roster_item)
1197 else:
1198 log.warning(_(u"our server doesn't support roster versioning"))
1199 version = None
1200
1201 log.debug("requesting roster")
1202 roster = yield self.getRoster(version=version)
1203 if roster is None:
1204 log.debug(u"empty roster result received, we'll get roster item with roster "
1205 u"pushes")
1206 else:
1207 # a full roster is received
1208 self._groups.clear()
1209 self._jids = roster
1210 for item in roster.itervalues():
1211 if not item.subscriptionTo and not item.subscriptionFrom and not item.ask:
1212 # XXX: current behaviour: we don't want contact in our roster list
1213 # if there is no presence subscription
1214 # may change in the future
1215 log.info(
1216 u"Removing contact {} from roster because there is no presence "
1217 u"subscription".format(
1218 item.jid
1219 )
1220 )
1221 self.removeItem(item.entity) # FIXME: to be checked
1222 else:
1223 self._registerItem(item)
1224 yield self._cacheRoster(roster.version)
1225
1226 if not self.got_roster.called:
1227 # got_roster may already be called if we use resync()
1228 self.got_roster.callback(None)
1163 1229
1164 def removeItem(self, to_jid): 1230 def removeItem(self, to_jid):
1165 """Remove a contact from roster list 1231 """Remove a contact from roster list
1166 @param to_jid: a JID instance 1232 @param to_jid: a JID instance
1167 @return: Deferred 1233 @return: Deferred
1182 if item.name: 1248 if item.name:
1183 item_attr["name"] = item.name 1249 item_attr["name"] = item.name
1184 return item_attr 1250 return item_attr
1185 1251
1186 def setReceived(self, request): 1252 def setReceived(self, request):
1187 # TODO: implement roster versioning (cf RFC 6121 ยง2.6)
1188 item = request.item 1253 item = request.item
1254 entity = item.entity
1255 log.info(_(u"adding {entity} to roster").format(entity=entity.full()))
1256 if request.version is not None:
1257 # we update the cache in storage
1258 roster_cache = self.roster_cache
1259 roster_cache[ROSTER_VER_KEY] = request.version
1260 roster_cache[entity.full()] = item.toElement().toXml()
1261
1189 try: # update the cache for the groups the contact has been removed from 1262 try: # update the cache for the groups the contact has been removed from
1190 left_groups = set(self._jids[item.entity].groups).difference(item.groups) 1263 left_groups = set(self._jids[entity].groups).difference(item.groups)
1191 for group in left_groups: 1264 for group in left_groups:
1192 jids_set = self._groups[group] 1265 jids_set = self._groups[group]
1193 jids_set.remove(item.entity) 1266 jids_set.remove(entity)
1194 if not jids_set: 1267 if not jids_set:
1195 del self._groups[group] 1268 del self._groups[group]
1196 except KeyError: 1269 except KeyError:
1197 pass # no previous item registration (or it's been cleared) 1270 pass # no previous item registration (or it's been cleared)
1198 self._jids[item.entity] = item 1271 self._jids[entity] = item
1199 self._registerItem(item) 1272 self._registerItem(item)
1200 self.host.bridge.newContact( 1273 self.host.bridge.newContact(
1201 item.entity.full(), self.getAttributes(item), item.groups, self.parent.profile 1274 entity.full(), self.getAttributes(item), item.groups, self.parent.profile
1202 ) 1275 )
1203 1276
1204 def removeReceived(self, request): 1277 def removeReceived(self, request):
1205 entity = request.item.entity 1278 entity = request.item.entity
1206 log.info(u"removing %s from roster list" % entity.full()) 1279 log.info(_(u"removing {entity} from roster").format(entity=entity.full()))
1280 if request.version is not None:
1281 # we update the cache in storage
1282 roster_cache = self.roster_cache
1283 roster_cache[ROSTER_VER_KEY] = request.version
1284 del roster_cache[request.item.entity.full()]
1207 1285
1208 # we first remove item from local cache (self._groups and self._jids) 1286 # we first remove item from local cache (self._groups and self._jids)
1209 try: 1287 try:
1210 item = self._jids.pop(entity) 1288 item = self._jids.pop(entity)
1211 except KeyError: 1289 except KeyError: