从0到1搭建基于Kafka、Flume和Hive的海量数据分析系统(一)数据收集应用

时间:2021-02-07 05:18:00

大数据时代,一大技术特征是对海量数据采集、存储和分析的多组件解决方案。而其中对来自于传感器、APP的SDK和各类互联网应用的原生日志数据的采集存储则是基本中的基本。本系列文章将从0到1,概述一下搭建基于Kafka、Flume、Zookeeper、HDFS、Hive的海量数据分析系统的框架、核心应用和关键模块。

项目源代码存储于GitHub:源码

系统架构概述

本系列文章所介绍的数据分析系统,定位于一种通用的大数据分析系统,可用于电商、互联网和物联网的实际解决方案中。该应用主要解决从多种多样的互联网应用、APP、传感器、小程序等网络客户端中预设的接口采集数据,并进行分布式存储,通过RESTful或服务订阅的方式,连接BI应用或者嵌入了机器学习模块的业务数据分析系统。其项目架构如下:

从0到1搭建基于Kafka、Flume和Hive的海量数据分析系统(一)数据收集应用

项目主体实现了从各种互联网客户端的日志数据到集中的BI分析系统的全过程,主要包括以下构件:

1. 日志收集Web应用:基于REST风格的接口,处理从网络客户端回传的数据文件,其中包括了对数据对象的定义、核心Web应用和模拟客户端测试程序。

2. Kafka集群:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2. Zookeeper集群:是一个为Kafka的分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。Zookeeper可以实现封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

3. Flume:用于存储数据到HDFS。Flume的意义在于:当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据。

4. HDFS:提供高吞吐量的分布式存储方案。

5. Hive:Hive是建立在 Hadoop 上的数据仓库基础构架,定义了简单的类 SQL 查询语言,便于快速搭建基于SQL的数据应用。

6. Hive Server2:一种可选服务,允许远程客户端可以使用各种编程语言向Hive提交请求并检索结果。

7. Dubbo和RPC:Dubbo是阿里开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,轻松实现面向服务的应用开发。

数据收集应用

数据收集应用的目标是提供一个对外的接口,基于实时或准实时的要求收集来自海量客户端应用所上传的数据文件,因此可以根据需求进行集群化和添加负载均衡机制。以常规的日志数据收集应用为例,一个数据应用应该实现的主要功能包括:数据属性拷贝、数据对象封装、时间校对、地理数据提取和缓存、发送数据至Kafka,以及一个可选的模拟客户端上传数据应用。

一、应用结构

基于Maven的多模块应用布局方案,具体包括:

——EasyBI-Parent:父组件,仅维护一个pom文件,作为个子组件的parent pom文件,定义了统一的项目版本、依赖管理和Maven插件管理。

    |——EasyBI-Common:子组件,定义了日志数据对象和通用的工具类方法。

    |——EasyBI-Logs-Collect-Web:核心组件,基于Rest风格收集日志数据,封装数据对象并发送至Kafka,其中对一些数据进行初级加工。

    |——EasyBI-Logs-MockApp:模拟一个客户端上传数据的应用,可选。

二、Common组件

数据对象

数据对象以日志对象为载体,里面封装了从客户端发送过来的不同日志的POJO对象,其类图为:

从0到1搭建基于Kafka、Flume和Hive的海量数据分析系统(一)数据收集应用

AppBaseLog为日志类型的统一父类,定义了一些公共的数据属性,被用于各个具体日志实现类继承。

Startup、Event、Page、Usage和Error分别对应了应用启动、事件、页面、功能和错误的日志记录,继承了公共基类并维护了各自的特有属性。

APPLogEntity是按客户端为单位的日志对象,组合了各个不同的子日志对象,作为整个数据分析系统的核心数据模型。

通用的工具类

主要包括两个部分,分别是复制各子日志对象的属性至LogEntity对象的一个工具方法,以及一个提取IP位置信息的工具方法。

拷贝日志属性的工具类,核心代码如下:

 public class PropertiesUtil {
/*
* 通过内省进行属性复制(对象到对象)
*/
public static void copyProperties(Object src, Object dest) { try {
//源对象的BeanInfo
BeanInfo srcBeanInfo = Introspector.getBeanInfo(src.getClass());
//获取属性描述符
PropertyDescriptor[] descriptors = srcBeanInfo.getPropertyDescriptors();
for (PropertyDescriptor descriptor : descriptors) {
//获取getter和setter方法
Method getter = descriptor.getReadMethod();
Method setter = descriptor.getWriteMethod();
//获取set方法名称
String setterName = setter.getName();
//获取setter方法参数
Class<?>[] parameterTypes = setter.getParameterTypes(); Object value = getter.invoke(src); try {
Method destSetter = dest.getClass().getMethod(setterName, parameterTypes);
destSetter.invoke(dest, value);
} catch (Exception e) {
continue;
} }
} catch (Exception e) {
e.printStackTrace();
}
}
/*
* 复制对象属性至一个数组的重载方法
*/
public static void copyProperties(Object src, Object[] arr) {
for (Object obj : arr) {
copyProperties(src, obj);
}
}
}

