comparison src/privilege.py @ 369:dabee42494ac

config file + cleaning: - SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir). Its options must be in the "pubsub" section - options on command line override config options - removed tap and http files which are not used anymore - changed directory structure to put source in src, to be coherent with SàT and Libervia - changed option names, db* become db_*, secret becomes xmpp_pwd - an exception is raised if jid or xmpp_pwd is not configured
author Goffi <goffi@goffi.org>
date Fri, 02 Mar 2018 12:59:38 +0100
parents sat_pubsub/privilege.py@d1f63ae1eaf4
children 371e72871e19
comparison
equal deleted inserted replaced
368:618a92080812 369:dabee42494ac
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3 #
4 # Copyright (c) 2015 Jérôme Poisson
5
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 # ---
21
22 # This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and presences
23
24 from wokkel import xmppim
25 from wokkel.compat import IQ
26 from wokkel import pubsub
27 from wokkel import disco
28 from wokkel.iwokkel import IPubSubService
29 from twisted.python import log
30 from twisted.python import failure
31 from twisted.internet import defer
32 from twisted.words.xish import domish
33 from twisted.words.protocols.jabber import jid
34 import time
35
# XML namespaces
FORWARDED_NS = 'urn:xmpp:forward:0'
PRIV_ENT_NS = 'urn:xmpp:privilege:1'
ROSTER_NS = 'jabber:iq:roster'

# XPath matching the <message/> stanzas which carry a <privilege/> element
PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="%s"]' % PRIV_ENT_NS

# names of the permissions handled by this module
PERM_ROSTER = 'roster'
PERM_MESSAGE = 'message'
PERM_PRESENCE = 'presence'

# values accepted for each permission
ALLOWED_ROSTER = ('none', 'get', 'set', 'both')
ALLOWED_MESSAGE = ('none', 'outgoing')
ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster')

# permission name => values accepted for this permission
TO_CHECK = {
    PERM_ROSTER: ALLOWED_ROSTER,
    PERM_MESSAGE: ALLOWED_MESSAGE,
    PERM_PRESENCE: ALLOWED_PRESENCE,
}
47
48
class InvalidStanza(Exception):
    """Raised when a received stanza is not well formed"""
51
class NotAllowedError(Exception):
    """Raised when a needed permission has not been granted"""
