How to correctly integrate Spark Streaming (spark-2.1.0) with Kafka (2.11-0.10.2.0) in Java?

Time: 2022-03-09 20:52:23

I tried using Spark Streaming to process Kafka messages, following this guide: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html, and my code is below:

SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("spark://sl:7077");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

// Kafka consumer configuration
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.1.5:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Collections.singletonList("test");

// Create a direct stream from Kafka and print the first records of each batch
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
stream.print();

// Start the computation and wait for it to terminate
jssc.start();
jssc.awaitTermination();

After submitting, it returns:

17/04/05 22:43:10 INFO SparkContext: Starting job: print at JavaDirectKafkaWordCount.java:47
17/04/05 22:43:10 INFO DAGScheduler: Got job 0 (print at JavaDirectKafkaWordCount.java:47) with 1 output partitions
17/04/05 22:43:10 INFO DAGScheduler: Final stage: ResultStage 0 (print at JavaDirectKafkaWordCount.java:47)
17/04/05 22:43:10 INFO DAGScheduler: Parents of final stage: List()
17/04/05 22:43:10 INFO DAGScheduler: Missing parents: List()
17/04/05 22:43:10 INFO DAGScheduler: Submitting ResultStage 0 (KafkaRDD[0] at createDirectStream at JavaDirectKafkaWordCount.java:44), which has no missing parents
17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.3 KB, free 366.3 MB)
17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1529.0 B, free 366.3 MB)
17/04/05 22:43:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.245.226.155:15258 (size: 1529.0 B, free: 366.3 MB)
17/04/05 22:43:10 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/04/05 22:43:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (KafkaRDD[0] at createDirectStream at JavaDirectKafkaWordCount.java:44)
17/04/05 22:43:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53448) with ID 0
17/04/05 22:43:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0, partition 0, PROCESS_LOCAL, 7295 bytes)
17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:14669 with 366.3 MB RAM, BlockManagerId(0, 10.245.226.155, 14669, None)
17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53447) with ID 1
17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:33754 with 366.3 MB RAM, BlockManagerId(1, 10.245.226.155, 33754, None)
17/04/05 22:43:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0): java.lang.NullPointerException
at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:409)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:434)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:508)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:500)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:500)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:257)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Can someone help with this? Thanks very much.

1 solution

#1

Can you provide the parameters passed to spark-submit?

You might have passed just the jar file name to spark-submit instead of an absolute path to the jar file. The class org.apache.spark.executor.Executor tries to load the "Added Jars" and "Added Files" in its updateDependencies method, and the NullPointerException in Utils.decodeFileNameInURI suggests the jar's URI path is not in the form Spark expects.

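If that is the case, submitting with a full path should help, for example something like spark-submit --class com.example.JavaDirectKafkaWordCount --master spark://sl:7077 /full/path/to/your-app.jar (the class name and jar path here are only placeholders, not taken from the question). Alternatively, the jar can be attached from the driver side. Below is a minimal sketch in Java, assuming a hypothetical jar location /home/user/app/kafka-wordcount.jar; SparkConf.setJars distributes the listed jars to the executors by their absolute paths, which should avoid the malformed jar URI this answer suspects.

// A minimal sketch only, not the asker's actual setup; the jar path below is a placeholder.
String appJar = "/home/user/app/kafka-wordcount.jar";  // hypothetical absolute path to the built jar

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaDirectKafkaWordCount")
        .setMaster("spark://sl:7077")
        // Ship the application jar to the executors by its absolute path.
        .setJars(new String[]{appJar});

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
// ... create the Kafka direct stream exactly as in the question ...
jssc.start();
jssc.awaitTermination();

Either way, the important part is that the path Spark receives resolves to a real file, not just a bare file name.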