comparison sat_pubsub/privilege.py @ 463:f520ac3164b0

privilege: improvment on last message sending on presence with `+notify`: - local entities subscribed to the presence of an other local entity which is connecting are now added to presence map. This helps getting their notification even if they didn't connect recently - nodes with `presence` access model are now also used for `+notify` - notifications are not sent anymore in case of status change if the resource was already present.
author Goffi <goffi@goffi.org>
date Fri, 15 Oct 2021 13:40:56 +0200
parents a017af61a32b
children d86e0f8a1405
comparison
equal deleted inserted replaced
462:a017af61a32b 463:f520ac3164b0
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " 19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and "
20 "presences" 20 "presences"
21 21
22 # This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and presences 22 import time
23 23 from typing import List, Set
24 from wokkel import xmppim 24 from wokkel import xmppim
25 from wokkel.compat import IQ 25 from wokkel.compat import IQ
26 from wokkel import pubsub 26 from wokkel import pubsub
27 from wokkel import disco 27 from wokkel import disco
28 from wokkel.iwokkel import IPubSubService 28 from wokkel.iwokkel import IPubSubService
29 from twisted.python import log 29 from twisted.python import log
30 from twisted.python import failure 30 from twisted.python import failure
31 from twisted.internet import defer 31 from twisted.internet import defer
32 from twisted.words.xish import domish 32 from twisted.words.xish import domish
33 from twisted.words.protocols.jabber import jid, error 33 from twisted.words.protocols.jabber import jid, error
34 import time
35 34
36 FORWARDED_NS = 'urn:xmpp:forward:0' 35 FORWARDED_NS = 'urn:xmpp:forward:0'
37 PRIV_ENT_NS = 'urn:xmpp:privilege:1' 36 PRIV_ENT_NS = 'urn:xmpp:privilege:1'
38 PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS) 37 PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS)
39 ROSTER_NS = 'jabber:iq:roster' 38 ROSTER_NS = 'jabber:iq:roster'
41 PERM_MESSAGE = 'message' 40 PERM_MESSAGE = 'message'
42 PERM_PRESENCE = 'presence' 41 PERM_PRESENCE = 'presence'
43 ALLOWED_ROSTER = ('none', 'get', 'set', 'both') 42 ALLOWED_ROSTER = ('none', 'get', 'set', 'both')
44 ALLOWED_MESSAGE = ('none', 'outgoing') 43 ALLOWED_MESSAGE = ('none', 'outgoing')
45 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster') 44 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster')
46 TO_CHECK = {PERM_ROSTER:ALLOWED_ROSTER, PERM_MESSAGE:ALLOWED_MESSAGE, PERM_PRESENCE:ALLOWED_PRESENCE} 45 TO_CHECK = {
46 PERM_ROSTER:ALLOWED_ROSTER,
47 PERM_MESSAGE:ALLOWED_MESSAGE,
48 PERM_PRESENCE:ALLOWED_PRESENCE
49 }
47 50
48 51
49 class InvalidStanza(Exception): 52 class InvalidStanza(Exception):
50 pass 53 pass
51 54
52 class NotAllowedError(Exception): 55 class NotAllowedError(Exception):
53 pass 56 pass
54 57
55 class PrivilegesHandler(disco.DiscoClientProtocol): 58 class PrivilegesHandler(disco.DiscoClientProtocol):
56 #FIXME: need to manage updates, and database sync 59 # FIXME: need to manage updates, XEP-0356 must be updated to get roster pushes
57 #TODO: cache 60 # TODO: cache
58 61
59 def __init__(self, service_jid): 62 def __init__(self, service_jid):
60 super(PrivilegesHandler, self).__init__() 63 super(PrivilegesHandler, self).__init__()
61 self.backend = None 64 self.backend = None
62 self._permissions = {PERM_ROSTER: 'none', 65 self._permissions = {PERM_ROSTER: 'none',
66 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash 69 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash
67 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to 70 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to
68 # notify (notify) 71 # notify (notify)
69 self.hash_map = {} 72 self.hash_map = {}
70 self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" 73 self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster"
71 self.presence_map = {} # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both") 74 # key: jid, value: set of entities who need to receive a notification when we
75 # get a presence from them. All entities in value have a presence subscription
76 # to the key entity.
77 self.presence_map = {}
78 # resource currently online
79 self.presences = set()
72 80
73 @property 81 @property
74 def permissions(self): 82 def permissions(self):
75 return self._permissions 83 return self._permissions
76 84
95 raise InvalidStanza('unexpected element {}'.format(perm_elt.name)) 103 raise InvalidStanza('unexpected element {}'.format(perm_elt.name))
96 perm_access = perm_elt['access'] 104 perm_access = perm_elt['access']
97 perm_type = perm_elt['type'] 105 perm_type = perm_elt['type']
98 try: 106 try:
99 if perm_type not in TO_CHECK[perm_access]: 107 if perm_type not in TO_CHECK[perm_access]:
100 raise InvalidStanza('bad type [{}] for permission {}'.format(perm_type, perm_access)) 108 raise InvalidStanza(
109 'bad type [{}] for permission {}'
110 .format(perm_type, perm_access)
111 )
101 except KeyError: 112 except KeyError:
102 raise InvalidStanza('bad permission [{}]'.format(perm_access)) 113 raise InvalidStanza('bad permission [{}]'.format(perm_access))
103 except InvalidStanza as e: 114 except InvalidStanza as e:
104 log.msg("Invalid stanza received ({}), setting permission to none".format(e)) 115 log.msg(
116 f"Invalid stanza received ({e}), setting permission to none"
117 )
105 for perm in self._permissions: 118 for perm in self._permissions:
106 self._permissions[perm] = 'none' 119 self._permissions[perm] = 'none'
107 break 120 break
108 121
109 self._permissions[perm_access] = perm_type or 'none' 122 self._permissions[perm_access] = perm_type or 'none'
110 123
111 log.msg('Privileges updated: roster={roster}, message={message}, presence={presence}'.format(**self._permissions)) 124 log.msg(
125 'Privileges updated: roster={roster}, message={message}, presence={presence}'
126 .format(**self._permissions)
127 )
112 128
113 ## roster ## 129 ## roster ##
114 130
115 def getRoster(self, to_jid): 131 def getRoster(self, to_jid):
116 """ 132 """
247 self.roster_cache[from_jid_bare] = {'timestamp': timestamp, 263 self.roster_cache[from_jid_bare] = {'timestamp': timestamp,
248 'roster': roster, 264 'roster': roster,
249 } 265 }
250 for roster_jid, roster_item in roster.items(): 266 for roster_jid, roster_item in roster.items():
251 if roster_item.subscriptionFrom: 267 if roster_item.subscriptionFrom:
268 # we need to know who is subscribed to our user, to send them
269 # notifications when they send presence to us
252 self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare) 270 self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)
271 if ((roster_item.subscriptionTo
272 and jid.JID(roster_jid.host) == self.backend.server_jid)):
273 # we also need to know who on this server we are subscribed to, so
274 # we can get their notifications even if they didn't connect so far.
275 self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid)
253 276
254 presence_type = presence_elt.getAttribute('type') 277 presence_type = presence_elt.getAttribute('type')
255 if presence_type != "unavailable": 278 if presence_type == "unavailable":
256 # new resource available, we check entity capabilities 279 self.presences.discard(from_jid)
280 elif from_jid not in self.presences:
281 # new resource available
282
283 # we keep resources present in cache to avoid sending notifications on each
284 # status change
285 self.presences.add(from_jid)
286
287 # we check entity capabilities
257 try: 288 try:
258 c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c')) 289 c_elt = next(
290 presence_elt.elements('http://jabber.org/protocol/caps', 'c')
291 )
259 hash_ = c_elt['hash'] 292 hash_ = c_elt['hash']
260 ver = c_elt['ver'] 293 ver = c_elt['ver']
261 except (StopIteration, KeyError): 294 except (StopIteration, KeyError):
262 # no capabilities, we don't go further 295 # no capabilities, we don't go further
263 return 296 return
266 disco_tuple = (hash_, ver) 299 disco_tuple = (hash_, ver)
267 300
268 if disco_tuple not in self.hash_map: 301 if disco_tuple not in self.hash_map:
269 # first time we se this hash, what is behind it? 302 # first time we se this hash, what is behind it?
270 try: 303 try:
271 infos = yield self.requestInfo(from_jid) 304 infos = await self.requestInfo(from_jid)
272 except error.StanzaError as e: 305 except error.StanzaError as e:
273 log.msg( 306 log.msg(
274 f"WARNING: can't request disco info for {from_jid!r} (presence: " 307 f"WARNING: can't request disco info for {from_jid!r} (presence: "
275 f"{presence_type}): {e}" 308 f"{presence_type}): {e}"
276 ) 309 )
298 tuple(self.presence_map.get(from_jid_bare, ())) 331 tuple(self.presence_map.get(from_jid_bare, ()))
299 + (from_jid_bare, self.backend.server_jid) 332 + (from_jid_bare, self.backend.server_jid)
300 ) 333 )
301 334
302 # FIXME: add "presence" access_model (for node) for getLastItems 335 # FIXME: add "presence" access_model (for node) for getLastItems
303 last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True) 336 # TODO: manage other access model (whitelist, …)
304 # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last 337 last_items = await self.backend.storage.getLastItems(
338 publishers,
339 nodes,
340 ('open', 'presence'), ('open', 'presence'), True
341 )
342 # we send message with last item, as required by
343 # https://xmpp.org/extensions/xep-0163.html#notify-last
305 for pep_jid, node, item, item_access_model in last_items: 344 for pep_jid, node, item, item_access_model in last_items:
306 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])]) 345 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])
307 346
308 ## misc ## 347 ## misc ##
309 348
310 @defer.inlineCallbacks 349 async def getAutoSubscribers(
311 def getAutoSubscribers(self, recipient, nodeIdentifier, explicit_subscribers): 350 self,
312 """get automatic subscribers, i.e. subscribers with presence subscription and +notify for this node 351 recipient: jid.JID,
313 352 nodeIdentifier: str,
314 @param recipient(jid.JID): jid of the PEP owner of this node 353 explicit_subscribers: Set[jid.JID]
315 @param nodeIdentifier(unicode): node 354 ) -> List[jid.JID]:
316 @param explicit_subscribers(set(jid.JID}: jids of people which have an explicit subscription 355 """Get automatic subscribers
317 @return (list[jid.JID]): full jid of automatically subscribed entities 356
357 Get subscribers with presence subscription and +notify for this node
358 @param recipient: jid of the PEP owner of this node
359 @param nodeIdentifier: node
360 @param explicit_subscribers: jids of people which have an explicit subscription
361 @return: full jid of automatically subscribed entities
318 """ 362 """
319 auto_subscribers = [] 363 auto_subscribers = []
320 roster = yield self.getRoster(recipient) 364 roster = await self.getRoster(recipient)
321 for roster_jid, roster_item in roster.items(): 365 for roster_jid, roster_item in roster.items():
322 if roster_jid in explicit_subscribers: 366 if roster_jid in explicit_subscribers:
323 continue 367 continue
324 if roster_item.subscriptionFrom: 368 if roster_item.subscriptionFrom:
325 try: 369 try:
329 for res, disco_tuple in online_resources.items(): 373 for res, disco_tuple in online_resources.items():
330 notify = self.hash_map[disco_tuple]['notify'] 374 notify = self.hash_map[disco_tuple]['notify']
331 if nodeIdentifier in notify: 375 if nodeIdentifier in notify:
332 full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res)) 376 full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res))
333 auto_subscribers.append(full_jid) 377 auto_subscribers.append(full_jid)
334 defer.returnValue(auto_subscribers) 378 return auto_subscribers