千万级爬虫并发下的大数据存储与转换

时间:2021-03-01 17:57:26

    宜人蜂巢是领先的、智能的、数据科学驱动的互联网风控科技平台。通过最领先科技与大数据的智能技术,准确预测借款人的信用、偿还能力并实别欺诈、助力贷后风险管理等,从而将公平的信用扩展到更多的人。

    宜人蜂巢于2013年由李善任先生(人称麦哥或Michael),在宜人贷内部组建团队并成功孵化的项目。通过8大维度:金融、电商、社交、保险、社保、行为、位置等约20种数据源,千余维度特征,亿级关系网络等,帮助企业做出更明智的信贷决策,以扩大公平和透明信贷的可用性。

    目前查询量超过6000万次,体验用户超过3000万,通过宜人蜂巢科技平台的促成的放款额已突破1200亿。

    在大数据风控领域,数据是一切工作的根基。在这一系列的技术和金融产品的背后,我们的大数据基础服务到底是怎么在运行,怎么在支撑?本篇文章讲带你了解"宜人蜂巢"千万级爬虫并发下的大数据存储与转换。

“宜人蜂巢”服务相关组件

    首先看一看大数据架构图,总体的来了解一下数据存储的相关服务组件,对于细节的部分我们后面再来讨论。我们可以从数据的角度把整体结构大致分为三个部分:数据接入层、数据的存储层和数据计算层。

千万级爬虫并发下的大数据存储与转换

    1. 数据接入层,实时数据接入使用Dubbox作为分布式调度框架,离线数据主要通过Sqoop2等。选择Dubbox主要是前期并发量不大、支持REST风格远程调用,便于业务端数据的接入。当然目前随着业务的发展及并发增大、项目的稳定,已经换为了RPC的调用方式并在此基础上做了一些改动。

    2. 数据存储层,单一的某个数据库已经远远无法满足于业务的需求,因为数据服务不仅仅只是提供查询还要提供数据分析,数据仓库,机器学习,图谱等相关功能。所以我们选择了Kafka+HBase+ElasticSerach+Hive+Neo4j等组合的方式作为数据存储的基础服务组件。Kafka为数据服务的中转站,一次写入多次读取,并提供数据下发到下游的多个产品线;HBase作为一个高效低延迟的实时分布式数据库,快速响应实时业务相关数据;ElasticSearch为上层的全文搜索引擎,以提供给某些相关业务;Hive作为数据仓库,构建面向分析的集成化数据环境,基于Hive做上层的数据分析,机器学习,孵化出数据相关的其他产品;Neo4j作为一个社交关系知识图谱,优化数据模型,支持反欺诈相关业务。

    3. 数据计算层,主要分为了实时计算部分和离线批处理部分,实时计算:主要以Spark Streaming为主,一些监控数据或计算量不大的数据也使用Kafka Stream;离线批处理:主要以MapReduce还有Spark Core为主。SparkMapReduce都是“On YARN”模式,由YARN来统一管理和调度计算资源。

Kafka数据中转站

    下面我们从Kafka服务—“数据的中转站切入,来看整个大数据存储与转换。主要讨论这三个问题:

1).Kafka为什么可以作为我们数据的神经中枢

2).实时数据部分我们是怎么处理以达到每秒上百万的吞吐量?

3).离线数据部分怎么保证每天上百亿数据的清洗与存储,数据零丢失。

    Kafka目前我们也升级到了最新版本1.1.0以支撑某些需要使用KafkaStream业务,选择Kafka作为我们数据的中转站主要是因为Kafka自身的一些优点以及Kafka消息的可靠性。

Kafka发布订阅消息系统优点

  1.  Kafka高吞吐量,可以支持每秒百万级别的吞吐量;
  2. 生产者一次写入多个消费者都可以读取;
  3. Kafka支持消息生产者在客户端的负载均衡;
  4. 支持多种语言:Java.NetPHPRubyPython 

Kafka消息可靠性:

    消息可靠性保证可以从三个方面来看,第一个就是客户端的写入,第二个Kafak存储机制,第三个数据的读取。我们在这三方面通过一些配置和程序策略来保证整个数据平台数据的可靠性:

  • 客户端的写入

  1. 可以通过message.send.max.retries设置失败重试的次数;
  2. 消息可以批量转单条,通过max.in.flight.requests.per.connection设置;
  3. 消息可以异步转同步,使用future.get()等待消息发送返回结果;
  4. 消息设置同步时,还可以设置acks级别,以确保消息的可靠性。

  • Broker消息的存储

  1. 可以设置broker数据的刷盘策略;
  2. 指定消息的副本数replicas
  3. 设置日志保留策略log.retention.hours

  • 消息的接收客户端默认情况下是自动提交offset,这样可能存在消息丢失的可能性,所以要保证消息的可靠接收,需要将enable.auto.commit设置为false,防止程序自动提交,后面会介绍到我们的一些策略。

