编写一个 rpc

时间:2022-02-17 03:34:51

手动编写一个 RPC 调用

package com.alibaba.study.rpc.framework;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket; /**
* Created by baizhuang on 2019-04-03.
*/
public class RpcFramework { /**
* 暴露服务
* service : 服务实现
* port: 服务端口
*/
public static void export(final Object service,int port)throws Exception{
if(service == null){
throw new IllegalArgumentException("service == null");
}
if(port<=0||port>65535){
throw new IllegalArgumentException("port invalid:"+port);
}
System.out.println("Export service:"+service.getClass().getName()+" on port :"+port );
ServerSocket server= new ServerSocket(port);
while(true){
try{
final Socket socket = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
try{
try{
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try{
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
Object[] arguments = (Object[])input.readObject();
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try{
Method method = service.getClass().getMethod(methodName,parameterTypes);
Object result = method.invoke(service,arguments);
output.writeObject(result);
}catch (Throwable t){
output.writeObject(t);
}finally {
output.close();
}
}finally {
input.close();
}
}finally {
socket.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}catch (Exception e){
e.printStackTrace();
}
} } /**
* 引用服务
*/ public static <T> T refer(final Class<T> interfaceClass,final String host,final int port)throws Exception{
if(interfaceClass == null){
throw new IllegalArgumentException("interface class == null");
}
if(!interfaceClass.isInterface()){
throw new IllegalArgumentException("The" + interfaceClass.getClass().getName()+"must be interface");
}
if(host == null|| host.length()==0){
throw new IllegalArgumentException("host == null");
}
System.out.println("Get remote Service:"+interfaceClass.getName()+" from server "+host+":"+port);
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket(host,port);
try{
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try{
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try{
Object result = input.readObject();
if(result instanceof Throwable){
throw (Throwable)result;
}
return result;
}finally {
input.close();
}
}finally {
output.close();
}
}finally {
socket.close();
}
}
});
}
}

接口以及实现

package com.alibaba.study.rpc.test;

/**
* Created by baizhuang on 2019-04-03.
*/
public interface HelloService {
String hello(String name);
}
package com.alibaba.study.rpc.test;

/**
* Created by baizhuang on 2019-04-03.
*/
public class HelloServiceImpl implements HelloService{
@Override
public String hello(String name) {
return "Hello"+name;
}
}

服务提供方

package com.alibaba.study.rpc.test;

import com.alibaba.study.rpc.framework.RpcFramework;

/**
* Created by baizhuang on 2019-04-03.
*/
public class RpcProvider {
public static void main(String[] args) throws Exception{
HelloService service = new HelloServiceImpl();
RpcFramework.export(service,1234);
}
}

服务客户端

package com.alibaba.study.rpc.test;

import com.alibaba.study.rpc.framework.RpcFramework;

/**
* Created by baizhuang on 2019-04-03.
*/
public class RpcConsumer {
public static void main(String[] args) throws Exception{
HelloService service = RpcFramework.refer(HelloService.class,"127.0.0.1",1234);
for(int i=0;i<Integer.MAX_VALUE;i++){
String hello = service.hello("World : "+i);
System.out.println(hello);
Thread.sleep(1000);
}
}
}