Note: This post implements downloading files and folders from HDFS in Java, writing the output as a stream so that callers can save it to any path they choose.
<!-- Hadoop dependencies -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.1</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.1</version>
</dependency>
These are the jar dependencies for the classes used below; check them against the import lists at the top of each code listing.
1. Downloading a file
Execution flow for "downloading a file":
1. Build the HDFS connection and initialize a Configuration
2. Open an FSDataInputStream for the file and call downloadFile()
3. Inside the method, set the response headers first (the output file name comes from convertFileName(fileName)), then write the stream's contents out as the response body
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import util.ExportUtil;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Download a file.
 * @author liudz
 * @date 2020/6/9
 * @return execution result
 **/
@RequestMapping(value = "/down", method = RequestMethod.GET)
public ResponseEntity<InputStreamResource> test01() throws URISyntaxException, IOException {
    // The next two lines initialize the HDFS configuration and connection
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://172.16.1.9:8020"), conf);
    FSDataInputStream inputStream = fs.open(new Path("hdfs://172.16.1.9:8020/spark/testLog.txt"));
    return ExportUtil.downloadFile(inputStream, "testLog.txt");
}
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;

/**
 * Serve a file's contents as a stream.
 *
 * @param in       input stream
 * @param fileName file name
 * @return response entity wrapping the stream
 */
public static ResponseEntity<InputStreamResource> downloadFile(InputStream in, String fileName) {
    try {
        byte[] testBytes = new byte[in.available()];
        HttpHeaders headers = new HttpHeaders();
        headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
        headers.add("Content-Disposition", String.format("attachment; filename=\"%s\"", convertFileName(fileName)));
        headers.add("Pragma", "no-cache");
        headers.add("Expires", "0");
        headers.add("Content-Language", "UTF-8");
        // This final statement is what writes the file contents out as a stream
        return ResponseEntity.ok().headers(headers).contentLength(testBytes.length)
                .contentType(MediaType.parseMediaType("application/octet-stream"))
                .body(new InputStreamResource(in));
    } catch (IOException e) {
        log.info("downloadFile error: " + e.getMessage());
    }
    log.info("file is null: " + fileName);
    return null;
}
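The convertFileName() helper is referenced above but its body is not shown in the post. Here is a minimal sketch, assuming it simply URL-encodes the file name so non-ASCII names survive the Content-Disposition header (the URLEncoder and UnsupportedEncodingException imports above suggest something along these lines); treat it as a reconstruction, not the original code:

/**
 * Hypothetical reconstruction of convertFileName(): URL-encode the file name
 * for the Content-Disposition header. Not shown in the original post.
 */
private static String convertFileName(String fileName) {
    try {
        // URLEncoder turns spaces into '+', so restore them as %20
        return URLEncoder.encode(fileName, "UTF-8").replaceAll("\\+", "%20");
    } catch (UnsupportedEncodingException e) {
        return fileName;
    }
}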
2. Downloading a folder
Execution flow for "downloading a folder and its files":
1. Initialize the response headers so the folder is served as xx.zip, then call down2() (a controller sketch for this step follows the code below)
2. Build the HDFS connection and initialize a Configuration
3. Call the recursive compress() method with (the folder's full path + a ZipOutputStream instance + a FileSystem instance)
4. How the recursion proceeds:
Walk the children of the current directory: 1) for a directory, write a zip entry that opens it (last path segment + "/")
2) for a file, write a zip entry for the file itself (its full path within the directory)
----------------------------------------------------------------------------------------
******Note: the 2 lines that are easiest to get wrong:******
Zipping a file: zipOutputStream.putNextEntry(new ZipEntry(name.substring(1)));
Zipping a folder: zipOutputStream.putNextEntry(new ZipEntry(fileStatulist[i].getPath().getName() + "/"));
**The name variable is used when creating file entries in the zip; fileStatulist[i].getPath().getName() is used when creating folder entries**
-----------------------------------------------------------------------------------------
Worked example:
Suppose the folder spark-warehouse contains two subfolders, data1 and data2, each holding one text file a.txt
Step 1: list the directory "C:/Users/liudz/Desktop/spark-warehouse", which yields (C:/Users/liudz/Desktop/spark-warehouse/data1 and C:/Users/liudz/Desktop/spark-warehouse/data2)
lastName=spark-warehouse
name=/spark-warehouse/data1
"C:/Users/liudz/Desktop/spark-warehouse/data1" is a directory, so the zip gets a "data1/" folder entry
Step 2: list the directory "C:/Users/liudz/Desktop/spark-warehouse/data1", which yields (C:/Users/liudz/Desktop/spark-warehouse/data1/a.txt)
lastName=data1
name=/data1/a.txt
"C:/Users/liudz/Desktop/spark-warehouse/data1/a.txt" is a file, so the zip gets a "data1/a.txt" file entry
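To make the resulting entry layout concrete, here is a small verification sketch (not part of the original post) that lists the entries of a generated zip; for the example above it should print data1/, data1/a.txt, data2/, data2/a.txt:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Lists the entry names of a zip file, e.g. to verify the layout produced by compress(). */
public class ZipEntryLister {
    public static void main(String[] args) throws IOException {
        try (ZipInputStream zis = new ZipInputStream(new FileInputStream(args[0]))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                // Directory entries end with "/", file entries do not
                System.out.println(entry.getName());
            }
        }
    }
}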
package test;

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Use the Java API on HDFS: copy part of the file tree to the local file system.
 */
public class DownHDFDSTtoLocal {

    private static final Log log = LogFactory.getLog(DownHDFDSTtoLocal.class);

    public static void main(String[] args) throws IOException {
        DownHDFDSTtoLocal d = new DownHDFDSTtoLocal();
        ByteArrayOutputStream zos =
                (ByteArrayOutputStream) d.down2("hdfs://10.79.169.24:8020/user/guestuser/oas/tables/business_line_analyse");
        FileOutputStream f = new FileOutputStream("D:\\code\\oasdata\\相关航线导入\\新表的数据\\likehua.zip");
        f.write(zos.toByteArray());
        f.close();
        zos.close();
    }

    /**
     * Multiple files (a folder).
     *
     * @param cloudPath cloudPath
     * @author liudz
     * @date 2020/6/8
     * @return execution result
     **/
    public OutputStream down2(String cloudPath) {
        // 1. Get the FileSystem handle and zip the folder into an in-memory stream
        ByteArrayOutputStream out = null;
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://10.79.169.24:8020"), conf);
            out = new ByteArrayOutputStream();
            ZipOutputStream zos = new ZipOutputStream(out);
            compress(cloudPath, zos, fs);
            zos.close();
        } catch (IOException e) {
            log.info("----error: " + e.getMessage() + "----");
        } catch (URISyntaxException e) {
            log.info("----error: " + e.getMessage() + "----");
        }
        return out;
    }

    /**
     * Recursively compress an HDFS directory into the zip stream.
     *
     * @param baseDir         directory to compress
     * @param zipOutputStream target zip stream
     * @param fs              HDFS file system handle
     * @author liudz
     * @date 2020/6/8
     **/
    public void compress(String baseDir, ZipOutputStream zipOutputStream, FileSystem fs) throws IOException {
        try {
            FileStatus[] fileStatulist = fs.listStatus(new Path(baseDir));
            log.info("basedir = " + baseDir);
            String[] strs = baseDir.split("/");
            // lastName is the last segment of the path
            String lastName = strs[strs.length - 1];
            for (int i = 0; i < fileStatulist.length; i++) {
                String name = fileStatulist[i].getPath().toString();
                name = name.substring(name.indexOf("/" + lastName));
                if (fileStatulist[i].isFile()) {
                    // File: write an entry named after its path within the folder
                    Path path = fileStatulist[i].getPath();
                    FSDataInputStream inputStream = fs.open(path);
                    zipOutputStream.putNextEntry(new ZipEntry(name.substring(1)));
                    IOUtils.copyBytes(inputStream, zipOutputStream, 1024);
                    inputStream.close();
                } else {
                    // Directory: write a "name/" entry, then recurse into it
                    zipOutputStream.putNextEntry(new ZipEntry(fileStatulist[i].getPath().getName() + "/"));
                    log.info("fileStatulist[i].getPath().toString() = " + fileStatulist[i].getPath().toString());
                    compress(fileStatulist[i].getPath().toString(), zipOutputStream, fs);
                }
            }
        } catch (IOException e) {
            log.info("----error: " + e.getMessage() + "----");
        }
    }
}
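Step 1 of the folder flow mentions serving the zip over HTTP with headers, but the post only shows a main() that writes to local disk. Here is a minimal sketch of that wiring as a Spring controller method; the /downDir mapping, the xx.zip file name, and the reuse of down2() are my assumptions for illustration, not code from the original post:

import java.io.ByteArrayOutputStream;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

// Hypothetical wiring for step 1 of the folder flow: set the headers so the
// folder downloads as xx.zip, then return the bytes produced by down2().
@RequestMapping(value = "/downDir", method = RequestMethod.GET)
public ResponseEntity<byte[]> downDir() {
    ByteArrayOutputStream out = (ByteArrayOutputStream) new DownHDFDSTtoLocal()
            .down2("hdfs://10.79.169.24:8020/user/guestuser/oas/tables/business_line_analyse");
    HttpHeaders headers = new HttpHeaders();
    headers.add("Content-Disposition", "attachment; filename=\"xx.zip\"");
    headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
    return ResponseEntity.ok()
            .headers(headers)
            .contentType(MediaType.parseMediaType("application/octet-stream"))
            .body(out.toByteArray());
}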