PySpark——Python与大数据

时间:2024-11-15 12:40:39

一、Spark 与 PySpark

Apache Spark 是用于大规模数据( large-scala data )处理的统一( unified )分析引擎。简单来说, Spark 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算 TB 、 PB 乃至 EB 级别的海量数据。

Spark 作为全球*的分布式计算框架,支持众多的编程语言进行开发,Python 语言是 Spark 重点支持的方向,PySpark 是由 Spark 官方开发的 Python 语言第三方库。

1.1 PySpark的安装

使用快捷键Win+R打开运行窗口,然后输入"cmd"并按下回车键,调出CMD命令窗口,在命令窗口中输入“ pip install pyspark ”,按下回车键,等待安装成功即可。(若无法直接顺利下载,可使用国内代理镜像网站(清华大学源)pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark)

1.2构建 PySpark 执行环境入口对象

SparkContext 类对象,是 PySpark 编程中一切功能的入口。想要使用 PySpark 库完成数据处理,首先需要构建一个执行环境入口对象,PySpark 的执行环境入口对象是类 SparkContext 的类对象。

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#上述写法是一种链式调用,等同于下面的写法
# conf=SparkConf()
# conf.setMaster('local[*]')
# conf.setAppName('test_spark_app')

#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#打印PySpark的运行版本
print(sc.version)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/09 15:34:18 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/09 15:34:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

3.5.3

1.3 PySpark 的编程模型

PySpark 的编程,主要分为如下三大步骤:

PySpark 的编程模型 图1
PySpark 的编程模型 图2

通过 SparkContext 对象,完成数据输入,输入数据后得到 RDD 对象,对 RDD 对象进行迭代计算,最终通过 RDD 对象的成员方法,完成数据输出工作。

二、数据输入

2.1 RDD对象

PySpark 支持多种数据的输入,在输入完成后,都会得到一个 RDD 类的对象,RDD 全称为弹性分布式数据集( Resilient Distributed Datasets )。
为什么要使用RDD对象呢?因为PySpark 针对数据的处理,都是以 RDD 对象作为载体,即:

  • 数据存储在 RDD 内
  • 各类数据的计算方法也都是 RDD 的成员方法
  • RDD 的数据计算方法,返回值依旧是 RDD 对象

可以结合 -PySpark 的编程模型 图2- 理解。

2.2数据转换为RDD对象

2.2.1 Python 数据容器转换为 RDD 对象

PySpark 支持通过 SparkContext 对象的 parallelize 成员方法,将Python 数据容器( list、tuple、 set、 dict、str)转换为 PySpark 的 RDD 对象。

语法:rdd=SparkContext类对象.parallelize(Python 数据容器)

代码示例如下:

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#数据输入——Python 数据容器转 RDD 对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize('my heart go on')
rdd4=sc.parallelize({'a',2,4,'abc'})
rdd5=sc.parallelize({1:'a',2:'b',3:'c'})
#输出RDD的内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 15:01:20 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 15:01:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
['m', 'y', ' ', 'h', 'e', 'a', 'r', 't', ' ', 'g', 'o', ' ', 'o', 'n']
[2, 'a', 4, 'abc']
[1, 2, 3]

由输出结果可知:

  • 字符串会被拆分成 一个一个的字符,存入 RDD 对象
  • 字典中仅有 key 会被存入 RDD 对象

2.2.2文件转换为 RDD 对象

PySpark 支持通过 SparkContext 的 textFile 成员方法,读取文本文件,转换成 RDD 对象。

语法:rdd=SparkContext类对象.textFile(文件路径)

我们新建一个文件“hello.txt”,在其中写入一句“Say goodbye to all your troubles”,然后保存。

读取文件“hello.txt”,将其转换成 RDD 对象,代码如下:

