消费者
"""
EMQ test - consumer.

@author me
"""
import paho.mqtt.client as mqtt
import time


class Consumer(object):
    """MQTT subscriber that prints every message received on "/chat/room/4"."""

    def get_time(self):
        """Return the current local time formatted as "YYYY-MM-DD HH:MM:SS"."""
        now = time.localtime(time.time())
        return time.strftime("%Y-%m-%d %H:%M:%S", now)

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """
        Subscribe callback: log the message id reported for the subscription.

        :param client: client instance that triggered the callback
        :param userdata: user data set on the client (unused)
        :param mid: message id passed to the callback
        :param granted_qos: QoS value(s) passed to the callback (unused)
        :return: None
        """
        stamp = self.get_time()
        print(stamp, "Begin subscribe topic with ", mid)

    def on_message(self, client, userdata, message):
        """
        Message callback: print payload, topic and QoS of a received message.

        :param client: client instance that triggered the callback
        :param userdata: user data set on the client (unused)
        :param message: received message; its ``payload``, ``topic`` and
            ``qos`` attributes are read
        :return: None
        """
        stamp = self.get_time()
        payload_text = str(message.payload)
        qos_text = str(message.qos)
        report = (
            " Received message '" + payload_text
            + "' on topic '" + message.topic
            + "' with QoS " + qos_text
        )
        print(stamp, report)

    def on_connect(self, client, userdata, flags, rc):
        """
        Connect callback: subscribe to the chat topic when the connect succeeded.

        :param client: client instance that triggered the callback
        :param userdata: user data set on the client (unused)
        :param flags: flags passed to the callback (unused)
        :param rc: connection result code; 0 is treated as success
        :return: None
        """
        print(self.get_time(), "[consumer]Connected with result code " + str(rc))

        # Anything other than 0 is reported as a failed connect; no subscribe.
        if rc != 0:
            print(self.get_time(), " connect failed")
            return

        sub_result = client.subscribe("/chat/room/4", qos=0)
        print(self.get_time(), "Connected with result is (status,mid)", sub_result)

    def run(self):
        """Create the client, register callbacks, connect, and block in the network loop."""
        # protocol=4 means MQTT 3.1.1
        emq_client = mqtt.Client(client_id="emqttd_2018080922", userdata=None, protocol=4)

        # Set username/password; can be omitted if the broker has no user configured.
        emq_client.username_pw_set('admin', "123.com")

        # No on_disconnect handler is registered for the consumer.
        emq_client.on_connect = self.on_connect
        emq_client.on_message = self.on_message
        emq_client.on_subscribe = self.on_subscribe

        emq_client.connect("192.168.0.251", 1883, keepalive=60)
        emq_client.loop_forever()


if __name__ == "__main__":
    consumer = Consumer()
    consumer.run()
生产者
"""
EMQ test - producer.

@author me
"""
import paho.mqtt.client as mqtt
import time


class Producer(object):
    """Interactive MQTT publisher that sends console input to "/chat/room/4"."""

    def get_time(self):
        """Return the current local time formatted as "YYYY-MM-DD HH:MM:SS"."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))

    def on_disconnect(self, client, userdata, rc):
        """
        Disconnect callback: log the result code.

        :param client: client instance that triggered the callback
        :param userdata: user data set on the client (unused)
        :param rc: status code of the operation; 0 means success
        :return: None
        """
        print(self.get_time(), " end a loop with code "+str(rc))

    def pub_topic(self, client, topic_text):
        """
        Publish ``topic_text`` to "/chat/room/4" with QoS 0 and report the outcome.

        :param client: connected MQTT client used to publish
        :param topic_text: payload to publish
        :return: None
        """
        # Keep the try body to the one call whose ValueError is handled here.
        try:
            pub_result = client.publish("/chat/room/4", payload=topic_text, qos=0, retain=False)
        except ValueError as err_str:
            print(self.get_time(), "please check your parameters: ", err_str)
            return

        # The original tested `pub_result.is_published` without calling it; a
        # bound method is always truthy, so the failure branch could never run.
        # The publish return code is checked instead.
        # NOTE(review): calling is_published() here would presumably report
        # False, since this runs inside the on_connect callback before the
        # network loop has had a chance to write the packet - confirm.
        if pub_result.rc == mqtt.MQTT_ERR_SUCCESS:
            print(self.get_time(), " success pub message with id: ", pub_result.mid)
        else:
            print(self.get_time(), "failed to pub message")

    def on_connect(self, client, userdata, flags, rc):
        """
        Connect callback: on success, read lines from the console and publish them.

        Entering "end" disconnects the client and leaves the prompt loop.
        A non-zero ``rc`` is reported and the client is disconnected.

        :param client: client instance that triggered the callback
        :param userdata: user data set on the client (unused)
        :param flags: flags passed to the callback (unused)
        :param rc: connection result code; 0 is treated as success
        :return: None
        """
        print(self.get_time(), "[producer]Connected with result code " + str(rc))

        if rc != 0:
            print(self.get_time(), "Connected Failed, Exited ")
            client.disconnect()
            return

        # NOTE(review): input() blocks inside the callback, which presumably
        # stalls the client's network loop (keepalive included) while waiting
        # for the user - confirm whether that is acceptable for this demo.
        while True:
            topic_text = input("Enter your topic text,('end' to end a loop): ")
            if topic_text == "end":
                print(self.get_time(), "EXIT ..... ")
                client.disconnect()
                break
            self.pub_topic(client, topic_text)

    def run(self):
        """Create a client, register callbacks, connect, and block in the network loop."""
        # protocol=4 means MQTT 3.1.1
        emq_client = mqtt.Client(client_id="emqttd_2018080946", userdata=None, protocol=4)
        emq_client.on_connect = self.on_connect
        emq_client.on_disconnect = self.on_disconnect
        # Set username/password; can be omitted if the broker has no user configured.
        emq_client.username_pw_set('admin', "123.com")
        emq_client.connect("192.168.0.250", 1883, keepalive=60)
        emq_client.loop_forever()

    def main(self):
        """Run one session, then keep asking whether to start another; 'no' exits."""
        self.run()
        while True:
            start = input("Enter your start sign,('no' to end a program): ")
            if start == 'no':
                print(self.get_time(), "Exiting program ")
                break
            self.run()


if __name__ == "__main__":
    producer = Producer()
    producer.main()
运行
首先启动消费者,然后启动生产者,即可在生产者中输入消息体。输入 end 会发布此前输入的消息并断开本次连接;之后在提示处输入 no 结束生产者,输入其他内容则重新连接。
生产者
消费者