spark 2.1.1
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  // Load the file as (key, value) records through Hadoop's TextInputFormat.
  val records = hadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
  // Drop each record's key, turn the Text value into a String, and name the RDD after its path.
  records.map(_._2.toString).setName(path)
}
/**
 * Default min number of partitions for Hadoop RDDs when not given by user.
 * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
 * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
 */
def defaultMinPartitions: Int = {
  // Cap the default at 2 partitions, however high the default parallelism is.
  val partitionCap = 2
  math.min(defaultParallelism, partitionCap)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
// Deferred setter: when applied to a JobConf it registers `path` as that job's input path.
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
// NOTE(review): this excerpt ends at the opening parenthesis below; the constructor
// arguments and the method's closing brace are not shown here.
new HadoopRDD(
// An RDD of (K, V) pairs whose partitions are the input splits reported by a Hadoop
// InputFormat (see getPartitions below).
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
// Asks the input format for its splits and wraps split i in a HadoopPartition at index i.
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
// NOTE(review): the statement the comment above refers to is not present in this excerpt.
val inputFormat = getInputFormat(jobConf)
// minPartitions is handed to getSplits as its second argument.
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
// NOTE(review): the loop's closing brace, the method result and the method's closing brace
// are missing from this excerpt.
// Instantiates inputFormatClass through ReflectionUtils.newInstance and, when the instance
// is Configurable, passes it the job configuration.
protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
newInputFormat match {
case c: Configurable => c.setConf(conf)
case _ =>
// NOTE(review): the excerpt stops here; the end of the match, the returned value and the
// closing braces of the method and of the class are not shown.
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf var1, int var2) throws IOException; RecordReader<K, V> getRecordReader(InputSplit var1, JobConf var2, Reporter var3) throws IOException;
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
totalSize += file.getLen();
} long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
bytesRemaining -= splitSize;
} if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
} else {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
LOG.debug("Total # of splits: " + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
} /**
 * This function identifies and returns the hosts that contribute
 * most for a given split. For calculating the contribution, rack
 * locality is treated on par with host locality, so hosts from racks
 * that contribute the most are preferred over hosts on racks that
 * contribute less.
 * @param blkLocations The list of block locations
 * @param offset byte offset in the file at which the split starts
 * @param splitSize length of the split in bytes
 * @param clusterMap network topology used to resolve topology paths to nodes
 * @return array of hosts that contribute most to this split
 * @throws IOException
 */
protected String[] getSplitHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap)
throws IOException { int startIndex = getBlockIndex(blkLocations, offset); long bytesInThisBlock = blkLocations[startIndex].getOffset() +
blkLocations[startIndex].getLength() - offset; //If this is the only block, just return
if (bytesInThisBlock >= splitSize) {
return blkLocations[startIndex].getHosts();
} long bytesInFirstBlock = bytesInThisBlock;
int index = startIndex + 1;
splitSize -= bytesInThisBlock; while (splitSize > 0) {
bytesInThisBlock =
Math.min(splitSize, blkLocations[index++].getLength());
splitSize -= bytesInThisBlock;
} long bytesInLastBlock = bytesInThisBlock;
int endIndex = index - 1; Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
String [] allTopos = new String[0]; // Build the hierarchy and aggregate the contribution of
// bytes at each level. See for (index = startIndex; index <= endIndex; index++) { // Establish the bytes in this block
if (index == startIndex) {
bytesInThisBlock = bytesInFirstBlock;
else if (index == endIndex) {
bytesInThisBlock = bytesInLastBlock;
else {
bytesInThisBlock = blkLocations[index].getLength();
} allTopos = blkLocations[index].getTopologyPaths(); // If no topology information is available, just
// prefix a fakeRack
if (allTopos.length == 0) {
allTopos = fakeRacks(blkLocations, index);
} // NOTE: This code currently works only for one level of
// hierarchy (rack/host). However, it is relatively easy
// to extend this to support aggregation at different
// levels for (String topo: allTopos) { Node node, parentNode;
NodeInfo nodeInfo, parentNodeInfo; node = clusterMap.getNode(topo); if (node == null) {
node = new NodeBase(topo);
} nodeInfo = hostsMap.get(node); if (nodeInfo == null) {
nodeInfo = new NodeInfo(node);
parentNode = node.getParent();
parentNodeInfo = racksMap.get(parentNode);
if (parentNodeInfo == null) {
parentNodeInfo = new NodeInfo(parentNode);
else {
nodeInfo = hostsMap.get(node);
parentNode = node.getParent();
parentNodeInfo = racksMap.get(parentNode);
} nodeInfo.addValue(index, bytesInThisBlock);
parentNodeInfo.addValue(index, bytesInThisBlock); } // for all topos } // for all indices return identifyHosts(allTopos.length, racksMap);
