Skip to content
4 changes: 4 additions & 0 deletions src/main/java/core/packetproxy/DuplexManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public Duplex getDuplex(int hash) {
return duplex_list.get(hash);
}

public void removeDuplex(int hash) {
duplex_list.remove(hash);
}

public boolean has(int hash) {
return (duplex_list.get(hash) == null) ? false : true;
}
Expand Down
112 changes: 103 additions & 9 deletions src/main/java/core/packetproxy/ProxyUDPForward.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,33 @@
import static packetproxy.util.Logging.log;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import packetproxy.common.Endpoint;
import packetproxy.common.UDPServerSocket;
import packetproxy.common.UDPSocketEndpoint;
import packetproxy.model.ListenPort;

public class ProxyUDPForward extends Proxy {

private static final int MAX_ACTIVE_CONNECTIONS = 256;
private ListenPort listen_info;
private UDPServerSocket listen_socket;
private final Map<InetSocketAddress, ActiveConnection> activeConnections = new LinkedHashMap<>();
private volatile boolean closed = false;

private static class ActiveConnection {
private final DuplexAsync duplex;
private final UDPSocketEndpoint serverEndpoint;
private final int duplexHash;

ActiveConnection(DuplexAsync duplex, UDPSocketEndpoint serverEndpoint, int duplexHash) {
this.duplex = duplex;
this.serverEndpoint = serverEndpoint;
this.duplexHash = duplexHash;
}
}

public ProxyUDPForward(ListenPort listen_info) throws Exception {
this.listen_info = listen_info;
Expand All @@ -38,26 +56,102 @@ public ProxyUDPForward(ListenPort listen_info) throws Exception {
public void run() {
try {

while (true) {
while (!closed) {
try {

Endpoint client_endpoint = listen_socket.accept();
log("accept");

InetSocketAddress clientAddr = client_endpoint.getAddress();
InetSocketAddress serverAddr = listen_info.getServer().getAddress();
UDPSocketEndpoint server_endpoint = new UDPSocketEndpoint(serverAddr);

Endpoint client_endpoint = listen_socket.accept();
log("accept");
DuplexAsync duplex = DuplexFactory.createDuplexAsync(client_endpoint, server_endpoint,
listen_info.getServer().getEncoder());
duplex.start();
int duplexHash = DuplexManager.getInstance().registerDuplex(duplex);

InetSocketAddress serverAddr = listen_info.getServer().getAddress();
UDPSocketEndpoint server_endpoint = new UDPSocketEndpoint(serverAddr);
closeConnectionIfExists(clientAddr);
activeConnections.put(clientAddr, new ActiveConnection(duplex, server_endpoint, duplexHash));
evictIfOverLimit();
} catch (Exception e) {

DuplexAsync duplex = DuplexFactory.createDuplexAsync(client_endpoint, server_endpoint,
listen_info.getServer().getEncoder());
duplex.start();
DuplexManager.getInstance().registerDuplex(duplex);
if (!closed) {
errWithStackTrace(e);
}
}
}
} catch (Exception e) {

errWithStackTrace(e);
} finally {
closeAllConnections();
}
}

public void close() throws Exception {
closed = true;
closeAllConnections();
listen_socket.close();
}

private void evictIfOverLimit() {
while (activeConnections.size() > MAX_ACTIVE_CONNECTIONS) {
Iterator<Map.Entry<InetSocketAddress, ActiveConnection>> i = activeConnections.entrySet().iterator();
if (!i.hasNext()) {

return;
}
Map.Entry<InetSocketAddress, ActiveConnection> oldest = i.next();
i.remove();
closeConnection(oldest.getKey(), oldest.getValue());
}
}

private void closeConnectionIfExists(InetSocketAddress clientAddr) {
ActiveConnection oldConnection = activeConnections.remove(clientAddr);
if (oldConnection != null) {

closeConnection(clientAddr, oldConnection);
}
}

private void closeConnection(InetSocketAddress clientAddr, ActiveConnection connection) {
try {

connection.duplex.close();
} catch (Exception e) {

errWithStackTrace(e);
}
try {

connection.serverEndpoint.close();
} catch (Exception e) {

errWithStackTrace(e);
}
try {

DuplexManager.getInstance().removeDuplex(connection.duplexHash);
} catch (Exception e) {

errWithStackTrace(e);
}
try {

listen_socket.removeConnection(clientAddr);
} catch (Exception e) {

errWithStackTrace(e);
}
}

private void closeAllConnections() {
for (Map.Entry<InetSocketAddress, ActiveConnection> entry : activeConnections.entrySet()) {

closeConnection(entry.getKey(), entry.getValue());
}
activeConnections.clear();
}
}
46 changes: 43 additions & 3 deletions src/main/java/core/packetproxy/common/UDPConn.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.output.ByteArrayOutputStream;

public class UDPConn {

private PipeEndpoint pipe;
private InetSocketAddress addr;
private final ExecutorService receiveExecutor;
private Future<Void> recvTaskFuture;
private volatile boolean closed;

public UDPConn(InetSocketAddress addr) throws Exception {
this.addr = addr;
this.pipe = new PipeEndpoint(addr);
this.receiveExecutor = Executors.newSingleThreadExecutor();
this.recvTaskFuture = null;
this.closed = false;
}

public void put(byte[] data, int offset, int length) throws Exception {
Expand All @@ -49,24 +56,57 @@ public void put(byte[] data) throws Exception {
}

public void getAutomatically(final BlockingQueue<DatagramPacket> queue) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
if (closed) {

throw new IllegalStateException("UDPConn is already closed");
}
Callable<Void> recvTask = new Callable<Void>() {

public Void call() throws Exception {
while (true) {
while (!closed) {

InputStream is = pipe.getRawEndpoint().getInputStream();
byte[] buf = new byte[4096];
int len = is.read(buf);
if (len < 0) {

return null;
}
DatagramPacket recvPacket = new DatagramPacket(buf, len, addr);
queue.put(recvPacket);
}
return null;
}
};
executor.submit(recvTask);
recvTaskFuture = receiveExecutor.submit(recvTask);
}

public Endpoint getEndpoint() throws Exception {
return pipe.getProxyRawEndpoint();
}

public void close() throws Exception {
if (closed) {

return;
}
closed = true;
if (recvTaskFuture != null) {

recvTaskFuture.cancel(true);
}
closeQuietly(pipe.getRawEndpoint().getInputStream());
closeQuietly(pipe.getRawEndpoint().getOutputStream());
closeQuietly(pipe.getProxyRawEndpoint().getInputStream());
closeQuietly(pipe.getProxyRawEndpoint().getOutputStream());
receiveExecutor.shutdownNow();
}

private void closeQuietly(AutoCloseable closeable) {
try {

closeable.close();
} catch (Exception ignored) {
}
}
}
41 changes: 34 additions & 7 deletions src/main/java/core/packetproxy/common/UDPConnManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,29 @@ public UDPConnManager() {
}

public Endpoint accept() throws Exception {
InetSocketAddress addr = acceptedQueue.take();
return connList.get(addr).getEndpoint();
while (true) {
InetSocketAddress addr = acceptedQueue.take();
synchronized (this) {
UDPConn conn = connList.get(addr);
if (conn != null) {

return conn.getEndpoint();
}
}
}
}

public void put(DatagramPacket packet) throws Exception {
InetSocketAddress addr = new InetSocketAddress(packet.getAddress(), packet.getPort());
UDPConn conn = this.query(addr);
if (conn == null) {
UDPConn conn;
synchronized (this) {
conn = this.query(addr);
if (conn == null) {

conn = this.create(addr);
conn.getAutomatically(recvQueue);
acceptedQueue.put(addr);
conn = this.create(addr);
conn.getAutomatically(recvQueue);
acceptedQueue.put(addr);
}
}
conn.put(packet.getData(), 0, packet.getLength());
}
Expand All @@ -64,4 +75,20 @@ private UDPConn create(InetSocketAddress key) throws Exception {
connList.put(key, conn);
return conn;
}

public synchronized void remove(InetSocketAddress key) throws Exception {
UDPConn conn = connList.remove(key);
if (conn != null) {

conn.close();
}
}

public synchronized void closeAll() throws Exception {
for (UDPConn conn : connList.values()) {

conn.close();
}
connList.clear();
}
}
6 changes: 6 additions & 0 deletions src/main/java/core/packetproxy/common/UDPServerSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -34,12 +35,17 @@ public UDPServerSocket(int port) throws Exception {

public void close() throws Exception {
socket.close();
connManager.closeAll();
}

public Endpoint accept() throws Exception {
return connManager.accept();
}

public void removeConnection(InetSocketAddress addr) throws Exception {
connManager.remove(addr);
}

private void createRecvLoop() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<Void> recvTask = new Callable<Void>() {
Expand Down
Loading
Loading