C# MQTT多设备联动工具类开发指南

时间:2025-04-21 16:04:37

一、开发环境搭建

1.1 环境要求

• .NET版本:.NET 7+(支持异步编程模型;第八节数据模型中使用的 required 成员需要 C# 11)

• NuGet包:

<PackageReference Include="MQTTnet" Version="4.3.1.873"/>
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.1.873"/>

• 推荐IDE:Visual Studio 2022(需安装.NET跨平台开发套件)


二、工具类架构设计

2.1 类结构规划

/// <summary>
/// Structural sketch of the device manager: the MQTT client, the device registry,
/// the public events and the shared device-state dictionary.
/// </summary>
public class MQTTDeviceManager : IDisposable
{
    // Core components
    // Typed as the managed client because it is created with CreateManagedMqttClient()
    // and started with ManagedMqttClientOptions, which a plain IMqttClient cannot accept.
    private IManagedMqttClient _mqttClient;
    private ManagedMqttClientOptions _managedOptions;
    private DeviceRegistry _deviceRegistry; // device registry

    // Event system
    public event EventHandler<DeviceStatusEventArgs> DeviceStatusChanged;
    public event EventHandler<MessageReceivedEventArgs> GroupMessageReceived;

    // Configuration
    private MQTTConfig _config;
    private Timer _heartbeatTimer;

    // Thread-safe collection of per-device runtime state, keyed by device id
    private ConcurrentDictionary<string, DeviceContext> _activeDevices = new();

    /// <summary>
    /// Releases the heartbeat timer and the MQTT client. Both may still be null
    /// if the instance was never connected, hence the null-conditional calls.
    /// </summary>
    public void Dispose()
    {
        _heartbeatTimer?.Dispose();
        _mqttClient?.Dispose();
        GC.SuppressFinalize(this);
    }
}

2.2 功能模块划分

| 模块 | 功能描述 |
| --- | --- |
| 连接管理 | 支持TLS加密、断线重连、心跳检测 |
| 设备注册 | 设备ID管理、元数据存储、状态追踪 |
| 消息路由 | 主题通配符匹配、QoS级别控制、消息分发策略 |
| 联动控制 | 设备组管理、触发条件配置、协同动作执行 |
| 异常处理 | 网络中断恢复、消息重试队列、错误日志记录 |


三、核心功能实现

3.1 安全连接建立

/// <summary>
/// Builds the TLS, client and managed-client options from <c>_config</c>, starts the
/// managed MQTT client and subscribes to the device status topic.
/// </summary>
/// <returns>A task that completes once the client is started and the subscription is requested.</returns>
public async Task ConnectAsync()
{
    // AllowUntrustedCertificates is deliberately left at its default (false):
    // accepting any broker certificate would defeat the purpose of enabling TLS.
    var tlsOptions = new MqttClientOptionsBuilderTlsParameters
    {
        UseTls = _config.UseTls
    };

    // A client certificate is only needed for mutual TLS; skip it when none is configured
    // instead of failing while building the certificate collection.
    if (_config.ClientCertificate is not null)
    {
        tlsOptions.Certificates = new X509Certificate2Collection(_config.ClientCertificate);
    }

    var clientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer(_config.BrokerAddress, _config.Port)
        .WithClientId(Guid.NewGuid().ToString())
        .WithCredentials(_config.Username, _config.Password)
        .WithTls(tlsOptions)
        .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
        .Build();

    _managedOptions = new ManagedMqttClientOptionsBuilder()
        .WithClientOptions(clientOptions)
        .WithMaxPendingMessages(1000)
        .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
        .Build();

    _mqttClient = new MqttFactory().CreateManagedMqttClient();
    await _mqttClient.StartAsync(_managedOptions);

    // Subscribe to the device status topic
    await SubscribeAsync("devices/+/status");
}

3.2 设备注册管理

/// <summary>
/// Keeps the registered device profiles together with their runtime state and
/// raises <see cref="DeviceStatusChanged"/> when a status update is applied.
/// </summary>
public class DeviceRegistry
{
    private readonly ConcurrentDictionary<string, DeviceProfile> _devices = new();

    // Runtime state per device. Declared here because the methods below read and write it;
    // the original snippet referenced it without a declaration.
    private readonly ConcurrentDictionary<string, DeviceContext> _activeDevices = new();

    /// <summary>Raised after a known device's status has been updated.</summary>
    public event EventHandler<DeviceStatusEventArgs> DeviceStatusChanged;

    /// <summary>Registers a device and creates its initial (offline) context.</summary>
    /// <param name="device">Profile of the device to register.</param>
    /// <exception cref="ArgumentNullException"><paramref name="device"/> is null.</exception>
    /// <exception cref="DeviceConflictException">A device with the same id is already registered.</exception>
    public void RegisterDevice(DeviceProfile device)
    {
        if (device is null)
            throw new ArgumentNullException(nameof(device));

        if (!_devices.TryAdd(device.DeviceId, device))
            throw new DeviceConflictException($"Device {device.DeviceId} already registered");

        // Initialize the device context
        var context = new DeviceContext
        {
            LastSeen = DateTime.UtcNow,
            Metadata = device.Metadata,
            Status = DeviceStatus.Offline
        };
        _activeDevices.TryAdd(device.DeviceId, context);
    }

    /// <summary>
    /// Applies a new status to a registered device, refreshes its last-seen timestamp
    /// and notifies subscribers. Unknown device ids are ignored.
    /// </summary>
    /// <param name="deviceId">Id of the device to update.</param>
    /// <param name="status">The status to store.</param>
    public void UpdateDeviceStatus(string deviceId, DeviceStatus status)
    {
        if (_activeDevices.TryGetValue(deviceId, out var context))
        {
            context.Status = status;
            context.LastSeen = DateTime.UtcNow;
            DeviceStatusChanged?.Invoke(this,
                new DeviceStatusEventArgs(deviceId, status));
        }
    }
}

3.3 多设备联动控制

/// <summary>
/// Sends the same action to every device of a group and then evaluates the
/// trigger rules configured for that group.
/// </summary>
/// <param name="groupId">Id of the device group to address.</param>
/// <param name="action">Action serialized as JSON into each message payload.</param>
/// <returns>A task that completes when all messages have been handed to the client.</returns>
public async Task ExecuteGroupAction(string groupId, DeviceAction action)
{
    var groupDevices = _deviceRegistry.GetDevicesByGroup(groupId);

    // The payload is identical for every device, so serialize it once.
    var payload = JsonSerializer.Serialize(action);
    var tasks = new List<Task>();

    foreach (var device in groupDevices)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic($"devices/{device.DeviceId}/actions")
            .WithPayload(payload)
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();

        // The managed client (MQTTnet 4.x) exposes EnqueueAsync rather than PublishAsync;
        // the message is queued and delivered by the client's background worker.
        tasks.Add(_mqttClient.EnqueueAsync(message));
    }

    await Task.WhenAll(tasks);

    // Check the linkage rules for this group
    CheckTriggerConditions(groupId);
}

