@
核心架构
Druid servers建议将它们组织为三种服务器类型:Master主服务器、Query查询服务器和Data数据服务器。
- Master:Master管理数据摄入和可用性,负责启动新的摄入作业,并协调“数据服务器”上的数据可用性。在主服务器中功能划分为Coordinator和Overlord两个进程。
- Query:处理来自外部客户端的查询,查询服务器提供用户和客户端应用程序与之交互的端点,将查询路由到数据服务器或其他查询服务器(以及可选的代理主服务器请求)。在查询服务器中功能被分为Broker和Router两个进程。
- Data:执行摄取作业并存储可查询的数据,功能被分为Historical进程和MiddleManager两个进程。
服务进程类型细分如下:
- Coordinator(协调器):服务管理集群上的数据可用性。协调器进程监视数据服务器上的历史进程,负责将Segments分配到特定的服务器,并确保Segments在各个历史数据之间得到很好的平衡。
- Overlord:服务控制数据摄取工作负载的分配。Overlord进程监视Data服务器上的MiddleManager进程,并且是数据摄取到Druid的控制器。负责将摄取任务分配给middlemanager并协调Segments发布。
- Broker:代理处理来自外部客户端的查询。代理进程从外部客户端接收查询,并将这些查询转发给数据服务器。当代理从这些子查询接收到结果时,合并这些结果并将它们返回给调用者。用户通常是查询broker而不直接在数据服务器上查询Historicals或MiddleManagers进程。
- Router:Router服务是可选的;他们将请求路由到broker、coordinator和Overlords。路由器进程是可选进程,在Druid broker、Overlords和coordinator面前提供统一的API网关。也可以直接请求broker、coordinator和Overlords。Router还运行web控制台、数据源、分段、任务、数据流程(Historicals和MiddleManagers)的管理UI,以及协调器动态配置;还可以在控制台中运行SQL和本地Druid查询。
- Historical:处理存储和查询“历史”数据(包括在系统中存在足够长时间以提交的任何流数据)的主力。历史进程从深层存储中下载Segments并响应关于这些Segments的查询,不接受写操作。
- MiddleManager:服务摄取数据。负责将新数据导入集群,从外部数据源读取数据并发布新的Druid Segments。
- Indexer process:可选的,是MiddleManagers和Peons的替代方案。Indexer不是为每个任务派生单独的JVM进程,而是在单个JVM进程中作为单独的线程运行任务。与MiddleManager + Peon系统相比,Indexer的设计更容易配置和部署,并且更好地支持跨任务共享资源。Indexer是一个较新的特性,由于它的内存管理系统仍在开发中,所以目前还处于试验阶段,将在Druid的未来版本中逐渐成熟。通常情况下,可以部署MiddleManagers或Indexers,但不能同时部署两者。
外部依赖
- 深度存储:Druid使用深度存储来存储任何已经摄入到系统中的数据。深度存储是每个Druid服务器都可以访问的共享文件存储。在集群部署中,这通常是一个分布式对象存储,如S3、HDFS或一个网络挂载的文件系统。在单服务器部署中是本地磁盘。
- 元数据存储:存储各种共享的系统元数据,如段使用信息和任务信息。在集群部署中,这通常是传统的RDBMS,如PostgreSQL或MySQL。在单服务器部署中,它通常是本地存储的Apache Derby数据库。
- ZooKeeper:用于内部服务发现、协调和领导者选举。
核心内容
Druid能够实现海量数据实时分析采取了如下特殊⼿段:
- 预聚合
- 列式存储
- 多级分区(Datasource和Segments)+位图索引
roll-up预聚合
Apache Druid可以在摄入原始数据时使用称为“roll-up”的过程进行汇总。roll-up是针对选定列集的一级聚合操作,可减小存储数据的大小。分析查询逃不开聚合操作,Druid在数据⼊库时就提前进⾏了聚合,这就是所谓的预聚合(roll-up)。Druid把数据按照选定维度的相同的值进⾏分组聚合,可以⼤⼤降低存储⼤⼩。数据查询的时候只需要预聚合的数据基础上进⾏轻量的⼆次过滤和聚合即可快速拿到分析结果。要做预聚合,Druid要求数据能够分为三个部分:
- Timestamp列:Druid所有分析查询均涉及时间(思考:时间实际上是⼀个特殊的维度,它可以衍⽣出⼀堆维度,Druid把它单列出来了)
- Dimension列(维度):Dimension列指⽤于分析数据⻆度的列,例如从地域、产品、⽤户的⻆度来分析订单数据,⼀般⽤于过滤、分组等等。
- Metric列(度量):Metric列指的是⽤于做聚合和其他计算的列;⼀般来说是数字。
使用网络流事件数据的一个小样本,表示在特定秒内发生的从源到目的IP地址的流量的包和字节计数,数据如下:
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":11,"bytes":5780}
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":38,"bytes":6289}
{"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":377,"bytes":359971}
{"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":49,"bytes":10204}
{"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":38,"bytes":6289}
{"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":123,"bytes":93999}
{"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":12,"bytes":2818}
timestamp是Timestamp列,srcIP和dstIP是Dimension列(维度),packets和bytes是Metric列。数据⼊库到Druid时如果打开预聚合功能(可以不打开聚合,数据量大建议打开),要求对packets和bytes进⾏累加(sum),并且要求按条计数(count *),聚合之后的数据如下,可以看出聚合是以牺牲明细数据分析查询为代价。
列式存储
列式存储的概念已经非常耳熟,但凡在⼤数据领域想要解决快速存储和分析海量数据基本都会采⽤列式存储,一般来说OLTP数据库使用行式存储,OLAP数据使用列式存储。
- 对于分析查询,⼀般只需要⽤到少量的列,在列式存储中,只需要读取所需的数据列即可。 例如,如果您需要100列中的5列,则I / O减少20倍。
- 按列分开存储,按数据包读取时因此更易于压缩。 列中的数据具有相同特征也更易于压缩, 这样可以进⼀步减少I / O量。
- 由于减少了I / O,因此更多数据可以容纳在系统缓存中,进⼀步提⾼分析性能。
Datasource和Segments
- Apache Druid将其数据和索引存储在按时间划分的段文件中。Druid为每个包含数据的段间隔创建一个段。如果间隔为空(即不包含行),则该时间间隔不存在段。
- 如果你在同一段时间内通过不同的摄入作业摄入数据,Druid可能会在同一段时间内创建多个分段。压缩是Druid过程,它试图将这些段组合成每个间隔的单个段,以获得最佳性能。为了让Druid在重载查询负载下运行良好,段文件大小在300-700 MB的推荐范围内是很重要的。如果您的段文件大于此范围,则考虑更改段时间间隔的粒度或对数据进行分区和/或调整partitionsSpec中的targetRowsPerSegment。这个参数的一个起点是500万行。
- 段文件是柱状的,每一列的数据在单独的数据结构中进行布局。通过分别存储每个列,Druid通过只扫描查询实际需要的列来减少查询延迟。有三种基本的列类型:时间戳、维度和度量,例如
- 时间戳和度量类型列是用LZ4压缩的整数或浮点值数组。一旦查询确定了要选择的行,就会对它们进行解压,取出相关行,并应用所需的聚合操作符。如果查询不需要列,Druid会跳过该列的数据。
- 维度列是不同的,因为它们支持筛选和分组操作,所以每个维度都需要以下三种数据结构:
- Dictionary(字典):将值(总是被视为字符串)映射为整数id,允许列表和位图值的紧凑表示。
- List(列表):列的值,使用字典进行编码。GroupBy和TopN查询必选。这些操作符允许在不访问值列表的情况下运行仅基于过滤器聚合指标的查询。
- Bitmap(位图):列中每个不同值的位图,用于指示哪些行包含该值。位图允许快速过滤操作,因为它们便于快速应用AND和OR操作符。也称为倒排指数。
Druid的数据在存储层⾯是按照Datasource和Segments实现多级分区存储的,并建⽴了位图索引。
- Datasource相当于关系型数据库中的表。
- Datasource会按照时间来分⽚(类似于HBase⾥的Region和Kudu⾥的tablet),每⼀个时间分⽚被称为chunk。
- chunk并不是直接存储单元,在chunk内部数据还会被切分为⼀个或者多个segment。
- 所有的segment独⽴存储,通常包含数百万⾏,segment与chunk的关系如下图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xkKQdpk9-1675265402036)(http://www.itxiaoshen.com:3001/assets/1675159187424CTMTr68s.png)]
Segment跟Chunk
- Segment是Druid数据存储的最小单元,内部采用列式存储,建立了位图索引,对数据进行了编码跟压缩,
Druid数据存储的摄取方式、聚合方式、每列数据存储的字节起始位都有存储。
位图索引
例如有一份数据如下
以tp为时间列,appkey和city为维度,以value为度量值,导⼊Druid后按天聚合,最终结果如下
聚合后数据经过聚合之后查询本身就很快了,为了进⼀步加速对聚合之后数据的查询,Druid会建立位图索引如下
上⾯的位图索引不是针对列⽽是针对列的值,记录了列的值在数据的哪⼀行出现过,第一列是具体列的值,后续列标识该列的值在某⼀⾏是否出现过,依次是第1列到第n列。例如appkey1在第⼀⾏出现过,在其他⾏没出现,那就是1000(例子中只有四个列)。
Select sum(value)
from xxx
where time='2019-11-11' and appkey in('appkey1','appkey2') and area='北京'
当我们有如上查询时,⾸先根据时间段定位到segment,然后根据appkey in (‘appkey1’,’appkey2’) and area=’北京’ 查到各⾃的bitmap:(appkey1(1000) or appkey2(0110)) and 北京(1100) = (1100) 也就是说,符合条件的列是第⼀行和第⼆行,这两⾏的metric的和为125.
数据摄取
-
在Druid中加载数据称为摄取或索引。当摄取数据到Druid时,Druid从源系统读取数据并将其存储在称为段的数据文件中;通常,每个段文件包含几百万行。
-
对于大多数摄取方法,Druid MiddleManager进程或Indexer进程加载源数据。唯一的例外是基于Hadoop的摄取,它在YARN上使用Hadoop MapReduce作业。
-
在摄入过程中,Druid创建片段并将它们存储在深层存储中。历史节点将段加载到内存中以响应查询。对于流输入,中层管理人员和索引人员可以使用到达的数据实时响应查询。
-
Druid包含流式和批量摄取方法,以下描述了适用于所有摄入方法的摄入概念和信息。
- Druid数据模型介绍了数据源、主时间戳、维度和度量的概念。
- 数据预聚合将预聚合描述为一个概念,并提供了最大化预聚合好处的建议。
- 分区描述了Druid中的时间块和二级分区。
-
流摄取:有两个可用的选项;流摄取由一个持续运行的管理器控制。
-
批量摄取:有三种可供批量摄入的选择。批量摄取作业与在作业期间运行的控制器任务相关联。
查询
Apache Druid支持两种查询语言:Druid SQL和本机查询;可以使用Druid SQL查询Druid数据源中的数据。Druid将SQL查询翻译成其本地查询语言。Druid SQL计划发生在Broker上。设置Broker运行时属性以配置查询计划和JDBC查询。
- Data types:Druid列支持的数据类型列表的数据类型。
- Aggregation functions:聚合函数用于Druid SQL SELECT语句可用的聚合函数列表。
- Scalar functions:用于Druid SQL标量函数的标量函数,包括数字和字符串函数、IP地址函数、Sketch函数等。
- SQL multi-value string functions:SQL多值字符串函数,用于在包含多个值的字符串维度上执行操作。
- Query translation:查询翻译,了解Druid如何在运行SQL查询之前将其翻译为本机查询。
Apache Druid 包含的API如下:
- Druid SQL API:关于HTTP API的信息的Druid SQL API。
- SQL JDBC driver API:SQL JDBC驱动程序API获取有关JDBC驱动程序API的信息。
- SQL query context:SQL查询上下文,获取有关影响SQL规划的查询上下文参数的信息。
Apache Druid的本地查询类型和本地查询组件内容如下:
- 本地查询类型
- 本地查询组件
集群部署
部署规划
使用hadoop1、hadoop2、hadoop3共3台搭建druid的集群,如果有更多服务器可以随时启动相应组件即可,集群规模不大Master Server3台和Query Server2台即可,更多的是根据处理数据规模增加Data Server节点。
主机 | 组件 |
---|---|
hadoop1 | Master Server(Coordinator和Overlords) |
hadoop2 | Data Server(Historical和MiddleManager) |
hadoop3 | Query Server(Broker和Router) |
前置条件
- Java 8 or 11(使用现有)
- Python2 or Python3(使用现有Python3)
- MySQL(元数据存储,使用现有MySQL 8.0.28)
- HDFS(深度存储,使用现有hadoop 3.3.4)
- ZooKeeper(使用现有)
MySQL配置
- 创建数据库
-- 创建一个druid数据库,确保使用utf8mb4作为编码
CREATE DATABASE druid DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;
-- 创建一个druid用户
CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd';
-- 向用户授予刚刚创建的数据库的所有权限
GRANT ALL PRIVILEGES ON druid.* TO druid@'%' WITH GRANT OPTION;
ALTER USER 'druid'@'%' IDENTIFIED WITH mysql_native_password BY 'druid';
FLUSH PRIVILEGES;
- 将MySQL驱动(mysql-connector-java-8.0.28.jar)上传到druid根目录下的extensions/mysql-metadata-storage目录下
- 修改集群配置文件。vi conf/druid/cluster/_common/common.runtime.properties
druid.host=hadoop1
# 在扩展加载列表中包含mysql-metadata-storage和下面使用的druid-hdfs-storage
druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches", "druid-multi-stage-query","mysql-metadata-storage"]
#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=localhost
#druid.metadata.storage.connector.port=1527
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://mysqlserver:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
HDFS配置
- 注释掉“深度存储”和“索引服务日志”下的本地存储配置。vi conf/druid/cluster/_common/common.runtime.properties
#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments
#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
- 将Hadoop配置xml (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml)放在Druid进程的类路径中。把它们复制到conf/druid/cluster/_common/。
- 连接Hadoop的配置(可选),如果需要从Hadoop集群中加载数据则需要配置,并将Hadoop配置xml (core-site.xml, hdfs-site.xml,)放在Druid进程的类路径中。把它们复制到conf/druid/cluster/_common/。vi conf/druid/cluster/data/middleManager/runtime.properties
druid.indexer.task.baseTaskDir=/var/druid/task
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/var/druid/hadoop-tmp
Zookeeper配置
vi conf/druid/cluster/_common/common.runtime.properties
druid.zk.service.host=zk1:2181,zk2:2181,zk3:2181
启动集群
# 将apache-druid分别到另外两台服务器上,并修改druid.host
rsync -az apache-druid-25.0.0/ hadoop2:/home/commons/apache-druid-25.0.0/
rsync -az apache-druid-25.0.0/ hadoop3:/home/commons/apache-druid-25.0.0/
# hadoop1上启动Master Serve
bin/start-cluster-master-no-zk-server
# hadoop2上启动Data Server
bin/start-cluster-data-server
# hadoop3上启动Query Server
bin/start-cluster-query-server
# 如果集群规模较大需要分离进程模块,也可以单独启动
bin/coordinator.sh start
bin/overlord.sh start
bin/historical.sh start
bin/middleManager.sh start
bin/broker.sh start
bin/jconsole.sh start
# 单独关闭
bin/coordinator.sh stop
bin/overlord.sh stop
bin/historical.sh stop
bin/middleManager.sh stop
bin/broker.sh stop
bin/jconsole.sh stop
启动完毕后访问查询节点的Druid的控制台UI,http://hadoop3:8888/,点击Services栏目可以看到所有进程服务详细信息
导入HDFS示例
# 先将官方提供的示例数据上传到hdfs
hdfs dfs -put wikiticker-2015-09-12-sampled.json.gz /tmp/my-druid
然后和前面单机版导入操作流程相似,只是选择输入类型为HDFS,填写Paths为上面上传的路径/tmp/my-druid/wikiticker-2015-09-12-sampled.json.gz
生成SQL如下,修改表名为wikipedia(原来为data)
REPLACE INTO "wikipedia" OVERWRITE ALL
WITH "ext" AS (SELECT *
FROM TABLE(
EXTERN(
'{"type":"hdfs","paths":"/tmp/my-druid/wikiticker-2015-09-12-sampled.json.gz"}',
'{"type":"json"}',
'[{"name":"time","type":"string"},{"name":"channel","type":"string"},{"name":"cityName","type":"string"},{"name":"comment","type":"string"},{"name":"countryIsoCode","type":"string"},{"name":"countryName","type":"string"},{"name":"isAnonymous","type":"string"},{"name":"isMinor","type":"string"},{"name":"isNew","type":"string"},{"name":"isRobot","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"metroCode","type":"long"},{"name":"namespace","type":"string"},{"name":"page","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"regionName","type":"string"},{"name":"user","type":"string"},{"name":"delta","type":"long"},{"name":"added","type":"long"},{"name":"deleted","type":"long"}]'
)
))
SELECT
TIME_PARSE("time") AS "__time",
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
"delta",
"added",
"deleted"
FROM "ext"
PARTITIONED BY DAY
查看数据源可以看到wikipedia表信息
查看HDFS上也有相应的段数据
输入SQL,点击运行查询数据
SELECT
channel,
COUNT(*)
FROM "wikipedia"
GROUP BY channel
ORDER BY COUNT(*) DESC
可以通过http请求查询,这里以官方示例TopN查询为例
curl -X POST 'http://hadoop3:8888/druid/v2/?pretty' -H 'Content-Type:application/json' -d @wikipedia-top-pages.json
查看数据摄取的任务信息
查看段信息
- 本人博客网站IT小神 www.itxiaoshen.com