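First, create a connection "unit of work".
Both the sender and the receiver can use it.
MessageMQUserName, MessageMQPassword, MessageMQHostName and MessageMQQueueName correspond to your own RabbitMQ server's user name, password, host name, and queue name.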
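Because the code reads these values with GetConnectionString, they are looked up under the ConnectionStrings section of the configuration. A minimal appsettings.json sketch might look like the following. All values are placeholders and assumptions: guest/guest on localhost is only the stock RabbitMQ default, and "Message" is chosen to match the queue name the listener consumes from. MessageExchange is used by the sender as the routing key on the default exchange, so here it is assumed to equal the queue name.

```json
{
  "ConnectionStrings": {
    "MessageMQUserName": "guest",
    "MessageMQPassword": "guest",
    "MessageMQHostName": "localhost",
    "MessageMQQueueName": "Message",
    "MessageExchange": "Message"
  }
}
```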
    using Microsoft.Extensions.Configuration;
    using RabbitMQ.Client;

    public class RabbitMQClientUnit : RabbitMQClientIUnit
    {
        private readonly IConfiguration _configuration;

        public RabbitMQClientUnit(IConfiguration configuration)
        {
            _configuration = configuration;

            // Credentials and host are read from the ConnectionStrings section.
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = _configuration.GetConnectionString("MessageMQUserName"),
                Password = _configuration.GetConnectionString("MessageMQPassword"),
                HostName = _configuration.GetConnectionString("MessageMQHostName"),
            };

            // Open one connection and one channel for this unit.
            Connection = factory.CreateConnection();
            QueueName = _configuration.GetConnectionString("MessageMQQueueName");
            Channel = Connection.CreateModel();
        }

        public IConnection Connection { get; }
        public IModel Channel { get; }
        public string QueueName { get; }
    }
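The RabbitMQClientIUnit interface itself is not shown in this post. Judging only from the members the sender and the listener use (Connection, Channel, QueueName), it could look roughly like this; treat it as an assumed shape rather than the original declaration.

```csharp
using RabbitMQ.Client;

// Assumed shape of the interface, inferred from how it is used in this post.
public interface RabbitMQClientIUnit
{
    // The open connection to the RabbitMQ server.
    IConnection Connection { get; }

    // The channel used for declaring queues, publishing and consuming.
    IModel Channel { get; }

    // The queue name read from configuration.
    string QueueName { get; }
}
```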
Use the following method in the MessageRepository class:
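    // Requires: using System; using System.Text;
    //           using Microsoft.Extensions.Configuration;

    private readonly RabbitMQClientIUnit _rabbitMQClientIUnit;
    private readonly IConfiguration _configuration;

    public MessageRepository(RabbitMQClientIUnit rabbitMQClientIUnit, IConfiguration configuration)
    {
        _rabbitMQClientIUnit = rabbitMQClientIUnit;
        _configuration = configuration;
    }

    /// <summary>
    /// Publish a message to MQ
    /// </summary>
    /// <param name="encryption"></param>
    public void RabbitMQPush(string encryption)
    {
        try
        {
            // Declare the queue (non-durable, non-exclusive, no auto-delete).
            _rabbitMQClientIUnit.Channel.QueueDeclare(_rabbitMQClientIUnit.QueueName, false, false, false, null);
            var sendBytes = Encoding.UTF8.GetBytes(encryption);

            // Publish the message. The first argument "" is the default exchange,
            // so the second argument acts as the routing key and must equal the
            // name of the target queue for the message to be delivered.
            _rabbitMQClientIUnit.Channel.BasicPublish("", _configuration.GetConnectionString("MessageExchange"), null, sendBytes);

            // Note: closing the channel here means this unit cannot publish again;
            // this only works if the unit is not shared across calls.
            _rabbitMQClientIUnit.Channel.Close();
        }
        catch (Exception ex)
        {
            // Keep the original exception as the inner exception for diagnosis.
            throw new ArgumentException("An exception occurred; MQ push failed", ex);
        }
    }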
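That completes the sender side of RabbitMQ.
The receiver is a bit more trouble, and I took some detours with .NET Core 3.1. At first I wanted to use a console application as the receiver, but under Linux Console.ReadKey cannot be used: calling it makes the service fail on startup.
So I had to go back to an ASP.NET Core Web API or MVC project.
Write the RabbitListener class as follows: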
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    public class RabbitListener : IHostedService
    {
        private readonly RabbitMQClientIUnit _rabbitMQClientIUnit;
        private readonly MessageIService _messageIService;

        public RabbitListener(RabbitMQClientIUnit rabbitMQClientIUnit, MessageIService messageIService)
        {
            _rabbitMQClientIUnit = rabbitMQClientIUnit;
            _messageIService = messageIService;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        // Method that processes a message
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        // Register the consumer listener here
        public void Register()
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(_rabbitMQClientIUnit.Channel);

            // Message-received event
            consumer.Received += (ch, ea) =>
            {
                // Remember: in .NET Core 3.1 ea.Body cannot be used directly
                // (in RabbitMQ.Client 6.x it is a ReadOnlyMemory<byte>), so call ToArray().
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"Received message: {message}");

                // autoAck is false below, so acknowledge explicitly;
                // otherwise the message stays unacknowledged and is redelivered later.
                _rabbitMQClientIUnit.Channel.BasicAck(ea.DeliveryTag, false);
            };

            // The queue name is hard-coded here; it must match the queue the sender publishes to.
            _rabbitMQClientIUnit.Channel.BasicConsume("Message", false, consumer);
        }

        public void DeRegister()
        {
            _rabbitMQClientIUnit.Connection.Close();
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _rabbitMQClientIUnit.Connection.Close();
            return Task.CompletedTask;
        }
    }
The last step is also the core of the whole receiver: registration.
In Startup, register the listener with the AddHostedService method:
services.AddHostedService<RabbitListener>();
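For the listener to be constructed, its dependencies must be registered as well. The sketch below shows one way the registrations could sit together in ConfigureServices. The singleton lifetime for RabbitMQClientUnit is an assumption (one connection and channel kept open for the life of the application, which suits a hosted service), and the registration of MessageIService is left as a comment because its implementing class is not shown in this post.

```csharp
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        // Assumption: share one connection/channel for the whole application.
        services.AddSingleton<RabbitMQClientIUnit, RabbitMQClientUnit>();

        // MessageIService must also be registered here with its implementation
        // (not shown in this post), using a lifetime a hosted service can consume.

        // Starts RabbitListener with the host and stops it on shutdown.
        services.AddHostedService<RabbitListener>();
    }
}
```

With a singleton lifetime the channel has to stay open, so code that closes the channel after each publish (as RabbitMQPush does) should not share this same registration.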