Excerpt from Learning AI Testing from Scratch: Spark -- an All-Purpose Skill for Testers Working with Structured Data (Part 2)

Published: 2024-04-19 07:08:11

DataFrame

DataFrame is a high-level API in Spark modeled after pandas: it lets users manipulate structured data almost as conveniently as they would in pandas. Working with raw RDDs is primitive and tedious, so DataFrame allows anyone already familiar with pandas to do distributed data analysis at very little extra cost -- after all, few people who work with data are unfamiliar with dataframes.

Ways to initialize a DataFrame

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# 1. Read a DataFrame directly from a file
dataA = sqlContext.read.csv("path/to/file.csv")

# 2. Build a DataFrame from a list of dicts
dicts = [{'col1': 'a', 'col2': 1}, {'col1': 'b', 'col2': 2}]
dataf = sqlContext.createDataFrame(dicts)
dataf.show()

# 3. Build a DataFrame from an RDD, supplying the column names explicitly
dicts = [['a', 1], ['b', 2]]
rdd = sc.parallelize(dicts)
dataf = sqlContext.createDataFrame(rdd, ['col1', 'col2'])
dataf.show()

# 4. Build a DataFrame from Row objects, which carry the column names themselves
rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
dataf = sqlContext.createDataFrame(rows)
dataf.show()

# Write the result back out as CSV
dataf.write.csv(path="/Users/cainsun/Downloads/test_spark", header=True, sep=",", mode='overwrite')

As you can see, there are several ways to create a DataFrame: read it from a file, initialize it from a Python list, specify the column names directly, or use the Row class to describe the columns.
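
The snippets above use the older SQLContext entry point. Since Spark 2.x the recommended entry point is SparkSession, which is also what the `spark` variable in the snippets below refers to; a minimal sketch of creating one and building the same kind of DataFrame:

from pyspark.sql import SparkSession

# SparkSession wraps SparkContext and SQLContext behind a single entry point
spark = SparkSession.builder \
    .master("local") \
    .appName("My App") \
    .getOrCreate()

dataf = spark.createDataFrame([('a', 1), ('b', 2)], ['col1', 'col2'])
dataf.show()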

Common DataFrame operations

Reading data:

# JSON
df = spark.read.json("data.json")

# CSV, with a header row and automatic schema inference
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Parquet
df = spark.read.parquet("data.parquet")

Displaying data:

# Show the first n rows (20 by default)
df.show(n=5)

# Print the DataFrame schema
df.printSchema()

Selecting and filtering data:

# Select specific columns
selected_df = df.select("column1", "column2")

# Filter rows with a condition
filtered_df = df.filter(df["age"] > 30)
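
One gotcha worth noting when filtering: multiple conditions have to be combined with the bitwise operators `&` / `|` rather than Python's `and` / `or`, and each condition needs its own parentheses. A small sketch (the `gender` column is made up for illustration):

from pyspark.sql import functions as F

# Parentheses are required because & binds more tightly than the comparison operators
filtered_df = df.filter((df["age"] > 30) & (F.col("gender") == "male"))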

Aggregating and grouping data:

from pyspark import SparkContext, SparkConf, SQLContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

dicts = [
    ['teacher', 202355, 16, '336051551@qq.com'],
    ['student', 2035, 16, '336051551@qq.com'],
    ['qa', 2355, 16, '336051551@qq.com'],
    ['qa', 20235, 16, '336051551@qq.com'],
    ['teacher', 35, 16, '336051asdf'],
    ['student', 453, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])

# Group by title, then aggregate sales
result = data.groupBy("title").max("sales")
resultA = data.groupBy("title").sum("sales")

# Show the results
result.show()
resultA.show()





+-------+----------+
|  title|max(sales)|
+-------+----------+
|teacher|    202355|
|     qa|     20235|
|student|      2035|
+-------+----------+

+-------+----------+
|  title|sum(sales)|
+-------+----------+
|teacher|    202390|
|     qa|     22590|
|student|      2488|
+-------+----------+
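
The column names in the output above are generated automatically (`max(sales)`, `sum(sales)`). To give the aggregated columns explicit names, and to compute several aggregates in a single pass, `agg()` can be used; a small sketch over the same `data` DataFrame:

import pyspark.sql.functions as F

# One groupBy, several named aggregates
summary = data.groupBy("title").agg(
    F.max("sales").alias("max_sales"),
    F.sum("sales").alias("sum_sales"),
)
summary.show()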



Sorting data:



from pyspark.sql.functions import desc

# Sort by a column (ascending by default)
sorted_df = df.sort("column1")

# Sort by a column in descending order
sorted_df = df.sort(desc("column1"))

Adding, modifying, and removing columns:



from pyspark.sql.functions import upper

# Add a new column
new_df = df.withColumn("new_column", df["column1"] * 2)

# Modify an existing column
modified_df = df.withColumn("column1", upper(df["column1"]))

# Drop a column
dropped_df = df.drop("column1")



Renaming columns:

# Rename a column in the DataFrame
renamed_df = df.withColumnRenamed("old_column_name", "new_column_name")
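
Because each of these operations returns a new DataFrame, they compose naturally into a pipeline; a small sketch chaining a few of them (column names are made up for illustration):

from pyspark.sql.functions import upper

cleaned_df = (
    df.select("name", "age", "email")
      .filter(df["age"] >= 0)
      .withColumn("name", upper(df["name"]))
      .withColumnRenamed("email", "contact_email")
)
cleaned_df.show()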

Spark SQL

Initialization

from pyspark import SparkContext, SparkConf, SQLContext

# Create the SparkContext and SQLContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

dicts = [
    ['teacher', 202355, 16, '336051551@qq.com'],
    ['student', 2035, 16, '336051551@qq.com'],
    ['qa', 2355, 16, '336051551@qq.com'],
    ['qa', 20235, 16, '336051551@qq.com'],
    ['teacher', 35, 16, '336051asdf'],
    ['student', 453, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])

# Register the DataFrame as a temporary view so it can be queried with SQL
data.createOrReplaceTempView("table")

To use Spark SQL, you first need to register the DataFrame as a temporary view with createOrReplaceTempView; only then can you run SQL statements against it.

Running a simple SQL query

query = "select * from table where title = 'qa'"



resultB = sqlContext.sql(query)



resultB.show()



# 执行结果

+-----+-----+---+----------------+

|title|sales|age| email|

+-----+-----+---+----------------+

| qa| 2355| 16|336051551@qq.com|

| qa|20235| 16|336051551@qq.com|

+-----+-----+---+----------------+

Group-by queries

query = "select title, sum(sales), max(sales) from table group by title"



resultC = sqlContext.sql(query)



resultC.show()



# 执行结果

+-------+----------+----------+

| title|sum(sales)|max(sales)|

+-------+----------+----------+

|teacher| 202390| 202355|

| qa| 22590| 20235|

|student| 2488| 2035|

+-------+----------+----------+

Spark SQL is a good fit for people who already know SQL. Under the hood, both SQL queries and DataFrame operations are ultimately translated into RDD operations, so you can think of them as high-level syntactic sugar over RDDs; pick whichever style you prefer.
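
One way to see that both styles land in the same place is to compare their query plans with `explain()`; a quick sketch using the `data` DataFrame and the temporary view registered earlier:

# Both statements print essentially the same physical plan
data.groupBy("title").sum("sales").explain()
sqlContext.sql("select title, sum(sales) from table group by title").explain()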

Data testing / monitoring

Recall the discussion of self-learning and the data loop: in such a system, every batch of newly collected data needs a round of validation. Below I mock up such a scenario and write a check script for it.

from pyspark import SparkContext, SparkConf, SQLContext
import pyspark.sql.functions as F

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Simulated batch of newly collected data; the last two rows are deliberately dirty
dicts = [
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051551@qq.com'],
    ['frank', 202355, 16, '336051asdf'],
    ['', 452345, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
dataf = sqlContext.createDataFrame(rdd, ['name', 'id', 'age', 'email'])

# id must cast to a non-negative integer
id_filter = F.col("id").cast("int") >= 0

# name must be a non-empty string
name_filter = F.col("name").isNotNull() & (F.col("name") != "")

# age must cast to an integer >= 0
age_filter = F.col("age").cast("int") >= 0

# email must look like a valid e-mail address
email_filter = F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")

# Apply all the checks at once
valid_data = dataf.filter(id_filter & name_filter & age_filter & email_filter)

# Rows that pass the quality checks
valid_data.show()

# Rows that fail the quality checks
invalid_data = dataf.exceptAll(valid_data)
invalid_data.show()
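
In a real monitoring job you usually want a pass/fail signal rather than a printed table; a minimal sketch of that idea (the 1% threshold is an arbitrary example, not a recommendation):

# Fail the check when more than 1% of the batch is invalid
total_rows = dataf.count()
invalid_rows = invalid_data.count()
invalid_ratio = invalid_rows / total_rows if total_rows else 0.0
print("invalid rows: %d / %d (%.2f%%)" % (invalid_rows, total_rows, invalid_ratio * 100))
assert invalid_ratio <= 0.01, "data quality check failed: too many invalid rows"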

More content is available in my Knowledge Planet (知识星球).