@@ -95,6 +95,9 @@
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.testng.IRetryAnalyzer;
import org.testng.ITestResult;
import org.testng.annotations.Test;

/**
* A test base for testing functions.
@@ -291,25 +294,32 @@ protected void testWindowFunction(String type, String[] expectedResults) throws
// get function info
getFunctionInfoSuccess(functionName);

containerExecResult = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);

FunctionStatus functionStatus = FunctionStatusUtil.decode(containerExecResult.getStdout());
assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getNumRunning(), 1);
assertEquals(functionStatus.getInstances().size(), 1);
assertEquals(functionStatus.getInstances().get(0).getInstanceId(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
// Use Awaitility with a retry mechanism for the function status check to improve stability
Awaitility.await()
.pollInterval(Duration.ofSeconds(2))
.atMost(Duration.ofSeconds(30))
.ignoreExceptions()
.untilAsserted(() -> {
ContainerExecResult statusResult = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", functionName
);

FunctionStatus functionStatus = FunctionStatusUtil.decode(statusResult.getStdout());
assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getNumRunning(), 1);
assertEquals(functionStatus.getInstances().size(), 1);
assertEquals(functionStatus.getInstances().get(0).getInstanceId(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
});

@Cleanup
PulsarClient client = PulsarClient.builder()
Member:
Are these unrelated to the fix? Instead of adding an empty PR description, please explain the changes.

Contributor:
+1. I see some noise in a few of the recent flaky-test-related PRs, though I'm not sure whether it is really noise.
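The first hunk also imports org.testng.IRetryAnalyzer and org.testng.ITestResult, which the visible changes never use; a TestNG retry analyzer built on those interfaces typically looks like the sketch below. The class name FlakyTestRetryAnalyzer and the retry limit are illustrative only, not part of this PR.

import org.testng.IRetryAnalyzer;
import org.testng.ITestResult;

// Hypothetical retry analyzer: re-runs a failed test method a bounded number of times.
public class FlakyTestRetryAnalyzer implements IRetryAnalyzer {
    private static final int MAX_RETRIES = 2;
    private int retryCount = 0;

    @Override
    public boolean retry(ITestResult result) {
        // Returning true tells TestNG to re-invoke the failed test method.
        if (!result.isSuccess() && retryCount < MAX_RETRIES) {
            retryCount++;
            return true;
        }
        return false;
    }
}

A test would then opt in with @Test(retryAnalyzer = FlakyTestRetryAnalyzer.class).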

@@ -44,6 +44,10 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
@@ -56,6 +60,19 @@

public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);

// Default timeout for Docker command execution
private static final long DEFAULT_DOCKER_COMMAND_TIMEOUT_SECONDS = 60;

// Diagnostic collection timeout
private static final long DIAGNOSTIC_COLLECTION_TIMEOUT_SECONDS = 30;

// Metrics collection for observability enhancement
private static final AtomicInteger totalDockerCommands = new AtomicInteger(0);
private static final AtomicInteger timedOutDockerCommands = new AtomicInteger(0);
private static final AtomicLong totalDockerCommandExecutionTime = new AtomicLong(0);
private static final AtomicInteger totalDiagnosticCollections = new AtomicInteger(0);
private static final AtomicInteger failedDiagnosticCollections = new AtomicInteger(0);

private static File getTargetDirectory(String containerId) {
String base = System.getProperty("maven.buildDirectory");
@@ -103,8 +120,8 @@ public void onComplete() {
future.complete(true);
}
});
future.get();
} catch (RuntimeException | ExecutionException | IOException e) {
future.get(DIAGNOSTIC_COLLECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (RuntimeException | ExecutionException | IOException | TimeoutException e) {
LOG.error("Error dumping log for {}", containerName, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -178,13 +195,34 @@ public static ContainerExecResult runCommand(DockerClient docker,
String containerId,
String... cmd)
throws ContainerExecException, ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
boolean timedOut = false;
totalDockerCommands.incrementAndGet();

try {
return runCommandAsync(docker, containerId, cmd).get();
// Add timeout for Docker command execution to prevent infinite waiting
ContainerExecResult result = runCommandAsync(docker, containerId, cmd)
.get(DEFAULT_DOCKER_COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return result;
} catch (TimeoutException e) {
timedOut = true;
timedOutDockerCommands.incrementAndGet();
// Collect diagnostics when timeout occurs
collectDiagnostics(docker, containerId);
throw new ContainerExecException(
"Command execution timed out after " + DEFAULT_DOCKER_COMMAND_TIMEOUT_SECONDS + " seconds: "
+ String.join(" ", cmd),
containerId,
null);
} catch (ExecutionException e) {
if (e.getCause() instanceof ContainerExecException) {
throw (ContainerExecException) e.getCause();
}
throw e;
} finally {
long executionTime = System.currentTimeMillis() - startTime;
totalDockerCommandExecutionTime.addAndGet(executionTime);
LOG.debug("Docker command completed in {} ms, timed out: {}", executionTime, timedOut);
}
}

@@ -193,13 +231,34 @@ public static ContainerExecResult runCommandAsUser(String userId,
String containerId,
String... cmd)
throws ContainerExecException, ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
boolean timedOut = false;
totalDockerCommands.incrementAndGet();

try {
return runCommandAsyncAsUser(userId, docker, containerId, cmd).get();
// Add timeout for Docker command execution to prevent infinite waiting
ContainerExecResult result = runCommandAsyncAsUser(userId, docker, containerId, cmd)
.get(DEFAULT_DOCKER_COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
return result;
} catch (TimeoutException e) {
timedOut = true;
timedOutDockerCommands.incrementAndGet();
// Collect diagnostics when timeout occurs
collectDiagnostics(docker, containerId);
throw new ContainerExecException(
"Command execution timed out after " + DEFAULT_DOCKER_COMMAND_TIMEOUT_SECONDS + " seconds: "
+ String.join(" ", cmd),
containerId,
null);
} catch (ExecutionException e) {
if (e.getCause() instanceof ContainerExecException) {
throw (ContainerExecException) e.getCause();
}
throw e;
} finally {
long executionTime = System.currentTimeMillis() - startTime;
totalDockerCommandExecutionTime.addAndGet(executionTime);
LOG.debug("Docker command (as user) completed in {} ms, timed out: {}", executionTime, timedOut);
}
}

@@ -417,4 +476,136 @@ public static Optional<String> getContainerCluster(DockerClient docker, String c
return Optional.ofNullable(docker.inspectContainerCmd(containerId)
.exec().getConfig().getLabels().get("cluster"));
}
}

/**
* Collect comprehensive diagnostic information when a Docker command execution times out.
* This helps with debugging timeout issues.
*/
private static void collectDiagnostics(DockerClient docker, String containerId) {
totalDiagnosticCollections.incrementAndGet();
try {
LOG.info("Collecting diagnostics for container {}", containerId);

// Collect container log
LOG.info("Collecting container log for {}", containerId);
dumpContainerLogToTarget(docker, containerId);

// Collect container state
LOG.info("Collecting container state for {}", containerId);
InspectContainerResponse info = docker.inspectContainerCmd(containerId).exec();
LOG.error("Container state: {}", info.getState());

// Collect running processes
LOG.info("Collecting running processes for {}", containerId);
try {
com.github.dockerjava.api.command.TopContainerResponse top = docker.topContainerCmd(containerId).exec();
LOG.error("Running processes: {}", top);
} catch (Exception e) {
LOG.error("Failed to collect running processes", e);
}

// Collect container configuration
LOG.info("Collecting container configuration for {}", containerId);
LOG.error("Container config: {}", info.getConfig());

// Collect container network settings
LOG.info("Collecting container network settings for {}", containerId);
LOG.error("Container network settings: {}", info.getNetworkSettings());

// Try to collect function logs if this is a worker container
LOG.info("Collecting function logs for {}", containerId);
try {
dumpContainerDirToTargetCompressed(docker, containerId, "/pulsar/logs/functions");
} catch (Exception e) {
LOG.warn("Failed to collect function logs", e);
}

// Try to collect Pulsar logs
LOG.info("Collecting Pulsar logs for {}", containerId);
try {
dumpContainerDirToTargetCompressed(docker, containerId, "/pulsar/logs");
} catch (Exception e) {
LOG.warn("Failed to collect Pulsar logs", e);
}

// Collect function-specific diagnostics for Pulsar Functions containers
collectFunctionSpecificDiagnostics(docker, containerId);

LOG.info("Finished collecting diagnostics for container {}", containerId);
} catch (Exception e) {
failedDiagnosticCollections.incrementAndGet();
LOG.error("Failed to collect diagnostics for container {}", containerId, e);
}
}

/**
* Collect function-specific diagnostics for Pulsar Functions containers.
* This includes function metadata, configuration files, and instance information.
*/
private static void collectFunctionSpecificDiagnostics(DockerClient docker, String containerId) {
try {
// Get container name
String containerName = getContainerName(docker, containerId);

// Collect configuration files
try {
LOG.info("Collecting configuration files for {}", containerName);
dumpContainerDirToTargetCompressed(docker, containerId, "/pulsar/conf");
} catch (Exception e) {
LOG.warn("Failed to collect configuration files", e);
}

// Collect function metadata
try {
LOG.info("Collecting function metadata for {}", containerName);
// Try to get function information
CompletableFuture<ContainerExecResult> future = runCommandAsync(docker, containerId,
"/pulsar/bin/pulsar-admin", "functions", "list");

// Add a reasonable timeout for this diagnostic command
ContainerExecResult result = future.get(10, TimeUnit.SECONDS);
LOG.info("Functions in container {}: {}", containerName, result.getStdout());
} catch (Exception e) {
LOG.warn("Failed to collect function metadata", e);
}

} catch (Exception e) {
LOG.error("Failed to collect function-specific diagnostics", e);
}
}

/**
* Print a summary of Docker command metrics for observability.
* This helps in identifying performance bottlenecks and timeout issues.
*/
public static void printDockerMetricsSummary() {
LOG.info("===== Docker Command Metrics Summary =====");
LOG.info("Total Docker commands executed: {}", totalDockerCommands.get());
LOG.info("Docker commands timed out: {}", timedOutDockerCommands.get());
LOG.info("Diagnostic collections attempted: {}", totalDiagnosticCollections.get());
LOG.info("Failed diagnostic collections: {}", failedDiagnosticCollections.get());

if (totalDockerCommands.get() > 0) {
LOG.info("Docker command timeout rate: {}%",
(timedOutDockerCommands.get() * 100.0 / totalDockerCommands.get()));
LOG.info("Average Docker command execution time: {} ms",
totalDockerCommandExecutionTime.get() / totalDockerCommands.get());
}
LOG.info("=====================================");
}

/**
* Reset all Docker command metrics.
* This is useful for cleaning up between test runs.
*/
public static void resetDockerMetrics() {
totalDockerCommands.set(0);
timedOutDockerCommands.set(0);
totalDockerCommandExecutionTime.set(0);
totalDiagnosticCollections.set(0);
failedDiagnosticCollections.set(0);
}
}
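For completeness, here is a minimal sketch of how a test suite could wire the new metrics helpers into the TestNG lifecycle. Only DockerUtils.resetDockerMetrics() and DockerUtils.printDockerMetricsSummary() come from this diff; the class and method names below are hypothetical, and the example assumes it lives where DockerUtils is visible on the test classpath.

import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;

// Hypothetical suite-level hook: reset the counters before the suite and log a summary afterwards.
public class DockerMetricsReportExample {

    @BeforeSuite
    public void resetMetrics() {
        // Start from clean counters so the summary reflects only this suite.
        DockerUtils.resetDockerMetrics();
    }

    @AfterSuite(alwaysRun = true)
    public void reportMetrics() {
        // Logs command totals, timeout rate, and average execution time recorded during the run.
        DockerUtils.printDockerMetricsSummary();
    }
}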