view src/test/test_plugin_xep_0313.py @ 1459:4c4f88d7b156

plugins xep-0060, xep-0163, xep-0277, groupblog: bloging improvments (huge patch, sorry): /!\ not everything is working yet, and specially groupblogs are broken /!\ - renamed bridge api to use prefixed methods (e.g. psSubscribeToMany instead of subscribeToMany in PubSub) - (xep-0060): try to find a default PubSub service, and put it in client.pubsub_service - (xep-0060): extra dictionary can be used in bridge method for RSM and other options - (xep-0060): XEP_0060.addManagedNode and XEP_0060.removeManagedNode allow to easily catch notifications for a specific node - (xep-0060): retractItem manage "notify" attribute - (xep-0060): new signal psEvent will be used to transmit notifications to frontends - (xep-0060, constants): added a bunch of useful constants - (xep-0163): removed personalEvent in favor of psEvent - (xep-0163): addPEPEvent now filter non PEP events for in_callback - (xep-0277): use of new XEP-0060 plugin's addManagedNode - (xep-0277): fixed author handling for incoming blogs: author is the human readable name, author_jid it jid, and author_jid_verified is set to True is the jid is checked - (xep-0277): reworked data2entry with Twisted instead of feed, item_id can now be specified, <content/> is changed to <title/> if there is only content - (xep-0277): comments are now managed here (core removed from groupblog) - (xep-0277): (comments) node is created if needed, default pubsub service is used if available, else PEP - (xep-0277): retract is managed
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 00:39:44 +0200
parents 2d8fccec84e8
children d17772b0fe22
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013, 2014, 2015 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

""" Plugin XEP-0313 """

from constants import Const as C
from sat.test import helpers
from sat.plugins.plugin_xep_0313 import XEP_0313
from twisted.words.protocols.jabber.jid import JID
from twisted.words.xish import domish
from dateutil.tz import tzutc
import datetime

# TODO: change this when RSM and MAM are in wokkel
from sat.tmp.wokkel.rsm import RSMRequest
from sat.tmp.wokkel.mam import buildForm

NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
SERVICE = 'sat-pubsub.tazar.int'
SERVICE_JID = JID(SERVICE)


