物联网架构成长之路(32)-SpringBoot集成MQTT客户端

时间:2022-06-05 17:21:39

一、前言
  这里虽然说的是MQTT客户端,但对于服务器来说,一个具有超级权限的MQTT客户端可以做很多事情。比如手机APP、网页或者第三方服务需要发送数据到设备,但它们本身并不是设备,也不应该让它们直接连到MQTT。这时就可以让它们通过HTTP请求业务服务器,再由业务服务器利用这个MQTT客户端把数据发送出去。
  还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。
  这篇博客只讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到如何把数据保存到InfluxDB,并通过Grafana以可视化Dashboard看板的形式进行展示。

二、配置pom.xml,引入第三方库

         <!-- MQTT -->
<!-- Spring Boot starter for Spring Integration. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<!-- Spring Integration stream support.
     NOTE(review): nothing in the Java listings of this article references it directly;
     confirm whether it is actually required. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!-- Spring Integration MQTT support.
     NOTE(review): the Java listings import org.eclipse.paho.client.mqttv3 without declaring
     Paho explicitly, so it is presumably resolved transitively through this artifact;
     verify with `mvn dependency:tree`. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

三、MQTT客户端代码(Java)

  MqttDemoApplication.java

 package com.wunaozai.mqtt;

 import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.wunaozai.mqtt.tools.MqttPushClient;

/**
 * Spring Boot entry point of the demo.
 *
 * <p>After the application context has started, {@link #test()} configures the shared
 * {@link MqttPushClient} and subscribes it to every topic.
 */
@SpringBootApplication
public class MqttDemoApplication {

    /**
     * Starts the Spring application and then runs the MQTT demo.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(MqttDemoApplication.class, args);
        test();
    }

    /**
     * Configures the MQTT connection settings, obtains the singleton client and subscribes
     * to all topics.
     *
     * <p>The settings are written to the static fields of {@link MqttPushClient} before
     * {@link MqttPushClient#getInstance()} is called, because the client connects inside
     * its constructor. The host and credentials below are placeholders.
     */
    private static void test() {
        MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";
        MqttPushClient.MQTT_CLIENTID = "client";
        MqttPushClient.MQTT_USERNAME = "username";
        MqttPushClient.MQTT_PASSWORD = "password";

        MqttPushClient client = MqttPushClient.getInstance();

        // Subscribe with the bare multi-level wildcard. The previous filter "/#" only matches
        // topics whose first level is empty (names starting with '/'), so it would never
        // receive topics such as "iot/UUID/device/devicepub/update".
        // NOTE(review): some brokers restrict "#" through ACL rules; the account used here
        // must be allowed to subscribe to it.
        client.subscribe("#");
    }
}

  MqttPushCallback.java

 package com.wunaozai.mqtt.tools;

 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MQTT callback that logs connection loss and every received message.
 *
 * @author wunaozai
 * @date 2018-08-22
 */
public class MqttPushCallback implements MqttCallback {

    private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);

    /**
     * Logs that the connection to the broker was lost.
     *
     * @param cause the reason reported by the MQTT client; logged with its stack trace
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Connection dropped; a reconnect is advised. The cause is passed to the logger so
        // the reason for the disconnect is not silently discarded.
        log.warn("断开连接,建议重连" + this, cause);
    }

    /**
     * Intentionally does nothing; delivery confirmations are not tracked here.
     *
     * @param token the delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // No action required.
    }

    /**
     * Logs the topic and the payload of an incoming message.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception declared by {@link MqttCallback}; not thrown deliberately here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("Topic: {}", topic);
        // Decode explicitly as UTF-8 instead of relying on the JVM's platform default charset,
        // which differs between hosts.
        log.info("Message: {}",
                new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }
}

  MqttPushClient.java

 package com.wunaozai.mqtt.tools;

 import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide singleton that wraps an Eclipse Paho {@link MqttClient} and offers simple
 * publish/subscribe helpers.
 *
 * <p>Connection settings are read from the public static fields at the moment the singleton
 * is first created, so they must be assigned before the first {@link #getInstance()} call.
 * Failures are logged rather than thrown.
 *
 * @author wunaozai
 * @date 2018-08-22
 */
public class MqttPushClient {

