How to Batch-Import Data into Hive with the Java API

Date: 2021-07-11 04:40:10


Data generated by a Java program can be batch-inserted into an Oracle or MySQL database through a JDBC connection, but the Hive version used here does not support batch INSERT over JDBC. Instead, the result data is first written to an HDFS file and then loaded into the Hive table.
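The listing below loads into a table called population.population_information, whose definition the article never shows. The target table must already exist, and since the file is written with Hive's default field delimiter "\001", it should be created without a custom ROW FORMAT. A minimal sketch of what such a table might look like, created over the same Hive JDBC connection used later, follows; the column names and types are assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        // Connection settings copied from the listing below; adjust to your HiveServer2.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://server-13:10000/default;auth=noSasl", "admin", "admin");
             Statement stmt = con.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS population");
            // Assumed columns: an id field and a gender field, matching the two values
            // written per line by createFile(). No ROW FORMAT clause is given, so Hive
            // falls back to its default field delimiter '\001'.
            stmt.execute("CREATE TABLE IF NOT EXISTS population.population_information ("
                    + "id STRING, gender STRING) STORED AS TEXTFILE");
        }
    }
}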

package com.enn.idcard;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * <p>Description: </p>
 * @author kangkaia
 * @date 2017-12-26 13:42:24
 */
public class HiveJdbc {
    public static void main(String[] args) throws IOException {
        List<List<String>> argList = new ArrayList<List<String>>();
        List<String> arg = new ArrayList<String>();
        arg.add("12345");
        arg.add("m");
        argList.add(arg);
        arg = new ArrayList<String>();
        arg.add("54321");
        arg.add("f");
        argList.add(arg);
//      System.out.println(argList.toString());
        String dst = "/test/kk.txt";
        createFile(dst,argList);
        loadData2Hive(dst);
    }
 
    /**
     * Write the data to an HDFS file so it can be loaded into a Hive table.
     * Hive's default field delimiter "\001" is used.
     * @param dst destination path on HDFS
     * @param argList rows to write, each row a list of field values
     * @throws IOException
     */
    public static void createFile(String dst, List<List<String>> argList) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dstPath = new Path(dst); // target path
        // open an output stream
        FSDataOutputStream outputStream = fs.create(dstPath);
        StringBuffer sb = new StringBuffer();
        for(List<String> arg:argList){
            for(String value:arg){
                sb.append(value).append("\001");
            }
            sb.deleteCharAt(sb.length() - 1);// remove the trailing field delimiter ('\001' is a single character)
            sb.append("\n");
        }
        sb.deleteCharAt(sb.length() - 1);// remove the trailing newline
        byte[] contents =  sb.toString().getBytes();
        outputStream.write(contents);
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");       
    }
    /**
     * Load the HDFS file into the Hive table.
     * @param dst
     */
    public static void loadData2Hive(String dst) {
        String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";
        String username = "admin";
        String password = "admin";
        Connection con = null;
        
        try {
            Class.forName(JDBC_DRIVER);
            con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
            Statement stmt = con.createStatement();        
            String sql = " load data inpath '"+dst+"' into table population.population_information ";
            
            stmt.execute(sql);
            System.out.println("loadData到Hive表成功!");
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }finally {
            // close the connection
            if(con != null){
                try {
                    con.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }   
}
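After LOAD DATA completes, a quick query over the same connection is a simple way to confirm the rows arrived. The sketch below reuses the connection settings from the listing above and just counts the rows; it is a convenience check, not part of the original example.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyLoad {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Same (assumed) connection settings as the listing above.
        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://server-13:10000/default;auth=noSasl", "admin", "admin");
             Statement stmt = con.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT COUNT(*) FROM population.population_information")) {
            if (rs.next()) {
                System.out.println("Row count after load: " + rs.getLong(1));
            }
        }
    }
}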

Note:

This example is built with Maven; the Hadoop configuration files are placed under src/main/resources.
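FileSystem.get(conf) picks up the cluster settings (fs.defaultFS and friends) from whatever core-site.xml/hdfs-site.xml it finds on the classpath, which is why the configuration files sit in src/main/resources. If they live elsewhere, they can be added to the Configuration explicitly; the paths in the sketch below are placeholders, not part of the original setup.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsConfigExample {
    public static FileSystem openFileSystem() throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical absolute paths: add the cluster configuration explicitly
        // instead of relying on core-site.xml/hdfs-site.xml being on the classpath.
        conf.addResource(new Path("file:///etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("file:///etc/hadoop/conf/hdfs-site.xml"));
        return FileSystem.get(conf);
    }
}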

Hive's built-in file storage formats include TEXTFILE, SEQUENCEFILE, and RCFILE, and users can define custom input/output formats by implementing the corresponding interfaces.

In practice TEXTFILE is rarely used: it is uncompressed, so both disk usage and parsing cost are high. SEQUENCEFILE is a binary key-value format that supports record-level and block-level compression. RCFILE is a hybrid row-column layout (TEXTFILE and SEQUENCEFILE are both row-oriented): each record is kept within a single HDFS block, and inside a block the data is stored column by column. Broadly, row storage suits OLTP workloads while columnar storage suits OLAP; aggregation queries in particular touch far less data in a columnar layout than in a row layout. Even when a query cannot be answered from individual columns alone, RCFILE's high compression ratio still shrinks file sizes considerably, so in practice RCFILE is usually the preferred choice, and the 达观 (DataGrand) data platform also uses it extensively.

Tables loaded from HDFS default to TEXTFILE. To change the storage format, first create an empty table stored as SEQUENCEFILE, RCFILE, etc., then re-insert the data into it:

insert overwrite table seqfile_table select * from textfile_table;
……
insert overwrite table rcfile_table select * from textfile_table;
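The first step mentioned above, creating the empty table in the new format, is not shown in the snippet. A sketch of both steps run through Hive JDBC is given below; the column list is a placeholder and should be copied from the real textfile_table.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ConvertToRcfile {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://server-13:10000/default;auth=noSasl", "admin", "admin");
             Statement stmt = con.createStatement()) {
            // Step 1: create an empty table in the target format
            // (the column list is a placeholder; copy it from the real textfile_table).
            stmt.execute("CREATE TABLE IF NOT EXISTS rcfile_table "
                    + "(id STRING, gender STRING) STORED AS RCFILE");
            // Step 2: re-insert the data so Hive rewrites it in the new format.
            stmt.execute("INSERT OVERWRITE TABLE rcfile_table SELECT * FROM textfile_table");
        }
    }
}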

Java batch insert into Hive, staged via HDFS

This is a slightly modified version of the original post, which writes the data to a file first and then loads it into Hive.

Here the mock data is written to HDFS and then loaded into Hive. Remember to add the Hive JDBC dependency below, or the program will throw an error.

The target table is best created as an external table: dropping a managed (internal) table deletes the data along with the metadata, while dropping an external table removes only the metadata and leaves the HDFS files in place (see the sketch after the dependency block below).

<dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-jdbc</artifactId>
 <version>1.1.0</version>
</dependency>
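A sketch of creating such an external table over Hive JDBC is shown below. The column names, delimiter, and LOCATION are assumptions chosen to match the '|'-delimited file produced by the listing that follows; the EXTERNAL keyword and the LOCATION clause are the point.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateExternalTable {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://hadoop:10000/default", "root", "root");
             Statement stmt = con.createStatement()) {
            // EXTERNAL table: DROP TABLE removes only the metadata; the files under LOCATION stay.
            stmt.execute("CREATE EXTERNAL TABLE IF NOT EXISTS test "
                    + "(id STRING, gender STRING) "
                    + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' "
                    + "STORED AS TEXTFILE "
                    + "LOCATION '/test/external'");
        }
    }
}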

Code:

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Demo {
        public static void main(String[] args) throws Exception {
            List<List<String>> argList = new ArrayList<List<String>>();
            List<String> arg = new ArrayList<String>();
            arg.add("12345");
            arg.add("m");
            argList.add(arg);
            arg = new ArrayList<String>();
            arg.add("54321");
            arg.add("f");
            argList.add(arg);
//          System.out.println(argList.toString());
            String dst = "/test/kk.txt";
            createFile(dst,argList);
//          loadData2Hive(dst);
        }
        /**
         * Write the data to an HDFS file so it can be loaded into a Hive table.
         * The field delimiter used here is "|".
         * @param dst destination path on HDFS
         * @param argList rows to write, each row a list of field values
         * @throws IOException
         * @throws InterruptedException
         * @throws Exception
         */
        public static void createFile(String dst, List<List<String>> argList) throws IOException, InterruptedException, Exception{
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"root");
            Path dstPath = new Path(dst); // target path
            // open an output stream
            FSDataOutputStream outputStream = fs.create(dstPath);
            StringBuffer sb = new StringBuffer();
            for(List<String> arg:argList){
                for(String value:arg){
                    sb.append(value).append("|");
                }
                sb.deleteCharAt(sb.length() - 1);// remove the trailing field delimiter
                sb.append("\n");
            }
            byte[] contents =  sb.toString().getBytes();
            outputStream.write(contents);
            outputStream.flush();
            outputStream.close();
            fs.close();
            System.out.println("文件创建成功!");
            
        }
        /**
         * Load the HDFS file into the Hive table.
         * @param dst
         */
        public static void loadData2Hive(String dst) {
            String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
            String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default";
            String username = "root";
            String password = "root";
            Connection con = null;
            
            try {
                Class.forName(JDBC_DRIVER);
                con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
                Statement stmt = con.createStatement();
                
                String sql = " load data inpath '"+dst+"' into table test ";//test 为插入的表
                
                stmt.execute(sql);
                System.out.println("loadData到Hive表成功!");
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }finally {
                // close the connection
                if(con != null){
                    try {
                        con.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
    }

The above is from personal experience; I hope it gives everyone a useful reference, and I hope everyone will continue to support 服务器之家.

原文链接:https://blog.csdn.net/kangkangwanwan/article/details/78915134