#导包
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#数据输入——文件数据转 RDD 对象
rdd=sc.textFile('E:/可视化案例数据/hello.txt')
#输出RDD的内容
print(rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 15:31:40 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 15:31:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['Say goodbye to all your troubles']

三、数据计算

PySpark 的数据计算,都是基于 RDD 对象来进行的,依赖 RDD 对象内置丰富的成员方法,也称为算子,本节主要介绍以下6种算子。

3.1 map算子

功能:对 RDD内的数据逐个处理,处理的具体步骤基于其接收的处理函数,处理完成后返回一个新的RDD。

语法:rdd.map ( func )

f : (T)  ->  U 表示这是一个函数(方法),接收一个参数传入,参数类型不限,返回一个返回值,返回值类型不限。

f : (T)  ->  T 表示这是一个函数(方法),接收一个参数传入,参数类型不限,返回一个返回值,返回值类型和传入参数类型一致。

写好代码后直接运行会报错“SparkException: Python worker failed to connect back.”,简单来说就是spark找不到python在哪里,我们需要通过OS模块设置一个环境变量来告诉spark python在哪里,代码如下所示,environ是一个字典,key为“PYSPARK_PYTHON”,value为python.exe的文件路径(因人而异,安装在哪里就写哪里)。

import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'

利用map算子将RDD中的数据扩大10倍,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
def func(x):
    return x*10
rdd2=rdd1.map(func)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

也可以使用 lambda匿名函数,让代码更加简洁高效:

 lambda匿名函数语法:lambda  传入参数:函数体(只能写一行代码)

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
rdd2=rdd1.map(lambda data:data*10)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

两种方法输出结果相同:

24/11/10 17:20:52 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 17:20:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[10, 20, 30, 40, 50]

如果函数体复杂,用直接定义方式写比较方便,如果函数体一行就能写完,用lambda匿名函数写比较方便。

如果需要经历数次处理,可以通过链式调用的方式多次调用算子,更为简便:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5])
print(rdd1.collect())
def func1(x):
    return x*10
def func2(x):
    return x+10
def func3(x):
    return x-2
rdd2=rdd1.map(func1).map(func2).map(func3) #链式调用map算子
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 17:29:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 17:29:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
[18, 28, 38, 48, 58]

注意:此处的函数func1,func2,func3只是为了演示链式调用,其中的操作很简单,可以合并。遇到无法合并的函数又需要调用多次map算子时,可以使用链式调用。

3.2 flatMap算子

功能:对RDD执行map操作,然后进行解除嵌套操作。

语法:rdd.flatMap ( func )

将数据 ['my heart','go on'] 转换成 ['my' , 'heart' , 'go' , 'on'] 这种格式,使用字符串分割函数split,按照空格切分数据,然后解除嵌套。

字符串分割函数split语法:字符串.split(分隔符字符串)    

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd3=sc.parallelize(['my heart','go on'])
print(rdd3.collect())
#按照空格切分数据,然后解除嵌套
rdd4=rdd3.flatMap(lambda x:x.split(' '))
print(rdd4.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 18:32:16 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 18:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['my heart','go on']
['my', 'heart', 'go', 'on']

flatMap 算子计算逻辑和map算子 一样,只是比map算子多出一层解除嵌套的功能。

3.3 reduceByKey算子

功能:针对KV型RDD,自动按照Key分组,然后根据提供的聚合逻辑(函数),完成对组内数据(value)的聚合操作。

语法:rdd.reduceByKey ( func )  #func为聚合函数

reduceByKey的聚合逻辑

统计数据[('a',1),('a',1),('b',1),('b',1),('b',1)]中a和b的出现次数,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd5=sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
rdd6=rdd5.reduceByKey(lambda a,b:a+b)
print(rdd6.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/10 19:20:56 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 19:20:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('a', 2), ('b', 3)]

注意:rdd.reduceByKey ( func )中,func只负责聚合,分组是自动ByKey来分组的。

图解

3.4 filter算子

功能:过滤数据,即对 RDD 数据逐个进行处理,返回值为True的数据被保留,返回值为False的数据被丢弃。

语法:rdd.filter ( func )

过滤掉数据[1,2,3,4,5,6]中的偶数,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,3,4,5,6])
print(rdd1.collect())
#过滤掉数据中的偶数
rdd2=rdd1.filter(lambda x:x%2==1)
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:20:46 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:20:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1,2,3,4,5,6]

[1, 3, 5]

3.5 distinct算子

功能:对 RDD 内数据去重。

语法:rdd.distinct() #无需传参

