.NET Core/.NET 6/.NET 8 实现 MQTT 服务器

时间:2024-03-09 21:16:05

.NET Core/.NET 6/.NET 8 实现 MQTT 服务端

  • Mqtt服务端代码
  • `IMqttServer` 接口
  • 业务类,实现 `IMqttServer` 接口
  • `Program.cs`

直接上代码。
NuGet 引用:
MQTTnet(以下代码基于 4.x 版本的 API)

Mqtt服务端代码



using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace Code.Mqtt
{
    /// <summary>
    /// Hosts an MQTTnet broker and forwards its events to an <see cref="IMqttServer"/> implementation.
    /// </summary>
    /// <remarks>
    /// The constructor wires every broker event to the supplied <see cref="IMqttServer"/> and then
    /// starts the broker in the background, so simply constructing an instance brings the broker up.
    /// NOTE(review): <see cref="Topic_Client"/> is a plain dictionary mutated from the broker's event
    /// handlers; if the broker raises those handlers concurrently, access needs synchronization — confirm.
    /// </remarks>
    public class MqttServerBase
    {
        /// <summary>
        /// The underlying MQTTnet server instance.
        /// </summary>
        public MqttServer _server;

        readonly IMqttServer _mqttServer;

        /// <summary>
        /// Publishes a message to a topic. Arguments are (topic, payload).
        /// </summary>
        public Action<string, string> ToTopic;

        /// <summary>
        /// Maps each subscribed topic filter to the ids of the clients currently subscribed to it.
        /// </summary>
        public Dictionary<string,List<string>> Topic_Client=new Dictionary<string, List<string>>();

        /// <summary>
        /// Creates the broker, wires its events to <paramref name="mqttServer"/> and starts it.
        /// </summary>
        /// <param name="mqttServer">Business handler that supplies the port and receives broker events.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is null.</exception>
        public MqttServerBase(IMqttServer mqttServer)
        {
            // ArgumentNullException derives from Exception, so callers catching Exception keep working.
            _mqttServer = mqttServer
                ?? throw new ArgumentNullException(nameof(mqttServer), "MqttServer配置错误");

            // Enable the default endpoint on the port supplied by the handler.
            // NOTE(review): the original comment claimed this binds 127.0.0.1 — confirm the actual
            // bind address against the MQTTnet documentation.
            var optionbuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpoint()
                .WithDefaultEndpointPort(_mqttServer.Port);

            _server = new MqttFactory().CreateMqttServer(optionbuilder.Build());

            // Server-side publish: builds a message and injects it into the broker.
            // The delegate type is Action, so the result of the injection call is not awaited here.
            ToTopic = (topic, msg) =>
            {
                _server.InjectApplicationMessage(
                    new InjectedMqttApplicationMessage(
                        new MqttApplicationMessageBuilder()
                            .WithTopic(topic)
                            .WithPayload(msg)
                            .Build()));
            };

            // Return the handler's task (instead of discarding it) so that the broker awaits the
            // callback and failures inside it are not silently lost.
            _server.ClientConnectedAsync += e => _mqttServer.ClientConnectedAsync(e.ClientId, e);

            _server.ClientDisconnectedAsync += e => _mqttServer.ClientDisconnectedAsync(e.ClientId, e);

            _server.InterceptingPublishAsync += e =>
            {
                var message = e.ApplicationMessage;
                if (message is null)
                {
                    return Task.CompletedTask;
                }

                var topic = message.Topic;

                // Only topics that at least one client has subscribed to are forwarded.
                if (topic is null || !Topic_Client.ContainsKey(topic))
                {
                    return Task.CompletedTask;
                }

                return _mqttServer.InterceptingPublishAsync(
                    e.ClientId, topic, DecodePayload(message.PayloadSegment), e, ToTopic);
            };

            _server.ApplicationMessageNotConsumedAsync += e =>
            {
                var message = e.ApplicationMessage;
                if (message is null)
                {
                    return Task.CompletedTask;
                }

                var topic = message.Topic;

                // Only tracked topics are forwarded; the original author notes that skipping this
                // check leads to an endless loop.
                if (topic is null || !Topic_Client.ContainsKey(topic))
                {
                    return Task.CompletedTask;
                }

                return _mqttServer.ApplicationMessageNotConsumedAsync(
                    topic, DecodePayload(message.PayloadSegment), e);
            };

            _server.ValidatingConnectionAsync += e =>
            {
                // The handler decides synchronously; true accepts the connection, false rejects it.
                var accepted = _mqttServer.ValidatingConnectionAsync(e.UserName, e.Password, e.ClientId, e);
                e.ReasonCode = accepted
                    ? MqttConnectReasonCode.Success
                    : MqttConnectReasonCode.Banned;
                return Task.CompletedTask;
            };

            // Subscription: record the client under the topic filter, then notify the handler.
            _server.ClientSubscribedTopicAsync += e =>
            {
                var topic = e.TopicFilter.Topic;

                if (!Topic_Client.TryGetValue(topic, out var clients))
                {
                    clients = new List<string>();
                    Topic_Client.Add(topic, clients);
                }

                if (!clients.Contains(e.ClientId))
                {
                    clients.Add(e.ClientId);
                }

                return _mqttServer.ClientSubscribedTopicAsync(e.ClientId, topic, e);
            };

            // Unsubscription: drop the client from the topic and notify the handler.
            _server.ClientUnsubscribedTopicAsync += e =>
            {
                var topic = e.TopicFilter;

                // The original condition was inverted (!ContainsKey), which made the indexer throw
                // KeyNotFoundException for unknown topics and never cleaned up known ones.
                if (!Topic_Client.TryGetValue(topic, out var clients))
                {
                    return Task.CompletedTask;
                }

                clients.Remove(e.ClientId);
                if (clients.Count == 0)
                {
                    // No subscriber left: stop tracking the topic.
                    Topic_Client.Remove(topic);
                }

                return _mqttServer.ClientUnsubscribedTopicAsync(e.ClientId, topic, e);
            };

            // Broker started / stopped notifications go straight to the handler.
            _server.StartedAsync += _mqttServer.StartedAsync;
            _server.StoppedAsync += _mqttServer.StoppedAsync;

            // A constructor cannot await, so the start-up runs in the background; the helper
            // observes the task so that a failure is reported instead of being lost.
            _ = StartInBackgroundAsync();
        }

        /// <summary>
        /// Starts the broker and writes progress messages to the console.
        /// </summary>
        /// <returns>A task that completes once the broker has started.</returns>
        public async Task Start()
        {
            Console.WriteLine("正在启动Mqtt服务");
            await _server.StartAsync();
            Console.WriteLine("Mqtt服务启动成功,端口:" + _mqttServer.Port);
        }

        /// <summary>
        /// Stops the broker and writes progress messages to the console.
        /// </summary>
        /// <returns>A task that completes once the broker has stopped.</returns>
        public async Task Stop()
        {
            Console.WriteLine("正在停止Mqtt服务");
            await _server.StopAsync();
            Console.WriteLine("Mqtt服务停止");
        }

        /// <summary>
        /// Restarts the broker by stopping it and starting it again.
        /// </summary>
        /// <returns>A task that completes once the broker is running again.</returns>
        public async Task ReStart()
        {
            await Stop();
            await Start();
        }

        /// <summary>
        /// Runs <see cref="Start"/> for the constructor and reports a failure on the console.
        /// </summary>
        private async Task StartInBackgroundAsync()
        {
            try
            {
                await Start();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Mqtt服务启动失败:{ex.Message}");
            }
        }

        /// <summary>
        /// Converts a payload segment to text using the project's <c>BToString</c> extension.
        /// </summary>
        /// <param name="payload">Payload bytes; only the segment's own range is decoded.</param>
        /// <returns>The decoded text, or an empty string when the segment has no backing array.</returns>
        private static string DecodePayload(ArraySegment<byte> payload)
        {
            if (payload.Array is null)
            {
                return string.Empty;
            }

            // ArraySegment.Array is the whole backing buffer; ToArray() copies only Offset..Offset+Count,
            // so bytes outside the segment are never decoded as part of the message.
            return payload.ToArray().BToString();
        }
    }
}



