[转] Spark sql 内置配置(V2.2)

时间:2024-02-20 07:05:20

【From】 https://blog.csdn.net/u010990043/article/details/82842995

 

 最近整理了一下spark SQL内置配。加粗配置项是对sparkSQL 调优性能影响比较大的项,小伙伴们按需酌情配置。后续会挑出一些通用调优配置,共大家参考。有不正确的地方,欢迎大家在留言区留言讨论。 

 

配置项 默认值 概述
spark.sql.optimizer.maxIterations 100 sql优化器最大迭代次数
spark.sql.optimizer.inSetConversionThreshold 10 插入转换的集合大小阈值
spark.sql.inMemoryColumnarStorage.compressed TRUE 当设置为true时,SCAPK SQL将根据数据的统计自动为每个列选择压缩编解码器
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制用于列缓存的批处理的大小。较大的批处理大小可以提高内存利用率和压缩率,但缓存数据时会出现OOM风险
spark.sql.inMemoryColumnarStorage.partitionPruning  TRUE 启用内存中的列表分区剪枝
spark.sql.join.preferSortMergeJoin TRUE When true, 使用sort merge join 代替 shuffle hash join
spark.sql.sort.enableRadixSort TRUE 使用基数排序,基数排序性能非常快,但是会额外使用over heap.当排序比较小的Row时,overheap 需要提高50%
spark.sql.autoBroadcastJoinThreshold 10L * 1024 * 1024 当执行join时,被广播到worker节点上表最大字节。当被设置为-1,则禁用广播。当前仅仅支持 Hive Metastore tables,表大小的统计直接基于hive表的源文件大小
spark.sql.limit.scaleUpFactor 4 在执行查询时,两次尝试之间读取partation数目的增量。较高的值会导致读取过多分区,较少的值会导致执行时间过长,因为浙江运行更多的作业
spark.sql.statistics.fallBackToHdfs FALSE 当不能从table metadata中获取表的统计信息,返回到hdfs。这否有用取决与表是否足够小到能够使用auto broadcast joins
spark.sql.defaultSizeInBytes Long.MaxValue 在查询计划中表默认大小,默认被设置成Long.MaxValue 大于spark.sql.autoBroadcastJoinThreshold的值,也就意味着默认情况下不会广播一个表,除非他足够小
spark.sql.shuffle.partitions 200 当为join/aggregation  shuffle数据时,默认partition的数量
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 64 * 1024 * 1024byte The target post-shuffle input size in bytes of a task.
spark.sql.adaptive.enabled FALSE 是否开启adaptive query execution(自适应查询执行)
spark.sql.adaptive.minNumPostShufflePartitions -1 测试用
spark.sql.subexpressionElimination.enabled TRUE When true, common subexpressions will be eliminated
当为真时,将删除公共子表达式
spark.sql.caseSensitive FALSE 查询分析器是否区分大小写,默认情况下不区分。强烈建议不区分大小写
spark.sql.constraintPropagation.enabled   是否开启优化,在查询优化器期间推断和传播查询计划中的数据约束。对于某种类型的查询计划(例如有大量谓语和别名的查询),约束传播是昂贵的,会对整个运行时间产生负面影响。
spark.sql.parser.escapedStringLiterals FALSE 2.0之前默认值为true,知否默认是否。正常文字能否包含在正则表达式中。
spark.sql.parquet.mergeSchema FALSE 若为true,在读取parquet数据源时,schema从所有文件中合并出来。否则如果没有可用的摘要文件,则从概要文件或随机文件中选择模式
spark.sql.parquet.respectSummaryFiles FALSE 若为ture,假设parquet的所有部分文件和概要文件一致,在合并模式时会忽略他们。否则将会合并所有的部分文件
spark.sql.parquet.binaryAsString FALSE 是否向下兼容其他parquet生产系统(eg impala or older version spark sql ),不区分字节数据和string数据写到parquet schema,这个配置促使spark sql将二进制数据作为string达到兼容
spark.sql.parquet.int96AsTimestamp TRUE 是否使用Int96作为timestamp的存储格式,可以避免精度损失丢失纳秒部分,为其他parquet系统提供兼容(impala)
spark.sql.parquet.int64AsTimestampMillis FALSE 当为true,timestamp值将以Int64作为mlibs的存储扩展类型,这种模式微秒将被丢弃
spark.sql.parquet.cacheMetadata TRUE 是否缓存parquet的schema数据元,可以提升静态数据的查询性能
spark.sql.parquet.compression.codec snappy 支持类型:uncompressed", "snappy", "gzip", "lzo"。 指定parquet写文件的压缩编码方式
spark.sql.parquet.filterPushdown TRUE 是否开启parquet过滤条件下推
spark.sql.parquet.writeLegacyFormat FALSE spark sql在拼接schema时是否遵循parquet的schema的规范
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter parquet输出提交器类,同城必须是org.apache.hadoop.mapreduce.OutputCommitter的子类,如果不是将不会创建数据源摘要,即使配置开启了parquet.enable.summary-metadata
spark.sql.parquet.enableVectorizedReader TRUE 开启parquet向量解码
spark.sql.orc.filterPushdown FALSE 是否开启条件下推到orc文件写
spark.sql.hive.verifyPartitionPath FALSE 当为true时,在读取HDFS中存储的数据时,检查表根目录下的所有分区路径
spark.sql.hive.metastorePartitionPruning TRUE 当为true,spark sql的谓语将被下推到hive metastore中,更早的消除不匹配的分区,会影响到违背转换成文件源关系的hive表
spark.sql.hive.manageFilesourcePartitions TRUE 是否使用hive metastore管理spark sql的 dataSource表分区,若为true,dataSource表会在执行计划期间使用分区剪枝 
spark.sql.hive.filesourcePartitionFileCacheSize 250 * 1024 * 1024 当非0时,开启将分区文件数据元缓存到内存中,所有表共享一个缓存,当开启 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)时才会生效
spark.sql.hive.caseSensitiveInferenceMode INFER_AND_SAVE 设置无法从hive表属性读取分区大小写模式时所采取的操作,虽然Spice SQL本身不区分大小写,但hive兼容的文件格式如parquet。Spark sql必须使用一个保持情况的模式,当查询由包含区分大小写字段名或查询的文件支持的任何表可能无法返回准确的结果时。有效选项包括INFER_AND_SAVE(默认模式——从基础数据文件推断出区分大小写的模式,并将其写入表属性),INFER_ONLY(推断schema但不尝试将其写入表属性)和NEVER_INFER(回退到使用区分大小写间接转移模式代替推断)
spark.sql.optimizer.metadataOnly TRUE 当为true时,启用仅使用表的元数据的元数据查询优化来生成分区列,而不是表扫描。当扫描的所有列都是分区列,并且查询具有满足不同语义的聚合运算符时,它适用。
spark.sql.columnNameOfCorruptRecord _corrupt_record 当json/csv数据内部列解析失败时,失败列的名称
spark.sql.broadcastTimeout" 5*60 在broadCast join时 ,广播等待的超时时间
spark.sql.thriftserver.scheduler.pool   为JDBC客户端会话设置公平调度程序池
spark.sql.thriftServer.incrementalCollect FALSE 当TRUE时,启用增量集合以在thrift server中执行
spark.sql.thriftserver.ui.retainedStatements 200 JDBC/ODBC Web用户界面历史记录中SQL语句的数量
spark.sql.thriftserver.ui.retainedSessions 200 JDBC/ODBC Web UI历史中保存的SQL客户端会话数
spark.sql.sources.default parquet 输入输出默认数据元
spark.sql.hive.convertCTAS FALSE 如果时true,将使用spark.sql.sources.default.设置数据源,不指定任何存储属性到hive ctas语句
spark.sql.hive.gatherFastStats TRUE 在修复表分区时,将快速收集STATS(文件数量和所有文件的总大小),以避免HIVE转移子中的顺序列表。
spark.sql.sources.partitionColumnTypeInference.enabled TRUE 是否自动推断分区列的数据类型
spark.sql.sources.bucketing.enabled TRUE 当false时,分桶表当作普通表处理
spark.sql.crossJoin.enabled FALSE 当false时,如果查询中语法笛卡儿积 却语法中没有显示join,将会抛出异常
spark.sql.orderByOrdinal TRUE 当为true时,排序字段放置到seleect List,否则被忽略
spark.sql.groupByOrdinal TRUE 当为true时,按组子句的序号被视为选择列表中的位置。当为false时,序数被忽略。
spark.sql.groupByAliases TRUE group by后的别名是否能够被用到 select list中,若为否将抛出分析异常
spark.sql.sources.parallelPartitionDiscovery.threshold 32 允许在driver端列出文件的最大路径数。如果在分区发现期间检测到的路径的数量超过该值,则尝试用另一个SCAPLE分布式作业来列出文件。这适用于parquet、ORC、CSV、JSON和LIbSVM数据源。
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 递归地列出路径集合的并行数,设置阻止文件列表生成太多任务的序号
spark.sql.selfJoinAutoResolveAmbiguity TRUE 自动解决子链接中的连接条件歧义,修复bug SPARK-6231
spark.sql.retainGroupColumns TRUE 是否保留分组列
spark.sql.pivotMaxValues 10000  
spark.sql.runSQLOnFiles TRUE 当为true,在sql查询时,能够使用dataSource.path作为表(eg:"select a,b from hdfs://xx/xx/*")
spark.sql.codegen.wholeStage TRUE 当为true,多个算子的整个stage将被便宜到一个java方法中
spark.sql.codegen.maxFields 100 在激活整个stage codegen之前支持的最大字段(包括嵌套字段)
spark.sql.codegen.fallback TRUE 当为true,在整个stage的codegen,对于编译generated code 失败的query 部分,将会暂时关闭
spark.sql.codegen.maxCaseBranches 20 支持最大的codegen
spark.sql.files.maxPartitionBytes 128 * 1024 * 1024 在读取文件时,一个分区最大被读取的数量,默认值=parquet.block.size
spark.sql.files.openCostInBytes 4 * 1024 * 1024 为了测定打开一个文件的耗时,通过同时扫描配置的字节数来测定,最好是过度估计,那么小文件的分区将比具有较大文件的分区更快(首先调度
spark.sql.files.ignoreCorruptFiles FALSE 是否自动跳过不正确的文件
spark.sql.files.maxRecordsPerFile 0 写入单个文件的最大条数,如果时0或者负数,则无限制
spark.sql.exchange.reuse TRUE planer是否尝试找出重复的 exchanges并复用
spark.sql.streaming.stateStore.minDeltasForSnapshot 10 在合并成快照之前需要生成的状态存储增量文件的最小数目
spark.sql.streaming.checkpointLocation   检查点数据流的查询的默认存储位置
spark.sql.streaming.minBatchesToRetain 100 流式计算最小批次长度
spark.sql.streaming.unsupportedOperationCheck TRUE streaming query的logical plan 检查不支持的操作
spark.sql.variable.substitute TRUE  
spark.sql.codegen.aggregate.map.twolevel.enable   启用两级聚合哈希映射。当启用时,记录将首先“插入/查找第一级、小、快的映射,然后在第一级满或无法找到键时回落到第二级、更大、较慢的映射。当禁用时,记录直接进入第二级。默认为真
spark.sql.view.maxNestedViewDepth 100 嵌套视图中视图引用的最大深度。嵌套视图可以引用其他嵌套视图,依赖关系被组织在有向无环图(DAG)中。然而,DAG深度可能变得太大,导致意外的行为。此配置限制了这一点:当分析期间视图深度超过该值时,我们终止分辨率以避免潜在错误。
spark.sql.objectHashAggregate.sortBased.fallbackThreshold 128 在ObjectHashAggregateExec的情况下,当内存中哈希映射的大小增长过大时,我们将回落到基于排序的聚合。此选项为哈希映射的大小设置行计数阈值。
spark.sql.execution.useObjectHashAggregateExec TRUE 是否使用 ObjectHashAggregateExec
spark.sql.streaming.fileSink.log.deletion TRUE 是否删除文件流接收器中的过期日志文件
spark.sql.streaming.fileSink.log.compactInterval 10 日志文件合并阈值,然后将所有以前的文件压缩到下一个日志文件中
spark.sql.streaming.fileSink.log.cleanupDelay 10min 保证一个日志文件被所有用户可见的时长
spark.sql.streaming.fileSource.log.deletion TRUE 是否删除文件流源中过期的日志文件
spark.sql.streaming.fileSource.log.compactInterval 10 日志文件合并阈值,然后将所有以前的文件压缩到下一个日志文件中
spark.sql.streaming.fileSource.log.cleanupDelay 10min 保证一个日志文件被所有用户可见的时长
spark.sql.streaming.schemaInference FALSE 基于文件的流,是否推断它的模式
spark.sql.streaming.pollingDelay 10L(MILLISECONDS) 在没有数据可用时延迟查询新数据多长时间
spark.sql.streaming.noDataProgressEventInterval 10000L(MILLISECONDS) 在没有数据的情况下,在两个进度事件之间等待时间
spark.sql.streaming.metricsEnabled FALSE 是否为活动流查询报告DoopWalth/CODAHALE度量
spark.sql.streaming.numRecentProgressUpdates 100 streaming query 保留的进度更新数量
spark.sql.statistics.ndv.maxError 0.05 生成列级统计量时超对数G+++算法允许的最大估计误差
spark.sql.cbo.enabled FALSE 在设定true时启用CBO来估计计划统计信息
spark.sql.cbo.joinReorder.enabled FALSE Enables join reorder in CBO.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm
spark.sql.cbo.joinReorder.card.weight 0.07 The weight of cardinality (number of rows) for plan cost comparison in join reorder:  rows * weight + size * (1 - weight)
spark.sql.cbo.joinReorder.dp.star.filter FALSE Applies star-join filter heuristics to cost based join enumeration
spark.sql.cbo.starSchemaDetection FALSE When true, it enables join reordering based on star schema detection
spark.sql.cbo.starJoinFTRatio 0.9 Specifies the upper limit of the ratio between the largest fact tables  for a star join to be considered
spark.sql.session.timeZone TimeZone.getDefault.getID  时间时区
spark.sql.windowExec.buffer.in.memory.threshold 4096 窗口操作符保证存储在内存中的行数的阈值
spark.sql.windowExec.buffer.spill.threshold spark.sql.windowExec.buffer.in.memory.threshold 窗口操作符溢出的行数的阈值
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold Int.MaxValue 由sortMergeJoin运算符保证存储在内存中的行数的阈值
spark.sql.sortMergeJoinExec.buffer.spill.threshold spark.sql.sortMergeJoinExec.buffer.in.memory.threshold 由排序合并连接运算符溢出的行数的阈值
spark.sql.cartesianProductExec.buffer.in.memory.threshold 4096 笛卡尔乘积算子保证存储在内存中的行数的阈值
spark.sql.cartesianProductExec.buffer.spill.threshold spark.sql.cartesianProductExec.buffer.in.memory.threshold 笛卡尔乘积算子溢出的行数阈值
spark.sql.redaction.options.regex "(?i)url".r  

 

 

即便join的hive表没有10M,也没有触发 mapjoin[解决方案]
spark在join的时候,用来判断一个表的大小是否达到了10M这个限制,是不会去计算这个表在hdfs上的具体的文件大小的,而是使用hive metadata中的信息,具体如下图:

 

explain出来spark的执行计划如下:

== Physical Plan ==

*Project [device#57, pkg#58]

+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRight

   :- *Filter isnotnull(pkg#58)

   :  +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]

   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))

      +- *Filter isnotnull(apppkg#62)

         +- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping

当有些hive没有totalSize这个信息的时候,spark就会用sortMergeJoin来做join了,可以使用下面的命令重新生成metadata信息:

ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS


---------------------
作者:sunkl_
来源:CSDN
原文:https://blog.csdn.net/u010990043/article/details/82842995
版权声明:本文为博主原创文章,转载请附上博文链接!