使用Java Socket实现简单版本的Rpc服务

时间:2024-10-15 09:08:51

通过如下demo,希望大家可以快速理解RPC的简单案例。如果对socket不熟悉的话可以先看下我的上篇文章 Java Scoket实现简单的时间服务器-****博客 对socket现有基础了解。

RPC简介

RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,它允许一个程序(客户端)通过网络向另一个程序(服务器)请求服务,而无需了解底层网络技术的细节。RPC 使得开发分布式应用程序更加容易,因为它隐藏了网络通信的复杂性,让开发者能够像调用本地函数一样调用远程函数。

RPC 的主要特点:

  1. 透明性:调用远程过程就像调用本地过程一样,无需关心远程调用的细节。
  2. 跨平台:可以在不同的操作系统和编程语言之间进行通信。
  3. 网络中立:RPC 协议可以运行在各种网络协议之上,如 TCP/IP、HTTP 等。
  4. 语言中立:支持多种编程语言,如 C、C++、Java、Python 等。

RPC 的工作流程:

  1. 客户端调用:客户端调用本地的 RPC 库,传递需要远程执行的函数名和参数。
  2. 序列化:RPC 库将参数序列化成网络传输的格式(如 JSON、XML 或二进制格式)。
  3. 发送请求:客户端通过网络发送序列化后的请求到服务器。
  4. 服务器接收:服务器接收到请求后,反序列化参数。
  5. 执行函数:服务器调用相应的函数执行操作。
  6. 返回结果:服务器将执行结果序列化后发送回客户端。
  7. 客户端接收:客户端接收到结果,反序列化并处理。

RPC 的应用场景:

  • 分布式系统:在分布式系统中,不同的服务可能部署在不同的服务器上,RPC 可以方便地进行服务间的通信。
  • 微服务架构:在微服务架构中,各个微服务之间通常通过 RPC 进行通信。
  • 云服务:云服务提供商可能使用 RPC 来允许用户远程调用云服务。

常见的 RPC 框架:

  • gRPC:由 Google 开发的高性能、开源和通用的 RPC 框架,支持多种语言。
  • Apache Thrift:由 Facebook 开发,后来捐赠给 Apache 基金会,支持多种编程语言。
  • Dubbo:阿里巴巴开源的一个高性能的 Java RPC 框架。
  • XML-RPC 和 JSON-RPC:基于 XML 和 JSON 数据格式的简单 RPC 协议。

RPC 是构建现代分布式系统和微服务架构的关键技术之一。如下是个人对RPC服务的理解示图。

代码实现

1.创建Service以及实现类,由于演示使用的是jdk的动态代理,所以必须得通过接口方式声明资源。代码如下:

HelloService.java

public interface HelloService {

    String sayHello();

}

HelloServiceImpl.java 远程访问的接口实现

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello() {
        return "hello world !!!";
    }
}

参数传输DTO定义,为了简化get、set,请引入lombok工具包:

import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;

import java.io.Serializable;

@Data
@Accessors(chain = true)
@ToString
public class RequestDTO implements Serializable {

    /**
     * 请求函数名称
     */
    private String methodName;

    /**
     * 请求参数类型
     */
    private Class<?>[] argsType;

    /**
     * 请求的接口
     */
    private String clazz;

    /**
     * 请求参数
     */
    private Object[] params;

    /**
     * 响应结果
     */
    private Object response;

}

