Mercurial > prosody-modules
comparison mod_s2soutinjection/mod_s2soutinjection.lua @ 4938:bc8832c6696b
upstream merge
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 12:44:32 +0200 |
parents | f4a9e804c457 |
children | e55d1f7a570a |
comparison
equal
deleted
inserted
replaced
4913:3ddab718f717 | 4938:bc8832c6696b |
---|---|
1 local st = require"util.stanza"; | 1 local st = require"util.stanza"; |
2 local new_ip = require"util.ip".new_ip; | 2 local new_ip = require"util.ip".new_ip; |
3 local new_outgoing = require"core.s2smanager".new_outgoing; | 3 local new_outgoing = require"core.s2smanager".new_outgoing; |
4 local bounce_sendq = module:depends"s2s".route_to_new_session.bounce_sendq; | 4 local bounce_sendq = module:depends"s2s".route_to_new_session.bounce_sendq; |
5 local s2sout = module:depends"s2s".route_to_new_session.s2sout; | 5 local initialize_filters = require "util.filters".initialize; |
6 local st = require "util.stanza"; | |
7 | |
8 local portmanager = require "core.portmanager"; | |
9 | |
10 local addclient = require "net.server".addclient; | |
11 | |
12 module:depends("s2s"); | |
13 | |
14 local sessions = module:shared("sessions"); | |
6 | 15 |
7 local injected = module:get_option("s2s_connect_overrides"); | 16 local injected = module:get_option("s2s_connect_overrides"); |
8 | 17 |
9 local function isip(addr) | 18 -- The proxy_listener handles connection while still connecting to the proxy, |
10 return not not (addr and addr:match("^%d+%.%d+%.%d+%.%d+$") or addr:match("^[%x:]*:[%x:]-:[%x:]*$")); | 19 -- then it hands them over to the normal listener (in mod_s2s) |
20 local proxy_listener = { default_port = port, default_mode = "*a", default_interface = "*" }; | |
21 | |
22 function proxy_listener.onconnect(conn) | |
23 local session = sessions[conn]; | |
24 | |
25 -- Now the real s2s listener can take over the connection. | |
26 local listener = portmanager.get_service("s2s").listener; | |
27 | |
28 local w, log = conn.send, session.log; | |
29 | |
30 local filter = initialize_filters(session); | |
31 | |
32 session.version = 1; | |
33 | |
34 session.sends2s = function (t) | |
35 log("debug", "sending (s2s over proxy): %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?")); | |
36 if t.name then | |
37 t = filter("stanzas/out", t); | |
38 end | |
39 if t then | |
40 t = filter("bytes/out", tostring(t)); | |
41 if t then | |
42 return conn:write(tostring(t)); | |
43 end | |
44 end | |
45 end | |
46 | |
47 session.open_stream = function () | |
48 session.sends2s(st.stanza("stream:stream", { | |
49 xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', | |
50 ["xmlns:stream"]='http://etherx.jabber.org/streams', | |
51 from=session.from_host, to=session.to_host, version='1.0', ["xml:lang"]='en'}):top_tag()); | |
52 end | |
53 | |
54 conn.setlistener(conn, listener); | |
55 | |
56 listener.register_outgoing(conn, session); | |
57 | |
58 listener.onconnect(conn); | |
59 end | |
60 | |
61 function proxy_listener.register_outgoing(conn, session) | |
62 session.direction = "outgoing"; | |
63 sessions[conn] = session; | |
64 end | |
65 | |
66 function proxy_listener.ondisconnect(conn, err) | |
67 sessions[conn] = nil; | |
11 end | 68 end |
12 | 69 |
13 module:hook("route/remote", function(event) | 70 module:hook("route/remote", function(event) |
14 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; | 71 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; |
15 local inject = injected and injected[to_host]; | 72 local inject = injected and injected[to_host]; |
16 if not inject then return end | 73 if not inject then return end |
17 log("debug", "opening a new outgoing connection for this stanza"); | 74 log("debug", "opening a new outgoing connection for this stanza"); |
18 local host_session = new_outgoing(from_host, to_host); | 75 local host_session = new_outgoing(from_host, to_host); |
19 host_session.version = 1; | |
20 | 76 |
21 -- Store in buffer | 77 -- Store in buffer |
22 host_session.bounce_sendq = bounce_sendq; | 78 host_session.bounce_sendq = bounce_sendq; |
23 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; | 79 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; |
24 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name)); | 80 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name)); |
25 | 81 |
26 local ip_hosts, srv_hosts = {}, {}; | 82 local host, port = inject[1] or inject, tonumber(inject[2]) or 5269; |
27 host_session.srv_hosts = srv_hosts; | |
28 host_session.srv_choice = 0; | |
29 | 83 |
30 if type(inject) == "string" then inject = { inject } end | 84 local conn = addclient(host, port, proxy_listener, "*a"); |
31 | 85 |
32 for _, item in ipairs(inject) do | 86 proxy_listener.register_outgoing(conn, host_session); |
33 local host, port = item[1] or item, tonumber(item[2]) or 5269; | |
34 if isip(host) then | |
35 ip_hosts[#ip_hosts+1] = { ip = new_ip(host), port = port } | |
36 else | |
37 srv_hosts[#srv_hosts+1] = { target = host, port = port } | |
38 end | |
39 end | |
40 if #ip_hosts > 0 then | |
41 host_session.ip_hosts = ip_hosts; | |
42 host_session.ip_choice = 0; -- Incremented by try_next_ip | |
43 s2sout.try_next_ip(host_session); | |
44 return true; | |
45 end | |
46 | 87 |
47 return s2sout.try_connect(host_session, host_session.srv_hosts[1].target, host_session.srv_hosts[1].port); | 88 host_session.conn = conn; |
89 return true; | |
48 end, -2); | 90 end, -2); |
49 |