/// <summary>
/// Walks the rules configured for a group and runs the action sequence of every
/// sequential rule whose condition holds for the current device states.
/// </summary>
/// <param name="groupId">Id of the group whose rules are evaluated.</param>
private void CheckTriggerConditions(string groupId)
{
    foreach (var candidate in _triggerRules.GetRulesForGroup(groupId))
    {
        // Only sequential rules are handled here.
        if (candidate.ConditionType != ConditionType.Sequential)
        {
            continue;
        }

        if (!candidate.CheckCondition(_activeDevices))
        {
            continue;
        }

        ExecuteActionSequence(candidate.ActionSequence);
    }
}

四、高级功能实现

4.1 断线重连策略

/// <summary>
/// Hooks the managed client's connection events: failed connection attempts are
/// logged, and subscriptions and device statuses are restored after every (re)connect.
/// </summary>
/// <remarks>
/// The managed client retries on its own using the delay configured through
/// WithAutoReconnectDelay, so no manual StartAsync call is issued here; starting an
/// already started managed client is an error.
/// </remarks>
private void ConfigureReconnection()
{
    // MQTTnet 4.x replaced the v3 handler-delegate properties with async events.
    _mqttClient.ConnectingFailedAsync += e =>
    {
        _logger.LogError(e.Exception, "连接失败: {Message}", e.Exception?.Message);
        return Task.CompletedTask;
    };

    _mqttClient.ConnectedAsync += e =>
    {
        // Restore subscriptions and device statuses
        RecoverSubscriptions();
        RefreshDeviceStatuses();
        return Task.CompletedTask;
    };
}

4.2 消息持久化队列

/// <summary>
/// Bounded retry queue for MQTT messages that could not be published.
/// </summary>
public class MessageQueueService
{
    // Upper bound of buffered messages; the oldest entries are dropped beyond it.
    private const int MaxPendingMessages = 1000;

