@@ -105,8 +105,10 @@ mod data {
105105 use graph:: prelude:: transaction_receipt:: LightTransactionReceipt ;
106106 use graph:: prelude:: web3:: types:: H256 ;
107107 use graph:: prelude:: {
108- serde_json as json, BlockNumber , BlockPtr , CachedEthereumCall , Error , Logger , StoreError ,
108+ info, serde_json as json, BlockNumber , BlockPtr , CachedEthereumCall , Error , Logger ,
109+ StoreError ,
109110 } ;
111+
110112 use std:: collections:: HashMap ;
111113 use std:: convert:: TryFrom ;
112114 use std:: fmt;
@@ -1404,56 +1406,87 @@ mod data {
14041406 conn : & mut PgConnection ,
14051407 logger : & Logger ,
14061408 ttl_days : i32 ,
1409+ ttl_max_contracts : Option < i64 > ,
14071410 ) -> Result < ( ) , Error > {
1408- // Delete cache entries in batches since there could be thousands of cache entries per contract
1409- let mut total_deleted = 0 ;
1410- let batch_size = 5000 ;
1411+ let mut total_calls: usize = 0 ;
1412+ let mut total_contracts: i64 = 0 ;
1413+ // We process contracts in batches to avoid loading too many entries into memory
1414+ // at once. Each contract can have many calls, so we also delete calls in batches.
1415+ // Note: The batch sizes were chosen based on experimentation. Potentially, they
1416+ // could be made configurable via ENV vars.
1417+ let contracts_batch_size: i64 = 2000 ;
1418+ let cache_batch_size: usize = 10000 ;
1419+
1420+ // Limits the number of contracts to process if ttl_max_contracts is set.
1421+ // Used also to adjust the final batch size, so we don't process more
1422+ // contracts than the set limit.
1423+ let remaining_contracts = |processed : i64 | -> Option < i64 > {
1424+ ttl_max_contracts. map ( |limit| limit. saturating_sub ( processed) )
1425+ } ;
14111426
14121427 match self {
14131428 Storage :: Shared => {
14141429 use public:: eth_call_cache as cache;
14151430 use public:: eth_call_meta as meta;
14161431
1417- let stale_contracts = meta:: table
1418- . select ( meta:: contract_address)
1419- . filter (
1420- meta:: accessed_at
1421- . lt ( diesel:: dsl:: date ( diesel:: dsl:: now - ttl_days. days ( ) ) ) ,
1422- )
1423- . get_results :: < Vec < u8 > > ( conn) ?;
1424-
1425- if stale_contracts. is_empty ( ) {
1426- return Ok ( ( ) ) ;
1427- }
1428-
14291432 loop {
1430- let next_batch = cache:: table
1431- . select ( cache:: id)
1432- . filter ( cache:: contract_address. eq_any ( & stale_contracts) )
1433- . limit ( batch_size as i64 )
1434- . get_results :: < Vec < u8 > > ( conn) ?;
1435- let deleted_count =
1436- diesel:: delete ( cache:: table. filter ( cache:: id. eq_any ( & next_batch) ) )
1437- . execute ( conn) ?;
1433+ if let Some ( 0 ) = remaining_contracts ( total_contracts) {
1434+ info ! (
1435+ logger,
1436+ "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)" ,
1437+ total_calls,
1438+ total_contracts
1439+ ) ;
1440+ break ;
1441+ }
14381442
1439- total_deleted += deleted_count;
1443+ let batch_limit = remaining_contracts ( total_contracts)
1444+ . map ( |left| left. min ( contracts_batch_size) )
1445+ . unwrap_or ( contracts_batch_size) ;
1446+
1447+ let stale_contracts = meta:: table
1448+ . select ( meta:: contract_address)
1449+ . filter (
1450+ meta:: accessed_at
1451+ . lt ( diesel:: dsl:: date ( diesel:: dsl:: now - ttl_days. days ( ) ) ) ,
1452+ )
1453+ . limit ( batch_limit)
1454+ . get_results :: < Vec < u8 > > ( conn) ?;
14401455
1441- if deleted_count < batch_size {
1456+ if stale_contracts. is_empty ( ) {
1457+ info ! (
1458+ logger,
1459+ "Finished cleaning call cache: deleted {} entries for {} contracts" ,
1460+ total_calls,
1461+ total_contracts
1462+ ) ;
14421463 break ;
14431464 }
1444- }
14451465
1446- graph:: slog:: info!(
1447- logger,
1448- "Cleaned call cache: deleted {} entries for {} contracts" ,
1449- total_deleted,
1450- stale_contracts. len( )
1451- ) ;
1466+ loop {
1467+ let next_batch = cache:: table
1468+ . select ( cache:: id)
1469+ . filter ( cache:: contract_address. eq_any ( & stale_contracts) )
1470+ . limit ( cache_batch_size as i64 )
1471+ . get_results :: < Vec < u8 > > ( conn) ?;
1472+ let deleted_count =
1473+ diesel:: delete ( cache:: table. filter ( cache:: id. eq_any ( & next_batch) ) )
1474+ . execute ( conn) ?;
14521475
1453- diesel:: delete (
1454- meta:: table. filter ( meta:: contract_address. eq_any ( & stale_contracts) ) ,
1455- )
1456- . execute ( conn) ?;
1476+ total_calls += deleted_count;
1477+
1478+ if deleted_count < cache_batch_size {
1479+ break ;
1480+ }
1481+ }
1482+
1483+ let deleted_contracts = diesel:: delete (
1484+ meta:: table. filter ( meta:: contract_address. eq_any ( & stale_contracts) ) ,
1485+ )
1486+ . execute ( conn) ?;
1487+
1488+ total_contracts += deleted_contracts as i64 ;
1489+ }
14571490
14581491 Ok ( ( ) )
14591492 }
@@ -1463,56 +1496,89 @@ mod data {
14631496 ..
14641497 } ) => {
14651498 let select_query = format ! (
1466- "SELECT contract_address FROM {} \
1467- WHERE accessed_at < CURRENT_DATE - interval '{} days'",
1499+ "WITH stale_contracts AS (
1500+ SELECT contract_address
1501+ FROM {}
1502+ WHERE accessed_at < current_date - interval '{} days'
1503+ LIMIT $1
1504+ )
1505+ SELECT contract_address FROM stale_contracts" ,
14681506 call_meta. qname, ttl_days
14691507 ) ;
14701508
1509+ let delete_cache_query = format ! (
1510+ "WITH targets AS (
1511+ SELECT id
1512+ FROM {}
1513+ WHERE contract_address = ANY($1)
1514+ LIMIT {}
1515+ )
1516+ DELETE FROM {} USING targets
1517+ WHERE {}.id = targets.id" ,
1518+ call_cache. qname, cache_batch_size, call_cache. qname, call_cache. qname
1519+ ) ;
1520+
1521+ let delete_meta_query = format ! (
1522+ "DELETE FROM {} WHERE contract_address = ANY($1)" ,
1523+ call_meta. qname
1524+ ) ;
1525+
14711526 #[ derive( QueryableByName ) ]
14721527 struct ContractAddress {
14731528 #[ diesel( sql_type = Bytea ) ]
14741529 contract_address : Vec < u8 > ,
14751530 }
14761531
1477- let all_stale_contracts: Vec < Vec < u8 > > = sql_query ( select_query)
1478- . load :: < ContractAddress > ( conn) ?
1479- . into_iter ( )
1480- . map ( |row| row. contract_address )
1481- . collect ( ) ;
1482-
1483- if all_stale_contracts. is_empty ( ) {
1484- graph:: slog:: info!( logger, "Cleaned call cache: no stale entries found" ) ;
1485- return Ok ( ( ) ) ;
1486- }
1487-
14881532 loop {
1489- let delete_cache_query = format ! (
1490- "DELETE FROM {} WHERE id IN (
1491- SELECT id FROM {}
1492- WHERE contract_address = ANY($1)
1493- LIMIT {}
1494- )" ,
1495- call_cache. qname, call_cache. qname, batch_size
1496- ) ;
1533+ if let Some ( 0 ) = remaining_contracts ( total_contracts) {
1534+ info ! (
1535+ logger,
1536+ "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)" ,
1537+ total_calls,
1538+ total_contracts
1539+ ) ;
1540+ break ;
1541+ }
14971542
1498- let deleted_count = sql_query ( delete_cache_query)
1499- . bind :: < Array < Bytea > , _ > ( & all_stale_contracts)
1500- . execute ( conn) ?;
1543+ let batch_limit = remaining_contracts ( total_contracts)
1544+ . map ( |left| left. min ( contracts_batch_size) )
1545+ . unwrap_or ( contracts_batch_size) ;
1546+
1547+ let stale_contracts: Vec < Vec < u8 > > = sql_query ( & select_query)
1548+ . bind :: < BigInt , _ > ( batch_limit)
1549+ . load :: < ContractAddress > ( conn) ?
1550+ . into_iter ( )
1551+ . map ( |r| r. contract_address )
1552+ . collect ( ) ;
1553+
1554+ if stale_contracts. is_empty ( ) {
1555+ info ! (
1556+ logger,
1557+ "Finished cleaning call cache: deleted {} entries for {} contracts" ,
1558+ total_calls,
1559+ total_contracts
1560+ ) ;
1561+ break ;
1562+ }
15011563
1502- total_deleted += deleted_count;
1564+ loop {
1565+ let deleted_count = sql_query ( & delete_cache_query)
1566+ . bind :: < Array < Bytea > , _ > ( & stale_contracts)
1567+ . execute ( conn) ?;
15031568
1504- if deleted_count < batch_size {
1505- break ;
1569+ total_calls += deleted_count;
1570+
1571+ if deleted_count < cache_batch_size {
1572+ break ;
1573+ }
15061574 }
1507- }
15081575
1509- let delete_meta_query = format ! (
1510- "DELETE FROM {} WHERE contract_address = ANY($1)" ,
1511- call_meta. qname
1512- ) ;
1513- sql_query ( delete_meta_query)
1514- . bind :: < Array < Bytea > , _ > ( & all_stale_contracts)
1515- . execute ( conn) ?;
1576+ let deleted_contracts = sql_query ( & delete_meta_query)
1577+ . bind :: < Array < Bytea > , _ > ( & stale_contracts)
1578+ . execute ( conn) ?;
1579+
1580+ total_contracts += deleted_contracts as i64 ;
1581+ }
15161582
15171583 Ok ( ( ) )
15181584 }
@@ -2629,10 +2695,14 @@ impl ChainStoreTrait for ChainStore {
26292695 Ok ( ( ) )
26302696 }
26312697
2632- async fn clear_stale_call_cache ( & self , ttl_days : i32 ) -> Result < ( ) , Error > {
2698+ async fn clear_stale_call_cache (
2699+ & self ,
2700+ ttl_days : i32 ,
2701+ ttl_max_contracts : Option < i64 > ,
2702+ ) -> Result < ( ) , Error > {
26332703 let conn = & mut * self . get_conn ( ) ?;
26342704 self . storage
2635- . clear_stale_call_cache ( conn, & self . logger , ttl_days)
2705+ . clear_stale_call_cache ( conn, & self . logger , ttl_days, ttl_max_contracts )
26362706 }
26372707
26382708 async fn transaction_receipts_in_block (
0 commit comments