使用Spark加载数据到SQL Server列存储表

时间:2021-07-02 15:45:58

原文地址https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/#comments

介绍

SQL Server的批量加载方法默认为串行,这意味着例如,一个BULK INSERT语句将生成一个线程将数据插入表中。但是,对于并发负载,您可以使用多个批量插入语句插入同一张表,前提是需要阅读多个文件。

考虑要求所在的情景:

  • 从大文件加载数据(比如,超过 20 GB)
  • 拆分文件不是一个选项,因为它将是整个大容量负载操作中的一个额外步骤。
  • 每个传入的数据文件大小不同,因此很难识别大块数(将文件拆分为)并动态定义为每个大块执行的批量插入语句。
  • 要加载的多个文件跨越多个 GB(例如超过 20 GB 及以上),每个GB 包含数百万条记录。

在这种情况下,使用 Apache Spark是并行批量数据加载到 SQL 表的流行方法之一。

在本文中,我们使用 Azure Databricks spark engine使用单个输入文件将数据以并行流(多个线程将数据加载到表中)插入 SQL Server。目标表可能是Heap、Clustered Index或Clustered Columnstore Index。本文旨在展示如何利用Spark提供的高度分布式框架,在加载到 SQL Server或 Azure SQL的聚集列存储索引表之前仔细对数据分区。

本文中分享的最有趣的观察是展示使用Spark默认配置时列存储表的行组质量降低,以及如何通过高效使用Spark分区来提高质量。从本质上讲,提高行组质量是决定查询性能的重要因素。

环境设置

数据集:

单张表的一个自定义数据集。一个 27 GB 的 CSV 文件,110 M 记录,共 36 列。其中列的类型有int, nvarchar, datetime等。

数据库:

Azure SQL Database – Business Critical, Gen5 80vCores

ELT 平台:

Azure Databricks – 6.6 (includes Apache Spark 2.4.5, Scala 2.11)

Standard_DS3_v2 14.0 GB Memory, 4 Cores, 0.75 DBU (8 Worker Nodes Max)

存储:

Azure Data Lake Storage Gen2

先决条件:

在进一步浏览本文之前,请花一些时间了解此处将数据加载到聚集列存储表中的概述:Data Loading performance considerations with Clustered Columnstore indexes

在此测试中,数据从位于 Azure Data Lake Storage Gen 2的 CSV 文件中加载。CSV 文件大小为 27 GB,有 110 M 记录,有 36 列。这是一个带有随机数据的自定义数据集。

批量加载或预处理(ELT\ETL)的典型架构看起来与下图相似:

使用Spark加载数据到SQL Server列存储表

使用BULK INSERTS

在第一次测试中,单个BULK INSERT用于将数据加载到带有聚集列存储索引的 Azure SQL 表中,这里没有意外,根据所使用的 BATCHSIZE,它花了 30 多分钟才完成。请记住,BULK INSERT是一个单一的线程操作,因此单个流会读取并将其写入表中,从而降低负载吞吐量。

使用Spark加载数据到SQL Server列存储表

使用Spark加载数据到SQL Server列存储表

使用Azure Databricks

为了实现写入到 SQL Server和读取ADLS (Azure Data Lake Storage) Gen 2的最大并发性和高吞吐量,Azure Databricks 被选为平台的选择,尽管我们还有其他选择,即 Azure Data Factory或其他基于Spark引擎的平台。

使用Azure Databricks加载数据的优点是 Spark 引擎通过专用的 Spark API并行读取输入文件。这些 API将使用一定数量的分区,这些分区映射到单个或多个输入文件,映射是在文件的一部分或整个文件上完成的。数据读入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在这种情况下,数据被加载到DataFrame中,然后进行转换(设置与目标表匹配的DataFrame schema),然后数据准备写入 SQL 表。

要将DataFrame中的数据写入 SQL Server中,必须使用Microsoft's Apache Spark SQL Connector。这是一个高性能的连接器,使您能够在大数据分析中使用事务数据,和持久化结果用于即席查询或报告。连接器允许您使用任何 SQL Server(本地数据库或云中)作为 Spark 作业的输入数据源或输出目标。

GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks

请注意,目标表具有聚集列存储索引,以实现高负载吞吐量,但是,您也可以将数据加载到Heap,这也将提供良好的负载性能。对于本文的相关性,我们只讨论加载到列存储表。我们使用不同的 BATCHSIZE 值将数据加载到Clustered Columnstore Index中 -请参阅此文档,了解 BATCHSIZE 在批量加载到聚集列存储索引表期间的影响。

以下是Clustered Columnstore Index上的数据加载测试运行,BATCHSIZE为 102400 和 1048576:

使用Spark加载数据到SQL Server列存储表

请注意,我们正在使用 Azure Databricks使用的默认并行和分区,并将数据直接推至 SQL Server聚集列存储索引表。我们没有调整 Azure Databricks使用的任何默认配置。无论所定义的批次大小,我们所有的测试都大致在同一时间完成。

将数据加载到 SQL 中的 32 个并发线程是由于上述已提供的数据砖群集的大小。该集群最多有 8 个节点,每个节点有 4 个内核,即 8*4 = 32 个内核,最多可运行 32 个并发线程。

查看行组(Row Groups)

有关我们使用 BATCHSIZE 1048576 插入数据的表格,以下是在 SQL 中创建的行组数:

行组总数:

SELECT COUNT(1)

FROM sys.dm_db_column_store_row_group_physical_stats

WHERE object_id = OBJECT_ID('largetable110M_1048576')

216

行组的质量:

SELECT *

FROM sys.dm_db_column_store_row_group_physical_stats

WHERE object_id = OBJECT_ID('largetable110M_1048576')

使用Spark加载数据到SQL Server列存储表

在这种情况下,我们只有一个delta store在OPEN状态 (total_rows = 3810) 和 215 行组处于压缩状态, 这是有道理的, 因为如果插入的批次大小是>102400 行, 数据不再delta store存储, 而是直接插入一个压缩行组的列存储。在这种情况下,压缩状态中的所有行组都有 >102400 条记录。现在,有关行组的问题是:

为什么我们有216行组?

为什么当我们的BatchSize设置为 1048576 时,每个行组的行数不同?

请注意,每个行组的数据大约等于上述结果集中的 500,000 条记录。

这两个问题的答案是 Azure Databricks Spark引擎对数据分区控制了写入聚集列存储索引表行组的数据行数。让我们来看看 Azure Databricks为有关数据集创建的分区数:

# Get the number of partitions before re-partitioning

print(df_gl.rdd.getNumPartitions())

216

因此,我们为数据集创建了 216 个分区。请记住,这些是分区的默认数。每个分区都有大约 500000 条记录。

# Number of records in each partition

from pyspark.sql.functions

import spark_partition_id

df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)

使用Spark加载数据到SQL Server列存储表

将Spark分区中的记录数与行组中的记录数进行比较,您就会发现它们是相等的。甚至分区数也等于行组数。因此,从某种意义上说,1048576 的 BATCHSIZE 正被每个分区中的行数过度拉大。

sqldbconnection = dbutils.secrets.get(scope =
"sqldb-secrets", key =
"sqldbconn")

sqldbuser = dbutils.secrets.get(scope =
"sqldb-secrets", key =
"sqldbuser")

sqldbpwd = dbutils.secrets.get(scope =
"sqldb-secrets", key =
"sqldbpwd")

servername =
"jdbc:sqlserver://"
+ sqldbconnection url = servername +
";"
+
"database_name="
+
<Your
Database
Name>
+
";"

table_name =
"<Your Table Name>"

# Write data to SQL table with BatchSize 1048576

df_gl.write \

.format("com.microsoft.sqlserver.jdbc.spark") \

.mode("overwrite") \

.option("url", url) \

.option("dbtable", table_name) \

.option("user", sqldbuser) \

.option("password", sqldbpwd) \

.option("schemaCheckEnabled",
False) \

.option("BatchSize",
1048576) \

