Hive log processing with a custom InputFormat (complete version)

Date: 2021-10-28 07:59:25

Most of the material I found online only contained partial code; today, with 峰哥's help, the feature is fully working.


A custom InputFormat is needed because Hive's fields terminated by '||||' does not support a multi-character delimiter string.
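The workaround is to rewrite the multi-character delimiter into '\001', Hive's default field separator, inside a custom RecordReader, so the table can then be parsed by the default SerDe. A minimal sketch of that rewrite (the reader further down uses the "@_@" delimiter):

// Minimal sketch of the per-line rewrite the RecordReader below performs;
// '\001' is Hive's default field separator.
String raw = "123@_@abcd@_@fk";                       // sample raw log line
String hiveRow = raw.replaceAll("\\@\\_\\@", "\001"); // -> "123\001abcd\001fk"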


Package your InputFormat class into a jar, e.g. MyInputFormat.jar.
Put MyInputFormat.jar into hive/lib, and you can then create the table.
Assuming the InputFormat class is com.hive.myinput, the CREATE TABLE statement is:
create table tbname(name string, id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

HiveIgnoreKeyTextOutputFormat is the output format class that ships with Hive; you can also define your own.
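If you do need your own output format, a minimal sketch would simply extend the built-in class (the class name ClickstreamOutputFormat is hypothetical, and this assumes the generic HiveIgnoreKeyTextOutputFormat found in common Hive versions):

package com.jd.cloud.clickstore;

import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical starting point: inherit the built-in behaviour and override
// getHiveRecordWriter(...) only if the on-disk row format needs to change.
public class ClickstreamOutputFormat<K extends WritableComparable, V extends Writable>
        extends HiveIgnoreKeyTextOutputFormat<K, V> {
}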

Because Hive jobs run on the Hadoop cluster, MyInputFormat.jar must also be placed in hadoop/lib.
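If copying the jar to every node by hand is inconvenient, registering it from the Hive session usually works as well (an alternative to the manual copy, assuming your Hive version supports it):

-- Register the jar for the current session (shipped to tasks automatically)
ADD JAR /path/to/MyInputFormat.jar;

-- Or start the CLI with an auxiliary classpath
-- hive --auxpath /path/to/MyInputFormat.jar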


Two classes are needed for this: ClickstreamInputFormat and ClickstreamRecordReader.


package com.jd.cloud.clickstore;

import java.io.IOException;  

import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapred.FileSplit;  
import org.apache.hadoop.mapred.InputSplit;  
import org.apache.hadoop.mapred.JobConf;  
import org.apache.hadoop.mapred.JobConfigurable;  
import org.apache.hadoop.mapred.RecordReader;  
import org.apache.hadoop.mapred.Reporter;  
import org.apache.hadoop.mapred.TextInputFormat;

/**
 * Custom implementation of Hadoop's org.apache.hadoop.mapred.InputFormat.
 *  
 * @author winston
 *  
 */  
public class ClickstreamInputFormat extends TextInputFormat implements
        JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter)
            throws IOException {
        // Hand each split to the custom reader, which rewrites the field
        // delimiter of every line before Hive parses it.
        reporter.setStatus(genericSplit.toString());
        return new ClickstreamRecordReader((FileSplit) genericSplit, job);
    }
}




package com.jd.cloud.clickstore;

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;


/**
 * RecordReader that reads the input line by line and rewrites the "@_@"
 * field delimiter of each line into '\001', Hive's default field separator.
 */
public class ClickstreamRecordReader implements
        RecordReader<LongWritable, Text> {


    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public ClickstreamRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.lineReader = new LineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public ClickstreamRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.ClickstreamRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

    /**
     * Reads the next record in the split and extracts the useful fields from
     * the raw nginx log.
     *
     * @param key
     *            key of the record which will map to the byte offset of the
     *            record's line
     * @param value
     *            the record in text format
     * @return true if a record existed, false otherwise
     * @throws IOException
     */
    public synchronized boolean next(LongWritable key, Text value)
            throws IOException {
        // Stay within the split
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));

            if (newSize == 0)
                return false;

            // Lower-case the line and replace the raw "@_@" field delimiter
            // with '\001', Hive's default field separator.
            String str = value.toString().toLowerCase()
                    .replaceAll("\\@\\_\\@", "\001");
            value.set(str);
            pos += newSize;

            if (newSize < maxLineLength)
                return true;
        }

        return false;
    }

    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        if (lineReader != null)
            lineReader.close();
    }
    
    // Test output
    //public static void main(String ags[]){
    //    String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001");
    //    System.out.println(str1);
    //}
}




1. Upload the source to the Hive server and compile it with javac:

javac -cp ./:/usr/lib/hadoop/hadoop-common.jar:/home/op1/hadoop/hadoop-core-1.0.3.jar:/usr/lib/hadoop/lib/commons-logging-1.1.1.jar */**/*/*/*


2. Package the compiled class files into a jar:

jar -cf ClickstreamInputFormat.jar /home/op1/uerdwdb/src/

3. Copy the jar into both the hive/lib and hadoop/lib directories.


4. Create the table in Hive:

create table hive_text(num int,name string,`add` string)
stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/home/op1/uerdwdb/text.txt';
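With the jar on the classpath and the table created, a quick query should show each "@_@"-delimited log line split into the three columns (the location above is the author's example path; point it at your own data):

select * from hive_text limit 10;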