Skip to content

Commit

Permalink
[HOPS-1315] fix NullPointerException in PROCESS_TIMEDOUT_PENDING_BLOCK
Browse files Browse the repository at this point in the history
  • Loading branch information
berthoug committed May 17, 2019
1 parent e041eb5 commit 24ad69c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5250,7 +5250,8 @@ public BlockInfo tryToCompleteBlock(final BlockCollection bc,
return completeBlock;
}

private void processTimedOutPendingBlock(final long timedOutItemId)
@VisibleForTesting
public void processTimedOutPendingBlock(final long timedOutItemId)
throws IOException {
new HopsTransactionalRequestHandler(
HDFSOperationType.PROCESS_TIMEDOUT_PENDING_BLOCK) {
Expand Down Expand Up @@ -5279,6 +5280,9 @@ public void acquireLock(TransactionLocks locks) throws IOException {
public Object performTask() throws IOException {
BlockInfo timedOutItem = EntityManager
.find(BlockInfo.Finder.ByBlockIdAndINodeId, timedOutItemId);
if(timedOutItem==null){
return null;
}
NumberReplicas num = countNodes(timedOutItem);
if (isNeededReplication(timedOutItem, getReplication(timedOutItem),
num.liveReplicas())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,109 @@ public void testProcessPendingReplications() throws Exception {
}
}

/**
 * Regression test for the race between processing timed-out pending
 * replications and deleting the files that own those blocks in parallel.
 *
 * <p>The test registers pending replications for a set of one-block files,
 * waits for them to time out, deletes the files, and then feeds the timed-out
 * block ids to {@code BlockManager#processTimedOutPendingBlock}. The call must
 * complete without throwing even though the blocks no longer exist.
 *
 * @throws Exception if the mini cluster cannot be started or any file system
 *     or block manager operation fails
 */
@Test
public void testProcessNullPendingReplications() throws Exception {
  final short REPLICATION_FACTOR = (short) 1;
  final int NUM_FILES = 10;
  // Upper bound on the wait for the pending entries to expire. The pending
  // timeout is configured to 30 seconds below, so this is deliberately
  // generous; it only exists to keep a broken build from hanging forever.
  final int MAX_WAIT_SECONDS = 300;

  // Start a mini dfs cluster with REPLICATION_FACTOR datanode(s).
  final Configuration conf = new HdfsConfiguration();
  // We do not want the NameNode to run processPendingReplications before
  // we do it ourselves, hence the long replication interval.
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 100);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
      30);
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR)
          .build();
  try {
    final FSNamesystem namesystem = cluster.getNamesystem();
    final BlockManager bm = namesystem.getBlockManager();
    final FileSystem fs = cluster.getFileSystem();

    final PendingReplicationBlocks pendingReplications =
        bm.pendingReplications;

    //
    // Populate the cluster with NUM_FILES one-block files and register a
    // pending replication for each of their blocks.
    //
    final DatanodeStorageInfo[] storages =
        DFSTestUtil.createDatanodeStorageInfos(NUM_FILES);
    for (int i = 0; i < NUM_FILES; i++) {
      final Path filePath = new Path("/testfile_" + i);
      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
      DFSTestUtil.waitReplication(fs, filePath, REPLICATION_FACTOR);
      // Increase the block replication so that the block is under replicated.
      fs.setReplication(filePath, (short) 2);
      final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
      final DatanodeStorageInfo[] targets =
          new DatanodeStorageInfo[] {storages[i]};
      increment(pendingReplications, block.getLocalBlock(),
          DatanodeStorageInfo.toDatanodeDescriptors(targets));
    }

    final int pendingCount = pendingReplications.size();
    assertEquals("Size of pendingReplications " + pendingCount, NUM_FILES,
        pendingCount);

    //
    // Wait for everything to time out. An interrupt is allowed to propagate
    // and fail the test instead of being swallowed, and the wait is bounded
    // so that a regression cannot hang the test run.
    //
    int loop = 0;
    while (pendingReplications.size() > 0) {
      assertTrue("Pending replications did not time out within "
          + MAX_WAIT_SECONDS + " seconds", loop < MAX_WAIT_SECONDS);
      Thread.sleep(1000);
      loop++;
    }
    System.out.println("Had to wait for " + loop +
        " seconds for the lot to timeout");

    //
    // Verify that everything has timed out.
    //
    assertEquals("Size of pendingReplications ", 0,
        pendingReplications.size());
    // Take a single snapshot of the timed-out block ids and reuse it below,
    // so the test does not depend on getTimedOutBlocks() returning the same
    // entries when it is called a second time.
    long[] timedOut = pendingReplications.getTimedOutBlocks();
    assertTrue("Expected " + NUM_FILES + " timed out blocks",
        timedOut != null && timedOut.length == NUM_FILES);

    // Remove the files containing the blocks.
    for (int i = 0; i < NUM_FILES; i++) {
      fs.delete(new Path("/testfile_" + i), true);
    }

    // Process the timed-out blocks now that their files are gone; this is
    // the call under test and it must not throw.
    for (long timedOutItem : timedOut) {
      bm.processTimedOutPendingBlock(timedOutItem);
    }

    //
    // Verify that the blocks have been removed from the pendingreplication
    // database.
    //
    timedOut = pendingReplications.getTimedOutBlocks();
    assertTrue("blocks removed from pending", timedOut == null);
  } finally {
    cluster.shutdown();
  }
}

private BlockInfo newBlockInfo(final Block block, final int inodeId)
throws IOException {
final BlockInfo blockInfo = new BlockInfo(block, inodeId);
Expand Down

0 comments on commit 24ad69c

Please sign in to comment.