服务提供者设计:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Provide {
    /**
     * 多线程处理请求
     */
    private static final ExecutorService POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static final Map<String,Object> CACHE_MAP = new HashMap<>();

    public static void main(String[] args) throws Exception {
        //注册对象
        registerObject();

        //启动服务
        startServer();
    }

    /**
     * 启动服务端
     */
    private static void startServer() throws Exception {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(9999));

        System.out.println("服务器启动成功!!!");

        while (true){
            Socket socket = serverSocket.accept();
            POOL.execute(()->{
                try {
                    ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                    RequestDTO request = (RequestDTO)ois.readObject();
                    System.out.println("获取到客户端的请求数据是:"+request);
                    //目标对象
                    Object target = CACHE_MAP.get(request.getClazz());
                    if(null!=target){
                        //反射执行并返回响应结果
                        Method method = target.getClass().getMethod(request.getMethodName(),request.getArgsType());
                        method.setAccessible(true);
                        request.setResponse(method.invoke(target,request.getParams()));
                    }

                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos);
                    oos.writeObject(request);

                    socket.getOutputStream().write(bos.toByteArray());
                    socket.getOutputStream().flush();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }

    }

    /**
     * 注册对象,因为演示,简化自动化扫描注册过程
     */
    private static void registerObject() {
        CACHE_MAP.put(HelloService.class.getSimpleName(),new HelloServiceImpl());
    }

}

服务调用方实现:

1.动态代理

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class DynamicProxy<T> {

    private final String host = "127.0.0.1";

    private final int port = 9999;

    /**
     * 被代理的对象
     */
    private Class<T> clazz;

    public DynamicProxy(Class<T> clazz){
        this.clazz = clazz;
    }

    /**
     * 获取代理对象
     *
     *
     * @return T
     */
    public T getService(){
        return (T) Proxy.newProxyInstance(DynamicProxy.class.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                RequestDTO requestDTO = new RequestDTO()
                        .setMethodName(method.getName())
                        .setParams(args)
                        .setClazz(clazz.getSimpleName())
                        .setArgsType(method.getParameterTypes());

                try (Socket socket = new Socket()){
                    //连接客户端
                    socket.connect(new InetSocketAddress(host,port));

                    //进行参数传输
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(baos);
                    oos.writeObject(requestDTO);
                    oos.flush();

                    socket.getOutputStream().write(baos.toByteArray());
                    socket.getOutputStream().flush();

                    ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                    Object o = ois.readObject();
                    if(null==o){
                        return null;
                    }
                    return ((RequestDTO)o).getResponse();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

}

调用过程:


public class Consumer {

    public static void main(String[] args) {
        DynamicProxy<HelloService> helloServiceDynamicProxy = new DynamicProxy<>(HelloService.class);
        HelloService service = helloServiceDynamicProxy.getService();
        System.out.println(service.sayHello());
    }

}

总结

RPC(远程过程调用)是一种允许一个程序(客户端)通过网络向另一个程序(服务器)请求服务的协议。在RPC框架中,客户端可以像调用本地方法一样调用服务器端的方法,而无需关心底层的网络通信细节。

以下是一段基于Java Socket实现的RPC原理的demo总结:

  1. 服务器端(Server)

    • 创建一个服务器端Socket,监听一个端口等待客户端的连接。
    • 接受客户端的连接请求,创建一个新的线程或者使用线程池来处理客户端请求。
    • 接收客户端发送的请求信息,这通常包括方法名、参数类型和参数值等。
    • 根据请求信息,动态调用对应的本地方法,并获取执行结果。
    • 将方法执行结果返回给客户端。
  2. 客户端(Client)

    • 创建一个客户端Socket,连接到服务器端的IP地址和端口。
    • 封装需要远程调用的方法信息,包括方法名、参数等,并将这些信息发送给服务器。
    • 等待服务器处理请求并返回结果。
    • 接收并解析服务器返回的结果,这可能涉及到对象的反序列化。
  3. 序列化与反序列化

    • 为了通过网络传输方法的参数和返回值,需要将对象序列化成字节流。
    • Java提供了Serializable接口来支持对象的序列化。
    • 客户端发送序列化后的对象,服务器端接收后需要反序列化以恢复对象。
  4. 通信协议

    • 定义了客户端和服务器之间的通信协议,包括请求格式和响应格式。
    • 协议需要规定如何表示方法名、参数列表、返回值等信息。
  5. 异常处理

    • 在网络通信中,可能会遇到各种异常情况,如连接失败、数据传输错误等。
    • 需要在客户端和服务器端都实现异常处理机制,确保系统的健壮性。
  6. 多线程处理

    • 服务器端通常需要处理多个客户端的并发请求。
    • 使用线程池可以有效地管理线程资源,提高系统性能。
  7. 安全性

    • 在实际应用中,需要考虑数据的安全性,比如使用SSL/TLS加密Socket通信。

通过这个Demo,我们可以了解到RPC的核心原理和实现方式,虽然这是一个简化的示例,但它展示了RPC框架的基本工作流程。在实际应用中,RPC框架会提供更复杂的功能,如服务注册与发现、负载均衡、超时重试等。

相关文章