Skip to content

Commit c788699

Browse files
authored
fix: BlockBufferService forceful BN Switch (#21803)
Signed-off-by: Derek Riley <[email protected]>
1 parent 6e228a8 commit c788699

File tree

7 files changed

+88
-151
lines changed

7 files changed

+88
-151
lines changed

hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ Used when retrying the same block node after transient issues:
162162
- **Reset**: Retry count resets if no retry occurs within `protocolExpBackoffTimeframeReset` duration
163163
- **Behavior**: Connection retries the same node without selecting a new one
164164

165+
#### Forced Connection Switch Retry Delay
166+
167+
When a different block node must be selected and forced to become active, the previously active connection
168+
is closed and scheduled for retry after a fixed delay of 180s (`blockNode.forcedSwitchRescheduleDelay`).
169+
This may happen, for example, when the block buffer's saturation action stage is triggered and the manager forcibly switches to a different node.
170+
165171
#### Retry State Management
166172

167173
- `RetryState` tracks retry attempts and last retry time per node configuration

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ private void performStreamReset() {
348348
if (getConnectionState() == ConnectionState.ACTIVE) {
349349
logger.info("{} Performing scheduled stream reset.", this);
350350
endTheStreamWith(RESET);
351-
blockNodeConnectionManager.connectionResetsTheStream(this);
351+
blockNodeConnectionManager.selectNewBlockNodeForStreaming(false);
352352
}
353353
}
354354

@@ -769,6 +769,7 @@ public void close(final boolean callOnComplete) {
769769
}
770770
blockStreamMetrics.recordConnectionClosed();
771771
blockStreamMetrics.recordActiveConnectionIp(-1L);
772+
blockNodeConnectionManager.notifyConnectionClosed(this);
772773
// regardless of outcome, mark the connection as closed
773774
updateConnectionState(ConnectionState.CLOSED);
774775
}
@@ -1029,7 +1030,6 @@ private void doWork() {
10291030
newRequestBytes,
10301031
MAX_BYTES_PER_REQUEST);
10311032
endTheStreamWith(EndStream.Code.ERROR);
1032-
blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this);
10331033
break;
10341034
}
10351035
} else {

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,7 @@ private void handleConnectionCleanupAndReschedule(
301301
@Nullable final Duration delay,
302302
@Nullable final Long blockNumber,
303303
final boolean selectNewBlockNode) {
304-
// Remove from connections map and clear active reference
305-
removeConnectionAndClearActive(connection);
306-
307-
final long delayMs;
304+
long delayMs;
308305
// Get or create the retry attempt for this node
309306
final RetryState retryState = retryStates.computeIfAbsent(connection.getNodeConfig(), k -> new RetryState());
310307
final int retryAttempt;
@@ -335,34 +332,6 @@ private void handleConnectionCleanupAndReschedule(
335332
}
336333
}
337334

338-
/**
339-
* Connection initiated a reset of the stream
340-
* @param connection the connection that initiated the reset of the stream
341-
*/
342-
public void connectionResetsTheStream(@NonNull final BlockNodeConnection connection) {
343-
if (!isStreamingEnabled()) {
344-
return;
345-
}
346-
requireNonNull(connection);
347-
348-
removeConnectionAndClearActive(connection);
349-
350-
// Immediately try to find and connect to the next available node
351-
selectNewBlockNodeForStreaming(false);
352-
}
353-
354-
/**
355-
* Removes a connection from the connections map and clears the active reference if this was the active connection.
356-
* This is a utility method to ensure consistent cleanup behavior.
357-
*
358-
* @param connection the connection to remove and clean up
359-
*/
360-
private void removeConnectionAndClearActive(@NonNull final BlockNodeConnection connection) {
361-
requireNonNull(connection);
362-
connections.remove(connection.getNodeConfig(), connection);
363-
activeConnectionRef.compareAndSet(connection, null);
364-
}
365-
366335
private void scheduleConnectionAttempt(
367336
@NonNull final BlockNodeConfig blockNodeConfig,
368337
@NonNull final Duration initialDelay,
@@ -388,7 +357,6 @@ private void scheduleConnectionAttempt(
388357
logger.debug("{} Successfully scheduled reconnection task.", newConnection);
389358
} catch (final Exception e) {
390359
logger.error("{} Failed to schedule connection task for block node.", newConnection, e);
391-
connections.remove(newConnection.getNodeConfig());
392360
newConnection.close(true);
393361
}
394362
}
@@ -799,6 +767,22 @@ public void run() {
799767
try {
800768
logger.debug("{} Closing current active connection {}.", connection, activeConnection);
801769
activeConnection.close(true);
770+
// For a forced switch, reschedule the previously active connection to try again later
771+
if (force) {
772+
try {
773+
final Duration delay = getForcedSwitchRescheduleDelay();
774+
scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null, false);
775+
logger.debug(
776+
"Scheduled previously active connection {} in {} ms due to forced switch.",
777+
activeConnection,
778+
delay.toMillis());
779+
} catch (final Exception e) {
780+
logger.error(
781+
"Failed to schedule reschedule for previous active connection after forced switch.",
782+
e);
783+
connections.remove(activeConnection.getNodeConfig());
784+
}
785+
}
802786
} catch (final RuntimeException e) {
803787
logger.info(
804788
"Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).",
@@ -810,6 +794,7 @@ public void run() {
810794
logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection);
811795
blockStreamMetrics.recordConnectionCreateFailure();
812796
reschedule();
797+
selectNewBlockNodeForStreaming(false);
813798
}
814799
}
815800

@@ -857,9 +842,6 @@ private void reschedule() {
857842
logger.info("{} Rescheduled connection attempt (delayMillis={}).", connection, jitteredDelayMs);
858843
} catch (final Exception e) {
859844
logger.error("{} Failed to reschedule connection attempt. Removing from retry map.", connection, e);
860-
// If rescheduling fails, close the connection and remove it from the connection map. A periodic task
861-
// will handle checking if there are no longer any connections
862-
connections.remove(connection.getNodeConfig());
863845
connection.close(true);
864846
}
865847
}
@@ -921,6 +903,13 @@ public Duration getEndOfStreamTimeframe() {
921903
.endOfStreamTimeFrame();
922904
}
923905

