1. Create a project named sparkTest and add a Scala object named Test.
2. The code for the Test object is as follows:
package sparkTest

/**
 * Created by jiahong on 15-8-2.
 */
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: <file>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("Test").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("/home/jiahong/sparkWorkSpace/input")

    // Count the occurrences of each word, then sort by count in descending order
    val result = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1))

    // Write the sorted (word, count) pairs to the output directory.
    // saveAsTextFile fails if the output directory already exists.
    result.saveAsTextFile("/home/jiahong/sparkWorkSpace/output")
    // Prints the RDD's description (its toString), not its contents
    print(result)
  }
}
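The sorting step above swaps each pair to (count, word), sorts by key in descending order, and swaps back. To see what that produces without starting Spark, here is a minimal sketch of the same count-then-sort logic on ordinary Scala collections. The object name WordCountSketch and the sample lines are hypothetical and not part of the original project; it uses groupBy instead of reduceByKey because it runs on an in-memory Seq rather than an RDD.

```scala
object WordCountSketch {
  // Same idea as the Spark job: split lines into words, count each word,
  // then order the (word, count) pairs by count, highest first.
  def countAndSort(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
      .toSeq
      .sortBy { case (_, count) => -count }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample input, chosen so that no two words share a count
    countAndSort(Seq("a b a", "c a b")).foreach(println)
  }
}
```

Words with equal counts may come out in any relative order, which is also true of the Spark version.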
To test Hive locally, use the following code:
package sparkTest.sparkSql

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}

object HiveSqlTest {
  def main(args: Array[String]): Unit = {
    // Connect to the standalone master and ship the project jar to the executors
    val conf = new SparkConf()
      .setAppName("HiveLink")
      .setMaster("spark://JIAs-Mac.local:7077")
      .setJars(Array("/Users/JIA/Desktop/jar/hiveTest/sparkTest.jar"))
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    val sql = "select * from Test limit 100"
    // Join the first five columns of each row with commas and print them on the driver
    sqlContext.sql(sql).map(s => s(0) + "," + s(1) + "," + s(2) + "," + s(3) + "," + s(4)).collect().foreach(println)
  }
}
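Note: hive-site.xml must be placed in the project directory. Create a new directory named Resources, mark it as Resources Root, and put the file there.
3. To set up a local run, open Edit Configurations in the upper-right corner of IDEA.
4. In VM options, enter -Dspark.master=local and in Program arguments, enter local.
5. Click Run. Start Spark on your machine before running. The console output looks like this: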
/usr/lib/jdk/jdk1.7.0_79/bin/java -Dspark.master=local -Didea.launcher.port=7532 -Didea.launcher.bin.path=/home/jiahong/idea-IC-141.1532.4/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jdk/jdk1.7.0_79/jre/lib/resources.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/jfxrt.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/charsets.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/jsse.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/rt.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/plugin.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/deploy.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/jfr.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/javaws.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/management-agent.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/jce.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/ext/zipfs.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/ext/dnsns.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/ext/sunec.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/ext/sunjce_provider.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/ext/sunpkcs11.jar:/usr/lib/jdk/jdk1.7.0_79/jre/lib/ext/localedata.jar:/home/jiahong/IdeaProjects/sparkTest/out/production/sparkTest:/home/jiahong/apache/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/home/jiahong/apache/scala-2.10.4/lib/scala-actors-migration.jar:/home/jiahong/apache/scala-2.10.4/lib/scala-reflect.jar:/home/jiahong/apache/scala-2.10.4/lib/scala-actors.jar:/home/jiahong/apache/scala-2.10.4/lib/scala-swing.jar:/home/jiahong/apache/scala-2.10.4/lib/scala-library.jar:/home/jiahong/idea-IC-141.1532.4/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain sparkTest.Test local
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/08/02 10:58:14 INFO SparkContext: Running Spark version 1.3.1
15/08/02 10:58:14 WARN Utils: Your hostname, jiahong-OptiPlex-7010 resolves to a loopback address: 127.0.1.1; using 192.168.199.187 instead (on interface eth0)
15/08/02 10:58:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/08/02 10:58:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/08/02 10:58:15 INFO SecurityManager: Changing view acls to: jiahong
15/08/02 10:58:15 INFO SecurityManager: Changing modify acls to: jiahong
15/08/02 10:58:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jiahong); users with modify permissions: Set(jiahong)
15/08/02 10:58:15 INFO Slf4jLogger: Slf4jLogger started
15/08/02 10:58:15 INFO Remoting: Starting remoting
15/08/02 10:58:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@jiahong-OptiPlex-7010.lan:37917]
15/08/02 10:58:15 INFO Utils: Successfully started service 'sparkDriver' on port 37917.
15/08/02 10:58:15 INFO SparkEnv: Registering MapOutputTracker
15/08/02 10:58:15 INFO SparkEnv: Registering BlockManagerMaster
15/08/02 10:58:15 INFO DiskBlockManager: Created local directory at /tmp/spark-a2cbde0d-0951-4a95-80df-a99a14127efc/blockmgr-3cbdae80-810a-4ecf-b012-0979b3d714d0
15/08/02 10:58:15 INFO MemoryStore: MemoryStore started with capacity 469.5 MB
15/08/02 10:58:15 INFO HttpFileServer: HTTP File server directory is /tmp/spark-67629167-df98-4e7e-afa1-4dd36b655012/httpd-28cb8de9-caa4-4600-9704-347cea890b07
15/08/02 10:58:15 INFO HttpServer: Starting HTTP Server
15/08/02 10:58:15 INFO Server: jetty-8.y.z-SNAPSHOT
15/08/02 10:58:15 INFO AbstractConnector: Started SocketConnector@0.0.0.0:50336
15/08/02 10:58:15 INFO Utils: Successfully started service 'HTTP file server' on port 50336.
15/08/02 10:58:15 INFO SparkEnv: Registering OutputCommitCoordinator
15/08/02 10:58:15 INFO Server: jetty-8.y.z-SNAPSHOT
15/08/02 10:58:15 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/08/02 10:58:15 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/08/02 10:58:15 INFO SparkUI: Started SparkUI at http://jiahong-OptiPlex-7010.lan:4040
15/08/02 10:58:15 INFO Executor: Starting executor ID <driver> on host localhost
15/08/02 10:58:15 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@jiahong-OptiPlex-7010.lan:37917/user/HeartbeatReceiver
15/08/02 10:58:16 INFO NettyBlockTransferService: Server created on 40115
15/08/02 10:58:16 INFO BlockManagerMaster: Trying to register BlockManager
15/08/02 10:58:16 INFO BlockManagerMasterActor: Registering block manager localhost:40115 with 469.5 MB RAM, BlockManagerId(<driver>, localhost, 40115)
15/08/02 10:58:16 INFO BlockManagerMaster: Registered BlockManager
15/08/02 10:58:16 INFO MemoryStore: ensureFreeSpace(182921) called with curMem=0, maxMem=492337889
15/08/02 10:58:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 178.6 KB, free 469.4 MB)
15/08/02 10:58:16 INFO MemoryStore: ensureFreeSpace(25432) called with curMem=182921, maxMem=492337889
15/08/02 10:58:16 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.8 KB, free 469.3 MB)
15/08/02 10:58:16 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40115 (size: 24.8 KB, free: 469.5 MB)
15/08/02 10:58:16 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/08/02 10:58:16 INFO SparkContext: Created broadcast 0 from textFile at Test.scala:17
15/08/02 10:58:16 INFO FileInputFormat: Total input paths to process : 1
MapPartitionsRDD[7] at map at Test.scala:20
15/08/02 10:58:16 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/08/02 10:58:16 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/08/02 10:58:16 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/08/02 10:58:16 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/08/02 10:58:16 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/08/02 10:58:16 INFO SparkContext: Starting job: saveAsTextFile at Test.scala:24
15/08/02 10:58:16 INFO DAGScheduler: Registering RDD 3 (map at Test.scala:20)
15/08/02 10:58:16 INFO DAGScheduler: Registering RDD 5 (map at Test.scala:20)
15/08/02 10:58:16 INFO DAGScheduler: Got job 0 (saveAsTextFile at Test.scala:24) with 1 output partitions (allowLocal=false)
15/08/02 10:58:16 INFO DAGScheduler: Final stage: Stage 2(saveAsTextFile at Test.scala:24)
15/08/02 10:58:16 INFO DAGScheduler: Parents of final stage: List(Stage 1)
15/08/02 10:58:16 INFO DAGScheduler: Missing parents: List(Stage 1)
15/08/02 10:58:16 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at map at Test.scala:20), which has no missing parents
15/08/02 10:58:16 INFO MemoryStore: ensureFreeSpace(3640) called with curMem=208353, maxMem=492337889
15/08/02 10:58:16 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 469.3 MB)
15/08/02 10:58:16 INFO MemoryStore: ensureFreeSpace(2614) called with curMem=211993, maxMem=492337889
15/08/02 10:58:16 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.6 KB, free 469.3 MB)
15/08/02 10:58:16 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40115 (size: 2.6 KB, free: 469.5 MB)
15/08/02 10:58:16 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/08/02 10:58:16 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/08/02 10:58:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[3] at map at Test.scala:20)
15/08/02 10:58:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/08/02 10:58:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1301 bytes)
15/08/02 10:58:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/08/02 10:58:16 INFO HadoopRDD: Input split: file:/home/jiahong/sparkWorkSpace/input/test.txt:0+62
15/08/02 10:58:16 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2003 bytes result sent to driver
15/08/02 10:58:16 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 81 ms on localhost (1/1)
15/08/02 10:58:16 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/08/02 10:58:16 INFO DAGScheduler: Stage 0 (map at Test.scala:20) finished in 0.092 s
15/08/02 10:58:16 INFO DAGScheduler: looking for newly runnable stages
15/08/02 10:58:16 INFO DAGScheduler: running: Set()
15/08/02 10:58:16 INFO DAGScheduler: waiting: Set(Stage 1, Stage 2)
15/08/02 10:58:16 INFO DAGScheduler: failed: Set()
15/08/02 10:58:16 INFO DAGScheduler: Missing parents for Stage 1: List()
15/08/02 10:58:16 INFO DAGScheduler: Missing parents for Stage 2: List(Stage 1)
15/08/02 10:58:16 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at map at Test.scala:20), which is now runnable
15/08/02 10:58:16 INFO MemoryStore: ensureFreeSpace(3080) called with curMem=214607, maxMem=492337889
15/08/02 10:58:16 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.0 KB, free 469.3 MB)
15/08/02 10:58:16 INFO MemoryStore: ensureFreeSpace(2177) called with curMem=217687, maxMem=492337889
15/08/02 10:58:16 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.1 KB, free 469.3 MB)
15/08/02 10:58:16 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:40115 (size: 2.1 KB, free: 469.5 MB)
15/08/02 10:58:16 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/08/02 10:58:16 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/08/02 10:58:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[5] at map at Test.scala:20)
15/08/02 10:58:16 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/08/02 10:58:16 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1045 bytes)
15/08/02 10:58:16 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/08/02 10:58:16 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/08/02 10:58:16 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
15/08/02 10:58:17 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1097 bytes result sent to driver
15/08/02 10:58:17 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 77 ms on localhost (1/1)
15/08/02 10:58:17 INFO DAGScheduler: Stage 1 (map at Test.scala:20) finished in 0.077 s
15/08/02 10:58:17 INFO DAGScheduler: looking for newly runnable stages
15/08/02 10:58:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/08/02 10:58:17 INFO DAGScheduler: running: Set()
15/08/02 10:58:17 INFO DAGScheduler: waiting: Set(Stage 2)
15/08/02 10:58:17 INFO DAGScheduler: failed: Set()
15/08/02 10:58:17 INFO DAGScheduler: Missing parents for Stage 2: List()
15/08/02 10:58:17 INFO DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[8] at saveAsTextFile at Test.scala:24), which is now runnable
15/08/02 10:58:17 INFO MemoryStore: ensureFreeSpace(127696) called with curMem=219864, maxMem=492337889
15/08/02 10:58:17 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 124.7 KB, free 469.2 MB)
15/08/02 10:58:17 INFO MemoryStore: ensureFreeSpace(76648) called with curMem=347560, maxMem=492337889
15/08/02 10:58:17 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 74.9 KB, free 469.1 MB)
15/08/02 10:58:17 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:40115 (size: 74.9 KB, free: 469.4 MB)
15/08/02 10:58:17 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/08/02 10:58:17 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839
15/08/02 10:58:17 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[8] at saveAsTextFile at Test.scala:24)
15/08/02 10:58:17 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/08/02 10:58:17 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1056 bytes)
15/08/02 10:58:17 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
15/08/02 10:58:17 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
15/08/02 10:58:17 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/08/02 10:58:17 INFO FileOutputCommitter: Saved output of task 'attempt_201508021058_0002_m_000000_2' to file:/home/jiahong/sparkWorkSpace/output/_temporary/0/task_201508021058_0002_m_000000
15/08/02 10:58:17 INFO SparkHadoopMapRedUtil: attempt_201508021058_0002_m_000000_2: Committed
15/08/02 10:58:17 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1828 bytes result sent to driver
15/08/02 10:58:17 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 138 ms on localhost (1/1)
15/08/02 10:58:17 INFO DAGScheduler: Stage 2 (saveAsTextFile at Test.scala:24) finished in 0.138 s
15/08/02 10:58:17 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/08/02 10:58:17 INFO DAGScheduler: Job 0 finished: saveAsTextFile at Test.scala:24, took 0.483353 s
MapPartitionsRDD[7] at map at Test.scala:20
Process finished with exit code 0
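6. The results are as follows:
The input directory contains a file named test.txt with the following content:
After the run, the output directory contains the following files: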
Note:
On the first run, you may hit the following problem:
Exception in thread "main" java.lang.NoSuchMethodError:
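The fix is to check which Scala version Spark reports when it starts, install that same version on your machine, and then switch the Scala SDK in IDEA to match. I originally had Scala 2.11.7 installed locally, which caused the error. After finding that Spark was using Scala 2.10.4, I reinstalled that version, updated the setting in IDEA, and the program ran correctly.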
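One way to confirm which Scala library a run configuration actually puts on the classpath is to print it from inside the project. The following is a minimal sketch, not part of the original project: the object name ScalaVersionCheck and the helper binaryVersion are hypothetical, while scala.util.Properties.versionNumberString comes from the Scala standard library. Scala 2.10 and 2.11 are not binary compatible, so the first two components of the version should match the Scala version Spark was built with (2.10 for the spark-1.3.1-bin-hadoop2.6 build used here).

```scala
object ScalaVersionCheck {
  // Version of the scala-library jar on the current classpath, for example "2.10.4"
  def libraryVersion: String = scala.util.Properties.versionNumberString

  // Hypothetical helper: reduce a full version to its binary series ("2.10.4" becomes "2.10")
  def binaryVersion(full: String): String = full.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    println("Scala library on classpath: " + libraryVersion)
    println("Binary version: " + binaryVersion(libraryVersion))
  }
}
```

If the binary version printed here differs from the one Spark uses, changing the Scala SDK in the IDEA project settings as described above should bring them back in line.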