Submitting Code from Eclipse to Run on a Spark Cluster

Date: 2022-09-12 11:38:52

Spark cluster master node: 192.168.168.200

Windows host running Eclipse: 192.168.168.100

Scenario:

We want to check how code developed in Eclipse behaves when it actually runs on the Spark cluster: memory and core usage, stdout output, and whether variables are passed through correctly.

In production, code developed in Eclipse is packaged, copied to the Spark cluster, and submitted with spark-submit. Remote debugging is also an option, but either way every test run means copying the jar over to a cluster machine, which is inconvenient. Instead, we want Eclipse itself to submit the program to the cluster, spark-submit style, so that it can be tested and debugged directly.
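For reference, the production-style submission from the cluster's command line looks roughly like this; the class and jar names are the ones used later in this article, and the path is a placeholder, so treat the exact command as an illustration rather than something from the original post:

    spark-submit \
      --class com.spark.test.JavaWordCount \
      --master spark://192.168.168.200:7077 \
      /path/to/TestSpark-0.0.1-jar-with-dependencies.jar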

1. Prepare the words.txt file

Contents of words.txt:

    Hello Hadoop
    Hello BigData
    Hello Spark
    Hello Flume
    Hello Kafka

Upload it to HDFS, as shown below:

[Screenshot: words.txt in the /test directory on HDFS]
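If you prefer doing the upload from the command line, something along these lines works; the /test directory matches the HDFS path used by the code below, but the original post only shows a screenshot, so take the exact commands as an assumption:

    hadoop fs -mkdir -p /test
    hadoop fs -put words.txt /test/words.txt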

2. Create the Spark test class

    package com.spark.test;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    public class JavaWordCount {
        public static void main(String[] args) {
            // Run locally with 2 threads for a first test.
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);

            // Read the input file from HDFS.
            JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/test/words.txt");

            // Split each line into words.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String line) {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });

            // Map each word to a (word, 1) pair.
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            // Sum the counts for each word.
            JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // Print each (word, count) pair; on a cluster this goes to the executors' stdout.
            wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> pair) throws Exception {
                    System.out.println(pair._1() + ":" + pair._2());
                }
            });

            jsc.close();
        }
    }

Log output:

[Screenshot: Eclipse console output from the local-mode run]

Open the Spark web UI: http://192.168.168.200:8080

[Screenshot: Spark master web UI]

The UI shows that the Spark master URL is spark://master:7077. So this line:

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");

is changed to the following (using the master's IP address here, since the Windows host may not be able to resolve the hostname "master"):

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");

Running it against the cluster now fails with an org.apache.spark.SparkException:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 192.168.168.200): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:894)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:892)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:892)
        at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:350)
        at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
        at com.spark.test.JavaWordCount.main(JavaWordCount.java:39)
    Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

The fix found online is to configure the path of the application jar: first package the program into a jar with Maven (maven install), then pass that path to setJars. This makes Spark distribute the jar to the executors, which otherwise cannot load the job's classes (including the anonymous function classes above) and therefore fail while deserializing the shipped tasks with the ClassCastException shown.

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
    String[] jars = {"I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar"};
    sparkConf.setJars(jars);

The final source code:

    package com.spark.test;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    public class JavaWordCount {
        public static void main(String[] args) {
            // Point the application at the standalone cluster ...
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
            // ... and ship the packaged application jar to the executors.
            String[] jars = {"I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar"};
            sparkConf.setJars(jars);
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);

            JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/test/words.txt");

            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String line) {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });

            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> pair) throws Exception {
                    System.out.println(pair._1() + ":" + pair._2());
                }
            });

            jsc.close();
        }
    }

This time it runs without errors.

[Screenshot: the application running successfully on the cluster]

Check stdout to verify that the counts are correct:

[Screenshots: executor stdout showing the word counts]

With this set up, you can develop and debug your code conveniently from Eclipse.
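To avoid editing the source every time you switch between local[2] and the cluster, the configuration can also be taken from program arguments. The following is a minimal sketch and an assumption on my part rather than part of the original article; the resource settings echo the memory/cores mentioned in the scenario, and the class name ConfigurableWordCount is hypothetical:

    package com.spark.test;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Sketch: build the SparkConf from program arguments so the same class can run
    // in local mode or against the standalone cluster without code changes.
    public class ConfigurableWordCount {
        public static void main(String[] args) {
            // args[0]: master URL, e.g. "local[2]" or "spark://192.168.168.200:7077"
            // args[1] (optional): path to the jar produced by maven install
            String master = args.length > 0 ? args[0] : "local[2]";

            SparkConf sparkConf = new SparkConf()
                    .setAppName("JavaWordCount")
                    .setMaster(master)
                    // Optional resource settings (standard Spark properties); adjust as needed.
                    .set("spark.executor.memory", "512m")
                    .set("spark.cores.max", "2");
            // In some network setups you may also need
            // .set("spark.driver.host", "192.168.168.100")
            // so the executors can reach the driver running on the Windows host.

            if (args.length > 1) {
                // Ship the application jar to the executors when running on the cluster.
                sparkConf.setJars(new String[] { args[1] });
            }

            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            // ... same word-count logic as in JavaWordCount above ...
            jsc.close();
        }
    }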