    private readonly ConcurrentQueue<MqttApplicationMessage> _pendingMessages = new();
    private readonly object _lock = new object();
    private readonly IMqttClient _mqttClient;
    private readonly ILogger _logger;

    /// <summary>Creates the queue.</summary>
    /// <param name="mqttClient">Client used to publish queued messages.</param>
    /// <param name="logger">Optional logger for publish failures.</param>
    /// <exception cref="ArgumentNullException"><paramref name="mqttClient"/> is null.</exception>
    public MessageQueueService(IMqttClient mqttClient, ILogger logger = null)
    {
        _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
        _logger = logger;
    }

    /// <summary>Appends a message, dropping the oldest entries once the bound is exceeded.</summary>
    /// <param name="message">Message to buffer.</param>
    public void EnqueueMessage(MqttApplicationMessage message)
    {
        // The lock keeps "enqueue + trim" atomic so concurrent producers cannot over-trim.
        lock (_lock)
        {
            _pendingMessages.Enqueue(message);
            while (_pendingMessages.Count > MaxPendingMessages)
            {
                _pendingMessages.TryDequeue(out _);
            }
        }
    }

    /// <summary>
    /// Publishes the messages that are queued at the time of the call. On the first
    /// failure the message is put back (at the tail) and processing stops, so a
    /// broker outage does not turn into an endless retry loop; call again later.
    /// </summary>
    public async Task ProcessQueueAsync()
    {
        // Bound the pass to the current backlog so re-enqueued items are not retried immediately.
        var remaining = _pendingMessages.Count;

        while (remaining-- > 0 && _pendingMessages.TryDequeue(out var message))
        {
            try
            {
                await _mqttClient.PublishAsync(message);
            }
            catch (Exception ex)
            {
                _logger?.LogError(ex, "消息重试失败: {Message}", ex.Message);
                EnqueueMessage(message); // put back for the next pass
                break;
            }
        }
    }
}

五、使用示例

5.1 初始化配置

// Broker settings: TLS endpoint on port 8883 with a client certificate.
var config = new MQTTConfig
{
    BrokerAddress = "iot.example.com",
    Port = 8883,
    UseTls = true,
    ClientCertificate = LoadCertificate("client.pfx"),
    GroupPollingInterval = TimeSpan.FromMinutes(5)
};

var deviceManager = new MQTTDeviceManager(config);
await deviceManager.ConnectAsync();

// Describe the temperature sensor, then register it.
var temperatureSensor = new DeviceProfile
{
    DeviceId = "temp-sensor-01",
    DeviceType = DeviceType.Sensor,
    Metadata = new Dictionary<string, object>
    {
        ["Location"] = "RoomA",
        ["Threshold"] = 30.0
    }
};
deviceManager.RegisterDevice(temperatureSensor);

5.2 联动规则配置

{
  "RuleId": "overheat-protection",
  "ConditionType": "Threshold",
  "Parameters": {
    "DeviceId": "temp-sensor-01",
    "Metric": "temperature",
    "Operator": ">",
    "Value": 35.0
  },
  "Actions": [
    {
      "TargetGroup": "cooling-fans",
      "Command": "FAN_SPEED_HIGH"
    },
    {
      "TargetDevice": "alert-led-01",
      "Command": "BLINK_RED"
    }
  ]
}

六、性能优化策略

6.1 消息批处理

/// <summary>
/// Collects outgoing messages and publishes them together once the batch is
/// full or the maximum delay since the last flush has elapsed.
/// </summary>
/// <remarks>
/// The delay is only evaluated when a message is added; there is no background timer.
/// If publishing fails the exception propagates to the caller and the affected
/// batch is not re-queued.
/// </remarks>
public class MessageBatcher
{
    private readonly List<MqttApplicationMessage> _batch = new List<MqttApplicationMessage>();
    private readonly object _gate = new object();
    private readonly int _batchSize = 50;
    private readonly TimeSpan _maxDelay = TimeSpan.FromMilliseconds(100);
    private readonly IMqttClient _mqttClient;
    private DateTime _lastFlush = DateTime.UtcNow;

    /// <summary>Creates a batcher that publishes through the given client.</summary>
    /// <param name="mqttClient">Client used to publish the batched messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="mqttClient"/> is null.</exception>
    public MessageBatcher(IMqttClient mqttClient)
    {
        _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
    }

    /// <summary>Adds a message and flushes when the size or delay limit is reached.</summary>
    /// <param name="message">Message to batch.</param>
    public async Task AddMessageAsync(MqttApplicationMessage message)
    {
        bool flushDue;
        lock (_gate)
        {
            _batch.Add(message);
            flushDue = _batch.Count >= _batchSize ||
                       DateTime.UtcNow - _lastFlush > _maxDelay;
        }

        if (flushDue)
        {
            await FlushBatchAsync();
        }
    }

