Overview
This series is my personal set of Spark study notes.
Setting Up the Development Environment
Environment

Item | Value | Notes |
---|---|---|
OS | Windows 10 64-bit | |
JDK | 1.8+ | |
Scala | 2.10.4 | Note: Scala versions are poorly compatible with each other, so picking the right one matters! |
IDE | IntelliJ IDEA 15 | |
Spark | 1.6.2 | Choose a version that fits your situation |

- This article does not cover how to install or download the components above.
- Install the Scala and SBT plugins for IntelliJ first, then restart the IDE.
Writing the Demo
Creating the Project
I created a Maven project and then added the Spark jar to it: referencing spark-assembly-1.6.2-hadoop2.6.0.jar is enough.
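Alternatively, instead of referencing the assembly jar by hand, Maven can resolve Spark for you. The dependency would look roughly like this (a sketch; these are the coordinates for the Scala 2.10 build of Spark 1.6.2 — adjust to match your versions):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.2</version>
</dependency>
```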
After that you can start writing code.
WordCount Demo
Now let's use Spark to count how often each word appears in the short passage below.
i am a slow walker,but i never walk backward.
when i thought I could not go on,i forced myself to keep going.
though the future is scary,you can’t just run to the past because it is familiar.
always remember,a man is not old as long as he’s seeking something.
a man is not old until regrets take the place of dreams.
To make it easy to run, we first create a simple Spark launcher that is responsible for setting up and tearing down the environment:
```java
import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class AppConsole {
    private static SparkConf sparkConf;
    private static SparkContext sparkContext;

    public static void init() {
        if (sparkContext == null) {
            // On Windows, Hadoop needs hadoop.home.dir to locate winutils.exe;
            // getContentPath() (not shown) returns a folder containing bin\winutils.exe
            if (SystemUtils.IS_OS_WINDOWS)
                System.setProperty("hadoop.home.dir", getContentPath());
            sparkConf = new SparkConf().setAppName("Nerve Spark Console").setMaster("local[*]");
            sparkContext = new SparkContext(sparkConf);
            System.out.println("Spark environment is ready!");
        }
    }

    public static void exit() {
        if (sparkContext != null && !sparkContext.isStopped()) {
            sparkContext.stop();
        }
    }
}
```
Then we write the WordCount itself:

```scala
import org.apache.spark.SparkContext

/**
 * Created by zengxm on 2016/6/30.
 */
class FileWordCountModeling(sparkContext: SparkContext) extends CommonModeling(sparkContext) {
  /**
   * Count word occurrences in the given file.
   * @param filePath path of the text file to analyze
   */
  def compute(filePath: String) = {
    val textFile = sparkContext.textFile(filePath).cache()
    // Split each line into words, map each word to (word, 1), then sum the counts per word
    val words = textFile.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreach(d => println(d._1 + "\t\t" + d._2))
    println("total line on %s is %d".format(filePath, textFile.count()))
  }
}
```
Here CommonModeling is an empty class that does nothing.
Then run it from AppConsole:

```java
FileWordCountModeling fileModeling = new FileWordCountModeling(sparkContext);
fileModeling.compute("path/to/file");
```
The output:

```
on,i 1
keep 1
is 4
not 3
long 1
it 1
past 1
familiar. 1
slow 1
because 1
am 1
never 1
when 1
until 1
something. 1
as 2
going. 1
regrets 1
seeking 1
just 1
man 2
run 1
go 1
a 2
myself 1
dreams. 1
backward. 1
remember,a 1
old 2
i 3
could 1
future 1
I 1
walker,but 1
though 1
to 2
of 1
place 1
forced 1
scary,you 1
take 1
can't 1
always 1
walk 1
thought 1
he's 1
the 3
total line on _data/articles/i_am_just_a_slow_walker.txt is 5
```
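Note that the counts come out in no particular order, because reduceByKey hashes words across partitions. To make the flatMap → map → reduceByKey logic concrete, here is a minimal plain-Java sketch of the same counting, with no Spark involved (the class and method names are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Same logic as the Spark pipeline: split each line on spaces ("flatMap"),
    // then add 1 to a running total per word ("map" + "reduceByKey")
    public static Map<String, Long> count(String text) {
        Map<String, Long> counts = new HashMap<>();
        for (String word : text.split(" ")) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        count("a man is not old as long as he is seeking")
            .forEach((w, c) -> System.out.println(w + "\t\t" + c));
    }
}
```

Spark distributes this exact computation: flatMap and map run per partition, and reduceByKey shuffles equal words onto the same partition before summing.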
Related Errors
NoSuchMethodError: scala.collection.immutable.HashSet$.empty
```
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
at akka.actor.RootActorPath.$div(ActorPath.scala:185)
at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:465)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:124)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:191)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:230)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
```
This error is usually caused by a Scala version mismatch. I started out with 2.11.4; switching to 2.10.4 solved the problem.
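One way to avoid such a mismatch in a Maven project is to pin the Scala runtime explicitly (a sketch; make sure the version matches the `_2.10` suffix of your Spark artifacts):

```xml
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
```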
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries
This one appears on the Windows platform. For a fix, see: Apache Spark checkpoint issue on windows
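The usual workaround, which AppConsole above already applies, is to download winutils.exe and point hadoop.home.dir at its parent directory before the SparkContext is created. A minimal sketch (the path is an assumption; use wherever you actually placed winutils.exe):

```java
public class HadoopHomeFix {
    // Must be called before creating the SparkContext; the directory
    // passed in must contain bin\winutils.exe
    public static void configure(String hadoopHome) {
        System.setProperty("hadoop.home.dir", hadoopHome);
    }

    public static void main(String[] args) {
        configure("C:\\hadoop"); // hypothetical location; adjust to your machine
        System.out.println(System.getProperty("hadoop.home.dir"));
    }
}
```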