Spark一系列入门流程

时间:2022-09-17 23:34:46



本文的目的:利用Intellij Idea在windows搭建spark 开发环境 环境:windows 10, intellij idea2016.1.1, spark 1.6.2(HDP),hadoop2.7.3(HDP),scala 2.10.5(编译环境), sbt管理依赖 背景:spark单点运算不能解决大数据的问题的时候,就需要使用spark-submit 调用jar包的方式分布式运算,从而提高集群的运算和资源使用能力。 集群模式:本文默认使用的是spark自带的standalone模式。 [由于CSDN博客不支持截图图片大小超过2M,所以这里只有步骤没有截图说明]
步骤: 第一步 下载intellij idea2016.1.1,点击此处下载Community版本:安装 Intellij IDEA  第二步:在Intellij菜单中选择“Configure”→“Plugins”→“Browse repositories”命令,在弹出的界面中输入“Scala”搜索插件,然后点击相应安装按钮进行安装,重启Intellij使配置生效。 第三步: 点击File, 选择“Create New Project”命令,此时选择左侧列表中的“Scala”选项,为了方便以后的开发工作,选择右侧的“SBT”选项。 [差不多等1个小时后可以看到自动创建的src目录]
然后,邮件src目录,新建scala class, name 为HelloSpark,kind 修改为Object。
第四步:测试程序是否正确
object HelloSpark {
def main(args:Array[String]): Unit ={
print("Hello Spark")

}
}
右键HelloSpark.scala ,run 'HelloSpark'. 如果能看到“Hello Spark”环境没有问题。
第五步:加入Spark开发包:使用IDEA导入外部Jar包,具体步骤:“File”→“Project Structure”→“Modules”→“Dependencies”→+...→“Library...”→“Library Type(Java)”→“Select Library Files”→“Configure Library”然后点击 “apply”
spark 程序必须要把spark-assembly-1.5.0-hadoop2.3.0.jar,  jdk jar,以及相关的驱动包放入。都可以使用这样的方法。
第六步:IDEA 生成JAR 使用IDEA编译class文件,同时可以将class打包成Jar文件,方法如下:①选择菜单“File”→“Project Structure”,弹出“Project Structure”的设置对话框;②选择左边的“Artifacts”,点击上方的“+”按钮;③在弹出的对话框中选择“Jar”→“from moduls with dependencies”;④选择要启动的类,然后确定;⑤应用之后选择菜单“Build”→“Build Artifacts”,选择“Build”或者“Rebuild”后即可生成,生成的Jar文件位于工程项目目录的out/artifacts下。##这里一定要确认Artifacts->Output Layout->Main Class 一定是主类,比如HelloSpark;
##jar 的路径可以在项目路径下找到:
##..\HelloSpark\out\artifacts\HelloSpark_jar\
最后, 环境测试:为了确保环境没有问题,可以跑下面这段代码测试一下:测试代码:
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("spark://192.168.1.125:7077")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
如有不详之处,欢迎留言。本文参考书籍:spark核心技术与高级应用


继续上篇文章利用Intellij Idea在windows搭建Spark 开发环境(含打jar包过程)(一) 以及Sparksql处理json日志[要求sparksql统计json日志条数存入MySQL数据库]                         

本章将把打好的jar提交到集群运行。[如何打jar 包请看上文第一篇]

环境:centos7,windows10,intellij 2016.1.1

jar 包代码:[详细的步骤请看第二篇]

import scala.xml.XML
import scala.sys.process._
import org.apache.spark.sql.SQLContext
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode
import java.sql.{Connection, DriverManager, ResultSet}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
object Distb{
  def  main(args:Array[String]): Unit ={
    val conf = new SparkConf().setAppName("distritb1").setMaster("spark://HDPMaster:7077")
    val sc = new SparkContext(conf) 
    val driverUrl="jdbc:mysql://ip:3306/test?user=root&password=123456&zeroDateTimeBehavior=convertToNull&amp;useUnicode=true&characterEncoding=utf8"
    val sqlContext=new org.apache.spark.sql.SQLContext(sc)
    val path="/test/input/test.json"    
    val all=sqlContext.read.json(path)
    all.registerTempTable("testtab")
 val count_name=sqlContext.sql("select name,sum(size) as sizesum from testtab group by name")
    count_name.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testresult", new Properties)   
  }
  }

