一、开发环境搭建 1.1 环境要求 • .NET版本:.NET 6+(支持异步编程模型)
• NuGet包:
<PackageReference Include="MQTTnet" Version="4.3.1.873"/>
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.1.873"/>
• 推荐IDE:Visual Studio 2025(需安装.NET跨平台开发套件)
二、工具类架构设计
2.1 类结构规划
public class MQTTDeviceManager : IDisposable
{
// 核心组件
private IMqttClient _mqttClient;
private ManagedMqttClientOptions _managedOptions;
private DeviceRegistry _deviceRegistry; // 设备注册表
// 事件系统
public event EventHandler<DeviceStatusEventArgs> DeviceStatusChanged;
public event EventHandler<MessageReceivedEventArgs> GroupMessageReceived;
// 配置参数
private MQTTConfig _config;
private Timer _heartbeatTimer;
// 线程安全集合
private ConcurrentDictionary<string, DeviceContext> _activeDevices = new();
}
2.2 功能模块划分
模块 |
功能描述 |
连接管理 |
支持TLS加密、断线重连、心跳检测 |
设备注册 |
设备ID管理、元数据存储、状态追踪 |
消息路由 |
主题通配符匹配、QoS级别控制、消息分发策略 |
联动控制 |
设备组管理、触发条件配置、协同动作执行 |
异常处理 |
网络中断恢复、消息重试队列、错误日志记录 |
三、核心功能实现
3.1 安全连接建立
public async Task ConnectAsync()
{
var tlsOptions = new MqttClientOptionsBuilderTlsParameters
{
UseTls = _config.UseTls,
Certificates = new X509Certificate2Collection(_config.ClientCertificate),
AllowUntrustedCertificates = true
};
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(_config.BrokerAddress, _config.Port)
.WithClientId(Guid.NewGuid().ToString())
.WithCredentials(_config.Username, _config.Password)
.WithTls(tlsOptions)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
.Build();
_managedOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptions)
.WithMaxPendingMessages(1000)
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.Build();
_mqttClient = new MqttFactory().CreateManagedMqttClient();
await _mqttClient.StartAsync(_managedOptions);
// 订阅设备状态主题
await SubscribeAsync("devices/+/status");
}
3.2 设备注册管理
public class DeviceRegistry
{
private readonly ConcurrentDictionary<string, DeviceProfile> _devices = new();
public void RegisterDevice(DeviceProfile device)
{
if (!_devices.TryAdd(device.DeviceId, device))
throw new DeviceConflictException($"Device {device.DeviceId} already registered");
// 初始化设备上下文
var context = new DeviceContext
{
LastSeen = DateTime.UtcNow,
Metadata = device.Metadata,
Status = DeviceStatus.Offline
};
_activeDevices.TryAdd(device.DeviceId, context);
}
public void UpdateDeviceStatus(string deviceId, DeviceStatus status)
{
if (_activeDevices.TryGetValue(deviceId, out var context))
{
context.Status = status;
context.LastSeen = DateTime.UtcNow;
DeviceStatusChanged?.Invoke(this,
new DeviceStatusEventArgs(deviceId, status));
}
}
}
3.3 多设备联动控制
public async Task ExecuteGroupAction(string groupId, DeviceAction action)
{
var groupDevices = _deviceRegistry.GetDevicesByGroup(groupId);
var tasks = new List<Task>();
foreach (var device in groupDevices)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic($"devices/{device.DeviceId}/actions")
.WithPayload(JsonSerializer.Serialize(action))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
tasks.Add(_mqttClient.PublishAsync(message));
}
await Task.WhenAll(tasks);
// 触发联动规则检查
CheckTriggerConditions(groupId);
}
private void CheckTriggerConditions(string groupId)
{
var rules = _triggerRules.GetRulesForGroup(groupId);
foreach (var rule in rules.Where(r => r.ConditionType == ConditionType.Sequential))
{
if (rule.CheckCondition(_activeDevices))
{
ExecuteActionSequence(rule.ActionSequence);
}
}
}
四、高级功能实现
4.1 断线重连策略
private void ConfigureReconnection()
{
_mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(e =>
{
_logger.LogError($"连接失败: {e.Exception.Message}");
// 指数退避策略
var delay = TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, e.ConnectCount)));
Task.Delay(delay).ContinueWith(_ =>
_mqttClient.StartAsync(_managedOptions));
});
_mqttClient.ConnectionStateChangedHandler = new ConnectionStateChangedHandlerDelegate(e =>
{
if (e.NewState == ManagedMqttClientConnectionState.Connected)
{
// 恢复订阅和设备状态
RecoverSubscriptions();
RefreshDeviceStatuses();
}
});
}
4.2 消息持久化队列
public class MessageQueueService
{
private readonly ConcurrentQueue<MqttApplicationMessage> _pendingMessages = new();
private readonly object _lock = new object();
public void EnqueueMessage(MqttApplicationMessage message)
{
lock (_lock)
{
_pendingMessages.Enqueue(message);
if (_pendingMessages.Count > 1000)
{
_pendingMessages.TryDequeue(out _);
}
}
}
public async Task ProcessQueueAsync()
{
while (_pendingMessages.TryDequeue(out var message))
{
try
{
await _mqttClient.PublishAsync(message);
}
catch (Exception ex)
{
_logger.LogError($"消息重试失败: {ex.Message}");
EnqueueMessage(message); // 重新入队
}
}
}
}
五、使用示例
5.1 初始化配置
var config = new MQTTConfig
{
BrokerAddress = "iot.example.com",
Port = 8883,
UseTls = true,
ClientCertificate = LoadCertificate("client.pfx"),
GroupPollingInterval = TimeSpan.FromMinutes(5)
};
var deviceManager = new MQTTDeviceManager(config);
await deviceManager.ConnectAsync();
// 注册温度传感器
deviceManager.RegisterDevice(new DeviceProfile
{
DeviceId = "temp-sensor-01",
DeviceType = DeviceType.Sensor,
Metadata = new Dictionary<string, object>
{
{"Location", "RoomA"},
{"Threshold", 30.0}
}
});
5.2 联动规则配置
{
"RuleId": "overheat-protection",
"ConditionType": "Threshold",
"Parameters": {
"DeviceId": "temp-sensor-01",
"Metric": "temperature",
"Operator": ">",
"Value": 35.0
},
"Actions": [
{
"TargetGroup": "cooling-fans",
"Command": "FAN_SPEED_HIGH"
},
{
"TargetDevice": "alert-led-01",
"Command": "BLINK_RED"
}
]
}
六、性能优化策略
6.1 消息批处理
public class MessageBatcher
{
private readonly List<MqttApplicationMessage> _batch = new List<MqttApplicationMessage>();
private readonly int _batchSize = 50;
private readonly TimeSpan _maxDelay = TimeSpan.FromMilliseconds(100);
public async Task AddMessageAsync(MqttApplicationMessage message)
{
_batch.Add(message);
if (_batch.Count >= _batchSize ||
DateTime.UtcNow - _lastFlush > _maxDelay)
{
await FlushBatchAsync();
}
}
private async Task FlushBatchAsync()
{
var messages = _batch.ToArray();
var publishTasks = messages.Select(m =>
_mqttClient.PublishAsync(m)).ToArray();
await Task.WhenAll(publishTasks);
_batch.Clear();
_lastFlush = DateTime.UtcNow;
}
}
6.2 资源监控
public class ResourceMonitor : IAsyncDisposable
{
private readonly PerformanceCounter _cpuCounter = new PerformanceCounter();
private readonly Timer _monitorTimer;
public ResourceMonitor()
{
_monitorTimer = new Timer(async _ =>
{
var cpuUsage = await GetCpuUsageAsync();
var memoryUsage = GetMemoryUsage();
if (cpuUsage > 90)
{
AdjustThreadPool(WorkerThreads.Increment);
}
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
}
private void AdjustThreadPool(int delta)
{
ThreadPool.GetMaxThreads(out var worker, out var io);
ThreadPool.SetMaxThreads(worker + delta, io);
}
}
七、测试方案
7.1 单元测试用例
[Fact]
public async Task DeviceGroupAction_ShouldTriggerAllDevices()
{
// 准备
var mockClient = new Mock<IMqttClient>();
var manager = new MQTTDeviceManager(mockClient.Object);
manager.RegisterGroup("lights", new[] { "light-01", "light-02" });
// 执行
await manager.ExecuteGroupAction("lights", new { Command = "TURN_ON" });
// 验证
mockClient.Verify(m => m.PublishAsync(It.Is<MqttApplicationMessage>(
msg => msg.Topic.Contains("light-01") &&
msg.Payload.SequenceEqual("TURN_ON")), Times.Exactly(2));
}
八、完整代码(分步展示)
8.1. 核心工具类 MQTTDeviceManager.cs
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using System.Collections.Concurrent;
using System.Text.Json;
public class MQTTDeviceManager : IDisposable
{
#region 字段和属性
private readonly IManagedMqttClient _mqttClient;
private readonly ManagedMqttClientOptions _managedOptions;
private readonly ConcurrentDictionary<string, DeviceContext> _activeDevices = new();
private readonly DeviceRegistry _deviceRegistry = new();
private readonly MQTTConfig _config;
private readonly Timer _heartbeatTimer;
private readonly MessageQueueService _messageQueue = new();
private readonly ILogger _logger;
#endregion
#region 事件定义
public event EventHandler<DeviceStatusEventArgs> DeviceStatusChanged;
public event EventHandler<GroupMessageEventArgs> GroupMessageReceived;
#endregion
#region 初始化
public MQTTDeviceManager(MQTTConfig config, ILogger logger = null)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_logger = logger;
// 初始化MQTT客户端
var factory = new MqttFactory();
_mqttClient = factory.CreateManagedMqttClient();
// 配置连接选项
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(_config.BrokerAddress, _config.Port)
.WithClientId(_config.ClientId ?? Guid.NewGuid().ToString())
.WithCredentials(_config.Username, _config.Password)
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls = _config.UseTls,
Certificates = _config.ClientCertificates
})
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
.Build();
_managedOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(clientOptions)
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithPendingMessagesOverflowStrategy(MQTTnet.Server.MqttPendingMessagesOverflowStrategy.DropOldest)
.Build();
// 心跳检测
_heartbeatTimer = new Timer(HeartbeatCheck, null,
TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
// 消息处理器
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
}
#endregion
#region 连接管理
public async Task ConnectAsync()
{
await _mqttClient.StartAsync(_managedOptions);
await SubscribeSystemTopics();
}
private async Task SubscribeSystemTopics()
{
await _mqttClient.SubscribeAsync(new[]
{
new MqttTopicFilterBuilder().WithTopic("devices/+/status").Build(),
new MqttTopicFilterBuilder().WithTopic("groups/#").Build()
});
}
#endregion
#region 设备管理
public void RegisterDevice(DeviceProfile device)
{
_deviceRegistry.Register(device);
_activeDevices.TryAdd(device.DeviceId, new DeviceContext
{
Status = DeviceStatus.Offline,
LastSeen = DateTime.MinValue
});
}
public void UnregisterDevice(string deviceId)
{
_deviceRegistry.Unregister(deviceId);
_activeDevices.TryRemove(deviceId, out _);
}
#endregion
#region 消息处理
private async Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var topic = e.ApplicationMessage.Topic;
var payload = e.ApplicationMessage.ConvertPayloadToString();
// 设备状态更新
if (topic.StartsWith("devices/") && topic.EndsWith("/status"))
{
var deviceId = topic.Split('/')[1];
var status = JsonSerializer.Deserialize<DeviceStatusUpdate>(payload);
UpdateDeviceStatus(deviceId, status);
}
// 组消息处理
else if (topic.StartsWith("groups/"))
{
var groupId = topic.Split('/')[1];
GroupMessageReceived?.Invoke(this,
new GroupMessageEventArgs(groupId, payload));
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "消息处理失败");
}
}
private void UpdateDeviceStatus(string deviceId, DeviceStatusUpdate status)
{
if (_activeDevices.TryGetValue(deviceId, out var context))
{
context.Status = status.NewStatus;
context.LastSeen = DateTime.UtcNow;
DeviceStatusChanged?.Invoke(this,
new DeviceStatusEventArgs(deviceId, status.NewStatus));
}
}
#endregion
#region 联动控制
public async Task SendGroupCommand(string groupId, object command)
{
var devices = _deviceRegistry.GetDevicesInGroup(groupId);
var tasks = devices.Select(device =>
PublishAsync($"devices/{device.DeviceId}/cmd", command));
await Task.WhenAll(tasks);
}
public async Task PublishAsync(string topic, object payload,
MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(JsonSerializer.Serialize(payload))
.WithQualityOfServiceLevel(qos)
.Build();
if (_mqttClient.IsConnected)
{
await _mqttClient.EnqueueAsync(message);
}
else
{
_messageQueue.Enqueue(message);
}
}
#endregion
#region 辅助方法
private void HeartbeatCheck(object state)
{
foreach (var (deviceId, context) in _activeDevices)
{
if (DateTime.UtcNow - context.LastSeen > TimeSpan.FromMinutes(5))
{
UpdateDeviceStatus(deviceId, new DeviceStatusUpdate
{
NewStatus = DeviceStatus.Offline
});
}
}
}
#endregion
#region 清理资源
public void Dispose()
{
_heartbeatTimer?.Dispose();
_mqttClient?.Dispose();
GC.SuppressFinalize(this);
}
#endregion
}
8.2. 支持类定义
8.2.1 设备注册表 DeviceRegistry.cs
public class DeviceRegistry
{
private readonly ConcurrentDictionary<string, DeviceProfile> _devices = new();
private readonly ConcurrentDictionary<string, List<string>> _deviceGroups = new();
public void Register(DeviceProfile device)
{
if (!_devices.TryAdd(device.DeviceId, device))
throw new InvalidOperationException($"设备 {device.DeviceId} 已注册");
foreach (var group in device.Groups)
{
_deviceGroups.GetOrAdd(group, _ => new List<string>()).Add(device.DeviceId);
}
}
public IEnumerable<DeviceProfile> GetDevicesInGroup(string groupId)
{
return _deviceGroups.TryGetValue(groupId, out var deviceIds)
? deviceIds.Select(id => _devices[id])
: Enumerable.Empty<DeviceProfile>();
}
}
8.2.2 消息队列服务 MessageQueueService.cs
public class MessageQueueService
{
private readonly ConcurrentQueue<MqttApplicationMessage> _queue = new();
private readonly SemaphoreSlim _lock = new(1, 1);
public void Enqueue(MqttApplicationMessage message)
{
_queue.Enqueue(message);
}
public async Task<MqttApplicationMessage[]> FlushAsync()
{
await _lock.WaitAsync();
try
{
var messages = _queue.ToArray();
_queue.Clear();
return messages;
}
finally
{
_lock.Release();
}
}
}
8.3. 数据模型
8.3.1 设备模型
public class DeviceProfile
{
public required string DeviceId { get; set; }
public DeviceType DeviceType { get; set; }
public List<string> Groups { get; set; } = new();
public Dictionary<string, object> Metadata { get; set; } = new();
}
public enum DeviceType { Sensor, Actuator, Gateway }
public class DeviceContext
{
public DeviceStatus Status { get; set; }
public DateTime LastSeen { get; set; }
}
public enum DeviceStatus { Online, Offline, Error }
8.3.2 事件参数
public class DeviceStatusEventArgs : EventArgs
{
public string DeviceId { get; }
public DeviceStatus Status { get; }
public DeviceStatusEventArgs(string deviceId, DeviceStatus status)
{
DeviceId = deviceId;
Status = status;
}
}
public class GroupMessageEventArgs : EventArgs
{
public string GroupId { get; }
public string Message { get; }
public GroupMessageEventArgs(string groupId, string message)
{
GroupId = groupId;
Message = message;
}
}
8.4. 配置类
public class MQTTConfig
{
public required string BrokerAddress { get; set; }
public int Port { get; set; } = 1883;
public string ClientId { get; set; }
public string Username { get; set; }
public string Password { get; set; }
public bool UseTls { get; set; }
public List<byte[]> ClientCertificates { get; set; } = new();
}
8.5. 使用示例
// 初始化
var config = new MQTTConfig
{
BrokerAddress = "mqtt.example.com",
Port = 8883,
Username = "admin",
Password = "pass123",
UseTls = true
};
var manager = new MQTTDeviceManager(config);
// 注册温度传感器
manager.RegisterDevice(new DeviceProfile
{
DeviceId = "temp-sensor-01",
DeviceType = DeviceType.Sensor,
Groups = new List<string> { "room1", "alerts" }
});
// 发送组命令
await manager.SendGroupCommand("room1", new { Command = "READ_ALL" });
// 监听设备状态变化
manager.DeviceStatusChanged += (sender, e) =>
{
Console.WriteLine($"设备 {e.DeviceId} 状态变更为 {e.Status}");
};
完整项目工程结构:
MQTTToolkit/
├── Core/
│ ├── MQTTDeviceManager.cs
│ ├── DeviceRegistry.cs
├── Services/
│ ├── MessageQueueService.cs
├── Models/
│ ├── Device.cs
│ ├── Events.cs
└── Config/
├── MQTTConfig.cs
实际部署时需配合 EMQX 或 Mosquitto 等MQTT Broker使用,生产环境建议启用TLS双向认证。