STORAGE LAYER
On-DiskFormat
MergeTree
引擎的插入分为两种形式,同步插入的情况下每个Insert
语句都会创建一个Part
,异步插入和一般LSM
的写入一致,维护内存memtable
,基于大小和时间维度做下刷,后者适应于实时分析的场景,因为一般客户端无法接受批量导入的时延。
对于磁盘存储结构,都是老生常谈的东西,官方文档基本上描述的清楚了,想快一点看[1]也可以。
大致描述就是把8192个行定义为granules
,这是最小的逻辑单位,主键索引就是这一层,但是读取的最小逻辑单位是Block
,默认1MB,内部可能包含多个granules
,块本身也可以被压缩,为了能够在压缩的情况下快速随机访问单个granules
,ClickHouse
还为每一列存储了一个映射,该映射将每个granules ID
与其包含的压缩块在列中的偏移量以及granules
在未压缩块中的偏移量关联起来。
列可以进一步进行字典编码或使用两种wrapper data types
:
-
LowCardinality(T)
将原始列值替换为整数 ID,从而显著减少具有少量唯一值的数据的存储开销。 -
Nullable(T)
向列 T 添加一个内部位图,表示列值是否为 NULL
Data Pruning
文中提到了:
-
主键索引:存储主键到
granules ID
的映射 - Table projections:主键不同的另外一张表,可以加快筛选不同于主表主键列的查询速度,但会增加插入、合并和空间消耗的开销。查询优化器根据估计的 I/O 成本在从主表读取和投影之间进行选择。
-
skipping 索引:包括
Min-max indices
,Set indices
,Bloom flter indices
具体细节可以参考[7]
Merge-time Data Transformation
文中提到了:
-
Replacing merges:去除重复主键行,默认删除时间戳较老的,也可以指定一个特殊的版本用作判断哪一行应该被留下。从[12]中可以看到因为行的去处是在
compact
时发生的,所以查询并不能确保完全没有重复值,这对于部分业务场景是致命的。 -
Aggregating merges:标准物化视图实现,可以参考官方文档的
AggregatingMergeTree
和SummingMergeTree
,我们之前想做一个类似的事情,不同的是要支持first/last/mean等复杂算子,但是后续发现在存算分离的系统做物化视图完全没必要这样玩,直接搞几个计算节点跑SQL,结果写到目标表完事,隔离又好,速度又快,就是相对的实时性差一些。 -
TTL (time-to-live) merges:可以重新压缩,删除数据,或者把数据写入到更廉价的存储,这个地方想做的通用工程上还是要下不少功夫的,因为不是从一个log直接拿数据上传,而是需要把存量数据转换并上传到三方,过程中也要注意隔离和资源使用。之前我们的一种做法是在
compact
的时候判断ttl
去做相关逻辑,不知道ck是怎么做的。
Updates and Deletes
- Mutations rewrite:比较有意思的做法,就地重写表。为了防止更新导致的大小暂时翻倍,此操作是非原子的,即并行 SELECT 语句可以读取突变和未突变的部分,但是可预料到的开销非常大。
- Lightweight deletes:轻量级删除仅更新内部位图列,指示行是否被删除,compact和查询的时候做过滤,可以从[13]中看到Presto这样的列存系统在做行删的时候对查询性能影响是很大的,因为查询的时候需要去掉被删除的数据。
Idempotent Inserts
客户端在将数据发送到服务器以插入表中后如何处理连接超时?在这种情况下,客户端很难区分数据是否成功插入。
前面章节提到的ReplacingMergeTree
也可以处理这种情况,传统上,通过将数据从客户端重新发送到服务器并依靠主键或唯一约束来拒绝重复插入来解决该问题,由于这些数据结构索引每个数据项,因此它们的空间和更新开销对于大型数据集和高写入来说难以承受,所以CK有个轻量级的玩法,服务器维护最后插入的 N 个Part
的哈希值,并忽略具有已知哈希值Part
的重新插入。
Data Replication
文中给了一个简单的case:
- Node1收到两个Insert语句,并写入复制流,为图中的1,2
- Node2重放日志,为图中的3
- Node2从Node1下载数据,为图中的4
- Node3重防日志,为图中的3,5
- Node3从Node1下载数据,为图中的4,6
- Node3执行merge,并记录log,为图中的7
这里比较重要的一点是并不是简单同步log,然后把数据重放到引擎,这样的架构最大的问题就是几副本整个集群的开销就是几倍,所以实际的数据部分是从主来拉取的。如果有统一的持久化层负责可靠性,天生就拥有了这个能力,但可惜的是很多价格当前还是有类似的问题。
QUERY PROCESSING LAYER
极端简单的描述,基本上可以理解为啥也没说。
把查询的并行分为三层,分布式计划划分后的Node层面并行,多个Block的Core级别并行,Simd指令级并行。
node层面不谈了,simd层面上从[11]的DATA-PARALLEL EXECUTION (SIMD)章节中可以看到很多分析型场景的瓶颈是在数据访问,即IO层面,并不是计算层面,所以simd的优化并不是所有场景有显著效果的。
从Core层面并行来说,因为ck和Velox都是典型的Plan-Driven
,而不是像duckdb
那样的Morsel-Driven
,所以必须要在计划生成的时候就指定好并行数,即同步点下pipeline
的数量就是最大并行数。
论文中还描述了一些其他优化策略:
- Query optimization:constant folding,extracting scalars from certain aggregation functions,common subexpression elimination, transforming disjunctions of equality flters to IN-lists
- Query compilation:code-gen
- Primary key index evaluation:淦,没看懂前两个优化,只看懂允许把部分函数比如toYear(k) = 2024转换为关键列的比较 k >= 2024-01-01 && k < 2025-01-01,利用排序性
-
Data skipping:利用
Data Pruning
提到的索引结构来跳过数据。 - Hash tables:哈希表的实现对于aggregation 和 hash joins至关重要,ck的哈希表实现非常丰富,实现了三十多种哈希表,这个可以在后续有需求时仔细调研下。
Workload Isolation
ClickHouse ofers concurrency control, memory usage limits, and I/O scheduling, enabling users to isolate queries into workload classes. By setting limits on shared resources (CPU cores, DRAM, disk and network I/O) for specifc workload classes, it ensures these queries do not afect other critical business queries.
Concurrency control prevents thread oversubscription in scenarios with a high number of concurrent queries. More specifcally, the number of worker threads per query are adjusted dynamically based on a specifed ratio to the number of available CPU cores.
ClickHouse tracks byte sizes of memory allocations at the server, user, and query level, and thereby allows to set fexible memory usage limits. Memory overcommit enables queries to use additional free memory beyond the guaranteed memory, while assuring memory limits for other queries. Furthermore, memory usage for aggregation, sort, and join clauses can be limited, causing fallbacks to external algorithms when the memory limit is exceeded.
Lastly, I/O scheduling allows users to restrict local and remote disk accesses for workload classes based on a maximum bandwidth, in-fight requests, and policy (e.g. FIFO, SFC [32]).
Workload classes
的思路是没问题的,Velox中没有这样的实现,在Velox中把不同的操作,比如访问对象存储,计算,spill等分为不同的folly::exector
,以此做线程隔离,并没有IO级别的隔离。
但是在Velox中内存的限制基本和ck一致,使用树型的memory pool
追踪全局的内存,并在查询超过单独设置的内存上限时触发Spill
和MemoryArbitrator
,以此为高优查询释放内存。
INTEGRATION LAYER
有两种方法可以使外部数据在 OLAP 数据库中可用:
- push-based:第三方组件将数据库与外部数据存储连接起来,比如 ETL 工具,将远程数据转换格式后推送到目标系统。
- pull-based:在基于拉的模型中,数据库本身连接到远程数据源并拉取数据以查询本地表或将数据导出到远程系统。
ck使用后者,确实是一个很有意思的功能,可以把远程存储当作本地表做查询或者写入,虽然没有技术难点,但是做到像ck这样支持五十多种外部系统的工程量是不可想象的。