comparison sat_pubsub/privilege.py @ 405:c56a728412f1

file organisation + setup refactoring: - `/src` has been renamed to `/sat_pubsub`, this is the recommended naming convention - revamped `setup.py` on the basis of SàT's `setup.py` - added a `VERSION` which is the unique place where version number will now be set - use same trick as in SàT to specify dev version (`D` at the end) - use setuptools_scm to retrieve Mercurial hash when in dev version
author Goffi <goffi@goffi.org>
date Fri, 16 Aug 2019 12:00:02 +0200
parents src/privilege.py@371e72871e19
children ccb2a22ea0fc
comparison
equal deleted inserted replaced
404:105a0772eedd 405:c56a728412f1
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3 #
4 # Copyright (c) 2015 Jérôme Poisson
5
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 # ---
21
22 # This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and presences
23
24 from wokkel import xmppim
25 from wokkel.compat import IQ
26 from wokkel import pubsub
27 from wokkel import disco
28 from wokkel.iwokkel import IPubSubService
29 from twisted.python import log
30 from twisted.python import failure
31 from twisted.internet import defer
32 from twisted.words.xish import domish
33 from twisted.words.protocols.jabber import jid
34 import time
35
# XML namespaces used by this module
FORWARDED_NS = 'urn:xmpp:forward:0'
PRIV_ENT_NS = 'urn:xmpp:privilege:1'
ROSTER_NS = 'jabber:iq:roster'

# XPath matching the <message/> stanzas which advertise privileges
PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS)

# names of the permissions which can be advertised
PERM_ROSTER = 'roster'
PERM_MESSAGE = 'message'
PERM_PRESENCE = 'presence'

# values accepted for each permission
ALLOWED_ROSTER = ('none', 'get', 'set', 'both')
ALLOWED_MESSAGE = ('none', 'outgoing')
ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster')

# permission name => values accepted for this permission
TO_CHECK = {
    PERM_ROSTER: ALLOWED_ROSTER,
    PERM_MESSAGE: ALLOWED_MESSAGE,
    PERM_PRESENCE: ALLOWED_PRESENCE,
}
47
48
class InvalidStanza(Exception):
    """Raised when a received stanza doesn't have the expected content"""
51
class NotAllowedError(Exception):
    """Raised when an operation needs a permission which is not granted"""
