【From】 https://blog.csdn.net/u010990043/article/details/82842995
最近整理了一下spark SQL内置配。加粗配置项是对sparkSQL 调优性能影响比较大的项,小伙伴们按需酌情配置。后续会挑出一些通用调优配置,共大家参考。有不正确的地方,欢迎大家在留言区留言讨论。
配置项 | 默认值 | 概述 |
spark.sql.optimizer.maxIterations | 100 | sql优化器最大迭代次数 |
spark.sql.optimizer.inSetConversionThreshold | 10 | 插入转换的集合大小阈值 |
spark.sql.inMemoryColumnarStorage.compressed | TRUE | 当设置为true时,SCAPK SQL将根据数据的统计自动为每个列选择压缩编解码器 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制用于列缓存的批处理的大小。较大的批处理大小可以提高内存利用率和压缩率,但缓存数据时会出现OOM风险 |
spark.sql.inMemoryColumnarStorage.partitionPruning | TRUE | 启用内存中的列表分区剪枝 |
spark.sql.join.preferSortMergeJoin | TRUE | When true, 使用sort merge join 代替 shuffle hash join |
spark.sql.sort.enableRadixSort | TRUE | 使用基数排序,基数排序性能非常快,但是会额外使用over heap.当排序比较小的Row时,overheap 需要提高50% |
spark.sql.autoBroadcastJoinThreshold | 10L * 1024 * 1024 | 当执行join时,被广播到worker节点上表最大字节。当被设置为-1,则禁用广播。当前仅仅支持 Hive Metastore tables,表大小的统计直接基于hive表的源文件大小 |
spark.sql.limit.scaleUpFactor | 4 | 在执行查询时,两次尝试之间读取partation数目的增量。较高的值会导致读取过多分区,较少的值会导致执行时间过长,因为浙江运行更多的作业 |
spark.sql.statistics.fallBackToHdfs | FALSE | 当不能从table metadata中获取表的统计信息,返回到hdfs。这否有用取决与表是否足够小到能够使用auto broadcast joins |
spark.sql.defaultSizeInBytes | Long.MaxValue | 在查询计划中表默认大小,默认被设置成Long.MaxValue 大于spark.sql.autoBroadcastJoinThreshold的值,也就意味着默认情况下不会广播一个表,除非他足够小 |
spark.sql.shuffle.partitions | 200 | 当为join/aggregation shuffle数据时,默认partition的数量 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 64 * 1024 * 1024byte | The target post-shuffle input size in bytes of a task. |
spark.sql.adaptive.enabled | FALSE | 是否开启adaptive query execution(自适应查询执行) |
spark.sql.adaptive.minNumPostShufflePartitions | -1 | 测试用 |
spark.sql.subexpressionElimination.enabled | TRUE | When true, common subexpressions will be eliminated 当为真时,将删除公共子表达式 |
spark.sql.caseSensitive | FALSE | 查询分析器是否区分大小写,默认情况下不区分。强烈建议不区分大小写 |
spark.sql.constraintPropagation.enabled | 是否开启优化,在查询优化器期间推断和传播查询计划中的数据约束。对于某种类型的查询计划(例如有大量谓语和别名的查询),约束传播是昂贵的,会对整个运行时间产生负面影响。 | |
spark.sql.parser.escapedStringLiterals | FALSE | 2.0之前默认值为true,知否默认是否。正常文字能否包含在正则表达式中。 |
spark.sql.parquet.mergeSchema | FALSE | 若为true,在读取parquet数据源时,schema从所有文件中合并出来。否则如果没有可用的摘要文件,则从概要文件或随机文件中选择模式 |
spark.sql.parquet.respectSummaryFiles | FALSE | 若为ture,假设parquet的所有部分文件和概要文件一致,在合并模式时会忽略他们。否则将会合并所有的部分文件 |
spark.sql.parquet.binaryAsString | FALSE | 是否向下兼容其他parquet生产系统(eg impala or older version spark sql ),不区分字节数据和string数据写到parquet schema,这个配置促使spark sql将二进制数据作为string达到兼容 |
spark.sql.parquet.int96AsTimestamp | TRUE | 是否使用Int96作为timestamp的存储格式,可以避免精度损失丢失纳秒部分,为其他parquet系统提供兼容(impala) |
spark.sql.parquet.int64AsTimestampMillis | FALSE | 当为true,timestamp值将以Int64作为mlibs的存储扩展类型,这种模式微秒将被丢弃 |
spark.sql.parquet.cacheMetadata | TRUE | 是否缓存parquet的schema数据元,可以提升静态数据的查询性能 |
spark.sql.parquet.compression.codec | snappy | 支持类型:uncompressed", "snappy", "gzip", "lzo"。 指定parquet写文件的压缩编码方式 |
spark.sql.parquet.filterPushdown | TRUE | 是否开启parquet过滤条件下推 |
spark.sql.parquet.writeLegacyFormat | FALSE | spark sql在拼接schema时是否遵循parquet的schema的规范 |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | parquet输出提交器类,同城必须是org.apache.hadoop.mapreduce.OutputCommitter的子类,如果不是将不会创建数据源摘要,即使配置开启了parquet.enable.summary-metadata |
spark.sql.parquet.enableVectorizedReader | TRUE | 开启parquet向量解码 |
spark.sql.orc.filterPushdown | FALSE | 是否开启条件下推到orc文件写 |
spark.sql.hive.verifyPartitionPath | FALSE | 当为true时,在读取HDFS中存储的数据时,检查表根目录下的所有分区路径 |
spark.sql.hive.metastorePartitionPruning | TRUE | 当为true,spark sql的谓语将被下推到hive metastore中,更早的消除不匹配的分区,会影响到违背转换成文件源关系的hive表 |
spark.sql.hive.manageFilesourcePartitions | TRUE | 是否使用hive metastore管理spark sql的 dataSource表分区,若为true,dataSource表会在执行计划期间使用分区剪枝 |
spark.sql.hive.filesourcePartitionFileCacheSize | 250 * 1024 * 1024 | 当非0时,开启将分区文件数据元缓存到内存中,所有表共享一个缓存,当开启 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)时才会生效 |
spark.sql.hive.caseSensitiveInferenceMode | INFER_AND_SAVE | 设置无法从hive表属性读取分区大小写模式时所采取的操作,虽然Spice SQL本身不区分大小写,但hive兼容的文件格式如parquet。Spark sql必须使用一个保持情况的模式,当查询由包含区分大小写字段名或查询的文件支持的任何表可能无法返回准确的结果时。有效选项包括INFER_AND_SAVE(默认模式——从基础数据文件推断出区分大小写的模式,并将其写入表属性),INFER_ONLY(推断schema但不尝试将其写入表属性)和NEVER_INFER(回退到使用区分大小写间接转移模式代替推断) |
spark.sql.optimizer.metadataOnly | TRUE | 当为true时,启用仅使用表的元数据的元数据查询优化来生成分区列,而不是表扫描。当扫描的所有列都是分区列,并且查询具有满足不同语义的聚合运算符时,它适用。 |
spark.sql.columnNameOfCorruptRecord | _corrupt_record | 当json/csv数据内部列解析失败时,失败列的名称 |
spark.sql.broadcastTimeout" | 5*60 | 在broadCast join时 ,广播等待的超时时间 |
spark.sql.thriftserver.scheduler.pool | 为JDBC客户端会话设置公平调度程序池 | |
spark.sql.thriftServer.incrementalCollect | FALSE | 当TRUE时,启用增量集合以在thrift server中执行 |
spark.sql.thriftserver.ui.retainedStatements | 200 | JDBC/ODBC Web用户界面历史记录中SQL语句的数量 |
spark.sql.thriftserver.ui.retainedSessions | 200 | JDBC/ODBC Web UI历史中保存的SQL客户端会话数 |
spark.sql.sources.default | parquet | 输入输出默认数据元 |
spark.sql.hive.convertCTAS | FALSE | 如果时true,将使用spark.sql.sources.default.设置数据源,不指定任何存储属性到hive ctas语句 |
spark.sql.hive.gatherFastStats | TRUE | 在修复表分区时,将快速收集STATS(文件数量和所有文件的总大小),以避免HIVE转移子中的顺序列表。 |
spark.sql.sources.partitionColumnTypeInference.enabled | TRUE | 是否自动推断分区列的数据类型 |
spark.sql.sources.bucketing.enabled | TRUE | 当false时,分桶表当作普通表处理 |
spark.sql.crossJoin.enabled | FALSE | 当false时,如果查询中语法笛卡儿积 却语法中没有显示join,将会抛出异常 |
spark.sql.orderByOrdinal | TRUE | 当为true时,排序字段放置到seleect List,否则被忽略 |
spark.sql.groupByOrdinal | TRUE | 当为true时,按组子句的序号被视为选择列表中的位置。当为false时,序数被忽略。 |
spark.sql.groupByAliases | TRUE | group by后的别名是否能够被用到 select list中,若为否将抛出分析异常 |
spark.sql.sources.parallelPartitionDiscovery.threshold | 32 | 允许在driver端列出文件的最大路径数。如果在分区发现期间检测到的路径的数量超过该值,则尝试用另一个SCAPLE分布式作业来列出文件。这适用于parquet、ORC、CSV、JSON和LIbSVM数据源。 |
spark.sql.sources.parallelPartitionDiscovery.parallelism | 10000 | 递归地列出路径集合的并行数,设置阻止文件列表生成太多任务的序号 |
spark.sql.selfJoinAutoResolveAmbiguity | TRUE | 自动解决子链接中的连接条件歧义,修复bug SPARK-6231 |
spark.sql.retainGroupColumns | TRUE | 是否保留分组列 |
spark.sql.pivotMaxValues | 10000 | |
spark.sql.runSQLOnFiles | TRUE | 当为true,在sql查询时,能够使用dataSource.path作为表(eg:"select a,b from hdfs://xx/xx/*") |
spark.sql.codegen.wholeStage | TRUE | 当为true,多个算子的整个stage将被便宜到一个java方法中 |
spark.sql.codegen.maxFields | 100 | 在激活整个stage codegen之前支持的最大字段(包括嵌套字段) |
spark.sql.codegen.fallback | TRUE | 当为true,在整个stage的codegen,对于编译generated code 失败的query 部分,将会暂时关闭 |
spark.sql.codegen.maxCaseBranches | 20 | 支持最大的codegen |
spark.sql.files.maxPartitionBytes | 128 * 1024 * 1024 | 在读取文件时,一个分区最大被读取的数量,默认值=parquet.block.size |
spark.sql.files.openCostInBytes | 4 * 1024 * 1024 | 为了测定打开一个文件的耗时,通过同时扫描配置的字节数来测定,最好是过度估计,那么小文件的分区将比具有较大文件的分区更快(首先调度 |
spark.sql.files.ignoreCorruptFiles | FALSE | 是否自动跳过不正确的文件 |
spark.sql.files.maxRecordsPerFile | 0 | 写入单个文件的最大条数,如果时0或者负数,则无限制 |
spark.sql.exchange.reuse | TRUE | planer是否尝试找出重复的 exchanges并复用 |
spark.sql.streaming.stateStore.minDeltasForSnapshot | 10 | 在合并成快照之前需要生成的状态存储增量文件的最小数目 |
spark.sql.streaming.checkpointLocation | 检查点数据流的查询的默认存储位置 | |
spark.sql.streaming.minBatchesToRetain | 100 | 流式计算最小批次长度 |
spark.sql.streaming.unsupportedOperationCheck | TRUE | streaming query的logical plan 检查不支持的操作 |
spark.sql.variable.substitute | TRUE | |
spark.sql.codegen.aggregate.map.twolevel.enable | 启用两级聚合哈希映射。当启用时,记录将首先“插入/查找第一级、小、快的映射,然后在第一级满或无法找到键时回落到第二级、更大、较慢的映射。当禁用时,记录直接进入第二级。默认为真 | |
spark.sql.view.maxNestedViewDepth | 100 | 嵌套视图中视图引用的最大深度。嵌套视图可以引用其他嵌套视图,依赖关系被组织在有向无环图(DAG)中。然而,DAG深度可能变得太大,导致意外的行为。此配置限制了这一点:当分析期间视图深度超过该值时,我们终止分辨率以避免潜在错误。 |
spark.sql.objectHashAggregate.sortBased.fallbackThreshold | 128 | 在ObjectHashAggregateExec的情况下,当内存中哈希映射的大小增长过大时,我们将回落到基于排序的聚合。此选项为哈希映射的大小设置行计数阈值。 |
spark.sql.execution.useObjectHashAggregateExec | TRUE | 是否使用 ObjectHashAggregateExec |
spark.sql.streaming.fileSink.log.deletion | TRUE | 是否删除文件流接收器中的过期日志文件 |
spark.sql.streaming.fileSink.log.compactInterval | 10 | 日志文件合并阈值,然后将所有以前的文件压缩到下一个日志文件中 |
spark.sql.streaming.fileSink.log.cleanupDelay | 10min | 保证一个日志文件被所有用户可见的时长 |
spark.sql.streaming.fileSource.log.deletion | TRUE | 是否删除文件流源中过期的日志文件 |
spark.sql.streaming.fileSource.log.compactInterval | 10 | 日志文件合并阈值,然后将所有以前的文件压缩到下一个日志文件中 |
spark.sql.streaming.fileSource.log.cleanupDelay | 10min | 保证一个日志文件被所有用户可见的时长 |
spark.sql.streaming.schemaInference | FALSE | 基于文件的流,是否推断它的模式 |
spark.sql.streaming.pollingDelay | 10L(MILLISECONDS) | 在没有数据可用时延迟查询新数据多长时间 |
spark.sql.streaming.noDataProgressEventInterval | 10000L(MILLISECONDS) | 在没有数据的情况下,在两个进度事件之间等待时间 |
spark.sql.streaming.metricsEnabled | FALSE | 是否为活动流查询报告DoopWalth/CODAHALE度量 |
spark.sql.streaming.numRecentProgressUpdates | 100 | streaming query 保留的进度更新数量 |
spark.sql.statistics.ndv.maxError | 0.05 | 生成列级统计量时超对数G+++算法允许的最大估计误差 |
spark.sql.cbo.enabled | FALSE | 在设定true时启用CBO来估计计划统计信息 |
spark.sql.cbo.joinReorder.enabled | FALSE | Enables join reorder in CBO. |
spark.sql.cbo.joinReorder.dp.threshold | 12 | The maximum number of joined nodes allowed in the dynamic programming algorithm |
spark.sql.cbo.joinReorder.card.weight | 0.07 | The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight) |
spark.sql.cbo.joinReorder.dp.star.filter | FALSE | Applies star-join filter heuristics to cost based join enumeration |
spark.sql.cbo.starSchemaDetection | FALSE | When true, it enables join reordering based on star schema detection |
spark.sql.cbo.starJoinFTRatio | 0.9 | Specifies the upper limit of the ratio between the largest fact tables for a star join to be considered |
spark.sql.session.timeZone | TimeZone.getDefault.getID | 时间时区 |
spark.sql.windowExec.buffer.in.memory.threshold | 4096 | 窗口操作符保证存储在内存中的行数的阈值 |
spark.sql.windowExec.buffer.spill.threshold | spark.sql.windowExec.buffer.in.memory.threshold | 窗口操作符溢出的行数的阈值 |
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | Int.MaxValue | 由sortMergeJoin运算符保证存储在内存中的行数的阈值 |
spark.sql.sortMergeJoinExec.buffer.spill.threshold | spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | 由排序合并连接运算符溢出的行数的阈值 |
spark.sql.cartesianProductExec.buffer.in.memory.threshold | 4096 | 笛卡尔乘积算子保证存储在内存中的行数的阈值 |
spark.sql.cartesianProductExec.buffer.spill.threshold | spark.sql.cartesianProductExec.buffer.in.memory.threshold | 笛卡尔乘积算子溢出的行数阈值 |
spark.sql.redaction.options.regex | "(?i)url".r |
即便join的hive表没有10M,也没有触发 mapjoin[解决方案]
spark在join的时候,用来判断一个表的大小是否达到了10M这个限制,是不会去计算这个表在hdfs上的具体的文件大小的,而是使用hive metadata中的信息,具体如下图:
explain出来spark的执行计划如下:
== Physical Plan ==
*Project [device#57, pkg#58]
+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRight
:- *Filter isnotnull(pkg#58)
: +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *Filter isnotnull(apppkg#62)
+- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping
当有些hive没有totalSize这个信息的时候,spark就会用sortMergeJoin来做join了,可以使用下面的命令重新生成metadata信息:
ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS
---------------------
作者:sunkl_
来源:CSDN
原文:https://blog.csdn.net/u010990043/article/details/82842995
版权声明:本文为博主原创文章,转载请附上博文链接!