Databend query result cache 设计与实现

时间:2020-12-21 00:55:11

Databend query result cache 设计与实现

Databend 在 1.0 中支持了对查询结果集的缓存,大大提高了多次相同查询返回结果的效率。

Query result cache 主要用于处理数据更新频率不高的查询,它通过缓存第一次查询返回的结果集,以便在之后对相同数据执行相同查询时能够立即返回结果,从而提高查询效率。

比如我们有个需求是每隔十秒获取一次销量前 5 的产品,通过以下 sql 执行查询:

SELECT product, count(product) AS sales_count
FROM sales_log
GROUP BY product
ORDER BY sales_count DESC
LIMIT 5;

在没有 cache 的情况下,每次都需要执行完整的 sql 查询流程,而整个流程可能耗时比较久,但结果仅仅返回5条数据。如果 sales_count 表中的数据更新频率不高,那么通过 cache 可以立即返回之后查询的结果,大大降低了等待时间和 Server 的负载。

整体设计

Query Result Cache 的生命周期

每个被缓存的结果集都会设置一个缓存失效时间(TTL),每次对相同缓存结果集的访问都会刷新失效时间,缓存的默认失效时间为 300 秒,可以通过设置 query_result_cache_ttl_secs 来修改。当失效时间到达后,缓存的结果集将不再可用。

除了 TTL 之外,如果底层数据(如 snapshot id、segment id、partition location)发生变化,缓存就会变得不准确。但是,这种底层数据的修改不会影响缓存的效果。如果仍然希望快速返回结果集,可以通过设置 SET query_result_cache_allow_inconsistent=1 来允许返回不一致的结果。如果您对 Databend 底层存储结构感兴趣,可以参考 Databend 存储概览

缓存结果存储

Databend 使用键值对来存储查询结果集,对于每一次查询, Databend 根据 query 信息构造一个对应的 key,然后将查询结果集的一些元信息构造成 value 存入到 meta service 中。

其中 Key 的生成规则为:

// 将 ast 序列化为 string,然后通过 hash 函数拿到对应的 hash 值
let ast_hash = sha256(formatted_ast);
// 将 result cache 的前缀,当前租户和上面生成的 hash 值拼接,得到最终 key
let key = format!("{RESULT_CACHE_PREFIX}/{tenant}/{ast_hash}");

Value 的结构如下(注意:value 中只存储对应结果集的元信息,真正的结果集会写到当前使用的 storage 中,比如 local fs, s3...):

struct ResultCacheValue {
    /// 原始查询 SQL.
    pub sql: String,
    /// 该次查询的 query_id
    pub query_id: String,
    /// 查询持续时间.
    pub query_time: u64,
    /// 缓存失效时间
    pub ttl: u64,
    /// 结果集大小,单位:字节
    pub result_size: usize,
    /// 结果集一共包含多少行数据
    pub num_rows: usize,
    /// 查询命中的 partitions 的 hash 值,每个表一个 hash 值
    pub partitions_shas: Vec<String>,
    /// 结果集缓存文件在底层存储中的地址
    pub location: String,
}

读取 cache

读 cache 流程比较简单,通过以下伪代码说明:

// 通过格式化之后的 ast 来生成查询语句对应的 key
let key = gen_result_cache_key(formatted_ast);

// 构建 cache reader
let cache_reader = ResultCacheReader::create(ctx, key, meta_client, allow_inconsistent);

// cache reader 首先从 meta service 中通过 key 得到对应的 ResultCacheValue
// ResultCacheValue 的结构见之前的代码段
let value = cache_reader.get(key)

// 如果可以容忍不一致,或者查询覆盖的 partitions 的 hash 值相同
// 就会通过 location 去底层存储读取缓存结果集,然后返回。
if allow_inconsistent || value.partitions_shas == ctx.partitions_shas {
    read_result_from_cache(&value.location)
}

写入 cache

           ┌─────────┐ 1  ┌─────────┐ 1
           │         ├───►│         ├───►Dummy───►Downstream
Upstream──►│Duplicate│ 2  │         │ 3
           │         ├───►│         ├───►Dummy───►Downstream
           └─────────┘    │         │
                          │ Shuffle │
           ┌─────────┐ 3  │         │ 2  ┌─────────┐
           │         ├───►│         ├───►│  Write  │
Upstream──►│Duplicate│ 4  │         │ 4  │ Result  │
           │         ├───►│         ├───►│  Cache  │
           └─────────┘    └─────────┘    └─────────┘ 

写 cache 的主要流程如上图所示,当一个查询执行没有命中 cache 时,就会触发写 cache 流程。

