4315
|
1 #!/usr/bin/env python3 |
|
2 |
|
3 # Libervia: an XMPP client |
|
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) |
|
5 |
|
6 # This program is free software: you can redistribute it and/or modify |
|
7 # it under the terms of the GNU Affero General Public License as published by |
|
8 # the Free Software Foundation, either version 3 of the License, or |
|
9 # (at your option) any later version. |
|
10 |
|
11 # This program is distributed in the hope that it will be useful, |
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
14 # GNU Affero General Public License for more details. |
|
15 |
|
16 # You should have received a copy of the GNU Affero General Public License |
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
|
19 from unittest.mock import MagicMock |
|
20 |
|
21 from pytest_twisted import ensureDeferred as ed |
|
22 from twisted.internet import defer |
|
23 from twisted.words.xish import domish |
|
24 |
|
25 from libervia.backend.models.core import MessageData |
|
26 from libervia.backend.plugins.plugin_xep_0131 import ( |
|
27 HeadersData, |
|
28 NS_SHIM, |
|
29 XEP_0131, |
|
30 Urgency, |
|
31 Priority, |
|
32 ) |
|
33 |
|
34 |
|
class TestXEP0131:
    """Tests for the XEP-0131 plugin: ``HeadersData`` and the ``XEP_0131`` triggers."""

    @staticmethod
    def _assert_headers(header_elts, expected):
        """Check ``<header/>`` elements against a name → text mapping.

        Each element whose ``name`` attribute is a key of ``expected`` must have the
        mapped string as text content, and the set of names encountered must be
        exactly the keys of ``expected``.

        @param header_elts: iterable of ``<header/>`` elements to inspect
        @param expected: mapping of header name to expected text content
        """
        seen = set()
        for elt in header_elts:
            name = elt["name"]
            seen.add(name)
            if name in expected:
                assert str(elt) == expected[name]
        assert seen == set(expected)

    def test_headers_data_to_element(self):
        """Serialising ``HeadersData`` yields a ``<headers/>`` element with all values"""
        data = HeadersData(
            keywords="test,keyword",
            urgency=Urgency.high,
            priority=Priority.urgent,
            custom_header="custom_value",  # type: ignore
        )
        root = data.to_element()

        assert root.uri == NS_SHIM
        assert root.name == "headers"
        children = list(root.elements(NS_SHIM, "header"))
        assert len(children) == 4

        self._assert_headers(
            children,
            {
                "keywords": "test,keyword",
                "urgency": "high",
                "priority": "urgent",
                "custom_header": "custom_value",
            },
        )

    def test_headers_data_from_element(self):
        """Parsing a ``<headers/>`` element yields the matching ``HeadersData``"""
        root = domish.Element((NS_SHIM, "headers"))
        raw_headers = (
            ("keywords", "test,keyword"),
            ("urgency", "high"),
            ("custom_header", "custom_value"),
        )
        for name, content in raw_headers:
            child = root.addElement("header")
            child["name"] = name
            child.addContent(content)

        parsed = HeadersData.from_element(root)

        assert parsed.keywords == "test,keyword"
        assert parsed.urgency == Urgency.high
        # No "priority" header was put in the element.
        assert parsed.priority is None
        assert parsed.custom_header == "custom_value"  # type: ignore

    def test_quote_value(self):
        """Values with spaces, quotes or commas get quoted, plain ones are untouched"""
        plugin = XEP_0131(MagicMock())
        cases = (
            ("simple", "simple"),
            ("with space", '"with space"'),
            ('with "quotes"', '"with \\"quotes\\""'),
            ("with,comma", '"with,comma"'),
        )
        for raw, quoted in cases:
            assert plugin.quote_value(raw) == quoted

    def test_unquote_values(self):
        """A raw comma-separated header is split into its unquoted values"""
        plugin = XEP_0131(MagicMock())
        cases = (
            ("simple,another", ["simple", "another"]),
            ('"quoted value",simple', ["quoted value", "simple"]),
            ('"with,comma",simple', ["with,comma", "simple"]),
            ('"with \\"quotes\\"",simple', ['with "quotes"', "simple"]),
        )
        for raw, values in cases:
            assert plugin.unquote_values(raw) == values

    @ed
    async def test_send_message_trigger(self):
        """Sending a message adds the SHIM headers taken from its ``extra`` data"""
        plugin = XEP_0131(MagicMock())
        client = MagicMock()

        extra = {
            "keywords": ["test", "keyword"],
            "headers": {"urgency": "high", "custom_header": "custom_value"},
        }
        outgoing_elt = domish.Element(("jabber:client", "message"))
        mess_data = MessageData({"extra": extra, "xml": outgoing_elt})

        pre_xml_treatments = MagicMock()
        post_xml_treatments = defer.Deferred()

        keep_going = plugin.send_message_trigger(
            client, mess_data, pre_xml_treatments, post_xml_treatments
        )
        assert keep_going is True

        # Fire the post-XML chain so the callbacks registered by the trigger run.
        post_xml_treatments.callback(mess_data)
        await post_xml_treatments

        # The message XML must now hold a <headers/> element with every header.
        shim_elt = next(mess_data["xml"].elements(NS_SHIM, "headers"))
        self._assert_headers(
            shim_elt.elements(NS_SHIM, "header"),
            {
                "keywords": "test,keyword",
                "urgency": "high",
                "custom_header": "custom_value",
            },
        )

    @ed
    async def test_message_received_trigger(self):
        """Receiving a message exposes its SHIM headers in the ``extra`` data"""
        plugin = XEP_0131(MagicMock())
        client = MagicMock()

        message_elt = domish.Element(("jabber:client", "message"))
        shim_elt = message_elt.addElement((NS_SHIM, "headers"))
        raw_headers = (
            ("keywords", 'test,"keyword with space"'),
            ("urgency", "high"),
        )
        for name, content in raw_headers:
            child = shim_elt.addElement("header")
            child["name"] = name
            child.addContent(content)

        post_treat = defer.Deferred()

        keep_going = plugin.message_received_trigger(client, message_elt, post_treat)

        assert keep_going is True

        # Fire the post-treatment chain with empty extra data for the plugin to fill.
        mess_data = MessageData({"extra": {}})
        post_treat.callback(mess_data)
        await post_treat

        extra = mess_data["extra"]
        assert extra["keywords"] == ["test", "keyword with space"]
        parsed_headers = extra["headers"]
        # Keywords are exposed separately, not duplicated in the generic headers.
        assert "keywords" not in parsed_headers
        assert parsed_headers["urgency"] == "high"
|
180 assert headers["urgency"] == "high" |