复习RPC原理及其简单实现

时间:2022-03-24 16:26:11

前言:2018年7月27日,这是博客园的第一篇文章,我是一个懒人,大概6,7年前就开始关注博客园,从博客园上面汲取些前者之辈的一些文章,博客园在我java接触的生涯中给了我经验和信心,但是因为我自身感觉一直是菜鸡选手的水平徘徊着,所以一直没考虑开通并书写文章,所以很惭愧的是直到至今才开通我的博客园账号,从现在起可能会养成一个好习惯,代码总结的经验要从代码注释、手写笔记上转移到写博客成为一种习惯,近期也打算换工作了,告诉自己要加油。

第一篇文章是我这些天看到各大互联网公司面试要求需要会一些RPC常用框架,但是我现在所处的公司为一家传统企业,软件部分还处在利用早期基础流行框架的阶段,项目架构也相对老套,对于RPC这种技术层面少有接触,那么RPC是什么相信广大读者早已在百度上将这个词汇搜索过,RPC是基于网络通信底层为基础建立起的一种可对单机或不同及机器之间对不同业务之间的一种交互调用手段,是一种协议,目前市面上常用的RPC框架通常都是基于TCP通信协议, 通过对不同种类传输体的序列化手段可作用且不限于具体的编程语言,因我个人偏重JAVA开发,则文章下面的例子中均采用JAVA语言代码的形式作原理说明,RPC协议需要区分与HTTP协议的不同(网络层数、应用场景),但是底层都离不开网络通信协议,例如TCP、UDP等,关于RPC的具体调用过程我以绘画组织架构形式先展出:
复习RPC原理及其简单实现

底层网络通信主要由jdk net包中socket套接技术实现,技术上则主要利用反射原理及类加载机制(动态代理),通过搭建好目标(服务端),根据目标各项参数需要搭建调用方(客户端),只要在双方提供及调用的接口名称、参数、建立连接的各项参数指标一致,那么通过双方的代理解释器执行,必然会建立一致调用的关系桥梁,代码如下:

注:此篇文章大致只为了演示rpc基础原理实现,则实现交互基于阻塞式的IO,正常情况当并发量大时及配合其他流程的正常运转我们通常应改为非阻塞NIO,异步运行机制,市面上常用的RPC框架基本已经成熟,大多可为用户主动提供IO与NIO、不同编程语言、不同序列化方式、不同类型返回值等供用户根据实际场景选择,了解本文实现后对rpc框架源码解析也会很很大的帮助。
服务端提供的API接口信息:

public interface watchService {

    public String scWatchDoing(String strName);
}

服务端基于接口的实现内容:

public class watchServiceImpl implements watchService {
    @Override
    public String scWatchDoing(String strName) {

        StringBuilder strMsg = new StringBuilder();
        strMsg.append("来自聪聪服务端您调用接口反馈的消息:");

        if("morning".equals(strName)){

            strMsg.append("早上好,现在是8点28分,未迟到,滴~老司机卡,您已成功打卡!");
            System.out.println("早上8点半准时上班,不许迟到,打卡调用");
        }else if ("afternoon".equals(strName) || "night".equals(strName)){
            strMsg.append("到了下午下班时间了,您确定要准时下班打卡吗?确定吗?确定吗?确定吗?");
            System.out.println("下午5点45分下班打卡,你很危险哦~,下班打卡调用");
        }else{
            strMsg.append("你是不是不知道干撒,不知道赶紧加班去!!!");
            System.out.println("迷茫调用者调用!");
        }

        return strMsg.toString();
    }
}

服务端提供服务注册,挂载服务待客户端调用:

public class watchProxy {

    private static final int MAX_PORT = 65535;