54
class PrivilegesHandler(disco.DiscoClientProtocol):
    """Handler for XEP-0356 (Privileged Entity)

    Keeps track of the permissions advertised to this component and uses them
    to retrieve rosters, send privileged messages, and follow presences and
    entity capabilities.
    """
    #FIXME: need to manage updates, and database sync
    #TODO: cache

    def __init__(self, service_jid):
        """
        @param service_jid(jid.JID): jid of this component
            the server jid is derived from its host by removing the first label
        """
        super(PrivilegesHandler, self).__init__()
        # every permission is denied until the server advertises otherwise
        self._permissions = {PERM_ROSTER: 'none',
                             PERM_MESSAGE: 'none',
                             PERM_PRESENCE: 'none'}
        # both are set in connectionInitialized
        self._pubsub_service = None
        self._backend = None
        # FIXME: we use a hack supposing that our privilege come from hostname
        #        and we are a component named [name].hostname
        #        but we need to manage properly server
        # TODO: do proper server handling
        self.server_jid = jid.JID(service_jid.host.split('.', 1)[1])
        self.caps_map = {}  # key: bare jid, value: dict of resources with caps hash
        self.hash_map = {}  # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to notify (notify)
        self.roster_cache = {}  # key: jid, value: dict with "timestamp" and "roster"
        self.presence_map = {}  # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both")
        # host of the server, set on first received presence (see onPresence)
        self.server = None

    @property
    def permissions(self):
        """Mapping of permission name to the currently granted value

        The internal dict itself is returned, not a copy.
        """
        return self._permissions

    def connectionInitialized(self):
        """Retrieve pubsub service and backend, then register stanza observers"""
        # the first sibling handler providing IPubSubService is used
        for handler in self.parent.handlers:
            if IPubSubService.providedBy(handler):
                self._pubsub_service = handler
                break
        self._backend = self.parent.parent.getServiceNamed('backend')
        self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise)
        self.xmlstream.addObserver('/presence', self.onPresence)

    def onAdvertise(self, message):
        """Manage the <message/> advertising privileges

        self._permissions will be updated according to advertised privileges.
        If a child of <privilege/> is invalid (unexpected element, missing
        attribute, unknown permission or bad type), all permissions are reset
        to 'none' and the remaining children are ignored.

        @param message(domish.Element): <message/> with the <privilege/> element
        """
        # the next() builtin is used here as in onPresence, instead of the
        # Python 2 only .next() method
        privilege_elt = next(message.elements(PRIV_ENT_NS, 'privilege'))
        for perm_elt in privilege_elt.elements(PRIV_ENT_NS):
            try:
                if perm_elt.name != 'perm':
                    raise InvalidStanza(u'unexpected element {}'.format(perm_elt.name))
                # getAttribute is used instead of item access so that a missing
                # attribute is handled as an invalid stanza, and not as a
                # KeyError escaping this observer
                perm_access = perm_elt.getAttribute('access')
                perm_type = perm_elt.getAttribute('type')
                if perm_access is None or perm_type is None:
                    raise InvalidStanza(u'missing "access" or "type" attribute')
                try:
                    allowed_types = TO_CHECK[perm_access]
                except KeyError:
                    raise InvalidStanza(u'bad permission [{}]'.format(perm_access))
                if perm_type not in allowed_types:
                    raise InvalidStanza(u'bad type [{}] for permission {}'.format(perm_type, perm_access))
            except InvalidStanza as e:
                log.msg("Invalid stanza received ({}), setting permission to none".format(e))
                # a single invalid child invalidates the whole advertisement
                for perm in self._permissions:
                    self._permissions[perm] = 'none'
                break

            self._permissions[perm_access] = perm_type or 'none'

        log.msg('Privileges updated: roster={roster}, message={message}, presence={presence}'.format(**self._permissions))

    ## roster ##

    def getRoster(self, to_jid):
        """
        Retrieve contact list.

        @param to_jid(jid.JID): entity whose roster is requested
            the request is sent to its bare jid
        @return: Roster as a mapping from L{JID} to L{RosterItem}.
        @rtype: L{twisted.internet.defer.Deferred}
        @raise failure.Failure: wrapping NotAllowedError, when the roster
            permission is neither "get" nor "both"
        """
        # TODO: cache results
        if self._permissions[PERM_ROSTER] not in ('get', 'both'):
            log.msg("WARNING: permission not allowed to get roster")
            # NOTE(review): this is raised synchronously and not returned as a
            #               failed Deferred, callers using addCallback/addErrback
            #               will not see it in their errback chain
            raise failure.Failure(NotAllowedError('roster get is not allowed'))

        def processRoster(result):
            # build the mapping from the <item/> elements of the roster result
            roster = {}
            for element in result.query.elements(ROSTER_NS, 'item'):
                item = xmppim.RosterItem.fromElement(element)
                roster[item.entity] = item

            return roster

        iq = IQ(self.xmlstream, 'get')
        iq.addElement((ROSTER_NS, 'query'))
        iq["to"] = to_jid.userhost()
        d = iq.send()
        d.addCallback(processRoster)
        return d

    def _isSubscribedFrom(self, roster, entity, roster_owner_jid):
        """Callback of isSubscribedFrom, check entity's item in retrieved roster

        @param roster(dict): roster as returned by getRoster
        @param entity(jid.JID): entity to look for (its bare jid is used)
        @param roster_owner_jid(jid.JID): owner of the roster (unused here)
        @return (bool): subscriptionFrom value of the roster item,
            False if entity is not in roster
        """
        try:
            return roster[entity.userhostJID()].subscriptionFrom
        except KeyError:
            return False

    def isSubscribedFrom(self, entity, roster_owner_jid):
        """Check if entity has presence subscription from roster_owner_jid

        @param entity(jid.JID): entity to check subscription to
        @param roster_owner_jid(jid.JID): owner of the roster to check
        @return D(bool): True if entity has a subscription from roster_owner_jid
        """
        d = self.getRoster(roster_owner_jid)
        d.addCallback(self._isSubscribedFrom, entity, roster_owner_jid)
        return d

    ## message ##

    def sendMessage(self, priv_message, to_jid=None):
        """Send privileged message (in the name of the server)

        @param priv_message(domish.Element): privileged message
            its "xmlns" attribute is set to "jabber:client" by this method
        @param to_jid(jid.JID, None): main message destinee
            None to use our own server
        @raise failure.Failure: wrapping NotAllowedError, when the message
            permission is not "outgoing"
        """
        if self._permissions[PERM_MESSAGE] not in ('outgoing',):
            log.msg("WARNING: permission not allowed to send privileged messages")
            raise failure.Failure(NotAllowedError('privileged messages are not allowed'))

        main_message = domish.Element((None, "message"))
        if to_jid is None:
            to_jid = self.server_jid
        main_message['to'] = to_jid.full()
        # the privileged message is wrapped as
        # <message><privilege><forwarded>[priv_message]</forwarded></privilege></message>
        privilege_elt = main_message.addElement((PRIV_ENT_NS, 'privilege'))
        forwarded_elt = privilege_elt.addElement((FORWARDED_NS, 'forwarded'))
        priv_message['xmlns'] = 'jabber:client'
        forwarded_elt.addChild(priv_message)
        self.send(main_message)

    def notifyPublish(self, pep_jid, nodeIdentifier, notifications):
        """Do notifications using privileges

        @param pep_jid: jid of the PEP service, given to the notification builder
        @param nodeIdentifier: node the items have been published to
        @param notifications: iterable of (subscriber, subscriptions, items)
            one privileged message is sent per entry
        """
        for subscriber, subscriptions, items in notifications:
            message = self._pubsub_service._createNotification('items', pep_jid,
                                                               nodeIdentifier, subscriber,
                                                               subscriptions)
            for item in items:
                # items are moved to the pubsub event namespace before being
                # attached to the notification
                item.uri = pubsub.NS_PUBSUB_EVENT
                message.event.items.addChild(item)
            self.sendMessage(message)


    def notifyRetract(self, pep_jid, nodeIdentifier, notifications):
        """Do retraction notifications using privileges

        @param pep_jid: jid of the PEP service, given to the notification builder
        @param nodeIdentifier: node the items have been retracted from
        @param notifications: iterable of (subscriber, subscriptions, items)
            one privileged message is sent per entry, with a <retract/> element
            for each item
        """
        for subscriber, subscriptions, items in notifications:
            message = self._pubsub_service._createNotification('items', pep_jid,
                                                               nodeIdentifier, subscriber,
                                                               subscriptions)
            for item in items:
                # only the id of the retracted item is notified
                retract = domish.Element((None, "retract"))
                retract['id'] = item['id']
                message.event.items.addChild(retract)
            self.sendMessage(message)


    # def notifyDelete(self, service, nodeIdentifier, subscribers,
    #                        redirectURI=None):
    #     # TODO
    #     for subscriber in subscribers:
    #         message = self._createNotification('delete', service,
    #                                            nodeIdentifier,
    #                                            subscriber)
    #         if redirectURI:
    #             redirect = message.event.delete.addElement('redirect')
    #             redirect['uri'] = redirectURI
    #         self.send(message)


    ## presence ##

    @defer.inlineCallbacks
    def onPresence(self, presence_elt):
        """Handle a received <presence/>

        For an entity of our server seen for the first time, its roster is
        cached and the inverted roster (presence_map) is updated.
        For a presence which is not "unavailable" and which advertises entity
        capabilities, the capabilities are resolved (and cached by hash), the
        resource is registered in caps_map, and the last item of each node
        wanted with "+notify" is sent to the entity.

        @param presence_elt(domish.Element): received <presence/> stanza
        """
        if self.server is None:
            # FIXME: we use a hack supposing that our delegation come from hostname
            #        and we are a component named [name].hostname
            #        but we need to manage properly allowed servers
            # TODO: do proper origin security check
            _, self.server = presence_elt['to'].split('.', 1)
        from_jid = jid.JID(presence_elt['from'])
        from_jid_bare = from_jid.userhostJID()
        if from_jid.host == self.server and from_jid_bare not in self.roster_cache:
            roster = yield self.getRoster(from_jid_bare)
            timestamp = time.time()
            self.roster_cache[from_jid_bare] = {'timestamp': timestamp,
                                                'roster': roster,
                                                }
            for roster_jid, roster_item in roster.iteritems():
                if roster_item.subscriptionFrom:
                    self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)

        presence_type = presence_elt.getAttribute('type')
        # NOTE(review): nothing is done for "unavailable" presences, resources
        #               are never removed from caps_map
        if presence_type != "unavailable":
            # new resource available, we check entity capabilities
            try:
                c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c'))
                hash_ = c_elt['hash']
                ver = c_elt['ver']
            except (StopIteration, KeyError):
                # no capabilities, we don't go further
                return

            # FIXME: hash is not checked (cf. XEP-0115)
            disco_tuple = (hash_, ver)

            if disco_tuple not in self.hash_map:
                # first time we see this hash, what is behind it?
                infos = yield self.requestInfo(from_jid)
                self.hash_map[disco_tuple] = {
                    # the "+notify" suffix (7 characters) is removed to get the node name
                    'notify': {f[:-7] for f in infos.features if f.endswith('+notify')},
                    'infos': infos
                    }

            # jid_caps must be filled only after hash_map is set, to be sure that
            # the hash data is available in getAutoSubscribers
            jid_caps = self.caps_map.setdefault(from_jid_bare, {})
            # NOTE(review): an already known resource keeps its first caps,
            #               even if a different hash is advertised later
            if from_jid.resource not in jid_caps:
                jid_caps[from_jid.resource] = disco_tuple

            # nodes are the nodes subscribed with +notify
            nodes = tuple(self.hash_map[disco_tuple]['notify'])
            if not nodes:
                return
            # publishers are entities which have granted presence access to our user + user itself
            publishers = tuple(self.presence_map.get(from_jid_bare, ())) + (from_jid_bare,)

            # FIXME: add "presence" access_model (for node) for getLastItems
            last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True)
            # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last
            for pep_jid, node, item, item_access_model in last_items:
                self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])

    ## misc ##

    @defer.inlineCallbacks
    def getAutoSubscribers(self, recipient, nodeIdentifier, explicit_subscribers):
        """get automatic subscribers, i.e. subscribers with presence subscription and +notify for this node

        @param recipient(jid.JID): jid of the PEP owner of this node
        @param nodeIdentifier(unicode): node
        @param explicit_subscribers(set(jid.JID)): jids of people which have an explicit subscription
        @return (list[jid.JID]): full jid of automatically subscribed entities
        """
        auto_subscribers = []
        roster = yield self.getRoster(recipient)
        for roster_jid, roster_item in roster.iteritems():
            if roster_jid in explicit_subscribers:
                continue
            if roster_item.subscriptionFrom:
                try:
                    online_resources = self.caps_map[roster_jid]
                except KeyError:
                    # no resource with capabilities has been seen for this contact
                    continue
                for res, disco_tuple in online_resources.iteritems():
                    notify = self.hash_map[disco_tuple]['notify']
                    if nodeIdentifier in notify:
                        full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res))
                        auto_subscribers.append(full_jid)
        defer.returnValue(auto_subscribers)