Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -30,12 +31,15 @@
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.PageSize;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

Expand All @@ -49,11 +53,13 @@
public class AggregationQueriesTest extends CQLTester
{
private static final AtomicLong pageReadDelayMillis = new AtomicLong();
private static final AtomicInteger pageReadCount = new AtomicInteger();

@Before
public void setup()
{
pageReadDelayMillis.set(0);
pageReadCount.set(0);
}

@Test
Expand All @@ -72,8 +78,12 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadExceedesReadTimeo

logger.info("Setting timeouts");
var oldTimeouts = getDBTimeouts();
PageSize oldAggregationSubPageSize = DatabaseDescriptor.getAggregationSubPageSize();
try
{
// Set a small sub-page size to ensure predictable behavior across environments
DatabaseDescriptor.setAggregationSubPageSize(PageSize.inBytes(1024));

// 3rd and subsequent page reads should be delayed enough to time out the query
int rangeTimeoutMs = 50;
DatabaseDescriptor.setRangeRpcTimeout(rangeTimeoutMs);
Expand All @@ -90,11 +100,18 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadExceedesReadTimeo
long queryDuration = System.nanoTime() - queryStartTime;
assertTrue("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms",
queryDuration > MILLISECONDS.toNanos(rangeTimeoutMs));
logger.info("Query failed after {} ms as expected with ", NANOSECONDS.toMillis(queryDuration), exception);

// Verify that we attempted multiple page reads before timing out
int pageReads = pageReadCount.get();
assertTrue("Expected at least 1 page read before timeout but got " + pageReads, pageReads >= 1);

logger.info("Query failed after {} ms with {} page reads as expected with ",
NANOSECONDS.toMillis(queryDuration), pageReads, exception);
}
finally
{
setDBTimeouts(oldTimeouts);
DatabaseDescriptor.setAggregationSubPageSize(oldAggregationSubPageSize);
}
}

Expand All @@ -106,20 +123,33 @@ public void testAggregationQueryShouldNotTimeoutWhenItExceedesReadTimeout() thro

logger.info("Inserting data");
for (int i = 0; i < 4; i++)
for (int j = 0; j < 40000; j++)
for (int j = 0; j < 7500; j++)
execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j);

// connect net session
sessionNet();

logger.info("Setting timeouts");
var oldTimeouts = getDBTimeouts();
DataStorageSpec.LongBytesBound oldLocalReadSizeFailThreshold = DatabaseDescriptor.getLocalReadSizeFailThreshold();
PageSize oldAggregationSubPageSize = DatabaseDescriptor.getAggregationSubPageSize();
try
{
// single page read should fit in the range timeout, but multiple pages should not;
// the query should complete nevertheless because aggregate timeout is large
int rangeTimeoutMs = 2000;
pageReadDelayMillis.set(400);
// Increase the local read size fail threshold to avoid hitting it with large data
DatabaseDescriptor.setLocalReadSizeFailThreshold(new DataStorageSpec.LongBytesBound(100, DataStorageSpec.DataStorageUnit.MEBIBYTES));

// This test verifies that aggregation queries work correctly with multiple page fetches.
// We use a moderate page size to ensure multiple fetches occur, and add delays to simulate
// realistic latency. The aggregation timeout is set much higher than the range timeout
// to demonstrate that aggregation queries are governed by their own timeout.

// Use a moderate page size to ensure we get multiple page fetches
DatabaseDescriptor.setAggregationSubPageSize(PageSize.inBytes(64 * 1024)); // 64KB

// Set timeouts to reasonable values
// The actual values matter less than ensuring the query succeeds
int rangeTimeoutMs = 1000;
pageReadDelayMillis.set(30); // Small delay to simulate network/disk latency
DatabaseDescriptor.setRangeRpcTimeout(rangeTimeoutMs);
DatabaseDescriptor.setAggregationRpcTimeout(120000);

Expand All @@ -128,13 +158,21 @@ public void testAggregationQueryShouldNotTimeoutWhenItExceedesReadTimeout() thro
long queryStartTime = System.nanoTime();
List<Row> result = executeNet("SELECT a, count(c) FROM %s group by a").all();
long queryDuration = System.nanoTime() - queryStartTime;
assertTrue("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms",
queryDuration > MILLISECONDS.toNanos(rangeTimeoutMs));
logger.info("Query succeeded in {} ms as expected; result={}", NANOSECONDS.toMillis(queryDuration), result);

assertEquals("Should return 4 groups", 4, result.size());

// Verify that multiple page fetches occurred
int pageReads = pageReadCount.get();
assertTrue("Expected multiple page reads but got " + pageReads, pageReads >= 2);

logger.info("Query succeeded in {} ms with {} page reads; result={}",
NANOSECONDS.toMillis(queryDuration), pageReads, result);
}
finally
{
setDBTimeouts(oldTimeouts);
DatabaseDescriptor.setLocalReadSizeFailThreshold(oldLocalReadSizeFailThreshold);
DatabaseDescriptor.setAggregationSubPageSize(oldAggregationSubPageSize);
}
}

