How to fix "AssertionError: Value must be bytes" in Python 2.7 with Apache Kafka

Time: 2021-05-06 20:24:16

I'm new to Apache Kafka. I'm trying to send messages as JSON objects to a Kafka topic using Python 2.7, but I get an "AssertionError: Value must be bytes" error. I can send messages as strings successfully, and I can see them with kafka-console-consumer.sh. I'm using Apache Kafka version 2.10-0.8.2.1. My code is below.

from kafka import KafkaProducer
import yaml

producer = KafkaProducer(bootstrap_servers="localhost:9092")
msg = yaml.safe_load('{"id":1, "name":"oguz"}')  # msg is a dict, not bytes

producer.send("my-topic", msg)  # fails with "AssertionError: Value must be bytes"

Thanks for your help.

1 solution

#1

yaml.safe_load() returns a dict, so turning it into bytes takes two steps: serialize it to a string with JSON, then encode that string to bytes as UTF-8.
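
As a minimal sketch of those two steps on their own, without Kafka involved (the helper name `to_kafka_bytes` is hypothetical, not part of kafka-python), the dict is dumped to a JSON string and then encoded to UTF-8 bytes, which is the type the producer expects for a message value:

```python
import json


def to_kafka_bytes(value):
    """Hypothetical helper: turn a JSON-compatible object into UTF-8 bytes."""
    text = json.dumps(value)      # step 1: dict -> JSON string
    return text.encode("utf-8")   # step 2: JSON string -> UTF-8 bytes


msg = {"id": 1, "name": "oguz"}   # same data yaml.safe_load() returns in the question
payload = to_kafka_bytes(msg)

print(type(payload))                               # a bytes type, ready to pass to send()
print(json.loads(payload.decode("utf-8")) == msg)  # True: decoding gives back the dict
```

Because `json.dumps` escapes non-ASCII characters by default, the encoded payload is plain ASCII, which is a subset of UTF-8.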

Following the examples in the kafka-python docs, you can pass the value_serializer keyword argument when you instantiate the KafkaProducer:

>>> import json
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092',
...                          value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('my-topic', msg)
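
The value_serializer does not have to be a lambda; any callable that returns bytes works. When reading the topic back with kafka-python's KafkaConsumer, its value_deserializer parameter takes the inverse callable. A sketch under those assumptions: the function names are hypothetical, and the Kafka wiring appears only in comments so the block runs without a broker or kafka-python installed.

```python
import json


def serialize_json(value):
    """Hypothetical value_serializer: object -> UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize_json(raw):
    """Hypothetical value_deserializer: UTF-8 encoded JSON bytes -> object."""
    return json.loads(raw.decode("utf-8"))


# Wiring (needs kafka-python and a running broker, so shown as comments only):
#   producer = KafkaProducer(bootstrap_servers="localhost:9092",
#                            value_serializer=serialize_json)
#   consumer = KafkaConsumer("my-topic", bootstrap_servers="localhost:9092",
#                            value_deserializer=deserialize_json)

msg = {"id": 1, "name": "oguz"}
print(deserialize_json(serialize_json(msg)) == msg)  # True: the round trip keeps the dict
```

Keeping the two functions side by side makes it harder for the producer and consumer to drift apart in encoding.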

Alternatively, you can serialize the value manually when you call send():

>>> producer.send('my-topic', json.dumps(msg).encode('utf-8'))