class XEP_0313Test(helpers.SatTestCase):

    def setUp(self):
        self.host = helpers.FakeSAT()
        self.plugin = XEP_0313(self.host)
        client = self.plugin.getHandler(C.PROFILE[0])
        client.makeConnection(self.host.getClient(C.PROFILE[0]).xmlstream)

    def test_queryArchive(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'/>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        d = self.plugin.queryArchive(SERVICE_JID, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchivePubsub(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0' node='fdp/submitted/capulet.lit/sonnets' />
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        d = self.plugin.queryArchive(SERVICE_JID, node="fdp/submitted/capulet.lit/sonnets", profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchiveWith(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'>
            <x xmlns='jabber:x:data' type='submit'>
              <field var='FORM_TYPE' type='hidden'>
                <value>urn:xmpp:mam:0</value>
              </field>
              <field var='with' type='jid-single'>
                <value>juliet@capulet.lit</value>
              </field>
            </x>
          </query>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        form = buildForm(with_jid=JID('juliet@capulet.lit'))
        d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchiveStartEnd(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'>
            <x xmlns='jabber:x:data' type='submit'>
              <field var='FORM_TYPE' type='hidden'>
                <value>urn:xmpp:mam:0</value>
              </field>
              <field var='start' type='text-single'>
                <value>2010-06-07T00:00:00Z</value>
              </field>
              <field var='end' type='text-single'>
                <value>2010-07-07T13:23:54Z</value>
              </field>
            </x>
          </query>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        start = datetime.datetime(2010, 6, 7, 0, 0, 0, tzinfo=tzutc())
        end = datetime.datetime(2010, 7, 7, 13, 23, 54, tzinfo=tzutc())
        form = buildForm(start=start, end=end)
        d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchiveStart(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'>
            <x xmlns='jabber:x:data' type='submit'>
              <field var='FORM_TYPE' type='hidden'>
                <value>urn:xmpp:mam:0</value>
              </field>
              <field var='start' type='text-single'>
                <value>2010-08-07T00:00:00Z</value>
              </field>
            </x>
          </query>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc())
        form = buildForm(start=start)
        d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchiveRSM(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'>
            <x xmlns='jabber:x:data' type='submit'>
              <field var='FORM_TYPE' type='hidden'>
                <value>urn:xmpp:mam:0</value>
              </field>
              <field var='start' type='text-single'>
                <value>2010-08-07T00:00:00Z</value>
              </field>
            </x>
            <set xmlns='http://jabber.org/protocol/rsm'>
              <max>10</max>
            </set>
          </query>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc())
        form = buildForm(start=start)
        rsm = RSMRequest(max_=10)
        d = self.plugin.queryArchive(SERVICE_JID, form=form, rsm=rsm, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchiveRSMPaging(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'>
              <x xmlns='jabber:x:data' type='submit'>
                <field var='FORM_TYPE' type='hidden'><value>urn:xmpp:mam:0</value></field>
                <field var='start' type='text-single'><value>2010-08-07T00:00:00Z</value></field>
              </x>
              <set xmlns='http://jabber.org/protocol/rsm'>
                 <max>10</max>
                 <after>09af3-cc343-b409f</after>
              </set>
          </query>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc())
        form = buildForm(start=start)
        rsm = RSMRequest(max_=10, after=u'09af3-cc343-b409f')
        d = self.plugin.queryArchive(SERVICE_JID, form=form, rsm=rsm, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryFields(self):
        xml = """
        <iq type='get' id="%s" to='%s'>
          <query xmlns='urn:xmpp:mam:0'/>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        d = self.plugin.queryFields(SERVICE_JID, C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryArchiveFields(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <query xmlns='urn:xmpp:mam:0'>
            <x xmlns='jabber:x:data' type='submit'>
              <field type='hidden' var='FORM_TYPE'>
                <value>urn:xmpp:mam:0</value>
              </field>
              <field type='text-single' var='urn:example:xmpp:free-text-search'>
                <value>Where arth thou, my Juliet?</value>
              </field>
              <field type='text-single' var='urn:example:xmpp:stanza-content'>
                <value>{http://jabber.org/protocol/mood}mood/lonely</value>
              </field>
            </x>
          </query>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        extra = [{'fieldType': 'text-single',
                  'var': 'urn:example:xmpp:free-text-search',
                  'value': 'Where arth thou, my Juliet?'},
                 {'fieldType': 'text-single',
                  'var': 'urn:example:xmpp:stanza-content',
                  'value': '{http://jabber.org/protocol/mood}mood/lonely'}]
        form = buildForm(extra=extra)
        d = self.plugin.queryArchive(SERVICE_JID, form=form, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_queryPrefs(self):
        xml = """
        <iq type='get' id='%s' to='%s'>
          <prefs xmlns='urn:xmpp:mam:0'>
            <always/>
            <never/>
          </prefs>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        d = self.plugin.getPrefs(SERVICE_JID, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d

    def test_setPrefs(self):
        xml = """
        <iq type='set' id='%s' to='%s'>
          <prefs xmlns='urn:xmpp:mam:0' default='roster'>
            <always>
              <jid>romeo@montague.lit</jid>
            </always>
            <never>
              <jid>montague@montague.lit</jid>
            </never>
          </prefs>
        </iq>
        """ % (("H_%d" % domish.Element._idCounter), SERVICE)
        always = [JID('romeo@montague.lit')]
        never = [JID('montague@montague.lit')]
        d = self.plugin.setPrefs(SERVICE_JID, always=always, never=never, profile_key=C.PROFILE[0])
        d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True))
        return d