原因:
公司平台的数据量在30P左右,使用的是Hadoop 3.1.2版本,并且启用了纠删码(Erasure Coding)功能,读取文件时出现如下报错信息:
java.io.IOException: Unexpected EOS from the reader
at org.apache.hadoop.hdfs.StripeReader.readToBuffer(StripeReader.java:241)
at org.apache.hadoop.hdfs.StripeReader.lambda$readCells$0(StripeReader.java:281)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
经查询Apache官方JIRA,确认这是Hadoop原生代码中存在的bug
bug链接地址:https://issues.apache.org/jira/browse/HDFS-14373
下载针对3.1.x版本的修复patch包(HDFS-14373.branch-3.1.patch)
修复代码
打开patch包,查看一下包放置的路径和需要修复的代码
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 168b48c..e840da9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -247,6 +247,8 @@ private int readToBuffer(BlockReader blockReader,
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
+ //Clear buffer to make next decode success
+ strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
throw ce;
@@ -254,6 +256,8 @@ private int readToBuffer(BlockReader blockReader,
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e);
+ //Clear buffer to make next decode success
+ strategy.getReadBuffer().clear();
throw e;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 4c2ff92..2b09a7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -355,7 +355,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);
// Step 3: merge into stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);
// Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here.
@@ -416,7 +417,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);
// Step 3: merge into at most 5 stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);
// Step 4: calculate each chunk's position in destination buffer
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
@@ -512,7 +514,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
* {@link AlignedStripe} instances.
*/
private static AlignedStripe[] mergeRangesForInternalBlocks(
- ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
+ ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
+ LocatedStripedBlock blockGroup, int cellSize) {
int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits();
List<AlignedStripe> stripes = new ArrayList<>();
@@ -524,6 +527,17 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
}
}
+ // Add block group last cell offset in stripePoints if it is fall in to read
+ // offset range.
+ int lastCellIdxInBG = (int) (blockGroup.getBlockSize() / cellSize);
+ int idxInInternalBlk = lastCellIdxInBG / ecPolicy.getNumDataUnits();
+ long lastCellEndOffset = (idxInInternalBlk * (long)cellSize)
+ + (blockGroup.getBlockSize() % cellSize);
+ if (stripePoints.first() < lastCellEndOffset
+ && stripePoints.last() > lastCellEndOffset) {
+ stripePoints.add(lastCellEndOffset);
+ }
+
long prev = -1;
for (long point : stripePoints) {
if (prev >= 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 48ecf9a..d50d482 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -19,8 +19,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -561,6 +564,50 @@ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
}
}
+ @Test
+ public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
+ throws IOException {
+ DataNodeProperties stopDataNode = null;
+ try {
+ cluster.waitActive();
+ ErasureCodingPolicy policy = getEcPolicy();
+ DistributedFileSystem filesystem = cluster.getFileSystem();
+ filesystem.enableErasureCodingPolicy(policy.getName());
+ Path dir = new Path("/tmp");
+ filesystem.mkdirs(dir);
+ filesystem.getClient().setErasureCodingPolicy(dir.toString(),
+ policy.getName());
+ Path f = new Path(dir, "file");
+
+ //1. File with one stripe, last data cell should be half filed.
+ long fileLength = (policy.getCellSize() * policy.getNumDataUnits())
+ - (policy.getCellSize() / 2);
+ DFSTestUtil.createFile(filesystem, f, fileLength, (short) 1, 0);
+
+ //2. Stop first DN from stripe.
+ LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+ f.toString(), 0, fileLength);
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+ final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
+ cellSize, dataBlocks, parityBlocks);
+ cluster.stopDataNode(blocks[0].getLocations()[0].getName());
+
+ //3. Do pread for fist cell, reconstruction should happen
+ try (FSDataInputStream in = filesystem.open(f)) {
+ DFSStripedInputStream stripedIn = (DFSStripedInputStream) in
+ .getWrappedStream();
+ byte[] b = new byte[policy.getCellSize()];
+ stripedIn.read(0, b, 0, policy.getCellSize());
+ }
+ } catch (HadoopIllegalArgumentException e) {
+ fail(e.getMessage());
+ } finally {
+ if (stopDataNode != null) {
+ cluster.restartDataNode(stopDataNode, true);
+ }
+ }
+ }
+
/**
* Empties the pool for the specified buffer type, for the current ecPolicy.
* <p>
注意:这个patch修复的是 hadoop-hdfs-project/ 项目下的代码
红色部分有git的a和b路径,一定要先建立a和b两个目录,把这个项目分别复制到这两个文件夹下,否则修复的时候会提示找不到路径(注:下文使用的 patch -p1 会去掉路径中的第一级前缀 a/、b/,此时在源码根目录下执行即可)
代码修复
使用patch -p1 < HDFS-14373.branch-3.1.patch
注意:此patch包一定要放到源码根目录 hadoop-rel-release-3.1.2 下,并在该目录中执行上面的命令,否则也会提示找不到路径
出现以下输出,即证明代码修复成功
程序中修复的代码
StripedBlockUtil.java中修复的部分
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 4c2ff92..2b09a7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -355,7 +355,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);
// Step 3: merge into stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);
// Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here.
@@ -416,7 +417,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);
// Step 3: merge into at most 5 stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);
// Step 4: calculate each chunk's position in destination buffer
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
@@ -512,7 +514,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
* {@link AlignedStripe} instances.
*/
private static AlignedStripe[] mergeRangesForInternalBlocks(
- ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
+ ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
+ LocatedStripedBlock blockGroup, int cellSize) {
int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits();
List<AlignedStripe> stripes = new ArrayList<>();
@@ -524,6 +527,17 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
}
}
下面java代码中修复的部分(只摘取了一小段)
原来的代码
打包
在Maven工程中找到 Apache Hadoop HDFS Client 模块(hadoop-hdfs-client),对其进行打包
打包成功
打包成功后生成的jar
打包成功后,用新的jar包替换线上的jar包并上线