hadoop上传文件功能实例代码

时间:2022-09-04 17:19:38

hdfs上的文件是手动执行命令从本地linux上传至hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs提供的java api实现文件上传至hdfs,或者直接从ftp上传至hdfs。 

然而,需要说明一点,之前笔者是要运行mr,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行。像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和mr。其实,这个调度平台就是使用了quartz。当然,这个调度平台也提供其它的一些功能,比如web展示、日志查看等,所以也不是免费的。 

首先,给大家简单介绍一下hdfs。hdfs是以流式数据访问模式来存储超大文件,hdfs的构建思路是一次写入,多次读取,这样才是最高效的访问模式。hdfs是为高数据吞吐量应用优化的,所以会以提高时间延迟为代价。对于低延时的访问需求,我们可以使用hbase。 

然后,还要知道hdfs中块(block)的概念,默认为64mb。块是hdfs的数据读写的最小单位,通常每个map任务一次只处理一个block,像我们对集群性能评估就会使用到这个概念,比如目前有多少节点,每个节点的磁盘空间、cpu以及所要处理的数据量、网络带宽,通过这些信息来进行性能评估。我们可以使用hadoop fsck / -files -blocks列出文件系统中各个文件由哪些块构成。 

然后,再就是要知道namenode和datanode,这个在之前的博文已经介绍过,下面看看cm环境中hdfs的管理者(namenode)和工作者(datanode),如下 

hadoop上传文件功能实例代码

在yarn环境中是可以有多个namenode的。此环境中没有secondarynamenode,当然也可以有。 

好了,关于hdfs的基本概念就讲到这儿了,下面来看看具体的代码。

一、java实现上传本地文件至hdfs

这里,可以直接使用hdfs提供的java api即可实现,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.bjpowernode.hdfs.local;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
/**
 * classname:uploadlocalfiletohdfs <br/>
 * function: 本地文件上传至hdfs. <br/>
 * date:  2016年3月28日 下午10:06:05 <br/>
 * @author qiyongkang
 * @version
 * @since jdk 1.6
 * @see 
 */
public class uploadlocalfiletohdfs {
 public static void main(string[] args) {
  configuration conf = new configuration();
  string localdir = "/home/qiyongkang";
  string hdfsdir = "/qiyongkang";
  try{
   path localpath = new path(localdir);
   path hdfspath = new path(hdfsdir);
   filesystem hdfs = filesystem.get(conf);
   hdfs.copyfromlocalfile(localpath, hdfspath);
  }catch(exception e){
   e.printstacktrace();
  }
 }
}

注意,这里hdfs上传目录如果不存在的话,hdfs会自动创建,比较智能。 

打完包后,上传至服务器,执行yarn jar mr-demo-0.0.1-snapshot-jar-with-dependencies.jar,然后执行hadoop fs -ls /qiyongkang便可看到: 

hadoop上传文件功能实例代码

二、java实现上传ftp上的文件至hdfs

首先,我们得准备一个ftp服务器,关于ftp服务器的搭建,大家可以查阅资料,笔者就不赘述了。 

其实,从ftp上拉取文件上传到hdfs上,这个过程大家不要想复杂了,我们讲本地文件上传到hdfs,其实就是采用流的方式。因此,我们可以直接读取ftp上的文件流,然后以流的方式写入到hdfs。 

下面,直接贴出代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.bjpowernode.hdfs.ftp;
import java.io.inputstream;
import org.apache.commons.net.ftp.ftp;
import org.apache.commons.net.ftp.ftpclient;
import org.apache.commons.net.ftp.ftpfile;
import org.apache.commons.net.ftp.ftpreply;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.fsdataoutputstream;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.ioutils;
/**
 * classname:uploadftpfiletohdfs <br/>
 * function: todo add function. <br/>
 * reason: todo add reason. <br/>
 * date: 2016年3月28日 下午10:50:37 <br/>
 *
 * @author qiyongkang
 * @version
 * @since jdk 1.6
 * @see
 */
public class uploadftpfiletohdfs {
 public static void main(string[] args) {
  configuration conf = new configuration();
  loadfromftptohdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);
 }
 /**
  *
  * loadfromftptohdfs:将数据从ftp上传到hdfs上. <br/>
  *
  * @author qiyongkang
  * @param ip
  * @param username
  * @param password
  * @param filepath
  * @param outputpath
  * @param conf
  * @return
  * @since jdk 1.6
  */
 private static boolean loadfromftptohdfs(string ip, string username, string password, string filepath,
   string outputpath, configuration conf) {
  ftpclient ftp = new ftpclient();
  inputstream inputstream = null;
  fsdataoutputstream outputstream = null;
  boolean flag = true;
  try {
   ftp.connect(ip);
   ftp.login(username, password);
   ftp.setfiletype(ftp.binary_file_type);
   ftp.setcontrolencoding("utf-8");
   int reply = ftp.getreplycode();
   if (!ftpreply.ispositivecompletion(reply)) {
    ftp.disconnect();
   }
   ftpfile[] files = ftp.listfiles(filepath);
   filesystem hdfs = filesystem.get(conf);
   for (ftpfile file : files) {
    if (!(file.getname().equals(".") || file.getname().equals(".."))) {
     inputstream = ftp.retrievefilestream(filepath + file.getname());
     outputstream = hdfs.create(new path(outputpath + file.getname()));
     ioutils.copybytes(inputstream, outputstream, conf, false);
     if (inputstream != null) {
      inputstream.close();
      ftp.completependingcommand();
     }
    }
   }
   ftp.disconnect();
  } catch (exception e) {
   flag = false;
   e.printstacktrace();
  }
  return flag;
 }
}

然后同样打包上传后执行yarn jar mr-demo-0.0.1-snapshot-jar-with-dependencies.jar,便可看到: 

hadoop上传文件功能实例代码

总结

以上所述是小编给大家介绍的hadoop上传文件功能实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!