Mercurial > libervia-pubsub
comparison idavoll/pgsql_backend.py @ 28:39d0c6fa027f
Initial revision
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 18 Oct 2004 15:29:25 +0000 |
parents | |
children | ea3d3544a52e |
comparison
equal
deleted
inserted
replaced
27:e6d62c93cd0a | 28:39d0c6fa027f |
---|---|
1 from twisted.application import service | |
2 from twisted.internet import defer | |
3 from twisted.protocols.jabber import jid | |
4 from twisted.enterprise import adbapi | |
5 import backend | |
6 | |
class Service(service.Service):
    """PostgreSQL backend Service for a JEP-0060 pubsub service.

    All database access goes through a twisted.enterprise.adbapi
    connection pool; results are delivered as Deferreds.

    NOTE(review): self.pubsub_service is used when sending notifications
    but is never assigned in this class; presumably it is set by whoever
    wires the services together -- confirm.
    """

    __implements__ = backend.IService

    def __init__(self, user='ralphm', database='pubsub_test'):
        """Create the connection pool.

        user     -- database user to connect as.
        database -- name of the database to connect to.

        The defaults are the values that used to be hard-coded here.
        """
        self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
                                            database=database)

    def _do_publish(self, cursor, node_id, publisher, items):
        """Validate a publish request and prepare its items.

        This is the interaction function handed to runInteraction, so it
        is executed in a pool thread.  It must therefore not start new
        pool queries or touch Deferreds; notification of subscribers is
        done by do_publish once the interaction has finished.

        cursor    -- cursor supplied by runInteraction.
        node_id   -- identifier of the node being published to.
        publisher -- JID (as a string) of the publishing entity.
        items     -- list of item elements; modified in place.

        Raises backend.NodeNotFound if the node has no row in nodes,
        backend.NotAuthorized if the publisher is neither 'owner' nor
        'publisher' of the node, backend.NoPayloadAllowed if items were
        sent to a node that neither persists nor delivers them, and
        backend.PayloadExpected if no items were sent to a node that does.
        """
        # Encode node_id here as well, like every other query in this class.
        cursor.execute("""SELECT persistent, deliver_payload FROM nodes
                          WHERE node=%s""",
                       (node_id.encode('utf8'),))

        # Test for a missing row explicitly rather than relying on the
        # TypeError raised by unpacking None, which could hide other bugs.
        row = cursor.fetchone()
        if row is None:
            raise backend.NodeNotFound
        persist_items, deliver_payloads = row

        cursor.execute("""SELECT 1 FROM affiliations
                          JOIN nodes ON (node_id=nodes.id)
                          JOIN entities ON (entity_id=entities.id)
                          WHERE node=%s AND jid=%s AND
                                affiliation IN ('owner', 'publisher')""",
                       (node_id.encode('utf8'), publisher.encode('utf8')))

        if not cursor.fetchone():
            raise backend.NotAuthorized

        if items and not persist_items and not deliver_payloads:
            raise backend.NoPayloadAllowed
        elif not items and (persist_items or deliver_payloads):
            raise backend.PayloadExpected

        # Parenthesised single-argument form: same output with the
        # Python 2 print statement, and valid syntax on Python 3 too.
        print("publish by %s to %s" % (publisher, node_id))

        if persist_items or deliver_payloads:
            for item in items:
                # NOTE(review): this assumes item["id"] yields None when the
                # id is absent -- confirm against the item element type.
                if item["id"] is None:
                    item["id"] = 'random' # FIXME

        if persist_items:
            self.storeItems(node_id, publisher, items)

        if items and not deliver_payloads:
            # Payloads are not delivered for this node: strip the children
            # so that only the bare items remain in the notification.
            for item in items:
                item.children = []

    def _notify_subscribers(self, result, node_id, items):
        """Start notifying the node's subscribers about published items.

        Used as a callback on the Deferred of the publish interaction, so
        it only runs when the interaction succeeded.  The notification
        chain is started but deliberately not returned: the result of the
        publish itself is passed through unchanged, so the Deferred handed
        out by do_publish reflects the database interaction only.
        """
        recipients = self.get_subscribers(node_id)
        recipients.addCallback(self.magic_filter, node_id, items)
        recipients.addCallback(self.pubsub_service.do_notification, node_id)
        return result

    def do_publish(self, node_id, publisher, items):
        """Publish items to a node on behalf of publisher.

        Returns a Deferred that fires when the database interaction has
        completed, or fails with one of the exceptions raised by
        _do_publish.  Subscribers are notified afterwards.
        """
        d = self.dbpool.runInteraction(self._do_publish, node_id, publisher,
                                       items)
        d.addCallback(self._notify_subscribers, node_id, items)
        return d

    def magic_filter(self, subscribers, node_id, items):
        """Map every subscriber to the items it should be notified of.

        Currently every subscriber receives the same, complete items list;
        node_id is accepted but not used.
        """
        notifications = {}
        for subscriber in subscribers:
            notifications[subscriber] = items

        return notifications

    def get_subscribers(self, node_id):
        """Look up the subscribers of a node.

        Returns a Deferred that fires with a list of full JID strings, one
        for each subscription to the node in state 'subscribed'.
        """
        d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                    JOIN nodes ON (node_id=nodes.id)
                                    JOIN entities ON (entity_id=entities.id)
                                    WHERE node=%s AND
                                          subscription='subscribed'""",
                                 (node_id.encode('utf8'),))
        d.addCallback(self.convert_to_jids)
        return d

    def convert_to_jids(self, list):
        """Turn (jid, resource) rows into full JID strings.

        The parameter keeps its original name, list, for compatibility
        with existing callers even though it shadows the builtin.

        NOTE(review): a row whose resource is NULL would be formatted as
        "jid/None" -- confirm whether the resource column can be NULL.
        """
        return [jid.JID("%s/%s" % (row[0], row[1])).full() for row in list]

    def storeItems(self, node_id, publisher, items):
        """Persist published items.  Not implemented yet."""
        pass