kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

时间:2022-05-12 14:33:11

  今天在写kafka生产者生成数据的程序并运行时,报如下错误:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at ProducerTest.main(TestProducer.java:21)

问题解决方案:

1.检查目录:C:\Windows\System32\drivers\etc下的hosts文件,看IP地址与主机名是否配置正确。经常,发现没有正确配置,修改后再运行,正常。

192.168.2.212 hadoop01
192.168.2.224 hadoop02
192.168.2.226 hadoop03

2.把config文件夹下的server.properties文件里#host.name=localhost的注释去掉,然后运行

3.把config文件夹下的server.properties中的以下两个属性

zookeeper.connect=localhost:2181改成zookeeper.connect=192.168.2.212:2181

以及默认注释掉的 
#host.name=localhost改成host.name=192.168.2.212
 
楼主用的是第一种方案
---------------------------------生产者代码如下-----------------------------------------------------
import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; public class ProducerDemo3 {
public static void main(String[] args) {
long events = 400;
Random rnd = new Random(); Properties props = new Properties();
props.put("metadata.broker.list", "192.168.2.212:9092,192.168.224:9092,192.168.226:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.vrv.kafka.SimplePartitioner");
props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config);
long start=System.currentTimeMillis();
for (long nEvents = 0; nEvents < events; nEvents++) {
String ip1 = "192.168.2." + rnd.nextInt(255);
String ip2 = "192.168.2." + rnd.nextInt(255);
String ip3 = "192.168.2." + rnd.nextInt(255);
String msg = ip1 + " " + ip2 + " " + ip3 ;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", ip1, msg);
producer.send(data);
}
producer.close();
System.out.println("耗时:" + (System.currentTimeMillis() - start)/1000);
}
}