    /// <summary>Publishes everything batched so far.</summary>
    private async Task FlushBatchAsync()
    {
        MqttApplicationMessage[] messages;

        // Take ownership of the batch before awaiting: messages added while the
        // publish is in flight belong to the next batch and must not be cleared.
        lock (_gate)
        {
            messages = _batch.ToArray();
            _batch.Clear();
            _lastFlush = DateTime.UtcNow;
        }

        if (messages.Length == 0)
        {
            return;
        }

        await Task.WhenAll(messages.Select(m => _mqttClient.PublishAsync(m)));
    }
}

6.2 资源监控

/// <summary>
/// Samples CPU and memory usage every five seconds and raises the thread-pool
/// worker limit when CPU usage exceeds 90.
/// </summary>
/// <remarks>
/// NOTE(review): <c>GetCpuUsageAsync</c>, <c>GetMemoryUsage</c> and <c>WorkerThreads</c>
/// are not shown in this snippet and are assumed to be defined elsewhere.
/// PerformanceCounter is a Windows-only API; confirm the target platform.
/// </remarks>
public class ResourceMonitor : IAsyncDisposable
{
    private readonly PerformanceCounter _cpuCounter = new PerformanceCounter();
    private readonly Timer _monitorTimer;

    // 0 = idle, 1 = a sample is in progress; prevents overlapping timer callbacks.
    private int _sampling;

    public ResourceMonitor()
    {
        _monitorTimer = new Timer(state => { _ = SampleAsync(); },
            null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
    }

    /// <summary>Takes one sample; never throws, because it runs on a timer thread.</summary>
    private async Task SampleAsync()
    {
        if (Interlocked.Exchange(ref _sampling, 1) == 1)
        {
            return;
        }

        try
        {
            var cpuUsage = await GetCpuUsageAsync();
            var memoryUsage = GetMemoryUsage();

            if (cpuUsage > 90)
            {
                AdjustThreadPool(WorkerThreads.Increment);
            }
        }
        catch (Exception ex)
        {
            // An unhandled exception in a timer callback would terminate the process.
            Trace.TraceError(ex.ToString());
        }
        finally
        {
            Interlocked.Exchange(ref _sampling, 0);
        }
    }

    /// <summary>Changes the maximum number of worker threads by <paramref name="delta"/>.</summary>
    /// <param name="delta">Amount added to the current worker-thread maximum.</param>
    private void AdjustThreadPool(int delta)
    {
        ThreadPool.GetMaxThreads(out var worker, out var io);
        ThreadPool.SetMaxThreads(worker + delta, io);
    }

    /// <summary>Stops the timer and releases the performance counter.</summary>
    public async ValueTask DisposeAsync()
    {
        await _monitorTimer.DisposeAsync();
        _cpuCounter.Dispose();
        GC.SuppressFinalize(this);
    }
}

七、测试方案

7.1 单元测试用例

// Verifies that a group action results in exactly one publish per group member.
// NOTE(review): the constructor overload taking an IMqttClient and RegisterGroup are not
// shown elsewhere in this guide; they are assumed to exist for test purposes.
[Fact]
public async Task DeviceGroupAction_ShouldTriggerAllDevices()
{
    // Arrange
    var mockClient = new Mock<IMqttClient>();
    var manager = new MQTTDeviceManager(mockClient.Object);
    manager.RegisterGroup("lights", new[] { "light-01", "light-02" });

    // Act
    await manager.ExecuteGroupAction("lights", new { Command = "TURN_ON" });

    // Assert: each device gets its own message, so check them one by one.
    // The payload is JSON bytes, so compare its decoded text instead of the raw buffer.
    foreach (var deviceId in new[] { "light-01", "light-02" })
    {
        mockClient.Verify(m => m.PublishAsync(
            It.Is<MqttApplicationMessage>(msg =>
                msg.Topic.Contains(deviceId) &&
                msg.ConvertPayloadToString().Contains("TURN_ON")),
            It.IsAny<CancellationToken>()), Times.Once);
    }
}

八、完整代码(分步展示)

8.1. 核心工具类 MQTTDeviceManager.cs

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using System.Collections.Concurrent;
using System.Text.Json;

/// <summary>
/// Manages a set of MQTT devices on top of a managed MQTTnet client: connection
/// setup, device registration, status tracking via <c>devices/+/status</c>,
/// group messages via <c>groups/#</c>, and group-wide command publishing.
/// </summary>
public class MQTTDeviceManager : IDisposable
{
    #region Fields
    // How often the heartbeat check runs, and how long a device may stay silent
    // before it is considered offline.
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromMinutes(1);
    private static readonly TimeSpan DeviceTimeout = TimeSpan.FromMinutes(5);

