.net 6连接mqtt,emqx服务端

时间:2025-03-27 20:05:31
/// <summary> /// mqtt客户端 /// </summary> public class MqttClientBase { /// <summary> /// 客户端 /// </summary> public IMqttClient client; /// <summary> /// 订阅主题列表 /// </summary> public List<MqttTopicFilter> Topics; /// <summary> /// 初始化客户端 /// </summary> /// <param name="server">服务器地址</param> /// <param name="port">服务器端口 一般为1883</param> /// <param name="username">用户名</param> /// <param name="password">密码</param> /// <param name="cliendID">客户端id</param> /// <param name="OnConnected">连接成功事件</param> public MqttClientBase(string server,int port,string username,string password,string cliendID, Action<MqttClientConnectedEventArgs>? OnConnected = null) { //创建客户端 client = new MqttFactory().CreateMqttClient(); //订阅主题列表 Topics = new List<MqttTopicFilter>(); //客户端参数 var mqttOptions = new MqttClientOptions() { ClientId = cliendID,//客户端id ChannelOptions = new MqttClientTcpOptions() { Server = server,//服务器地址 Port = port//服务器端口 一般为1883 }, //设置用户和密码 Credentials = new MqttClientCredentials(username, Encoding.UTF8.GetBytes(password)), CleanSession = false, //设置心跳 KeepAlivePeriod = TimeSpan.FromSeconds(30), }; if (OnConnected != null) { //Mqtt客户端连接成功 client.ConnectedAsync += (arg) => { OnConnected(arg); return Task.CompletedTask; }; } //连接服务器,这里需要等待异步完成连接,否则后面会报错 var res=client.ConnectAsync(mqttOptions).Result; } /// <summary> /// 连接断开 事件 /// </summary> public void OnDisconnected(Action<MqttClientDisconnectedEventArgs> action) { //Mqtt客户端连接断开 client.DisconnectedAsync += (arg) => { action(arg); return Task.CompletedTask; }; } /// <summary> /// 接收到消息事件 /// 参数1 topic 订阅主题 /// 参数2 data 解析成字符串的内容 /// 参数3 arg 原始消息内容 /// </summary> public void OnMessage(Action<string, string?, MqttApplicationMessageReceivedEventArgs> action) { //接收消息 client.ApplicationMessageReceivedAsync += (arg) => { var buff = arg.ApplicationMessage.Payload; var data = buff.BToString(); action(arg.ApplicationMessage.Topic, data ?? null, arg); return Task.CompletedTask; }; } /// <summary> /// 添加订阅主题 /// </summary> /// <param name="topic"></param> /// <returns></returns> public async Task AddTopic(string topic) { //将主题名称加入列表 Topics.Add(new MqttTopicFilter() { Topic = topic }); //更新订阅 client.SubscribeAsync(new MqttClientSubscribeOptions() { TopicFilters = Topics }); } /// <summary> /// 发送消息 /// </summary> /// <param name="topic">主题</param> /// <param name="content">内容</param> /// <returns></returns> public async Task Publish(string topic, string content) { client.PublishAsync(new MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(content) }); } }