HDFS的写数据过程分析
我们通过FileSystem类可以操控HDFS, 那我们就从这里开始分析写数据到HDFS的过程。在我们向 HDFS 写文件的时候,调用的是 FileSystem.create(Path path)方法,我们查看这个方法的源码,通过跟踪内部的重载方法,可以找到
/** * Opens an FSDataOutputStream at the indicated Path with write-progress * reporting. * @param f the file name to open * @param permission * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. * @param bufferSize the size of the buffer to be used. * @param replication required block replication for the file. * @param blockSize * @param progress * @throws IOException * @see #setPermission(Path, FsPermission) */ public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException;这个方法是抽象类,没有实现。那么我们只能向他的子类寻找实现。FileSystem 有个子类是 DistributedFileSystem,在我们的伪分布环境下使用的就是这个类。我们可以看到DistributedFileSystem 的这个方法的实现
点create方法 按ctrl+T 点击DistributedFileSystem 进进去了:public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); return new FSDataOutputStream (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics); }注意返回值 FSDataOutputStream。这个返回值对象调用了自己的构造方法, 构造方法的第一个参数是 dfs.create()方法。 我们关注一下这里的 dfs 对象是谁,create 方法做了什么事情。现在进入这个方法的实现,
/** * Create a new dfs file with the specified block replication * with write-progress reporting and return an output stream for writing * into the file. * * @param src stream name * @param permission The permission of the directory being created. * If permission == null, use {@link FsPermission#getDefault()}. * @param overwrite do not check for file existence if true * @param createParent create missing parent directory if true * @param replication block replication * @return output stream * @throws IOException * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long) */ public OutputStream create(String src, FsPermission permission, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize ) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getDefault(); } FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf)); LOG.debug(src + ": masked=" + masked); final DFSOutputStream result = new DFSOutputStream(src, masked, overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); beginFileLease(src, result); return result; }final DFSOutputStream result = new DFSOutputStream(src, masked,
返回值创建的对象。这个类有什么神奇的地方吗?我们看一下他的(点击DFSOutputStream)源码,/** * Create a new output stream to the given DataNode. * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long) */ DFSOutputStream(String src, FsPermission masked, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum, replication); computePacketChunkSize(writePacketSize, bytesPerChecksum); try { // Make sure the regular create() is done through the old create(). // This is done to ensure that newer clients (post-1.0) can talk to // older clusters (pre-1.0). Older clusters lack the new create() // method accepting createParent as one of the arguments. if (createParent) { namenode.create( src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create( src, masked, clientName, overwrite, false, replication, blockSize); } } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileAlreadyExistsException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } streamer.start(); }可 以 看 到 , 这 个 类 是 DFSClient 的 内 部 类 。 在 类 内 部 通 过 调用namenode.create()方法创建了一个输出流。我们再看一下 namenode 对象是什么类型
public final ClientProtocol namenode;可以看到 namenode 其实是 ClientProtocal 接口。那么,这个对象是什么时候创建的?点击outline点击下面这个DFSClient:
/** * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode. * Exactly one of nameNodeAddr or rpcNamenode must be null. */ DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { this.conf = conf; this.stats = stats; this.nnAddress = nameNodeAddr; this.socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT); this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout", HdfsConstants.WRITE_TIMEOUT); this.timeoutValue = this.socketTimeout; this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); // dfs.write.packet.size is an internal config variable this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf); this.hdfsTimeout = Client.getTimeout(conf); ugi = UserGroupInformation.getCurrentUser(); this.authority = nameNodeAddr == null? "null": nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort(); String taskId = conf.get("mapred.task.id", "NONMAPREDUCE"); this.clientName = "DFSClient_" + taskId + "_" + r.nextInt() + "_" + Thread.currentThread().getId(); defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); defaultReplication = (short) conf.getInt("dfs.replication", 3); if (nameNodeAddr != null && rpcNamenode == null) { this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi); this.namenode = createNamenode(this.rpcNamenode, conf); } else if (nameNodeAddr == null && rpcNamenode != null) { //This case is used for testing. this.namenode = this.rpcNamenode = rpcNamenode; } else { throw new IllegalArgumentException( "Expecting exactly one of nameNodeAddr and rpcNamenode being null: " + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); } // read directly from the block file if configured. this.shortCircuitLocalReads = conf.getBoolean( DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); if (LOG.isDebugEnabled()) { LOG.debug("Short circuit read is " + shortCircuitLocalReads); } this.connectToDnViaHostname = conf.getBoolean( DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); if (LOG.isDebugEnabled()) { LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname); } String localInterfaces[] = conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); if (null == localInterfaces) { localInterfaces = new String[0]; } this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { LOG.debug("Using local interfaces [" + StringUtils.join(",",localInterfaces)+ "] with addresses [" + StringUtils.join(",",localInterfaceAddrs) + "]"); } }可以看到34行namenode 对象是在 DFSClient 的构造函数调用时创建的,即当 DFSClient 对象存在的时候,namenode 对象已经存在了。
至此,我们可以看到,使用 FileSystem 对象的 api 操纵 HDFS,其实是通过 DFSClient 对象访问 NameNode 中的方法操纵 HDFS 的。 这里的 DFSClient 是 RPC 机制的客户端, NameNode是 RPC 机制的服务端的调用对象,整个调用过程如图
在整个过程中,DFSClient 是个很重要的类,从名称就可以看出,他表示 HDFS 的 Client,是整个 HDFS 的 RPC 机制的客户端部分。我们对 HDFS 的操作,是通过 FileSsytem 调用的DFSClient 里面的方法。FileSystem 是封装了对 DFSClient 的操作,提供给用户使用的
HDFS的写数据过程分析的更多相关文章
-
Linux启动kettle及linux和windows中kettle往hdfs中写数据(3)
在xmanager中的xshell运行进入图形化界面 sh spoon.sh 新建一个job
-
HDFS 读/写数据流程
1. HDFS 写数据流程 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件, NameNode 检查目标文件是否已存在,父目录是否存在: NameNo ...
-
HDFS源码解析:教你用HDFS客户端写数据
摘要:终于开始了这个很感兴趣但是一直觉得困难重重的源码解析工作,也算是一个好的开端. 本文分享自华为云社区<hdfs源码解析之客户端写数据>,作者: dayu_dls. 在我们客户端写数据 ...
-
通过FSDataOutputStream向HDFS上写数据
FSDataOutputStream,这个类重载了很多write方法,用于写入很多类型的数据:比如字节数组,long,int,char等等. 像FSDataInputStream一样,要获得FSDat ...
-
HDFS数据流——写数据流程
剖析HDFS文件写入 假设文件ss.avi共200m,其写入HDFS指定路径/user/atguigu/ss.avi流程如下: 1)客户端向namenode请求上传文件到指定路径,namenode通过 ...
-
Hadoop源码分析之客户端向HDFS写数据
转自:http://www.tuicool.com/articles/neUrmu 在上一篇博文中分析了客户端从HDFS读取数据的过程,下面来看看客户端是怎么样向HDFS写数据的,下面的代码将本地文件 ...
-
Hadoop(8)-HDFS的读写数据流程以及机架感知
1. HDFS的写数据流程 1.客户端通过fs模块向NameNode申请文件上传,NameNode检查请求是否合法,如用户权限,目标文件是否已存在,父目录是否存在等等 2.NameNode返回是否可以 ...
-
HDFS写数据和读数据流程
HDFS数据存储 HDFS client上传数据到HDFS时,首先,在本地缓存数据,当数据达到一个block大小时.请求NameNode分配一个block. NameNode会把block所在的Dat ...
-
HDFS写文件过程分析
转自http://shiyanjun.cn/archives/942.html HDFS是一个分布式文件系统,在HDFS上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在HDFS文件 ...
随机推荐
-
扩展RBAC用户角色权限设计方案
RBAC(Role-Based Access Control,基于角色的访问控制),就是用户通过角色与权限进行关联.简单地说,一个用户拥有若干角色,每一个角色拥有若干权限.这样,就构造成“用户-角色- ...
-
HBase Shell 常见操作
1.一般操作 status 查看状态 version 查看HBase版本 2.DDL操作 create 'member','member_id','address','info' 创建了一个membe ...
-
Centos配置Caffe详解
http://www.tuicool.com/articles/uiuA3e
-
如何监听input的脚本赋值
今天记录下我解决input值改变监听,大家肯定首先想到onchange方法.对于实时监听改变用onpropertychange.oninput等方法:可是,onchange并不能监听脚本改变的值,对于 ...
-
Android初学:联系创建Activity
public class Activity2 extends Activity{ @Override protected void onCreate(Bundle savedInstanceState ...
-
C与C++动态分配二维数组
C: C中使用函数malloc和free两个函数. //动态分配M*N维 int **a=(int **)malloc(sizeof(int*)*M); ;i<M;i++) a[i]=(int ...
-
Linux基础(七)
一.nfs服务 nfs(Network File System)即网络文件系统,它允许网络中的计算机之间通过TCP/IP网络共享资源.常用于Linux系统之间的文件共享. nfs在文件传送过程中依赖r ...
-
Android View绘制回调方法流程
Android中View的性命周期,挪用 invalidate() 战 requestLayout() 会触收哪些方式,一张图就可以讲解的很详细. 该图确切一看便特别很是清楚.让人简略的懂得View的 ...
-
vue 动态加载组建
<component :is="comp1"></component> data () { return { comp1:'', } } require.e ...
-
Formal Grammars of English -10 chapter(Speech and Language Processing)
determiner 限定词 DET propernoun 专有名词 NP (or noun phrase) mass noun 不可数名词 Det Nouns 限定词名词 relative pro ...