该工具类包括两个重载的方法,基于内省,分别实现深度复制一个对象的成员变量到另一个对象,或者到另一个对象数组中。具体到本例子,包括:

  1. 获取A对象的getter方法和setter方法,然后获取setter方法的名称和参数值。
  2. 通过反射,调用A对象的getter方法,获取成员变量值。
  3. 通过反射,调用B对象的setter方法,为其赋值,完成复制。

提取IP地理信息的工具类,通过使用maxmind-db库,来实现对Host地址的地理信息提取,用来填充至数据对象。

     public static final int COUNTRY = 1;
public static final int PROVINCE = 2;
public static final int CITY = 3; private static InputStream inputStream;
private static Reader reader; static {
try {
inputStream = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
reader = new Reader(inputStream);
} catch (IOException e) {
e.printStackTrace();
}
}

使用maxmind-db库需要用到GeoLite2-City.mmdb文件,通过静态代码块来初始化资源文件的读取流,并且定义用来获取国家、省、市的常量代码。

 public static String getLocation(String ip, int level) {
try {
JsonNode node = reader.get(InetAddress.getByName(ip));
switch (level) {
case 1:
return node.get("country").get("names").get("zh-CN").textValue();
case 2:
return node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
case 3:
return node.get("city").get("names").get("zh-CN").textValue();
default:
return null;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

通过调用com.maxmind.db.Reader对象的get方法,可以获取传入IP地址的地址节点对象,如果对文档节点模型比较熟悉的话,可以很快地获取到节点对象所对应的不同地址信息。

三、Logs_Collect_Web应用组件

Logs_Collect_Web是基于SpringMVC的Web应用,目标是收集各客户端的日志数据,组件结构为

从0到1搭建基于Kafka、Flume和Hive的海量数据分析系统(一)数据收集应用

DispatcherServlet是SpringMVC的核心调度类,关于SpringMVC的Web应用可参考:基于SSM的Java Web应用开发原理初探

其Controller类需要实现如下的核心功能。

基本信息复制

利用上面介绍的工具类,实现对从请求体中所提取的日志数据进行属性复制,封装到LogEntity的数据对象中,用于传输。

 private void copyBaseProperties(AppLogEntity e) {
PropertiesUtil.copyProperties(e, e.getAppStartupLogs());
4 }

时间校准

因为日志文件的上传并不是瞬时的,客户端提交时间与服务器收到时间存在时间差,因此需要使用服务器时间,与Http请求的时间差,来对原始的日志文件时间进行校正。

    //server时间
long serverTime = System.currentTimeMillis();
//client时间
long clientTime = Long.parseLong(request.getHeader("clientTime"));
//时间校对
long duration = serverTime - clientTime;
/*
* 校正时间
*/
private void verifyTime(AppLogEntity e, long duration) {
for (AppBaseLog log : e.getAppStartupLogs()) {
log.setCreatedAtMs(log.getCreatedAtMs() + duration);
}
  }

提取地理信息并缓存

缓存地理位置信息的方法,是通过维护一个HashMap,把Host的字符串作为键,封装一个包括国家、省、市的位置对象作为值,实现赋值位置信息到数据对象时:

  • 如果缓存中包含该位置,直接从HashMap中查找该值并返回,实现高性能的查找。
  • 如果缓存中没有,再调用GeoUtil方法,获取地址,并添加到HashMap中。
 /*
* 操作IP的方法(缓存地理位置信息)
*/
private void processIP(AppLogEntity e, String clientIP) {
GeoInfo info = geoCache.get(clientIP);
if (info == null) {
info = new GeoInfo();
info.setCountry(GeoUtil.getLocation(clientIP, GeoUtil.COUNTRY));
info.setProvince(GeoUtil.getLocation(clientIP, GeoUtil.PROVINCE));
geoCache.put(clientIP, info);
}
for (AppStartupLog log : e.getAppStartupLogs()) {
log.setCountry(info.getCountry());
log.setProvince(info.getProvince());
log.setIpAddress(clientIP);
}
}

发送至Kafka的方法

Kafka的核心方法是Producer,通过将数据对象转为JSON的字符串封装到不同的Topic中,再通过Producer来发送出去,即完成了发送至Kafka的方法实现。代码实现如下:

 private void sendMessage(AppLogEntity e) {
//创建配置对象
Properties properties = new Properties();
properties.put("metadata.broker.list", "s202:9092");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", "1");
//创建生产者
Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties)); sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs());
sendSingleLog(producer,Constants.TOPIC_APP_ERRROR,e.getAppErrorLogs());
//发送消息
producer.close();
}
/*
* 发送单个消息的方法
*/
private void sendSingleLog(Producer<Integer, String> producer, String topic,
AppBaseLog[] logs) {
for (AppBaseLog log : logs) {
String logMessage = JSONObject.toJSONString(log);
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, logMessage);
producer.send(data);
}
}

四、Mock_Client应用组件

Mock_Client是可选组件,用于模拟一个客户端向服务器发送带数据对象的请求方法,来测试服务器的可用性。

实现的原理是基于一个Json数据样本,通过随机组合数据对象的属性并发送请求,并获取响应代码来判断。具体可以参考GitHub源码。