基于Apache Hudi 构建Serverless实时分析平台

时间:2023-02-12 11:08:05

NerdWallet 的使命是为生活中的所有财务决策提供清晰的信息。 这涵盖了一系列不同的主题:从选择合适的信用卡到管理您的支出,到找到最好的个人贷款,再到为您的抵押贷款再融资。 因此,NerdWallet 提供了跨越众多领域的强大功能,例如信用监控和警报、用于跟踪净值和现金流的仪表板、机器学习 (ML) 驱动的建议,以及为数百万用户提供的更多功能。

基于Apache Hudi 构建Serverless实时分析平台
为了为我们的用户构建一个一体化和高性能的体验,我们需要能够使用来自多个独立团队的大量不同的用户数据。 这需要强大的数据文化以及一套数据基础设施和自助服务工具,以实现创造力和协作。

这篇文章中我们阐述了一个用例,该用例说明 NerdWallet 如何通过构建支持来自整个公司的流数据的无服务器 Serverless 管道来扩展其数据生态系统。 我们迭代了两种不同的架构,并且说明在初始设计中遇到的挑战,以及我们在第二个架构中使用 Apache Hudi 和其他 AWS 服务所获得的收益。

NerdWallet 收集了大量的支出数据。 此数据用于为用户构建有用的仪表板和见解。数据存储在 Amazon Aurora 集群中。 尽管 Aurora 集群作为联机事务处理 (OLTP) 引擎运行良好,但它不适合大型、复杂的联机分析处理 (OLAP) 查询,因此我们无法向分析师和数据工程师公开直接的数据库访问权限。数据所有者必须使用只读副本上的数据来解决此类请求。 随着数据量以及数据消费者和请求的多样性的增长,这个过程变得更加难以维护。 此外数据科学家大多需要从 Amazon Simple Storage Service (Amazon S3) 等对象存储访问数据文件。

我们决定探索所有消费者都可以使用开放标准工具和协议安全且可扩展地独立完成他们自己的数据请求的替代方案。 从数据网格范例中汲取灵感,我们设计了一个基于 Amazon S3 的数据湖,将数据生产者与消费者分离,同时提供自助服务、安全合规且可扩展的易于配置的工具集。

初始架构

下图是初始设计的架构

基于Apache Hudi 构建Serverless实时分析平台

该设计包括以下关键组件:

  • 使用 AWS Data Migration Service (AWS DMS),因为它是一种托管服务,可促进数据从各种数据存储(例如关系数据库和 NoSQL 数据库)移动到 Amazon S3。 AWS DMS 允许使用变更数据捕获 (CDC) 进行一次性迁移和持续复制,以保持源数据存储和目标数据存储同步。
  • 使用 Amazon S3 作为我们数据湖的基础,因为它具有可扩展性、持久性和灵活性。 您可以将存储从 GB 级无缝增加到 PB 级,只需为您使用的部分付费,它提供 11 个 9 的可用性,支持结构化、半结构化和非结构化数据,并与广泛的 AWS 服务组合进行原生集成。
  • AWS Glue 是一种完全托管的数据集成服务。 AWS Glue 可以更轻松地在不同数据存储之间分类、清理、转换和可靠地传输数据。
  • Amazon Athena 是一种无服务器交互式查询引擎,可让您使用标准 SQL 直接在 Amazon S3 中轻松分析数据。 Athena 自动扩展——并行运行查询——因此结果很快,即使是大型数据集、高并发和复杂的查询。

该架构适用于小型测试数据集,然而团队很快就遇到了大规模生产数据集的问题。

挑战

团队遇到了以下挑战

  • 长批处理时间和复杂的转换逻辑——Spark 批处理作业的单次运行需要 2-3 小时才能完成,并且在针对数十亿条记录进行测试时,我们最终支付了相当大的 AWS 账单。 核心问题是我们必须重建最新状态并为每个作业运行重写每个分区的整个记录集,即使增量更改是分区的单个记录也是如此。
  • 大量客户端增加了复杂性——此工作负载包含数百万个客户端,一种常见的查询模式是按单个客户端 ID 进行过滤。 我们*进行了许多优化,例如谓词下推、调整 Parquet 文件大小、使用分桶分区方案等。 随着越来越多的数据所有者采用这种架构,我们将不得不针对他们的数据模型和消费者查询模式定制每一个优化。
  • 实时用例的有限可扩展性——这种批量提取、转换和加载 (ETL) 架构无法扩展以处理每秒数千条记录更新插入的每小时更新。 此外数据平台团队要跟上多样化的实时分析需求将是一项挑战。 增量查询、时间旅行查询、改进延迟等都需要在很长一段时间内进行大量投入。改进这个问题将开启近实时 ML 推理和基于事件的警报等。

