接下来介绍如何使用 python 的 kafka 库实现:生产者读取本地文档中的内容并以流的形式发送到 kafka 主题,消费者从 kafka 主题接收数据流并输出。(注:代码中的 ip 地址注意替换)
库安装
pip3 install kafka-python
生产者
from kafka import KafkaProducer
import time
# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers='{your_ip_address}:9002')
# 读取本地数据文件
with open('doc.txt', 'r') as file:
for line in file:
# 发送数据到Kafka主题
producer.send('test-kafka', line.encode('utf-8'))
# 设置发送间隔
time.sleep(1)
消费者
from kafka import KafkaConsumer
# 创建Kafka消费者
consumer = KafkaConsumer('test-kafka', bootstrap_servers='{your_ip_address}:9002')
# 消费数据
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")