.option("truncate",
True) \

.save()

行组质量

行组质量由行组数和每个行组记录决定。由于聚集列存储索引通过扫描单行组的列段扫描表,则最大化每个行组中的行数可增强查询性能。当行组具有大量行数时,数据压缩会改善,这意味着从磁盘中读取的数据更少。为了获得最佳的查询性能,目标是最大限度地提高聚集列索引中每个行组的行数。行组最多可有 1048576 行。但是,需要注意的是,由于聚集列索引,行组必须至少有 102400 行才能实现性能提升。此外,请记住,行组的最大大小(100万)可能在每一个情况下都达到,文件行组大小不只是最大限制的一个因素,但受到以下因素的影响。

字典大小限制,即 16 MB

插入指定的批次大小

表的分区方案,因为行组不跨分区

内存压力导致行组被修剪

索引重组,重建

话虽如此,现在一个重要的考虑是让行组大小尽可能接近 100 万条记录。在此测试中,由于每个行组的大小接近 500000 条记录,我们有两个选项可以达到约 100 万条记录的大小:

在Spark中,更改分区数,使每个分区尽可能接近 1048576 条记录,

保持Spark分区(默认值),一旦数据加载到表中,就运行 ALTER INDEX REORG,将多个压缩行组组合成一组。

选项#1很容易在Python或Scala代码中实现,该代码将在A Azure Databricks上运行,负载相当低。

选项#2是数据加载后需要采取的额外步骤,当然,这将消耗 SQL 上的额外 CPU ,并增加整个加载过程所需的时间。

为了保持本文的相关性,让我们来讨论更多关于Spark分区,以及如何从其默认值及其在下一节的影响中更改它。

Spark Partitioning

Spark 引擎最典型的输入源是一组文件,这些文件通过将每个节点上的适当分区划分为一个或多个 Spark API来读取这些文件。这是 Spark 的自动分区,将用户从确定分区数量的忧虑中抽象出来,如果用户想挑战,就需控制分区的配置。根据环境和环境设置计算的分区的默认数通常适用于大多数情况下。但是,在某些情况下,更好地了解分区是如何自动计算的,如果需要,用户可以更改分区计数,从而在性能上产生明显差异。

注意:大型Spark群集可以生成大量并行线程,这可能导致 Azure SQL DB 上的内存授予争议。由于内存超时,您必须留意这种可能性,以避免提前修剪。请参阅本文以了解更多详细信息,了解表的模式和行数等也可能对内存授予产生影响。

spark.sql.files.maxPartitionBytes是控制分区大小的重要参数,默认设置为128 MB。它可以调整以控制分区大小,因此也会更改由此产生的分区数。

spark.default.parallelism这相当于worker nodes核心的总数。

最后,我们有coalesce()和repartition(),可用于增加/减少分区数,甚至在数据已被读入Spark。

只有当您想要减少分区数时,才能使用coalesce() ,因为它不涉及数据的重排。请考虑此data frame的分区数为 16,并且您希望将其增加到 32,因此您决定运行以下命令。

df = df.coalesce(32)

print(df.rdd.getNumPartitions())

但是,分区数量不会增加到 32 个,并且将保持在 16 个,因为coalesce()不涉及数据重排。这是一个性能优化的实现,因为无需昂贵的数据重排即可减少分区。

如果您想将上述示例的分区数减少到 8,则会获得预期的结果。

df = df.coalesce(8)

print(df.rdd.getNumPartitions())

这将合并数据并产生 8 个分区。

repartition() 是另一个帮助调整分区的函数。对于同一示例,您可以使用以下命令将数据放入 32 个分区。

df = df.repartition(32)

print(df.rdd.getNumPartitions())

最后,还有其他功能可以改变分区数,其中是groupBy(), groupByKey(), reduceByKey() 和 join().。当在 DataFrame 上调用这些功能时,会导致跨机器或通常跨执行器对数据进行重排,最终在默认情况下将数据重新划分为 200 个分区。此默认 数字可以使用spark.sql.shuffle.partitions配置进行控制。

