EMQX3.0多端数据传输(解决方案)
功能实现
- 多台EMQX消息服务器之间数据传达
- Python本地化测试
功能实现思路
--------------------------------------
EMQX-1(此时为客户端) ---> EMQX2(此时为服务器)
--------------------------------------
两台消息服务器之间传输数据,需要注意一点,就是订阅方和发布方的关系。
这种关系并不需要真的修改配置文件,但确实是在考验逻辑部署。pip install paho-mqtt
简略图
-------------------------------------------------------------------------------
传感器-1-
-(采集数据)
传感器-2 - ---> EMQX-1 (发布)---> 主题(text名称) <---(订阅) EMQX-2
-(采集数据)
传感器-3-
-------------------------------------------------------------------------------
配置声明
EMQX-1此时承载了传感器传输的数据,作为消息的发布方。同时websocket连接EMQX-2,这个场景在于异地双机状态。
EMQX-1通过websocket连接到EMQX-2之后,EMQX-2需要订阅EMQX-1所发布的消息。这个消息指的就是主题订阅。
测试阶段
本人的EMQX消息服务器部署在了阿里云上,本地化也有一个EMQX,不过是当做学习用的。
先用python模拟一个传感器,向云服务器发送消息。
需要的python库
pip install paho-mqtt
# -*- coding:utf-8 -*-
# 这是传感器
import paho.mqtt.client as mqtt
HOST = "服务器IP"
PORT = 1883
# 端口不需要更改,因为1883是MQTT:TCP协议部署EMQX用到的端口。
def test():
client = mqtt.Client()
client.connect(HOST, PORT, 60)
client.publish("testtopic","hello liefyuan",0, retain=True)
# 发布一个主题为'chat',内容为‘hello liefyuan’的信息
client.loop_forever()
if __name__ == '__main__':
test()
在云服务器上的EMQX服务器订阅主题,该主题和python执行程序中所写的主题是相同的。
如果python执行程序中,所设定的主题名中包含了特殊字符,将会报错。
错误类型为:ValueError(‘Publish topic cannot contain wildcards.’)
测试成功的样子如下
这个时候,想要运维管理EMQX消息服务器的话。可以看看
http://emqtt.com/docs/v3/commands.html#subscriptions
在云端上,EMQX消息服务器应该是在linux系统上。如果本地化windows也想要管理,也是可以的。
这里推荐使用cmder来替代cmd和powershell,不为什么,就是嫌自带的终端丑!
EMQX消息服务器数据跳转(逻辑)
数据跳转思维(简略图)
EMQX-1(发布) EMQX-2(订阅) --------------------------------------------------------------- |============>>> EMQX-3(订阅) >>>=========|
原始设备不需要改变,EMQX消息服务器和本地化python。在这个环境下,我们需要另外一台客户端来订阅EMQX消息服务器所订阅到的数据流。
# -*- coding:utf-8 -*-
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("testtopic")
# 订阅的主题和EMQX消息服务器所订阅的相同
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# client.connect("149.129.102.153", 1883, 60)
client.connect("服务器IP", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
测试成功时,将会收到订阅主题所发送的数据流。
而EMQX消息服务器上,将会出现如下情况
至于如何区别哪个节点是发布方,哪个节点是订阅方,看客户端ID和订阅数。但是python在做测试时,所模拟出来的客户端ID,本人现在还没办法控制,还请见谅。
这个数据跳转逻辑,还请见上面的简略图