Mercurial > libervia-pubsub
comparison idavoll/pgsql_backend.py @ 41:ea3d3544a52e
Rewrite using separate backend interfaces. The backend also uses a separate
class for the actual storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Sun, 31 Oct 2004 21:11:03 +0000 |
parents | 39d0c6fa027f |
children | 9685b7e291ef |
comparison
equal
deleted
inserted
replaced
40:b9e7b3b6c687 | 41:ea3d3544a52e |
---|---|
2 from twisted.internet import defer | 2 from twisted.internet import defer |
3 from twisted.protocols.jabber import jid | 3 from twisted.protocols.jabber import jid |
4 from twisted.enterprise import adbapi | 4 from twisted.enterprise import adbapi |
5 import backend | 5 import backend |
6 | 6 |
7 class Service(service.Service): | 7 class Storage: |
8 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ | 8 def __init__(self, user, database): |
9 self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, | |
10 database=database) | |
9 | 11 |
10 __implements__ = backend.IService | 12 def _get_node_configuration(self, cursor, node_id): |
11 | 13 configuration = {} |
12 def __init__(self): | |
13 self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user='ralphm', | |
14 database='pubsub_test') | |
15 | |
16 def _do_publish(self, cursor, node_id, publisher, items): | |
17 cursor.execute("""SELECT persistent, deliver_payload FROM nodes | 14 cursor.execute("""SELECT persistent, deliver_payload FROM nodes |
18 WHERE node=%s""", | 15 WHERE node=%s""", |
19 (node_id,)) | 16 (node_id,)) |
20 try: | 17 try: |
21 persist_items, deliver_payloads = cursor.fetchone() | 18 (configuration["persist_items"], |
19 configuration["deliver_payloads"]) = cursor.fetchone() | |
20 return configuration | |
22 except TypeError: | 21 except TypeError: |
23 raise backend.NodeNotFound | 22 raise backend.NodeNotFound |
24 | 23 |
25 cursor.execute("""SELECT 1 FROM affiliations | 24 def get_node_configuration(self, node_id): |
25 return self.dbpool.runInteraction(self._get_node_configuration, node_id) | |
26 | |
27 def _get_affiliation(self, cursor, node_id, entity): | |
28 cursor.execute("""SELECT affiliation FROM affiliations | |
26 JOIN nodes ON (node_id=nodes.id) | 29 JOIN nodes ON (node_id=nodes.id) |
27 JOIN entities ON (entity_id=entities.id) | 30 JOIN entities ON (entity_id=entities.id) |
28 WHERE node=%s AND jid=%s AND | 31 WHERE node=%s AND jid=%s""", |
29 affiliation IN ('owner', 'publisher')""", | 32 (node_id.encode('utf8'), entity.encode('utf8'))) |
30 (node_id.encode('utf8'), publisher.encode('utf8'))) | |
31 | 33 |
32 if not cursor.fetchone(): | 34 try: |
35 return cursor.fetchone()[0] | |
36 except TypeError: | |
37 return None | |
38 | |
39 def get_affiliation(self, node_id, entity): | |
40 return self.dbpool.runInteraction(self._get_affiliation, node_id, | |
41 entity) | |
42 | |
43 def get_subscribers(self, node_id): | |
44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions | |
45 JOIN nodes ON (node_id=nodes.id) | |
46 JOIN entities ON (entity_id=entities.id) | |
47 WHERE node=%s AND | |
48 subscription='subscribed'""", | |
49 (node_id.encode('utf8'),)) | |
50 d.addCallback(self._convert_to_jids) | |
51 return d | |
52 | |
53 def _convert_to_jids(self, list): | |
54 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] | |
55 | |
56 class BackendService(backend.BackendService): | |
57 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ | |
58 | |
59 def __init__(self, storage): | |
60 backend.BackendService.__init__(self) | |
61 self.storage = storage | |
62 | |
63 def do_publish(self, result, node_id, items, requestor): | |
64 print result | |
65 configuration = result[0][1] | |
66 persist_items = configuration["persist_items"] | |
67 deliver_payloads = configuration["deliver_payloads"] | |
68 affiliation = result[1][1] | |
69 | |
70 if affiliation not in ['owner', 'publisher']: | |
33 raise backend.NotAuthorized | 71 raise backend.NotAuthorized |
34 | 72 |
35 if items and not persist_items and not deliver_payloads: | 73 if items and not persist_items and not deliver_payloads: |
36 raise backend.NoPayloadAllowed | 74 raise backend.NoPayloadAllowed |
37 elif not items and (persist_items or deliver_payloads): | 75 elif not items and (persist_items or deliver_payloads): |
38 raise backend.PayloadExpected | 76 raise backend.PayloadExpected |
39 | 77 |
40 print "publish by %s to %s" % (publisher, node_id) | 78 print "publish by %s to %s" % (requestor.full(), node_id) |
41 | 79 |
42 if persist_items or deliver_payloads: | 80 if persist_items or deliver_payloads: |
43 for item in items: | 81 for item in items: |
44 if item["id"] is None: | 82 if item["id"] is None: |
45 item["id"] = 'random' # FIXME | 83 item["id"] = 'random' # FIXME |
46 | 84 |
47 if persist_items: | 85 if persist_items: |
48 self.storeItems(node_id, publisher, items) | 86 d = self.store_items(node_id, items, requestor.full()) |
87 else: | |
88 d = defer.succeed(None) | |
49 | 89 |
90 d.addCallback(self.do_notify, node_id, items, deliver_payloads) | |
91 | |
92 def do_notify(self, result, node_id, items, deliver_payloads): | |
50 if items and not deliver_payloads: | 93 if items and not deliver_payloads: |
51 for item in items: | 94 for item in items: |
52 item.children = [] | 95 item.children = [] |
53 | 96 |
54 recipients = self.get_subscribers(node_id) | 97 self.dispatch({ 'items': items, 'node_id': node_id }, |
55 recipients.addCallback(self.magic_filter, node_id, items) | 98 '//event/pubsub/notify') |
56 recipients.addCallback(self.pubsub_service.do_notification, node_id) | |
57 | 99 |
58 def do_publish(self, node_id, publisher, items): | 100 def publish(self, node_id, items, requestor): |
59 d = self.dbpool.runInteraction(self._do_publish, node_id, publisher, items) | 101 d1 = self.storage.get_node_configuration(node_id) |
102 d2 = self.storage.get_affiliation(node_id, requestor.full()) | |
103 d = defer.DeferredList([d1, d2], fireOnOneErrback=1) | |
104 d.addErrback(lambda x: x.value[0]) | |
105 d.addCallback(self.do_publish, node_id, items, requestor) | |
60 return d | 106 return d |
61 | 107 |
62 def magic_filter(self, subscribers, node_id, items): | 108 def get_notification_list(self, node_id, items): |
109 d = self.storage.get_subscribers(node_id) | |
110 d.addCallback(self._magic_filter, node_id, items) | |
111 return d | |
112 | |
113 def _magic_filter(self, subscribers, node_id, items): | |
63 list = {} | 114 list = {} |
64 for subscriber in subscribers: | 115 for subscriber in subscribers: |
65 list[subscriber] = items | 116 list[subscriber] = items |
66 | 117 |
67 return list | 118 return list |
68 | 119 |
69 def get_subscribers(self, node_id): | 120 def store_items(self, node_id, items, publisher): |
70 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions | 121 return defer.succeed(None) |
71 JOIN nodes ON (node_id=nodes.id) | |
72 JOIN entities ON (entity_id=entities.id) | |
73 WHERE node=%s AND | |
74 subscription='subscribed'""", | |
75 (node_id.encode('utf8'),)) | |
76 d.addCallback(self.convert_to_jids) | |
77 return d | |
78 | 122 |
79 def convert_to_jids(self, list): | 123 class PublishService(service.Service): |
80 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] | |
81 | 124 |
82 def storeItems(self, node_id, publisher, items): | 125 __implements__ = backend.IPublishService, |
83 pass | 126 |
127 def publish(self, node_id, items, requestor): | |
128 return self.parent.publish(node_id, items, requestor) | |
129 | |
130 class NotificationService(backend.NotificationService): | |
131 | |
132 __implements__ = backend.INotificationService, | |
133 | |
134 def get_notification_list(self, node_id, items): | |
135 return self.parent.get_notification_list(node_id, items) | |
136 | |
137 class PersistenceService(service.Service): | |
138 | |
139 __implements__ = backend.IPersistenceService, | |
140 | |
141 def store_items(self, node_id, items, publisher): | |
142 return self.parent.store_items(node_id, items, publisher) |