.NET Core / .NET 6 / .NET 8 实现 MQTT 服务端
- Mqtt服务端代码
- `IMqttServer` 接口
- 业务类,实现 `IMqttServer` 接口
- `Program.cs`
直接上代码
NuGet 引用
`MQTTnet`
Mqtt服务端代码
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
namespace Code.Mqtt
{
    /// <summary>
    /// Hosts an MQTTnet broker and forwards its events to an <see cref="IMqttServer"/> implementation.
    /// The broker is started from the constructor.
    /// </summary>
    public class MqttServerBase
    {
        /// <summary>
        /// The underlying MQTTnet server instance.
        /// </summary>
        public MqttServer _server;

        readonly IMqttServer _mqttServer;

        // Guards the accesses this class makes to Topic_Client from the broker's event handlers.
        // NOTE(review): external code reading Topic_Client directly is not covered by this lock.
        readonly object _topicLock = new object();

        /// <summary>
        /// Publishes a message to the given topic: (topic, payload).
        /// The injection is fire-and-forget; the delegate does not wait for delivery.
        /// </summary>
        public Action<string, string> ToTopic;

        /// <summary>
        /// Subscribed topic filters mapped to the ids of the clients subscribed to them.
        /// Keys are the exact filter strings passed by the clients.
        /// </summary>
        public Dictionary<string, List<string>> Topic_Client = new Dictionary<string, List<string>>();

        /// <summary>
        /// Builds the broker, wires every broker event to <paramref name="mqttServer"/> and starts listening.
        /// </summary>
        /// <param name="mqttServer">Callbacks and settings (port) supplied by the application.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is null.</exception>
        public MqttServerBase(IMqttServer mqttServer)
        {
            // Validate before storing so a half-initialised instance is never observable.
            if (mqttServer == null)
            {
                throw new ArgumentNullException(nameof(mqttServer), "MqttServer配置错误");
            }
            _mqttServer = mqttServer;

            var optionbuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpoint()                      // enable the default endpoint
                .WithDefaultEndpointPort(_mqttServer.Port); // port chosen by the application
            _server = new MqttFactory().CreateMqttServer(optionbuilder.Build());

            ToTopic = (topic, msg) =>
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(msg)
                    .Build();
                // An Action cannot be awaited, so the returned task is discarded deliberately.
                _ = _server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message));
            };

            // The callbacks' tasks are returned to the broker so that they are awaited and
            // their exceptions are observed instead of being silently dropped.
            _server.ClientConnectedAsync += e =>
                _mqttServer.ClientConnectedAsync(e.ClientId, e) ?? Task.CompletedTask;

            _server.ClientDisconnectedAsync += e =>
                _mqttServer.ClientDisconnectedAsync(e.ClientId, e) ?? Task.CompletedTask;

            _server.InterceptingPublishAsync += e =>
            {
                var message = e.ApplicationMessage;
                // Only forward messages whose topic is currently tracked in Topic_Client.
                if (message == null || !IsTrackedTopic(message.Topic))
                {
                    return Task.CompletedTask;
                }
                return _mqttServer.InterceptingPublishAsync(e.ClientId, message.Topic, PayloadToString(message), e, ToTopic)
                    ?? Task.CompletedTask;
            };

            _server.ApplicationMessageNotConsumedAsync += e =>
            {
                var message = e.ApplicationMessage;
                // Only forward tracked topics; the original author notes that forwarding
                // everything leads to an endless loop.
                if (message == null || !IsTrackedTopic(message.Topic))
                {
                    return Task.CompletedTask;
                }
                return _mqttServer.ApplicationMessageNotConsumedAsync(message.Topic, PayloadToString(message), e)
                    ?? Task.CompletedTask;
            };

            _server.ValidatingConnectionAsync += e =>
            {
                // The callback decides; map its verdict onto the MQTT connect reason code.
                e.ReasonCode = _mqttServer.ValidatingConnectionAsync(e.UserName, e.Password, e.ClientId, e)
                    ? MqttConnectReasonCode.Success
                    : MqttConnectReasonCode.Banned;
                return Task.CompletedTask;
            };

            // A client subscribed to a topic filter.
            _server.ClientSubscribedTopicAsync += e =>
            {
                var topic = e.TopicFilter.Topic;
                lock (_topicLock)
                {
                    // Register the topic on first use.
                    if (!Topic_Client.TryGetValue(topic, out var clients))
                    {
                        clients = new List<string>();
                        Topic_Client.Add(topic, clients);
                    }
                    // Record each client at most once per topic.
                    if (!clients.Contains(e.ClientId))
                    {
                        clients.Add(e.ClientId);
                    }
                }
                return _mqttServer.ClientSubscribedTopicAsync(e.ClientId, topic, e) ?? Task.CompletedTask;
            };

            // A client unsubscribed from a topic filter.
            _server.ClientUnsubscribedTopicAsync += e =>
            {
                var topic = e.TopicFilter;
                lock (_topicLock)
                {
                    // Nothing to do (and nothing to report) for a topic that was never tracked.
                    if (!Topic_Client.TryGetValue(topic, out var clients))
                    {
                        return Task.CompletedTask;
                    }
                    clients.Remove(e.ClientId);
                    if (clients.Count == 0)
                    {
                        // Drop topics that no longer have any subscriber.
                        Topic_Client.Remove(topic);
                    }
                }
                return _mqttServer.ClientUnsubscribedTopicAsync(e.ClientId, topic, e) ?? Task.CompletedTask;
            };

            // Server started / stopped notifications.
            _server.StartedAsync += _mqttServer.StartedAsync;
            _server.StoppedAsync += _mqttServer.StoppedAsync;

            // A constructor cannot await, so start in the background but report a failed
            // start-up instead of leaving the exception unobserved.
            _ = Start().ContinueWith(
                t => Console.WriteLine("Mqtt服务启动失败:" + t.Exception?.GetBaseException().Message),
                TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Starts the broker and logs progress to the console.
        /// </summary>
        public async Task Start()
        {
            Console.WriteLine("正在启动Mqtt服务");
            await _server.StartAsync();
            Console.WriteLine("Mqtt服务启动成功,端口:" + _mqttServer.Port);
        }

        /// <summary>
        /// Stops the broker and logs progress to the console.
        /// </summary>
        public async Task Stop()
        {
            Console.WriteLine("正在停止Mqtt服务");
            await _server.StopAsync();
            Console.WriteLine("Mqtt服务停止");
        }

        /// <summary>
        /// Restarts the broker: stops it, then starts it again.
        /// </summary>
        /// <returns>A task that completes once the broker has been started again.</returns>
        public async Task ReStart()
        {
            await Stop();
            await Start();
        }

        // Returns true when the topic string is currently a key of Topic_Client.
        bool IsTrackedTopic(string topic)
        {
            if (topic == null)
            {
                return false;
            }
            lock (_topicLock)
            {
                return Topic_Client.ContainsKey(topic);
            }
        }

        // Converts the payload to text using only the bytes inside the segment
        // (Offset/Count), not the whole backing array; returns null when there is no array.
        static string PayloadToString(MqttApplicationMessage message)
        {
            var payload = message.PayloadSegment;
            return payload.Array == null ? null : payload.ToArray().BToString();
        }
    }
}
`IMqttServer` 接口
using MQTTnet.Server;
namespace Code.Mqtt
{
    /// <summary>
    /// Application-side callbacks and settings consumed by the MQTT server host.
    /// </summary>
    public interface IMqttServer
    {
        /// <summary>
        /// Port the server listens on.
        /// </summary>
        int Port { get; }