    private readonly IManagedMqttClient _mqttClient;
    private readonly ManagedMqttClientOptions _managedOptions;
    private readonly ConcurrentDictionary<string, DeviceContext> _activeDevices = new();
    private readonly DeviceRegistry _deviceRegistry = new();
    private readonly MQTTConfig _config;
    private readonly Timer _heartbeatTimer;
    private readonly MessageQueueService _messageQueue = new();
    private readonly ILogger _logger;
    #endregion

    #region Events
    /// <summary>Raised when a status message is applied or a device times out.</summary>
    public event EventHandler<DeviceStatusEventArgs> DeviceStatusChanged;

    /// <summary>Raised for every message received on a <c>groups/...</c> topic.</summary>
    public event EventHandler<GroupMessageEventArgs> GroupMessageReceived;
    #endregion

    #region Initialization
    /// <summary>Creates the manager and prepares (but does not start) the MQTT client.</summary>
    /// <param name="config">Broker and credential settings.</param>
    /// <param name="logger">Optional logger; when null, failures are not logged.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public MQTTDeviceManager(MQTTConfig config, ILogger logger = null)
    {
        _config = config ?? throw new ArgumentNullException(nameof(config));
        _logger = logger;

        // Create the managed MQTT client
        _mqttClient = new MqttFactory().CreateManagedMqttClient();

        var tlsParameters = new MqttClientOptionsBuilderTlsParameters
        {
            UseTls = _config.UseTls,
            // The TLS parameters expect certificate objects, not raw byte arrays.
            Certificates = LoadClientCertificates(_config.ClientCertificates)
        };

        // Connection options
        var clientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(_config.BrokerAddress, _config.Port)
            .WithClientId(_config.ClientId ?? Guid.NewGuid().ToString())
            .WithCredentials(_config.Username, _config.Password)
            .WithTls(tlsParameters)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
            .Build();

        _managedOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(clientOptions)
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithPendingMessagesOverflowStrategy(
                MQTTnet.Server.MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
            .Build();

        // Message handler
        _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;

        // Heartbeat check
        _heartbeatTimer = new Timer(HeartbeatCheck, null, HeartbeatInterval, HeartbeatInterval);
    }

    /// <summary>Converts the configured raw certificates into certificate objects.</summary>
    /// <remarks>NOTE(review): assumes each entry loads without a password — confirm for PFX files.</remarks>
    private static List<System.Security.Cryptography.X509Certificates.X509Certificate> LoadClientCertificates(
        IEnumerable<byte[]> rawCertificates)
    {
        var certificates = new List<System.Security.Cryptography.X509Certificates.X509Certificate>();
        foreach (var raw in rawCertificates ?? Enumerable.Empty<byte[]>())
        {
            certificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(raw));
        }

        return certificates;
    }
    #endregion

    #region Connection management
    /// <summary>
    /// Starts the managed client, subscribes to the system topics and forwards any
    /// messages that were published before the client was started.
    /// </summary>
    public async Task ConnectAsync()
    {
        await _mqttClient.StartAsync(_managedOptions);
        await SubscribeSystemTopics();

        // Messages buffered by PublishAsync before the start are handed over now;
        // without this step they would never leave the local queue.
        foreach (var pending in await _messageQueue.FlushAsync())
        {
            await _mqttClient.EnqueueAsync(pending);
        }
    }

    /// <summary>Subscribes to the device status and group topics.</summary>
    private async Task SubscribeSystemTopics()
    {
        await _mqttClient.SubscribeAsync(new[]
        {
            new MqttTopicFilterBuilder().WithTopic("devices/+/status").Build(),
            new MqttTopicFilterBuilder().WithTopic("groups/#").Build()
        });
    }
    #endregion

    #region Device management
    /// <summary>Registers a device and starts tracking it as offline.</summary>
    /// <param name="device">Profile of the device to register.</param>
    public void RegisterDevice(DeviceProfile device)
    {
        _deviceRegistry.Register(device);
        _activeDevices.TryAdd(device.DeviceId, new DeviceContext
        {
            Status = DeviceStatus.Offline,
            LastSeen = DateTime.MinValue
        });
    }