数据加载

现在,了解分区在 Spark 中的工作原理以及如何更改分区,是时候实施这些学习了。在上述实验中,分区数为 216(默认情况下),这是因为文件的大小为 27 GB,因此将 27 GB 除以 128 MB(默认情况下由 Spark 定义的最大分区字节)提供了216 个分区

Spark重新分区的影响

对 PySpark 代码的更改是重新分区数据并确保每个分区现在有 1048576 行或接近它。为此,首先在DataFrame中获取记录数量,然后将其除以 1048576。此划分的结果将是用于加载数据的分区数,假设分区数为n但是,可能有一些分区现在有 >=1048576 行,因此,为了确保每个分区都<=1048576行,我们将分区数作为n+1使用n+1在分区结果为 0 的情况下也很重要。在这种情况下,您将有一个分区。

由于数据已加载到DataFrame中,而 Spark 默认已创建分区,我们现在必须再次重新分区数据,分区数等于n+1。

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216
# Get the number of rows of DataFrame and get the number of partitions to be used.
rows = df_gl.count()
n_partitions = rows//1048576
# Re-Partition the DataFrame
df_gl_repartitioned = df_gl.repartition(n_partitions+1)
# Get the number of partitions after re-partitioning
print(df_gl_repartitioned.rdd.getNumPartitions())
105
# Get the partition id and count of partitions
df_gl_repartitioned.withColumn("partitionId",
spark_partition_id()).groupBy("partitionId").count().show(10000)

使用Spark加载数据到SQL Server列存储表

因此,在重新划分分区后,分区数量从216 个减少到 105 (n+1),因此每个分区现在都有接近1048576行。

此时,让我们将数据再次写入 SQL 表中,并验证行组质量。这一次,每个行组的行数将接近每个分区中的行数(略低于 1048576)。让我们看看下面:

重新分区后的行组

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')
105

重新分区后的行组质量

使用Spark加载数据到SQL Server列存储表

从本质上讲,这次整体数据加载比之前慢了 2 秒,但行组的质量要好得多。行组数量减少到一半,行组几乎已填满到最大容量。请注意,由于DataFrame的重新划分,将消耗额外的时间,这取决于数据帧的大小和分区数。

请注意,您不会总是获得每row_group 100 万条记录。它将取决于数据类型、列数等,以及之前讨论的因素-请参阅sys.dm_db_column_store_row_group_physical_stats

关键点

  1. 建议在将数据批量加载到 SQL Server时使用BatchSize(无论是 CCI 还是Heap)。但是,如果 Azure Databricks 或任何其他 Spark 引擎用于加载数据,则数据分区在确定聚集列存储索引中的行组质量方面起着重要作用。
  2. 使用BULK INSERT SQL 命令加载数据将遵守命令中提到的BATCHSIZE,除非其他因素影响插入行组的行数。
  3. Spark 中的数据分区不应基于某些随机数,最好动态识别分区数,并将n+1 用作分区数。
  4. 由于聚集列存储索引通过扫描单行组的列段扫描表,则最大化每个行组中的记录数可增强查询性能。为了获得最佳的查询性能,目标是最大限度地提高聚集列存储索引中每个行组的行数。
  5. Azure Databricks的数据加载速度在很大程度上取决于选择的集群类型及其配置。此外,请注意,到目前为止,Azure Databricks连接器仅支持Apache Spark 2.4.5。微软已经发布了对Spark 3.0的支持,它目前在预览版中,我们建议您在开发测试环境中彻底测试此连接器。
  6. 根据data frame的大小、列数、数据类型等,进行重新划分的时间会有所不同,因此您必须从端端角度考虑这次对整体数据加载的考虑。

Azure Data Factory

这是一篇非常好的数据ETL文章,Spark和SQL Server列存储表功能的组合。

Azure Data Factory是当前最成熟,功能最强大的ETL/ELT数据集成服务。其架构就是使用Spark作为计算引擎。

使用Spark加载数据到SQL Server列存储表

https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory