diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md b/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md index 227628da0118..9b01bb656ef2 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnectionManager.md @@ -162,6 +162,12 @@ Used when retrying the same block node after transient issues: - **Reset**: Retry count resets if no retry occurs within `protocolExpBackoffTimeframeReset` duration - **Behavior**: Connection retries the same node without selecting a new one +#### Forced Connection Switch Retry Delay + +When another block node should be selected and forced to become active, the previous active connection +is closed and scheduled for retry after a fixed delay of 180s (`blockNode.forcedSwitchRescheduleDelay`). +This may happen when the block buffer saturation action stage is triggered and the manager force switches to a different node. + #### Retry State Management - `RetryState` tracks retry attempts and last retry time per node configuration diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 9f94a70b6498..027b07a6707c 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -346,7 +346,7 @@ private void performStreamReset() { if (getConnectionState() == ConnectionState.ACTIVE) { logger.info("{} Performing scheduled stream reset.", this); endTheStreamWith(RESET); - blockNodeConnectionManager.connectionResetsTheStream(this); + blockNodeConnectionManager.selectNewBlockNodeForStreaming(false); } } @@ -753,6 +753,7 @@ public void close(final boolean callOnComplete) { } blockStreamMetrics.recordConnectionClosed(); blockStreamMetrics.recordActiveConnectionIp(-1L); + blockNodeConnectionManager.notifyConnectionClosed(this); // regardless of outcome, mark the connection as closed updateConnectionState(ConnectionState.CLOSED); } @@ -1012,7 +1013,6 @@ private void doWork() { newRequestBytes, MAX_BYTES_PER_REQUEST); endTheStreamWith(EndStream.Code.ERROR); - blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this); break; } } else { diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index e3cf65b6f5cc..3b6415b6d302 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -301,10 +301,7 @@ private void handleConnectionCleanupAndReschedule( @Nullable final Duration delay, @Nullable final Long blockNumber, final boolean selectNewBlockNode) { - // Remove from connections map and clear active reference - removeConnectionAndClearActive(connection); - - final long delayMs; + long delayMs; // Get or create the retry attempt for this node final RetryState retryState = retryStates.computeIfAbsent(connection.getNodeConfig(), k -> new RetryState()); final int retryAttempt; @@ -335,34 +332,6 @@ private void handleConnectionCleanupAndReschedule( } } - /** - * Connection initiated a reset of the stream - * @param connection the connection that initiated the reset of the stream - */ - public void connectionResetsTheStream(@NonNull final BlockNodeConnection connection) { - if (!isStreamingEnabled()) { - return; - } - requireNonNull(connection); - - removeConnectionAndClearActive(connection); - - // Immediately try to find and connect to the next available node - selectNewBlockNodeForStreaming(false); - } - - /** - * Removes a connection from the connections map and clears the active reference if this was the active connection. - * This is a utility method to ensure consistent cleanup behavior. - * - * @param connection the connection to remove and clean up - */ - private void removeConnectionAndClearActive(@NonNull final BlockNodeConnection connection) { - requireNonNull(connection); - connections.remove(connection.getNodeConfig(), connection); - activeConnectionRef.compareAndSet(connection, null); - } - private void scheduleConnectionAttempt( @NonNull final BlockNodeConfig blockNodeConfig, @NonNull final Duration initialDelay, @@ -388,7 +357,6 @@ private void scheduleConnectionAttempt( logger.debug("{} Successfully scheduled reconnection task.", newConnection); } catch (final Exception e) { logger.error("{} Failed to schedule connection task for block node.", newConnection, e); - connections.remove(newConnection.getNodeConfig()); newConnection.close(true); } } @@ -799,6 +767,22 @@ public void run() { try { logger.debug("{} Closing current active connection {}.", connection, activeConnection); activeConnection.close(true); + // For a forced switch, reschedule the previously active connection to try again later + if (force) { + try { + final Duration delay = getForcedSwitchRescheduleDelay(); + scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null, false); + logger.debug( + "Scheduled previously active connection {} in {} ms due to forced switch.", + activeConnection, + delay.toMillis()); + } catch (final Exception e) { + logger.error( + "Failed to schedule reschedule for previous active connection after forced switch.", + e); + connections.remove(activeConnection.getNodeConfig()); + } + } } catch (final RuntimeException e) { logger.info( "Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).", @@ -810,6 +794,7 @@ public void run() { logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection); blockStreamMetrics.recordConnectionCreateFailure(); reschedule(); + selectNewBlockNodeForStreaming(false); } } @@ -857,9 +842,6 @@ private void reschedule() { logger.info("{} Rescheduled connection attempt (delayMillis={}).", connection, jitteredDelayMs); } catch (final Exception e) { logger.error("{} Failed to reschedule connection attempt. Removing from retry map.", connection, e); - // If rescheduling fails, close the connection and remove it from the connection map. A periodic task - // will handle checking if there are no longer any connections - connections.remove(connection.getNodeConfig()); connection.close(true); } } @@ -921,6 +903,13 @@ public Duration getEndOfStreamTimeframe() { .endOfStreamTimeFrame(); } + private Duration getForcedSwitchRescheduleDelay() { + return configProvider + .getConfiguration() + .getConfigData(BlockNodeConnectionConfig.class) + .forcedSwitchRescheduleDelay(); + } + /** * Gets the maximum number of EndOfStream responses allowed before taking corrective action. * @@ -1074,4 +1063,17 @@ public BlockNodeStats.HighLatencyResult recordBlockAckAndCheckLatency( return result; } + + /** + * Notifies the connection manager that a connection has been closed. + * This allows the manager to update its internal state accordingly. + * @param connection the connection that has been closed + */ + public void notifyConnectionClosed(@NonNull final BlockNodeConnection connection) { + // Remove from active connection if it is the current active + activeConnectionRef.compareAndSet(connection, null); + + // Remove from connections map + connections.remove(connection.getNodeConfig()); + } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 19857282471c..33eed6484def 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -616,7 +616,7 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() { assertThat(activeConnectionRef).hasValue(newConnection); - verify(activeConnection).getNodeConfig(); + verify(activeConnection, times(2)).getNodeConfig(); verify(activeConnection).close(true); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); @@ -625,7 +625,6 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() { verifyNoMoreInteractions(activeConnection); verifyNoMoreInteractions(newConnection); - verifyNoInteractions(executorService); verifyNoMoreInteractions(bufferService); verifyNoMoreInteractions(metrics); } @@ -832,8 +831,6 @@ void testConnectionTask_reschedule_failure() { task.run(); - assertThat(connections).isEmpty(); // connection should be removed - verify(connection).createRequestPipeline(); verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS)); verify(connection, atLeast(1)).getNodeConfig(); @@ -920,40 +917,6 @@ void testConstructor_configFileNotFound() { assertThat(availableNodes).isEmpty(); } - @Test - void testRestartConnection() { - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1); - doReturn(nodeConfig).when(connection).getNodeConfig(); - - // Add the connection to the connections map and set it as active - final Map connections = connections(); - final AtomicReference activeConnectionRef = activeConnection(); - connections.put(nodeConfig, connection); - activeConnectionRef.set(connection); - - // Ensure the node config is available for selection - final List availableNodes = availableNodes(); - availableNodes.clear(); - availableNodes.add(nodeConfig); - - connectionManager.connectionResetsTheStream(connection); - - // Verify the active connection reference was cleared - assertThat(activeConnectionRef).hasNullValue(); - // Verify a new connection was created and added to the connections map - assertThat(connections).containsKey(nodeConfig); - // Verify it's a different connection object (the old one was replaced) - assertThat(connections.get(nodeConfig)).isNotSameAs(connection); - - // Verify that scheduleConnectionAttempt was called with Duration.ZERO and the block number - verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS)); - verifyNoMoreInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(metrics); - verifyNoMoreInteractions(executorService); - } - @Test void testRescheduleConnection_singleBlockNode() { // selectNewBlockNodeForStreaming should NOT be called @@ -987,19 +950,6 @@ void testRescheduleConnection_singleBlockNode() { .schedule(any(BlockNodeConnectionTask.class), eq(5000L), eq(TimeUnit.MILLISECONDS)); } - @Test - void testConnectionResetsTheStream_streamingDisabled() { - useStreamingDisabledManager(); - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - - connectionManager.connectionResetsTheStream(connection); - - verifyNoInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(executorService); - verifyNoInteractions(metrics); - } - @Test void testStart_streamingDisabled() { useStreamingDisabledManager(); @@ -1096,36 +1046,6 @@ void testRecordEndOfStreamAndCheckLimit_streamingDisabled() { assertThat(limitExceeded).isFalse(); } - @Test - void testConnectionResetsTheStream() { - final BlockNodeConnection connection = mock(BlockNodeConnection.class); - final BlockNodeConfig nodeConfig = newBlockNodeConfig(8080, 1); - doReturn(nodeConfig).when(connection).getNodeConfig(); - availableNodes().add(nodeConfig); - - // Add the connection to the connections map and set it as active - final Map connections = connections(); - final AtomicReference activeConnectionRef = activeConnection(); - connections.put(nodeConfig, connection); - activeConnectionRef.set(connection); - - connectionManager.connectionResetsTheStream(connection); - - // Verify the active connection reference was cleared - assertThat(activeConnectionRef).hasNullValue(); - // Verify a new connection was created and added to the connections map - assertThat(connections).containsKey(nodeConfig); - // Verify it's a different connection object (the old one was replaced) - assertThat(connections.get(nodeConfig)).isNotSameAs(connection); - - // Verify that selectNewBlockNodeForStreaming was called - verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS)); - verifyNoMoreInteractions(connection); - verifyNoInteractions(bufferService); - verifyNoInteractions(metrics); - verifyNoMoreInteractions(executorService); - } - @Test void testRecordEndOfStreamAndCheckLimit_withinLimit() { final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 28300d7fdb22..cbabd0fbfff3 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -135,8 +135,6 @@ void beforeEach() { final AtomicReference workerThreadRef = workerThreadRef(); workerThreadRef.set(FAKE_WORKER_THREAD); - // resetMocks(); - lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); } @@ -246,7 +244,6 @@ void testHandleStreamError() { verify(requestPipeline).onComplete(); verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); } @Test @@ -615,7 +612,6 @@ void testOnNext_resendBlock_blockDoesNotExist() { verify(bufferService).getBlockState(10L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoMoreInteractions(bufferService); } @@ -753,7 +749,6 @@ void testClose() { verify(requestPipeline).onComplete(); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -779,7 +774,6 @@ void testClose_failure() { verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -797,7 +791,6 @@ void testClose_withoutOnComplete() { verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); verifyNoInteractions(requestPipeline); } @@ -817,7 +810,6 @@ void testClose_notActiveState() { verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoMoreInteractions(requestPipeline); verifyNoInteractions(bufferService); } @@ -881,7 +873,6 @@ void testClose_pipelineNull() { verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -962,7 +953,6 @@ void testOnError_activeConnection() { verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -1057,7 +1047,6 @@ void testOnCompleted_streamClosingNotInProgress() { verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -1382,11 +1371,9 @@ void testConnectionWorker_hugeItem() throws Exception { verify(requestPipeline).onComplete(); verify(bufferService).getEarliestAvailableBlockNumber(); verify(bufferService).getHighestAckedBlockNumber(); - verify(connectionManager).connectionResetsTheStream(connection); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); verifyNoMoreInteractions(bufferService); - verifyNoMoreInteractions(connectionManager); } // Tests that no response processing occurs when connection is already closed @@ -1491,7 +1478,6 @@ void testHandleStreamFailureWithoutOnComplete() { // Should not call onComplete on the pipeline verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); } // Tests that error handling is skipped when connection is already closed @@ -1525,7 +1511,6 @@ void testOnError_connectionPending() { // Should call onComplete when callOnComplete=true (from handleStreamFailure) verify(requestPipeline).onComplete(); verifyNoMoreInteractions(metrics); - verifyNoMoreInteractions(connectionManager); verifyNoMoreInteractions(requestPipeline); } @@ -1546,7 +1531,6 @@ void testOnError_connectionUninitialized() { verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoInteractions(requestPipeline); - verifyNoMoreInteractions(connectionManager); verifyNoInteractions(bufferService); } @@ -1775,7 +1759,7 @@ void testPeriodicStreamReset() { // Verify reset behavior verify(bufferService).getEarliestAvailableBlockNumber(); verify(bufferService).getHighestAckedBlockNumber(); - verify(connectionManager).connectionResetsTheStream(connection); + verify(connectionManager).selectNewBlockNodeForStreaming(false); verify(requestPipeline).onNext(any(PublishStreamRequest.class)); verify(requestPipeline).onComplete(); diff --git a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java index 5d4694d84ff5..f694796abdc6 100644 --- a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java +++ b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockNodeConnectionConfig.java @@ -23,6 +23,7 @@ * @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime * @param connectionWorkerSleepDuration the amount of time a connection worker will sleep between handling block items (should be less than {@link #maxRequestDelay}) * @param maxRequestDelay the maximum amount of time between sending a request to a block node + * @param forcedSwitchRescheduleDelay the delay to reschedule a closed active connection after a forced switch */ @ConfigData("blockNode") public record BlockNodeConnectionConfig( @@ -39,4 +40,5 @@ public record BlockNodeConnectionConfig( @ConfigProperty(defaultValue = "10s") @NodeProperty Duration maxBackoffDelay, @ConfigProperty(defaultValue = "30s") @NodeProperty Duration grpcOverallTimeout, @ConfigProperty(defaultValue = "25ms") @NetworkProperty Duration connectionWorkerSleepDuration, - @ConfigProperty(defaultValue = "200ms") @NetworkProperty Duration maxRequestDelay) {} + @ConfigProperty(defaultValue = "200ms") @NetworkProperty Duration maxRequestDelay, + @ConfigProperty(defaultValue = "180s") @NodeProperty Duration forcedSwitchRescheduleDelay) {} diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index 9f622cb89aed..e77c60607edb 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -439,8 +439,20 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { String.format( "/localhost:%s/CLOSED] Connection state transitioned from CLOSING to CLOSED.", portNumbers.get(3)))), - doingContextual( - spec -> LockSupport.parkNanos(Duration.ofSeconds(20).toNanos()))); + doingContextual(spec -> connectionDropTime.set(Instant.now())), + waitUntilNextBlocks(5), + blockNode(1).shutDownImmediately(), // Pri 1 + sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( + byNodeId(0), + connectionDropTime::get, + Duration.ofMinutes(1), + Duration.ofSeconds(45), + String.format( + "/localhost:%s/PENDING] Connection state transitioned from UNINITIALIZED to PENDING.", + portNumbers.get(3)), + String.format( + "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE.", + portNumbers.get(3))))); } @HapiTest @@ -484,6 +496,7 @@ final Stream twoNodesStreamingOneBlockNodeHappyPath() { blockNodePriorities = {0, 1}, applicationPropertiesOverrides = { "blockStream.buffer.blockTtl", "1m", + "blockNode.forcedSwitchRescheduleDelay", "30s", "blockStream.streamMode", "BLOCKS", "blockStream.writerMode", "FILE_AND_GRPC" }) @@ -491,31 +504,41 @@ final Stream twoNodesStreamingOneBlockNodeHappyPath() { @Order(6) final Stream testProactiveBlockBufferAction() { final AtomicReference timeRef = new AtomicReference<>(); + final List portNumbers = new ArrayList<>(); return hapiTest( + doingContextual(spec -> { + portNumbers.add(spec.getBlockNodePortById(0)); + portNumbers.add(spec.getBlockNodePortById(1)); + }), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(5).toNanos())), doingContextual(spec -> timeRef.set(Instant.now())), blockNode(0).updateSendingBlockAcknowledgements(false), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(5).toNanos())), - sourcingContextual( - spec -> assertBlockNodeCommsLogContainsTimeframe( - byNodeId(0), - timeRef::get, - Duration.ofMinutes(1), - Duration.ofMinutes(1), - // look for the saturation reaching the action stage (50%) - "saturation=50.0%", - // look for the log that shows we are forcing a reconnect to a different block node - "Attempting to forcefully switch block node connections due to increasing block buffer saturation")), - doingContextual(spec -> timeRef.set(Instant.now())), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), timeRef::get, Duration.ofMinutes(1), Duration.ofMinutes(1), + // look for the saturation reaching the action stage (50%) + "saturation=50.0%", + // look for the log that shows we are forcing a reconnect to a different block node + "Attempting to forcefully switch block node connections due to increasing block buffer saturation", + "/localhost:" + portNumbers.get(1) + + "/ACTIVE] Connection state transitioned from PENDING to ACTIVE.")), + blockNode(0).updateSendingBlockAcknowledgements(true), + doingContextual(spec -> timeRef.set(Instant.now())), + sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( + byNodeId(0), + timeRef::get, + Duration.ofMinutes(2), + Duration.ofMinutes(2), // saturation should fall back to low levels after the reconnect to the different node - "saturation=0.0%"))); + // then we should see a switch back to higher priority node + "saturation=0.0%", + "/localhost:" + portNumbers.get(0) + + "/ACTIVE] Connection state transitioned from PENDING to ACTIVE."))); } @Disabled