Interactive analysis with spark-shell
Upload a file to the following HDFS directory: /user/hdfs/
-bash-4.1$ hadoop fs -put README.md /user/hdfs/
-bash-4.1$ hadoop fs -ls /user/hdfs
Found 3 items
drwxr-xr-x - hdfs supergroup 0 2016-08-21 15:34 /user/hdfs/.sparkStaging
drwx------ - hdfs supergroup 0 2016-08-20 13:12 /user/hdfs/.staging
-rw-r--r-- 3 hdfs supergroup 3233 2016-08-21 15:36 /user/hdfs/README.md
Then switch to the hdfs user and start the spark-shell interactive command line:
[root@hadoop01 ~]# su - hdfs
-bash-4.1$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0-cdh5.5.4
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
16/08/21 15:34:25 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc (master = yarn-client, app id = application_1471668978254_0006).
SQL context available as sqlContext.
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
Count the number of lines in the file:
scala> textFile.count()
res1: Long = 99
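If you plan to run several actions against the same data, you can ask Spark to keep the RDD in memory after the first pass. A minimal sketch using the standard RDD API (not part of the session captured above):

textFile.cache()   // mark the RDD to be cached once the next action computes it
textFile.count()   // this count reads from HDFS and populates the cache
textFile.count()   // later actions on textFile are served from memory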
Print the first line of the file:
scala> textFile.first()
res2: String = # Highlight.js
Count the number of lines containing a given string:
scala> textFile.filter(line => line.contains("##")).count
res3: Long = 5
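Transformations and actions can also be chained in a single expression. As a sketch in the spirit of the official Spark quick start (again, not part of the session above), the following finds the largest number of words on any single line of README.md:

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)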
Next, let's create a file to use for a word-count example.
The file contents are as follows:
-bash-4.1$ cat test.txt
张三 张四
张三 张五
李三 李三
李四 李四
李四 王二
老王 老王
Upload the file:
-bash-4.1$ hadoop fs -put test.txt /user/hdfs/
Now do the word count: load the file, then run the map and reduce steps.
scala> val wc = sc.textFile("test.txt")
wc: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at textFile at <console>:21
scala> val result = wc.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23
scala> result.collect()
res5: Array[(String, Int)] = Array((张五,1), (老王,2), (张三,2), (张四,1), (王二,1), (李四,3), (李三,2))
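To make the result easier to read, you could sort the pairs by count before collecting, or write them back to HDFS. A small sketch using the standard RDD API; the output directory name wc_result is just an example:

result.sortBy(_._2, ascending = false).collect()   // highest counts first
result.saveAsTextFile("wc_result")                 // writes part files under /user/hdfs/wc_result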