RabbitMQ 实现远程过程调用RPC

时间:2021-10-17 16:27:43

RPC调用的顺序
1. 在客户端初始化的时候,也就是SimpleRpcClient类初始化的时候,它会随机的创建一个callback队列,用于存放服务的返回值,这个队列是exclusive的。连接断开就没有了。
2. 客户端在发送Request的时候,会加上两个参数:ReplyTo和CorrelationId,前者用于告诉服务返回值放在哪个队列里面或路由,后者用于配对每次的Request。这两个属性都放在客户端发送消息的附带的IBasicProperties字典中。
3. 把消息放入服务的监控队列里,消息里面自然有调用方法的参数。
4. 服务在所监控的队列中收到数据后,进行运算,并把返回值放入到客户端指定的callback队列中去。
5. 客户端在发送完Request后,便去自己创建的callback队列监听,如果获得到数据,则查看里面的CorrelationId,如果和调用Request一致,则返回结果。

Server 端:

 class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "192.168.254.40",
UserName = "admin",
Password = "admin",
}; //第一步:创建connection
var connection = factory.CreateConnection(); //第二步:创建一个channel
var channel = connection.CreateModel(); channel.QueueDeclare("rpc_queue", true, false, false, null); Subscription subscription = new Subscription(channel, "rpc_queue"); MySimpleRpcServer server = new MySimpleRpcServer(subscription); Console.WriteLine("server 端启动完毕!!!"); server.MainLoop(); Console.Read();
}
} public class MySimpleRpcServer : SimpleRpcServer
{
public MySimpleRpcServer(Subscription subscription) : base(subscription)
{ } public override byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
{
return base.HandleCall(isRedelivered, requestProperties, body, out replyProperties);
} public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
{
replyProperties = null; var msg = string.Format("当前文字长度为:{0}", Encoding.UTF8.GetString(body).Length); return Encoding.UTF8.GetBytes(msg);
//return base.HandleSimpleCall(isRedelivered, requestProperties, body, out replyProperties);
} public override void ProcessRequest(BasicDeliverEventArgs evt)
{
base.ProcessRequest(evt);
}
}

Client 端:

static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "192.168.254.40",
UserName = "admin",
Password = "admin",
}; //第一步:创建connection
var connection = factory.CreateConnection(); //第二步:创建一个channel
var channel = connection.CreateModel(); SimpleRpcClient client = new SimpleRpcClient(channel, string.Empty, ExchangeType.Direct, "rpc_queue"); var bytes = client.Call(Encoding.UTF8.GetBytes("hello world!!!!")); var result = Encoding.UTF8.GetString(bytes);
}