.Net Core/.net 6/.Net 8 实现Mqtt客户端

时间:2024-03-12 20:11:07
using MQTTnet; using MQTTnet.Client; using MQTTnet.Packets; using System.Text; namespace Code.Mqtt { /// <summary> /// Mqtt客户端 /// </summary> public class MqttClientBase { /// <summary> /// 客户端 /// </summary> public IMqttClient client; /// <summary> /// 订阅主题列表 /// </summary> public List<string> Topics=new List<string>(); public MqttClientOptions options; public MqttClientBaseOptions _opt; /// <summary> /// 主动断开连接 /// </summary> public bool off = false; public bool isconn = false; /// <summary> /// 创建mqtt客户端,并值接传入初始参数 /// </summary> /// <param name="opt"></param> public MqttClientBase(MqttClientBaseOptions opt) { this._opt = opt; //创建客户端 client = new MqttFactory().CreateMqttClient(); options =new MqttClientOptions() { ClientId=_opt.clientId, ChannelOptions=new MqttClientTcpOptions() { Server=_opt.server, Port=_opt.port, }, Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)), //清理会话 CleanSession=false, //设置心跳 KeepAlivePeriod = TimeSpan.FromSeconds(30) }; } /// <summary> /// 创建mqtt客户端,不传参数, /// 必须在调用 Connect之前调用过SetOption方法 /// </summary> public MqttClientBase() { //创建客户端 client = new MqttFactory().CreateMqttClient(); } /// <summary> /// 设置参数 /// </summary> /// <param name="opt"></param> public void SetOption(MqttClientBaseOptions opt) { options = new MqttClientOptions() { ClientId = _opt.clientId, ChannelOptions = new MqttClientTcpOptions() { Server = _opt.server, Port = _opt.port, }, Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)), //清理会话 CleanSession = false, //设置心跳 KeepAlivePeriod = TimeSpan.FromSeconds(30) }; } /// <summary> /// 连接服务器 /// </summary> /// <param name="action">连接成功后执行</param> /// <param name="ConnectedAsync">连接成功事件</param> public void Connect(Action<MqttClientConnectedEventArgs> ConnectedAsync=null) { client.ConnectAsync(options); if(ConnectedAsync != null) { //连接成功事件 client.ConnectedAsync += (args) => { ConnectedAsync(args); return Task.CompletedTask; }; } } /// <summary> /// 重连服务器 /// 在连接断开事件中调用,即可实现无限轮询 /// </summary> /// <param name="t">是否重复尝试重连</param> /// <param name="i">尝试次数</param> public void ReConnect() { try { client.ConnectAsync(options).Wait(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } public async Task AddTopic(string topic) { //更新订阅 client.SubscribeAsync(new MqttClientSubscribeOptions() { TopicFilters = new List<MqttTopicFilter>() { new MqttTopicFilter { Topic = topic } } }); //将主题名称加入列表 Topics.Add(topic); } /// <summary> /// 取消订阅 /// </summary> /// <param name="topic"></param> /// <returns></returns> public async Task DeleteTopic(string topic) { client.UnsubscribeAsync(new MqttClientUnsubscribeOptions() { TopicFilters = new List<string> { topic } }); Topics.Remove(topic); } /// <summary> /// 发布消息 /// </summary> /// <param name="topic">主题</param> /// <param name="content">内容</param> /// <returns></returns> public async Task Publish(string topic, string content) { if(client.IsConnected) { client.PublishAsync(new MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(content) }); } } /// <summary> /// 主动断开连接 /// </summary> public void Disconnect() { off = true; client.DisconnectAsync(); } /// <summary> /// 断开连接事件 /// </summary> /// <param name="action"></param> /// <returns></returns> public async Task DisconnectedAsync(Action<MqttClientDisconnectedEventArgs> action) { client.DisconnectedAsync += (args) => { action(args); return Task.CompletedTask; }; } /// <summary> /// 接收消息事件 /// </summary> /// <param name="action"></param> /// <returns></returns> public async Task Message(Action<string,string> action) { client.ApplicationMessageReceivedAsync += (args) => { var topic = args.ApplicationMessage.Topic; var msg = args.ApplicationMessage.Payload.BToString(); action(topic, msg); return Task.CompletedTask; }; } } }