Databend 使用 pipeline 方式调度和处理读写任务,通常的 pipeline 流程是 source -> transform -> transform .. -> sink, 写 cache 会增加一个 sink 出口,因此需要首先并行的加一条管道来复制上游数据 (图中 duplicate 部分)

而由于 pipeline 中前置节点的 output port 和后置节点的 input port 是一一对应的,所以这里我们通过 shuffle 来重排序,以此来衔接前后处理节点。

注意事项

如果 query 中使用了不确定性的函数,比如 now(), rand(), uuid(),那么结果集将不会被 cache,另外 system 下的表也不会被 cache。

另外目前结果集最大缓存 1MiB 的数据,可以通过设置 query_result_cache_max_bytes 来调整允许 cache 的大小。

使用方式

相关设置

// 进行如下设置开启 query result cache,
// 后续 databend 将会默认打开这个设置
SET enable_query_result_cache=1;

// 进行如下设置来容忍不准确的结果
SET query_result_cache_allow_inconsistent=1;

测试 cache 是否生效

SET enable_query_result_cache=1;

SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+---------------------+-------------+------+----------------+----------------------+
| watchid             | clientip    | c    | sum(isrefresh) | avg(resolutionwidth) |
+---------------------+-------------+------+----------------+----------------------+
| 6655575552203051303 |  1611957945 |    2 |              0 |               1638.0 |
| 8566928176839891583 | -1402644643 |    2 |              0 |               1368.0 |
| 7904046282518428963 |  1509330109 |    2 |              0 |               1368.0 |
| 7224410078130478461 |  -776509581 |    2 |              0 |               1368.0 |
| 5957995970499767542 |  1311505962 |    1 |              0 |               1368.0 |
| 5295730445754781367 |  1398621605 |    1 |              0 |               1917.0 |
| 8635802783983293129 |   900266514 |    1 |              1 |               1638.0 |
| 5650467702003458413 |  1358200733 |    1 |              0 |               1368.0 |
| 6470882100682188891 | -1911689457 |    1 |              0 |               1996.0 |
| 6475474889432602205 |  1501294204 |    1 |              0 |               1368.0 |
+---------------------+-------------+------+----------------+----------------------+
10 rows in set (3.255 sec)

SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+---------------------+-------------+------+----------------+----------------------+
| watchid             | clientip    | c    | sum(isrefresh) | avg(resolutionwidth) |
+---------------------+-------------+------+----------------+----------------------+
| 6655575552203051303 |  1611957945 |    2 |              0 |               1638.0 |
| 8566928176839891583 | -1402644643 |    2 |              0 |               1368.0 |
| 7904046282518428963 |  1509330109 |    2 |              0 |               1368.0 |
| 7224410078130478461 |  -776509581 |    2 |              0 |               1368.0 |
| 5957995970499767542 |  1311505962 |    1 |              0 |               1368.0 |
| 5295730445754781367 |  1398621605 |    1 |              0 |               1917.0 |
| 8635802783983293129 |   900266514 |    1 |              1 |               1638.0 |
| 5650467702003458413 |  1358200733 |    1 |              0 |               1368.0 |
| 6470882100682188891 | -1911689457 |    1 |              0 |               1996.0 |
| 6475474889432602205 |  1501294204 |    1 |              0 |               1368.0 |
+---------------------+-------------+------+----------------+----------------------+
10 rows in set (0.066 sec)

可以看到,相同的查询,第二次的结果是立即返回的。

RESULT_SCAN

Query result cache 同时提供了 RESULT_SCAN 的 table function,在同一个 session 中,可以快速根据 query_id 来拿到之前查询的结果,使用方式可以参考文档

另外用户可以通过 SELECT * from system.query_cache 来获取当前租户下被 cache 所有结果集的元信息,包括

sql 结果集对应的原始 sql
query_id 结果集对应的 query id
result_size 缓存结果集的大小
num_rows 缓存结果集的行数
partitions_sha 查询对应 partitions 的 hash 值
location 缓存结果集在存储中的地址
active_result_scan 为 true 表示可以被 result_scan 使用

未来规划

  • 缓存数据清理:当前缓存的结果集在 TTL 到期后不可用,但是底层数据并未被清理,未来可以有个定时任务去清理过期数据
  • 对缓存结果进行压缩,进一步节省空间
  • 对复合 SQL 进行结果集缓存,比如:(INSERT INTO xxx SELECT ...COPY FROM SELECT)

对以上改进感兴趣的同学欢迎为 Databend 添砖加瓦。

致谢

Databend 结果集缓存的设计参考了 ClickHouse 和 Snowflake,如果想进一步跟进 query result cache 的细节,请参考以下链接:

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。