package org.rx.socks.proxy; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.rx.common.Logger; import java.util.function.BiConsumer; import static org.rx.common.Contract.require; public class DirectClientHandler extends SimpleChannelInboundHandler<byte[]> {
private BiConsumer<ChannelHandlerContext, byte[]> onReceive;
private ChannelHandlerContext ctx; public Channel getChannel() {
require(ctx);
return ctx.channel();
} public DirectClientHandler(BiConsumer<ChannelHandlerContext, byte[]> onReceive) {
require(onReceive); this.onReceive = onReceive;
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.ctx = ctx;
Logger.info("DirectClientHandler %s connect %s", ctx.channel().localAddress(), ctx.channel().remoteAddress());
} @Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] bytes) {
onReceive.accept(ctx, bytes);
Logger.info("DirectClientHandler %s recv %s bytes from %s", ctx.channel().remoteAddress(), bytes.length,
ctx.channel().localAddress());
} public ChannelFuture send(byte[] bytes) {
try {
return ctx.channel().writeAndFlush(bytes);
} finally {
Logger.info("DirectClientHandler %s send %s bytes to %s", ctx.channel().localAddress(), bytes.length,
ctx.channel().remoteAddress());
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Logger.error(cause, "DirectClientHandler");
ctx.close();
}
}
package org.rx.socks.proxy; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.rx.common.Logger; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer; import static org.rx.common.Contract.require; public class DirectServerHandler extends SimpleChannelInboundHandler<byte[]> {
private static class ClientState {
private ProxyClient directClient;
// private int length;
// private MemoryStream stream; public ProxyClient getDirectClient() {
return directClient;
} public ClientState(boolean enableSsl, SocketAddress directAddress,
BiConsumer<ChannelHandlerContext, byte[]> onReceive) {
require(directAddress, onReceive); directClient = new ProxyClient();
directClient.setEnableSsl(enableSsl);
directClient.connect((InetSocketAddress) directAddress, onReceive);
// stream = new MemoryStream(32, true);
} // private int readRemoteAddress(byte[] bytes) {
// int offset = 0;
// if (length == -1) {
// stream.setLength(length = Bytes.toInt(bytes, 0));
// stream.setPosition(0);
// offset = Integer.BYTES;
// }
// int count = length - stream.getPosition();
// stream.write(bytes, offset, Math.min(count, bytes.length));
// if (stream.getPosition() < length) {
// return -1;
// }
//
// directAddress = Sockets.parseAddress(Bytes.toString(stream.getBuffer(), 0, length));
// length = -1;
// return bytes.length - count;
// }
} private final Map<ChannelHandlerContext, ClientState> clients;
private boolean enableSsl;
private SocketAddress directAddress; public DirectServerHandler(boolean enableSsl, SocketAddress directAddress) {
require(directAddress); clients = new ConcurrentHashMap<>();
this.enableSsl = enableSsl;
this.directAddress = directAddress;
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
clients.put(ctx, new ClientState(enableSsl, directAddress, (directChannel, bytes) -> {
ctx.writeAndFlush(bytes);
Logger.info("DirectServerHandler %s recv %s bytes from %s", ctx.channel().remoteAddress(), bytes.length,
directAddress);
}));
Logger.info("DirectServerHandler %s connect %s", ctx.channel().remoteAddress(), directAddress);
} @Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] bytes) {
ClientState state = clients.get(ctx);
require(state); ProxyClient directClient = state.getDirectClient();
directClient.send(bytes);
Logger.info("DirectServerHandler %s send %s bytes to %s",
directClient.getHandler().getChannel().remoteAddress(), bytes.length, ctx.channel().remoteAddress());
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
clients.remove(ctx);
Logger.info("DirectServerHandler %s disconnect %s", ctx.channel().remoteAddress(), directAddress);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Logger.error(cause, "DirectServerHandler");
ctx.close();
}
}
package org.rx.socks.proxy; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.SneakyThrows;
import org.rx.common.App;
import org.rx.common.Disposable; import java.net.InetSocketAddress;
import java.util.function.BiConsumer; import static org.rx.common.Contract.require;
import static org.rx.socks.proxy.ProxyServer.Compression_Key; public class ProxyClient extends Disposable {
private EventLoopGroup group;
private boolean enableSsl;
private DirectClientHandler handler; public boolean isEnableSsl() {
return enableSsl;
} public void setEnableSsl(boolean enableSsl) {
this.enableSsl = enableSsl;
} public boolean isEnableCompression() {
return App.convert(App.readSetting(Compression_Key), boolean.class);
} public DirectClientHandler getHandler() {
checkNotClosed();
return handler;
} @Override
protected void freeObjects() {
if (group != null) {
group.shutdownGracefully();
}
} public void connect(InetSocketAddress remoteAddress) {
connect(remoteAddress, null);
} @SneakyThrows
public void connect(InetSocketAddress remoteAddress, BiConsumer<ChannelHandlerContext, byte[]> onReceive) {
checkNotClosed();
require(group == null);
require(remoteAddress); // Configure SSL.
SslContext sslCtx = null;
if (enableSsl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} Bootstrap b = new Bootstrap();
SslContext ssl = sslCtx;
b.group(group = new NioEventLoopGroup()).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (ssl != null) {
pipeline.addLast(
ssl.newHandler(ch.alloc(), remoteAddress.getHostName(), remoteAddress.getPort()));
}
if (isEnableCompression()) {
pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
} pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder()); pipeline.addLast(new DirectClientHandler(onReceive));
}
});
ChannelFuture f = b.connect(remoteAddress).sync();
handler = (DirectClientHandler) f.channel().pipeline().last();
} public ChannelFuture send(byte[] bytes) {
checkNotClosed();
require(group != null);
require(bytes); return getHandler().send(bytes);
}
}
package org.rx.socks.proxy; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import lombok.SneakyThrows;
import org.rx.common.App;
import org.rx.common.Disposable;
import org.rx.socks.Sockets; import java.net.InetSocketAddress;
import java.net.SocketAddress; import static org.rx.common.Contract.require; public final class ProxyServer extends Disposable {
public static final String Compression_Key = "app.netProxy.compression";
public static final String ListenBlock_Key = "app.netProxy.listenBlock";
private EventLoopGroup group;
private boolean enableSsl; public boolean isEnableSsl() {
return enableSsl;
} public void setEnableSsl(boolean enableSsl) {
this.enableSsl = enableSsl;
} public boolean isEnableCompression() {
return App.convert(App.readSetting(Compression_Key), boolean.class);
} public boolean isListening() {
return group != null;
} private boolean isListenBlock() {
return App.convert(App.readSetting(ListenBlock_Key), boolean.class);
} @Override
protected void freeObjects() {
if (group != null) {
group.shutdownGracefully();
}
} public void start(int localPort, SocketAddress directAddress) {
start(new InetSocketAddress(Sockets.AnyAddress, localPort), directAddress);
} @SneakyThrows
public void start(SocketAddress localAddress, SocketAddress directAddress) {
checkNotClosed();
require(group == null);
require(localAddress); // Configure SSL.
SslContext sslCtx = null;
if (enableSsl) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} ServerBootstrap b = new ServerBootstrap();
SslContext ssl = sslCtx;
b.group(group = new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (ssl != null) {
pipeline.addLast(ssl.newHandler(ch.alloc()));
}
if (isEnableCompression()) {
// Enable stream compression (you can remove these two if unnecessary)
pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
} // Add the number codec first,
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder()); // and then business logic.
// Please note we create a handler for every new channel because it has stateful properties.
pipeline.addLast(new DirectServerHandler(enableSsl, directAddress));
}
});
ChannelFuture f = b.bind(localAddress).sync();
if (isListenBlock()) {
f.channel().closeFuture().sync();
}
} public void closeClients() {
checkNotClosed();
if (group == null) {
return;
} group.shutdownGracefully();
group = null;
}
}