Mercurial > libervia-backend
view src/plugins/plugin_misc_groupblog.py @ 663:8004c7d4aba7
core: Deferred in onMessage.
onMessage now use a deferred which is passed to MessageReceived trigger through the post_treat parameter. This can be used by plugins to add deferred in the callback chain.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 31 Oct 2013 17:18:04 +0100 |
parents | 69a8bfd266a5 |
children | 0c2c1dfb79e4 |
line wrap: on
line source
#!/usr/bin/python # -*- coding: utf-8 -*- # SAT plugin for microbloging with roster access # Copyright (C) 2009, 2010, 2011, 2012, 2013 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from logging import debug, info, warning, error from twisted.internet import defer from twisted.words.protocols.jabber import jid from wokkel import disco, data_form, iwokkel from zope.interface import implements import uuid import urllib try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler NS_PUBSUB = 'http://jabber.org/protocol/pubsub' NS_GROUPBLOG = 'http://goffi.org/protocol/groupblog' NS_NODE_PREFIX = 'urn:xmpp:groupblog:' NS_COMMENT_PREFIX = 'urn:xmpp:comments:' #NS_PUBSUB_EXP = 'http://goffi.org/protocol/pubsub' #for non official features NS_PUBSUB_EXP = NS_PUBSUB # XXX: we can't use custom namespace as Wokkel's PubSubService use official NS NS_PUBSUB_ITEM_ACCESS = NS_PUBSUB_EXP + "#item-access" NS_PUBSUB_CREATOR_JID_CHECK = NS_PUBSUB_EXP + "#creator-jid-check" NS_PUBSUB_ITEM_CONFIG = NS_PUBSUB_EXP + "#item-config" NS_PUBSUB_AUTO_CREATE = NS_PUBSUB + "#auto-create" TYPE_COLLECTION = 'collection' ACCESS_TYPE_MAP = { 'PUBLIC': 'open', 'GROUP': 'roster', 'JID': None, #JID is not yet managed } PLUGIN_INFO = { "name": "Group blogging throught collections", "import_name": "groupblog", "type": "MISC", "protocols": [], "dependencies": ["XEP-0277"], "main": "GroupBlog", "handler": "yes", "description": _("""Implementation of microblogging with roster access""") } class NoCompatiblePubSubServerFound(Exception): pass class BadAccessTypeError(Exception): pass class BadAccessListError(Exception): pass class UnknownType(Exception): pass class GroupBlog(object): """This class use a SàT PubSub Service to manage access on microblog""" def __init__(self, host): info(_("Group blog plugin initialization")) self.host = host host.bridge.addMethod("sendGroupBlog", ".plugin", in_sign='sassa{ss}s', out_sign='', method=self.sendGroupBlog) host.bridge.addMethod("sendGroupBlogComment", ".plugin", in_sign='sss', out_sign='', method=self.sendGroupBlogComment, async=True) host.bridge.addMethod("getLastGroupBlogs", ".plugin", in_sign='sis', out_sign='aa{ss}', method=self.getLastGroupBlogs, async=True) host.bridge.addMethod("getMassiveLastGroupBlogs", ".plugin", in_sign='sasis', out_sign='a{saa{ss}}', method=self.getMassiveLastGroupBlogs, async=True) host.bridge.addMethod("getGroupBlogComments", ".plugin", in_sign='sss', out_sign='aa{ss}', method=self.getGroupBlogComments, async=True) host.bridge.addMethod("subscribeGroupBlog", ".plugin", in_sign='ss', out_sign='', method=self.subscribeGroupBlog, async=True) host.bridge.addMethod("massiveSubscribeGroupBlogs", ".plugin", in_sign='sass', out_sign='', method=self.massiveSubscribeGroupBlogs, async=True) host.trigger.add("PubSubItemsReceived", self.pubSubItemsReceivedTrigger) def getHandler(self, profile): return GroupBlog_handler() @defer.inlineCallbacks def initialise(self, profile_key): """Check that this data for this profile are initialised, and do it else @param client: client of the profile @profile_key: %(doc_profile)s""" profile = self.host.memory.getProfileName(profile_key) if not profile: error(_("Unknown profile")) raise Exception("Unknown profile") client = self.host.getClient(profile) if not client: error(_('No client for this profile key: %s') % profile_key) raise Exception("Unknown profile") yield client.client_initialized # we want to be sure that the client is initialized #we first check that we have a item-access pubsub server if not hasattr(client, "item_access_pubsub"): debug(_('Looking for item-access power pubsub server')) #we don't have any pubsub server featuring item access yet client.item_access_pubsub = None client._item_access_pubsub_pending = defer.Deferred() for entity in self.host.memory.getServerServiceEntities("pubsub", "service", profile): _disco = yield client.disco.requestInfo(entity) #if set([NS_PUBSUB_ITEM_ACCESS, NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK]).issubset(_disco.features): if set([NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK]).issubset(_disco.features): info(_("item-access powered pubsub service found: [%s]") % entity.full()) client.item_access_pubsub = entity client._item_access_pubsub_pending.callback(None) if hasattr(client, "_item_access_pubsub_pending"): #XXX: we need to wait for item access pubsub service check yield client._item_access_pubsub_pending del client._item_access_pubsub_pending if not client.item_access_pubsub: error(_("No item-access powered pubsub server found, can't use group blog")) raise NoCompatiblePubSubServerFound defer.returnValue((profile, client)) def pubSubItemsReceivedTrigger(self, event, profile): """"Trigger which catch groupblogs events""" if event.nodeIdentifier.startswith(NS_NODE_PREFIX): # Microblog publisher = jid.JID(event.nodeIdentifier[len(NS_NODE_PREFIX):]) origin_host = publisher.host.split('.') event_host = event.sender.host.split('.') #FIXME: basic origin check, must be improved #TODO: automatic security test if (not (origin_host) or len(event_host) < len(origin_host) or event_host[-len(origin_host):] != origin_host): warning("Host incoherence between %s and %s (hack attempt ?)" % (unicode(event.sender), unicode(publisher))) return client = self.host.getClient(profile) for gbdata in self._itemsConstruction(event.items, publisher, client): self.host.bridge.personalEvent(publisher.full(), "MICROBLOG", gbdata, profile) return False elif event.nodeIdentifier.startswith(NS_COMMENT_PREFIX): # Comment for microblog_data in self._handleCommentsItems(event.items, event.sender, event.nodeIdentifier): publisher = None # FIXME: see below (_handleCommentsItems) self.host.bridge.personalEvent(publisher.full() if publisher else microblog_data["author"], "MICROBLOG", microblog_data, profile) return False return True def _handleCommentsItems(self, items, service, node_identifier): """ Convert comments items to groupblog data, and send them as signals @param items: comments items @param service: jid of the PubSub service used @param node_identifier: comments node @return: list of group blog data """ ret = [] for item in items: publisher = "" # FIXME: publisher attribute for item in SàT pubsub is not managed yet, so # publisher is not checked and can be easily spoofed. This need to be fixed # quickly. microblog_data = self.item2gbdata(item, "comment") microblog_data["service"] = service.userhost() microblog_data["node"] = node_identifier microblog_data["verified_publisher"] = "true" if publisher else "false" ret.append(microblog_data) return ret def _parseAccessData(self, microblog_data, item): P = self.host.plugins["XEP-0060"] form_elts = [child for child in item.elements() if child.name == "x"] for form_elt in form_elts: form = data_form.Form.fromElement(form_elt) if (form.formNamespace == NS_PUBSUB_ITEM_CONFIG): access_model = form.get(P.OPT_ACCESS_MODEL, 'open') if access_model == "roster": try: microblog_data["groups"] = '\n'.join(form.fields[P.OPT_ROSTER_GROUPS_ALLOWED].values) except KeyError: warning("No group found for roster access-model") microblog_data["groups"] = '' break def item2gbdata(self, item, _type="main_item"): """ Convert item to microblog data dictionary + add access data """ microblog_data = self.host.plugins["XEP-0277"].item2mbdata(item) microblog_data["type"] = _type self._parseAccessData(microblog_data, item) return microblog_data def getNodeName(self, publisher): """Retrieve the name of publisher's node @param publisher: publisher's jid @return: node's name (string)""" return NS_NODE_PREFIX + publisher.userhost() def _publishMblog(self, service, client, access_type, access_list, message, options): """Actually publish the message on the group blog @param service: jid of the item-access pubsub service @param client: SatXMPPClient of the published @param access_type: one of "PUBLIC", "GROUP", "JID" @param access_list: set of entities (empty list for all, groups or jids) allowed to see the item @param message: message to publish @param options: dict which option name as key, which can be: - allow_comments: True to accept comments, False else (default: False) """ node_name = self.getNodeName(client.jid) mblog_data = {'content': message} P = self.host.plugins["XEP-0060"] access_model_value = ACCESS_TYPE_MAP[access_type] if options.get('allow_comments', 'False').lower() == 'true': comments_node = "%s_%s__%s" % (NS_COMMENT_PREFIX, str(uuid.uuid4()), node_name) mblog_data['comments'] = "xmpp:%(service)s?%(query)s" % {'service': service.userhost(), 'query': urllib.urlencode([('node',comments_node.encode('utf-8'))])} _options = {P.OPT_ACCESS_MODEL: access_model_value, P.OPT_PERSIST_ITEMS: 1, P.OPT_MAX_ITEMS: -1, P.OPT_DELIVER_PAYLOADS: 1, P.OPT_SEND_ITEM_SUBSCRIBE: 1, P.OPT_PUBLISH_MODEL: "subscribers", #TODO: should be open if *both* node and item access_model are open (public node and item) } if access_model_value == 'roster': _options[P.OPT_ROSTER_GROUPS_ALLOWED] = list(access_list) # FIXME: check comments node creation success, at the moment this is a potential security risk (if the node # already exists, the creation will silently fail, but the comments link will stay the same, linking to a # node owned by somebody else) defer_blog = self.host.plugins["XEP-0060"].createNode(service, comments_node, _options, profile_key=client.profile) mblog_item = self.host.plugins["XEP-0277"].data2entry(mblog_data, client.profile) form = data_form.Form('submit', formNamespace=NS_PUBSUB_ITEM_CONFIG) if access_type == "PUBLIC": if access_list: raise BadAccessListError("access_list must be empty for PUBLIC access") access = data_form.Field(None, P.OPT_ACCESS_MODEL, value=access_model_value) form.addField(access) elif access_type == "GROUP": access = data_form.Field(None, P.OPT_ACCESS_MODEL, value=access_model_value) allowed = data_form.Field(None, P.OPT_ROSTER_GROUPS_ALLOWED, values=access_list) form.addField(access) form.addField(allowed) mblog_item.addChild(form.toElement()) elif access_type == "JID": raise NotImplementedError else: error(_("Unknown access_type")) raise BadAccessTypeError defer_blog = self.host.plugins["XEP-0060"].publish(service, node_name, items=[mblog_item], profile_key=client.profile) defer_blog.addErrback(self._mblogPublicationFailed) def _mblogPublicationFailed(self, failure): #TODO return failure def sendGroupBlog(self, access_type, access_list, message, options, profile_key='@NONE@'): """Publish a microblog with given item access @param access_type: one of "PUBLIC", "GROUP", "JID" @param access_list: list of authorized entity (empty list for PUBLIC ACCESS, list of groups or list of jids) for this item @param message: microblog @param options: dict which option name as key, which can be: - allow_comments: True to accept comments, False else (default: False) @profile_key: %(doc_profile)s """ def initialised(result): profile, client = result if access_type == "PUBLIC": if access_list: raise Exception("Publishers list must be empty when getting microblogs for all contacts") self._publishMblog(client.item_access_pubsub, client, "PUBLIC", [], message, options) elif access_type == "GROUP": _groups = set(access_list).intersection(client.roster.getGroups()) # We only keep group which actually exist if not _groups: raise BadAccessListError("No valid group") self._publishMblog(client.item_access_pubsub, client, "GROUP", _groups, message, options) elif access_type == "JID": raise NotImplementedError else: error(_("Unknown access type")) raise BadAccessTypeError self.initialise(profile_key).addCallback(initialised) def sendGroupBlogComment(self, node_url, message, profile_key='@NONE@'): """Publish a comment in the given node @param node url: link to the comments node as specified in XEP-0277 and given in microblog data's comments key @param message: comment @profile_key: %(doc_profile)s """ profile = self.host.memory.getProfileName(profile_key) if not profile: error(_("Unknown profile")) raise Exception("Unknown profile") service, node = self.host.plugins["XEP-0277"].parseCommentUrl(node_url) mblog_data = {'content': message} mblog_item = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile) return self.host.plugins["XEP-0060"].publish(service, node, items=[mblog_item], profile_key=profile) def _itemsConstruction(self, items, pub_jid, client): """ Transforms items to group blog data and manage comments node @param items: iterable of items @param pub_jid: jid of the publisher or None to use items data @param client: SatXMPPClient instance @return: list of group blog data """ # TODO: use items data when pub_jid is None ret = [] for item in items: gbdata = self.item2gbdata(item) ret.append(gbdata) # if there is a comments node, we subscribe to it if "comments_node" in gbdata: try: # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) if pub_jid.userhostJID() != client.jid.userhostJID(): self.host.plugins["XEP-0060"].subscribe(jid.JID(gbdata["comments_service"]), gbdata["comments_node"], profile_key=client.profile) except KeyError: warning("Missing key for comments") return ret def getLastGroupBlogs(self, pub_jid_s, max_items=10, profile_key='@NONE@'): """Get the last published microblogs @param pub_jid_s: jid of the publisher @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) @param profile_key: profile key @return: list of microblog data (dict) """ pub_jid = jid.JID(pub_jid_s) def initialised(result): profile, client = result d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(pub_jid), max_items=max_items, profile_key=profile_key) d.addCallback(self._itemsConstruction, pub_jid, client) d.addErrback(lambda ignore: {}) # TODO: more complete error management (log !) return d #TODO: we need to use the server corresponding the the host of the jid return self.initialise(profile_key).addCallback(initialised) def getGroupBlogComments(self, service_s, node, profile_key='@NONE@'): """Get all comments of given node @param service_s: service hosting the node @param node: comments node @param profile_key: profile key @return: list of microblog data (dict) """ service = jid.JID(service_s) def initialised(result): profile, client = result d = self.host.plugins["XEP-0060"].getItems(service, node, profile_key=profile_key) d.addCallback(self._handleCommentsItems, service, node) d.addErrback(lambda ignore: {}) # TODO: more complete error management (log !) return d #TODO: we need to use the server corresponding the the host of the jid return self.initialise(profile_key).addCallback(initialised) def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key='@NONE@'): """Get the last published microblogs for a list of groups or jids @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) @param max_items: how many microblogs we want to get @param profile_key: profile key """ def sendResult(result): """send result of DeferredList (list of microblogs to the calling method""" ret = {} for (success, value) in result: if success: source_jid, data = value ret[source_jid] = data return ret def initialised(result): profile, client = result if publishers_type == "ALL": contacts = client.roster.getItems() jids = [contact.jid.userhost() for contact in contacts] elif publishers_type == "GROUP": jids = [] for _group in publishers: jids.extend(client.roster.getJidsFromGroup(_group)) elif publishers_type == 'JID': jids = publishers else: raise UnknownType mblogs = [] for jid_s in jids: _jid = jid.JID(jid_s) d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(_jid), max_items=max_items, profile_key=profile_key) d.addCallback(lambda items, source_jid: (source_jid, self._itemsConstruction(items, _jid, client)), jid_s) mblogs.append(d) dlist = defer.DeferredList(mblogs) dlist.addCallback(sendResult) return dlist #TODO: custom exception if publishers_type not in ["GROUP", "JID", "ALL"]: raise Exception("Bad call, unknown publishers_type") if publishers_type == "ALL" and publishers: raise Exception("Publishers list must be empty when getting microblogs for all contacts") return self.initialise(profile_key).addCallback(initialised) #TODO: we need to use the server corresponding the the host of the jid def subscribeGroupBlog(self, pub_jid, profile_key='@NONE@'): def initialised(result): profile, client = result d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(pub_jid)), profile_key=profile_key) return d #TODO: we need to use the server corresponding the the host of the jid return self.initialise(profile_key).addCallback(initialised) def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key='@NONE@'): """Subscribe microblogs for a list of groups or jids @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) @param profile_key: profile key """ def initialised(result): profile, client = result if publishers_type == "ALL": contacts = client.roster.getItems() jids = [contact.jid.userhost() for contact in contacts] elif publishers_type == "GROUP": jids = [] for _group in publishers: jids.extend(client.roster.getJidsFromGroup(_group)) elif publishers_type == 'JID': jids = publishers else: raise UnknownType mblogs = [] for _jid in jids: d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(_jid)), profile_key=profile_key) mblogs.append(d) dlist = defer.DeferredList(mblogs) return dlist #TODO: custom exception if publishers_type not in ["GROUP", "JID", "ALL"]: raise Exception("Bad call, unknown publishers_type") if publishers_type == "ALL" and publishers: raise Exception("Publishers list must be empty when getting microblogs for all contacts") return self.initialise(profile_key).addCallback(initialised) #TODO: we need to use the server corresponding the the host of the jid class GroupBlog_handler(XMPPHandler): implements(iwokkel.IDisco) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_GROUPBLOG)] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []