Netty基础—6.Netty实现RPC服务一

时间:2025-03-17 09:02:50

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

1.RPC的相关概念

(1)什么是RPC

(2)什么是静态代理

(3)什么是动态代理

(4)动态代理总结

(1)什么是RPC

本地只有一个方法的接口,需要在本地对这个方法进行远程调用,而对这个方法进行调用其实就是对该接口的动态代理进行调用。

动态代理的底层会把对这个方法的调用封装成一个请求,然后把这个请求序列化成一个二进制数据请求,之后再通过Netty网络通信将二进制数据请求发送给远程机器。

远程机器会启动一个Netty服务端来监听连接和请求,然后把二进制数据反序列化成请求对象,接着再根据这个请求对象在本地找到要调用的那个方法,最后通过反射去调用这个方法并获取结果进行返回。

(2)代理模式

代理模式一般分为静态代理和动态代理两种。

静态代理:就是提前创建好代理类,并且在程序运行前代理类已经被编译成字节码了。

动态代理:就是运行时才生成代理类,即代理类的字节码会在运行时动态生成并载入到ClassLoader中。

(3)静态代理

如果要对一个实现某接口的类的一个方法进行增强,在不影响原接口的前提下只能重新实现该接口。如果要增强的类有很多,那么每一个类都需要重新实现一遍,比较麻烦。比如在下面的例子中,如果还要代理IReceiver接口的实现类,那么还需要定义一个ProxyReceiver代理类去实现IReceiver接口。因为具体的代理类是需要实现被代理类的接口的。

//第一步:定义接口
public interface ISender {
    public boolean send();
}

//第二步:定义真实的实现类,被代理类
public class SmsSender implements ISender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}

//第三步:定义代理类,封装实现类
//代理类在不影响真实类的情况下,实现功能的扩展
//如果还要代理IReceiver接口的实现类,那么还需要定义一个ProxyReceiver实现IReceiver接口
public class ProxySender implements ISender {
    private ISender sender;
    
    public ProxySender(ISender sender) {
        this.sender = sender;
    }
    
    public boolean send() {
        System.out.println("处理前");
        boolean result = sender.send();
        System.out.println("处理后");
        return result;
    }
}

//第四步:客户端调用
@Test
public void testStaticProxy() {
    ISender sender = new ProxySender(new SmsSender());
    boolean result = sender.send();
    System.out.println("输出结果:" + result);
}

(4)动态代理

与静态代理相比,动态代理有更多优势。动态代理不仅不需要定义代理类,甚至可以在运行时指定代理类的执行逻辑,从而大大提升系统的灵活性。比如下面的JDK动态代理例子中,只需要定义一个实现InvocationHandler接口的JdkProxyHandler类,就能同时代理ISender和IReceiver接口的实现类了。因为JdkProxyHandler类是不依赖具体的被代理类的接口的。

目前动态代理类的生成方法有如下几种:

JDK动态代理:内置在JDK中,不需要引入第三方jar,使用简单,但功能比较弱。

CGLib和Javassist:这两个都是高级字节码生成库,总体性能比JDK动态代理好,且功能强大。

ASM:低级字节码生成工具,近乎使用字节码编码,对开发人员要求最高,当然性能也是最好。

一.JDK动态代理

首先需要定义一个InvocationHandler类,该类要实现InvocationHandler接口的invoke()方法来拦截对被代理类接口方法的调用。

然后当客户端调用Proxy.newProxyInstance()方法,并传入被代理类的接口和一个封装了被代理对象的InvocationHandler对象时,便会动态生成一个代理类并返回一个实现了被代理类接口的代理对象。

后续就可以向代理对象调用被代理类的接口方法,对应的方法调用就会转发给InvocationHandler对象的invoke()方法,从而实现对被代理对象的方法进行调用时的拦截和增强。

//第一步:定义接口
public interface ISender {
    public boolean send();
}