    /// <summary>Removes a device from the registry and stops tracking it.</summary>
    /// <param name="deviceId">Id of the device to remove.</param>
    public void UnregisterDevice(string deviceId)
    {
        _deviceRegistry.Unregister(deviceId);
        _activeDevices.TryRemove(deviceId, out _);
    }
    #endregion

    #region Message handling
    /// <summary>
    /// Dispatches an incoming message: status updates for <c>devices/{id}/status</c>,
    /// a <see cref="GroupMessageReceived"/> event for <c>groups/{id}/...</c>.
    /// Failures are logged and never propagate into the MQTT client.
    /// </summary>
    private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        try
        {
            var topic = e.ApplicationMessage.Topic;
            var payload = e.ApplicationMessage.ConvertPayloadToString();
            var segments = topic.Split('/');

            // Device status update: exactly "devices/{id}/status"
            if (segments.Length == 3 &&
                string.Equals(segments[0], "devices", StringComparison.Ordinal) &&
                string.Equals(segments[2], "status", StringComparison.Ordinal))
            {
                var status = JsonSerializer.Deserialize<DeviceStatusUpdate>(payload);
                UpdateDeviceStatus(segments[1], status);
            }
            // Group message
            else if (topic.StartsWith("groups/", StringComparison.Ordinal))
            {
                GroupMessageReceived?.Invoke(this,
                    new GroupMessageEventArgs(segments[1], payload));
            }
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "消息处理失败");
        }

        return Task.CompletedTask;
    }

    /// <summary>Applies a reported status to a tracked device and notifies subscribers.</summary>
    /// <param name="deviceId">Id taken from the topic.</param>
    /// <param name="status">Deserialized update; ignored when null (e.g. a JSON "null" payload).</param>
    private void UpdateDeviceStatus(string deviceId, DeviceStatusUpdate status)
    {
        if (status is null)
        {
            return;
        }

        if (_activeDevices.TryGetValue(deviceId, out var context))
        {
            context.Status = status.NewStatus;
            context.LastSeen = DateTime.UtcNow;
            DeviceStatusChanged?.Invoke(this,
                new DeviceStatusEventArgs(deviceId, status.NewStatus));
        }
    }
    #endregion

    #region Group control
    /// <summary>Sends a command to every device in a group.</summary>
    /// <param name="groupId">Id of the target group.</param>
    /// <param name="command">Command object, serialized as JSON.</param>
    public async Task SendGroupCommand(string groupId, object command)
    {
        var devices = _deviceRegistry.GetDevicesInGroup(groupId);
        var tasks = devices.Select(device =>
            PublishAsync($"devices/{device.DeviceId}/cmd", command));

        await Task.WhenAll(tasks);
    }

    /// <summary>Publishes a JSON-serialized payload to a topic.</summary>
    /// <param name="topic">Target topic.</param>
    /// <param name="payload">Object serialized as the message body.</param>
    /// <param name="qos">Quality-of-service level; defaults to at-least-once.</param>
    public async Task PublishAsync(string topic, object payload,
        MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(JsonSerializer.Serialize(payload))
            .WithQualityOfServiceLevel(qos)
            .Build();

        // Once started, the managed client buffers messages itself while the connection
        // is down. Only before ConnectAsync has run do we need the local queue.
        if (_mqttClient.IsStarted)
        {
            await _mqttClient.EnqueueAsync(message);
        }
        else
        {
            _messageQueue.Enqueue(message);
        }
    }
    #endregion

    #region Helpers
    /// <summary>
    /// Timer callback: marks devices offline when they have been silent for longer
    /// than <see cref="DeviceTimeout"/>. Devices that are already offline are skipped
    /// and LastSeen is left untouched, since a timeout is not a sign of life.
    /// </summary>
    private void HeartbeatCheck(object state)
    {
        try
        {
            var now = DateTime.UtcNow;
            foreach (var (deviceId, context) in _activeDevices)
            {
                if (context.Status == DeviceStatus.Offline)
                {
                    continue;
                }

                if (now - context.LastSeen > DeviceTimeout)
                {
                    context.Status = DeviceStatus.Offline;
                    DeviceStatusChanged?.Invoke(this,
                        new DeviceStatusEventArgs(deviceId, DeviceStatus.Offline));
                }
            }
        }
        catch (Exception ex)
        {
            // An exception escaping a timer callback would take the process down.
            _logger?.LogError(ex, "心跳检测失败");
        }
    }
    #endregion

    #region Cleanup
    /// <summary>Stops the heartbeat timer, detaches the message handler and disposes the client.</summary>
    public void Dispose()
    {
        _heartbeatTimer?.Dispose();
        if (_mqttClient is not null)
        {
            _mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceivedAsync;
            _mqttClient.Dispose();
        }

        GC.SuppressFinalize(this);
    }
    #endregion
}

