自学大数据第九天~hdfs读写以及java api

时间:2022-12-23 01:07:00


自学大数据第九天~hdfs读写以及java api

前几天学习了怎么使用HDFS,今天稍微深入一下,HDFS是怎么工作的;

HDFS读取数据的方式:

以下是极简的读取数据:

自学大数据第九天~hdfs读写以及java api

客户端读取数据,首先通过DistributefileSystem 获取数据的元数据(即元数据存在哪里)

为了加快读取速度,首先考虑读取位置离客户端最近的 数据,那怎么判断呢?

通过一个关于位置的API来获取数据存储的位置(机架号) ,通过对比客户端所在机架号跟数据所在机架号,如果一致则优先读取,否则就会随机读取;

其中我们编写代码时是通过FsDatainputStream,其实现了hadoop中的dfsDataInputStream

HDFS写入数据

那怎么写入数据呢?

还是这些组件,只不过在写入的时候是串联的方式,

自学大数据第九天~hdfs读写以及java api


写的时候先检查有没有写的权限,有权限之后检查是不是已经写了(重名的),然后开始写,首先是写入离自己近的机架,写入第一个后返回写入成功,然后由第一个赋值给第二个,完成后第二个发送通知给第一个,以此类推,直到所有的节点都写完了;

如果集群出现问题怎么办?

1,在hadoop1.0版本时由于只有一个NameNode,所以当NameNode宕机之后,集群就挂了(停止运行),这时候要通过SecondryNameNode冷备来恢复;
在hadoop2.0版本的时候有了热备,此时主NameNode挂了之后,热备的会顶上;第二名名称节点还是一个备胎~冷备;
由于NameNode的数据存放在内存中,内存不能无限制大,所以当集群数据越来越多时,性能就会下降;

2,如果DataNode出现故障,此时NameNode所在节点会重新复制一份数据以使得副本数量符合要求;
那Namenode所在节点怎么知道DN出现故障的呢? DN会定时发送心跳给NameNode,就像一个注册中心一样;

简单的JAVAAPI

一个简单的例子:

假设在目录“hdfs://localhost:9000/user/hadoop”下面有几个文件,分别是file1.txt、file2.txt、file3.txt、file4.abc和file5.abc,这里需要从该目录中过滤出所有后缀名不为“.abc”的文件,对过滤之后的文件进行读取,并将这些文件的内容合并到文件“hdfs://localhost:9000/user/hadoop/merge.txt”中。

为了编写一个能够与HDFS交互的Java应用程序,一般需要向Java工程中添加以下JAR包:
(1)“/usr/local/hadoop/share/hadoop/common”目录下的所有JAR包,包括hadoop-common-3.1.3.jar、hadoop-common-3.1.3-tests.jar、haoop-nfs-3.1.3.jar和haoop-kms-3.1.3.jar,注意,不包括目录jdiff、lib、sources和webapps;
(2)“/usr/local/hadoop/share/hadoop/common/lib”目录下的所有JAR包;
(3)“/usr/local/hadoop/share/hadoop/hdfs”目录下的所有JAR包,注意,不包括目录jdiff、lib、sources和webapps;
(4)“/usr/local/hadoop/share/hadoop/hdfs/lib”目录下的所有JAR包。

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

/**
 * 过滤掉文件名满足特定条件的文件 
 */
class MyPathFilter implements PathFilter {
    String reg = null;
    MyPathFilter(String reg) {
        this.reg = reg;
    }
    @Override
    public boolean accept(Path path) {
        if (!(path.toString().matches(reg))) {
            return true;
        }
        return false;
    }
}
/***
 * 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件
 */
public class MergeFile {
    Path inputPath = null; //待合并的文件所在的目录的路径
    Path outputPath = null; //输出文件的路径
    public MergeFile(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://master:9000");
        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        //下面过滤掉输入目录中后缀为.abc的文件
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath,
                new MyPathFilter(".*\\.abc"));
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        PrintStream ps = new PrintStream(System.out);
        //下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中
        for (FileStatus sta : sourceStatus) {
            //下面打印后缀不为.abc的文件的路径、文件大小
            System.out.print("路径:" + sta.getPath() + "    文件大小:" + sta.getLen()
                    + "   权限:" + sta.getPermission() + "   内容:");
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;

            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
            fsdis.close();
        }
        ps.close();
        fsdos.close();
    }
    public static void main(String[] args) throws IOException {
        MergeFile merge = new MergeFile(
                "hdfs://master:9000/user/hadoop/",
                "hdfs://master:9000/user/hadoop/merge.txt");
        merge.doMerge();
    }
}