//第二步:定义实现上述接口的被代理类
public class SmsSender implements ISender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}

//第三步:定义一个InvocationHandler类
public class JdkProxyHandler implements InvocationHandler {
    private Object target;
    
    public JdkProxyHandler(Object target) {
        this.target = target;
    }
    
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("处理前");
        Object result = method.invoke(target, args);
        System.out.println("处理后");
        return result;
    }
}

//第四步:客户端调用
@Test
public void testJdkProxy() {
    //动态生成一个代理类,并返回一个实现了被代理类接口的代理对象
    //入参是:类加载器、被代理类的类型、封装了一个被代理对象的InvocationHandler对象
    ISender sender = (ISender) Proxy.newProxyInstance(
        ClassLoader.getSystemClassLoader(),
        new Class[]{ISender.class},
        new JdkProxyHandler(new SmsSender())
    );
    
    //向代理对象调用被代理类的接口方法
    boolean result = sender.send();
    System.out.println("代理对象:" + sender.getClass().getName());
    System.out.println("输出结果:" + result);
}

JDK动态代理是通过Proxy.newProxyInstance()方法来动态生成代理对象的,JDK动态代理的底层是通过Java反射机制实现的,并且需要目标对象(被代理对象)继承自一个接口才能生成它的代理类。

public class Proxy implements java.io.Serializable {
    ...
    //该方法有3个参数:
    //loader:用哪个类加载器去加载代理对象,生成目标对象的代理需要确保其类加载器相同,所以需要将目标对象的类加载器作为参数传递
    //interfaces:代理类需实现的接口列表,JDK动态代理技术需要代理类和目标对象都继承自同一接口,所以需要将目标对象的接口作为参数传递
    //h:调用处理器,调用实现了InvocationHandler类的一个回调方法,对目标对象的增强逻辑在这个实现类中
    public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException {
        Objects.requireNonNull(h);
        final Class<?>[] intfs = interfaces.clone();
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }
        //获取被代理类的类型
        Class<?> cl = getProxyClass0(loader, intfs);
        try {
            if (sm != null) {
                checkNewProxyPermission(Reflection.getCallerClass(), cl);
            }
            //通过反射创建代理对象实例
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            if (!Modifier.isPublic(cl.getModifiers())) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        cons.setAccessible(true);
                        return null;
                    }
                });
            }
            return cons.newInstance(new Object[]{h});
        } catch (IllegalAccessException|InstantiationException e) {
            throw new InternalError(e.toString(), e);
        } catch (InvocationTargetException e) {
            Throwable t = e.getCause();
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new InternalError(t.toString(), t);
            }
        } catch (NoSuchMethodException e) {
            throw new InternalError(e.toString(), e);
        }
    }
    ...
}

二.CGLib动态代理

和JDK动态代理不同,CGLib动态代理不需要目标对象实现自一个接口,只需要实现一个处理代理逻辑的切入类以及实现MethodInterceptor接口。

CGLib动态代理的特点如下:

使用CGLib实现动态代理,完全不受被代理类必须实现自一个接口的限制。CGLib底层采用ASM字节码生成框架,使用字节码技术生成代理类比使用Java反射的效率要高。CGLib不能对声明为final的方法进行代理,因为CGLib原理是动态生成被代理类的子类。

//使用CGLib动态代理前需要引入依赖
<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>3.3.0</version>
</dependency>

//第一步:定义一个被代理类
public class BdSender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}

//第二步:实现一个处理代理逻辑的切入类以及实现MethodInterceptor接口
public class CglibProxyInterceptor implements MethodInterceptor {
    private Enhancer enhancer = new Enhancer();
    
    //获取代理类
    //@param clazz 被代理类
    public Object getProxy(Class clazz) {
        enhancer.setSuperclass(clazz);
        enhancer.setCallback(this);
        return enhancer.create();
    }
    
