From 1a51007c4ffebc694448602f73401af9d328b6c8 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Mon, 2 Mar 2026 17:34:29 +0900 Subject: [PATCH 1/9] fix: prevent UDP forwarder thread exhaustion Limit active UDP forwarding connections and clean up old resources so long-running traffic no longer exhausts native threads and stops receiving new packets. Made-with: Cursor --- .../java/core/packetproxy/DuplexManager.java | 4 + .../core/packetproxy/ProxyUDPForward.java | 116 ++++++++++++++++-- .../java/core/packetproxy/common/UDPConn.java | 51 +++++++- .../packetproxy/common/UDPConnManager.java | 41 +++++-- .../packetproxy/common/UDPServerSocket.java | 6 + .../packetproxy/common/UDPSocketEndpoint.java | 53 ++++++-- 6 files changed, 240 insertions(+), 31 deletions(-) diff --git a/src/main/java/core/packetproxy/DuplexManager.java b/src/main/java/core/packetproxy/DuplexManager.java index dfb721bf..55019dd6 100644 --- a/src/main/java/core/packetproxy/DuplexManager.java +++ b/src/main/java/core/packetproxy/DuplexManager.java @@ -59,6 +59,10 @@ public Duplex getDuplex(int hash) { return duplex_list.get(hash); } + public void removeDuplex(int hash) { + duplex_list.remove(hash); + } + public boolean has(int hash) { return (duplex_list.get(hash) == null) ? false : true; } diff --git a/src/main/java/core/packetproxy/ProxyUDPForward.java b/src/main/java/core/packetproxy/ProxyUDPForward.java index 57e1619f..5bf91d94 100644 --- a/src/main/java/core/packetproxy/ProxyUDPForward.java +++ b/src/main/java/core/packetproxy/ProxyUDPForward.java @@ -19,6 +19,9 @@ import static packetproxy.util.Logging.log; import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import packetproxy.common.Endpoint; import packetproxy.common.UDPServerSocket; import packetproxy.common.UDPSocketEndpoint; @@ -26,8 +29,23 @@ public class ProxyUDPForward extends Proxy { - private ListenPort listen_info; - private UDPServerSocket listen_socket; + private static final int MAX_ACTIVE_CONNECTIONS = 256; + private final ListenPort listen_info; + private final UDPServerSocket listen_socket; + private final Map activeConnections = new LinkedHashMap<>(); + private volatile boolean closed = false; + + private static class ActiveConnection { + private final DuplexAsync duplex; + private final UDPSocketEndpoint serverEndpoint; + private final int duplexHash; + + ActiveConnection(DuplexAsync duplex, UDPSocketEndpoint serverEndpoint, int duplexHash) { + this.duplex = duplex; + this.serverEndpoint = serverEndpoint; + this.duplexHash = duplexHash; + } + } public ProxyUDPForward(ListenPort listen_info) throws Exception { this.listen_info = listen_info; @@ -38,26 +56,102 @@ public ProxyUDPForward(ListenPort listen_info) throws Exception { public void run() { try { - while (true) { + while (!closed) { + try { + + Endpoint client_endpoint = listen_socket.accept(); + log("accept"); + + InetSocketAddress clientAddr = client_endpoint.getAddress(); + InetSocketAddress serverAddr = listen_info.getServer().getAddress(); + UDPSocketEndpoint server_endpoint = new UDPSocketEndpoint(serverAddr); - Endpoint client_endpoint = listen_socket.accept(); - log("accept"); + DuplexAsync duplex = DuplexFactory.createDuplexAsync(client_endpoint, server_endpoint, + listen_info.getServer().getEncoder()); + duplex.start(); + int duplexHash = DuplexManager.getInstance().registerDuplex(duplex); - InetSocketAddress serverAddr = listen_info.getServer().getAddress(); - UDPSocketEndpoint server_endpoint = new UDPSocketEndpoint(serverAddr); + closeConnectionIfExists(clientAddr); + activeConnections.put(clientAddr, new ActiveConnection(duplex, server_endpoint, duplexHash)); + evictIfOverLimit(); + } catch (Exception e) { - DuplexAsync duplex = DuplexFactory.createDuplexAsync(client_endpoint, server_endpoint, - listen_info.getServer().getEncoder()); - duplex.start(); - DuplexManager.getInstance().registerDuplex(duplex); + if (!closed) { + errWithStackTrace(e); + } + } } } catch (Exception e) { errWithStackTrace(e); + } finally { + closeAllConnections(); } } public void close() throws Exception { + closed = true; + closeAllConnections(); listen_socket.close(); } + + private void evictIfOverLimit() { + while (activeConnections.size() > MAX_ACTIVE_CONNECTIONS) { + Iterator> i = activeConnections.entrySet().iterator(); + if (!i.hasNext()) { + + return; + } + Map.Entry oldest = i.next(); + i.remove(); + closeConnection(oldest.getKey(), oldest.getValue()); + } + } + + private void closeConnectionIfExists(InetSocketAddress clientAddr) { + ActiveConnection oldConnection = activeConnections.remove(clientAddr); + if (oldConnection != null) { + + closeConnection(clientAddr, oldConnection); + } + } + + private void closeConnection(InetSocketAddress clientAddr, ActiveConnection connection) { + try { + + connection.duplex.close(); + } catch (Exception e) { + + errWithStackTrace(e); + } + try { + + connection.serverEndpoint.close(); + } catch (Exception e) { + + errWithStackTrace(e); + } + try { + + DuplexManager.getInstance().removeDuplex(connection.duplexHash); + } catch (Exception e) { + + errWithStackTrace(e); + } + try { + + listen_socket.removeConnection(clientAddr); + } catch (Exception e) { + + errWithStackTrace(e); + } + } + + private void closeAllConnections() { + for (Map.Entry entry : activeConnections.entrySet()) { + + closeConnection(entry.getKey(), entry.getValue()); + } + activeConnections.clear(); + } } diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index 5fd97bd8..faf2f583 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -27,12 +27,18 @@ public class UDPConn { - private PipeEndpoint pipe; - private InetSocketAddress addr; + private final PipeEndpoint pipe; + private final InetSocketAddress addr; + private final RawEndpoint rawEndpoint; + private final RawEndpoint proxyRawEndpoint; + private volatile boolean closed; public UDPConn(InetSocketAddress addr) throws Exception { this.addr = addr; this.pipe = new PipeEndpoint(addr); + this.rawEndpoint = this.pipe.getRawEndpoint(); + this.proxyRawEndpoint = this.pipe.getProxyRawEndpoint(); + this.closed = false; } public void put(byte[] data, int offset, int length) throws Exception { @@ -43,7 +49,7 @@ public void put(byte[] data, int offset, int length) throws Exception { } public void put(byte[] data) throws Exception { - OutputStream os = pipe.getRawEndpoint().getOutputStream(); + OutputStream os = rawEndpoint.getOutputStream(); os.write(data); os.flush(); } @@ -53,20 +59,53 @@ public void getAutomatically(final BlockingQueue queue) throws E Callable recvTask = new Callable() { public Void call() throws Exception { - while (true) { + while (!closed) { - InputStream is = pipe.getRawEndpoint().getInputStream(); + InputStream is = rawEndpoint.getInputStream(); byte[] buf = new byte[4096]; int len = is.read(buf); + if (len < 0) { + + return null; + } DatagramPacket recvPacket = new DatagramPacket(buf, len, addr); queue.put(recvPacket); } + return null; } }; executor.submit(recvTask); } public Endpoint getEndpoint() throws Exception { - return pipe.getProxyRawEndpoint(); + return proxyRawEndpoint; + } + + public void close() throws Exception { + if (closed) { + + return; + } + closed = true; + try { + + rawEndpoint.getInputStream().close(); + } catch (Exception ignored) { + } + try { + + rawEndpoint.getOutputStream().close(); + } catch (Exception ignored) { + } + try { + + proxyRawEndpoint.getInputStream().close(); + } catch (Exception ignored) { + } + try { + + proxyRawEndpoint.getOutputStream().close(); + } catch (Exception ignored) { + } } } diff --git a/src/main/java/core/packetproxy/common/UDPConnManager.java b/src/main/java/core/packetproxy/common/UDPConnManager.java index b479e505..c7d3f7cc 100644 --- a/src/main/java/core/packetproxy/common/UDPConnManager.java +++ b/src/main/java/core/packetproxy/common/UDPConnManager.java @@ -35,18 +35,29 @@ public UDPConnManager() { } public Endpoint accept() throws Exception { - InetSocketAddress addr = acceptedQueue.take(); - return connList.get(addr).getEndpoint(); + while (true) { + InetSocketAddress addr = acceptedQueue.take(); + synchronized (this) { + UDPConn conn = connList.get(addr); + if (conn != null) { + + return conn.getEndpoint(); + } + } + } } public void put(DatagramPacket packet) throws Exception { InetSocketAddress addr = new InetSocketAddress(packet.getAddress(), packet.getPort()); - UDPConn conn = this.query(addr); - if (conn == null) { + UDPConn conn; + synchronized (this) { + conn = this.query(addr); + if (conn == null) { - conn = this.create(addr); - conn.getAutomatically(recvQueue); - acceptedQueue.put(addr); + conn = this.create(addr); + conn.getAutomatically(recvQueue); + acceptedQueue.put(addr); + } } conn.put(packet.getData(), 0, packet.getLength()); } @@ -64,4 +75,20 @@ private UDPConn create(InetSocketAddress key) throws Exception { connList.put(key, conn); return conn; } + + public synchronized void remove(InetSocketAddress key) throws Exception { + UDPConn conn = connList.remove(key); + if (conn != null) { + + conn.close(); + } + } + + public synchronized void closeAll() throws Exception { + for (UDPConn conn : connList.values()) { + + conn.close(); + } + connList.clear(); + } } diff --git a/src/main/java/core/packetproxy/common/UDPServerSocket.java b/src/main/java/core/packetproxy/common/UDPServerSocket.java index a76a3cd3..2dc428c1 100644 --- a/src/main/java/core/packetproxy/common/UDPServerSocket.java +++ b/src/main/java/core/packetproxy/common/UDPServerSocket.java @@ -17,6 +17,7 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,12 +35,17 @@ public UDPServerSocket(int port) throws Exception { public void close() throws Exception { socket.close(); + connManager.closeAll(); } public Endpoint accept() throws Exception { return connManager.accept(); } + public void removeConnection(InetSocketAddress addr) throws Exception { + connManager.remove(addr); + } + private void createRecvLoop() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); Callable recvTask = new Callable() { diff --git a/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java b/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java index 63bab73b..9f97a979 100644 --- a/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java +++ b/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java @@ -26,16 +26,20 @@ public class UDPSocketEndpoint implements Endpoint { - private DatagramSocket socket; - private InetSocketAddress serverAddr; - private PipeEndpoint pipe; - private static int BUFSIZE = 4096; + private final DatagramSocket socket; + private final InetSocketAddress serverAddr; + private final PipeEndpoint pipe; + private static final int BUFSIZE = 4096; + private final ExecutorService executor; + private volatile boolean closed; public UDPSocketEndpoint(InetSocketAddress addr) throws Exception { socket = new DatagramSocket(); socket.connect(addr); serverAddr = addr; pipe = new PipeEndpoint(addr); + executor = Executors.newFixedThreadPool(2); + closed = false; loop(); } @@ -55,24 +59,28 @@ public OutputStream getOutputStream() throws Exception { } private void loop() { - ExecutorService executor = Executors.newFixedThreadPool(2); Callable sendTask = new Callable() { public Void call() throws Exception { - while (true) { + while (!closed) { InputStream is = pipe.getRawEndpoint().getInputStream(); byte[] input_data = new byte[BUFSIZE]; int len = is.read(input_data); + if (len < 0) { + + return null; + } DatagramPacket sendPacket = new DatagramPacket(input_data, 0, len, serverAddr); socket.send(sendPacket); } + return null; } }; Callable recvTask = new Callable() { public Void call() throws Exception { - while (true) { + while (!closed) { byte[] buf = new byte[BUFSIZE]; DatagramPacket recvPacket = new DatagramPacket(buf, BUFSIZE); @@ -81,12 +89,43 @@ public Void call() throws Exception { os.write(recvPacket.getData(), 0, recvPacket.getLength()); os.flush(); } + return null; } }; executor.submit(sendTask); executor.submit(recvTask); } + public void close() { + if (closed) { + + return; + } + closed = true; + socket.close(); + executor.shutdownNow(); + try { + + pipe.getRawEndpoint().getInputStream().close(); + } catch (Exception ignored) { + } + try { + + pipe.getRawEndpoint().getOutputStream().close(); + } catch (Exception ignored) { + } + try { + + pipe.getProxyRawEndpoint().getInputStream().close(); + } catch (Exception ignored) { + } + try { + + pipe.getProxyRawEndpoint().getOutputStream().close(); + } catch (Exception ignored) { + } + } + @Override public int getLocalPort() { return socket.getLocalPort(); From 7a5c5571107276e42ebb9eeb977be822d3a58d73 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Mon, 9 Mar 2026 15:11:01 +0900 Subject: [PATCH 2/9] fix: streamline UDP forwarder packet handling Replace the per-connection duplex flow with a lighter datagram forwarding path so the UDP forwarder spends less time creating transient sessions during bursts. Made-with: Cursor --- .../core/packetproxy/ProxyUDPForward.java | 198 ++++++++++++------ .../packetproxy/common/UDPServerSocket.java | 114 ++++++++-- 2 files changed, 228 insertions(+), 84 deletions(-) diff --git a/src/main/java/core/packetproxy/ProxyUDPForward.java b/src/main/java/core/packetproxy/ProxyUDPForward.java index 5bf91d94..3013d6ef 100644 --- a/src/main/java/core/packetproxy/ProxyUDPForward.java +++ b/src/main/java/core/packetproxy/ProxyUDPForward.java @@ -16,142 +16,212 @@ package packetproxy; import static packetproxy.util.Logging.errWithStackTrace; -import static packetproxy.util.Logging.log; +import java.net.DatagramPacket; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; -import packetproxy.common.Endpoint; import packetproxy.common.UDPServerSocket; -import packetproxy.common.UDPSocketEndpoint; import packetproxy.model.ListenPort; public class ProxyUDPForward extends Proxy { - private static final int MAX_ACTIVE_CONNECTIONS = 256; + private static final int MAX_DATAGRAM_SIZE = 65535; + private static final long IDLE_SESSION_TIMEOUT_MILLIS = 10; private final ListenPort listen_info; private final UDPServerSocket listen_socket; - private final Map activeConnections = new LinkedHashMap<>(); + private final Selector selector; + private final Thread responseLoopThread; + private final Map activeSessions = new LinkedHashMap<>(); private volatile boolean closed = false; - private static class ActiveConnection { - private final DuplexAsync duplex; - private final UDPSocketEndpoint serverEndpoint; - private final int duplexHash; + private static class UpstreamSession { + private final InetSocketAddress clientAddr; + private final InetSocketAddress serverAddr; + private final DatagramChannel channel; + private volatile long lastActivityAtMillis; + + UpstreamSession(InetSocketAddress clientAddr, InetSocketAddress serverAddr, DatagramChannel channel) { + this.clientAddr = clientAddr; + this.serverAddr = serverAddr; + this.channel = channel; + this.lastActivityAtMillis = System.currentTimeMillis(); + } + + private void touch() { + lastActivityAtMillis = System.currentTimeMillis(); + } - ActiveConnection(DuplexAsync duplex, UDPSocketEndpoint serverEndpoint, int duplexHash) { - this.duplex = duplex; - this.serverEndpoint = serverEndpoint; - this.duplexHash = duplexHash; + private boolean isIdle(long timeoutMillis) { + return System.currentTimeMillis() - lastActivityAtMillis >= timeoutMillis; } } public ProxyUDPForward(ListenPort listen_info) throws Exception { this.listen_info = listen_info; listen_socket = new UDPServerSocket(listen_info.getPort()); + selector = Selector.open(); + responseLoopThread = new Thread(new Runnable() { + + @Override + public void run() { + runResponseLoop(); + } + }, "udp-forward-response"); + responseLoopThread.setDaemon(true); + responseLoopThread.start(); } @Override public void run() { try { - while (!closed) { try { + DatagramPacket clientPacket = listen_socket.takeReceivedPacket(); + if (clientPacket == null) { - Endpoint client_endpoint = listen_socket.accept(); - log("accept"); - - InetSocketAddress clientAddr = client_endpoint.getAddress(); + continue; + } + InetSocketAddress clientAddr = new InetSocketAddress(clientPacket.getAddress(), + clientPacket.getPort()); InetSocketAddress serverAddr = listen_info.getServer().getAddress(); - UDPSocketEndpoint server_endpoint = new UDPSocketEndpoint(serverAddr); - - DuplexAsync duplex = DuplexFactory.createDuplexAsync(client_endpoint, server_endpoint, - listen_info.getServer().getEncoder()); - duplex.start(); - int duplexHash = DuplexManager.getInstance().registerDuplex(duplex); - - closeConnectionIfExists(clientAddr); - activeConnections.put(clientAddr, new ActiveConnection(duplex, server_endpoint, duplexHash)); - evictIfOverLimit(); + closeIdleSessions(); + UpstreamSession session = getOrCreateSession(clientAddr, serverAddr); + sendToUpstream(session, clientPacket); } catch (Exception e) { - if (!closed) { errWithStackTrace(e); } } } } catch (Exception e) { - errWithStackTrace(e); } finally { - closeAllConnections(); + closeAllSessions(); } } public void close() throws Exception { closed = true; - closeAllConnections(); + selector.wakeup(); + closeAllSessions(); listen_socket.close(); } - private void evictIfOverLimit() { - while (activeConnections.size() > MAX_ACTIVE_CONNECTIONS) { - Iterator> i = activeConnections.entrySet().iterator(); - if (!i.hasNext()) { + private UpstreamSession getOrCreateSession(InetSocketAddress clientAddr, InetSocketAddress serverAddr) + throws Exception { + synchronized (activeSessions) { + UpstreamSession session = activeSessions.get(clientAddr); + if (session != null) { - return; + return session; } - Map.Entry oldest = i.next(); - i.remove(); - closeConnection(oldest.getKey(), oldest.getValue()); + + DatagramChannel channel = DatagramChannel.open(); + channel.bind(null); + channel.configureBlocking(false); + UpstreamSession newSession = new UpstreamSession(clientAddr, serverAddr, channel); + selector.wakeup(); + channel.register(selector, SelectionKey.OP_READ, newSession); + activeSessions.put(clientAddr, newSession); + return newSession; } } - private void closeConnectionIfExists(InetSocketAddress clientAddr) { - ActiveConnection oldConnection = activeConnections.remove(clientAddr); - if (oldConnection != null) { + private void sendToUpstream(UpstreamSession session, DatagramPacket clientPacket) throws Exception { + ByteBuffer buf = ByteBuffer.wrap(clientPacket.getData(), 0, clientPacket.getLength()); + while (buf.hasRemaining() && !closed) { + int written = session.channel.send(buf, session.serverAddr); + if (written > 0) { - closeConnection(clientAddr, oldConnection); + session.touch(); + return; + } + Thread.yield(); } } - private void closeConnection(InetSocketAddress clientAddr, ActiveConnection connection) { + private void runResponseLoop() { try { - - connection.duplex.close(); + while (!closed) { + selector.select(50); + processSelectedResponses(); + closeIdleSessions(); + } } catch (Exception e) { + if (!closed) { - errWithStackTrace(e); + errWithStackTrace(e); + } } - try { + } - connection.serverEndpoint.close(); - } catch (Exception e) { + private void processSelectedResponses() throws Exception { + Iterator i = selector.selectedKeys().iterator(); + while (i.hasNext()) { + SelectionKey key = i.next(); + i.remove(); + if (!key.isValid() || !key.isReadable()) { - errWithStackTrace(e); + continue; + } + + UpstreamSession session = (UpstreamSession) key.attachment(); + ByteBuffer buf = ByteBuffer.allocate(MAX_DATAGRAM_SIZE); + int len = session.channel.read(buf); + if (len <= 0) { + + continue; + } + session.touch(); + byte[] response = Arrays.copyOf(buf.array(), len); + listen_socket.sendToClient(new DatagramPacket(response, len, session.clientAddr)); } - try { + } - DuplexManager.getInstance().removeDuplex(connection.duplexHash); - } catch (Exception e) { + private void closeIdleSessions() { + synchronized (activeSessions) { + Iterator> i = activeSessions.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = i.next(); + if (!entry.getValue().isIdle(IDLE_SESSION_TIMEOUT_MILLIS)) { - errWithStackTrace(e); + continue; + } + i.remove(); + closeSession(entry.getValue()); + } } - try { + } - listen_socket.removeConnection(clientAddr); - } catch (Exception e) { + private void closeAllSessions() { + synchronized (activeSessions) { + for (UpstreamSession session : activeSessions.values()) { - errWithStackTrace(e); + closeSession(session); + } + activeSessions.clear(); } } - private void closeAllConnections() { - for (Map.Entry entry : activeConnections.entrySet()) { + private void closeSession(UpstreamSession session) { + try { + + SelectionKey key = session.channel.keyFor(selector); + if (key != null) { + + key.cancel(); + } + session.channel.close(); + } catch (Exception e) { - closeConnection(entry.getKey(), entry.getValue()); + errWithStackTrace(e); } - activeConnections.clear(); } } diff --git a/src/main/java/core/packetproxy/common/UDPServerSocket.java b/src/main/java/core/packetproxy/common/UDPServerSocket.java index 2dc428c1..c9710c82 100644 --- a/src/main/java/core/packetproxy/common/UDPServerSocket.java +++ b/src/main/java/core/packetproxy/common/UDPServerSocket.java @@ -15,62 +15,136 @@ */ package packetproxy.common; +import static packetproxy.util.Logging.errWithStackTrace; + import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.net.InetSocketAddress; +import java.net.SocketException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public class UDPServerSocket { - private DatagramSocket socket; - private UDPConnManager connManager; + private static final int MAX_DATAGRAM_SIZE = 65535; + private static final int SOCKET_BUFFER_SIZE = 1024 * 1024; + private final DatagramSocket socket; + private final BlockingQueue incomingQueue; + private final BlockingQueue outgoingQueue; + private final Thread recvThread; + private final ExecutorService executor; public UDPServerSocket(int port) throws Exception { socket = new DatagramSocket(port); - connManager = new UDPConnManager(); + socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE); + socket.setSendBufferSize(SOCKET_BUFFER_SIZE); + incomingQueue = new LinkedBlockingQueue(); + outgoingQueue = new LinkedBlockingQueue(); + recvThread = new Thread(new Runnable() { + + @Override + public void run() { + try { + + createRecvTask().call(); + } catch (Exception e) { + + if (!socket.isClosed()) { + + errWithStackTrace(e); + } + } + } + }, "udp-listen-recv"); + recvThread.setDaemon(true); + recvThread.setPriority(Thread.MAX_PRIORITY); + executor = Executors.newSingleThreadExecutor(); createRecvLoop(); } public void close() throws Exception { socket.close(); - connManager.closeAll(); + executor.shutdownNow(); } - public Endpoint accept() throws Exception { - return connManager.accept(); + public DatagramPacket takeReceivedPacket() throws Exception { + return incomingQueue.poll(100, TimeUnit.MILLISECONDS); } - public void removeConnection(InetSocketAddress addr) throws Exception { - connManager.remove(addr); + public void sendToClient(DatagramPacket packet) throws Exception { + outgoingQueue.put(packet); } private void createRecvLoop() throws Exception { - ExecutorService executor = Executors.newFixedThreadPool(2); - Callable recvTask = new Callable() { + Callable sendTask = new Callable() { public Void call() throws Exception { while (true) { + try { + + DatagramPacket sendPacket = outgoingQueue.take(); + socket.send(sendPacket); + } catch (InterruptedException e) { + + Thread.currentThread().interrupt(); + return null; + } catch (SocketException e) { + + if (socket.isClosed()) { + + return null; + } + errWithStackTrace(e); + } catch (Exception e) { - byte[] buf = new byte[4096]; - DatagramPacket recvPacket = new DatagramPacket(buf, 4096); - socket.receive(recvPacket); - connManager.put(recvPacket); + if (socket.isClosed()) { + + return null; + } + errWithStackTrace(e); + } } } }; - Callable sendTask = new Callable() { + recvThread.start(); + executor.submit(sendTask); + } + + private Callable createRecvTask() { + return new Callable() { public Void call() throws Exception { while (true) { + try { + + byte[] buf = new byte[MAX_DATAGRAM_SIZE]; + DatagramPacket recvPacket = new DatagramPacket(buf, MAX_DATAGRAM_SIZE); + socket.receive(recvPacket); + incomingQueue.put(recvPacket); + } catch (InterruptedException e) { - DatagramPacket sendPacket = connManager.get(); - socket.send(sendPacket); + Thread.currentThread().interrupt(); + return null; + } catch (SocketException e) { + + if (socket.isClosed()) { + + return null; + } + errWithStackTrace(e); + } catch (Exception e) { + + if (socket.isClosed()) { + + return null; + } + errWithStackTrace(e); + } } } }; - executor.submit(recvTask); - executor.submit(sendTask); } } From c02a2a03ad244686abac5d4533eda60d29026bb7 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Mon, 9 Mar 2026 15:21:34 +0900 Subject: [PATCH 3/9] revert: undo UDP forwarder packet handling rewrite Revert the datagram-forwarding rewrite because it breaks UDP forwarder behavior and removes the existing duplex-based processing path. Made-with: Cursor --- .../core/packetproxy/ProxyUDPForward.java | 198 ++++++------------ .../packetproxy/common/UDPServerSocket.java | 114 ++-------- 2 files changed, 84 insertions(+), 228 deletions(-) diff --git a/src/main/java/core/packetproxy/ProxyUDPForward.java b/src/main/java/core/packetproxy/ProxyUDPForward.java index 3013d6ef..5bf91d94 100644 --- a/src/main/java/core/packetproxy/ProxyUDPForward.java +++ b/src/main/java/core/packetproxy/ProxyUDPForward.java @@ -16,212 +16,142 @@ package packetproxy; import static packetproxy.util.Logging.errWithStackTrace; +import static packetproxy.util.Logging.log; -import java.net.DatagramPacket; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import packetproxy.common.Endpoint; import packetproxy.common.UDPServerSocket; +import packetproxy.common.UDPSocketEndpoint; import packetproxy.model.ListenPort; public class ProxyUDPForward extends Proxy { - private static final int MAX_DATAGRAM_SIZE = 65535; - private static final long IDLE_SESSION_TIMEOUT_MILLIS = 10; + private static final int MAX_ACTIVE_CONNECTIONS = 256; private final ListenPort listen_info; private final UDPServerSocket listen_socket; - private final Selector selector; - private final Thread responseLoopThread; - private final Map activeSessions = new LinkedHashMap<>(); + private final Map activeConnections = new LinkedHashMap<>(); private volatile boolean closed = false; - private static class UpstreamSession { - private final InetSocketAddress clientAddr; - private final InetSocketAddress serverAddr; - private final DatagramChannel channel; - private volatile long lastActivityAtMillis; - - UpstreamSession(InetSocketAddress clientAddr, InetSocketAddress serverAddr, DatagramChannel channel) { - this.clientAddr = clientAddr; - this.serverAddr = serverAddr; - this.channel = channel; - this.lastActivityAtMillis = System.currentTimeMillis(); - } - - private void touch() { - lastActivityAtMillis = System.currentTimeMillis(); - } + private static class ActiveConnection { + private final DuplexAsync duplex; + private final UDPSocketEndpoint serverEndpoint; + private final int duplexHash; - private boolean isIdle(long timeoutMillis) { - return System.currentTimeMillis() - lastActivityAtMillis >= timeoutMillis; + ActiveConnection(DuplexAsync duplex, UDPSocketEndpoint serverEndpoint, int duplexHash) { + this.duplex = duplex; + this.serverEndpoint = serverEndpoint; + this.duplexHash = duplexHash; } } public ProxyUDPForward(ListenPort listen_info) throws Exception { this.listen_info = listen_info; listen_socket = new UDPServerSocket(listen_info.getPort()); - selector = Selector.open(); - responseLoopThread = new Thread(new Runnable() { - - @Override - public void run() { - runResponseLoop(); - } - }, "udp-forward-response"); - responseLoopThread.setDaemon(true); - responseLoopThread.start(); } @Override public void run() { try { + while (!closed) { try { - DatagramPacket clientPacket = listen_socket.takeReceivedPacket(); - if (clientPacket == null) { - continue; - } - InetSocketAddress clientAddr = new InetSocketAddress(clientPacket.getAddress(), - clientPacket.getPort()); + Endpoint client_endpoint = listen_socket.accept(); + log("accept"); + + InetSocketAddress clientAddr = client_endpoint.getAddress(); InetSocketAddress serverAddr = listen_info.getServer().getAddress(); - closeIdleSessions(); - UpstreamSession session = getOrCreateSession(clientAddr, serverAddr); - sendToUpstream(session, clientPacket); + UDPSocketEndpoint server_endpoint = new UDPSocketEndpoint(serverAddr); + + DuplexAsync duplex = DuplexFactory.createDuplexAsync(client_endpoint, server_endpoint, + listen_info.getServer().getEncoder()); + duplex.start(); + int duplexHash = DuplexManager.getInstance().registerDuplex(duplex); + + closeConnectionIfExists(clientAddr); + activeConnections.put(clientAddr, new ActiveConnection(duplex, server_endpoint, duplexHash)); + evictIfOverLimit(); } catch (Exception e) { + if (!closed) { errWithStackTrace(e); } } } } catch (Exception e) { + errWithStackTrace(e); } finally { - closeAllSessions(); + closeAllConnections(); } } public void close() throws Exception { closed = true; - selector.wakeup(); - closeAllSessions(); + closeAllConnections(); listen_socket.close(); } - private UpstreamSession getOrCreateSession(InetSocketAddress clientAddr, InetSocketAddress serverAddr) - throws Exception { - synchronized (activeSessions) { - UpstreamSession session = activeSessions.get(clientAddr); - if (session != null) { + private void evictIfOverLimit() { + while (activeConnections.size() > MAX_ACTIVE_CONNECTIONS) { + Iterator> i = activeConnections.entrySet().iterator(); + if (!i.hasNext()) { - return session; + return; } - - DatagramChannel channel = DatagramChannel.open(); - channel.bind(null); - channel.configureBlocking(false); - UpstreamSession newSession = new UpstreamSession(clientAddr, serverAddr, channel); - selector.wakeup(); - channel.register(selector, SelectionKey.OP_READ, newSession); - activeSessions.put(clientAddr, newSession); - return newSession; + Map.Entry oldest = i.next(); + i.remove(); + closeConnection(oldest.getKey(), oldest.getValue()); } } - private void sendToUpstream(UpstreamSession session, DatagramPacket clientPacket) throws Exception { - ByteBuffer buf = ByteBuffer.wrap(clientPacket.getData(), 0, clientPacket.getLength()); - while (buf.hasRemaining() && !closed) { - int written = session.channel.send(buf, session.serverAddr); - if (written > 0) { + private void closeConnectionIfExists(InetSocketAddress clientAddr) { + ActiveConnection oldConnection = activeConnections.remove(clientAddr); + if (oldConnection != null) { - session.touch(); - return; - } - Thread.yield(); + closeConnection(clientAddr, oldConnection); } } - private void runResponseLoop() { + private void closeConnection(InetSocketAddress clientAddr, ActiveConnection connection) { try { - while (!closed) { - selector.select(50); - processSelectedResponses(); - closeIdleSessions(); - } + + connection.duplex.close(); } catch (Exception e) { - if (!closed) { - errWithStackTrace(e); - } + errWithStackTrace(e); } - } - - private void processSelectedResponses() throws Exception { - Iterator i = selector.selectedKeys().iterator(); - while (i.hasNext()) { - SelectionKey key = i.next(); - i.remove(); - if (!key.isValid() || !key.isReadable()) { - - continue; - } + try { - UpstreamSession session = (UpstreamSession) key.attachment(); - ByteBuffer buf = ByteBuffer.allocate(MAX_DATAGRAM_SIZE); - int len = session.channel.read(buf); - if (len <= 0) { + connection.serverEndpoint.close(); + } catch (Exception e) { - continue; - } - session.touch(); - byte[] response = Arrays.copyOf(buf.array(), len); - listen_socket.sendToClient(new DatagramPacket(response, len, session.clientAddr)); + errWithStackTrace(e); } - } + try { - private void closeIdleSessions() { - synchronized (activeSessions) { - Iterator> i = activeSessions.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry entry = i.next(); - if (!entry.getValue().isIdle(IDLE_SESSION_TIMEOUT_MILLIS)) { + DuplexManager.getInstance().removeDuplex(connection.duplexHash); + } catch (Exception e) { - continue; - } - i.remove(); - closeSession(entry.getValue()); - } + errWithStackTrace(e); } - } + try { - private void closeAllSessions() { - synchronized (activeSessions) { - for (UpstreamSession session : activeSessions.values()) { + listen_socket.removeConnection(clientAddr); + } catch (Exception e) { - closeSession(session); - } - activeSessions.clear(); + errWithStackTrace(e); } } - private void closeSession(UpstreamSession session) { - try { - - SelectionKey key = session.channel.keyFor(selector); - if (key != null) { - - key.cancel(); - } - session.channel.close(); - } catch (Exception e) { + private void closeAllConnections() { + for (Map.Entry entry : activeConnections.entrySet()) { - errWithStackTrace(e); + closeConnection(entry.getKey(), entry.getValue()); } + activeConnections.clear(); } } diff --git a/src/main/java/core/packetproxy/common/UDPServerSocket.java b/src/main/java/core/packetproxy/common/UDPServerSocket.java index c9710c82..2dc428c1 100644 --- a/src/main/java/core/packetproxy/common/UDPServerSocket.java +++ b/src/main/java/core/packetproxy/common/UDPServerSocket.java @@ -15,136 +15,62 @@ */ package packetproxy.common; -import static packetproxy.util.Logging.errWithStackTrace; - import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.net.SocketException; -import java.util.concurrent.BlockingQueue; +import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; public class UDPServerSocket { - private static final int MAX_DATAGRAM_SIZE = 65535; - private static final int SOCKET_BUFFER_SIZE = 1024 * 1024; - private final DatagramSocket socket; - private final BlockingQueue incomingQueue; - private final BlockingQueue outgoingQueue; - private final Thread recvThread; - private final ExecutorService executor; + private DatagramSocket socket; + private UDPConnManager connManager; public UDPServerSocket(int port) throws Exception { socket = new DatagramSocket(port); - socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE); - socket.setSendBufferSize(SOCKET_BUFFER_SIZE); - incomingQueue = new LinkedBlockingQueue(); - outgoingQueue = new LinkedBlockingQueue(); - recvThread = new Thread(new Runnable() { - - @Override - public void run() { - try { - - createRecvTask().call(); - } catch (Exception e) { - - if (!socket.isClosed()) { - - errWithStackTrace(e); - } - } - } - }, "udp-listen-recv"); - recvThread.setDaemon(true); - recvThread.setPriority(Thread.MAX_PRIORITY); - executor = Executors.newSingleThreadExecutor(); + connManager = new UDPConnManager(); createRecvLoop(); } public void close() throws Exception { socket.close(); - executor.shutdownNow(); + connManager.closeAll(); } - public DatagramPacket takeReceivedPacket() throws Exception { - return incomingQueue.poll(100, TimeUnit.MILLISECONDS); + public Endpoint accept() throws Exception { + return connManager.accept(); } - public void sendToClient(DatagramPacket packet) throws Exception { - outgoingQueue.put(packet); + public void removeConnection(InetSocketAddress addr) throws Exception { + connManager.remove(addr); } private void createRecvLoop() throws Exception { - Callable sendTask = new Callable() { + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable recvTask = new Callable() { public Void call() throws Exception { while (true) { - try { - - DatagramPacket sendPacket = outgoingQueue.take(); - socket.send(sendPacket); - } catch (InterruptedException e) { - - Thread.currentThread().interrupt(); - return null; - } catch (SocketException e) { - - if (socket.isClosed()) { - - return null; - } - errWithStackTrace(e); - } catch (Exception e) { - if (socket.isClosed()) { - - return null; - } - errWithStackTrace(e); - } + byte[] buf = new byte[4096]; + DatagramPacket recvPacket = new DatagramPacket(buf, 4096); + socket.receive(recvPacket); + connManager.put(recvPacket); } } }; - recvThread.start(); - executor.submit(sendTask); - } - - private Callable createRecvTask() { - return new Callable() { + Callable sendTask = new Callable() { public Void call() throws Exception { while (true) { - try { - - byte[] buf = new byte[MAX_DATAGRAM_SIZE]; - DatagramPacket recvPacket = new DatagramPacket(buf, MAX_DATAGRAM_SIZE); - socket.receive(recvPacket); - incomingQueue.put(recvPacket); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (SocketException e) { - - if (socket.isClosed()) { - - return null; - } - errWithStackTrace(e); - } catch (Exception e) { - - if (socket.isClosed()) { - - return null; - } - errWithStackTrace(e); - } + DatagramPacket sendPacket = connManager.get(); + socket.send(sendPacket); } } }; + executor.submit(recvTask); + executor.submit(sendTask); } } From 3027d47a99fd6cffe1aa3907077fe9c87167e7a9 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Thu, 12 Mar 2026 13:24:27 +0900 Subject: [PATCH 4/9] fix: stop leaked UDPConn worker threads Shut down the per-connection receive executor when a UDPConn closes so evicted UDP forwarder connections do not leave worker threads behind. Add a regression test that fails until the cleanup path actually terminates the created thread. Made-with: Cursor --- .../java/core/packetproxy/common/UDPConn.java | 19 +++- .../java/packetproxy/common/UDPConnTest.java | 106 ++++++++++++++++++ 2 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 src/test/java/packetproxy/common/UDPConnTest.java diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index faf2f583..3acc608a 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -23,6 +23,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.io.output.ByteArrayOutputStream; public class UDPConn { @@ -31,6 +32,8 @@ public class UDPConn { private final InetSocketAddress addr; private final RawEndpoint rawEndpoint; private final RawEndpoint proxyRawEndpoint; + private final ExecutorService receiveExecutor; + private Future recvTaskFuture; private volatile boolean closed; public UDPConn(InetSocketAddress addr) throws Exception { @@ -38,6 +41,8 @@ public UDPConn(InetSocketAddress addr) throws Exception { this.pipe = new PipeEndpoint(addr); this.rawEndpoint = this.pipe.getRawEndpoint(); this.proxyRawEndpoint = this.pipe.getProxyRawEndpoint(); + this.receiveExecutor = Executors.newSingleThreadExecutor(); + this.recvTaskFuture = null; this.closed = false; } @@ -54,8 +59,11 @@ public void put(byte[] data) throws Exception { os.flush(); } - public void getAutomatically(final BlockingQueue queue) throws Exception { - ExecutorService executor = Executors.newSingleThreadExecutor(); + public synchronized void getAutomatically(final BlockingQueue queue) throws Exception { + if (recvTaskFuture != null) { + + return; + } Callable recvTask = new Callable() { public Void call() throws Exception { @@ -74,7 +82,7 @@ public Void call() throws Exception { return null; } }; - executor.submit(recvTask); + recvTaskFuture = receiveExecutor.submit(recvTask); } public Endpoint getEndpoint() throws Exception { @@ -87,6 +95,10 @@ public void close() throws Exception { return; } closed = true; + if (recvTaskFuture != null) { + + recvTaskFuture.cancel(true); + } try { rawEndpoint.getInputStream().close(); @@ -107,5 +119,6 @@ public void close() throws Exception { proxyRawEndpoint.getOutputStream().close(); } catch (Exception ignored) { } + receiveExecutor.shutdownNow(); } } diff --git a/src/test/java/packetproxy/common/UDPConnTest.java b/src/test/java/packetproxy/common/UDPConnTest.java new file mode 100644 index 00000000..8fec432e --- /dev/null +++ b/src/test/java/packetproxy/common/UDPConnTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2026 DeNA Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package packetproxy.common; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +public class UDPConnTest { + + @Test + public void testCloseStopsAutomaticallyCreatedThread() throws Exception { + var existingThreads = new HashSet<>(Thread.getAllStackTraces().keySet()); + var conn = new UDPConn(new InetSocketAddress("127.0.0.1", 20000)); + var queue = new LinkedBlockingQueue(); + try { + + conn.getAutomatically(queue); + var payload = "request".getBytes(StandardCharsets.US_ASCII); + var endpointOutput = conn.getEndpoint().getOutputStream(); + endpointOutput.write(payload); + endpointOutput.flush(); + + var packet = queue.poll(1, TimeUnit.SECONDS); + assertNotNull(packet); + assertEquals("request", new String(packet.getData(), 0, packet.getLength(), StandardCharsets.US_ASCII)); + + var connThreads = waitForNewThreads(existingThreads); + assertFalse(connThreads.isEmpty()); + + conn.close(); + joinThreads(connThreads); + assertFalse(hasAliveThreads(connThreads)); + } finally { + + conn.close(); + } + } + + private Set waitForNewThreads(Set existingThreads) throws Exception { + long deadlineMillis = System.currentTimeMillis() + 1000; + Set newThreads = Set.of(); + while (System.currentTimeMillis() < deadlineMillis) { + + newThreads = getNewThreads(existingThreads); + if (!newThreads.isEmpty()) { + + return newThreads; + } + Thread.sleep(10); + } + return newThreads; + } + + private Set getNewThreads(Set existingThreads) { + var currentThreads = Thread.getAllStackTraces().keySet(); + var newThreads = new HashSet(); + for (var thread : currentThreads) { + + if (!existingThreads.contains(thread)) { + newThreads.add(thread); + } + } + return newThreads; + } + + private void joinThreads(Set threads) throws Exception { + for (var thread : threads) { + + thread.join(1000); + } + } + + private boolean hasAliveThreads(Set threads) { + for (var thread : threads) { + + if (thread.isAlive()) { + + return true; + } + } + return false; + } +} From ebf48b2580ec0ce8b2279bc52c9437d3cda531c1 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Thu, 12 Mar 2026 18:24:48 +0900 Subject: [PATCH 5/9] style: revert unnecessary final modifiers Keep the UDP leak fixes intact while removing style-only final additions on pre-existing fields. Made-with: Cursor --- src/main/java/core/packetproxy/ProxyUDPForward.java | 4 ++-- src/main/java/core/packetproxy/common/UDPConn.java | 4 ++-- .../java/core/packetproxy/common/UDPSocketEndpoint.java | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/java/core/packetproxy/ProxyUDPForward.java b/src/main/java/core/packetproxy/ProxyUDPForward.java index 5bf91d94..52b0bd4c 100644 --- a/src/main/java/core/packetproxy/ProxyUDPForward.java +++ b/src/main/java/core/packetproxy/ProxyUDPForward.java @@ -30,8 +30,8 @@ public class ProxyUDPForward extends Proxy { private static final int MAX_ACTIVE_CONNECTIONS = 256; - private final ListenPort listen_info; - private final UDPServerSocket listen_socket; + private ListenPort listen_info; + private UDPServerSocket listen_socket; private final Map activeConnections = new LinkedHashMap<>(); private volatile boolean closed = false; diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index 3acc608a..077b6151 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -28,8 +28,8 @@ public class UDPConn { - private final PipeEndpoint pipe; - private final InetSocketAddress addr; + private PipeEndpoint pipe; + private InetSocketAddress addr; private final RawEndpoint rawEndpoint; private final RawEndpoint proxyRawEndpoint; private final ExecutorService receiveExecutor; diff --git a/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java b/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java index 9f97a979..abc8aa2e 100644 --- a/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java +++ b/src/main/java/core/packetproxy/common/UDPSocketEndpoint.java @@ -26,10 +26,10 @@ public class UDPSocketEndpoint implements Endpoint { - private final DatagramSocket socket; - private final InetSocketAddress serverAddr; - private final PipeEndpoint pipe; - private static final int BUFSIZE = 4096; + private DatagramSocket socket; + private InetSocketAddress serverAddr; + private PipeEndpoint pipe; + private static int BUFSIZE = 4096; private final ExecutorService executor; private volatile boolean closed; From 87294bc23c05ebde8e795cabd4729ca634bb7217 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Fri, 13 Mar 2026 13:12:51 +0900 Subject: [PATCH 6/9] style: inline UDPConn endpoint access Keep the UDP thread leak fix focused by removing cached RawEndpoint members that do not change behavior. Made-with: Cursor --- .../java/core/packetproxy/common/UDPConn.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index 077b6151..2971dde6 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -30,8 +30,6 @@ public class UDPConn { private PipeEndpoint pipe; private InetSocketAddress addr; - private final RawEndpoint rawEndpoint; - private final RawEndpoint proxyRawEndpoint; private final ExecutorService receiveExecutor; private Future recvTaskFuture; private volatile boolean closed; @@ -39,8 +37,6 @@ public class UDPConn { public UDPConn(InetSocketAddress addr) throws Exception { this.addr = addr; this.pipe = new PipeEndpoint(addr); - this.rawEndpoint = this.pipe.getRawEndpoint(); - this.proxyRawEndpoint = this.pipe.getProxyRawEndpoint(); this.receiveExecutor = Executors.newSingleThreadExecutor(); this.recvTaskFuture = null; this.closed = false; @@ -54,7 +50,7 @@ public void put(byte[] data, int offset, int length) throws Exception { } public void put(byte[] data) throws Exception { - OutputStream os = rawEndpoint.getOutputStream(); + OutputStream os = pipe.getRawEndpoint().getOutputStream(); os.write(data); os.flush(); } @@ -69,7 +65,7 @@ public synchronized void getAutomatically(final BlockingQueue qu public Void call() throws Exception { while (!closed) { - InputStream is = rawEndpoint.getInputStream(); + InputStream is = pipe.getRawEndpoint().getInputStream(); byte[] buf = new byte[4096]; int len = is.read(buf); if (len < 0) { @@ -86,7 +82,7 @@ public Void call() throws Exception { } public Endpoint getEndpoint() throws Exception { - return proxyRawEndpoint; + return pipe.getProxyRawEndpoint(); } public void close() throws Exception { @@ -101,22 +97,22 @@ public void close() throws Exception { } try { - rawEndpoint.getInputStream().close(); + pipe.getRawEndpoint().getInputStream().close(); } catch (Exception ignored) { } try { - rawEndpoint.getOutputStream().close(); + pipe.getRawEndpoint().getOutputStream().close(); } catch (Exception ignored) { } try { - proxyRawEndpoint.getInputStream().close(); + pipe.getProxyRawEndpoint().getInputStream().close(); } catch (Exception ignored) { } try { - proxyRawEndpoint.getOutputStream().close(); + pipe.getProxyRawEndpoint().getOutputStream().close(); } catch (Exception ignored) { } receiveExecutor.shutdownNow(); From d4b7779cecbb474f8f0682fb4a8054e23d603fe1 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Fri, 13 Mar 2026 14:18:19 +0900 Subject: [PATCH 7/9] style: remove redundant UDPConn receive guard Keep the UDP thread leak fix focused by dropping the extra start-once guard that is not required by the current call flow. Made-with: Cursor --- src/main/java/core/packetproxy/common/UDPConn.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index 2971dde6..7c398c2f 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -55,11 +55,7 @@ public void put(byte[] data) throws Exception { os.flush(); } - public synchronized void getAutomatically(final BlockingQueue queue) throws Exception { - if (recvTaskFuture != null) { - - return; - } + public void getAutomatically(final BlockingQueue queue) throws Exception { Callable recvTask = new Callable() { public Void call() throws Exception { From 0d2818c8fc9f8a0dfff31b801a55471b3b5b3177 Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Fri, 13 Mar 2026 14:29:48 +0900 Subject: [PATCH 8/9] fix: reject restarting UDPConn after close Make the UDPConn lifecycle explicit by failing fast when automatic receive is requested after the connection has already been closed. Made-with: Cursor --- src/main/java/core/packetproxy/common/UDPConn.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index 7c398c2f..0b1a0fcd 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -56,6 +56,10 @@ public void put(byte[] data) throws Exception { } public void getAutomatically(final BlockingQueue queue) throws Exception { + if (closed) { + + throw new IllegalStateException("UDPConn is already closed"); + } Callable recvTask = new Callable() { public Void call() throws Exception { From 883509e6646d5ec013d498f9a4f34735a2fc739a Mon Sep 17 00:00:00 2001 From: Hiroki Tokunaga Date: Fri, 13 Mar 2026 14:33:03 +0900 Subject: [PATCH 9/9] style: simplify UDPConn close cleanup Extract the repeated close-and-ignore pattern so the shutdown path stays compact without changing behavior. Made-with: Cursor --- .../java/core/packetproxy/common/UDPConn.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/main/java/core/packetproxy/common/UDPConn.java b/src/main/java/core/packetproxy/common/UDPConn.java index 0b1a0fcd..2fe12efc 100644 --- a/src/main/java/core/packetproxy/common/UDPConn.java +++ b/src/main/java/core/packetproxy/common/UDPConn.java @@ -95,26 +95,18 @@ public void close() throws Exception { recvTaskFuture.cancel(true); } - try { - - pipe.getRawEndpoint().getInputStream().close(); - } catch (Exception ignored) { - } - try { - - pipe.getRawEndpoint().getOutputStream().close(); - } catch (Exception ignored) { - } - try { + closeQuietly(pipe.getRawEndpoint().getInputStream()); + closeQuietly(pipe.getRawEndpoint().getOutputStream()); + closeQuietly(pipe.getProxyRawEndpoint().getInputStream()); + closeQuietly(pipe.getProxyRawEndpoint().getOutputStream()); + receiveExecutor.shutdownNow(); + } - pipe.getProxyRawEndpoint().getInputStream().close(); - } catch (Exception ignored) { - } + private void closeQuietly(AutoCloseable closeable) { try { - pipe.getProxyRawEndpoint().getOutputStream().close(); + closeable.close(); } catch (Exception ignored) { } - receiveExecutor.shutdownNow(); } }