第一步:把jar包上传至linux server.默认是/test/test.jar
第二步:由于本例是standalone 模式,所以要指定--master spark://HDPMaster:7077 \由于 我的spark-env.sh设置如下:export SPARK_EXECUTOR_INSTANCES=2 
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=3G 
export SPARK_DRIVER_MEMORY=1G 
export SPARK_MASTER_IP=HDPMaster
export SPARK_WORKER_MEMORY=4G     ###每个节点能用的内存是4G
export SPARK_MASTER_PORT=7077

所以我用的参数是--executor-memory 3g
运行参数设置如下:/usr/hdp/2.5.0.0-1245/spark/bin/spark-submit \
--class Distb \
--master spark://HDPMaster:7077 \
--executor-memory 3g   \               
--executor-cores 2   \
--num-executors 2 \
/test/test.jar


然后进去spark UI监控进度:your_ip:4040。如果程序没有报错,也可以去数据去查看表是否已经存在,数据是否已经有了。如果有了,说明成功!
[注意事项]如果是集群的模式,你也可以用htop查看每台Slave节点的内存,CPU的变化,从而知道,集群的状态。


问题:要求sparksql统计json日志条数存入mysql数据库。环境:spark1.6.2,hadoop2.7.3,mariadb10.1.4,centos7数据处理环境:spark-shell  交互环境数据量:100以内的测试json数据集。存储位置:HDFS文件系统。最终目标:统计json日志条数并存入maridb问题解决步骤:[数据怎么获取这里滤过]
必须提前要解决的问题:spark sql必须能访问mysql数据库. 第一步:把mysql驱动jar包放入spark的lib路径下。例如:mysql-connector-java-5.1.26-bin.jar第二步:修改spark-env.sh文件。export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/hdp/2.5.0.0-1245/spark/lib/mysql-connector-java-5.1.26-bin.jar     ##你的spark lib存放mysql 驱动jar的路径。第三步:重启spark.
手动制作测试数据集:文件的名字以及数据:test.json{"name":"a","size":"12"}
{"name":"a","size":"13"}
{"name":"b","size":"14"}
{"name":"b","size":"16"}
{"name":"c","size":"21"}
{"name":"d","size":"34"}
1.把测试数据集test.json从本地存入[put]到hdfs。##创建测试路径: hadoop fs -mkdir  -p /test/input##存入测试数据集到此路径hadoop fs -put test.json /test/input##检查数据是否存在hadoop fs -ls /test/input[hdfs@localhost~]$ hadoop fs -ls /test/input
Found 1 items
-rw-r--r--   3 hdfs hdfs         18 2017-01-13 22:11 /test/input/test.json

2.从HDFS读取json数据。
启动spark-shell  交互环境
./spark-shell
##导入[import] spark sql相关的处理包。import scala.xml.XML
import scala.sys.process._
import org.apache.spark.sql.SQLContext
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode
import java.sql.{Connection, DriverManager, ResultSet}
import scala.collection.mutable.ArrayBuffer

##定义连接的mysql数据库的connection.val driverUrl ="jdbc:mysql://ip:3306/test?user=root&password=123456&zeroDateTimeBehavior=convertToNull&amp;useUnicode=true&characterEncoding=utf8"
val sqlContext=new org.apache.spark.sql.SQLContext(sc)

##指定路径名val path="/test/input/*" 
val all=sqlContext.read.json(path)
##临时表的名字是testtab
all.registerTempTable("testtab")
all.show+----+----+
|name|size|
+----+----+
|   a|  12|
|   a|  13|
|   b|  14|
|   b|  16|
|   c|  21|
|   d|  34|
+----+----+

##接下来统计每个name的和。val count_name=sqlContext.sql("select name,sum(size) as sizesum from testtab group by name")
count_name.show

+----+-------+
|name|sizesum|
+----+-------+
|   a|   25.0|
|   b|   30.0|
|   c|   21.0|
|   d|   34.0|
+----+-------+


##最后把统计结果存入到mysql数据库。count_name.write.mode(SaveMode.Append).jdbc(driverUrl, "testresult", new Properties)   ###这个是把记录追加在表中,等于数据库的insert into table.如果数据库表已经存在,就用这个操作。count_name.write.mode(SaveMode.OverWrite).jdbc(driverUrl, "testresult", new Properties)  ###这个是创建表并把数据保存在当前的表中。相当于sql的 create table if not exits,第一次的时候,要用OverWrite.