8.2. 支持类定义

8.2.1 设备注册表 DeviceRegistry.cs

/// <summary>
/// Stores device profiles by id and maintains the group-to-device membership index.
/// </summary>
public class DeviceRegistry
{
    private readonly ConcurrentDictionary<string, DeviceProfile> _devices = new();

    // Each member list is a plain List, so every access to it is guarded by a lock on the list.
    private readonly ConcurrentDictionary<string, List<string>> _deviceGroups = new();

    /// <summary>Adds a device and indexes it under each of its groups.</summary>
    /// <param name="device">Profile to register.</param>
    /// <exception cref="ArgumentNullException"><paramref name="device"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The device id is already registered.</exception>
    public void Register(DeviceProfile device)
    {
        ArgumentNullException.ThrowIfNull(device);

        if (!_devices.TryAdd(device.DeviceId, device))
            throw new InvalidOperationException($"设备 {device.DeviceId} 已注册");

        foreach (var group in device.Groups ?? Enumerable.Empty<string>())
        {
            var members = _deviceGroups.GetOrAdd(group, _ => new List<string>());
            lock (members)
            {
                members.Add(device.DeviceId);
            }
        }
    }

    /// <summary>
    /// Removes a device and its group memberships. Unknown ids are ignored.
    /// </summary>
    /// <param name="deviceId">Id of the device to remove.</param>
    public void Unregister(string deviceId)
    {
        if (!_devices.TryRemove(deviceId, out _))
        {
            return;
        }

        // Scan every group rather than the profile's Groups list, which the caller
        // may have modified since registration.
        foreach (var members in _deviceGroups.Values)
        {
            lock (members)
            {
                members.Remove(deviceId);
            }
        }
    }

    /// <summary>Returns the profiles of the devices currently in a group.</summary>
    /// <param name="groupId">Id of the group to look up.</param>
    /// <returns>A snapshot of the group's devices; empty when the group is unknown.</returns>
    public IEnumerable<DeviceProfile> GetDevicesInGroup(string groupId)
    {
        if (!_deviceGroups.TryGetValue(groupId, out var members))
        {
            return Enumerable.Empty<DeviceProfile>();
        }

        string[] snapshot;
        lock (members)
        {
            snapshot = members.ToArray();
        }

        // TryGetValue instead of the indexer: a device may be unregistered concurrently.
        var result = new List<DeviceProfile>(snapshot.Length);
        foreach (var id in snapshot)
        {
            if (_devices.TryGetValue(id, out var profile))
            {
                result.Add(profile);
            }
        }

        return result;
    }
}

8.2.2 消息队列服务 MessageQueueService.cs

/// <summary>
/// In-memory buffer for messages that cannot be handed to the MQTT client yet.
/// </summary>
public class MessageQueueService
{
    private readonly ConcurrentQueue<MqttApplicationMessage> _queue = new();
    private readonly SemaphoreSlim _lock = new(1, 1);

    /// <summary>Appends a message to the buffer.</summary>
    /// <param name="message">Message to buffer.</param>
    public void Enqueue(MqttApplicationMessage message)
    {
        _queue.Enqueue(message);
    }

    /// <summary>
    /// Removes and returns all buffered messages in arrival order.
    /// </summary>
    /// <returns>The drained messages; an empty array when nothing is buffered.</returns>
    public async Task<MqttApplicationMessage[]> FlushAsync()
    {
        // The semaphore serializes concurrent flushes so each returns an ordered run.
        await _lock.WaitAsync();
        try
        {
            // Drain item by item: a ToArray() followed by Clear() would silently drop
            // anything enqueued between the two calls, because Enqueue takes no lock.
            var drained = new List<MqttApplicationMessage>();
            while (_queue.TryDequeue(out var message))
            {
                drained.Add(message);
            }

            return drained.ToArray();
        }
        finally
        {
            _lock.Release();
        }
    }
}

8.3. 数据模型

8.3.1 设备模型

/// <summary>Static description of a device: identity, kind, group membership and metadata.</summary>
public class DeviceProfile
{
    /// <summary>Unique device id; must be supplied in the object initializer.</summary>
    public required string DeviceId { get; set; }

    /// <summary>Kind of device.</summary>
    public DeviceType DeviceType { get; set; }

