28
|
1 from twisted.application import service |
|
2 from twisted.internet import defer |
|
3 from twisted.protocols.jabber import jid |
|
4 from twisted.enterprise import adbapi |
|
5 import backend |
|
6 |
|
class Service(service.Service):
    """ PostgreSQL backend Service for a JEP-0060 pubsub service

    Node configuration, affiliations and subscriptions are read from the
    database through a twisted.enterprise.adbapi connection pool.

    NOTE(review): ``self.pubsub_service`` is used for notifications but is
    not assigned anywhere in this class; presumably it is set by whoever
    wires the backend to the pubsub service -- confirm.
    """

    __implements__ = backend.IService

    def __init__(self, user='ralphm', database='pubsub_test'):
        """Create the database connection pool.

        @param user: database user name to connect as.
        @param database: name of the database holding the pubsub tables.

        The defaults are the values that used to be hard-coded here.
        """
        self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
                                            database=database)

    def _do_publish(self, cursor, node_id, publisher, items):
        """Validate a publish request and prepare its items.

        This is the body of the database interaction started by
        L{do_publish}. adbapi runs interactions in a pool thread, so nothing
        in here may touch Deferreds; notification is triggered by
        L{do_publish} once the interaction has finished.

        @param cursor: cursor/transaction object supplied by adbapi.
        @param node_id: identifier of the node being published to.
        @param publisher: JID (as a string) of the publishing entity.
        @param items: the published items; each is indexed with C{"id"} and
            has a C{children} attribute.
        @return: C{items}, with missing ids filled in and, when the node
            does not deliver payloads, the item children removed.
        @raise backend.NodeNotFound: no node row matches C{node_id}.
        @raise backend.NotAuthorized: C{publisher} is neither owner nor
            publisher of the node.
        @raise backend.NoPayloadAllowed: items were given but the node
            neither persists items nor delivers payloads.
        @raise backend.PayloadExpected: no items were given but the node
            persists items or delivers payloads.
        """
        # Encode once so every query receives the node id in the same form.
        encoded_node = node_id.encode('utf8')

        cursor.execute("""SELECT persistent, deliver_payload FROM nodes
                          WHERE node=%s""",
                       (encoded_node,))
        row = cursor.fetchone()
        if row is None:
            raise backend.NodeNotFound
        persist_items, deliver_payloads = row

        cursor.execute("""SELECT 1 FROM affiliations
                          JOIN nodes ON (node_id=nodes.id)
                          JOIN entities ON (entity_id=entities.id)
                          WHERE node=%s AND jid=%s AND
                                affiliation IN ('owner', 'publisher')""",
                       (encoded_node, publisher.encode('utf8')))

        if not cursor.fetchone():
            raise backend.NotAuthorized

        if items and not persist_items and not deliver_payloads:
            raise backend.NoPayloadAllowed
        elif not items and (persist_items or deliver_payloads):
            raise backend.PayloadExpected

        # Single-argument form prints identically on Python 2 and 3.
        print("publish by %s to %s" % (publisher, node_id))

        if persist_items or deliver_payloads:
            for item in items:
                if item["id"] is None:
                    # FIXME: placeholder, every id-less item gets the same
                    # id; a unique id should be generated instead.
                    item["id"] = 'random'

        if persist_items:
            self.storeItems(node_id, publisher, items)

        if items and not deliver_payloads:
            # Notifications for this node must not carry the payload.
            for item in items:
                item.children = []

        return items

    def _notify_subscribers(self, items, node_id):
        """Start notifying the node's subscribers about C{items}.

        Used as a callback on the Deferred of the publish interaction, so it
        runs in the reactor thread after the interaction has completed.

        @param items: the items returned by L{_do_publish}.
        @param node_id: identifier of the node that was published to.
        @return: C{None}; the notification chain is started but not waited
            for, so the result of L{do_publish} stays C{None} as before.
        """
        recipients = self.get_subscribers(node_id)
        recipients.addCallback(self.magic_filter, node_id, items)
        recipients.addCallback(self.pubsub_service.do_notification, node_id)

    def do_publish(self, node_id, publisher, items):
        """Publish C{items} to a node on behalf of C{publisher}.

        @return: a Deferred that fires with C{None} once the request has been
            validated and notification has been started, or fails with one of
            the exceptions raised by L{_do_publish}.
        """
        d = self.dbpool.runInteraction(self._do_publish, node_id, publisher,
                                       items)
        d.addCallback(self._notify_subscribers, node_id)
        return d

    def magic_filter(self, subscribers, node_id, items):
        """Map every subscriber to the items it should be notified about.

        No filtering is done: each subscriber is mapped to the same
        C{items} object. C{node_id} is accepted but not used.

        @return: a dict with the subscribers as keys and C{items} as values.
        """
        notifications = {}
        for subscriber in subscribers:
            notifications[subscriber] = items

        return notifications

    def get_subscribers(self, node_id):
        """Look up the entities with a 'subscribed' subscription to a node.

        @return: a Deferred that fires with a list of full JID strings, as
            produced by L{convert_to_jids}.
        """
        d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                    JOIN nodes ON (node_id=nodes.id)
                                    JOIN entities ON (entity_id=entities.id)
                                    WHERE node=%s AND
                                          subscription='subscribed'""",
                                 (node_id.encode('utf8'),))
        d.addCallback(self.convert_to_jids)
        return d

    def convert_to_jids(self, list):
        """Turn (jid, resource) rows into full JID strings.

        @param list: sequence of rows whose first two columns are the JID
            and the resource. (The name shadows the builtin but is kept so
            the signature stays unchanged.)
        @return: a list with C{jid.JID("<jid>/<resource>").full()} per row.
        """
        return [jid.JID("%s/%s" % (row[0], row[1])).full() for row in list]

    def storeItems(self, node_id, publisher, items):
        """Persist published items. Not implemented yet; does nothing."""
        pass