comparison src/plugins/plugin_misc_groupblog.py @ 462:d9456d94cd12

plugin groupblog: next-gen group blog first draft
author Goffi <goffi@goffi.org>
date Wed, 14 Mar 2012 23:45:01 +0100
parents cf005701624b
children 78e67a59d51d
comparison
equal deleted inserted replaced
461:4e361d295bca 462:d9456d94cd12
25 from twisted.words.protocols.jabber import error as jab_error 25 from twisted.words.protocols.jabber import error as jab_error
26 import twisted.internet.error 26 import twisted.internet.error
27 from twisted.words.xish import domish 27 from twisted.words.xish import domish
28 from sat.tools.xml_tools import ElementParser 28 from sat.tools.xml_tools import ElementParser
29 29
30 from wokkel import disco,pubsub 30 from wokkel import disco, pubsub, data_form
31 from feed.atom import Entry, Author 31 from feed.atom import Entry, Author
32 import uuid 32 import uuid
33 from time import time 33 from time import time
34 34
35 NS_MICROBLOG = 'urn:xmpp:microblog:%02d' 35 NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
36 CONFIG_NODE = 'CONFIG' 36 #NS_PUBSUB_EXP = 'http://goffi.org/protocol/pubsub' #for non official features
37 NS_PUBSUB_EXP = NS_PUBSUB #XXX: we can't use custom namespace as Wokkel's PubSubService use official NS
38 NS_PUBSUB_ITEM_ACCESS = NS_PUBSUB_EXP + "#item-access"
39 NS_PUBSUB_CREATOR_JID_CHECK = NS_PUBSUB_EXP + "#creator-jid-check"
40 NS_PUBSUB_ITEM_CONFIG = NS_PUBSUB_EXP + "#item-config"
41 NS_PUBSUB_AUTO_CREATE = NS_PUBSUB + "#auto-create"
42 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
37 OPT_ACCESS_MODEL = 'pubsub#access_model' 43 OPT_ACCESS_MODEL = 'pubsub#access_model'
38 OPT_PERSIST_ITEMS = 'pubsub#persist_items' 44 OPT_PERSIST_ITEMS = 'pubsub#persist_items'
39 OPT_MAX_ITEMS = 'pubsub#max_items' 45 OPT_MAX_ITEMS = 'pubsub#max_items'
40 OPT_NODE_TYPE = 'pubsub#node_type' 46 OPT_NODE_TYPE = 'pubsub#node_type'
41 OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type' 47 OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type'
58 64
59 class NodeDeletionError(Exception): 65 class NodeDeletionError(Exception):
60 pass 66 pass
61 67
62 class GroupBlog(): 68 class GroupBlog():
63 """This class uses a PubSub Collection to manage roster access on microblog""" 69 """This class uses a SàT PubSub Service to manage access on microblog"""
64 70
65 def __init__(self, host): 71 def __init__(self, host):
66 info(_("Group blog plugin initialization")) 72 info(_("Group blog plugin initialization"))
67 self.host = host 73 self.host = host
68 self._blog_nodes={} #keep association between [profile][node] and [groups]
69 for i in range(1,21):
70 self.host.plugins["XEP-0163"].addPEPEvent("MICROBLOG_%02d" % i, NS_MICROBLOG % i, self.groupblogCB, None)
71
72 host.bridge.addMethod("cleanBlogCollection", ".plugin", in_sign='s', out_sign='',
73 method=self.cleanBlogCollection,
74 doc = {
75 })
76
77 host.bridge.addMethod("getMblogNodes", ".plugin", in_sign='s', out_sign='a{sas}',
78 method=self.getMblogNodes,
79 async = True,
80 doc = { 'summary':"retrieve mblog node, and their association with roster's groups",
81 'param_0':'%(doc_profile)s',
82 'return':'list of microblog data (dict)'
83 })
84 74
85 host.bridge.addMethod("sendGroupBlog", ".plugin", in_sign='asss', out_sign='', 75 host.bridge.addMethod("sendGroupBlog", ".plugin", in_sign='asss', out_sign='',
86 method=self.sendGroupBlog, 76 method=self.sendGroupBlog,
87 doc = { 'summary':"Send a microblog to a list of groups", 77 doc = { 'summary':"Send a microblog to a list of groups",
88 'param_0':'list of groups which can read the microblog', 78 'param_0':'list of groups which can read the microblog',
89 'param_1':'text to send', 79 'param_1':'text to send',
90 'param_2':'%(doc_profile)s' 80 'param_2':'%(doc_profile)s'
91 }) 81 })
92 82
93 host.bridge.addMethod("subscribeGroupBlog", ".plugin", in_sign='ss', out_sign='', 83 def _publishMblog(self, service, groups, message, client):
94 method=self.subscribeGroupBlog,
95 doc = { 'summary':"Subscribe to the group blog of somebody",
96 'param_0':'jid of the group node published',
97 'param_1':'%(doc_profile)s'
98 })
99
100 def groupblogCB(self, itemsEvent, profile):
101 for item in itemsEvent.items:
102 microblog_data = self.host.plugins["XEP-0277"]._item2mbdata(item)
103 microblog_data["node"] = itemsEvent.nodeIdentifier
104 try:
105 microblog_data["groups"] = "\n".join(self._blog_nodes[profile].get(itemsEvent.nodeIdentifier, []))
106 except KeyError:
107 pass
108 self.host.bridge.personalEvent(itemsEvent.sender.full(), "MICROBLOG", microblog_data, profile)
109
110 def _getRootNode(self, entity):
111 return "%(entity)s_%(root_suff)s" % {'entity':entity.userhost(), 'root_suff':MBLOG_COLLECTION}
112
113 def _getNodeName(self, number):
114 """Return the node name
115 @param number: int number of the node
116 @param entity: jid of the owner"""
117 return NS_MICROBLOG % number
118
119 def _getConfigNode(self, entity):
120 return "%(entity)s_%(root_suff)s" % {'entity':entity.userhost(), 'root_suff':CONFIG_NODE}
121
122 def _configNodeCb(self, result, callback, profile):
123 self._blog_nodes[profile] = {}
124 for item in result:
125 node_ass = item.firstChildElement()
126 assert(node_ass.name == "node_association")
127 node = node_ass['node']
128 groups = [unicode(group) for group in node_ass.children]
129 self._blog_nodes[profile][node] = groups
130 callback(self._blog_nodes[profile])
131
132 def _configNodeFail(self, failure, errback):
133 errback("") #FIXME
134
135 def _configNodeErr(self, failure, user_jid, pubsub_ent, callback, errback, profile):
136 if failure.value.condition == 'item-not-found':
137 debug(_('Multiblog config node not found, creating it'))
138 _options = {OPT_ACCESS_MODEL:"whitelist", OPT_PERSIST_ITEMS:1, OPT_MAX_ITEMS:-1}
139 d = self.host.plugins["XEP-0060"].createNode(pubsub_ent, self._getConfigNode(user_jid), _options, profile_key=profile)
140 d.addCallback(lambda result: self._configNodeCb([] , callback, profile))
141 d.addErrback(self._configNodeFail, errback)
142 else:
143 self._configNodeFail(failure, errback)
144
145 def getMblogNodes(self, profile_key='@DEFAULT@', callback=None, errback=None):
146 debug(_('Getting mblog nodes'))
147 profile = self.host.memory.getProfileName(profile_key)
148 if not profile:
149 error(_("Unknown profile"))
150 return {}
151
152 def after_init(ignore):
153 pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
154 _jid, xmlstream = self.host.getJidNStream(profile_key)
155 d = self.host.plugins["XEP-0060"].getItems(pubsub_ent, self._getConfigNode(_jid), profile_key=profile_key)
156 d.addCallbacks(self._configNodeCb, self._configNodeErr, callbackArgs=(callback, profile), errbackArgs=(_jid, pubsub_ent, callback, errback, profile))
157
158 client = self.host.getClient(profile)
159 if not client:
160 error(_('No client for this profile key: %s') % profile_key)
161 return
162 client.client_initialized.addCallback(after_init)
163
164 def _publishMblog(self, name, message, client):
165 """Actually publish the message on the group blog 84 """Actually publish the message on the group blog
166 @param name: name of the node where we publish 85 @param service: jid of the item-access pubsub service
86 @param groups: set of groups allowed to see the item
167 @param message: message to publish 87 @param message: message to publish
168 @param client: SatXMPPClient of the publisher""" 88 @param client: SatXMPPClient of the publisher"""
169 mblog_item = self.host.plugins["XEP-0277"].data2entry({'content':message}, client.profile) 89 mblog_item = self.host.plugins["XEP-0277"].data2entry({'content':message}, client.profile)
170 defer_blog = self.host.plugins["XEP-0060"].publish(client.jid.userhostJID(), name, items=[mblog_item], profile_key=client.profile) 90 form = data_form.Form('submit', formNamespace=NS_PUBSUB_ITEM_CONFIG)
91 field = data_form.Field('list-multi', OPT_ROSTER_GROUPS_ALLOWED, values=groups)
92 form.addField(field)
93 mblog_item.addChild(form.toElement())
94 defer_blog = self.host.plugins["XEP-0060"].publish(service, client.jid.userhost(), items=[mblog_item], profile_key=client.profile)
171 defer_blog.addErrback(self._mblogPublicationFailed) 95 defer_blog.addErrback(self._mblogPublicationFailed)
172
173 def _groupNodeCreated(self, ignore, groups, name, message, client):
174 """A group node has been created, we need to add it to the configure node, and send the message to it
175 @param groups: list of groups authorized to subscribe to the node
176 @param name: unique name of the node
177 @param message: message to publish to the group
178 @param client: SatXMPPClient"""
179 def configNodeUpdated(result):
180 self._blog_nodes[client.profile][name] = groups
181 debug(_("Configuration node updated"))
182
183 config_node = self._getConfigNode(client.jid)
184 _payload = domish.Element(('','node_association'))
185 _payload['node'] = name
186 for group in groups:
187 _payload.addElement('group',content=group)
188 config_item = pubsub.Item(payload=_payload)
189 pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", client.profile)
190 defer_config = self.host.plugins["XEP-0060"].publish(pubsub_ent, config_node, items=[config_item], profile_key=client.profile)
191 defer_config.addCallback(configNodeUpdated)
192 defer_config.addErrback(self._configUpdateFailed)
193
194 #Finally, we publish the message
195 self._publishMblog(name, message, client)
196
197 96
198 def _mblogPublicationFailed(self, failure): 97 def _mblogPublicationFailed(self, failure):
199 #TODO 98 #TODO
200 pass 99 pass
201 100
202 def _configUpdateFailed(self, failure): 101 @defer.inlineCallbacks
203 #TODO
204 pass
205
206 def _nodeCreationFailed(self, failure, name, groups, message, client):
207 #FIXME: temporary behaviour is to delete the node,
208 #user input should be required in the future
209 def unmanagedError(failure):
210 msg = _("Can't create node")
211 error(msg)
212 raise NodeCreationError(msg)
213
214 if failure.value.condition == 'conflict':
215 pubsub_elts = filter(lambda elt: elt.name == 'pubsub', failure.value.children)
216 if pubsub_elts:
217 create_elts = filter(lambda elt: elt.name == 'create', pubsub_elts[0].children)
218 if create_elts:
219 _from = jid.JID(failure.value.stanza['from'])
220 _node = create_elts[0]['node']
221 d = self.host.plugins["XEP-0060"].deleteNode(_from, _node, client.profile)
222 d.addCallback(self._createNode, name, groups, message, client)
223 d.addErrback(unmanagedError)
224 else:
225 unmanagedError(None)
226 msg = _("Can't create node")
227 error(msg)
228 raise NodeCreationError(msg)
229
230 def _createNode(self, ignore, name, groups, message, client):
231 """create a group microblog node
232 @param ignore: ignored param, necessary to be added as a deferred callback
233 @param name: name of the node
234 @param groups: list of groups that can subscribe to the node
235 @param message: message to publish
236 @param client: SatXMPPClient"""
237 _options = {OPT_ACCESS_MODEL:"roster", OPT_PERSIST_ITEMS:1, OPT_MAX_ITEMS:-1,
238 'pubsub#roster_groups_allowed':groups}
239 d = self.host.plugins["XEP-0060"].createNode(client.jid.userhostJID(), name, _options, client.profile)
240 d.addCallback(self._groupNodeCreated, groups, name, message, client)
241 d.addErrback(self._nodeCreationFailed, name, groups, message, client)
242
243 def _getNodeForGroups(self, groups, profile):
244 """Return node associated with the given list of groups
245 @param groups: list of groups
246 @param profile: profile of publisher"""
247 for node in self._blog_nodes[profile]:
248 node_groups = self._blog_nodes[profile][node]
249 if set(node_groups) == set(groups):
250 return node
251 return None
252
253 def _getFreeNode(self, entity, profile):
254 """Return a free group number,
255 raise an exception if we have reached the limit"""
256 _all = set([self._getNodeName(idx) for idx in range(1,21)])
257 _used = set(self._blog_nodes[profile].keys())
258 _free = _all.difference(_used)
259 if not _free:
260 msg = _("Can't create group node: no more free node available")
261 warning(msg)
262 raise NodeCreationError(msg)
263 else:
264 return _free.pop()
265
266 def sendGroupBlog(self, groups, message, profile_key='@DEFAULT@'): 102 def sendGroupBlog(self, groups, message, profile_key='@DEFAULT@'):
267 """Publish a microblog to the node associated to the groups 103 """Publish a microblog to the node associated to the groups
268 If the node doesn't exist, it is created, then the message is posted 104 If the node doesn't exist, it is created, then the message is posted
269 @param groups: list of groups allowed to retrieve the microblog 105 @param groups: list of groups allowed to retrieve the microblog
270 @param message: microblog 106 @param message: microblog
271 @profile_key: %(doc_profile)s 107 @profile_key: %(doc_profile)s
272 """ 108 """
109 print "sendGroupBlog"
273 profile = self.host.memory.getProfileName(profile_key) 110 profile = self.host.memory.getProfileName(profile_key)
274 if not profile: 111 if not profile:
275 error(_("Unknown profile")) 112 error(_("Unknown profile"))
276 return 113 return
277 114
278 def after_init(ignore):
279 _groups = list(set(groups).intersection(client.roster.getGroups())) #We only keep group which actually exist
280 #TODO: send an error signal if the user wants to post to non-existent groups
281 _groups.sort()
282 for group in _groups:
283 _node = self._getNodeForGroups([group], profile)
284 if not _node:
285 _node_name = self._getFreeNode(client.jid, profile)
286 self._createNode(None, _node_name, [group], message, client)
287 else:
288 self._publishMblog(_node, message, client)
289
290 client = self.host.getClient(profile) 115 client = self.host.getClient(profile)
291 if not client: 116 if not client:
292 error(_('No client for this profile key: %s') % profile_key) 117 error(_('No client for this profile key: %s') % profile_key)
293 return 118 return
294 client.client_initialized.addCallback(after_init) 119
120 yield client.client_initialized #we want to be sure that the client is initialized
121
122 #we first check that we have a item-access pubsub server
123 if not hasattr(client,"item_access_pubsub"):
124 debug(_('Looking for item-access power pubsub server'))
125 #we don't have any pubsub server featuring item access yet
126 test = self.host.memory.getServerServiceEntities("pubsub", "service", profile)
127 client.item_access_pubsub = None
128 for entity in self.host.memory.getServerServiceEntities("pubsub", "service", profile):
129 disco = yield client.disco.requestInfo(entity)
130 if set([NS_PUBSUB_ITEM_ACCESS, NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK]).issubset(disco.features):
131 info(_("item-access powered pubsub service found: [%s]") % entity.full())
132 client.item_access_pubsub = entity
295 133
296 def _doCleaning(self, result, pubsub_ent, profile): 134 if not client.item_access_pubsub:
297 """Compare the node in config node, and the existing nodes, and delete unknown ones""" 135 error(_("No item-access powered pubsub server found, can't post group blog"))
298 #TODO: manage groups which don't exist anymore 136 return
299 assert(len(result)==2)
300 assert(result[0][0]==True and result[1][0]==True)
301 config_nodes = [item.firstChildElement()["node"] for item in result[0][1]]
302 existing_nodes = [item.nodeIdentifier for item in result[1][1]._items]
303 to_delete = set(config_nodes).symmetric_difference(existing_nodes)
304 def check_deletion(result):
305 for (success, value) in result:
306 if not success:
307 msg = _("Can't delete node")
308 error(msg)
309 raise NodeDeletionError(msg)
310 #TODO: log node which was not deleted
311 137
138 _groups = set(groups).intersection(client.roster.getGroups()) #We only keep group which actually exist
139 #TODO: send an error signal if the user wants to post to non-existent groups
312 140
313 d = defer.DeferredList([self.host.plugins["XEP-0060"].deleteNode(pubsub_ent, node, profile) for node in to_delete]) 141 self._publishMblog(client.item_access_pubsub, _groups, message, client)
314 d.addCallback(check_deletion) 142
315 143
316 def cleanBlogCollection(self, profile_key='@DEFAULT@'):
317 """Remove blog nodes not referenced in config node"""
318 debug(_('Cleaning mblog nodes'))
319 profile = self.host.memory.getProfileName(profile_key)
320 if not profile:
321 error(_("Unknown profile"))
322 return {}
323
324 def after_init(ignore):
325 pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
326 _jid, xmlstream = self.host.getJidNStream(profile_key)
327 d_config = self.host.plugins["XEP-0060"].getItems(pubsub_ent, self._getConfigNode(_jid), profile_key=profile_key)
328 d_root = client.disco.requestItems(pubsub_ent, self._getRootNode(client.jid))
329 defer.DeferredList([d_config, d_root]).addCallback(self._doCleaning, pubsub_ent, profile)
330
331 client = self.host.getClient(profile)
332 if not client:
333 error(_('No client for this profile key: %s') % profile_key)
334 return
335 client.client_initialized.addCallback(after_init)
336
337 def subscribeGroupBlog(self, pub_jid, profile_key='@DEFAULT@'):
338 debug(_('subscribing mblog nodes'))
339 _pub_jid = jid.JID(pub_jid)
340 profile = self.host.memory.getProfileName(profile_key)
341 if not profile:
342 error(_("Unknown profile"))
343 return
344
345 def after_init(ignore):
346 pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
347 _options = {OPT_SUBSCRIPTION_TYPE:'items', OPT_SUBSCRIPTION_DEPTH:'1'}
348 d = self.host.plugins["XEP-0060"].subscribe(pubsub_ent, self._getRootNode(_pub_jid), options = _options, profile_key=profile)
349 d.addCallback(lambda x: debug(_("%(publisher)s's group node subscribed [%(profile)s]") % {'publisher':_pub_jid.userhost(), 'profile': profile}))
350 d.addErrback(lambda x: error(_("Can't subscribe group node [%(profile)s]") % {'profile': profile}))
351
352 client = self.host.getClient(profile)
353 if not client:
354 error(_('No client for this profile key: %s') % profile_key)
355 return
356 client.client_initialized.addCallback(after_init)
357