Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ private void performStreamReset() {
if (getConnectionState() == ConnectionState.ACTIVE) {
logWithContext(logger, INFO, this, "Performing scheduled stream reset.");
endTheStreamWith(RESET);
blockNodeConnectionManager.connectionResetsTheStream(this);
blockNodeConnectionManager.selectNewBlockNodeForStreaming(false);
}
}

Expand Down Expand Up @@ -793,6 +793,7 @@ public void close(final boolean callOnComplete) {
}
blockStreamMetrics.recordConnectionClosed();
blockStreamMetrics.recordActiveConnectionIp(-1L);
blockNodeConnectionManager.notifyConnectionClosed(this);
// regardless of outcome, mark the connection as closed
updateConnectionState(ConnectionState.CLOSED);
}
Expand Down Expand Up @@ -1054,7 +1055,6 @@ private void doWork() {
newRequestBytes,
MAX_BYTES_PER_REQUEST);
endTheStreamWith(EndStream.Code.ERROR);
blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this);
break;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,7 @@
@Nullable final Duration delay,
@Nullable final Long blockNumber,
final boolean selectNewBlockNode) {
// Remove from connections map and clear active reference
removeConnectionAndClearActive(connection);

final long delayMs;
long delayMs;
// Get or create the retry attempt for this node
final RetryState retryState = retryStates.computeIfAbsent(connection.getNodeConfig(), k -> new RetryState());
final int retryAttempt;
Expand Down Expand Up @@ -345,34 +342,6 @@
}
}

/**
* Connection initiated a reset of the stream
* @param connection the connection that initiated the reset of the stream
*/
public void connectionResetsTheStream(@NonNull final BlockNodeConnection connection) {
if (!isStreamingEnabled()) {
return;
}
requireNonNull(connection);

removeConnectionAndClearActive(connection);

// Immediately try to find and connect to the next available node
selectNewBlockNodeForStreaming(false);
}

/**
* Removes a connection from the connections map and clears the active reference if this was the active connection.
* This is a utility method to ensure consistent cleanup behavior.
*
* @param connection the connection to remove and clean up
*/
private void removeConnectionAndClearActive(@NonNull final BlockNodeConnection connection) {
requireNonNull(connection);
connections.remove(connection.getNodeConfig(), connection);
activeConnectionRef.compareAndSet(connection, null);
}

private void scheduleConnectionAttempt(
@NonNull final BlockNodeConfig blockNodeConfig,
@NonNull final Duration initialDelay,
Expand Down Expand Up @@ -404,7 +373,6 @@
logWithContext(logger, DEBUG, "Successfully scheduled reconnection task.", newConnection);
} catch (final Exception e) {
logger.error(formatLogMessage("Failed to schedule connection task for block node.", newConnection), e);
connections.remove(newConnection.getNodeConfig());
newConnection.close(true);
}
}
Expand Down Expand Up @@ -832,6 +800,22 @@
try {
logWithContext(DEBUG, "Closing current active connection {}.", activeConnection);
activeConnection.close(true);
// For a forced switch, reschedule the previously active connection to try again later
if (force) {
try {
final Duration delay = getForcedSwitchRescheduleDelay();
scheduleConnectionAttempt(activeConnection.getNodeConfig(), delay, null, false);
logWithContext(
DEBUG,
"Scheduled previously active connection in {} ms due to forced switch.",
delay.toMillis());
} catch (final Exception e) {
logger.error(

Check warning on line 813 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java#L812-L813

Added lines #L812 - L813 were not covered by tests
"Failed to schedule reschedule for previous active connection after forced switch.",
e);
connections.remove(connection.getNodeConfig());

Check warning on line 816 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java#L816

Added line #L816 was not covered by tests
}
}
} catch (final RuntimeException e) {
logger.info(
"Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).",
Expand Down Expand Up @@ -890,9 +874,6 @@
logWithContext(INFO, "Rescheduled connection attempt (delayMillis={}).", jitteredDelayMs);
} catch (final Exception e) {
logger.error("Failed to reschedule connection attempt. Removing from retry map.", e);
// If rescheduling fails, close the connection and remove it from the connection map. A periodic task
// will handle checking if there are no longer any connections
connections.remove(connection.getNodeConfig());
connection.close(true);
}
}
Expand Down Expand Up @@ -954,6 +935,13 @@
.endOfStreamTimeFrame();
}

