EMQX3.0多端数据传输(解决方案)

时间:2024-04-06 16:24:31

EMQX3.0多端数据传输(解决方案)

功能实现
  • 多台EMQX消息服务器之间数据传达
  • Python本地化测试
功能实现思路
--------------------------------------
EMQX-1(此时为客户端)		--->		EMQX2(此时为服务器)
--------------------------------------
两台消息服务器之间传输数据,需要注意一点,就是订阅方和发布方的关系。
这种关系并不需要真的修改配置文件,但确实是在考验逻辑部署。pip install paho-mqtt
简略图
	-------------------------------------------------------------------------------

		传感器-1-
				-(采集数据)
		传感器-2 - --->	EMQX-1	(发布)---> 主题(text名称) <---(订阅)  EMQX-2	
				-(采集数据)
		传感器-3-

	-------------------------------------------------------------------------------
配置声明

EMQX-1此时承载了传感器传输的数据,作为消息的发布方。同时websocket连接EMQX-2,这个场景在于异地双机状态。

EMQX-1通过websocket连接到EMQX-2之后,EMQX-2需要订阅EMQX-1所发布的消息。这个消息指的就是主题订阅。


测试阶段

本人的EMQX消息服务器部署在了阿里云上,本地化也有一个EMQX,不过是当做学习用的。

先用python模拟一个传感器,向云服务器发送消息。

需要的python库

pip install paho-mqtt
# -*- coding:utf-8 -*-
# 这是传感器

import paho.mqtt.client as mqtt

HOST = "服务器IP"
PORT = 1883
# 端口不需要更改,因为1883是MQTT:TCP协议部署EMQX用到的端口。


def test():
    client = mqtt.Client()
    client.connect(HOST, PORT, 60)
    client.publish("testtopic","hello liefyuan",0, retain=True) 
    # 发布一个主题为'chat',内容为‘hello liefyuan’的信息
    client.loop_forever()

    
if __name__ == '__main__':
    test()

在云服务器上的EMQX服务器订阅主题,该主题和python执行程序中所写的主题是相同的。EMQX3.0多端数据传输(解决方案)

如果python执行程序中,所设定的主题名中包含了特殊字符,将会报错。

错误类型为:ValueError(‘Publish topic cannot contain wildcards.’)


测试成功的样子如下

EMQX3.0多端数据传输(解决方案)

这个时候,想要运维管理EMQX消息服务器的话。可以看看

http://emqtt.com/docs/v3/commands.html#subscriptions

在云端上,EMQX消息服务器应该是在linux系统上。如果本地化windows也想要管理,也是可以的。

这里推荐使用cmder来替代cmd和powershell,不为什么,就是嫌自带的终端丑!
EMQX3.0多端数据传输(解决方案)


EMQX消息服务器数据跳转(逻辑)

数据跳转思维(简略图)


		EMQX-1(发布)							   EMQX-2(订阅)
	---------------------------------------------------------------
			|============>>>  EMQX-3(订阅)  >>>=========|

原始设备不需要改变,EMQX消息服务器和本地化python。在这个环境下,我们需要另外一台客户端来订阅EMQX消息服务器所订阅到的数据流。

# -*- coding:utf-8 -*-

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("testtopic")
    # 订阅的主题和EMQX消息服务器所订阅的相同

# The callback for when a PUBLISH message is received from the server.


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# client.connect("149.129.102.153", 1883, 60)
client.connect("服务器IP", 1883, 60)


# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

测试成功时,将会收到订阅主题所发送的数据流。
EMQX3.0多端数据传输(解决方案)

而EMQX消息服务器上,将会出现如下情况
EMQX3.0多端数据传输(解决方案)

至于如何区别哪个节点是发布方,哪个节点是订阅方,看客户端ID和订阅数。但是python在做测试时,所模拟出来的客户端ID,本人现在还没办法控制,还请见谅。

这个数据跳转逻辑,还请见上面的简略图