由于初始架构设计的所有这些限制,我们决定重新设计一个真正的增量处理架构

解决方案

下图展示了我们重新设计的架构。为了支持实时用例,我们在架构中添加了 Amazon Kinesis Data Streams、AWS Lambda、Amazon Kinesis Data Firehose 和 Amazon Simple Notification Service (Amazon SNS)。

基于Apache Hudi 构建Serverless实时分析平台

新引入的组件如下:

  1. Amazon Kinesis Data Streams 是一种无服务器流数据服务,可以轻松捕获、处理和存储数据流。 我们将 Kinesis 数据流设置为 AWS DMS 的目标,数据流收集 CDC 日志。
  2. 我们使用 Lambda 函数来转换 CDC 记录。 我们在 Lambda 函数的记录级别应用架构验证和数据扩充,转换后的结果发布到第二个 Kinesis 数据流以供数据湖使用和 Amazon SNS 主题,以便可以将更改分散到各种下游系统。
  3. 下游系统可以订阅 Amazon SNS 主题并根据 CDC 日志采取实时操作(在几秒钟内)。 这可以支持异常检测和基于事件的警报等用例。
  4. 为了解决批处理时间长的问题,我们使用 Apache Hudi 格式存储数据,并使用 AWS Glue 流作业执行流式 ETL。 Apache Hudi 是一个开源的事务性数据湖框架,极大地简化了增量数据处理和数据管道开发。 Hudi 允许使用增量数据管道构建流式数据湖,支持事务、记录级更新和删除存储在数据湖中的数据。 Hudi 与各种 AWS 分析服务(如 AWS Glue、Amazon EMR 和 Athena)很好地集成,这使其成为我们之前架构的直接扩展。 Apache Hudi 解决了记录级更新和删除挑战,而 AWS Glue 流作业将长时间运行的批处理转换转换为低延迟的微批处理转换。 我们使用 Apache Hudi 的 AWS Glue 连接器在 AWS Glue 流式处理作业中导入 Apache Hudi 依赖项,并将转换后的数据连续写入 Amazon S3。 Hudi 完成了记录级更新插入的所有繁重工作,而我们只需配置编写器并将数据转换为 Hudi Copy-on-Write 表类型。 借助 Hudi on AWS Glue 流式作业,我们将核心数据集的数据新鲜度延迟从数小时减少到 15 分钟以下
  5. 为了解决高基数 UUID 的分区挑战,我们使用分桶技术。 Bucketing 将基于特定列的数据分组到一个分区中。 这些列称为存储桶键。 将相关数据分组到一个存储桶(分区中的一个文件)时可以显着减少 Athena 扫描的数据量,从而提高查询性能并降低成本。 我们现有的查询已经根据用户 ID 进行了过滤,因此我们可以显着提高 Athena 使用的性能,而无需通过使用分桶用户 ID 作为分区方案来重写查询。 例如,以下代码显示每个用户在特定类别中的总支出:
SELECT ID, SUM(AMOUNT) SPENDING
FROM "{{DATABASE}}"."{{TABLE}}"
WHERE CATEGORY IN (
'ENTERTAINMENT',
'SOME_OTHER_CATEGORY')
AND ID_BUCKET ='{{ID_BUCKET}}'
GROUP BY ID;
  1. 我们的数据科学家团队可以使用 Amazon SageMaker 访问数据集并执行 ML 模型训练。
  2. 我们通过 Amazon Kinesis Data Firehose 在 Amazon S3 中维护原始 CDC 日志的副本。

结论

采用一种无服务器流处理架构,该架构可以在我们数据湖的新鲜度几分钟内扩展到每秒数千次写入。在目前的规模下,Hudi 作业每秒处理每个 AWS Glue Worker 大约 1.75 MiB,它可以自动向上和向下扩展(得益于 AWS Glue 自动扩展)。 由于 Hudi 的增量更新与我们的第一次架构相比,在不到 5 分钟的时间内端到端新鲜度有了显着改善。

借助 Amazon S3 上的 Hudi,我们已经建立了一个高杠杆基础来个性化我们的用户体验。 拥有数据的团队现在可以通过千篇一律的解决方案中内置的可靠性和性能特征在整个组织内共享他们的数据。 这使我们的数据消费者能够构建更复杂的信号,为生活中的所有财务决策提供清晰度。