private Duration getForcedSwitchRescheduleDelay() {
return configProvider
.getConfiguration()
.getConfigData(BlockNodeConnectionConfig.class)
.forcedSwitchRescheduleDelay();
}

/**
* Gets the maximum number of EndOfStream responses allowed before taking corrective action.
*
Expand Down Expand Up @@ -1107,4 +1095,17 @@

return result;
}

/**
* Notifies the connection manager that a connection has been closed.
* This allows the manager to update its internal state accordingly.
* @param connection the connection that has been closed
*/
public void notifyConnectionClosed(final BlockNodeConnection connection) {
// Remove from active connection if it is the current active
activeConnectionRef.compareAndSet(connection, null);

// Remove from connections map
connections.remove(connection.getNodeConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() {

assertThat(activeConnectionRef).hasValue(newConnection);

verify(activeConnection).getNodeConfig();
verify(activeConnection, times(2)).getNodeConfig();
verify(activeConnection).close(true);
verify(newConnection, times(2)).getNodeConfig();
verify(newConnection).createRequestPipeline();
Expand All @@ -592,7 +592,6 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() {

verifyNoMoreInteractions(activeConnection);
verifyNoMoreInteractions(newConnection);
verifyNoInteractions(executorService);
verifyNoMoreInteractions(bufferService);
verifyNoMoreInteractions(metrics);
}
Expand Down Expand Up @@ -799,8 +798,6 @@ void testConnectionTask_reschedule_failure() {

task.run();

assertThat(connections).isEmpty(); // connection should be removed

verify(connection).createRequestPipeline();
verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS));
verify(connection, atLeast(1)).getNodeConfig();
Expand Down Expand Up @@ -887,40 +884,6 @@ void testConstructor_configFileNotFound() {
assertThat(availableNodes).isEmpty();
}

@Test
void testRestartConnection() {
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1);
doReturn(nodeConfig).when(connection).getNodeConfig();

// Add the connection to the connections map and set it as active
final Map<BlockNodeConfig, BlockNodeConnection> connections = connections();
final AtomicReference<BlockNodeConnection> activeConnectionRef = activeConnection();
connections.put(nodeConfig, connection);
activeConnectionRef.set(connection);

// Ensure the node config is available for selection
final List<BlockNodeConfig> availableNodes = availableNodes();
availableNodes.clear();
availableNodes.add(nodeConfig);

connectionManager.connectionResetsTheStream(connection);

// Verify the active connection reference was cleared
assertThat(activeConnectionRef).hasNullValue();
// Verify a new connection was created and added to the connections map
assertThat(connections).containsKey(nodeConfig);
// Verify it's a different connection object (the old one was replaced)
assertThat(connections.get(nodeConfig)).isNotSameAs(connection);

// Verify that scheduleConnectionAttempt was called with Duration.ZERO and the block number
verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS));
verifyNoMoreInteractions(connection);
verifyNoInteractions(bufferService);
verifyNoInteractions(metrics);
verifyNoMoreInteractions(executorService);
}