IMqttServer 接口


using MQTTnet.Server;

namespace Code.Mqtt
{
    /// <summary>
    /// Callbacks and settings a business class supplies to the MQTT broker host.
    /// </summary>
    public interface IMqttServer
    {
        /// <summary>
        /// Port the broker listens on.
        /// </summary>
        int Port { get; }

        /// <summary>
        /// Invoked when the broker has started.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task StartedAsync(EventArgs args);

        /// <summary>
        /// Invoked when the broker has stopped.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task StoppedAsync(EventArgs args);

        /// <summary>
        /// Invoked when a client comes online.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task ClientConnectedAsync(
            string clientId,
            ClientConnectedEventArgs args);

        /// <summary>
        /// Invoked when a client goes offline.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task ClientDisconnectedAsync(
            string clientId,
            ClientDisconnectedEventArgs args);

        /// <summary>
        /// Invoked for a published message.
        /// </summary>
        /// <param name="clientId">Id of the publishing client.</param>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content as text.</param>
        /// <param name="args">Original event arguments.</param>
        /// <param name="ToTopic">Delegate for sending a message; arguments are (topic, content).</param>
        /// <returns>A task representing the callback.</returns>
        Task InterceptingPublishAsync(
            string clientId,
            string Topic,
            string msg,
            InterceptingPublishEventArgs args,
            Action<string, string> ToTopic);

        /// <summary>
        /// Validates a connecting client. Despite the name, the result is returned synchronously.
        /// </summary>
        /// <param name="username">Account name.</param>
        /// <param name="password">Password.</param>
        /// <param name="clientId">Id of the connecting client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns><c>true</c> when validation passes; otherwise <c>false</c>.</returns>
        bool ValidatingConnectionAsync(
            string username,
            string password,
            string clientId,
            ValidatingConnectionEventArgs args);

        /// <summary>
        /// Invoked when a message was not consumed.
        /// </summary>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content as text.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task ApplicationMessageNotConsumedAsync(
            string Topic,
            string msg,
            ApplicationMessageNotConsumedEventArgs args);

        /// <summary>
        /// Invoked when a client subscribes to a topic.
        /// </summary>
        /// <param name="clientId">Id of the subscribing client.</param>
        /// <param name="Topic">Subscribed topic.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task ClientSubscribedTopicAsync(
            string clientId,
            string Topic,
            ClientSubscribedTopicEventArgs args);

        /// <summary>
        /// Invoked when a client cancels a topic subscription.
        /// </summary>
        /// <param name="clientId">Id of the unsubscribing client.</param>
        /// <param name="Topic">Topic that was unsubscribed.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A task representing the callback.</returns>
        Task ClientUnsubscribedTopicAsync(
            string clientId,
            string Topic,
            ClientUnsubscribedTopicEventArgs args);
    }
}

