.Net Core/.net 6/.Net 8 实现Mqtt客户端
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using System.Text;
namespace Code.Mqtt
{
/// <summary>
/// Mqtt客户端
/// </summary>
public class MqttClientBase
{
/// <summary>
/// 客户端
/// </summary>
public IMqttClient client;
/// <summary>
/// 订阅主题列表
/// </summary>
public List<string> Topics=new List<string>();
public MqttClientOptions options;
public MqttClientBaseOptions _opt;
/// <summary>
/// 主动断开连接
/// </summary>
public bool off = false;
public bool isconn = false;
/// <summary>
/// 创建mqtt客户端,并值接传入初始参数
/// </summary>
/// <param name="opt"></param>
public MqttClientBase(MqttClientBaseOptions opt)
{
this._opt = opt;
//创建客户端
client = new MqttFactory().CreateMqttClient();
options =new MqttClientOptions() {
ClientId=_opt.clientId,
ChannelOptions=new MqttClientTcpOptions()
{
Server=_opt.server,
Port=_opt.port,
},
Credentials=new MqttClientCredentials(_opt.username,Encoding.UTF8.GetBytes(_opt.password)),
//清理会话
CleanSession=false,
//设置心跳
KeepAlivePeriod = TimeSpan.FromSeconds(30)
};
}
/// <summary>
/// 创建mqtt客户端,不传参数,
/// 必须在调用 Connect之前调用过SetOption方法
/// </summary>
public MqttClientBase()
{
//创建客户端
client = new MqttFactory().CreateMqttClient();
}
/// <summary>
/// 设置参数
/// </summary>
/// <param name="opt"></param>
public void SetOption(MqttClientBaseOptions opt)
{
options = new MqttClientOptions()
{
ClientId = _opt.clientId,
ChannelOptions = new MqttClientTcpOptions()
{
Server = _opt.server,
Port = _opt.port,
},
Credentials = new MqttClientCredentials(_opt.username, Encoding.UTF8.GetBytes(_opt.password)),
//清理会话
CleanSession = false,
//设置心跳
KeepAlivePeriod = TimeSpan.FromSeconds(30)
};
}
/// <summary>
/// 连接服务器
/// </summary>
/// <param name="action">连接成功后执行</param>
/// <param name="ConnectedAsync">连接成功事件</param>
public void Connect(Action<MqttClientConnectedEventArgs> ConnectedAsync=null)
{
client.ConnectAsync(options);
if(ConnectedAsync != null)
{
//连接成功事件
client.ConnectedAsync += (args) =>
{
ConnectedAsync(args);
return Task.CompletedTask;
};
}
}
/// <summary>
/// 重连服务器
/// 在连接断开事件中调用,即可实现无限轮询
/// </summary>
/// <param name="t">是否重复尝试重连</param>
/// <param name="i">尝试次数</param>
public void ReConnect()
{
try
{
client.ConnectAsync(options).Wait();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
public async Task AddTopic(string topic)
{
//更新订阅
client.SubscribeAsync(new MqttClientSubscribeOptions()
{
TopicFilters = new List<MqttTopicFilter>() {
new MqttTopicFilter { Topic = topic }
}
});
//将主题名称加入列表
Topics.Add(topic);
}
/// <summary>
/// 取消订阅
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public async Task DeleteTopic(string topic)
{
client.UnsubscribeAsync(new MqttClientUnsubscribeOptions()
{
TopicFilters = new List<string> { topic }
});
Topics.Remove(topic);
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="topic">主题</param>
/// <param name="content">内容</param>
/// <returns></returns>
public async Task Publish(string topic, string content)
{
if(client.IsConnected)
{
client.PublishAsync(new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(content)
});
}
}
/// <summary>
/// 主动断开连接
/// </summary>
public void Disconnect()
{
off = true;
client.DisconnectAsync();
}
/// <summary>
/// 断开连接事件
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public async Task DisconnectedAsync(Action<MqttClientDisconnectedEventArgs> action)
{
client.DisconnectedAsync += (args) => {
action(args);
return Task.CompletedTask;
};
}
/// <summary>
/// 接收消息事件
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public async Task Message(Action<string,string> action) {
client.ApplicationMessageReceivedAsync += (args) =>
{
var topic = args.ApplicationMessage.Topic;
var msg = args.ApplicationMessage.Payload.BToString();
action(topic, msg);
return Task.CompletedTask;
};
}
}
}