实时数据存储

   “宜人蜂巢实时数据目前有这几个需求:

1.适应不同种类的数据格式和数据源,不能预先严格定义模式(NOSQL需求);

2.不强调数据之间的关系,无事务性(摒弃MySQL);

3.以读写为主;

4.需要处理大规模数据;

5.更好的扩展性。

    所以关系型数据库MySQL已经无法满足需求,我们选择了HBaseHBase 一个高可靠、高性能、面向列、可伸缩的分布式存储系统。基于HDFS,支持海量数据读写(尤其是写),支持上亿列、上百万行的,面向列的分布式NoSQL数据库。天然分布式,主从架构,完全满足我们的需求。

千万级爬虫并发下的大数据存储与转换

    1. 实时数据入库部分,主要使用Spark Streaming去消费Kafka的数据入库到HBase。对于业务端或调用方来讲数据相当于是异步,数据只管写入,其他服务来保证下游数据入库。这样就大大的提高了写操作的性能,以达到每秒上百万的吞吐量。使用Spark Streaming也是考虑到我们数据的一些特性,并不需要毫秒级的响应,但是需要一个大的吞吐量。当然我们的Spark Streaming的作用也不仅仅是做一个数据的读取然后入库这么简单,Spark Streaming部分在实时计算部分比如资信报告,特征分析等同样发挥着重要的作用。

千万级爬虫并发下的大数据存储与转换

    2. 实时数据索引部分,在RowKey不满足scan的条件下,使用空间换时间的方式,通过协处理器建立二级索引。对于需要全文检索的业务我们也提供了ElasticSearch+HBase的模式。

千万级爬虫并发下的大数据存储与转换

    3. 数据的可靠性,考虑到防止数据的丢失,也是采用了手动去提交offset,采用默认配置把offset记录写入__consumer_offsetsTopic中。

离线数据存储

    非实时性消费的数据或者叫离线数据,主要是一些T+1的数据通过MapReduceSparkCore解析清洗入库,以提供给报表业务、数据分析、机器学习相关产品等。该部分主要讲两个方面:1)离线数据的处理,我们在Zookeeper中是怎么记录offset以保证数据的零丢失。2)在近实时写入数据到Hive时我们是怎么做保证后期Hive的查询效率与数据倾斜。

    “宜人蜂巢的数据是需要安全和可靠的,为了保障数据的可靠性,我们自己写了一套MapReduce底层解析Kafka相关的InputFormatInputSplitInputRecordReaderCheckpointManager。该离线的业务一天记录一次Kafkaoffset,所以对于zookeeper并没有太大的压力,我们选择保存offsetzookeeper中,并且每天保存一个offset记录(也可以做其它细维度的记录),这样即使MR job失败也可以重跑,以保证数据可靠性。

千万级爬虫并发下的大数据存储与转换

Zookeeper记录的某个topic某一天的offset记录,如下图所示

千万级爬虫并发下的大数据存储与转换

    在近实时数据写入到Hive中时,为什么还要保证后期Hive的查询效率与数据倾斜?我们知道在我们持续(例如通过Spark3分钟)写入数据到Hive中,一天后就会产生大量的小文件,在运行Job时大量的小文件就会导致Map数量的剧增。每一个Map任务都去开一个JVM去执行,导致任务的初始化,启动,执行浪费大量的资源严重影响性能。所以我们在数据的写入时,就进行数据的合并。

    我们对Hadooporg.apache.hadoop.fs.FileUtil源码的copyMerge方法进行了改进,然后写了自己的Merge方法。

    在近实时数据写入到Hive中时,为什么还要保证后期Hive的查询效率与数据倾斜?我们知道在我们持续(例如通过Spark3分钟)写入数据到Hive中,一天后就会产生大量的小文件,在运行Job时大量的小文件就会导致Map数量的剧增。每一个Map任务都去开一个JVM去执行,导致任务的初始化,启动,执行浪费大量的资源严重影响性能。所以我们在数据的写入时,就进行数据的合并。

    我们对Hadooporg.apache.hadoop.fs.FileUtil源码的copyMerge方法进行了改进,然后写了自己的Merge方法。

千万级爬虫并发下的大数据存储与转换

    同时,我们在做文件合并之前也自定义了一些策略去检查是否需要合并,主要是通过文件大小(默认128MB)和文件个数(默认100)两个维度去检查是否需要合并。

千万级爬虫并发下的大数据存储与转换