906+
private Duration getForcedSwitchRescheduleDelay() {
907+
return configProvider
908+
.getConfiguration()
909+
.getConfigData(BlockNodeConnectionConfig.class)
910+
.forcedSwitchRescheduleDelay();
911+
}
912+
924913
/**
925914
* Gets the maximum number of EndOfStream responses allowed before taking corrective action.
926915
*
@@ -1074,4 +1063,17 @@ public BlockNodeStats.HighLatencyResult recordBlockAckAndCheckLatency(
10741063

10751064
return result;
10761065
}
1066+
1067+
/**
1068+
* Notifies the connection manager that a connection has been closed.
1069+
* This allows the manager to update its internal state accordingly.
1070+
* @param connection the connection that has been closed
1071+
*/
1072+
public void notifyConnectionClosed(@NonNull final BlockNodeConnection connection) {
1073+
// Remove from active connection if it is the current active
1074+
activeConnectionRef.compareAndSet(connection, null);
1075+
1076+
// Remove from connections map
1077+
connections.remove(connection.getNodeConfig());
1078+
}
10771079
}

hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java

Lines changed: 1 addition & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() {
616616

617617
assertThat(activeConnectionRef).hasValue(newConnection);
618618

619-
verify(activeConnection).getNodeConfig();
619+
verify(activeConnection, times(2)).getNodeConfig();
620620
verify(activeConnection).close(true);
621621
verify(newConnection, times(2)).getNodeConfig();
622622
verify(newConnection).createRequestPipeline();
@@ -625,7 +625,6 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() {
625625

626626
verifyNoMoreInteractions(activeConnection);
627627
verifyNoMoreInteractions(newConnection);
628-
verifyNoInteractions(executorService);
629628
verifyNoMoreInteractions(bufferService);
630629
verifyNoMoreInteractions(metrics);
631630
}
@@ -832,8 +831,6 @@ void testConnectionTask_reschedule_failure() {
832831

833832
task.run();
834833

835-
assertThat(connections).isEmpty(); // connection should be removed
836-
837834
verify(connection).createRequestPipeline();
838835
verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS));
839836
verify(connection, atLeast(1)).getNodeConfig();
@@ -920,40 +917,6 @@ void testConstructor_configFileNotFound() {
920917
assertThat(availableNodes).isEmpty();
921918
}
922919

