comparison mod_s2soutinjection/mod_s2soutinjection.lua @ 4932:f4a9e804c457

mod_s2soutinjection: Rewrite based on mod_onions for 0.12 compat (thanks Zash)
author moparisthebest <admin@moparisthebest.com>
date Sun, 24 Apr 2022 23:58:41 -0400
parents 864fefec1c07
children e55d1f7a570a
comparison
equal deleted inserted replaced
4931:13070c6a7ce8 4932:f4a9e804c457
1 local st = require"util.stanza"; 1 local st = require"util.stanza";
2 local new_ip = require"util.ip".new_ip; 2 local new_ip = require"util.ip".new_ip;
3 local new_outgoing = require"core.s2smanager".new_outgoing; 3 local new_outgoing = require"core.s2smanager".new_outgoing;
4 local bounce_sendq = module:depends"s2s".route_to_new_session.bounce_sendq; 4 local bounce_sendq = module:depends"s2s".route_to_new_session.bounce_sendq;
5 local s2sout = module:depends"s2s".route_to_new_session.s2sout; 5 local initialize_filters = require "util.filters".initialize;
6 local st = require "util.stanza";
7
8 local portmanager = require "core.portmanager";
9
10 local addclient = require "net.server".addclient;
11
12 module:depends("s2s");
13
14 local sessions = module:shared("sessions");
6 15
7 local injected = module:get_option("s2s_connect_overrides"); 16 local injected = module:get_option("s2s_connect_overrides");
8 17
9 local function isip(addr) 18 -- The proxy_listener handles connection while still connecting to the proxy,
10 return not not (addr and addr:match("^%d+%.%d+%.%d+%.%d+$") or addr:match("^[%x:]*:[%x:]-:[%x:]*$")); 19 -- then it hands them over to the normal listener (in mod_s2s)
20 local proxy_listener = { default_port = port, default_mode = "*a", default_interface = "*" };
21
22 function proxy_listener.onconnect(conn)
23 local session = sessions[conn];
24
25 -- Now the real s2s listener can take over the connection.
26 local listener = portmanager.get_service("s2s").listener;
27
28 local w, log = conn.send, session.log;
29
30 local filter = initialize_filters(session);
31
32 session.version = 1;
33
34 session.sends2s = function (t)
35 log("debug", "sending (s2s over proxy): %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
36 if t.name then
37 t = filter("stanzas/out", t);
38 end
39 if t then
40 t = filter("bytes/out", tostring(t));
41 if t then
42 return conn:write(tostring(t));
43 end
44 end
45 end
46
47 session.open_stream = function ()
48 session.sends2s(st.stanza("stream:stream", {
49 xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
50 ["xmlns:stream"]='http://etherx.jabber.org/streams',
51 from=session.from_host, to=session.to_host, version='1.0', ["xml:lang"]='en'}):top_tag());
52 end
53
54 conn.setlistener(conn, listener);
55
56 listener.register_outgoing(conn, session);
57
58 listener.onconnect(conn);
59 end
60
61 function proxy_listener.register_outgoing(conn, session)
62 session.direction = "outgoing";
63 sessions[conn] = session;
64 end
65
66 function proxy_listener.ondisconnect(conn, err)
67 sessions[conn] = nil;
11 end 68 end
12 69
13 module:hook("route/remote", function(event) 70 module:hook("route/remote", function(event)
14 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; 71 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
15 local inject = injected and injected[to_host]; 72 local inject = injected and injected[to_host];
16 if not inject then return end 73 if not inject then return end
17 log("debug", "opening a new outgoing connection for this stanza"); 74 log("debug", "opening a new outgoing connection for this stanza");
18 local host_session = new_outgoing(from_host, to_host); 75 local host_session = new_outgoing(from_host, to_host);
19 host_session.version = 1;
20 76
21 -- Store in buffer 77 -- Store in buffer
22 host_session.bounce_sendq = bounce_sendq; 78 host_session.bounce_sendq = bounce_sendq;
23 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; 79 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
24 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name)); 80 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
25 81
26 local ip_hosts, srv_hosts = {}, {}; 82 local host, port = inject[1] or inject, tonumber(inject[2]) or 5269;
27 host_session.srv_hosts = srv_hosts;
28 host_session.srv_choice = 0;
29 83
30 if type(inject) == "string" then inject = { inject } end 84 local conn = addclient(host, port, proxy_listener, "*a");
31 85
32 for _, item in ipairs(inject) do 86 proxy_listener.register_outgoing(conn, host_session);
33 local host, port = item[1] or item, tonumber(item[2]) or 5269;
34 if isip(host) then
35 ip_hosts[#ip_hosts+1] = { ip = new_ip(host), port = port }
36 else
37 srv_hosts[#srv_hosts+1] = { target = host, port = port }
38 end
39 end
40 if #ip_hosts > 0 then
41 host_session.ip_hosts = ip_hosts;
42 host_session.ip_choice = 0; -- Incremented by try_next_ip
43 s2sout.try_next_ip(host_session);
44 return true;
45 end
46 87
47 return s2sout.try_connect(host_session, host_session.srv_hosts[1].target, host_session.srv_hosts[1].port); 88 host_session.conn = conn;
89 return true;
48 end, -2); 90 end, -2);
49