        /// <summary>
        /// Raised when the server has started.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task StartedAsync(EventArgs args);

        /// <summary>
        /// Raised when the server has stopped.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task StoppedAsync(EventArgs args);

        /// <summary>
        /// Raised when a client comes online.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientConnectedAsync(string clientId, ClientConnectedEventArgs args);

        /// <summary>
        /// Raised when a client goes offline.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args);

        /// <summary>
        /// Raised for a message published by a client.
        /// </summary>
        /// <param name="clientId">Id of the publishing client.</param>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content.</param>
        /// <param name="args">Original event arguments.</param>
        /// <param name="ToTopic">Delegate for sending a message: (topic, content).</param>
        /// <returns>A task representing the handler's work.</returns>
        Task InterceptingPublishAsync(
            string clientId,
            string Topic,
            string msg,
            InterceptingPublishEventArgs args,
            Action<string, string> ToTopic);

        /// <summary>
        /// Validates a connecting client.
        /// </summary>
        /// <param name="username">Account name.</param>
        /// <param name="password">Password.</param>
        /// <param name="clientId">Id of the connecting client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns><c>true</c> to accept the connection; <c>false</c> to reject it.</returns>
        bool ValidatingConnectionAsync(
            string username,
            string password,
            string clientId,
            ValidatingConnectionEventArgs args);

        /// <summary>
        /// Raised when a message was not consumed.
        /// </summary>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ApplicationMessageNotConsumedAsync(
            string Topic,
            string msg,
            ApplicationMessageNotConsumedEventArgs args);

