学习笔记之PySpark

时间:2024-05-18 22:25:48

一、PySpark 的背后原理

架构图:
学习笔记之PySpark
其中白色部分是新增的Python进程,在Driver端,通过Py4j实现在Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中,例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象;在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。
学习笔记之PySpark
参考:
PySpark 的背后原理:http://sharkdtu.com/posts/pyspark-internal.html

二、文档

Spark基础知识笔记见另一篇博文:https://blog.****.net/oTengYue/article/details/88405479
PySpark官方文档:https://spark.apache.org/docs/latest/api/python/index.html
pyspark编程指南(英文):https://www.datacamp.com/community/tutorials/apache-spark-python#PySpark
备忘清单:https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf
学习笔记之PySpark

三、pyspark读写dataframe

pyspark系列–pyspark读写dataframe:https://zhuanlan.zhihu.com/p/34901558
pyspark:dataframe与rdd的一点小事:https://www.jianshu.com/p/5e593510313b

四、通过spark-submit提交任务模板示例

spark-submit提交方式官网:http://spark.apache.org/docs/latest/submitting-applications.html

spark-submit \
    --master yarn \
    --deploy-mode client \
    --num-executors 10 \
    --executor-memory 10g \
    --executor-cores 8 \
    --driver-memory 10g \
    --conf spark.pyspark.python=python2.7 \
    --conf spark.pyspark.driver.python=python2.7 \
    --py-files depend.zip \
    demo.py 2019-03-05

说明:
1、depend.zip是demo.py的依赖包,注:depend.zip不包含demo.py
2、demo.py中可以直接使用import depend.xx.xxfrom depend.xx.xx import xx类似语句引入依赖包
3、上述示例中的2019-03-05是传给demo.py的参数
参考:
Pyspark spark-submit 集群提交任务以及引入虚拟环境依赖包攻略:https://www.cnblogs.com/piperck/p/10121097.html

五、代码示例

1、WordCount词频分析

wordcount.py

# -*- coding: utf-8 -*-
import sys
import os
import datetime
from pyspark import SparkConf,SparkContext
sc = SparkConf().setAppName("wordcount")
spark = SparkContext(conf=sc)
text_file = spark.textFile("hdfs://shw/pyspark")
word_cnt_rdd = text_file.flatMap(lambda line : line.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda x, y: x + y)
word_cnt_rdd.saveAsTextFile('hdfs://shw/wc') 

提交命令:
spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-memory 10g --executor-cores 8 --driver-memory 10g --conf spark.pyspark.python=python2.7 wordcount.py

2、使用PySpark语言开发操作Hive

场景:
1、使用pySpark语言开发操作Hive。
注:使用spark.sql(sql)函数执行hive sql每次只能执行一条,需要执行多个sql请分多次调用,且sql语句最后不能包含分号。
2、使用spark-submit提交脚本。
拓展:如果需要一次提交执行多条sql的可以考虑使用spark-sql命令,(使用总结以后再补充吧)

hive_test.py

# -*- coding: utf-8 -*-
from pyspark import SparkConf
from pyspark.sql import SparkSession
import sys,traceback
from pyspark import StorageLevel
import datetime
import uuid
def process(spark,calc_date):
    sql = """ 
        CREATE EXTERNAL TABLE IF NOT EXISTS app.app_shw_test(
            id bigint COMMENT 'ID', 
            name string COMMENT '名称'
        )
        COMMENT '测试表'
        PARTITIONED BY (dt string)
        STORED AS ORC
        LOCATION '/user/shw/app_shw_test'
        tblproperties ('orc.compress'='SNAPPY')
    """
    spark.sql(sql)
    sql = """ 
        select 
            id,
            name,
            '{calc_date}' dt
        from
            xx_test
        where 
            dt = '{calc_date}'
        group by 
            id,
            name
    """.format(calc_date = calc_date)
    df = spark.sql(sql)
    df.write.mode("append").insertInto('app_shw_test', overwrite=True)
def createSparkSession(appName):
    conf = SparkConf().setAppName(appName)
    conf.set("spark.rdd.compress", "true")
    conf.set("spark.broadcast.compress", "true")
    conf.set("hive.exec.dynamic.partition", "true")
    conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    conf.set("hive.exec.max.dynamic.partitions", "100000")
    conf.set("hive.exec.max.dynamic.partitions.pernode", "100000")
    conf.set("hive.auto.convert.join", "true")
    conf.set("mapred.max.split.size", "256000000") # 每个Map最大输入大小
    conf.set("mapred.min.split.size.per.node", "100000000") # 一个节点上split的至少的大小
    conf.set("mapred.min.split.size.per.rack", "100000000") # 一个交换机下split的至少的大小
    conf.set("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat") # 执行Map前进行小文件合并
    conf.set("hive.merge.mapfiles", "true") # 在Map-only的任务结束时合并小文件
    conf.set("hive.merge.mapredfiles", "true") # 在Map-Reduce的任务结束时合并小文件
    conf.set("hive.merge.size.per.task", "256*1000*1000") # 合并文件的大小
    conf.set("hive.merge.smallfiles.avgsize", "16000000") # 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
    conf.set("spark.sql.shuffle.partitions", "500") # 设置shuffle分区数
    conf.set("spark.driver.maxResultSize", "5g")
    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    return spark
def main():
    calc_date = sys.argv[1]
    appName = "spark_hive_task-" + str(uuid.uuid1())
    spark = createSparkSession(appName=appName)
    try:
        process(spark,calc_date)
    except Exception as ex:
        traceback.print_exc()
    finally:
        spark.stop()
if __name__ == "__main__":
    main()

提交命令:
spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-memory 10g --executor-cores 8 --driver-memory 10g --conf spark.pyspark.python=python2.7 hive_test.py 2019-03-11