    @Override
    public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        System.out.println("处理前");
        Object result = methodProxy.invokeSuper(object,args);
        System.out.println("处理后");
        return result;
    }
}

//第三步:客户端调用
@Test
public void testCglibProxy(){
    BdSender sender = (BdSender) new CglibProxyInterceptor().getProxy(BdSender.class);
    boolean result = sender.send();
    System.out.println("代理对象:" + sender.getClass().getName());
    System.out.println("输出结果:" + result);
}

三.Javassist动态代理

Javassist是一个开源的分析、编辑和创建Java字节码的类库,可以直接编辑和生成Java生成的字节码。相对于ASM这些工具,开发者不需要了解虚拟机指令,就能动态改变类的结构或者动态生成类。

使用Javassist生成动态代理有以下两种方式:

代理工厂创建:需要实现一个用于处理代理逻辑的切入类以及实现MethodHandler接口,类似CGLib。

动态代码创建:可以通过Java代码生成字节码,这种方式创建的动态代理非常灵活,甚至可以在运行时生成业务逻辑。

方式一:代理工厂创建

//使用Javassist前需要引入依赖
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.27.0-GA</version>
</dependency>

//第一步:定义一个被代理类
public class BdSender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}

//第二步:实现一个处理代理逻辑的切入类以及实现MethodHandler接口
public class JavassistProxyHandler implements MethodHandler {
    private ProxyFactory proxyFactory = new ProxyFactory();
    
    //获取代理对象
    //@param clazz 被代理类
    public Object getProxy(Class clazz) throws Exception {
        proxyFactory.setSuperclass(clazz);
        Class<?> factoryClass = proxyFactory.createClass();
        Object proxy = factoryClass.newInstance();
        ((ProxyObject)proxy).setHandler(this);
        return proxy;
    }
    
    @Override
    public Object invoke(Object object, Method method, Method method1, Object[] args) throws Throwable {
        System.out.println("处理前");
        Object result = method1.invoke(object,args);
        System.out.println("处理后");
        return result;
    }
}

//第三步:客户端调用
@Test
public void testJavassistProxy() throws Exception {
    BdSender sender = (BdSender) new JavassistProxyHandler().getProxy(BdSender.class);
    boolean result = sender.send();
    System.out.println("代理对象:" + sender.getClass().getName());
    System.out.println("输出结果:" + result);
}

方式二:动态代码创建

//第一步:定义一个被代理类
public class BdSender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}

//第二步:生成字节码
public class JavassistDynamicCodeProxy {
    public static Object getProxy(Class clazz) throws Exception {
        ClassPool mPool = ClassPool.getDefault();
        CtClass c0 = mPool.get(clazz.getName());
        //定义代理类名称
        CtClass mCtc = mPool.makeClass(clazz.getName() + "$$BytecodeProxy");
        //添加父类继承
        mCtc.setSuperclass(c0);
        //添加类的字段信息
        CtField field = new CtField(c0, "real", mCtc);
        field.setModifiers(AccessFlag.PRIVATE);
        mCtc.addField(field);
        //添加构造函数
        CtConstructor constructor = new CtConstructor(new CtClass[]{c0},mCtc);
        constructor.setBody("{$0.real = $1;}"); // $0代表this, $1代表构造函数的第1个参数
        mCtc.addConstructor(constructor);
        //添加方法
        CtMethod ctMethod = mCtc.getSuperclass().getDeclaredMethod("send");
        CtMethod newMethod = new CtMethod(ctMethod.getReturnType(), ctMethod.getName(),ctMethod.getParameterTypes(), mCtc);
        newMethod.setBody("{" +
            "System.out.println(\"处理前\");" +
            "boolean result = $0.real.send();" +
            "System.out.println(\"处理后\");" +
            "return result;}");
        mCtc.addMethod(newMethod);
        //生成动态类
        return mCtc.toClass().getConstructor(clazz).newInstance(clazz.newInstance());
    }
}

