如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from kafka import KafkaProducer
import json
'''
生产者demo
向test_lyl2主题中循环写入10条json数据
注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(
value_serializer = lambda v: json.dumps(v).encode( 'utf-8' ),
bootstrap_servers = [ '192.168.12.101:6667' , '192.168.12.102:6667' , '192.168.12.103:6667' ]
)
for i in range ( 10 ):
data = {
"name" : "李四" ,
"age" : 23 ,
"gender" : "男" ,
"id" :i
}
producer.send( 'test_lyl2' , data)
producer.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from kafka import KafkaConsumer
import json
'''
消费者demo
消费test_lyl2主题中的数据
注意事项:如需以json格式读取数据需加上value_deserializer参数
'''
consumer = KafkaConsumer( 'test_lyl2' ,group_id = "lyl-gid1" ,
bootstrap_servers = [ '192.168.12.101:6667' , '192.168.12.102:6667' , '192.168.12.103:6667' ],
auto_offset_reset = 'earliest' ,value_deserializer = json.loads
)
for message in consumer:
print (message.value)
|
以上这篇对python操作kafka写入json数据的简单demo分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/qq_32502511/article/details/82109933