Mercurial > libervia-backend
view src/test/test_plugin_xep_0313.py @ 1669:697effba0310
plugin pipe: rewritten plugin as a jingle application. The current implentation can, in some cases, block the backend, and is experimental only. Improvments are needed in the future.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Nov 2015 02:04:43 +0100 |
parents | 2d8fccec84e8 |
children | d17772b0fe22 |
line wrap: on
line source
#!/usr/bin/python # -*- coding: utf-8 -*- # SAT: a jabber client # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) # Copyright (C) 2013, 2014, 2015 Adrien Cossa (souliane@mailoo.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Plugin XEP-0313 """ from constants import Const as C from sat.test import helpers from sat.plugins.plugin_xep_0313 import XEP_0313 from twisted.words.protocols.jabber.jid import JID from twisted.words.xish import domish from dateutil.tz import tzutc import datetime # TODO: change this when RSM and MAM are in wokkel from sat.tmp.wokkel.rsm import RSMRequest from sat.tmp.wokkel.mam import buildForm NS_PUBSUB = 'http://jabber.org/protocol/pubsub' SERVICE = 'sat-pubsub.tazar.int' SERVICE_JID = JID(SERVICE) class XEP_0313Test(helpers.SatTestCase): def setUp(self): self.host = helpers.FakeSAT() self.plugin = XEP_0313(self.host) client = self.plugin.getHandler(C.PROFILE[0]) client.makeConnection(self.host.getClient(C.PROFILE[0]).xmlstream) def test_queryArchive(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'/> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) d = self.plugin.queryArchive(SERVICE_JID, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchivePubsub(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0' node='fdp/submitted/capulet.lit/sonnets' /> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) d = self.plugin.queryArchive(SERVICE_JID, node="fdp/submitted/capulet.lit/sonnets", profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchiveWith(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'> <x xmlns='jabber:x:data' type='submit'> <field var='FORM_TYPE' type='hidden'> <value>urn:xmpp:mam:0</value> </field> <field var='with' type='jid-single'> <value>juliet@capulet.lit</value> </field> </x> </query> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) form = buildForm(with_jid=JID('juliet@capulet.lit')) d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchiveStartEnd(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'> <x xmlns='jabber:x:data' type='submit'> <field var='FORM_TYPE' type='hidden'> <value>urn:xmpp:mam:0</value> </field> <field var='start' type='text-single'> <value>2010-06-07T00:00:00Z</value> </field> <field var='end' type='text-single'> <value>2010-07-07T13:23:54Z</value> </field> </x> </query> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) start = datetime.datetime(2010, 6, 7, 0, 0, 0, tzinfo=tzutc()) end = datetime.datetime(2010, 7, 7, 13, 23, 54, tzinfo=tzutc()) form = buildForm(start=start, end=end) d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchiveStart(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'> <x xmlns='jabber:x:data' type='submit'> <field var='FORM_TYPE' type='hidden'> <value>urn:xmpp:mam:0</value> </field> <field var='start' type='text-single'> <value>2010-08-07T00:00:00Z</value> </field> </x> </query> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc()) form = buildForm(start=start) d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchiveRSM(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'> <x xmlns='jabber:x:data' type='submit'> <field var='FORM_TYPE' type='hidden'> <value>urn:xmpp:mam:0</value> </field> <field var='start' type='text-single'> <value>2010-08-07T00:00:00Z</value> </field> </x> <set xmlns='http://jabber.org/protocol/rsm'> <max>10</max> </set> </query> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc()) form = buildForm(start=start) rsm = RSMRequest(max_=10) d = self.plugin.queryArchive(SERVICE_JID, form=form, rsm=rsm, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchiveRSMPaging(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'> <x xmlns='jabber:x:data' type='submit'> <field var='FORM_TYPE' type='hidden'><value>urn:xmpp:mam:0</value></field> <field var='start' type='text-single'><value>2010-08-07T00:00:00Z</value></field> </x> <set xmlns='http://jabber.org/protocol/rsm'> <max>10</max> <after>09af3-cc343-b409f</after> </set> </query> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc()) form = buildForm(start=start) rsm = RSMRequest(max_=10, after=u'09af3-cc343-b409f') d = self.plugin.queryArchive(SERVICE_JID, form=form, rsm=rsm, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryFields(self): xml = """ <iq type='get' id="%s" to='%s'> <query xmlns='urn:xmpp:mam:0'/> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) d = self.plugin.queryFields(SERVICE_JID, C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryArchiveFields(self): xml = """ <iq type='set' id='%s' to='%s'> <query xmlns='urn:xmpp:mam:0'> <x xmlns='jabber:x:data' type='submit'> <field type='hidden' var='FORM_TYPE'> <value>urn:xmpp:mam:0</value> </field> <field type='text-single' var='urn:example:xmpp:free-text-search'> <value>Where arth thou, my Juliet?</value> </field> <field type='text-single' var='urn:example:xmpp:stanza-content'> <value>{http://jabber.org/protocol/mood}mood/lonely</value> </field> </x> </query> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) extra = [{'fieldType': 'text-single', 'var': 'urn:example:xmpp:free-text-search', 'value': 'Where arth thou, my Juliet?'}, {'fieldType': 'text-single', 'var': 'urn:example:xmpp:stanza-content', 'value': '{http://jabber.org/protocol/mood}mood/lonely'}] form = buildForm(extra=extra) d = self.plugin.queryArchive(SERVICE_JID, form=form, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_queryPrefs(self): xml = """ <iq type='get' id='%s' to='%s'> <prefs xmlns='urn:xmpp:mam:0'> <always/> <never/> </prefs> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) d = self.plugin.getPrefs(SERVICE_JID, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d def test_setPrefs(self): xml = """ <iq type='set' id='%s' to='%s'> <prefs xmlns='urn:xmpp:mam:0' default='roster'> <always> <jid>romeo@montague.lit</jid> </always> <never> <jid>montague@montague.lit</jid> </never> </prefs> </iq> """ % (("H_%d" % domish.Element._idCounter), SERVICE) always = [JID('romeo@montague.lit')] never = [JID('montague@montague.lit')] d = self.plugin.setPrefs(SERVICE_JID, always=always, never=never, profile_key=C.PROFILE[0]) d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) return d