对数据[1,2,1,4,6,6]进行去重,代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([1,2,1,4,6,6])
print(rdd1.collect())
#去重
rdd2=rdd1.distinct()
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:29:46 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:29:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 1, 4, 6, 6]
[1, 2, 4, 6]

3.6 sortBy算子

功能:对 RDD 数据进行排序,基于指定的排序依据(由函数规定)。

语法:rdd.sortBy(func,ascending,numPartitions) 

对数据[('苹果',6),('香蕉',3),('橙子',4),('西瓜',9),('火龙果',10)]进行排序,按照value值进行排序(降序),代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd1=sc.parallelize([('苹果',6),('香蕉',3),('橙子',4),('西瓜',9),('火龙果',10)])
print(rdd1.collect())
#按照value值进行排序(降序)
rdd2=rdd1.sortBy(lambda x:x[1],ascending=False,numPartitions=1) #此处设置分区数量为1,非要点,无需理解
print(rdd2.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 22:05:04 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 22:05:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('苹果', 6), ('香蕉', 3), ('橙子', 4), ('西瓜', 9), ('火龙果', 10)]
[('火龙果', 10), ('西瓜', 9), ('苹果', 6), ('橙子', 4), ('香蕉', 3)]

四、数据输出

4.1输出为 Python 对象

数据输出为 Python 对象的方法是很多的,本小节简单介绍了以下 4 个。

4.1.1 collect算子

功能:将RDD各个分区内的数据统一收集到Driver中,返回一个list对象。

语法:rdd.collect()#无须传参

代码验证:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.collect())
print(type(rdd.collect()))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:33:52 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:33:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3, 4, 5]
<class 'list'>

4.1.2 reduce算子

功能:对RDD数据集按照传入的逻辑进行聚合

语法:rdd.reduce(func)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.reduce(lambda a,b:a*b))
print(type(rdd.reduce(lambda a,b:a*b)))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:52:33 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:52:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

120
<class 'int'>

图解
​​​​

4.1.3 take算子

功能:取RDD的前N个元素,组合成列表返回。

语法:rdd.take(N)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.take(3))
print(type(rdd.take(3)))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:41:18 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:41:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[1, 2, 3]
<class 'list'>

4.1.4 count算子

功能:统计RDD内有多少条数据,返回一个数字

语法:rdd.count() #无须传参

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5])
print(rdd.count())
print(type(rdd.count()))
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:44:34 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:44:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

5
<class 'int'>

4.2输出到文件中

将数据输出到文件中的方法本小节主要介绍了saveAsTextFile算子。

saveAsTextFile算子功能:将RDD中的数据写入文本文件中。

语法:rdd.saveAsTextFile(文件路径)

