Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@

private volatile long heartbeatIntervalSeconds;
private volatile int heartbeatRecheckInterval;
/**
 * Recheck interval read by {@link HeartbeatManager} through
 * {@link #getHeartbeatRecheckIntervalForMonitor()}. It is recomputed by
 * {@link #refreshHeartbeatRecheckIntervalForMonitor()}, which stores either
 * {@code staleInterval} or {@code heartbeatRecheckInterval} here.
 */
private volatile long heartbeatRecheckIntervalForMonitor;
/**
* Stores the datanode -> block map.
* <p>
Expand Down Expand Up @@ -346,6 +348,7 @@
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
refreshHeartbeatRecheckIntervalForMonitor();
this.ratioUseStaleDataNodesForWrite = conf.getFloat(
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
Expand Down Expand Up @@ -2139,7 +2142,7 @@
}
}

public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {

Check failure on line 2145 in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java#L2145

javadoc: warning: no comment

Check failure on line 2145 in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java#L2145

javadoc: warning: no comment
this.shouldSendCachingCommands = shouldSendCachingCommands;
}

Expand Down Expand Up @@ -2210,6 +2213,19 @@
this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
* intervalSeconds;
this.blockInvalidateLimit = getBlockInvalidateLimit(blockInvalidateLimit);
refreshHeartbeatRecheckIntervalForMonitor();
}

/**
 * Recomputes {@code heartbeatRecheckIntervalForMonitor}.
 * <p>
 * When {@code avoidStaleDataNodesForWrite} is set and {@code staleInterval}
 * is smaller than {@code heartbeatRecheckInterval}, the stale interval is
 * stored and the choice is logged at INFO level; in every other case
 * {@code heartbeatRecheckInterval} is stored.
 */
@VisibleForTesting
public void refreshHeartbeatRecheckIntervalForMonitor() {
  final boolean staleIntervalIsShorter =
      avoidStaleDataNodesForWrite && staleInterval < heartbeatRecheckInterval;
  if (!staleIntervalIsShorter) {
    // Default case: the monitor follows the regular recheck interval.
    heartbeatRecheckIntervalForMonitor = heartbeatRecheckInterval;
    return;
  }
  // The stale interval is the tighter bound, so the monitor uses it instead.
  heartbeatRecheckIntervalForMonitor = staleInterval;
  LOG.info("Setting heartbeat recheck interval to " + staleInterval
      + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
      + " is less than " + heartbeatRecheckInterval);
}

private int getBlockInvalidateLimitFromHBInterval() {
Expand Down Expand Up @@ -2337,7 +2353,7 @@
return datanodeMap;
}

public void setMaxSlowPeersToReport(int maxSlowPeersToReport) {

Check failure on line 2356 in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java#L2356

javadoc: warning: no comment

Check failure on line 2356 in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java#L2356

javadoc: warning: no comment
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
slowPeerTracker.setMaxSlowPeersToReport(maxSlowPeersToReport);
}
Expand All @@ -2351,4 +2367,8 @@
public long getSlowPeerCollectionInterval() {
// Plain accessor: hands back the slowPeerCollectionInterval field unchanged.
return slowPeerCollectionInterval;
}

/**
 * Returns the recheck interval that {@link HeartbeatManager} reads for its
 * heartbeat checks, as last stored by
 * {@link #refreshHeartbeatRecheckIntervalForMonitor()}.
 *
 * @return the current value of {@code heartbeatRecheckIntervalForMonitor}
 */
public long getHeartbeatRecheckIntervalForMonitor() {
return heartbeatRecheckIntervalForMonitor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ class HeartbeatManager implements DatanodeStatistics {
/** Statistics, which are synchronized by the heartbeat manager lock. */
private final DatanodeStats stats = new DatanodeStats();

/** The time period to check for expired datanodes. */
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread. */
private final Daemon heartbeatThread = new Daemon(new Monitor());
private final StopWatch heartbeatStopWatch = new StopWatch();
Expand All @@ -86,31 +84,12 @@ class HeartbeatManager implements DatanodeStatistics {
final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
boolean avoidStaleDataNodesForWrite = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
long recheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
enableLogStaleNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT);
this.numOfDeadDatanodesRemove = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REMOVE_DEAD_DATANODE_BATCHNUM_KEY,
DFSConfigKeys.DFS_NAMENODE_REMOVE_BAD_BATCH_NUM_DEFAULT);

if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
this.heartbeatRecheckInterval = staleInterval;
LOG.info("Setting heartbeat recheck interval to " + staleInterval
+ " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
+ " is less than "
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
} else {
this.heartbeatRecheckInterval = recheckInterval;
}
}

void activate() {
Expand Down Expand Up @@ -355,7 +334,8 @@ void restartHeartbeatStopWatch() {
@VisibleForTesting
boolean shouldAbortHeartbeatCheck(long offset) {
  // Elapsed time reported by heartbeatStopWatch, in milliseconds.
  final long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS);
  // The interval is fetched from DatanodeManager on every call rather than
  // from a field cached in this class, so a value refreshed there is the
  // one compared against.
  final long recheckInterval = blockManager.getDatanodeManager()
      .getHeartbeatRecheckIntervalForMonitor();
  // offset shifts the elapsed time before the comparison; a negative offset
  // makes the check more lenient.
  return elapsed + offset > recheckInterval;
}

/**
Expand Down Expand Up @@ -545,7 +525,8 @@ public void run() {
restartHeartbeatStopWatch();
try {
final long now = Time.monotonicNow();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
if (lastHeartbeatCheck + blockManager.getDatanodeManager()
.getHeartbeatRecheckIntervalForMonitor() < now) {
heartbeatCheck();
lastHeartbeatCheck = now;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,24 +259,23 @@ public void testHeartbeatBlockRecovery() throws Exception {

@Test
public void testHeartbeatStopWatch() throws Exception {
  Namesystem ns = Mockito.mock(Namesystem.class);
  Configuration conf = new Configuration();
  long recheck = 2000;
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck);
  // A real BlockManager is required here: shouldAbortHeartbeatCheck() reads
  // the interval through blockManager.getDatanodeManager(), and a plain
  // Mockito mock would return null for that call by default.
  BlockManager bm = new BlockManager(ns, false, conf);
  HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf);
  monitor.restartHeartbeatStopWatch();
  assertFalse(monitor.shouldAbortHeartbeatCheck(0));
  // sleep shorter than recheck and verify shouldn't abort
  Thread.sleep(100);
  assertFalse(monitor.shouldAbortHeartbeatCheck(0));
  // sleep longer than recheck and verify should abort unless ignore delay
  Thread.sleep(recheck);
  assertTrue(monitor.shouldAbortHeartbeatCheck(0));
  assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck * 3));
  // ensure it resets properly
  monitor.restartHeartbeatStopWatch();
  assertFalse(monitor.shouldAbortHeartbeatCheck(0));
}
}