Mercurial > libervia-pubsub
comparison sat_pubsub/privilege.py @ 463:f520ac3164b0
privilege: improvment on last message sending on presence with `+notify`:
- local entities subscribed to the presence of an other local entity which is connecting
are now added to presence map. This helps getting their notification even if they didn't
connect recently
- nodes with `presence` access model are now also used for `+notify`
- notifications are not sent anymore in case of status change if the resource was already
present.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 15 Oct 2021 13:40:56 +0200 |
parents | a017af61a32b |
children | d86e0f8a1405 |
comparison
equal
deleted
inserted
replaced
462:a017af61a32b | 463:f520ac3164b0 |
---|---|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " | 19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " |
20 "presences" | 20 "presences" |
21 | 21 |
22 # This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and presences | 22 import time |
23 | 23 from typing import List, Set |
24 from wokkel import xmppim | 24 from wokkel import xmppim |
25 from wokkel.compat import IQ | 25 from wokkel.compat import IQ |
26 from wokkel import pubsub | 26 from wokkel import pubsub |
27 from wokkel import disco | 27 from wokkel import disco |
28 from wokkel.iwokkel import IPubSubService | 28 from wokkel.iwokkel import IPubSubService |
29 from twisted.python import log | 29 from twisted.python import log |
30 from twisted.python import failure | 30 from twisted.python import failure |
31 from twisted.internet import defer | 31 from twisted.internet import defer |
32 from twisted.words.xish import domish | 32 from twisted.words.xish import domish |
33 from twisted.words.protocols.jabber import jid, error | 33 from twisted.words.protocols.jabber import jid, error |
34 import time | |
35 | 34 |
36 FORWARDED_NS = 'urn:xmpp:forward:0' | 35 FORWARDED_NS = 'urn:xmpp:forward:0' |
37 PRIV_ENT_NS = 'urn:xmpp:privilege:1' | 36 PRIV_ENT_NS = 'urn:xmpp:privilege:1' |
38 PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS) | 37 PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS) |
39 ROSTER_NS = 'jabber:iq:roster' | 38 ROSTER_NS = 'jabber:iq:roster' |
41 PERM_MESSAGE = 'message' | 40 PERM_MESSAGE = 'message' |
42 PERM_PRESENCE = 'presence' | 41 PERM_PRESENCE = 'presence' |
43 ALLOWED_ROSTER = ('none', 'get', 'set', 'both') | 42 ALLOWED_ROSTER = ('none', 'get', 'set', 'both') |
44 ALLOWED_MESSAGE = ('none', 'outgoing') | 43 ALLOWED_MESSAGE = ('none', 'outgoing') |
45 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster') | 44 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster') |
46 TO_CHECK = {PERM_ROSTER:ALLOWED_ROSTER, PERM_MESSAGE:ALLOWED_MESSAGE, PERM_PRESENCE:ALLOWED_PRESENCE} | 45 TO_CHECK = { |
46 PERM_ROSTER:ALLOWED_ROSTER, | |
47 PERM_MESSAGE:ALLOWED_MESSAGE, | |
48 PERM_PRESENCE:ALLOWED_PRESENCE | |
49 } | |
47 | 50 |
48 | 51 |
49 class InvalidStanza(Exception): | 52 class InvalidStanza(Exception): |
50 pass | 53 pass |
51 | 54 |
52 class NotAllowedError(Exception): | 55 class NotAllowedError(Exception): |
53 pass | 56 pass |
54 | 57 |
55 class PrivilegesHandler(disco.DiscoClientProtocol): | 58 class PrivilegesHandler(disco.DiscoClientProtocol): |
56 #FIXME: need to manage updates, and database sync | 59 # FIXME: need to manage updates, XEP-0356 must be updated to get roster pushes |
57 #TODO: cache | 60 # TODO: cache |
58 | 61 |
59 def __init__(self, service_jid): | 62 def __init__(self, service_jid): |
60 super(PrivilegesHandler, self).__init__() | 63 super(PrivilegesHandler, self).__init__() |
61 self.backend = None | 64 self.backend = None |
62 self._permissions = {PERM_ROSTER: 'none', | 65 self._permissions = {PERM_ROSTER: 'none', |
66 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash | 69 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash |
67 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to | 70 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to |
68 # notify (notify) | 71 # notify (notify) |
69 self.hash_map = {} | 72 self.hash_map = {} |
70 self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" | 73 self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" |
71 self.presence_map = {} # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both") | 74 # key: jid, value: set of entities who need to receive a notification when we |
75 # get a presence from them. All entities in value have a presence subscription | |
76 # to the key entity. | |
77 self.presence_map = {} | |
78 # resource currently online | |
79 self.presences = set() | |
72 | 80 |
73 @property | 81 @property |
74 def permissions(self): | 82 def permissions(self): |
75 return self._permissions | 83 return self._permissions |
76 | 84 |
95 raise InvalidStanza('unexpected element {}'.format(perm_elt.name)) | 103 raise InvalidStanza('unexpected element {}'.format(perm_elt.name)) |
96 perm_access = perm_elt['access'] | 104 perm_access = perm_elt['access'] |
97 perm_type = perm_elt['type'] | 105 perm_type = perm_elt['type'] |
98 try: | 106 try: |
99 if perm_type not in TO_CHECK[perm_access]: | 107 if perm_type not in TO_CHECK[perm_access]: |
100 raise InvalidStanza('bad type [{}] for permission {}'.format(perm_type, perm_access)) | 108 raise InvalidStanza( |
109 'bad type [{}] for permission {}' | |
110 .format(perm_type, perm_access) | |
111 ) | |
101 except KeyError: | 112 except KeyError: |
102 raise InvalidStanza('bad permission [{}]'.format(perm_access)) | 113 raise InvalidStanza('bad permission [{}]'.format(perm_access)) |
103 except InvalidStanza as e: | 114 except InvalidStanza as e: |
104 log.msg("Invalid stanza received ({}), setting permission to none".format(e)) | 115 log.msg( |
116 f"Invalid stanza received ({e}), setting permission to none" | |
117 ) | |
105 for perm in self._permissions: | 118 for perm in self._permissions: |
106 self._permissions[perm] = 'none' | 119 self._permissions[perm] = 'none' |
107 break | 120 break |
108 | 121 |
109 self._permissions[perm_access] = perm_type or 'none' | 122 self._permissions[perm_access] = perm_type or 'none' |
110 | 123 |
111 log.msg('Privileges updated: roster={roster}, message={message}, presence={presence}'.format(**self._permissions)) | 124 log.msg( |
125 'Privileges updated: roster={roster}, message={message}, presence={presence}' | |
126 .format(**self._permissions) | |
127 ) | |
112 | 128 |
113 ## roster ## | 129 ## roster ## |
114 | 130 |
115 def getRoster(self, to_jid): | 131 def getRoster(self, to_jid): |
116 """ | 132 """ |
247 self.roster_cache[from_jid_bare] = {'timestamp': timestamp, | 263 self.roster_cache[from_jid_bare] = {'timestamp': timestamp, |
248 'roster': roster, | 264 'roster': roster, |
249 } | 265 } |
250 for roster_jid, roster_item in roster.items(): | 266 for roster_jid, roster_item in roster.items(): |
251 if roster_item.subscriptionFrom: | 267 if roster_item.subscriptionFrom: |
268 # we need to know who is subscribed to our user, to send them | |
269 # notifications when they send presence to us | |
252 self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare) | 270 self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare) |
271 if ((roster_item.subscriptionTo | |
272 and jid.JID(roster_jid.host) == self.backend.server_jid)): | |
273 # we also need to know who on this server we are subscribed to, so | |
274 # we can get their notifications even if they didn't connect so far. | |
275 self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid) | |
253 | 276 |
254 presence_type = presence_elt.getAttribute('type') | 277 presence_type = presence_elt.getAttribute('type') |
255 if presence_type != "unavailable": | 278 if presence_type == "unavailable": |
256 # new resource available, we check entity capabilities | 279 self.presences.discard(from_jid) |
280 elif from_jid not in self.presences: | |
281 # new resource available | |
282 | |
283 # we keep resources present in cache to avoid sending notifications on each | |
284 # status change | |
285 self.presences.add(from_jid) | |
286 | |
287 # we check entity capabilities | |
257 try: | 288 try: |
258 c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c')) | 289 c_elt = next( |
290 presence_elt.elements('http://jabber.org/protocol/caps', 'c') | |
291 ) | |
259 hash_ = c_elt['hash'] | 292 hash_ = c_elt['hash'] |
260 ver = c_elt['ver'] | 293 ver = c_elt['ver'] |
261 except (StopIteration, KeyError): | 294 except (StopIteration, KeyError): |
262 # no capabilities, we don't go further | 295 # no capabilities, we don't go further |
263 return | 296 return |
266 disco_tuple = (hash_, ver) | 299 disco_tuple = (hash_, ver) |
267 | 300 |
268 if disco_tuple not in self.hash_map: | 301 if disco_tuple not in self.hash_map: |
269 # first time we se this hash, what is behind it? | 302 # first time we se this hash, what is behind it? |
270 try: | 303 try: |
271 infos = yield self.requestInfo(from_jid) | 304 infos = await self.requestInfo(from_jid) |
272 except error.StanzaError as e: | 305 except error.StanzaError as e: |
273 log.msg( | 306 log.msg( |
274 f"WARNING: can't request disco info for {from_jid!r} (presence: " | 307 f"WARNING: can't request disco info for {from_jid!r} (presence: " |
275 f"{presence_type}): {e}" | 308 f"{presence_type}): {e}" |
276 ) | 309 ) |
298 tuple(self.presence_map.get(from_jid_bare, ())) | 331 tuple(self.presence_map.get(from_jid_bare, ())) |
299 + (from_jid_bare, self.backend.server_jid) | 332 + (from_jid_bare, self.backend.server_jid) |
300 ) | 333 ) |
301 | 334 |
302 # FIXME: add "presence" access_model (for node) for getLastItems | 335 # FIXME: add "presence" access_model (for node) for getLastItems |
303 last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True) | 336 # TODO: manage other access model (whitelist, …) |
304 # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last | 337 last_items = await self.backend.storage.getLastItems( |
338 publishers, | |
339 nodes, | |
340 ('open', 'presence'), ('open', 'presence'), True | |
341 ) | |
342 # we send message with last item, as required by | |
343 # https://xmpp.org/extensions/xep-0163.html#notify-last | |
305 for pep_jid, node, item, item_access_model in last_items: | 344 for pep_jid, node, item, item_access_model in last_items: |
306 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])]) | 345 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])]) |
307 | 346 |
308 ## misc ## | 347 ## misc ## |
309 | 348 |
310 @defer.inlineCallbacks | 349 async def getAutoSubscribers( |
311 def getAutoSubscribers(self, recipient, nodeIdentifier, explicit_subscribers): | 350 self, |
312 """get automatic subscribers, i.e. subscribers with presence subscription and +notify for this node | 351 recipient: jid.JID, |
313 | 352 nodeIdentifier: str, |
314 @param recipient(jid.JID): jid of the PEP owner of this node | 353 explicit_subscribers: Set[jid.JID] |
315 @param nodeIdentifier(unicode): node | 354 ) -> List[jid.JID]: |
316 @param explicit_subscribers(set(jid.JID}: jids of people which have an explicit subscription | 355 """Get automatic subscribers |
317 @return (list[jid.JID]): full jid of automatically subscribed entities | 356 |
357 Get subscribers with presence subscription and +notify for this node | |
358 @param recipient: jid of the PEP owner of this node | |
359 @param nodeIdentifier: node | |
360 @param explicit_subscribers: jids of people which have an explicit subscription | |
361 @return: full jid of automatically subscribed entities | |
318 """ | 362 """ |
319 auto_subscribers = [] | 363 auto_subscribers = [] |
320 roster = yield self.getRoster(recipient) | 364 roster = await self.getRoster(recipient) |
321 for roster_jid, roster_item in roster.items(): | 365 for roster_jid, roster_item in roster.items(): |
322 if roster_jid in explicit_subscribers: | 366 if roster_jid in explicit_subscribers: |
323 continue | 367 continue |
324 if roster_item.subscriptionFrom: | 368 if roster_item.subscriptionFrom: |
325 try: | 369 try: |
329 for res, disco_tuple in online_resources.items(): | 373 for res, disco_tuple in online_resources.items(): |
330 notify = self.hash_map[disco_tuple]['notify'] | 374 notify = self.hash_map[disco_tuple]['notify'] |
331 if nodeIdentifier in notify: | 375 if nodeIdentifier in notify: |
332 full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res)) | 376 full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res)) |
333 auto_subscribers.append(full_jid) | 377 auto_subscribers.append(full_jid) |
334 defer.returnValue(auto_subscribers) | 378 return auto_subscribers |