diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 01f1af9624d05..2347cb330df6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -90,6 +90,8 @@ public class DatanodeManager { private volatile long heartbeatIntervalSeconds; private volatile int heartbeatRecheckInterval; + /** Recheck interval in ms, read by {@link HeartbeatManager} for its heartbeat checks. */ + private volatile long heartbeatRecheckIntervalForMonitor; /** * Stores the datanode -> block map. *
@@ -346,6 +348,7 @@ public class DatanodeManager { DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval); + refreshHeartbeatRecheckIntervalForMonitor(); this.ratioUseStaleDataNodesForWrite = conf.getFloat( DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY, DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT); @@ -2210,6 +2213,19 @@ private void setHeartbeatInterval(long intervalSeconds, this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000 * intervalSeconds; this.blockInvalidateLimit = getBlockInvalidateLimit(blockInvalidateLimit); + refreshHeartbeatRecheckIntervalForMonitor(); + } + + @VisibleForTesting + public void refreshHeartbeatRecheckIntervalForMonitor() { + if (avoidStaleDataNodesForWrite && staleInterval < heartbeatRecheckInterval) { + heartbeatRecheckIntervalForMonitor = staleInterval; + LOG.info("Setting heartbeat recheck interval to " + staleInterval + + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY + + " is less than " + heartbeatRecheckInterval); + } else { + heartbeatRecheckIntervalForMonitor = heartbeatRecheckInterval; + } } private int getBlockInvalidateLimitFromHBInterval() { @@ -2351,4 +2367,8 @@ public boolean isSlowPeerCollectorInitialized() { public long getSlowPeerCollectionInterval() { return slowPeerCollectionInterval; } + + public long getHeartbeatRecheckIntervalForMonitor() { + return heartbeatRecheckIntervalForMonitor; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index 43ded22dbceaf..579e7b98e08b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java 
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -67,8 +67,6 @@ class HeartbeatManager implements DatanodeStatistics { /** Statistics, which are synchronized by the heartbeat manager lock. */ private final DatanodeStats stats = new DatanodeStats(); - /** The time period to check for expired datanodes. */ - private final long heartbeatRecheckInterval; /** Heartbeat monitor thread. */ private final Daemon heartbeatThread = new Daemon(new Monitor()); private final StopWatch heartbeatStopWatch = new StopWatch(); @@ -86,31 +84,12 @@ class HeartbeatManager implements DatanodeStatistics { final BlockManager blockManager, final Configuration conf) { this.namesystem = namesystem; this.blockManager = blockManager; - boolean avoidStaleDataNodesForWrite = conf.getBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT); - long recheckInterval = conf.getInt( - DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min - long staleInterval = conf.getLong( - DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, - DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s enableLogStaleNodes = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY, DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT); this.numOfDeadDatanodesRemove = conf.getInt( DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY, DFSConfigKeys.DFS_NAMENODE_REMOVE_BAD_BATCH_NUM_DEFAULT); - - if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) { - this.heartbeatRecheckInterval = staleInterval; - LOG.info("Setting heartbeat recheck interval to " + staleInterval - + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY - + " is less than " - + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY); - } else 
{ - this.heartbeatRecheckInterval = recheckInterval; - } } void activate() { @@ -355,7 +334,8 @@ void restartHeartbeatStopWatch() { @VisibleForTesting boolean shouldAbortHeartbeatCheck(long offset) { long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS); - return elapsed + offset > heartbeatRecheckInterval; + return elapsed + offset > blockManager.getDatanodeManager() + .getHeartbeatRecheckIntervalForMonitor(); } /** @@ -545,7 +525,8 @@ public void run() { restartHeartbeatStopWatch(); try { final long now = Time.monotonicNow(); - if (lastHeartbeatCheck + heartbeatRecheckInterval < now) { + if (lastHeartbeatCheck + blockManager.getDatanodeManager() + .getHeartbeatRecheckIntervalForMonitor() < now) { heartbeatCheck(); lastHeartbeatCheck = now; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index bd35eb4b39f57..5966670b270ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -259,24 +259,23 @@ public void testHeartbeatBlockRecovery() throws Exception { @Test public void testHeartbeatStopWatch() throws Exception { - Namesystem ns = Mockito.mock(Namesystem.class); - BlockManager bm = Mockito.mock(BlockManager.class); - Configuration conf = new Configuration(); - long recheck = 2000; - conf.setLong( - DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck); - HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf); - monitor.restartHeartbeatStopWatch(); - assertFalse(monitor.shouldAbortHeartbeatCheck(0)); - // sleep shorter than recheck and verify shouldn't abort - Thread.sleep(100); - assertFalse(monitor.shouldAbortHeartbeatCheck(0)); - 
// sleep longer than recheck and verify should abort unless ignore delay - Thread.sleep(recheck); - assertTrue(monitor.shouldAbortHeartbeatCheck(0)); - assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck*3)); - // ensure it resets properly - monitor.restartHeartbeatStopWatch(); - assertFalse(monitor.shouldAbortHeartbeatCheck(0)); + Namesystem ns = Mockito.mock(Namesystem.class); + Configuration conf = new Configuration(); + long recheck = 2000; + conf.setLong(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck); + BlockManager bm = new BlockManager(ns, false, conf); + HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf); + monitor.restartHeartbeatStopWatch(); + assertFalse(monitor.shouldAbortHeartbeatCheck(0)); + // sleep for less than recheck and verify it shouldn't abort + Thread.sleep(100); + assertFalse(monitor.shouldAbortHeartbeatCheck(0)); + // sleep longer than recheck and verify it should abort, unless a negative offset cancels the delay + Thread.sleep(recheck); + assertTrue(monitor.shouldAbortHeartbeatCheck(0)); + assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck * 3)); + // ensure restarting the stop watch resets the elapsed time + monitor.restartHeartbeatStopWatch(); + assertFalse(monitor.shouldAbortHeartbeatCheck(0)); } }