comparison idavoll/pgsql_backend.py @ 43:9685b7e291ef

Moved common stuff out of pgsql_backend.py to backend.py. Implemented Storage class for memory backend. Implemented item storage for pgsql Storage.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 01 Nov 2004 12:37:40 +0000
parents ea3d3544a52e
children 4447b3c5b857
comparison
equal deleted inserted replaced
42:7d088c61e131 43:9685b7e291ef
39 def get_affiliation(self, node_id, entity): 39 def get_affiliation(self, node_id, entity):
40 return self.dbpool.runInteraction(self._get_affiliation, node_id, 40 return self.dbpool.runInteraction(self._get_affiliation, node_id,
41 entity) 41 entity)
42 42
43 def get_subscribers(self, node_id): 43 def get_subscribers(self, node_id):
44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions 44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
45 JOIN nodes ON (node_id=nodes.id) 45 JOIN nodes ON (node_id=nodes.id)
46 JOIN entities ON (entity_id=entities.id) 46 JOIN entities ON (entity_id=entities.id)
47 WHERE node=%s AND 47 WHERE node=%s AND
48 subscription='subscribed'""", 48 subscription='subscribed'""",
49 (node_id.encode('utf8'),)) 49 (node_id.encode('utf8'),))
51 return d 51 return d
52 52
53 def _convert_to_jids(self, list): 53 def _convert_to_jids(self, list):
54 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list] 54 return [jid.JID("%s/%s" % (l[0], l[1])).full() for l in list]
55 55
56 def store_items(self, node_id, items, publisher):
57 return self.dbpool.runInteraction(self._store_items, node_id, items,
58 publisher)
59
60 def _store_items(self, cursor, node_id, items, publisher):
61 for item in items:
62 self._store_item(cursor, node_id, item, publisher)
63
64 def _store_item(self, cursor, node_id, item, publisher):
65 data = item.toXml()
66 cursor.execute("""UPDATE items SET publisher=%s, data=%s
67 FROM nodes
68 WHERE nodes.id = items.node_id AND
69 nodes.node = %s and items.item=%s""",
70 (publisher.encode('utf8'),
71 data.encode('utf8'),
72 node_id.encode('utf8'),
73 item["id"].encode('utf8')))
74 if cursor.rowcount == 1:
75 return
76
77 cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
78 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""",
79 (item["id"].encode('utf8'),
80 publisher.encode('utf8'),
81 data.encode('utf8'),
82 node_id.encode('utf8')))
83
56 class BackendService(backend.BackendService): 84 class BackendService(backend.BackendService):
57 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ 85 """ PostgreSQL backend Service for a JEP-0060 pubsub service """
58
59 def __init__(self, storage):
60 backend.BackendService.__init__(self)
61 self.storage = storage
62
63 def do_publish(self, result, node_id, items, requestor):
64 print result
65 configuration = result[0][1]
66 persist_items = configuration["persist_items"]
67 deliver_payloads = configuration["deliver_payloads"]
68 affiliation = result[1][1]
69
70 if affiliation not in ['owner', 'publisher']:
71 raise backend.NotAuthorized
72
73 if items and not persist_items and not deliver_payloads:
74 raise backend.NoPayloadAllowed
75 elif not items and (persist_items or deliver_payloads):
76 raise backend.PayloadExpected
77
78 print "publish by %s to %s" % (requestor.full(), node_id)
79
80 if persist_items or deliver_payloads:
81 for item in items:
82 if item["id"] is None:
83 item["id"] = 'random' # FIXME
84
85 if persist_items:
86 d = self.store_items(node_id, items, requestor.full())
87 else:
88 d = defer.succeed(None)
89
90 d.addCallback(self.do_notify, node_id, items, deliver_payloads)
91
92 def do_notify(self, result, node_id, items, deliver_payloads):
93 if items and not deliver_payloads:
94 for item in items:
95 item.children = []
96
97 self.dispatch({ 'items': items, 'node_id': node_id },
98 '//event/pubsub/notify')
99
100 def publish(self, node_id, items, requestor):
101 d1 = self.storage.get_node_configuration(node_id)
102 d2 = self.storage.get_affiliation(node_id, requestor.full())
103 d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
104 d.addErrback(lambda x: x.value[0])
105 d.addCallback(self.do_publish, node_id, items, requestor)
106 return d
107
108 def get_notification_list(self, node_id, items):
109 d = self.storage.get_subscribers(node_id)
110 d.addCallback(self._magic_filter, node_id, items)
111 return d
112
113 def _magic_filter(self, subscribers, node_id, items):
114 list = {}
115 for subscriber in subscribers:
116 list[subscriber] = items
117
118 return list
119
120 def store_items(self, node_id, items, publisher):
121 return defer.succeed(None)
122 86
123 class PublishService(service.Service): 87 class PublishService(service.Service):
124 88
125 __implements__ = backend.IPublishService, 89 __implements__ = backend.IPublishService,
126 90