    private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);

    /** Broker URI passed to the {@link MqttClient} constructor, e.g. {@code tcp://host:1883}. */
    public static String MQTT_HOST = "";
    /** Client identifier passed to the {@link MqttClient} constructor. */
    public static String MQTT_CLIENTID = "";
    /** User name set on the connect options. */
    public static String MQTT_USERNAME = "";
    /** Password set on the connect options. */
    public static String MQTT_PASSWORD = "";
    /** Value handed to {@link MqttConnectOptions#setConnectionTimeout(int)}. */
    public static int MQTT_TIMEOUT = 10;
    /** Value handed to {@link MqttConnectOptions#setKeepAliveInterval(int)}. */
    public static int MQTT_KEEPALIVE = 10;

    /** Underlying Paho client; stays {@code null} when it could not be constructed. */
    private MqttClient client;

    private static volatile MqttPushClient mqttClient = null;

    /**
     * Returns the shared instance, creating it (and connecting to the broker) on first use.
     *
     * @return the singleton client
     */
    public static MqttPushClient getInstance() {
        if (mqttClient == null) {
            synchronized (MqttPushClient.class) {
                // Re-check under the lock so only one thread creates the instance.
                if (mqttClient == null) {
                    mqttClient = new MqttPushClient();
                }
            }
        }
        return mqttClient;
    }

    private MqttPushClient() {
        log.info("Connect MQTT: " + this);
        connect();
    }

    /**
     * Creates the Paho client from the static settings and connects it to the broker.
     * Any failure is logged; the instance is still returned to callers afterwards.
     */
    private void connect() {
        try {
            client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());

            MqttConnectOptions option = new MqttConnectOptions();
            option.setCleanSession(true);
            option.setUserName(MQTT_USERNAME);
            option.setPassword(MQTT_PASSWORD.toCharArray());
            option.setConnectionTimeout(MQTT_TIMEOUT);
            option.setKeepAliveInterval(MQTT_KEEPALIVE);
            // NOTE(review): with a clean session the broker presumably forgets subscriptions
            // when the connection drops, so topics subscribed through subscribe() may need to
            // be re-subscribed after an automatic reconnect — confirm against the Paho docs.
            option.setAutomaticReconnect(true);

            client.setCallback(new MqttPushCallback());
            client.connect(option);
        } catch (Exception e) {
            // Broad catch kept on purpose: this runs inside singleton construction and the
            // original contract is "log and carry on" for every kind of failure.
            log.error("Failed to connect to MQTT broker {}", MQTT_HOST, e);
        }
    }

    /**
     * Publishes a notification message with QoS 1, not retained.
     *
     * @param topic topic to publish to
     * @param data  message text
     */
    public void publish(String topic, String data) {
        publish(topic, data, 1, false);
    }

    /**
     * Publishes a message and blocks until delivery has completed.
     *
     * @param topic    topic to publish to
     * @param data     message text; encoded as UTF-8
     * @param qos      MQTT quality-of-service level
     * @param retained whether the broker should retain the message
     */
    public void publish(String topic, String data, int qos, boolean retained) {
        if (client == null) {
            // The client could not be constructed in connect(); avoid a NullPointerException.
            log.error("MQTT client is not initialised; dropping message for topic {}", topic);
            return;
        }

        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Encode explicitly as UTF-8 instead of the JVM's platform default charset.
        message.setPayload(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttTopic mqttTopic = client.getTopic(topic);
        if (null == mqttTopic) {
            log.error("Topic Not Exist");
            // Stop here; the original fell through and dereferenced the null topic.
            return;
        }

        try {
            MqttDeliveryToken token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (Exception e) {
            log.error("Failed to publish to topic {}", topic, e);
        }
    }

    /**
     * Subscribes to a topic filter with QoS 1.
     *
     * @param topic topic filter to subscribe to
     */
    public void subscribe(String topic) {
        subscribe(topic, 1);
    }

    /**
     * Subscribes to a topic filter.
     *
     * @param topic topic filter to subscribe to
     * @param qos   maximum quality-of-service level for delivered messages
     */
    public void subscribe(String topic, int qos) {
        if (client == null) {
            log.error("MQTT client is not initialised; cannot subscribe to {}", topic);
            return;
        }
        try {
            client.subscribe(topic, qos);
        } catch (Exception e) {
            log.error("Failed to subscribe to topic {}", topic, e);
        }
    }
}

四、MQTT客户端代码(C#)
  为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。
物联网架构成长之路(32)-SpringBoot集成MQTT客户端

  部分C#代码(连接服务器与发送数据)

 using MQTTClient.Model;
using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MQTTClient
{
    /// <summary>
    /// Main window of the device-simulator tool: connects to an MQTT server and publishes
    /// JSON payloads, either once or periodically on a timer.
    /// </summary>
    /// <remarks>
    /// NOTE(review): several numeric literals were missing from the published listing.
    /// They have been restored below; each restored value is marked where it is a guess.
    /// </remarks>
    public partial class MainPage : Form
    {
        // Port used when the host box contains no ":port" suffix.
        // NOTE(review): original literal lost; 1883 is the conventional unencrypted MQTT port.
        private const int DefaultMqttPort = 1883;

        IMqttClient client = null;

        public MainPage()
        {
            InitializeComponent();
            init();
        }

        /// <summary>Fills the input boxes with their initial values.</summary>
        private void init()
        {
            txtusername.Text = "";
            txtpassword.Text = "";
            txtclientid.Text = "";
            txttopic.Text = "iot/UUID/device/devicepub/update";
        }

        /// <summary>
        /// Creates the MQTT client on first use, then disconnects and reconnects it with the
        /// options currently entered in the form. Failures are shown in the status label.
        /// </summary>
        private async Task ConnectMqttServerAsync()
        {
            if (client == null)
            {
                client = new MqttClientFactory().CreateMqttClient() as MqttClient;
                client.ApplicationMessageReceived += mqttClientApplicationMessageReceived;
                client.Connected += mqttClientConnected;
                client.Disconnected += mqttClientDisconnected;
            }
            try
            {
                await client.DisconnectAsync();
                var option = getMQTTOption();
                await client.ConnectAsync(option);
            }
            catch (Exception e)
            {
                // Marshal back to the UI thread before touching controls.
                Invoke((new Action(() =>
                {
                    lblStatus.Text = "连接服务器失败: " + e.Message;
                })));
            }
        }

        private void mqttClientDisconnected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                lblStatus.Text = "连接服务器失败: ERROR";
            })));
        }

        private void mqttClientConnected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                lblStatus.Text = "连接服务器成功";
            })));
        }

        private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            // This tool does not receive data, so incoming messages are ignored.
            // The original threw NotImplementedException, which would surface as an
            // unhandled exception if a message ever arrived.
        }

        private void btnconnect_Click(object sender, EventArgs e)
        {
            // Run the connect sequence off the UI thread.
            Task.Run(async () => { await ConnectMqttServerAsync(); });
        }

        private void btndisconnect_Click(object sender, EventArgs e)
        {
            // The client only exists after the first connect attempt.
            if (client != null)
            {
                client.DisconnectAsync();
            }
        }

        private void btnsendone_Click(object sender, EventArgs e)
        {
            sendPayload();
        }

        private void btnsendts_Click(object sender, EventArgs e)
        {
            // Validate the interval instead of letting Convert.ToInt32 or Timer.Interval throw
            // on empty, non-numeric or non-positive input.
            int interval;
            if (!int.TryParse(txttime.Text, out interval) || interval <= 0)
            {
                lblSendStatus.Text = "发送间隔无效: " + txttime.Text;
                return;
            }
            timer1.Interval = interval;
            timer1.Enabled = true;
        }

        private void btnstopts_Click(object sender, EventArgs e)
        {
            timer1.Enabled = false;
        }

        private void timer1_Tick(object sender, EventArgs e)
        {
            sendPayload();
        }

        /// <summary>
        /// Serialises the current payload to JSON, shows it in the view box and publishes it
        /// to the topic entered in the form.
        /// </summary>
        /// <returns>
        /// A negative value when the client is not connected, otherwise zero.
        /// NOTE(review): the exact return literals were lost from the listing; -1 and 0 are
        /// restored by convention.
        /// </returns>
        private int sendPayload()
        {
            if (client == null || client.IsConnected == false)
            {
                return -1;
            }
            PayloadModel payload = getPayload();
            string json = JsonConvert.SerializeObject(payload, Formatting.Indented);
            txtview.Text = json;
            string topic = txttopic.Text;
            // Encode as UTF-8 rather than the machine's ANSI code page so that non-ASCII
            // characters in the JSON are decoded consistently by receivers.
            var msg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(json),
                MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);
            client.PublishAsync(msg);
            lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();
            return 0;
        }

        /// <summary>
        /// Builds the connection options from the form. The host box may contain either
        /// "host" or "host:port".
        /// </summary>
        private MqttClientTcpOptions getMQTTOption()
        {
            MqttClientTcpOptions option = new MqttClientTcpOptions();
            string hostname = txthostname.Text;
            string[] host_port = hostname.Split(':');
            int port = DefaultMqttPort;
            if (host_port.Length >= 2)
            {
                hostname = host_port[0];
                port = Convert.ToInt32(host_port[1]);
            }
            option.Server = hostname;
            option.ClientId = txtclientid.Text;
            option.UserName = txtusername.Text;
            option.Password = txtpassword.Text;
            option.Port = port;
            option.CleanSession = true;
            return option;
        }

        private PayloadModel getPayload()
        {
            PayloadModel payload = new PayloadModel();
            // Omitted in the article: populate the payload according to your own protocol.
            return payload;
        }

        Random rand1 = new Random(System.DateTime.Now.Millisecond);

        /// <summary>Returns a random integer for simulated readings.</summary>
        private int getRandomNum()
        {
            // NOTE(review): the original bounds were lost; 0..100 is a placeholder range.
            int data = rand1.Next(0, 100);
            return data;
        }

        // NOTE(review): the original starting value was lost; 0 is a placeholder.
        int linenum = 0;
        Random rand2 = new Random(System.DateTime.Now.Millisecond);

        /// <summary>
        /// Moves the running value up or down by a random step and returns it, producing a
        /// random-walk style series.
        /// </summary>
        private int getLineNum()
        {
            // NOTE(review): both ranges and the modulus were lost; the values below are
            // placeholders that give an even up/down split and a small step size.
            int f = rand2.Next(0, 100);
            int data = rand2.Next(0, 10);
            if (f % 2 == 0)
            {
                linenum += data;
            }
            else
            {
                linenum -= data;
            }
            return linenum;
        }
    }
}

物联网架构成长之路(32)-SpringBoot集成MQTT客户端

本文地址: https://www.cnblogs.com/wunaozai/p/11147841.html