Mercurial > libervia-pubsub
comparison idavoll/pgsql_backend.py @ 43:9685b7e291ef
Moved common stuff out of pgsql_backend.py to backend.py.
Implemented Storage class for memory backend.
Implemented item storage for pgsql Storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 01 Nov 2004 12:37:40 +0000 |
parents | ea3d3544a52e |
children | 4447b3c5b857 |
Comparison legend: equal | deleted | inserted | replaced
42:7d088c61e131 | 43:9685b7e291ef |
---|---|
39 def get_affiliation(self, node_id, entity): | 39 def get_affiliation(self, node_id, entity): |
40 return self.dbpool.runInteraction(self._get_affiliation, node_id, | 40 return self.dbpool.runInteraction(self._get_affiliation, node_id, |
41 entity) | 41 entity) |
42 | 42 |
43 def get_subscribers(self, node_id): | 43 def get_subscribers(self, node_id): |
44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions | 44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions |
45 JOIN nodes ON (node_id=nodes.id) | 45 JOIN nodes ON (node_id=nodes.id) |
46 JOIN entities ON (entity_id=entities.id) | 46 JOIN entities ON (entity_id=entities.id) |
47 WHERE node=%s AND | 47 WHERE node=%s AND |
48 subscription='subscribed'""", | 48 subscription='subscribed'""", |
49 (node_id.encode('utf8'),)) | 49 (node_id.encode('utf8'),)) |
51 return d | 51 return d |
52 | 52 |
53 def _convert_to_jids(self, list): | 53 def _convert_to_jids(self, list): |
54 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] | 54 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] |
55 | 55 |
56 def store_items(self, node_id, items, publisher): | |
57 return self.dbpool.runInteraction(self._store_items, node_id, items, | |
58 publisher) | |
59 | |
60 def _store_items(self, cursor, node_id, items, publisher): | |
61 for item in items: | |
62 self._store_item(cursor, node_id, item, publisher) | |
63 | |
64 def _store_item(self, cursor, node_id, item, publisher): | |
65 data = item.toXml() | |
66 cursor.execute("""UPDATE items SET publisher=%s, data=%s | |
67 FROM nodes | |
68 WHERE nodes.id = items.node_id AND | |
69 nodes.node = %s and items.item=%s""", | |
70 (publisher.encode('utf8'), | |
71 data.encode('utf8'), | |
72 node_id.encode('utf8'), | |
73 item["id"].encode('utf8'))) | |
74 if cursor.rowcount == 1: | |
75 return | |
76 | |
77 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) | |
78 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", | |
79 (item["id"].encode('utf8'), | |
80 publisher.encode('utf8'), | |
81 data.encode('utf8'), | |
82 node_id.encode('utf8'))) | |
83 | |
56 class BackendService(backend.BackendService): | 84 class BackendService(backend.BackendService): |
57 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ | 85 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ |
58 | |
59 def __init__(self, storage): | |
60 backend.BackendService.__init__(self) | |
61 self.storage = storage | |
62 | |
63 def do_publish(self, result, node_id, items, requestor): | |
64 print result | |
65 configuration = result[0][1] | |
66 persist_items = configuration["persist_items"] | |
67 deliver_payloads = configuration["deliver_payloads"] | |
68 affiliation = result[1][1] | |
69 | |
70 if affiliation not in ['owner', 'publisher']: | |
71 raise backend.NotAuthorized | |
72 | |
73 if items and not persist_items and not deliver_payloads: | |
74 raise backend.NoPayloadAllowed | |
75 elif not items and (persist_items or deliver_payloads): | |
76 raise backend.PayloadExpected | |
77 | |
78 print "publish by %s to %s" % (requestor.full(), node_id) | |
79 | |
80 if persist_items or deliver_payloads: | |
81 for item in items: | |
82 if item["id"] is None: | |
83 item["id"] = 'random' # FIXME | |
84 | |
85 if persist_items: | |
86 d = self.store_items(node_id, items, requestor.full()) | |
87 else: | |
88 d = defer.succeed(None) | |
89 | |
90 d.addCallback(self.do_notify, node_id, items, deliver_payloads) | |
91 | |
92 def do_notify(self, result, node_id, items, deliver_payloads): | |
93 if items and not deliver_payloads: | |
94 for item in items: | |
95 item.children = [] | |
96 | |
97 self.dispatch({ 'items': items, 'node_id': node_id }, | |
98 '//event/pubsub/notify') | |
99 | |
100 def publish(self, node_id, items, requestor): | |
101 d1 = self.storage.get_node_configuration(node_id) | |
102 d2 = self.storage.get_affiliation(node_id, requestor.full()) | |
103 d = defer.DeferredList([d1, d2], fireOnOneErrback=1) | |
104 d.addErrback(lambda x: x.value[0]) | |
105 d.addCallback(self.do_publish, node_id, items, requestor) | |
106 return d | |
107 | |
108 def get_notification_list(self, node_id, items): | |
109 d = self.storage.get_subscribers(node_id) | |
110 d.addCallback(self._magic_filter, node_id, items) | |
111 return d | |
112 | |
113 def _magic_filter(self, subscribers, node_id, items): | |
114 list = {} | |
115 for subscriber in subscribers: | |
116 list[subscriber] = items | |
117 | |
118 return list | |
119 | |
120 def store_items(self, node_id, items, publisher): | |
121 return defer.succeed(None) | |
122 | 86 |
123 class PublishService(service.Service): | 87 class PublishService(service.Service): |
124 | 88 |
125 __implements__ = backend.IPublishService, | 89 __implements__ = backend.IPublishService, |
126 | 90 |