54
class PrivilegesHandler(disco.DiscoClientProtocol):
    """Handler implementing XEP-0356 (Privileged Entity)

    It keeps track of the permissions advertised for roster, message and
    presence, uses them to retrieve rosters and to send privileged messages,
    and records the entity capabilities seen in presences to find the
    resources which must receive "+notify" notifications.
    """
    #FIXME: need to manage updates, and database sync
    #TODO: cache

    def __init__(self, service_jid):
        """
        @param service_jid(jid.JID): jid of this service, the server jid is
            deduced from its host by removing the first label
        """
        super(PrivilegesHandler, self).__init__()
        self._permissions = {PERM_ROSTER: 'none',
                             PERM_MESSAGE: 'none',
                             PERM_PRESENCE: 'none'}
        self._pubsub_service = None
        self._backend = None
        # FIXME: we use a hack supposing that our privilege come from hostname
        #        and we are a component named [name].hostname
        #        but we need to manage properly server
        # TODO: do proper server handling
        self.server_jid = jid.JID(service_jid.host.split('.', 1)[1])
        self.caps_map = {}  # key: bare jid, value: dict of resources with caps hash
        self.hash_map = {}  # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to notify (notify)
        self.roster_cache = {}  # key: jid, value: dict with "timestamp" and "roster"
        self.presence_map = {}  # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both")
        self.server = None

    @property
    def permissions(self):
        """Mapping of permission name to the currently granted value"""
        return self._permissions

    def connectionInitialized(self):
        """Retrieve pubsub service and backend, and register stanza observers"""
        for handler in self.parent.handlers:
            if IPubSubService.providedBy(handler):
                self._pubsub_service = handler
                break
        self._backend = self.parent.parent.getServiceNamed('backend')
        self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise)
        self.xmlstream.addObserver('/presence', self.onPresence)

    def onAdvertise(self, message):
        """Manage the <message/> advertising privileges

        self._permissions will be updated according to advertised privileges.
        If one of the elements is invalid, all permissions are reset to 'none'
        and the remaining elements are ignored.

        @param message(domish.Element): <message/> stanza with a <privilege/> child
        """
        privilege_elt = next(message.elements(PRIV_ENT_NS, 'privilege'))
        for perm_elt in privilege_elt.elements(PRIV_ENT_NS):
            try:
                if perm_elt.name != 'perm':
                    raise InvalidStanza(u'unexpected element {}'.format(perm_elt.name))
                # getAttribute is used so a missing attribute is handled as an
                # invalid stanza instead of raising an uncaught KeyError
                perm_access = perm_elt.getAttribute('access')
                perm_type = perm_elt.getAttribute('type')
                if perm_access is None or perm_type is None:
                    raise InvalidStanza(u'missing "access" or "type" attribute')
                try:
                    allowed_types = TO_CHECK[perm_access]
                except KeyError:
                    raise InvalidStanza(u'bad permission [{}]'.format(perm_access))
                if perm_type not in allowed_types:
                    raise InvalidStanza(u'bad type [{}] for permission {}'.format(perm_type, perm_access))
            except InvalidStanza as e:
                log.msg("Invalid stanza received ({}), setting permission to none".format(e))
                for perm in self._permissions:
                    self._permissions[perm] = 'none'
                break

            self._permissions[perm_access] = perm_type or 'none'

        log.msg('Privileges updated: roster={roster}, message={message}, presence={presence}'.format(**self._permissions))

    ## roster ##

    def getRoster(self, to_jid):
        """
        Retrieve contact list.

        @param to_jid(jid.JID): entity whose roster is requested (bare jid is used)
        @return: Roster as a mapping from L{JID} to L{RosterItem}.
        @rtype: L{twisted.internet.defer.Deferred}
        @raise failure.Failure: wrapping NotAllowedError if the roster
            permission is neither 'get' nor 'both'
        """
        # TODO: cache results
        if self._permissions[PERM_ROSTER] not in ('get', 'both'):
            log.msg("WARNING: permission not allowed to get roster")
            # NOTE(review): this is raised synchronously instead of being
            #   returned through a failed Deferred; kept as is because callers
            #   outside of this module may rely on it
            raise failure.Failure(NotAllowedError('roster get is not allowed'))

        def processRoster(result):
            roster = {}
            for element in result.query.elements(ROSTER_NS, 'item'):
                item = xmppim.RosterItem.fromElement(element)
                roster[item.entity] = item

            return roster

        iq = IQ(self.xmlstream, 'get')
        iq.addElement((ROSTER_NS, 'query'))
        iq["to"] = to_jid.userhost()
        d = iq.send()
        d.addCallback(processRoster)
        return d

    def _isSubscribedFrom(self, roster, entity, roster_owner_jid):
        """Tell if the roster item of entity has its "subscriptionFrom" flag set

        @param roster(dict): roster as returned by getRoster
        @param entity(jid.JID): entity to look for (its bare jid is used)
        @param roster_owner_jid(jid.JID): owner of the roster (not used here)
        @return (bool): False if entity is not in roster
        """
        try:
            return roster[entity.userhostJID()].subscriptionFrom
        except KeyError:
            return False

    def isSubscribedFrom(self, entity, roster_owner_jid):
        """Check if entity has presence subscription from roster_owner_jid

        @param entity(jid.JID): entity to check subscription to
        @param roster_owner_jid(jid.JID): owner of the roster to check
        @return D(bool): True if entity has a subscription from roster_owner_jid
        """
        d = self.getRoster(roster_owner_jid)
        d.addCallback(self._isSubscribedFrom, entity, roster_owner_jid)
        return d

    ## message ##

    def sendMessage(self, priv_message, to_jid=None):
        """Send privileged message (in the name of the server)

        @param priv_message(domish.Element): privileged message
        @param to_jid(jid.JID, None): main message destinee
            None to use our own server
        @raise failure.Failure: wrapping NotAllowedError if the message
            permission is not 'outgoing'
        """
        if self._permissions[PERM_MESSAGE] not in ('outgoing',):
            log.msg("WARNING: permission not allowed to send privileged messages")
            raise failure.Failure(NotAllowedError('privileged messages are not allowed'))

        main_message = domish.Element((None, "message"))
        if to_jid is None:
            to_jid = self.server_jid
        main_message['to'] = to_jid.full()
        # the privileged message is wrapped in <privilege/><forwarded/>
        privilege_elt = main_message.addElement((PRIV_ENT_NS, 'privilege'))
        forwarded_elt = privilege_elt.addElement((FORWARDED_NS, 'forwarded'))
        priv_message['xmlns'] = 'jabber:client'
        forwarded_elt.addChild(priv_message)
        self.send(main_message)

    def notifyPublish(self, pep_jid, nodeIdentifier, notifications):
        """Do notifications using privileges

        @param pep_jid: jid used as service in the notification
        @param nodeIdentifier: node which has been updated
        @param notifications: iterable of (subscriber, subscriptions, items)
        """
        for subscriber, subscriptions, items in notifications:
            message = self._pubsub_service._createNotification('items', pep_jid,
                                                               nodeIdentifier, subscriber,
                                                               subscriptions)
            for item in items:
                item.uri = pubsub.NS_PUBSUB_EVENT
                message.event.items.addChild(item)
            self.sendMessage(message)

    def notifyRetract(self, pep_jid, nodeIdentifier, notifications):
        """Do retraction notifications using privileges

        @param pep_jid: jid used as service in the notification
        @param nodeIdentifier: node where the items have been retracted
        @param notifications: iterable of (subscriber, subscriptions, items)
        """
        for subscriber, subscriptions, items in notifications:
            message = self._pubsub_service._createNotification('items', pep_jid,
                                                               nodeIdentifier, subscriber,
                                                               subscriptions)
            for item in items:
                retract = domish.Element((None, "retract"))
                retract['id'] = item['id']
                message.event.items.addChild(retract)
            self.sendMessage(message)

    # def notifyDelete(self, service, nodeIdentifier, subscribers,
    #                  redirectURI=None):
    #     # TODO
    #     for subscriber in subscribers:
    #         message = self._createNotification('delete', service,
    #                                            nodeIdentifier,
    #                                            subscriber)
    #         if redirectURI:
    #             redirect = message.event.delete.addElement('redirect')
    #             redirect['uri'] = redirectURI
    #         self.send(message)

    ## presence ##

    @defer.inlineCallbacks
    def onPresence(self, presence_elt):
        """Update rosters and capabilities maps, and send last items if needed

        The roster of a local entity is requested the first time a presence
        is received from it. For presences which are not "unavailable", the
        entity capabilities are recorded, and the last item of each node
        requested with "+notify" is sent to the entity.

        @param presence_elt(domish.Element): received <presence/> stanza
        """
        if self.server is None:
            # FIXME: we use a hack supposing that our delegation come from hostname
            #        and we are a component named [name].hostname
            #        but we need to manage properly allowed servers
            # TODO: do proper origin security check
            _, self.server = presence_elt['to'].split('.', 1)
        from_jid = jid.JID(presence_elt['from'])
        from_jid_bare = from_jid.userhostJID()
        if from_jid.host == self.server and from_jid_bare not in self.roster_cache:
            roster = yield self.getRoster(from_jid_bare)
            timestamp = time.time()
            self.roster_cache[from_jid_bare] = {'timestamp': timestamp,
                                                'roster': roster,
                                                }
            for roster_jid, roster_item in roster.items():
                if roster_item.subscriptionFrom:
                    self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)

        presence_type = presence_elt.getAttribute('type')
        if presence_type == "unavailable":
            # the resource is gone: its capabilities are removed, so
            # getAutoSubscribers doesn't consider it as online anymore
            jid_caps = self.caps_map.get(from_jid_bare)
            if jid_caps is not None:
                jid_caps.pop(from_jid.resource, None)
                if not jid_caps:
                    del self.caps_map[from_jid_bare]
            return

        # new resource available, we check entity capabilities
        try:
            c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c'))
            hash_ = c_elt['hash']
            ver = c_elt['ver']
        except (StopIteration, KeyError):
            # no capabilities, we don't go further
            return

        # FIXME: hash is not checked (cf. XEP-0115)
        disco_tuple = (hash_, ver)
        # always updated, as capabilities of a resource may change between
        # two presences
        self.caps_map.setdefault(from_jid_bare, {})[from_jid.resource] = disco_tuple

        if disco_tuple not in self.hash_map:
            # first time we see this hash, what is behind it?
            infos = yield self.requestInfo(from_jid)
            self.hash_map[disco_tuple] = {
                'notify': {f[:-7] for f in infos.features if f.endswith('+notify')},
                'infos': infos
                }

        # nodes are the nodes subscribed with +notify
        nodes = tuple(self.hash_map[disco_tuple]['notify'])
        if not nodes:
            return
        # publishers are entities which have granted presence access to our user + user itself
        publishers = tuple(self.presence_map.get(from_jid_bare, ())) + (from_jid_bare,)

        # FIXME: add "presence" access_model (for node) for getLastItems
        last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True)
        # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last
        for pep_jid, node, item, item_access_model in last_items:
            self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])

    ## misc ##

    @defer.inlineCallbacks
    def getAutoSubscribers(self, recipient, nodeIdentifier, explicit_subscribers):
        """get automatic subscribers, i.e. subscribers with presence subscription and +notify for this node

        @param recipient(jid.JID): jid of the PEP owner of this node
        @param nodeIdentifier(unicode): node
        @param explicit_subscribers(set(jid.JID)): jids of people which have an explicit subscription
        @return (list[jid.JID]): full jid of automatically subscribed entities
        """
        auto_subscribers = []
        roster = yield self.getRoster(recipient)
        for roster_jid, roster_item in roster.items():
            if roster_jid in explicit_subscribers:
                continue
            if not roster_item.subscriptionFrom:
                continue
            # entities without known online resource are skipped
            online_resources = self.caps_map.get(roster_jid, {})
            for res, disco_tuple in online_resources.items():
                try:
                    notify = self.hash_map[disco_tuple]['notify']
                except KeyError:
                    # capabilities are recorded in caps_map before the disco
                    # request fills hash_map, so they may not be known (yet)
                    continue
                if nodeIdentifier in notify:
                    full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res))
                    auto_subscribers.append(full_jid)
        defer.returnValue(auto_subscribers)