文章目录
一、PySpark 的背后原理
架构图:
其中白色部分是新增的Python进程,在Driver端,通过Py4j实现在Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中,例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象;在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。
参考:
PySpark 的背后原理:http://sharkdtu.com/posts/pyspark-internal.html
二、文档
Spark基础知识笔记见另一篇博文:https://blog.****.net/oTengYue/article/details/88405479
PySpark官方文档:https://spark.apache.org/docs/latest/api/python/index.html
pyspark编程指南(英文):https://www.datacamp.com/community/tutorials/apache-spark-python#PySpark
备忘清单:https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf
三、pyspark读写dataframe
pyspark系列–pyspark读写dataframe:https://zhuanlan.zhihu.com/p/34901558
pyspark:dataframe与rdd的一点小事:https://www.jianshu.com/p/5e593510313b
四、通过spark-submit提交任务模板示例
spark-submit提交方式官网:http://spark.apache.org/docs/latest/submitting-applications.html
spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 10 \
--executor-memory 10g \
--executor-cores 8 \
--driver-memory 10g \
--conf spark.pyspark.python=python2.7 \
--conf spark.pyspark.driver.python=python2.7 \
--py-files depend.zip \
demo.py 2019-03-05
说明:
1、depend.zip是demo.py的依赖包,注:depend.zip不包含demo.py
2、demo.py中可以直接使用import depend.xx.xx
或 from depend.xx.xx import xx
类似语句引入依赖包
3、上述示例中的2019-03-05
是传给demo.py的参数
参考:
Pyspark spark-submit 集群提交任务以及引入虚拟环境依赖包攻略:https://www.cnblogs.com/piperck/p/10121097.html
五、代码示例
1、WordCount词频分析
# -*- coding: utf-8 -*-
import sys
import os
import datetime
from pyspark import SparkConf,SparkContext
sc = SparkConf().setAppName("wordcount")
spark = SparkContext(conf=sc)
text_file = spark.textFile("hdfs://shw/pyspark")
word_cnt_rdd = text_file.flatMap(lambda line : line.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda x, y: x + y)
word_cnt_rdd.saveAsTextFile('hdfs://shw/wc')
提交命令:spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-memory 10g --executor-cores 8 --driver-memory 10g --conf spark.pyspark.python=python2.7 wordcount.py
2、使用PySpark语言开发操作Hive
场景:
1、使用pySpark语言开发操作Hive。
注:使用spark.sql(sql)
函数执行hive sql每次只能执行一条,需要执行多个sql请分多次调用,且sql语句最后不能包含分号。
2、使用spark-submit提交脚本。
拓展:如果需要一次提交执行多条sql的可以考虑使用spark-sql命令,(使用总结以后再补充吧)
hive_test.py
# -*- coding: utf-8 -*-
from pyspark import SparkConf
from pyspark.sql import SparkSession
import sys,traceback
from pyspark import StorageLevel
import datetime
import uuid
def process(spark,calc_date):
sql = """
CREATE EXTERNAL TABLE IF NOT EXISTS app.app_shw_test(
id bigint COMMENT 'ID',
name string COMMENT '名称'
)
COMMENT '测试表'
PARTITIONED BY (dt string)
STORED AS ORC
LOCATION '/user/shw/app_shw_test'
tblproperties ('orc.compress'='SNAPPY')
"""
spark.sql(sql)
sql = """
select
id,
name,
'{calc_date}' dt
from
xx_test
where
dt = '{calc_date}'
group by
id,
name
""".format(calc_date = calc_date)
df = spark.sql(sql)
df.write.mode("append").insertInto('app_shw_test', overwrite=True)
def createSparkSession(appName):
conf = SparkConf().setAppName(appName)
conf.set("spark.rdd.compress", "true")
conf.set("spark.broadcast.compress", "true")
conf.set("hive.exec.dynamic.partition", "true")
conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
conf.set("hive.exec.max.dynamic.partitions", "100000")
conf.set("hive.exec.max.dynamic.partitions.pernode", "100000")
conf.set("hive.auto.convert.join", "true")
conf.set("mapred.max.split.size", "256000000") # 每个Map最大输入大小
conf.set("mapred.min.split.size.per.node", "100000000") # 一个节点上split的至少的大小
conf.set("mapred.min.split.size.per.rack", "100000000") # 一个交换机下split的至少的大小
conf.set("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat") # 执行Map前进行小文件合并
conf.set("hive.merge.mapfiles", "true") # 在Map-only的任务结束时合并小文件
conf.set("hive.merge.mapredfiles", "true") # 在Map-Reduce的任务结束时合并小文件
conf.set("hive.merge.size.per.task", "256*1000*1000") # 合并文件的大小
conf.set("hive.merge.smallfiles.avgsize", "16000000") # 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
conf.set("spark.sql.shuffle.partitions", "500") # 设置shuffle分区数
conf.set("spark.driver.maxResultSize", "5g")
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
return spark
def main():
calc_date = sys.argv[1]
appName = "spark_hive_task-" + str(uuid.uuid1())
spark = createSparkSession(appName=appName)
try:
process(spark,calc_date)
except Exception as ex:
traceback.print_exc()
finally:
spark.stop()
if __name__ == "__main__":
main()
提交命令:spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-memory 10g --executor-cores 8 --driver-memory 10g --conf spark.pyspark.python=python2.7 hive_test.py 2019-03-11