注意:调用saveAsTextFile这类保存文件的算子,需要配置 Hadoop 依赖,步骤如下:

  1. 下载 Hadoop 安装包,解压到电脑任意位置。(文件链接: https://pan.baidu.com/s/1HHEE2oIp2kYohTpObgVN8g?pwd=mwdy 提取码: mwdy 复制这段内容后打开百度网盘手机App,操作更方便哦)
  2. 在 Python 代码中使用 os 模块配置: os.environ[‘HADOOP_HOME’] = ‘HADOOP 解压文件夹路径’
  3. 下载 winutils.exe ,并放入 Hadoop 解压文件夹的 bin 目录内(文件链接: https://pan.baidu.com/s/1z67mUP3yrci93uLOXAuOxg?pwd=txv2 提取码: txv2 复制这段内容后打开百度网盘手机App,操作更方便哦)
  4. 下载 hadoop.dll ,并放入 :C:/Windows/System32 文件夹内(文件链接: https://pan.baidu.com/s/15vA0o6j8W9f5kNaEYRopDA?pwd=pbww 提取码: pbww 复制这段内容后打开百度网盘手机App,操作更方便哦)

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
os.environ[‘HADOOP_HOME’] = ‘E:/HADOOP’#具体看个人的HADOOP保存路径
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#输入数据
rdd=sc.parallelize([1,2,3,4,5],numSlices=1)#修改 rdd 分区为 1 个,便于文件输出演示
#输出到文件file_test.txt中
rdd.saveAsTextFile('E:/可视化案例数据/file_test.txt')
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行代码后,打开该文件路径,发现文件“file_test.txt”已被建立,且通过右侧的文件预览可见文件内容与代码内容一致,如下图所示:

五、案例

4.1统计出现的单词数量

我们新建一个文件“test.txt”,在其中写入如下图所示的内容,然后保存。

案例要求:统计文件内所出现的单词的数量。

思路:

  1. 利用flatMap算子,将单词依次独立地取出。
  2. 利用map算子,给所有单词加上value(值为1),单词本身作为key。
  3. 利用reduceByKey算子,分组聚合,从而得出每个单词的出现总数。

代码示例:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取数据文件
file=sc.textFile('E:/可视化案例数据/test.txt')
#将单词依次独立地取出
words_rdd=file.flatMap(lambda line:line.split(' '))
print(words_rdd.collect())
#给所有单词加上value(值为1),单词本身作为key
words_with_one_rdd=words_rdd.map(lambda x:(x,1))
print(words_with_one_rdd.collect())
#分组聚合
result_rdd=words_with_one_rdd.reduceByKey(lambda a,b:a+b)
print(result_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/11 21:02:57 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 21:02:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['goodbye', 'to', 'troubles', 'comes', 'to', 'comes', 'to', 'ease', 'troubles', 'comes', 'to', 'ease', 'goodbye']
[('goodbye', 1), ('to', 1), ('troubles', 1), ('comes', 1), ('to', 1), ('comes', 1), ('to', 1), ('ease', 1), ('troubles', 1), ('comes', 1), ('to', 1), ('ease', 1), ('goodbye', 1)]
[('goodbye', 2), ('troubles', 2), ('to', 4), ('comes', 3), ('ease', 2)]

4.2销售数据计算

读取文件“orders.txt”中的数据,并按要求进行计算。

文件链接: https://pan.baidu.com/s/1rEiJm3-BNZeo5MrzXiSZnw?pwd=g3a6 提取码: g3a6 复制这段内容后打开百度网盘手机App,操作更方便哦

案例要求:

  1. 各个城市销售额排名,从大到小
  2. 全部城市,有哪些商品类别在售卖
  3. 北京市有哪些商品类别在售卖

1.各个城市销售额排名,从大到小

如上图所示,文件数据是json格式,但并不是每个json数据都独占一行,其中有些json数据之间用“|”隔开了,我们要先利用flatMap算子,接收字符串分割函数,将json数据独立地分隔开。然后利用map算子将json数据转换成python字典,数据初步处理完成,

文件内同一城市的不同商品类别的销售数据是分散的,我们需要先取出城市和销售额数据,然后利用reduceByKey算子分组聚合,聚合同一城市不同商品的销售额,最后利用sortBy算子排序。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出城市和销售额数据
city_with_money_rdd=dict_rdd.map(lambda x:(x['areaName'],int(x['money']))) #注意,文件内money数据是字符串格式,为了满足排序的需求,必须将其强制转换为int类型
#聚合同一城市不同商品的销售额
city_result_money_rdd=city_with_money_rdd.reduceByKey(lambda a,b:a+b)
#按销售额排序,从大到小
result1_rdd=city_result_money_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(result1_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 20:54:15 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 20:54:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]

2.全部城市,有哪些商品类别在售卖

从文件中可以看出,同一商品可能多个城市都在售卖,那取出全部商品类别数据后,可能存在重复情况,就需要利用distinct算子去重。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出全部商品类别数据
category_rdd=dict_rdd.map(lambda x:x['category'])
#去重
result2_rdd=category_rdd.distinct()
print(result2_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:11:54 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:11:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']

3.北京市有哪些商品类别在售卖

首先利用filter算子过滤掉areaName不是北京的数据,因为北京同一商品类别不同时间的数据也而不同,需要去重。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import json
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/orders.txt')
json_rdd=file_rdd.flatMap(lambda x:x.split('|'))
#将json数据转换成python字典
dict_rdd=json_rdd.map(lambda x:json.loads(x))#将json数据转换成python字典
#取出北京市的数据
BeiJing_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')
#取出北京市的商品类别数据并去重
result3_rdd=BeiJing_rdd.map(lambda x:x['category']).distinct() #链式调用
print(result3_rdd.collect())
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/12 21:23:00 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 21:23:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

['平板电脑', '家电', '书籍', '手机', '电脑', '家具', '食品', '服饰']

4.3搜索引擎日志分析

文件链接: https://pan.baidu.com/s/1y8Ydf2JErNWfIUU5jWMJ7Q?pwd=m81n 提取码: m81n 复制这段内容后打开百度网盘手机App,操作更方便哦

文件“search_log.txt”数据格式如下图所示:

案例要求:

  1. 打印输出:热门搜索时间段(小时精度) Top3
  2. 打印输出:热门搜索词 Top3
  3. 打印输出:统计黑马程序员关键字在哪个时段被搜索最多

1.打印输出:热门搜索时间段(小时精度) Top3

​​​​​​​思路如下:

  • ​​​​​​​从文件中可以看出每列数据之间使用“\t”对齐,利用字符串分割函数取出第一列时间数据。
  • 案例要求小时精度,时间数据格式为【小时:分钟:秒】,我们只需要小时,利用数据容器的切片操作取前两位即可。
  • 将数据转换成(小时,1)的二元元组形式,然后分组聚合,按降序从高到低排序。
  • 取前3个小时时间段即为“热门搜索时间段(小时精度) Top3”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result1=file_rdd.map(lambda x:x.split('\t')).map(lambda x:x[0][:2]).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],ascending=False,numPartitions=1).take(3)
print(result1)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出:

24/11/14 14:25:49 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 14:25:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('20', 3479), ('23', 3087), ('21', 2989)]

我们得到了搜索次数最多的时间段:晚上8点、晚上11点、晚上9点。

代码可以进一步改进:

改进后的代码:

​
#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result1=file_rdd.map(lambda x:(x.split('\t')[0][:2],1)).\
    reduceByKey(lambda a,b:a+b).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result1)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​

2.打印输出:热门搜索词 Top3

​​​​​​​思路如下:

  • 从文件中可以看出每列数据之间使用“\t”对齐,利用字符串分割函数取出第三列热搜词数据。
  • 将数据转换成(热搜词,1)的二元元组形式,然后分组聚合,按降序从高到低排序。
  • 取前3个数据即为“热门搜索词 Top3”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result2=file_rdd.map(lambda x:x.split('\t')).\
    map(lambda x:x[2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda a,b:a+b).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​​​​​​​输出:

24/11/14 15:02:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 15:02:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('scala', 2310), ('hadoop', 2268), ('博学谷', 2002)]

我们得到了被检索次数最多的3个热搜词及其具体被检索次数。

此处依然可以简化3个map算子,但为了代码易读性,此处不再进行简化,简化可看下图:

3.打印输出:统计黑马程序员关键字在哪个时段(小时精度)被搜索最多

思路如下:

  • 从文件中可以看出每列数据之间使用“\t”对齐利用字符串分割函数取出第一列时间数据和第三列关键字数据。案例要求小时精度,时间数据格式为【小时:分钟:秒】,我们只需要小时,利用数据容器的切片操作取前两位即可。
  • 过滤出关键词=黑马程序员的数据,过滤完成后不再需要关键字数据,将数据转换成(时间,1)的二元元组形式,然后分组聚合,统计不同时间段内关键词黑马程序员被搜索的次数。
  • 最后按降序从高到低排序,取第一个数据即为“黑马程序员关键字被搜索次数最多的时段”。

代码如下:

#导包
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='E:/Anaconda/python.exe'
#创建SparkConf类对象
conf=SparkConf().setMaster('local[*]').setAppName('test_spark_app')
#基于SparkConf类对象创建SparkContext类对象
sc=SparkContext(conf=conf)
#读取文件数据
file_rdd=sc.textFile('E:/可视化案例数据/search_log.txt')
#利用链式调用的形式处理数据
result3=file_rdd.map(lambda x:x.split('\t')).\
    map(lambda x:(x[0][:2],x[2])).\
    filter(lambda x:x[1]=='黑马程序员').\
    map(lambda x:(x[0],1)).\
    reduceByKey(lambda a,b:a+b).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(1)
print(result3)
#停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

​​​​​​​输出:

24/11/14 15:21:20 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 15:21:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[('22', 245)]

黑马程序员关键字在晚上10点这个时间段被检索次数最多,被检索次数为245次。