使用Docker安装Kafka-Kafka-python

时间:2024-04-11 11:06:18

接下来介绍如何使用 python 的 kafka 库实现:生产者读取本地文档中的内容并以流的形式发送到 kafka 主题,消费者从 kafka 主题接收数据流并输出。(注:代码中的 ip 地址注意替换

库安装

pip3 install kafka-python

生产者

from kafka import KafkaProducer
import time

# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers='{your_ip_address}:9002')

# 读取本地数据文件
with open('doc.txt', 'r') as file:
    for line in file:
        # 发送数据到Kafka主题
        producer.send('test-kafka', line.encode('utf-8'))
        # 设置发送间隔
        time.sleep(1)

消费者

from kafka import KafkaConsumer

# 创建Kafka消费者
consumer = KafkaConsumer('test-kafka', bootstrap_servers='{your_ip_address}:9002')

# 消费数据
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")