.net 6连接mqtt,emqx服务端
/// <summary>
/// mqtt客户端
/// </summary>
public class MqttClientBase
{
/// <summary>
/// 客户端
/// </summary>
public IMqttClient client;
/// <summary>
/// 订阅主题列表
/// </summary>
public List<MqttTopicFilter> Topics;
/// <summary>
/// 初始化客户端
/// </summary>
/// <param name="server">服务器地址</param>
/// <param name="port">服务器端口 一般为1883</param>
/// <param name="username">用户名</param>
/// <param name="password">密码</param>
/// <param name="cliendID">客户端id</param>
/// <param name="OnConnected">连接成功事件</param>
public MqttClientBase(string server,int port,string username,string password,string cliendID, Action<MqttClientConnectedEventArgs>? OnConnected = null)
{
//创建客户端
client = new MqttFactory().CreateMqttClient();
//订阅主题列表
Topics = new List<MqttTopicFilter>();
//客户端参数
var mqttOptions = new MqttClientOptions()
{
ClientId = cliendID,//客户端id
ChannelOptions = new MqttClientTcpOptions()
{
Server = server,//服务器地址
Port = port//服务器端口 一般为1883
},
//设置用户和密码
Credentials = new MqttClientCredentials(username, Encoding.UTF8.GetBytes(password)),
CleanSession = false,
//设置心跳
KeepAlivePeriod = TimeSpan.FromSeconds(30),
};
if (OnConnected != null)
{
//Mqtt客户端连接成功
client.ConnectedAsync += (arg) =>
{
OnConnected(arg);
return Task.CompletedTask;
};
}
//连接服务器,这里需要等待异步完成连接,否则后面会报错
var res=client.ConnectAsync(mqttOptions).Result;
}
/// <summary>
/// 连接断开 事件
/// </summary>
public void OnDisconnected(Action<MqttClientDisconnectedEventArgs> action)
{
//Mqtt客户端连接断开
client.DisconnectedAsync += (arg) =>
{
action(arg);
return Task.CompletedTask;
};
}
/// <summary>
/// 接收到消息事件
/// 参数1 topic 订阅主题
/// 参数2 data 解析成字符串的内容
/// 参数3 arg 原始消息内容
/// </summary>
public void OnMessage(Action<string, string?, MqttApplicationMessageReceivedEventArgs> action)
{
//接收消息
client.ApplicationMessageReceivedAsync += (arg) => {
var buff = arg.ApplicationMessage.Payload;
var data = buff.BToString();
action(arg.ApplicationMessage.Topic, data ?? null, arg);
return Task.CompletedTask;
};
}
/// <summary>
/// 添加订阅主题
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public async Task AddTopic(string topic)
{
//将主题名称加入列表
Topics.Add(new MqttTopicFilter() { Topic = topic });
//更新订阅
client.SubscribeAsync(new MqttClientSubscribeOptions()
{
TopicFilters = Topics
});
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic">主题</param>
/// <param name="content">内容</param>
/// <returns></returns>
public async Task Publish(string topic, string content)
{
client.PublishAsync(new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(content)
});
}
}