    /// <summary>Names of the groups the device belongs to; empty by default.</summary>
    public List<string> Groups { get; set; } = new List<string>();

    /// <summary>Free-form key/value metadata; empty by default.</summary>
    public Dictionary<string, object> Metadata { get; set; } = new Dictionary<string, object>();
}

/// <summary>Kinds of devices known to the toolkit.</summary>
public enum DeviceType
{
    Sensor,
    Actuator,
    Gateway
}

/// <summary>Mutable runtime state tracked for a registered device.</summary>
public class DeviceContext
{
    /// <summary>Most recently recorded status.</summary>
    public DeviceStatus Status { get; set; }

    /// <summary>Timestamp stored alongside the status by the code that updates it.</summary>
    public DateTime LastSeen { get; set; }
}

/// <summary>Connectivity/health states of a device.</summary>
public enum DeviceStatus
{
    Online,
    Offline,
    Error
}

8.3.2 事件参数

/// <summary>Event data describing a device and the status reported for it.</summary>
public class DeviceStatusEventArgs : EventArgs
{
    /// <summary>Creates the event data.</summary>
    /// <param name="deviceId">Id of the affected device.</param>
    /// <param name="status">Status carried by the event.</param>
    public DeviceStatusEventArgs(string deviceId, DeviceStatus status)
        => (DeviceId, Status) = (deviceId, status);

    /// <summary>Id of the affected device.</summary>
    public string DeviceId { get; }

    /// <summary>Status carried by the event.</summary>
    public DeviceStatus Status { get; }
}

/// <summary>Event data for a message received on a group topic.</summary>
public class GroupMessageEventArgs : EventArgs
{
    /// <summary>Creates the event data.</summary>
    /// <param name="groupId">Id of the group the message was addressed to.</param>
    /// <param name="message">Message text.</param>
    public GroupMessageEventArgs(string groupId, string message)
        => (GroupId, Message) = (groupId, message);

    /// <summary>Id of the group the message was addressed to.</summary>
    public string GroupId { get; }

    /// <summary>Message text.</summary>
    public string Message { get; }
}

8.4. 配置类

/// <summary>Connection settings for the MQTT broker.</summary>
public class MQTTConfig
{
    /// <summary>Broker host name or address; must be supplied in the object initializer.</summary>
    public required string BrokerAddress { get; set; }

    /// <summary>Broker TCP port; 1883 unless overridden.</summary>
    public int Port { get; set; } = 1883;

    /// <summary>Client id to present to the broker; may be left unset.</summary>
    public string ClientId { get; set; }

    /// <summary>User name for broker authentication.</summary>
    public string Username { get; set; }

    /// <summary>Password for broker authentication.</summary>
    public string Password { get; set; }

    /// <summary>Whether the connection should use TLS.</summary>
    public bool UseTls { get; set; }

    /// <summary>Raw client certificates; empty by default.</summary>
    public List<byte[]> ClientCertificates { get; set; } = new List<byte[]>();
}

8.5. 使用示例

// 初始化
// Initialization
var config = new MQTTConfig
{
    BrokerAddress = "mqtt.example.com",
    Port = 8883,
    Username = "admin",
    // Read the password from the environment so no secret is committed with the sample.
    Password = Environment.GetEnvironmentVariable("MQTT_PASSWORD"),
    UseTls = true
};

// Dispose the manager (timer + MQTT client) when the scope ends.
using var manager = new MQTTDeviceManager(config);

// Subscribe before connecting so no status change is missed.
manager.DeviceStatusChanged += (sender, e) =>
{
    Console.WriteLine($"设备 {e.DeviceId} 状态变更为 {e.Status}");
};

// Register the temperature sensor
manager.RegisterDevice(new DeviceProfile
{
    DeviceId = "temp-sensor-01",
    DeviceType = DeviceType.Sensor,
    Groups = new List<string> { "room1", "alerts" }
});

// Start the client; nothing is sent to the broker before this call.
await manager.ConnectAsync();

// Send a group command
await manager.SendGroupCommand("room1", new { Command = "READ_ALL" });

完整项目工程结构:

MQTTToolkit/
├── Core/
│   ├── MQTTDeviceManager.cs
│   ├── DeviceRegistry.cs
├── Services/
│   ├── MessageQueueService.cs
├── Models/
│   ├── Device.cs
│   ├── Events.cs
└── Config/
    ├── MQTTConfig.cs

实际部署时需配合 EMQX 或 Mosquitto 等MQTT Broker使用,生产环境建议启用TLS双向认证。