Expand All @@ -146,35 +184,48 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadIsFastButAggregat

logger.info("Inserting data");
for (int i = 0; i < 4; i++)
for (int j = 0; j < 40000; j++)
for (int j = 0; j < 7500; j++)
execute("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)", i, j);

// connect net session
sessionNet();

logger.info("Setting timeouts");
var oldTimeouts = getDBTimeouts();
DataStorageSpec.LongBytesBound oldLocalReadSizeFailThreshold = DatabaseDescriptor.getLocalReadSizeFailThreshold();
PageSize oldAggregationSubPageSize = DatabaseDescriptor.getAggregationSubPageSize();
try
{
// Increase the local read size fail threshold to avoid hitting it with large data
DatabaseDescriptor.setLocalReadSizeFailThreshold(new DataStorageSpec.LongBytesBound(100, DataStorageSpec.DataStorageUnit.MEBIBYTES));

// Set a small sub-page size to force multiple page fetches
DatabaseDescriptor.setAggregationSubPageSize(PageSize.inBytes(1024));

// page reads should fit in the timeout, but the query should time out on aggregate timeout
// the query should complete nevertheless
int aggregateTimeoutMs = 1000;
pageReadDelayMillis.set(400);
int aggregateTimeoutMs = 50;
pageReadDelayMillis.set(30);
DatabaseDescriptor.setRangeRpcTimeout(10000);
DatabaseDescriptor.setAggregationRpcTimeout(aggregateTimeoutMs);

logger.info("Running aggregate, multi-page query");

long queryStartTime = System.nanoTime();
Exception exception = assertThrows("expected read timeout",
Exception.class,
ReadTimeoutException exception = assertThrows("expected read timeout",
ReadTimeoutException.class,
() -> executeNet("SELECT a, count(c) FROM %s group by a").all());
assertTrue("Expected ReadTimeoutException or ReadFailureException but got " + exception.getClass().getName(),
exception instanceof ReadTimeoutException || exception instanceof com.datastax.driver.core.exceptions.ReadFailureException);
long queryDuration = System.nanoTime() - queryStartTime;
assertTrue("Query duration " + queryDuration + " should be greater than aggregate timeout " + aggregateTimeoutMs + "ms",
queryDuration > MILLISECONDS.toNanos(aggregateTimeoutMs));
logger.info("Query failed after {} ms as expected with ", NANOSECONDS.toMillis(queryDuration), exception);

// Verify that multiple page reads occurred before hitting the aggregation timeout
// With 1KB pages and 30k rows, we expect many page reads
int pageReads = pageReadCount.get();
assertTrue("Expected multiple page reads before timeout but got " + pageReads, pageReads >= 2);

logger.info("Query failed after {} ms with {} page reads as expected with ",
NANOSECONDS.toMillis(queryDuration), pageReads, exception);
}
catch (Exception e)
{
Expand All @@ -186,6 +237,8 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadIsFastButAggregat
finally
{
setDBTimeouts(oldTimeouts);
DatabaseDescriptor.setLocalReadSizeFailThreshold(oldLocalReadSizeFailThreshold);
DatabaseDescriptor.setAggregationSubPageSize(oldAggregationSubPageSize);
}
}

Expand All @@ -205,12 +258,13 @@ private void setDBTimeouts(long[] timeouts)
DatabaseDescriptor.setAggregationRpcTimeout(timeouts[2]);
}

private static void delayPageRead()
public static void delayPageRead()
{
pageReadCount.incrementAndGet();
long delay = pageReadDelayMillis.get();
if (delay == 0)
return;
logger.info("Delaying page read for {} ms", delay);
logger.info("Delaying page read #{} for {} ms", pageReadCount.get(), delay);
Uninterruptibles.sleepUninterruptibly(delay, MILLISECONDS);
logger.info("Resuming page read");
}
Expand Down