C#建立MQTT服务器(基于MQTTnet,局域网)
namespace MQTT_STA
{
public partial class Form1 : Form
{
int lianjie = 0;
public delegate void MyInvoke(string str1, string str2, string str3, string str4);
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
StartMqttServer();
}
public MqttServer mqttServer = null;
public async void StartMqttServer()
{
try
{
if (mqttServer == null)
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint().WithDefaultEndpointPort(Convert.ToInt32(textBox1.Text)).WithConnectionValidator(
c =>
{
if (!c.Username.Equals(textBox3.Text.ToString()))
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (!c.Password.Equals(textBox2.Text.ToString()))
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
c.ReasonCode = MqttConnectReasonCode.Success;
}).WithSubscriptionInterceptor(
c =>
{
c.AcceptSubscription = true;
}).WithApplicationMessageInterceptor(
c =>
{
c.AcceptPublish = true;
});
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
await mqttServer.StartAsync(optionsBuilder.Build());
toolStripLabel1.Text = "启动成功!";
}
}
catch (Exception ex)
{
toolStripLabel1.Text = "启动失败......";
}
}
public async void StopMqttServer()
{
if (mqttServer == null) return;
try
{
await mqttServer?.StopAsync();
mqttServer = null;
toolStripLabel1.Text = "关闭成功";
}
catch (Exception ex)
{
toolStripLabel1.Text = "关闭失败......";
}
}
private void button2_Click(object sender, EventArgs e)
{
ServerPublishMqttTopic();
}
public async void ServerPublishMqttTopic()
{
await mqttServer.SubscribeAsync(textBox4.Text);
label5.Text = textBox4.Text+"订阅成功!";
}
private void button3_Click(object sender, EventArgs e)
{
ServerPublishMqttTopic(textBox6.Text, textBox7.Text);
}
public async void ServerPublishMqttTopic(string topic, string payload)
{
var message = new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload)
};
await mqttServer.PublishAsync(message);
textBox5.Text = textBox5.Text + "主题:"+topic+"\n内容:"+payload+"\n发送成功!\n";
}
private void toolStripButton1_Click(object sender, EventArgs e)
{
StopMqttServer();
}
public void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e)
{
lianjie++;
toolStripLabel2.Text = "连接数:" +lianjie;
}
public void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e)
{
lianjie--;
toolStripLabel2.Text = "连接数:" + lianjie;
}
public void OnMqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
string Topic = e.ApplicationMessage.Topic;
string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString();
string Retained = e.ApplicationMessage.Retain.ToString();
MyInvoke jieshou = new MyInvoke(jieshouo);
this.BeginInvoke(jieshou, new Object[] { Topic, QoS, Retained, text });
}
public void jieshouo(string strr1, string strr2, string strr3, string strr4)
{
textBox5.Text = textBox5.Text +"主题来源:"+ strr1 +"\nQoS:"+ strr2+"\nRetained:" + strr3+"\n内容:" + strr4+"\n----\n";
}
}
}