打包后的jar文件

自学大数据第九天~hdfs读写以及java api


然后在本地建立相应文件并上传到hdfs系统中:

自学大数据第九天~hdfs读写以及java api

运行jar文件:

自学大数据第九天~hdfs读写以及java api

注意,运行前先检查一下是否有多余的文件,
否则会报错(因为代码未考虑其他文件)

[hadoop@master hadoop-3.3.4]$ hdfs dfs -ls /user/hadoop
Found 8 items
-rw-r--r--   3 hadoop hadoop         26 2023-03-14 07:11 /user/hadoop/a.txt
-rw-r--r--   3 hadoop hadoop         18 2023-03-23 06:28 /user/hadoop/file1.txt
-rw-r--r--   3 hadoop hadoop         18 2023-03-23 06:29 /user/hadoop/file2.txt
-rw-r--r--   3 hadoop hadoop         18 2023-03-23 06:29 /user/hadoop/file3.txt
-rw-r--r--   3 hadoop hadoop         18 2023-03-23 06:29 /user/hadoop/file4.abc
-rw-r--r--   3 hadoop hadoop         18 2023-03-23 06:29 /user/hadoop/file5.abc
drwxr-xr-x   - hadoop hadoop          0 2023-03-14 06:40 /user/hadoop/input
drwxr-xr-x   - hadoop hadoop          0 2023-03-14 06:43 /user/hadoop/output

自学大数据第九天~hdfs读写以及java api


运行结束后查看是否生成了merge文件:

自学大数据第九天~hdfs读写以及java api

下面放几段常见的代码供学习参考:

import org.apache.hadoop.conf.Configuration;  
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.Path;
 
        public class Chapter3 {    
                public static void main(String[] args) { 
                        try {
                                Configuration conf = new Configuration();  
                                conf.set("fs.defaultFS","hdfs://localhost:9000");
                                conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                                FileSystem fs = FileSystem.get(conf);
                                byte[] buff = "Hello world".getBytes(); // 要写入的内容
                                String filename = "test"; //要写入的文件名
                                FSDataOutputStream os = fs.create(new Path(filename));
                                os.write(buff,0,buff.length);
                                System.out.println("Create:"+ filename);
                                os.close();
                                fs.close();
                        } catch (Exception e) {  
                                e.printStackTrace();  
                        }  
                }  
        }
import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
 
        public class Chapter3 {
                public static void main(String[] args) {
                            try {
                                    String filename = "test";
 
                                    Configuration conf = new Configuration();
                                    conf.set("fs.defaultFS","hdfs://localhost:9000");
                                    conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                                    FileSystem fs = FileSystem.get(conf);
                                    if(fs.exists(new Path(filename))){
                                            System.out.println("文件存在");
                                    }else{
                                            System.out.println("文件不存在");
                                    }
                                    fs.close();
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }
        }
import java.io.BufferedReader;
        import java.io.InputStreamReader;
 
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.fs.FSDataInputStream;
 
        public class Chapter3 {
                public static void main(String[] args) {
                        try {
                                Configuration conf = new Configuration();
                                conf.set("fs.defaultFS","hdfs://localhost:9000");
                                conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                                FileSystem fs = FileSystem.get(conf);
                                Path file = new Path("test"); 
                                FSDataInputStream getIt = fs.open(file);
                                BufferedReader d = new BufferedReader(new InputStreamReader(getIt));
                                String content = d.readLine(); //读取文件一行
                                System.out.println(content);
                                d.close(); //关闭文件
                                fs.close(); //关闭hdfs
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }
        }

以上代码的基本流程就是 先配置Configration,配置其属性HDFS地址以及其实现类,再然后读取文件的流程,再然后可以编写数据处理的逻辑;