[Original] Big Data Fundamentals: Spark (7) How Spark Splits Files on Read (i.e. the Number of RDD Partitions)

Date: 2021-06-21 00:36:21

spark 2.1.1

When Spark initializes an RDD it has to read input files, usually from HDFS. When reading, you can pass a minimum number of partitions, but this is only a hint: the actual number may be larger (for example when there are very many or very large files) or smaller (for example when there is only one very small file). If no minimum is given, how is the default number of partitions of the resulting RDD decided?
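For example, the effect can be observed directly from spark-shell (a minimal sketch; /tmp/demo/input is a hypothetical HDFS directory, not from the original post):

// spark-shell, Spark 2.1.1
val rdd1 = sc.textFile("/tmp/demo/input")        // no hint: uses defaultMinPartitions
val rdd2 = sc.textFile("/tmp/demo/input", 10)    // 10 is only a suggested minimum
println(rdd1.getNumPartitions)                   // decided by the InputFormat's split logic
println(rdd2.getNumPartitions)                   // may differ from 10, as described above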

Let's use SparkContext.textFile as an example and look at the code:

org.apache.spark.SparkContext

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

  /**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
   *
   * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
   * copy them using a `map` function.
   */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

As you can see, this simply returns a HadoopRDD. If no minimum partition count is passed in, defaultMinPartitions is used, which is normally 2 (a quick way to check this on a live SparkContext is sketched right below). So how is HadoopRDD implemented?
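A minimal check of the two values involved (both methods are public on SparkContext; output depends on your cluster):

// defaultParallelism is roughly the total number of cores available to the app
println(sc.defaultParallelism)
println(sc.defaultMinPartitions)   // math.min(defaultParallelism, 2), so normally 2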

org.apache.spark.rdd.HadoopRDD

class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {
  ...
  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }
  ...
  protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
      .asInstanceOf[InputFormat[K, V]]
    newInputFormat match {
      case c: Configurable => c.setConf(conf)
      case _ =>
    }
    newInputFormat
  }

The logic that decides the number of partitions lives in getPartitions, and the real work is delegated to InputFormat.getSplits. InputFormat is an interface:

org.apache.hadoop.mapred.InputFormat

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf var1, int var2) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit var1, JobConf var2, Reporter var3) throws IOException;
}
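Spark simply hands this interface whichever class it was given. For instance, calling hadoopFile with TextInputFormat spelled out explicitly is what textFile does internally; swapping in another implementation changes the split strategy (a sketch; the path is hypothetical):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Equivalent to sc.textFile("/tmp/demo/input") apart from the map to String
val kv = sc.hadoopFile(
  "/tmp/demo/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text])
println(kv.getNumPartitions)   // = number of splits returned by TextInputFormat.getSplits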

Each file format has its own implementation class. For the common formats avro, orc, parquet and plain text files, the corresponding classes are AvroInputFormat, OrcInputFormat, MapredParquetInputFormat and CombineTextInputFormat, and each one implements its own split logic. Let's look at the default implementation:

org.apache.hadoop.mapred.FileInputFormat

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less
   * @param blkLocations The list of block locations
   * @param offset
   * @param splitSize
   * @return array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
  throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
                          blkLocations[startIndex].getLength() - offset;

    //If this is the only block, just return
    if (bytesInThisBlock >= splitSize) {
      return blkLocations[startIndex].getHosts();
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
    Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
    String [] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node,nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode,parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    return identifyHosts(allTopos.length, racksMap);
  }
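The splitSize used in the loop above comes from computeSplitSize, which in this class is just Math.max(minSize, Math.min(goalSize, blockSize)), and SPLIT_SLOP is 1.1 (10% slack). Below is a rough Scala sketch of the sizing logic for a single splittable file; the names mirror the Java code, but this is only an illustration, not Hadoop's actual method:

// Rough re-implementation of the per-file sizing logic from getSplits,
// for ONE splittable file; not Hadoop's actual code.
def estimateSplits(fileLen: Long, blockSize: Long, totalSize: Long,
                   numSplits: Int, minSize: Long = 1L): Int = {
  val SPLIT_SLOP = 1.1
  val goalSize   = totalSize / (if (numSplits == 0) 1 else numSplits)
  val splitSize  = math.max(minSize, math.min(goalSize, blockSize)) // computeSplitSize
  var bytesRemaining = fileLen
  var splits = 0
  while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
    splits += 1
    bytesRemaining -= splitSize
  }
  if (bytesRemaining != 0) splits += 1  // final, possibly smaller split
  splits
}

// A single 1 GB file with 128 MB blocks and minPartitions = 2:
// goalSize = 512 MB, splitSize = min(512 MB, 128 MB) = 128 MB  =>  8 splits
println(estimateSplits(fileLen = 1L << 30, blockSize = 128L << 20,
                       totalSize = 1L << 30, numSplits = 2))

So for this file the hint of 2 is exceeded: each 128 MB block becomes its own split and the RDD ends up with 8 partitions, which matches the "may be larger than the hint" behaviour described at the start.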

The overall process is roughly as follows:

getSplits first obtains the list of files to read and then iterates over it. For each file it checks whether the file can be split further, i.e. isSplitable (true by default, but subclasses may override it). If it cannot be split, the whole file becomes a single split. If it can, the file's block information is fetched and the split size (splitSize) is computed from several parameters, after which the file's blocks are carved into splits, taking rack and host locality into account: a large block becomes a split on its own, while small blocks may be merged into one split (which keeps the number of splits down). The final number of splits is the number of partitions.
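In practice this also means the partition count can be tuned through the parameters that feed into splitSize. For example (a hedged sketch, not from the original post), raising the minimum split size lets several blocks of the same file land in one split and therefore yields fewer partitions:

// The property name is the SPLIT_MINSIZE constant referenced in getSplits above;
// it must be set before the RDD is created, because the configuration is
// broadcast when hadoopFile builds the HadoopRDD.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024) // 256 MB
val merged = sc.textFile("/tmp/demo/input")   // hypothetical path
println(merged.getNumPartitions)              // fewer partitions than with the default minsize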

Note: the process described above may be overridden by subclasses.
