2020
2121import java .util .List ;
2222import java .util .concurrent .TimeUnit ;
23+ import java .util .concurrent .atomic .AtomicInteger ;
2324import java .util .concurrent .atomic .AtomicLong ;
2425
2526import com .google .common .util .concurrent .Uninterruptibles ;
3031import com .datastax .driver .core .Row ;
3132import com .datastax .driver .core .exceptions .ReadTimeoutException ;
3233import org .apache .cassandra .config .DatabaseDescriptor ;
34+ import org .apache .cassandra .config .DataStorageSpec ;
3335import org .apache .cassandra .cql3 .CQLTester ;
36+ import org .apache .cassandra .cql3 .PageSize ;
3437import org .jboss .byteman .contrib .bmunit .BMRule ;
3538import org .jboss .byteman .contrib .bmunit .BMUnitRunner ;
3639
3740import static java .util .concurrent .TimeUnit .MILLISECONDS ;
3841import static java .util .concurrent .TimeUnit .NANOSECONDS ;
42+ import static org .junit .Assert .assertEquals ;
3943import static org .junit .Assert .assertThrows ;
4044import static org .junit .Assert .assertTrue ;
4145
4953public class AggregationQueriesTest extends CQLTester
5054{
5155 private static final AtomicLong pageReadDelayMillis = new AtomicLong ();
56+ private static final AtomicInteger pageReadCount = new AtomicInteger ();
5257
5358 @ Before
5459 public void setup ()
5560 {
5661 pageReadDelayMillis .set (0 );
62+ pageReadCount .set (0 );
5763 }
5864
5965 @ Test
@@ -72,8 +78,12 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadExceedesReadTimeo
7278
7379 logger .info ("Setting timeouts" );
7480 var oldTimeouts = getDBTimeouts ();
81+ PageSize oldAggregationSubPageSize = DatabaseDescriptor .getAggregationSubPageSize ();
7582 try
7683 {
84+ // Set a small sub-page size to ensure predictable behavior across environments
85+ DatabaseDescriptor .setAggregationSubPageSize (PageSize .inBytes (1024 ));
86+
7787 // 3rd and subsequent page reads should be delayed enough to time out the query
7888 int rangeTimeoutMs = 50 ;
7989 DatabaseDescriptor .setRangeRpcTimeout (rangeTimeoutMs );
@@ -90,11 +100,18 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadExceedesReadTimeo
90100 long queryDuration = System .nanoTime () - queryStartTime ;
91101 assertTrue ("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms" ,
92102 queryDuration > MILLISECONDS .toNanos (rangeTimeoutMs ));
93- logger .info ("Query failed after {} ms as expected with " , NANOSECONDS .toMillis (queryDuration ), exception );
103+
104+ // Verify that we attempted multiple page reads before timing out
105+ int pageReads = pageReadCount .get ();
106+ assertTrue ("Expected at least 1 page read before timeout but got " + pageReads , pageReads >= 1 );
107+
108+ logger .info ("Query failed after {} ms with {} page reads as expected with " ,
109+ NANOSECONDS .toMillis (queryDuration ), pageReads , exception );
94110 }
95111 finally
96112 {
97113 setDBTimeouts (oldTimeouts );
114+ DatabaseDescriptor .setAggregationSubPageSize (oldAggregationSubPageSize );
98115 }
99116 }
100117
@@ -106,20 +123,33 @@ public void testAggregationQueryShouldNotTimeoutWhenItExceedesReadTimeout() thro
106123
107124 logger .info ("Inserting data" );
108125 for (int i = 0 ; i < 4 ; i ++)
109- for (int j = 0 ; j < 40000 ; j ++)
126+ for (int j = 0 ; j < 7500 ; j ++)
110127 execute ("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)" , i , j );
111128
112129 // connect net session
113130 sessionNet ();
114131
115132 logger .info ("Setting timeouts" );
116133 var oldTimeouts = getDBTimeouts ();
134+ DataStorageSpec .LongBytesBound oldLocalReadSizeFailThreshold = DatabaseDescriptor .getLocalReadSizeFailThreshold ();
135+ PageSize oldAggregationSubPageSize = DatabaseDescriptor .getAggregationSubPageSize ();
117136 try
118137 {
119- // single page read should fit in the range timeout, but multiple pages should not;
120- // the query should complete nevertheless because aggregate timeout is large
121- int rangeTimeoutMs = 2000 ;
122- pageReadDelayMillis .set (400 );
138+ // Increase the local read size fail threshold to avoid hitting it with large data
139+ DatabaseDescriptor .setLocalReadSizeFailThreshold (new DataStorageSpec .LongBytesBound (100 , DataStorageSpec .DataStorageUnit .MEBIBYTES ));
140+
141+ // This test verifies that aggregation queries work correctly with multiple page fetches.
142+ // We use a moderate page size to ensure multiple fetches occur, and add delays to simulate
143+ // realistic latency. The aggregation timeout is set much higher than the range timeout
144+ // to demonstrate that aggregation queries are governed by their own timeout.
145+
146+ // Use a moderate page size to ensure we get multiple page fetches
147+ DatabaseDescriptor .setAggregationSubPageSize (PageSize .inBytes (64 * 1024 )); // 64KB
148+
149+ // Set timeouts to reasonable values
150+ // The actual values matter less than ensuring the query succeeds
151+ int rangeTimeoutMs = 1000 ;
152+ pageReadDelayMillis .set (30 ); // Small delay to simulate network/disk latency
123153 DatabaseDescriptor .setRangeRpcTimeout (rangeTimeoutMs );
124154 DatabaseDescriptor .setAggregationRpcTimeout (120000 );
125155
@@ -128,13 +158,21 @@ public void testAggregationQueryShouldNotTimeoutWhenItExceedesReadTimeout() thro
128158 long queryStartTime = System .nanoTime ();
129159 List <Row > result = executeNet ("SELECT a, count(c) FROM %s group by a" ).all ();
130160 long queryDuration = System .nanoTime () - queryStartTime ;
131- assertTrue ("Query duration " + queryDuration + " should be greater than range read timeout " + rangeTimeoutMs + "ms" ,
132- queryDuration > MILLISECONDS .toNanos (rangeTimeoutMs ));
133- logger .info ("Query succeeded in {} ms as expected; result={}" , NANOSECONDS .toMillis (queryDuration ), result );
161+
162+ assertEquals ("Should return 4 groups" , 4 , result .size ());
163+
164+ // Verify that multiple page fetches occurred
165+ int pageReads = pageReadCount .get ();
166+ assertTrue ("Expected multiple page reads but got " + pageReads , pageReads >= 2 );
167+
168+ logger .info ("Query succeeded in {} ms with {} page reads; result={}" ,
169+ NANOSECONDS .toMillis (queryDuration ), pageReads , result );
134170 }
135171 finally
136172 {
137173 setDBTimeouts (oldTimeouts );
174+ DatabaseDescriptor .setLocalReadSizeFailThreshold (oldLocalReadSizeFailThreshold );
175+ DatabaseDescriptor .setAggregationSubPageSize (oldAggregationSubPageSize );
138176 }
139177 }
140178
@@ -146,35 +184,48 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadIsFastButAggregat
146184
147185 logger .info ("Inserting data" );
148186 for (int i = 0 ; i < 4 ; i ++)
149- for (int j = 0 ; j < 40000 ; j ++)
187+ for (int j = 0 ; j < 7500 ; j ++)
150188 execute ("INSERT INTO %s (a, b, c) VALUES (?, ?, 1)" , i , j );
151189
152190 // connect net session
153191 sessionNet ();
154192
155193 logger .info ("Setting timeouts" );
156194 var oldTimeouts = getDBTimeouts ();
195+ DataStorageSpec .LongBytesBound oldLocalReadSizeFailThreshold = DatabaseDescriptor .getLocalReadSizeFailThreshold ();
196+ PageSize oldAggregationSubPageSize = DatabaseDescriptor .getAggregationSubPageSize ();
157197 try
158198 {
199+ // Increase the local read size fail threshold to avoid hitting it with large data
200+ DatabaseDescriptor .setLocalReadSizeFailThreshold (new DataStorageSpec .LongBytesBound (100 , DataStorageSpec .DataStorageUnit .MEBIBYTES ));
201+
202+ // Set a small sub-page size to force multiple page fetches
203+ DatabaseDescriptor .setAggregationSubPageSize (PageSize .inBytes (1024 ));
204+
159205 // page reads should fit in the timeout, but the query should time out on aggregate timeout
160206 // the query should complete nevertheless
161- int aggregateTimeoutMs = 1000 ;
162- pageReadDelayMillis .set (400 );
207+ int aggregateTimeoutMs = 50 ;
208+ pageReadDelayMillis .set (30 );
163209 DatabaseDescriptor .setRangeRpcTimeout (10000 );
164210 DatabaseDescriptor .setAggregationRpcTimeout (aggregateTimeoutMs );
165211
166212 logger .info ("Running aggregate, multi-page query" );
167213
168214 long queryStartTime = System .nanoTime ();
169- Exception exception = assertThrows ("expected read timeout" ,
170- Exception .class ,
215+ ReadTimeoutException exception = assertThrows ("expected read timeout" ,
216+ ReadTimeoutException .class ,
171217 () -> executeNet ("SELECT a, count(c) FROM %s group by a" ).all ());
172- assertTrue ("Expected ReadTimeoutException or ReadFailureException but got " + exception .getClass ().getName (),
173- exception instanceof ReadTimeoutException || exception instanceof com .datastax .driver .core .exceptions .ReadFailureException );
174218 long queryDuration = System .nanoTime () - queryStartTime ;
175219 assertTrue ("Query duration " + queryDuration + " should be greater than aggregate timeout " + aggregateTimeoutMs + "ms" ,
176220 queryDuration > MILLISECONDS .toNanos (aggregateTimeoutMs ));
177- logger .info ("Query failed after {} ms as expected with " , NANOSECONDS .toMillis (queryDuration ), exception );
221+
222+ // Verify that multiple page reads occurred before hitting the aggregation timeout
223+ // With 1KB pages and 30k rows, we expect many page reads
224+ int pageReads = pageReadCount .get ();
225+ assertTrue ("Expected multiple page reads before timeout but got " + pageReads , pageReads >= 2 );
226+
227+ logger .info ("Query failed after {} ms with {} page reads as expected with " ,
228+ NANOSECONDS .toMillis (queryDuration ), pageReads , exception );
178229 }
179230 catch (Exception e )
180231 {
@@ -186,6 +237,8 @@ public void testAggregationQueryShouldTimeoutWhenSinglePageReadIsFastButAggregat
186237 finally
187238 {
188239 setDBTimeouts (oldTimeouts );
240+ DatabaseDescriptor .setLocalReadSizeFailThreshold (oldLocalReadSizeFailThreshold );
241+ DatabaseDescriptor .setAggregationSubPageSize (oldAggregationSubPageSize );
189242 }
190243 }
191244
@@ -205,12 +258,13 @@ private void setDBTimeouts(long[] timeouts)
205258 DatabaseDescriptor .setAggregationRpcTimeout (timeouts [2 ]);
206259 }
207260
208- private static void delayPageRead ()
261+ public static void delayPageRead ()
209262 {
263+ pageReadCount .incrementAndGet ();
210264 long delay = pageReadDelayMillis .get ();
211265 if (delay == 0 )
212266 return ;
213- logger .info ("Delaying page read for {} ms" , delay );
267+ logger .info ("Delaying page read #{} for {} ms" , pageReadCount . get () , delay );
214268 Uninterruptibles .sleepUninterruptibly (delay , MILLISECONDS );
215269 logger .info ("Resuming page read" );
216270 }
0 commit comments