Background: a small app that uses Celery to dispatch tasks; each task produces some data to Kafka.
Problem: with the confluent_kafka module, producing messages works fine when the Kafka code runs on its own, but once it is wrapped in a Celery task, no new messages ever reach the topic.
Workaround: switching to the pykafka module made the problem disappear.
I'm puzzled: am I calling confluent_kafka the wrong way? Why does it stop working as soon as Celery is involved?
Working pykafka code:
tasks.py
from celery import Celery
from pykafka import KafkaClient
import json

app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')

@app.task
def produce():
    # client and producer are created inside the task, so every task run gets a fresh connection
    client = KafkaClient(hosts="localhost:9092")
    print(client.topics)
    topic = client.topics['test_result']
    with topic.get_sync_producer() as producer:
        for i in range(3):
            data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
            print('Producing message: %s' % data)
            producer.produce(json.dumps(data))
        print("finish produce")
        producer.stop()  # redundant inside the with block, which already stops the producer on exit
    print("stop")
run_worker.py
from tasks import produce

for i in range(1000):
    result = produce.delay()
    print(result.status)
confluent_kafka code that fails to produce messages:
tasks.py
from celery import Celery
from kafka_producer import p
import json

app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')

@app.task
def produce():
    for i in range(3000):
        data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
        print('Producing message: %s' % data)
        p.produce('test_result3', json.dumps(data))
    print("finish produce")
    p.flush()  # wait until all queued messages have been delivered
    print("finish flush")
run_worker.py
from tasks import produce

result = produce.delay()
print(result.status)
print(result.ready())
print(result.get())
print(result.status)
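"Works fine when the Kafka code runs on its own" above means running the same producing loop as a plain script, outside of Celery. A minimal sketch of that standalone test for comparison (the broker address is an assumption; topic name and payload reuse the ones from the code above):
standalone_produce.py (assumed standalone test)
from confluent_kafka import Producer
import json

p = Producer({'bootstrap.servers': 'localhost:9092'})
for i in range(3000):
    data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
    p.produce('test_result3', json.dumps(data))
p.flush()  # all messages are delivered here when run without Celery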