首先安装这个python库:
需求描述:
- 第一步:客户端访问某个路由就可以执行固定的提交到MQTT服务器的操作。
- 第二步:客户端以get方法访问某个路由,flask提取get中的参数并验证,验证完成以后将参数中的数据提交到MQTT服务器中。
- 第三步:将前面的代码整合到SockIO中去。
一、flask上的简单应用:
#encoding: utf-8
import as mqtt
# 当连接上服务器后回调此函数
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# 放在on_connect函数里意味着
# 重新连接时订阅主题将会被更新
("chat")
# 从服务器接受到消息后回调此函数
def on_message(client, userdata, msg):
print("主题:" + + " 消息:" + str())
client = ()
# 参数有 Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
client.on_connect = on_connect # 设置连接上服务器回调函数
client.on_message = on_message # 设置接收到服务器消息回调函数
("", 1883, 60) # 连接服务器,端口为1883,维持心跳为60秒
('chat','this is a test!') # 往主题chat里发送消息
client.loop_forever()
二、flaskIO配合MQTT的应用
#encoding: utf-8
from flask import Flask, render_template, request
import eventlet
import json
import as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
eventlet.monkey_patch()
app = Flask(__name__)
['SECRET'] = 'my secret key'
['TEMPLATES_AUTO_RELOAD'] = True
['MQTT_BROKER_URL'] = ''
['MQTT_BROKER_PORT'] = 1883
['MQTT_USERNAME'] = ''
['MQTT_PASSWORD'] = ''
['MQTT_KEEPALIVE'] = 60
['MQTT_TLS_ENABLED'] = False
['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
['MQTT_LAST_WILL_MESSAGE'] = 'bye'
['MQTT_LAST_WILL_QOS'] = 2
# Parameters for SSL enabled
# ['MQTT_BROKER_PORT'] = 8883
# ['MQTT_TLS_ENABLED'] = True
# ['MQTT_TLS_INSECURE'] = True
# ['MQTT_TLS_CA_CERTS'] = ''
mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)
@('/')
def hello_world():
return render_template('')
@('/ws')
def wevsocket():
return render_template('websocket_mqtt_demo.html')
@('publish')
def handle_publish(json_str):
data = (json_str)
mqtt_ws.publish(data['topic'], data['message'])
@('subscribe')
def handle_subscribe(json_str):
data = (json_str)
mqtt_ws.subscribe(data['topic'])
@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=,
payload=()
)
('mqtt_message', data=data)
@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
print(client, userdata, level, buf)
if __name__ == '__main__':
(app, host='127.0.0.1', port=5001, use_reloader=True, debug=True)
四、flask、flaskIO、加纯Python代码的MQTT
#encoding: utf-8
from flask import Flask, render_template, request
import eventlet
import json
import as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
eventlet.monkey_patch()
app = Flask(__name__)
['SECRET'] = 'my secret key'
['TEMPLATES_AUTO_RELOAD'] = True
['MQTT_BROKER_URL'] = ''
['MQTT_BROKER_PORT'] = 1883
['MQTT_USERNAME'] = ''
['MQTT_PASSWORD'] = ''
['MQTT_KEEPALIVE'] = 60
['MQTT_TLS_ENABLED'] = False
['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
['MQTT_LAST_WILL_MESSAGE'] = 'bye'
['MQTT_LAST_WILL_QOS'] = 2
# Parameters for SSL enabled
# ['MQTT_BROKER_PORT'] = 8883
# ['MQTT_TLS_ENABLED'] = True
# ['MQTT_TLS_INSECURE'] = True
# ['MQTT_TLS_CA_CERTS'] = ''
mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)
# 这里的初始化MQTT可以让纯Python代码运行起来
mqtt_client = mqtt_client.Client()
mqtt_client.connect("", 1883, 60) # 连接服务器,端口为1883,维持心跳为60秒
@('/')
def hello_world():
return render_template('')
# 每次访问这个路由将向chat主题,提交一次数据data
@('/mqtts')
def mqtts():
# mqtt_client = myMqtt()
mqtt_client.publish('chat', 'data')
return 'ok'
@('/ws')
def wevsocket():
return render_template('websocket_mqtt_demo.html')
@('publish')
def handle_publish(json_str):
data = (json_str)
mqtt_ws.publish(data['topic'], data['message'])
@('subscribe')
def handle_subscribe(json_str):
data = (json_str)
mqtt_ws.subscribe(data['topic'])
@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=,
payload=()
)
('mqtt_message', data=data)
@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
print(client, userdata, level, buf)
if __name__ == '__main__':
(app, host='127.0.0.1', port=5001, use_reloader=True, debug=True)