转: python 利用EMQ实现消费者和生产者模型

时间:2022-08-02 15:14:36

消费者

"""
测试emq-消费者
@author me
"""
import paho.mqtt.client as mqtt
import time
 
 
class Consumer(object):
    """MQTT consumer that subscribes to a chat topic and prints every message.

    The instance is used as a bundle of paho-mqtt callbacks; ``run`` wires
    them to a client, connects to the broker and blocks in the network loop.
    """

    def get_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """Callback fired when a subscription request is acknowledged.

        :param client: the client instance that issued the subscription
        :param userdata: user data set on the client (unused)
        :param mid: message id of the subscribe request
        :param granted_qos: QoS levels reported for the subscription (unused)
        :return: None
        """
        print(self.get_time(), "Begin subscribe topic with ", mid)

    def on_message(self, client, userdata, message):
        """Callback fired for every received message; prints payload, topic and QoS.

        :param client: the client instance that received the message (unused)
        :param userdata: user data set on the client (unused)
        :param message: object exposing ``payload``, ``topic`` and ``qos``
        :return: None
        """
        payload = message.payload
        if isinstance(payload, (bytes, bytearray)):
            # Decode so text is shown as typed by the producer instead of
            # as a bytes repr ("b'...'"); undecodable bytes are kept
            # visible as \xNN escapes rather than raising.
            text = bytes(payload).decode("utf-8", errors="backslashreplace")
        else:
            text = str(payload)
        print(self.get_time(), " Received message '" + text + "' on topic '" +
              message.topic + "' with QoS " + str(message.qos))

    def on_connect(self, client, userdata, flags, rc):
        """Callback fired when the broker answers the connection request.

        Subscribing here (rather than once in ``run``) means the subscription
        is issued on every successful connect.

        :param client: the client instance that connected
        :param userdata: user data set on the client (unused)
        :param flags: response flags from the broker (unused)
        :param rc: connection result code; 0 is treated as success
        :return: None
        """
        print(self.get_time(), "[consumer]Connected with result code " + str(rc))
        if rc == 0:
            sub_result = client.subscribe("/chat/room/4", qos=0)
            print(self.get_time(), "Connected with result is (status,mid)", sub_result)
        else:
            print(self.get_time(), " connect failed")

    def run(self):
        """Create the client, register callbacks, connect and block forever."""
        # protocol=4 selects MQTT 3.1.1
        emq_client = mqtt.Client(client_id="emqttd_2018080922", userdata=None, protocol=4)
        emq_client.on_connect = self.on_connect
        emq_client.on_message = self.on_message
        emq_client.on_subscribe = self.on_subscribe
        # Set username/password; may be omitted if the broker has no users configured.
        emq_client.username_pw_set('admin', "123.com")
        # NOTE(review): the producer script in this article connects to
        # 192.168.0.250 - confirm both addresses reach the same broker/cluster.
        emq_client.connect("192.168.0.251", 1883, keepalive=60)
        emq_client.loop_forever()
 
 
if __name__ == "__main__":
    # Script entry point: build a consumer and block in its network loop.
    Consumer().run()

  

生产者

"""
测试emq-生产者
@author me
"""
import paho.mqtt.client as mqtt
import time
 
 
class Producer(object):
    """Interactive MQTT producer that publishes console input to a chat topic.

    The instance is used as a bundle of paho-mqtt callbacks; ``main`` runs
    one or more connect / publish / disconnect sessions.
    """

    # Numeric value of paho-mqtt's MQTT_ERR_SUCCESS, the ``rc`` that
    # ``publish`` reports when the message was accepted for sending.
    _PUBLISH_OK = 0

    def get_time(self):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    def on_disconnect(self, client, userdata, rc):
        """Callback fired when the client disconnects.

        :param client: the client instance that disconnected (unused)
        :param userdata: user data set on the client (unused)
        :param rc: result status code of the operation; 0 means success
        """
        print(self.get_time(), " end a loop with code " + str(rc))

    def pub_topic(self, client, topic_text):
        """Publish ``topic_text`` to the chat topic and report the outcome.

        :param client: object exposing paho's ``publish`` method
        :param topic_text: message body to publish
        """
        try:
            pub_result = client.publish("/chat/room/4", payload=topic_text, qos=0, retain=False)
        except ValueError as err_str:
            print(self.get_time(), "please check your parameters: ", err_str)
            return
        # ``is_published`` is a method, so testing the bare attribute was
        # always truthy and the failure branch could never run. Calling it
        # would not help either: this runs inside the on_connect callback,
        # before the network loop had a chance to write the packet. The
        # ``rc`` field tells whether the message was accepted for sending.
        if pub_result.rc == self._PUBLISH_OK:
            print(self.get_time(), " success pub message with id: ", pub_result.mid)
        else:
            print(self.get_time(), "failed to  pub message")

    def on_connect(self, client, userdata, flags, rc):
        """Callback fired when the broker answers the connection request.

        On success, reads lines from the console and publishes each one
        until the user types ``end``, then disconnects.

        NOTE(review): this loop blocks inside the callback, so the network
        loop presumably cannot send anything (queued messages, keepalive
        pings) until ``end`` is entered - confirm against the paho-mqtt
        callback documentation before relying on this for long sessions.

        :param client: the client instance that connected
        :param userdata: user data set on the client (unused)
        :param flags: response flags from the broker (unused)
        :param rc: connection result code; 0 is treated as success
        """
        print(self.get_time(), "[producer]Connected with result code " + str(rc))
        if rc != 0:
            print(self.get_time(), "Connected Failed, Exited ")
            client.disconnect()
            return
        while True:
            topic_text = input("Enter your topic text,('end' to end a loop): ")
            if topic_text == "end":
                print(self.get_time(), "EXIT ..... ")
                client.disconnect()
                break
            self.pub_topic(client, topic_text)

    def run(self):
        """Run one session: create a client, connect, and block in the network loop."""
        # protocol=4 selects MQTT 3.1.1
        emq_client = mqtt.Client(client_id="emqttd_2018080946", userdata=None, protocol=4)
        emq_client.on_connect = self.on_connect
        emq_client.on_disconnect = self.on_disconnect
        # Set username/password; may be omitted if the broker has no users configured.
        emq_client.username_pw_set('admin', "123.com")
        emq_client.connect("192.168.0.250", 1883, keepalive=60)
        emq_client.loop_forever()

    def main(self):
        """Run a first session, then keep starting new ones until the user types ``no``."""
        self.run()
        while True:
            start = input("Enter your start sign,('no' to end a  program): ")
            if start != 'no':
                self.run()
            else:
                print(self.get_time(), "Exiting program ")
                break
 
 
if __name__ == "__main__":
    # Script entry point: start the interactive producer sessions.
    Producer().main()

  

运行

首先启动消费者,然后启动生产者,即可在生产者端逐行输入消息体。输入 end 后,之前输入的消息才会被发布,随后生产者断开本次连接;此时输入 no 结束生产者程序,输入其他任意内容则重新连接并开始新一轮输入。
生产者
消费者