@Test
void testRescheduleConnection_singleBlockNode() {
// selectNewBlockNodeForStreaming should NOT be called
Expand Down Expand Up @@ -954,19 +917,6 @@ void testRescheduleConnection_singleBlockNode() {
.schedule(any(BlockNodeConnectionTask.class), eq(5000L), eq(TimeUnit.MILLISECONDS));
}

@Test
void testConnectionResetsTheStream_streamingDisabled() {
useStreamingDisabledManager();
final BlockNodeConnection connection = mock(BlockNodeConnection.class);

connectionManager.connectionResetsTheStream(connection);

verifyNoInteractions(connection);
verifyNoInteractions(bufferService);
verifyNoInteractions(executorService);
verifyNoInteractions(metrics);
}

@Test
void testStart_streamingDisabled() {
useStreamingDisabledManager();
Expand Down Expand Up @@ -1063,36 +1013,6 @@ void testRecordEndOfStreamAndCheckLimit_streamingDisabled() {
assertThat(limitExceeded).isFalse();
}

@Test
void testConnectionResetsTheStream() {
final BlockNodeConnection connection = mock(BlockNodeConnection.class);
final BlockNodeConfig nodeConfig = newBlockNodeConfig(8080, 1);
doReturn(nodeConfig).when(connection).getNodeConfig();
availableNodes().add(nodeConfig);

// Add the connection to the connections map and set it as active
final Map<BlockNodeConfig, BlockNodeConnection> connections = connections();
final AtomicReference<BlockNodeConnection> activeConnectionRef = activeConnection();
connections.put(nodeConfig, connection);
activeConnectionRef.set(connection);

connectionManager.connectionResetsTheStream(connection);

// Verify the active connection reference was cleared
assertThat(activeConnectionRef).hasNullValue();
// Verify a new connection was created and added to the connections map
assertThat(connections).containsKey(nodeConfig);
// Verify it's a different connection object (the old one was replaced)
assertThat(connections.get(nodeConfig)).isNotSameAs(connection);

// Verify that selectNewBlockNodeForStreaming was called
verify(executorService).schedule(any(BlockNodeConnectionTask.class), eq(0L), eq(TimeUnit.MILLISECONDS));
verifyNoMoreInteractions(connection);
verifyNoInteractions(bufferService);
verifyNoInteractions(metrics);
verifyNoMoreInteractions(executorService);
}

@Test
void testRecordEndOfStreamAndCheckLimit_withinLimit() {
final BlockNodeConfig nodeConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ void beforeEach() {
final AtomicReference<Thread> workerThreadRef = workerThreadRef();
workerThreadRef.set(FAKE_WORKER_THREAD);

// resetMocks();

lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection);
}

Expand Down Expand Up @@ -220,7 +218,6 @@ void testHandleStreamError() {
verify(requestPipeline).onComplete();
verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
}

@Test
Expand Down Expand Up @@ -563,7 +560,6 @@ void testOnNext_resendBlock_blockDoesNotExist() {
verify(bufferService).getBlockState(10L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoMoreInteractions(bufferService);
}

Expand Down Expand Up @@ -701,7 +697,6 @@ void testClose() {
verify(requestPipeline).onComplete();
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
}

Expand All @@ -727,7 +722,6 @@ void testClose_failure() {
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
}

Expand All @@ -745,7 +739,6 @@ void testClose_withoutOnComplete() {
verify(metrics).recordConnectionClosed();
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
verifyNoInteractions(requestPipeline);
}
Expand All @@ -765,7 +758,6 @@ void testClose_notActiveState() {
verify(metrics).recordConnectionClosed();
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(connectionManager);
verifyNoMoreInteractions(requestPipeline);
verifyNoInteractions(bufferService);
}
Expand Down Expand Up @@ -829,7 +821,6 @@ void testClose_pipelineNull() {
verify(metrics).recordConnectionClosed();
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
}

Expand Down Expand Up @@ -875,7 +866,6 @@ void testOnError_activeConnection() {
verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
}

Expand Down Expand Up @@ -950,7 +940,6 @@ void testOnCompleted_streamClosingNotInProgress() {
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
}

Expand Down Expand Up @@ -1275,11 +1264,9 @@ void testConnectionWorker_hugeItem() throws Exception {
verify(requestPipeline).onComplete();
verify(bufferService).getEarliestAvailableBlockNumber();
verify(bufferService).getHighestAckedBlockNumber();
verify(connectionManager).connectionResetsTheStream(connection);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(bufferService);
verifyNoMoreInteractions(connectionManager);
}

// Tests that no response processing occurs when connection is already closed
Expand Down Expand Up @@ -1384,7 +1371,6 @@ void testHandleStreamFailureWithoutOnComplete() {
// Should not call onComplete on the pipeline
verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true);
verifyNoInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
}

// Tests that error handling is skipped when connection is already closed
Expand Down Expand Up @@ -1418,7 +1404,6 @@ void testOnError_connectionPending() {
// Should call onComplete when callOnComplete=true (from handleStreamFailure)
verify(requestPipeline).onComplete();
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(connectionManager);
verifyNoMoreInteractions(requestPipeline);
}

Expand All @@ -1439,7 +1424,6 @@ void testOnError_connectionUninitialized() {
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoInteractions(bufferService);
}

Expand Down Expand Up @@ -1668,7 +1652,7 @@ void testPeriodicStreamReset() {
// Verify reset behavior
verify(bufferService).getEarliestAvailableBlockNumber();
verify(bufferService).getHighestAckedBlockNumber();
verify(connectionManager).connectionResetsTheStream(connection);
verify(connectionManager).selectNewBlockNodeForStreaming(false);
verify(requestPipeline).onNext(any(PublishStreamRequest.class));
verify(requestPipeline).onComplete();

Expand Down
Loading
Loading