    public static void watchProxyInit(int iPort, final Object allService) throws ScException,IOException{

        // 系统端口能够提供的最大值及最小值范围 1~65535
        if(iPort < 0 || iPort > watchProxy.MAX_PORT ){
            throw new ScException("服务端端口设置有误!");
        }

        // 需要判断,本次作为通用挂载服务,可接受任意业务接口承载
        if(null == allService){
            throw new ScException("传入的service为空或非法传入");
        }

        // 传入端口,以端口为参数之一建立socket连接挂载服务
        ServerSocket socket = new ServerSocket(iPort);
        try{
            while (true){
                // 等待接收
                try(Socket soc =  socket.accept()){

                    // 接收客户端调用对象
                    ObjectInputStream inputStream = new ObjectInputStream(soc.getInputStream());
                    try{

                        // 获取对象流中的方法名(UTF8)
                        String method = inputStream.readUTF();

                        // 解析客户端传入的方法参数个数及对应的参数类型,存入容器
                        Class<?>[] parameters = (Class<?>[])inputStream.readObject();

                        // 解析客户端传入的参数值,存入容器
                        Object[] objs = (Object[]) inputStream.readObject();

                        // 根据传入方法名及参数类型、个数从watchService的Class中获取方法找寻符合的方法
                        Method watchMethod =  allService.getClass().getMethod(method, parameters);

                        // 执行方法调用,根据上个步骤获取到的匹配方法,对应将参数值放入指定的对象方法中
                        // 而且这步直接根据方法内条件选择获取到执行结果,因方法有返回值,则以object形式返回
                        Object pjObj =  watchMethod.invoke(allService, objs);

                        // 从服务端方法实现中获取到的返回值以object流形式放入输出流,准备向客户端输出
                        // 输出载体为双方建立的socket链接中获取输出流为载体
                        ObjectOutputStream outputStream = new ObjectOutputStream(soc.getOutputStream());

                        try{

                            // 放置获取方法体返回参数object置入以socket双方建立好的连接为载体,通过对象输出流写入
                            // 的方式将返回参数写入输出,返回给客户端作为服务端的响应
                            outputStream.writeObject(pjObj);
                        }catch (Exception e){
                            e.printStackTrace();
                        }finally {
                            outputStream.close();
                        }
                    }finally {
                        inputStream.close();
                    }
                }
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(null != socket){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

服务端建立rpc连接服务(提供向外展示接口及接口实现):

public class ScRpcCreate {

    public static void main(String[] args) {
        System.out.println("SCRPC服务端开始启动!!!");
        try {
            watchProxy.watchProxyInit(133, new watchServiceImpl());
        } catch (ScException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端
客户端首先需要和服务端一样的接口,正常在工作中和客户对接中需要提供服务的一方提供:

public interface watchService {

    public String scWatchDoing(String strName);
}

客户端代理解析实现(通用):
Handler类,需要实现InvocationHandler(java.lang.reflect.InvocationHandler),重写invoke业务逻辑

public class MyHandler implements InvocationHandler {

    private String strUrl;
    private Integer iPort;

    public MyHandler(String strUrl, Integer iPort){
        super();
        this.strUrl = strUrl;
        this.iPort = iPort;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 客户端调用传入服务端目标的url及端口获取socket连接对象
        Socket socket = new Socket(strUrl, iPort);

        // 获取连接成功(即不抛出connect连接异常视为连接成功)后,获取套接对象输出流
        ObjectOutputStream outs = new ObjectOutputStream(socket.getOutputStream());

        // 将调用的方法名、参数类型、参数个数放入输出流中
        outs.writeUTF(method.getName());
        outs.writeObject(method.getParameterTypes());
        outs.writeObject(args);

        // 调用获取来自服务端给出的输入结果
        ObjectInputStream ins = new ObjectInputStream(socket.getInputStream());

        // 转换为object对象返回给代理解释器执行
        Object obj = ins.readObject();

        outs.close();
        ins.close();
        socket.close();

        return  obj;
    }

    public String getStrUrl() {
        return strUrl;
    }

    public void setStrUrl(String strUrl) {
        this.strUrl = strUrl;
    }

    public Integer getIPort() {
        return iPort;
    }

    public void setIPort(Integer iPort) {
        this.iPort = iPort;
    }
}

客户端解释器逻辑,利用java动态代理机制,给客户端创建handler(上一步),此步利用动态代理获取对象实例:

public class SCRpcClient {

    public static <T> T ScClientProxy(final Class<T> classes, final String url, final int port){

        // 代理解释器:直接会根据socket调用服务端返回的方法、参数等信息通过反射获取对象实例
        // 获取对象实例:根据传回的方法具体对象,利用类加载器加载机制解释为匹配到的Service方法对象(此处为通用!!!)
        return (T)Proxy.newProxyInstance(classes.getClassLoader(), new Class<?>[]{classes}, new MyHandler(url, port));
    }
}

客户端调用实例

public class ScRpcChange {
    public static void main(String[] args) {
        // 传入目标服务器地址,端口进行获取解释后的service对象
        Object obj = SCRpcClient.ScClientProxy(watchService.class, "127.0.0.1", 1333);

        // 转为由服务端提供客户端调用的那个适配的service接口对象
        watchService watchService = (watchService) obj;

        // 此步同调用本地方法一样调用接口内的方法,并赋予正确参数即可得到方法返回值
        // 客户端无需考虑接口内的方法具体如何实现,只管调用即可
        String result = watchService.scWatchDoing("morning");

        System.out.println("收到的消息为:----" + result);
        
    }
}

具体结果如下

复习RPC原理及其简单实现
服务端启动执行后,会挂载服务,等待接收

复习RPC原理及其简单实现
客户端直接以main主函数的形式执行调用服务端提供的接口,注意:仅需要服务端提供接口文件即可

复习RPC原理及其简单实现
服务端输出调用过程中测试输出的选项提示,根据图上结果显示调用成功

复习RPC原理及其简单实现
我以项目中交互常用的jar包提供方式手段将服务端提供的客户端代理解析、及自定义异常、工具类等,客户端配合接口文档直接build jar调用即可

此次RPC调用方式除无具体序列化实现外,其余同Binary-RPC:Hessian的调用方式基本无异,Hessian在我个人的工作中使用频率较多,利用Hessian我们做过cs及sc的调用,有时间研究hessian的小伙伴可以看下源码,底层基于socket通信实现基本无异,只是增加了一些诸如通过转换对象为二进制的方式进行C <--> S交互、异步运行,加入非阻塞(netty)等机制的封装度较高的RPC协议框架,方便使用,上手难度小。

今天先写到这里,试写一下,谢谢~