//第三步:客户端调用
@Test
public void testJavassisBytecodetProxy() throws Exception {
    BdSender sender = (BdSender) JavassistDynamicCodeProxy.getProxy(BdSender.class);
    boolean result = sender.send();
    System.out.println("代理对象:" + sender.getClass().getName());
    System.out.println("输出结果:" + result);
}

(4)动态代理总结

JDK动态代理需要实现InvocationHandler接口,CGLib动态代理需要实现MethodInterceptor接口,Javassist动态代理需要实现MethodHandler接口或者直接生成字节码。

动态代理中的动态是针对静态代理而言的,动态代理的优势不在于省去了编写静态代理类时的代码量,而是实现了可以在被代理类未知的时候就确定代理类的代理行为。

2.RPC服务调用端动态代理实现

RPC服务对目标接口发起调用时,首先会使用比如JDK动态代理的方式去创建一个动态代理类,然后再由这个动态代理类通过Netty将调用请求发送给目标机器进行处理。

所以关键代码如下:

Proxy.newProxyInstance() ->

nettyRpcClient.connect() ->

nettyRpcClient.remoteCall(rpcRequest)

public class NettyRpcClientTest {
    private static final Logger logger = LogManager.getLogger(NettyRpcClientTest.class);
    public static void main(String[] args) {
        //发起RPC调用时,是针对哪一个接口发起的
        ReferenceConfig referenceConfig = new ReferenceConfig(TestService.class);
        //创建动态代理类
        TestService testService = (TestService) RpcServiceProxy.createProxy(referenceConfig);
        //下面的调用会走到RpcServiceProxy.ServiceProxyInvocationHandler的invoke()方法去
        String result = testService.sayHello("zhangsan");
        logger.info("rpc call finished: " + result);
    }
}

public class RpcServiceProxy {
    //创建代理
    public static Object createProxy(ReferenceConfig referenceConfig) {
        return Proxy.newProxyInstance(
            RpcServiceProxy.class.getClassLoader(),
            new Class[]{referenceConfig.getServiceInterfaceClass()},
            new ServiceProxyInvocationHandler(referenceConfig)
        );
    }
    static class ServiceProxyInvocationHandler implements InvocationHandler {
        private ReferenceConfig referenceConfig;
        public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig) {
            this.referenceConfig = referenceConfig;
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //发起连接
            NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig);
            nettyRpcClient.connect();


            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setRequestId(UUID.randomUUID().toString().replace("-", ""));
            rpcRequest.setServiceInterfaceClass(referenceConfig.getServiceInterfaceClass().getName());
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameterTypes(method.getParameterTypes());
            rpcRequest.setArgs(args);
        
            //进行RPC调用
            RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest);
            return rpcResponse.getResult();
        }
    }
}

public class NettyRpcClient {
    private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);

    private String serviceHost;
    private int servicePort;
    private ChannelFuture channelFuture;
    private NettyRpcClientHandler nettyRpcClientHandler;

    public NettyRpcClient(String serviceHost, int servicePort) {
        this.serviceHost = serviceHost;
        this.servicePort = servicePort;
        this.nettyRpcClientHandler = new NettyRpcClientHandler();
    }

    public void connect() {
        logger.info("connecting to Netty RPC server: " + serviceHost + ":" + servicePort);
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
        .group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                .addLast(new RpcEncoder(RpcRequest.class))
                .addLast(new RpcDecoder(RpcResponse.class))
                .addLast(new NettyRpcReadTimeoutHandler())
                .addLast(nettyRpcClientHandler);
            }


        });

        try {
            if (serviceHost != null && !serviceHost.equals("")) {
                //通过connect()方法建立连接后,就会通过sync()方法进行同步阻塞
                channelFuture = bootstrap.connect(serviceHost, servicePort).sync();
                logger.info("successfully connected.");
            }
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }

    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse();
        logger.info("receives response from netty rpc server.");

        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}

相关文章