Parsing HDFS Text Files and Writing Them into HBase with Java

Time: 2022-12-31 13:47:06

First, create the table in HBase; see my previous post on operating HBase from Java.
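
The table used below is rd:app_log with a single column family cf. For reference, a minimal sketch of creating it with the HBase 2.x Admin API might look roughly like this (the namespace handling and configuration loading are assumptions; older clients use HTableDescriptor/HColumnDescriptor instead):

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateAppLogTable {
    public static void main(String[] args) throws IOException {
        // Picks up hbase-site.xml from the classpath.
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = connection.getAdmin()) {
            // Make sure the "rd" namespace exists before creating a table inside it.
            try {
                admin.createNamespace(NamespaceDescriptor.create("rd").build());
            } catch (NamespaceExistException ignored) {
                // namespace already present
            }
            TableName table = TableName.valueOf("rd:app_log");
            if (!admin.tableExists(table)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build());
            }
        }
    }
}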

Then the code:

package com.xxx.report.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xxx.report.config.Constants;
import com.xxx.report.util.HbaseUtilQA;
import com.xxx.report.util.MD5Util;
import com.xxx.report.util.TimeUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author yangxin-ryanx
 */
public class LockAppLog2Hbase implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(LockAppLog2Hbase.class);
    // SimpleDateFormat is not thread-safe; here it is only used on the driver, in a single thread.
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
    private static final String CF_NAME = "cf";
    private static final String tableName = "rd:app_log";

    public void run(String master, String startTime, String endTime) {
        long start = System.currentTimeMillis();
        LOG.info("Starting the log parser...");
        startTime = startTime.replace("-", "");
        endTime = endTime.replace("-", "");
        SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_YangXin).setMaster(master);
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        // Build the list of days (yyyyMMdd) between startTime and endTime, inclusive.
        List<String> list = Lists.newArrayList();
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.YEAR, Integer.parseInt(startTime.substring(0, 4)));
        calendar.set(Calendar.MONTH, Integer.parseInt(startTime.substring(4, 6)) - 1);
        calendar.set(Calendar.DATE, Integer.parseInt(startTime.substring(6, 8)));
        String date = startTime;
        while (!date.equals(endTime)) {
            list.add(date);
            calendar.add(Calendar.DATE, 1);
            date = simpleDateFormat.format(calendar.getTime());
        }
        list.add(endTime);


        // Process one day at a time; skip days with no data.
        for (String day : list) {
            StringBuilder path = new StringBuilder();
            path.append(Constants.PREFIX_PATH_YangXin).append(day).append("/*/*");

            JavaRDD<String> rdd = javaSparkContext.textFile(path.toString());
            if (rdd.isEmpty()) {
                continue;
            }
            rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Iterator<String> logs) throws Exception {
                    while (logs.hasNext()) {
                        String log = logs.next();
                        try {
                            // Field 10 is the SN; "-" means the record has no SN and is skipped.
                            String snValue = log.split(" ")[10];
                            if (snValue.equals("-")) {
                                continue;
                            }
                            handleLog(log);
                        } catch (Exception e) {
                            LOG.error("Failed to handle log line: {}", log, e);
                        }
                    }
                }
            });
            HbaseUtilQA.flush(tableName);
        }
        long end = System.currentTimeMillis();
        System.out.println("Elapsed: " + (end - start) / 1000 + "s");
    }


    private void handleLog(String line) {
        MD5Util md5Util = new MD5Util();
        String[] fields = line.split(" ");
        // Field 10 is the SN; fields 0 and 1 are the date and time parts of the log timestamp.
        String snValue = fields[10];
        String timePart1 = fields[0];
        String timePart2 = fields[1];
        String time = timePart1 + " " + timePart2;
        // Row key is derived from the SN plus the date part, so one device's lines for a day share a row.
        String rowKey = md5Util.md5Function(snValue, timePart1);
        Long timeStamp = TimeUtil.getTimeStamp(time);
        // Column qualifier: MD5 of the whole line plus the time of day; the value is the raw line.
        Map<String, String> lockAppMap = Maps.newHashMap();
        String key = md5Util.md5Key(line) + "_" + timePart2.replace(".", "").replace(":", "");
        lockAppMap.put(key, line);
        HbaseUtilQA.addRecords(tableName, rowKey, timeStamp, CF_NAME.getBytes(), lockAppMap);
    }

    public static void main(String[] args) {
        LockAppLog2Hbase lockAppLog2Hbase = new LockAppLog2Hbase();
        String master = args[0];
        String startTime = args[1];
        String endTime = args[2];
        lockAppLog2Hbase.run(master, startTime, endTime);
    }
}


The idea: build the HDFS path for each day, load it with Spark, then read each partition, parse every line, and write the records into HBase one by one.
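
The HbaseUtilQA helper is not shown in this post. As a rough idea of what the two calls above (addRecords and flush) could map to, here is a minimal sketch built on the HBase client's BufferedMutator; the class layout, the per-table mutator cache, and the signatures are assumptions inferred from the call sites, not the author's actual utility:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseUtilQA {

    // One connection and one buffered writer per table, per JVM (i.e. per Spark executor).
    private static final Connection CONNECTION;
    private static final Map<String, BufferedMutator> MUTATORS = new ConcurrentHashMap<>();

    static {
        try {
            CONNECTION = ConnectionFactory.createConnection(HBaseConfiguration.create());
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private static BufferedMutator mutator(String tableName) {
        return MUTATORS.computeIfAbsent(tableName, t -> {
            try {
                return CONNECTION.getBufferedMutator(TableName.valueOf(t));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // Each map entry becomes one cell (qualifier -> value) in the given row and column family.
    public static void addRecords(String tableName, String rowKey, Long timeStamp,
                                  byte[] cf, Map<String, String> columns) {
        Put put = new Put(Bytes.toBytes(rowKey), timeStamp);
        for (Map.Entry<String, String> entry : columns.entrySet()) {
            put.addColumn(cf, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
        }
        try {
            mutator(tableName).mutate(put);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Push any puts still buffered in this JVM out to HBase.
    public static void flush(String tableName) {
        BufferedMutator m = MUTATORS.get(tableName);
        if (m != null) {
            try {
                m.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}

One caveat with a design like this: in the Spark job, addRecords runs inside the executors while flush(tableName) is called on the driver, so a driver-side flush only empties the driver's own buffer; each executor's buffered puts go out when its mutator flushes or closes.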