        /// <summary>
        /// Raised when a client subscribes to a topic.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="Topic">Topic being subscribed to.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientSubscribedTopicAsync(
            string clientId,
            string Topic,
            ClientSubscribedTopicEventArgs args);

        /// <summary>
        /// Raised when a client unsubscribes from a topic.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="Topic">Topic being unsubscribed from.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the handler's work.</returns>
        Task ClientUnsubscribedTopicAsync(
            string clientId,
            string Topic,
            ClientUnsubscribedTopicEventArgs args);
    }
}
业务类,实现 `IMqttServer` 接口
/// <summary>
/// Sample application class implementing <see cref="IMqttServer"/>; every callback
/// logs what happened to the console.
/// </summary>
public class MqttApp : IMqttServer
{
    /// <summary>
    /// Port the server listens on.
    /// </summary>
    int IMqttServer.Port => 10883;

    public MqttApp()
    {
    }

    /// <summary>
    /// A message was not consumed.
    /// </summary>
    /// <param name="Topic">Topic of the message.</param>
    /// <param name="msg">Message content.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ApplicationMessageNotConsumedAsync(string Topic, string msg, ApplicationMessageNotConsumedEventArgs args)
    {
        Console.WriteLine($"消息未消费{Topic}:");
        Console.WriteLine(msg);
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client came online.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientConnectedAsync(string clientId, ClientConnectedEventArgs args)
    {
        Console.WriteLine($"客户端上线 id:{clientId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client went offline.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args)
    {
        Console.WriteLine($"客户端下线 id:{clientId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client subscribed to a topic.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="Topic">Topic being subscribed to.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientSubscribedTopicAsync(string clientId, string Topic, ClientSubscribedTopicEventArgs args)
    {
        Console.WriteLine($"客户端{clientId}订阅主题:{Topic}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A client unsubscribed from a topic.
    /// </summary>
    /// <param name="clientId">Id of the client.</param>
    /// <param name="Topic">Topic being unsubscribed from.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.ClientUnsubscribedTopicAsync(string clientId, string Topic, ClientUnsubscribedTopicEventArgs args)
    {
        Console.WriteLine($"客户端{clientId} 取消主题订阅:{Topic}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// A message was received from a client.
    /// </summary>
    /// <param name="clientId">Id of the publishing client.</param>
    /// <param name="Topic">Topic of the message.</param>
    /// <param name="msg">Message content.</param>
    /// <param name="args">Original event arguments.</param>
    /// <param name="ToTopic">Pushes a message to a topic: ("topic", "content").</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.InterceptingPublishAsync(string clientId, string Topic, string msg, InterceptingPublishEventArgs args, Action<string, string> ToTopic)
    {
        Console.WriteLine($"客户端{clientId} 主题{Topic} 发送消息 内容:");
        Console.WriteLine(msg);
        // Demo: push a message to a specific topic.
        ToTopic("主题", "内容");
        return Task.CompletedTask;
    }

    /// <summary>
    /// The server has started.
    /// </summary>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.StartedAsync(EventArgs args) => Task.CompletedTask;

    /// <summary>
    /// The server has stopped.
    /// </summary>
    /// <param name="args">Original event arguments.</param>
    /// <returns>A completed task.</returns>
    Task IMqttServer.StoppedAsync(EventArgs args) => Task.CompletedTask;

    /// <summary>
    /// Validates the account and password of a connecting client.
    /// This sample accepts every client.
    /// </summary>
    /// <param name="username">Account name.</param>
    /// <param name="password">Password.</param>
    /// <param name="clientId">Id of the connecting client.</param>
    /// <param name="args">Original event arguments.</param>
    /// <returns><c>true</c> to accept the connection; <c>false</c> to reject it.</returns>
    bool IMqttServer.ValidatingConnectionAsync(string username, string password, string clientId, ValidatingConnectionEventArgs args)
    {
        // Never write the password to the log; the user name is enough to trace a login.
        Console.WriteLine($"验证客户端{clientId}信息:{username}");
        return true; // accept; return false to reject
    }
}
Program.cs
// Dependency-injection registrations
builder.Services.AddSingleton<IMqttServer, MqttApp>();
builder.Services.AddSingleton<MqttServerBase>();
/*
Without the code below, the MQTT service is not started right after the application starts;
the instance would only be initialised once it is injected somewhere, e.g. into a controller.
Calling app.Services.GetService accesses the object once, which creates the instance.
*/
// Option 1: start the MQTT service immediately
//app.Services.GetService<MqttServerBase>();
// Option 2: start the MQTT service after a delay
Task.Run(async () => {
// Wait 3 seconds before resolving the service.
await Task.Delay(3000);
// Resolving MqttServerBase constructs it; its constructor starts the server.
app.Services.GetService<MqttServerBase>();
});