业务类,实现 IMqttServer 接口


    /// <summary>
    /// Sample <see cref="IMqttServer"/> implementation that logs every broker event to the console.
    /// </summary>
    public class MqttApp : IMqttServer
    {
        // Placeholder topic/content of the demo message sent whenever a client message is received.
        private const string DemoForwardTopic = "主题";
        private const string DemoForwardContent = "内容";

        /// <summary>
        /// Port the broker listens on.
        /// </summary>
        int IMqttServer.Port => 10883;

        public MqttApp()
        {
        }

        /// <summary>
        /// Logs a message that was not consumed.
        /// </summary>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.ApplicationMessageNotConsumedAsync(string Topic, string msg, ApplicationMessageNotConsumedEventArgs args)
        {
            Console.WriteLine($"消息未消费{Topic}:");
            Console.WriteLine(msg);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Logs a client coming online.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.ClientConnectedAsync(string clientId, ClientConnectedEventArgs args)
        {
            Console.WriteLine($"客户端上线 id:{clientId}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Logs a client going offline.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args)
        {
            Console.WriteLine($"客户端下线 id:{clientId}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Logs a topic subscription.
        /// </summary>
        /// <param name="clientId">Id of the subscribing client.</param>
        /// <param name="Topic">Subscribed topic.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.ClientSubscribedTopicAsync(string clientId, string Topic, ClientSubscribedTopicEventArgs args)
        {
            Console.WriteLine($"客户端{clientId}订阅主题:{Topic}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Logs a cancelled topic subscription.
        /// </summary>
        /// <param name="clientId">Id of the unsubscribing client.</param>
        /// <param name="Topic">Topic that was unsubscribed.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.ClientUnsubscribedTopicAsync(string clientId, string Topic, ClientUnsubscribedTopicEventArgs args)
        {
            Console.WriteLine($"客户端{clientId} 取消主题订阅:{Topic}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Logs a message received from a client and sends the demo message to the demo topic.
        /// </summary>
        /// <param name="clientId">Id of the publishing client.</param>
        /// <param name="Topic">Topic of the message.</param>
        /// <param name="msg">Message content.</param>
        /// <param name="args">Original event arguments.</param>
        /// <param name="ToTopic">Delegate that pushes a message to a topic: (topic, content).</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.InterceptingPublishAsync(string clientId, string Topic, string msg, InterceptingPublishEventArgs args, Action<string, string> ToTopic)
        {
            Console.WriteLine($"客户端{clientId} 主题{Topic} 发送消息 内容:");
            Console.WriteLine(msg);

            // Push the demo message, except when the incoming message already arrived on the demo topic.
            // NOTE(review): if the broker raises this callback for server-injected messages as well,
            // forwarding from the demo topic to itself would re-trigger this handler endlessly — confirm.
            if (!string.Equals(Topic, DemoForwardTopic, StringComparison.Ordinal))
            {
                ToTopic?.Invoke(DemoForwardTopic, DemoForwardContent);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Broker started notification; nothing to do in this sample.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.StartedAsync(EventArgs args) => Task.CompletedTask;

        /// <summary>
        /// Broker stopped notification; nothing to do in this sample.
        /// </summary>
        /// <param name="args">Original event arguments.</param>
        /// <returns>A completed task.</returns>
        Task IMqttServer.StoppedAsync(EventArgs args) => Task.CompletedTask;

        /// <summary>
        /// Validates a connecting client's account and password. This sample accepts everyone.
        /// </summary>
        /// <param name="username">Account name.</param>
        /// <param name="password">Password.</param>
        /// <param name="clientId">Id of the connecting client.</param>
        /// <param name="args">Original event arguments.</param>
        /// <returns><c>true</c> to accept the connection; <c>false</c> to reject it.</returns>
        bool IMqttServer.ValidatingConnectionAsync(string username, string password, string clientId, ValidatingConnectionEventArgs args)
        {
            // Log only the account name: writing the password to the console would leak credentials.
            Console.WriteLine($"验证客户端{clientId}信息:{username}");

            // Replace with a real credential check; return false to reject the client.
            return true;
        }
    }



Program.cs

// Dependency injection: register the business handler and the broker host as singletons.
builder.Services.AddSingleton<IMqttServer, MqttApp>();
builder.Services.AddSingleton<MqttServerBase>();

/*
Without the code below, the MQTT service does not start when the program starts; the instance
would only be initialized once it is injected somewhere, e.g. into a controller.
Resolving the service from app.Services accesses the object once, which runs the
MqttServerBase constructor and thereby starts the broker.
*/
// Start the MQTT service immediately:
//app.Services.GetRequiredService<MqttServerBase>();

// Start the MQTT service after a delay.
// GetRequiredService throws when the registration is missing (GetService would return null
// silently), and the try/catch reports failures that a fire-and-forget task would otherwise lose.
_ = Task.Run(async () =>
{
    try
    {
        await Task.Delay(3000);
        app.Services.GetRequiredService<MqttServerBase>();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Mqtt服务启动失败:{ex.Message}");
    }
});