923-
@Test
924-
void testRestartConnection() {
925-
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
926-
final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1);
927-
doReturn(nodeConfig).when(connection).getNodeConfig();
928-
929-
// Add the connection to the connections map and set it as active
930-
final Map<BlockNodeConfig, BlockNodeConnection> connections = connections();
931-
final AtomicReference<BlockNodeConnection> activeConnectionRef = activeConnection();
932-
connections.put(nodeConfig, connection);
933-
activeConnectionRef.set(connection);
934-
935-
// Ensure the node config is available for selection
936-
final List<BlockNodeConfig> availableNodes = availableNodes();
937-
availableNodes.clear();
938-
availableNodes.add(nodeConfig);
939-
940-
connectionManager.connectionResetsTheStream(connection);
941-
942-
// Verify the active connection reference was cleared
943-
assertThat(activeConnectionRef).hasNullValue();
944-
// Verify a new connection was created and added to the connections map
945-
assertThat(connections).containsKey(nodeConfig);
946-
// Verify it's a different connection object (the old one was replaced)
947-
assertThat(connections.get(nodeConfig)).isNotSameAs(connection);
948-
949-
// Verify that scheduleConnectionAttempt was called with Duration.ZERO and the block number
950-
verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS));
951-
verifyNoMoreInteractions(connection);
952-
verifyNoInteractions(bufferService);
953-
verifyNoInteractions(metrics);
954-
verifyNoMoreInteractions(executorService);
955-
}
956-
957920
@Test
958921
void testRescheduleConnection_singleBlockNode() {
959922
// selectNewBlockNodeForStreaming should NOT be called
@@ -987,19 +950,6 @@ void testRescheduleConnection_singleBlockNode() {
987950
.schedule(any(BlockNodeConnectionTask.class), eq(5000L), eq(TimeUnit.MILLISECONDS));
988951
}
989952

990-
@Test
991-
void testConnectionResetsTheStream_streamingDisabled() {
992-
useStreamingDisabledManager();
993-
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
994-
995-
connectionManager.connectionResetsTheStream(connection);
996-
997-
verifyNoInteractions(connection);
998-
verifyNoInteractions(bufferService);
999-
verifyNoInteractions(executorService);
1000-
verifyNoInteractions(metrics);
1001-
}
1002-
1003953
@Test
1004954
void testStart_streamingDisabled() {
1005955
useStreamingDisabledManager();
@@ -1096,36 +1046,6 @@ void testRecordEndOfStreamAndCheckLimit_streamingDisabled() {
10961046
assertThat(limitExceeded).isFalse();
10971047
}
10981048

1099-
@Test
1100-
void testConnectionResetsTheStream() {
1101-
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
1102-
final BlockNodeConfig nodeConfig = newBlockNodeConfig(8080, 1);
1103-
doReturn(nodeConfig).when(connection).getNodeConfig();
1104-
availableNodes().add(nodeConfig);
1105-
1106-
// Add the connection to the connections map and set it as active
1107-
final Map<BlockNodeConfig, BlockNodeConnection> connections = connections();
1108-
final AtomicReference<BlockNodeConnection> activeConnectionRef = activeConnection();
1109-
connections.put(nodeConfig, connection);
1110-
activeConnectionRef.set(connection);
1111-
1112-
connectionManager.connectionResetsTheStream(connection);
1113-
1114-
// Verify the active connection reference was cleared
1115-
assertThat(activeConnectionRef).hasNullValue();
1116-
// Verify a new connection was created and added to the connections map
1117-
assertThat(connections).containsKey(nodeConfig);
1118-
// Verify it's a different connection object (the old one was replaced)
1119-
assertThat(connections.get(nodeConfig)).isNotSameAs(connection);
1120-
1121-
// Verify that selectNewBlockNodeForStreaming was called
1122-
verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS));
1123-
verifyNoMoreInteractions(connection);
1124-
verifyNoInteractions(bufferService);
1125-
verifyNoInteractions(metrics);
1126-
verifyNoMoreInteractions(executorService);
1127-
}
1128-
11291049
@Test
11301050
void testRecordEndOfStreamAndCheckLimit_withinLimit() {
11311051
final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1);

0 commit comments

Comments
 (0)