comparison idavoll/pgsql_backend.py @ 41:ea3d3544a52e

Rewrite using separated backend interfaces. The backend also uses a separate class for the actual storage.
author Ralph Meijer <ralphm@ik.nu>
date Sun, 31 Oct 2004 21:11:03 +0000
parents 39d0c6fa027f
children 9685b7e291ef
comparison
equal deleted inserted replaced
40:b9e7b3b6c687 41:ea3d3544a52e
2 from twisted.internet import defer 2 from twisted.internet import defer
3 from twisted.protocols.jabber import jid 3 from twisted.protocols.jabber import jid
4 from twisted.enterprise import adbapi 4 from twisted.enterprise import adbapi
5 import backend 5 import backend
6 6
7 class Service(service.Service): 7 class Storage:
8 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ 8 def __init__(self, user, database):
9 self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
10 database=database)
9 11
10 __implements__ = backend.IService 12 def _get_node_configuration(self, cursor, node_id):
11 13 configuration = {}
12 def __init__(self):
13 self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user='ralphm',
14 database='pubsub_test')
15
16 def _do_publish(self, cursor, node_id, publisher, items):
17 cursor.execute("""SELECT persistent, deliver_payload FROM nodes 14 cursor.execute("""SELECT persistent, deliver_payload FROM nodes
18 WHERE node=%s""", 15 WHERE node=%s""",
19 (node_id,)) 16 (node_id,))
20 try: 17 try:
21 persist_items, deliver_payloads = cursor.fetchone() 18 (configuration["persist_items"],
19 configuration["deliver_payloads"]) = cursor.fetchone()
20 return configuration
22 except TypeError: 21 except TypeError:
23 raise backend.NodeNotFound 22 raise backend.NodeNotFound
24 23
25 cursor.execute("""SELECT 1 FROM affiliations 24 def get_node_configuration(self, node_id):
25 return self.dbpool.runInteraction(self._get_node_configuration, node_id)
26
27 def _get_affiliation(self, cursor, node_id, entity):
28 cursor.execute("""SELECT affiliation FROM affiliations
26 JOIN nodes ON (node_id=nodes.id) 29 JOIN nodes ON (node_id=nodes.id)
27 JOIN entities ON (entity_id=entities.id) 30 JOIN entities ON (entity_id=entities.id)
28 WHERE node=%s AND jid=%s AND 31 WHERE node=%s AND jid=%s""",
29 affiliation IN ('owner', 'publisher')""", 32 (node_id.encode('utf8'), entity.encode('utf8')))
30 (node_id.encode('utf8'), publisher.encode('utf8')))
31 33
32 if not cursor.fetchone(): 34 try:
35 return cursor.fetchone()[0]
36 except TypeError:
37 return None
38
39 def get_affiliation(self, node_id, entity):
40 return self.dbpool.runInteraction(self._get_affiliation, node_id,
41 entity)
42
43 def get_subscribers(self, node_id):
44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
45 JOIN nodes ON (node_id=nodes.id)
46 JOIN entities ON (entity_id=entities.id)
47 WHERE node=%s AND
48 subscription='subscribed'""",
49 (node_id.encode('utf8'),))
50 d.addCallback(self._convert_to_jids)
51 return d
52
53 def _convert_to_jids(self, list):
54 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
55
56 class BackendService(backend.BackendService):
57 """ PostgreSQL backend Service for a JEP-0060 pubsub service """
58
59 def __init__(self, storage):
60 backend.BackendService.__init__(self)
61 self.storage = storage
62
63 def do_publish(self, result, node_id, items, requestor):
64 print result
65 configuration = result[0][1]
66 persist_items = configuration["persist_items"]
67 deliver_payloads = configuration["deliver_payloads"]
68 affiliation = result[1][1]
69
70 if affiliation not in ['owner', 'publisher']:
33 raise backend.NotAuthorized 71 raise backend.NotAuthorized
34 72
35 if items and not persist_items and not deliver_payloads: 73 if items and not persist_items and not deliver_payloads:
36 raise backend.NoPayloadAllowed 74 raise backend.NoPayloadAllowed
37 elif not items and (persist_items or deliver_payloads): 75 elif not items and (persist_items or deliver_payloads):
38 raise backend.PayloadExpected 76 raise backend.PayloadExpected
39 77
40 print "publish by %s to %s" % (publisher, node_id) 78 print "publish by %s to %s" % (requestor.full(), node_id)
41 79
42 if persist_items or deliver_payloads: 80 if persist_items or deliver_payloads:
43 for item in items: 81 for item in items:
44 if item["id"] is None: 82 if item["id"] is None:
45 item["id"] = 'random' # FIXME 83 item["id"] = 'random' # FIXME
46 84
47 if persist_items: 85 if persist_items:
48 self.storeItems(node_id, publisher, items) 86 d = self.store_items(node_id, items, requestor.full())
87 else:
88 d = defer.succeed(None)
49 89
90 d.addCallback(self.do_notify, node_id, items, deliver_payloads)
91
92 def do_notify(self, result, node_id, items, deliver_payloads):
50 if items and not deliver_payloads: 93 if items and not deliver_payloads:
51 for item in items: 94 for item in items:
52 item.children = [] 95 item.children = []
53 96
54 recipients = self.get_subscribers(node_id) 97 self.dispatch({ 'items': items, 'node_id': node_id },
55 recipients.addCallback(self.magic_filter, node_id, items) 98 '//event/pubsub/notify')
56 recipients.addCallback(self.pubsub_service.do_notification, node_id)
57 99
58 def do_publish(self, node_id, publisher, items): 100 def publish(self, node_id, items, requestor):
59 d = self.dbpool.runInteraction(self._do_publish, node_id, publisher, items) 101 d1 = self.storage.get_node_configuration(node_id)
102 d2 = self.storage.get_affiliation(node_id, requestor.full())
103 d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
104 d.addErrback(lambda x: x.value[0])
105 d.addCallback(self.do_publish, node_id, items, requestor)
60 return d 106 return d
61 107
62 def magic_filter(self, subscribers, node_id, items): 108 def get_notification_list(self, node_id, items):
109 d = self.storage.get_subscribers(node_id)
110 d.addCallback(self._magic_filter, node_id, items)
111 return d
112
113 def _magic_filter(self, subscribers, node_id, items):
63 list = {} 114 list = {}
64 for subscriber in subscribers: 115 for subscriber in subscribers:
65 list[subscriber] = items 116 list[subscriber] = items
66 117
67 return list 118 return list
68 119
69 def get_subscribers(self, node_id): 120 def store_items(self, node_id, items, publisher):
70 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions 121 return defer.succeed(None)
71 JOIN nodes ON (node_id=nodes.id)
72 JOIN entities ON (entity_id=entities.id)
73 WHERE node=%s AND
74 subscription='subscribed'""",
75 (node_id.encode('utf8'),))
76 d.addCallback(self.convert_to_jids)
77 return d
78 122
79 def convert_to_jids(self, list): 123 class PublishService(service.Service):
80 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
81 124
82 def storeItems(self, node_id, publisher, items): 125 __implements__ = backend.IPublishService,
83 pass 126
127 def publish(self, node_id, items, requestor):
128 return self.parent.publish(node_id, items, requestor)
129
130 class NotificationService(backend.NotificationService):
131
132 __implements__ = backend.INotificationService,
133
134 def get_notification_list(self, node_id, items):
135 return self.parent.get_notification_list(node_id, items)
136
137 class PersistenceService(service.Service):
138
139 __implements__ = backend.IPersistenceService,
140
141 def store_items(self, node_id, items, publisher):
142 return self.parent.store_items(node_id, items, publisher)