comparison libervia/backend/plugins/plugin_xep_0070.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0070.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for managing xep-0070
5 # Copyright (C) 2009-2016 Geoffrey POUZET (chteufleur@kingpenguin.tk)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 from libervia.backend.core.i18n import _, D_
20 from libervia.backend.core.constants import Const as C
21 from libervia.backend.core.log import getLogger
22 from twisted.words.protocols.jabber import xmlstream
23 from twisted.words.protocols import jabber
24
25 log = getLogger(__name__)
26 from libervia.backend.tools import xml_tools
27
28 from wokkel import disco, iwokkel
29 from zope.interface import implementer
30
31 try:
32 from twisted.words.protocols.xmlstream import XMPPHandler
33 except ImportError:
34 from wokkel.subprotocols import XMPPHandler
35
36
# Namespace of the XEP-0070 <confirm/> element.
NS_HTTP_AUTH = "http://jabber.org/protocol/http-auth"

# Observer pattern matching an IQ "get" carrying a <confirm/> child.
IQ = "iq"
IQ_GET = f'/{IQ}[@type="get"]'
IQ_HTTP_AUTH_REQUEST = f'{IQ_GET}/confirm[@xmlns="{NS_HTTP_AUTH}"]'

# Observer pattern matching a message of type "normal" carrying a <confirm/>
# child (despite its name, MSG_GET selects on type="normal", not "get").
MSG = "message"
MSG_GET = f'/{MSG}[@type="normal"]'
MSG_HTTP_AUTH_REQUEST = f'{MSG_GET}/confirm[@xmlns="{NS_HTTP_AUTH}"]'
46
47
# Plugin metadata, keyed by the PI_* constants of the backend's Const class.
# NOTE(review): presumably consumed by the backend's plugin loader — the
# consumer is not visible in this file.
PLUGIN_INFO = {
    C.PI_NAME: "XEP-0070 Plugin",
    C.PI_IMPORT_NAME: "XEP-0070",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0070"],
    # no dependency on other plugins is declared
    C.PI_DEPENDENCIES: [],
    # name of the main plugin class defined below in this module
    C.PI_MAIN: "XEP_0070",
    # NOTE(review): likely signals that the main class provides get_handler()
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of HTTP Requests via XMPP"""),
}
58
59
class XEP_0070(object):
    """
    Implementation for XEP 0070.

    A ``<confirm/>`` request received in an IQ or in a message is submitted to
    ``xml_tools.defer_confirm``; once the resulting Deferred fires, the
    original stanza is answered with a result or with a "not-authorized"
    error.
    """

    def __init__(self, host):
        log.info(_("Plugin XEP_0070 initialization"))
        self.host = host
        # Confirmation requests waiting for an answer, one entry per received
        # stanza: key -> (auth_id, auth_method, auth_url, stanzaType, elt)
        self._dictRequest = dict()

    def get_handler(self, client):
        """Build the protocol handler registering the stanza observers

        @param client: client whose ``profile`` is given to the handler
        @return: a new XEP_0070_handler bound to this plugin
        """
        return XEP_0070_handler(self, client.profile)

    def on_http_auth_request_iq(self, iq_elt, client):
        """This method is called on confirmation request received (XEP-0070 #4.5)

        @param iq_elt: IQ element
        @param client: %(doc_client)s
        """
        log.info(_("XEP-0070 Verifying HTTP Requests via XMPP (iq)"))
        self._treat_http_auth_request(iq_elt, IQ, client)

    def on_http_auth_request_msg(self, msg_elt, client):
        """This method is called on confirmation request received (XEP-0070 #4.5)

        @param msg_elt: message element
        @param client: %(doc_client)s
        """
        log.info(_("XEP-0070 Verifying HTTP Requests via XMPP (message)"))
        self._treat_http_auth_request(msg_elt, MSG, client)

    def _treat_http_auth_request(self, elt, stanzaType, client):
        """Register a confirmation request and submit it for confirmation

        @param elt: received stanza containing the <confirm/> element
        @param stanzaType: IQ or MSG, the kind of stanza which was received
        @param client: %(doc_client)s
        """
        elt.handled = True
        auth_elt = next(elt.elements(NS_HTTP_AUTH, "confirm"))
        auth_id = auth_elt["id"]
        auth_method = auth_elt["method"]
        auth_url = auth_elt["url"]
        # Each request is stored under its own key: keying on the client alone
        # made two overlapping requests of the same client overwrite each
        # other, so the first answer was sent in reply to the wrong stanza.
        request_key = object()
        self._dictRequest[request_key] = (
            auth_id, auth_method, auth_url, stanzaType, elt
        )
        title = D_("Auth confirmation")
        message = D_("{auth_url} needs to validate your identity, do you agree?\n"
                     "Validation code : {auth_id}\n\n"
                     "Please check that this code is the same as on {auth_url}"
                     ).format(auth_url=auth_url, auth_id=auth_id)
        d = xml_tools.defer_confirm(self.host, message=message, title=title,
                                    profile=client.profile)
        d.addCallback(self._auth_request_callback, client, request_key)
        # don't keep the pending entry forever if the confirmation fails
        d.addErrback(self._auth_request_failed, request_key)

    def _auth_request_failed(self, failure_, request_key):
        """Forget a pending request whose confirmation raised an error

        @param failure_: failure received from the confirmation Deferred
        @param request_key: key of the request in self._dictRequest
        @return: the unchanged failure, so it is still reported as before
        """
        self._dictRequest.pop(request_key, None)
        return failure_

    def _auth_request_callback(self, authorized, client, request_key=None):
        """Answer the pending request once the confirmation result is known

        @param authorized: True if the request has been accepted
        @param client: %(doc_client)s
        @param request_key: key of the request in self._dictRequest;
            when None, the request is looked up under ``client`` (behaviour
            expected by callers which don't know this argument)
        """
        key = client if request_key is None else request_key
        try:
            stanzaType, elt = self._dictRequest.pop(key)[-2:]
        except KeyError:
            # Without the stored stanza there is nothing to reply to; going on
            # would only fail on the unbound ``elt``.
            log.warning(_("XEP-0070 no pending request found, can't reply"))
            return

        if authorized:
            if stanzaType == IQ:
                # iq
                log.debug(_("XEP-0070 reply iq"))
                iq_result_elt = xmlstream.toResponse(elt, "result")
                client.send(iq_result_elt)
            elif stanzaType == MSG:
                # message
                log.debug(_("XEP-0070 reply message"))
                # NOTE(review): "result" is an IQ type; XEP-0070 examples
                # answer a message request with a plain message — confirm
                # whether the type should be left unset here.
                msg_result_elt = xmlstream.toResponse(elt, "result")
                msg_result_elt.addChild(next(elt.elements(NS_HTTP_AUTH, "confirm")))
                client.send(msg_result_elt)
        else:
            log.debug(_("XEP-0070 reply error"))
            result_elt = jabber.error.StanzaError("not-authorized").toResponse(elt)
            client.send(result_elt)
130
131
@implementer(iwokkel.IDisco)
class XEP_0070_handler(XMPPHandler):
    """Handler forwarding XEP-0070 requests to the plugin and advertising it"""

    def __init__(self, plugin_parent, profile):
        """
        @param plugin_parent: plugin instance receiving the requests
        @param profile: profile this handler has been created for
        """
        self.profile = profile
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        """Observe IQ and message confirmation requests on the stream"""
        plugin = self.plugin_parent
        observers = (
            (IQ_HTTP_AUTH_REQUEST, plugin.on_http_auth_request_iq),
            (MSG_HTTP_AUTH_REQUEST, plugin.on_http_auth_request_msg),
        )
        for request_path, on_request in observers:
            # ``client`` is handed back to the plugin callback as keyword
            self.xmlstream.addObserver(request_path, on_request, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the HTTP auth namespace as a disco feature"""
        http_auth_feature = disco.DiscoFeature(NS_HTTP_AUTH)
        return [http_auth_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this handler"""
        return list()