diff --git a/lndservices/block_header_cache.go b/lndservices/block_header_cache.go
new file mode 100644
index 000000000..0e056318c
--- /dev/null
+++ b/lndservices/block_header_cache.go
@@ -0,0 +1,322 @@
+package lndservices
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/btcsuite/btcd/chaincfg/chainhash"
+	"github.com/btcsuite/btcd/wire"
+)
+
+const (
+	// DefaultHeaderCacheSize is the default maximum number of block
+	// headers to cache.
+	DefaultHeaderCacheSize = 100_000
+
+	// DefaultPurgePercentage is the default percentage of entries to purge
+	// when the cache reaches capacity (from 1 to 100, inclusive).
+	DefaultPurgePercentage = 10
+
+	// DefaultMinSettledBlockDepth is the default minimum block depth
+	// required before a block header is considered settled.
+	DefaultMinSettledBlockDepth = 6
+)
+
+// BlockHeaderCacheConfig holds configuration parameters for the block header
+// cache.
+type BlockHeaderCacheConfig struct {
+	// MaxSize is the maximum number of block headers to cache.
+	MaxSize uint32
+
+	// PurgePercentage is the percentage of entries to purge when the cache
+	// reaches capacity (from 1 to 100, inclusive).
+	PurgePercentage uint32
+
+	// MinSettledBlockDepth is the minimum block depth required before a
+	// block header is considered settled.
+	MinSettledBlockDepth uint32
+}
+
+// DefaultBlockHeaderCacheConfig returns a BlockHeaderCacheConfig with default
+// values.
+func DefaultBlockHeaderCacheConfig() BlockHeaderCacheConfig {
+	return BlockHeaderCacheConfig{
+		MaxSize:              DefaultHeaderCacheSize,
+		PurgePercentage:      DefaultPurgePercentage,
+		MinSettledBlockDepth: DefaultMinSettledBlockDepth,
+	}
+}
+
+// Validate checks that the configuration parameters are valid.
+func (c *BlockHeaderCacheConfig) Validate() error {
+	if c.PurgePercentage == 0 || c.PurgePercentage > 100 {
+		return fmt.Errorf("invalid PurgePercentage: %d, must "+
+			"be > 0 and <= 100", c.PurgePercentage)
+	}
+
+	return nil
+}
+
+// headerEntry represents a cached block header with metadata.
+type headerEntry struct {
+	// header is the cached block header.
+	header wire.BlockHeader
+
+	// hash is the cached block hash.
+	hash chainhash.Hash
+
+	// height is the block height of this header.
+	height uint32
+}
+
+// BlockHeaderCache is a reorg-aware cache of block headers.
+//
+// TODO(ffranr): Once this component is stable, consider moving it to the
+// btcd repo.
+type BlockHeaderCache struct {
+	// cfg is the cache configuration.
+	cfg BlockHeaderCacheConfig
+
+	// mu protects concurrent access to the cache.
+	mu sync.RWMutex
+
+	// byHeight maps block height to header entry.
+	byHeight map[uint32]*headerEntry
+
+	// byHash maps block hash to header entry.
+	byHash map[chainhash.Hash]*headerEntry
+
+	// maxHeight tracks the highest block height we've seen.
+	maxHeight uint32
+}
+
+// NewBlockHeaderCache creates a new block header cache with the given
+// configuration.
+func NewBlockHeaderCache(cfg BlockHeaderCacheConfig) (*BlockHeaderCache,
+	error) {
+
+	if err := cfg.Validate(); err != nil {
+		return nil, err
+	}
+
+	return &BlockHeaderCache{
+		cfg:      cfg,
+		byHeight: make(map[uint32]*headerEntry),
+		byHash:   make(map[chainhash.Hash]*headerEntry),
+	}, nil
+}
+
+// isSettled returns whether an entry is considered settled based on
+// block depth.
+func (c *BlockHeaderCache) isSettled(height uint32) bool {
+	settledHeight := height + c.cfg.MinSettledBlockDepth
+
+	// If the maximum height among all seen block headers meets or exceeds
+	// the settled height, this entry is considered settled.
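+	// For example, with MinSettledBlockDepth=6, a header at height 100 is
+	// only considered settled once a header at height 106 or above has
+	// been cached.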
+	return settledHeight <= c.maxHeight
+}
+
+// Put adds a block header to the cache at the given height.
+//
+// If the insertion would exceed capacity, entries are purged first. If a
+// conflicting header exists at this height, a reorg is detected and all
+// headers at or above this height are invalidated.
+func (c *BlockHeaderCache) Put(height uint32, header wire.BlockHeader) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	hash := header.BlockHash()
+
+	// Check if there's already an entry at this height.
+	if existing, exists := c.byHeight[height]; exists {
+		existingHash := existing.hash
+
+		// If the hashes match, this is a duplicate insertion.
+		if existingHash == hash {
+			return nil
+		}
+
+		// The hashes do not match, indicating a reorg. Invalidate
+		// all known headers at or above this height.
+		c.invalidateFromHeight(height)
+	}
+
+	// Check capacity and purge if needed.
+	if uint32(len(c.byHeight)) >= c.cfg.MaxSize {
+		c.purge()
+	}
+
+	// Create the new entry and store in the cache.
+	entry := &headerEntry{
+		header: header,
+		hash:   hash,
+		height: height,
+	}
+
+	c.byHeight[height] = entry
+	c.byHash[hash] = entry
+
+	// Update max height seen.
+	if height > c.maxHeight {
+		c.maxHeight = height
+	}
+
+	return nil
+}
+
+// GetByHeight retrieves a block header by height. Returns ok=false if not
+// found or if the entry is unsettled (to force external lookup).
+func (c *BlockHeaderCache) GetByHeight(height uint32) (wire.BlockHeader, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	var zero wire.BlockHeader
+
+	entry, exists := c.byHeight[height]
+	if !exists || !c.isSettled(height) {
+		return zero, false
+	}
+
+	return entry.header, true
+}
+
+// GetByHash retrieves a block header by hash. Returns ok=false if not found or
+// if the entry is unsettled (to force external lookup).
+func (c *BlockHeaderCache) GetByHash(hash chainhash.Hash) (wire.BlockHeader,
+	bool) {
+
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	var zero wire.BlockHeader
+
+	entry, exists := c.byHash[hash]
+	if !exists || !c.isSettled(entry.height) {
+		return zero, false
+	}
+
+	return entry.header, true
+}
+
+// invalidateFromHeight removes all entries at or above the given height,
+// effectively invalidating the orphaned chain.
+func (c *BlockHeaderCache) invalidateFromHeight(heightLowerBound uint32) {
+	// Track new max height after entries are removed.
+	var newMaxHeight uint32
+
+	// Iterate over all entries and remove those at or above the lower
+	// bound.
+	for height, entry := range c.byHeight {
+		// Skip entries below the lower bound.
+		if height < heightLowerBound {
+			// Update new max height if needed.
+			if height > newMaxHeight {
+				newMaxHeight = height
+			}
+
+			continue
+		}
+
+		// Remove the entry which is at or above the lower bound.
+		hash := entry.hash
+		delete(c.byHeight, height)
+		delete(c.byHash, hash)
+	}
+
+	c.maxHeight = newMaxHeight
+}
+
+// purge removes a random set of entries from the cache, sized according to
+// the configured purge percentage.
+func (c *BlockHeaderCache) purge() {
+	numToPurge := len(c.byHeight) * int(c.cfg.PurgePercentage) / 100
+	if numToPurge == 0 {
+		numToPurge = 1
+	}
+
+	// Remove entries directly from the map iteration (already random
+	// order).
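+	// Go randomizes map iteration order, so this evicts an effectively
+	// arbitrary subset without tracking any insertion or access order.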
+ maxHeightDeleted := false + count := 0 + for height, entry := range c.byHeight { + if count >= numToPurge { + break + } + + if height == c.maxHeight { + maxHeightDeleted = true + } + + hash := entry.hash + delete(c.byHeight, height) + delete(c.byHash, hash) + count++ + } + + if !maxHeightDeleted { + return + } + + // Recalculate max height only if it was deleted. + c.maxHeight = 0 + for height := range c.byHeight { + if height > c.maxHeight { + c.maxHeight = height + } + } +} + +// Size returns the current number of entries in the cache. +func (c *BlockHeaderCache) Size() int { + c.mu.RLock() + defer c.mu.RUnlock() + + return len(c.byHeight) +} + +// Clear removes all entries from the cache. +func (c *BlockHeaderCache) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + + c.byHeight = make(map[uint32]*headerEntry) + c.byHash = make(map[chainhash.Hash]*headerEntry) + c.maxHeight = 0 +} + +// Stats returns statistics about the cache. +func (c *BlockHeaderCache) Stats() CacheStats { + c.mu.RLock() + defer c.mu.RUnlock() + + settled := 0 + for height := range c.byHeight { + if c.isSettled(height) { + settled++ + } + } + + return CacheStats{ + TotalEntries: len(c.byHeight), + SettledEntries: settled, + MaxHeight: c.maxHeight, + } +} + +// CacheStats holds statistics about the block header cache. +type CacheStats struct { + // TotalEntries is the total number of entries in the cache. + TotalEntries int + + // SettledEntries is the number of settled entries in the cache. + SettledEntries int + + // MaxHeight is the highest block height seen. + MaxHeight uint32 +} + +// String returns a string representation of the cache stats. +func (s CacheStats) String() string { + return fmt.Sprintf("BlockHeaderCacheStats(total=%d, settled=%d, "+ + "max_height=%d)", s.TotalEntries, s.SettledEntries, + s.MaxHeight) +} diff --git a/lndservices/block_header_cache_test.go b/lndservices/block_header_cache_test.go new file mode 100644 index 000000000..3960f1973 --- /dev/null +++ b/lndservices/block_header_cache_test.go @@ -0,0 +1,457 @@ +package lndservices + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" +) + +// makeHeader creates a test block header with a unique hash based on the +// provided nonce. +func makeHeader(nonce uint32, timestamp time.Time) wire.BlockHeader { + return wire.BlockHeader{ + Version: 1, + PrevBlock: chainhash.Hash{}, + MerkleRoot: chainhash.Hash{ + byte(nonce), byte(nonce >> 8), byte(nonce >> 16), + byte(nonce >> 24), + }, + Timestamp: timestamp, + Bits: 0x1d00ffff, + Nonce: nonce, + } +} + +// makeConsecutiveHeaders creates a set of consecutive block headers where each +// header's PrevBlock points to the previous header's hash. +func makeConsecutiveHeaders(startHeight uint32, count int, + baseTime time.Time) []wire.BlockHeader { + + headers := make([]wire.BlockHeader, count) + var prevHash chainhash.Hash + + for i := 0; i < count; i++ { + timestamp := baseTime.Add(time.Duration(i) * 10 * time.Minute) + header := wire.BlockHeader{ + Version: 1, + PrevBlock: prevHash, + MerkleRoot: chainhash.Hash{byte(i)}, + Timestamp: timestamp, + Bits: 0x1d00ffff, + Nonce: startHeight + uint32(i), + } + headers[i] = header + prevHash = header.BlockHash() + } + + return headers +} + +// TestBlockHeaderCacheBasicPutGet tests basic put and get operations by +// height and hash. 
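+// A header at height h is only returned once a header at height
+// h+MinSettledBlockDepth or above has been cached.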
+func TestBlockHeaderCacheBasicPutGet(t *testing.T) { + t.Parallel() + + cfg := BlockHeaderCacheConfig{ + MaxSize: 100, + PurgePercentage: 10, + MinSettledBlockDepth: 6, + } + cache, err := NewBlockHeaderCache(cfg) + require.NoError(t, err) + + now := time.Now().UTC() + header1 := makeHeader(1, now) + header2 := makeHeader(2, now.Add(10*time.Minute)) + + // Put headers at height 100 and 106 (so 100 becomes settled). + err = cache.Put(100, header1) + require.NoError(t, err) + + err = cache.Put(106, header2) + require.NoError(t, err) + + // Height 100 should now be settled (106 - 100 >= 6). + retrieved, ok := cache.GetByHeight(100) + require.True(t, ok) + require.Equal(t, header1.BlockHash(), retrieved.BlockHash()) + + // Get by hash should also work. + hash1 := header1.BlockHash() + retrievedByHash, ok := cache.GetByHash(hash1) + require.True(t, ok) + require.Equal(t, header1.BlockHash(), retrievedByHash.BlockHash()) + + // Height 106 should not be settled yet (no blocks after it). + retrieved, ok = cache.GetByHeight(106) + require.False(t, ok, "unsettled block should return false") + + // Now add a block at height 112 to settle height 106. + header3 := makeHeader(3, now.Add(20*time.Minute)) + err = cache.Put(112, header3) + require.NoError(t, err) + + // Now height 106 should be settled. + retrieved, ok = cache.GetByHeight(106) + require.True(t, ok) + require.Equal(t, header2.BlockHash(), retrieved.BlockHash()) +} + +// TestBlockHeaderCacheCapacity tests that the cache purges entries when it +// reaches capacity. +func TestBlockHeaderCacheCapacity(t *testing.T) { + t.Parallel() + + cfg := BlockHeaderCacheConfig{ + MaxSize: 10, + PurgePercentage: 30, + MinSettledBlockDepth: 1, + } + cache, err := NewBlockHeaderCache(cfg) + require.NoError(t, err) + + now := time.Now().UTC() + + // Fill the cache to capacity. + for i := uint32(0); i < 10; i++ { + header := makeHeader(i, now.Add(time.Duration(i)*time.Minute)) + err := cache.Put(i, header) + require.NoError(t, err) + } + + require.Equal(t, 10, cache.Size()) + + // Add one more entry, which should trigger a purge. + header := makeHeader(100, now.Add(100*time.Minute)) + err = cache.Put(100, header) + require.NoError(t, err) + + // Cache should have purged ~3 entries (30% of 10), then added 1. + // So we should have around 8 entries. + size := cache.Size() + require.LessOrEqual(t, size, 10, "cache should not exceed max size") + require.GreaterOrEqual(t, size, 7, "cache should have purged entries") +} + +// TestBlockHeaderCacheUnsettledVsSettled tests the unsettled vs settled +// semantics with confirmation depth. +func TestBlockHeaderCacheUnsettledVsSettled(t *testing.T) { + t.Parallel() + + cfg := BlockHeaderCacheConfig{ + MaxSize: 100, + PurgePercentage: 10, + MinSettledBlockDepth: 6, + } + cache, err := NewBlockHeaderCache(cfg) + require.NoError(t, err) + + now := time.Now().UTC() + headers := makeConsecutiveHeaders(100, 20, now) + + // Add headers 100-119. + for i, header := range headers { + err := cache.Put(uint32(100+i), header) + require.NoError(t, err) + } + + // Headers 100-113 should be settled (119 - 6 = 113). + for i := uint32(100); i <= 113; i++ { + _, ok := cache.GetByHeight(i) + require.True(t, ok, "height %d should be settled", i) + } + + // Headers 114-119 should be unsettled. + for i := uint32(114); i <= 119; i++ { + _, ok := cache.GetByHeight(i) + require.False(t, ok, "height %d should be unsettled", i) + } + + // Check stats. 
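+	// With maxHeight=119 and MinSettledBlockDepth=6, heights 100-113
+	// satisfy height+6 <= 119, giving 14 settled entries out of 20.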
+ stats := cache.Stats() + require.Equal(t, 20, stats.TotalEntries) + require.Equal(t, 14, stats.SettledEntries) // 100-113 + require.Equal(t, uint32(119), stats.MaxHeight) +} + +// TestBlockHeaderCacheReorg tests reorg detection and orphaned branch +// invalidation. +func TestBlockHeaderCacheReorg(t *testing.T) { + t.Parallel() + + cfg := BlockHeaderCacheConfig{ + MaxSize: 100, + PurgePercentage: 10, + MinSettledBlockDepth: 6, + } + cache, err := NewBlockHeaderCache(cfg) + require.NoError(t, err) + + now := time.Now().UTC() + + // Create initial chain: headers at heights 100-110. + originalChain := makeConsecutiveHeaders(100, 11, now) + for i, header := range originalChain { + err := cache.Put(uint32(100+i), header) + require.NoError(t, err) + } + + // Verify heights 100-104 are settled (110 - 6 = 104). + for i := uint32(100); i <= 104; i++ { + _, ok := cache.GetByHeight(i) + require.True(t, ok, "height %d should be settled", i) + } + + // Verify we have 11 entries. + require.Equal(t, 11, cache.Size()) + + // Simulate a reorg at height 105 by inserting a different header. + reorgHeader := makeHeader(9999, now.Add(50*time.Minute)) + err = cache.Put(105, reorgHeader) + require.NoError(t, err) + + // Heights 105-110 should have been invalidated. + // Only heights 100-104 should remain (5 entries) plus the new 105. + require.Equal(t, 6, cache.Size()) + + // Heights 100-104 should be unsettled as the new highest block height + // is 105 (which is not enough to settle them). + for i := uint32(100); i <= 104; i++ { + _, ok := cache.GetByHeight(i) + require.False(t, ok, "height %d should not be settled", i) + } + + // The new header at 105 should be unsettled (no blocks after it). + _, ok := cache.GetByHeight(105) + require.False(t, ok, "new height 105 should be unsettled") + + // Old headers at 106-110 should be gone. + for i := uint32(106); i <= 110; i++ { + _, ok := cache.GetByHeight(i) + require.False(t, ok, "height %d should be invalidated", i) + } + + // Verify the hash at 105 is the new one. + cache.mu.RLock() + entry := cache.byHeight[105] + cache.mu.RUnlock() + require.NotNil(t, entry) + require.Equal(t, reorgHeader.BlockHash(), entry.header.BlockHash()) +} + +// TestBlockHeaderCacheDuplicateInsert tests that inserting the same header +// twice doesn't cause issues. +func TestBlockHeaderCacheDuplicateInsert(t *testing.T) { + t.Parallel() + + cfg := DefaultBlockHeaderCacheConfig() + cache, err := NewBlockHeaderCache(cfg) + require.NoError(t, err) + + now := time.Now().UTC() + header := makeHeader(42, now) + + err = cache.Put(100, header) + require.NoError(t, err) + + // Insert again at same height with same header. + err = cache.Put(100, header) + require.NoError(t, err) + + // Should still have just 1 entry. + require.Equal(t, 1, cache.Size()) +} + +// TestBlockHeaderCacheClear tests the Clear method. +func TestBlockHeaderCacheClear(t *testing.T) { + t.Parallel() + + cfg := DefaultBlockHeaderCacheConfig() + cache, err := NewBlockHeaderCache(cfg) + require.NoError(t, err) + + now := time.Now().UTC() + + // Add some entries. + for i := uint32(0); i < 10; i++ { + header := makeHeader(i, now.Add(time.Duration(i)*time.Minute)) + err := cache.Put(i*10, header) + require.NoError(t, err) + } + + require.Equal(t, 10, cache.Size()) + + // Clear the cache. 
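+	// Clear resets both lookup maps and the tracked max height, so the
+	// stats below should all report zero.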
+	cache.Clear()
+
+	require.Equal(t, 0, cache.Size())
+
+	stats := cache.Stats()
+	require.Equal(t, 0, stats.TotalEntries)
+	require.Equal(t, 0, stats.SettledEntries)
+	require.Equal(t, uint32(0), stats.MaxHeight)
+}
+
+// TestBlockHeaderCacheStats tests the Stats method.
+func TestBlockHeaderCacheStats(t *testing.T) {
+	t.Parallel()
+
+	cfg := BlockHeaderCacheConfig{
+		MaxSize:              100,
+		PurgePercentage:      10,
+		MinSettledBlockDepth: 5,
+	}
+	cache, err := NewBlockHeaderCache(cfg)
+	require.NoError(t, err)
+
+	now := time.Now().UTC()
+
+	// Add 15 headers starting at height 100.
+	for i := uint32(0); i < 15; i++ {
+		header := makeHeader(i, now.Add(time.Duration(i)*time.Minute))
+		err := cache.Put(100+i, header)
+		require.NoError(t, err)
+	}
+
+	stats := cache.Stats()
+
+	// Should have 15 total entries.
+	require.Equal(t, 15, stats.TotalEntries)
+
+	// Heights 100-109 should be settled (114 - 5 = 109), that's 10 entries.
+	require.Equal(t, 10, stats.SettledEntries)
+
+	// The max height seen should be 114.
+	require.Equal(t, uint32(114), stats.MaxHeight)
+
+	// Test String method.
+	statsStr := stats.String()
+	require.Contains(t, statsStr, "total=15")
+	require.Contains(t, statsStr, "settled=10")
+	require.Contains(t, statsStr, "max_height=114")
+}
+
+// TestBlockHeaderCacheEdgeCases tests various edge cases.
+func TestBlockHeaderCacheEdgeCases(t *testing.T) {
+	t.Parallel()
+
+	cfg := BlockHeaderCacheConfig{
+		MaxSize:              10,
+		PurgePercentage:      10,
+		MinSettledBlockDepth: 6,
+	}
+	cache, err := NewBlockHeaderCache(cfg)
+	require.NoError(t, err)
+
+	// Get from empty cache.
+	_, ok := cache.GetByHeight(100)
+	require.False(t, ok)
+
+	hash := chainhash.Hash{}
+	_, ok = cache.GetByHash(hash)
+	require.False(t, ok)
+
+	// Add a single header.
+	now := time.Now().UTC()
+	header := makeHeader(1, now)
+	err = cache.Put(100, header)
+	require.NoError(t, err)
+
+	// Should be unsettled (no confirmations).
+	_, ok = cache.GetByHeight(100)
+	require.False(t, ok)
+
+	// Add header far enough ahead to settle the first one.
+	header2 := makeHeader(2, now.Add(10*time.Minute))
+	err = cache.Put(107, header2)
+	require.NoError(t, err)
+
+	// Now first header should be settled.
+	retrieved, ok := cache.GetByHeight(100)
+	require.True(t, ok)
+	require.Equal(t, header.BlockHash(), retrieved.BlockHash())
+}
+
+// TestBlockHeaderCacheHeaderEntryHashField verifies that the headerEntry.hash
+// field is set and used correctly, especially during invalidation and purge.
+func TestBlockHeaderCacheHeaderEntryHashField(t *testing.T) {
+	t.Parallel()
+
+	cfg := BlockHeaderCacheConfig{
+		MaxSize:              10,
+		PurgePercentage:      50,
+		MinSettledBlockDepth: 1,
+	}
+	cache, err := NewBlockHeaderCache(cfg)
+	require.NoError(t, err)
+
+	now := time.Now().UTC()
+	header := makeHeader(123, now)
+	height := uint32(5)
+
+	// Insert header and check hash field.
+	err = cache.Put(height, header)
+	require.NoError(t, err)
+
+	cache.mu.RLock()
+	entry, exists := cache.byHeight[height]
+	cache.mu.RUnlock()
+	require.True(t, exists, "entry should exist")
+	require.Equal(
+		t, header.BlockHash(), entry.hash,
+		"headerEntry.hash should match header.BlockHash()",
+	)
+
+	// Insert a different header at the same height to trigger
+	// reorg/invalidation.
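+	// Put detects the hash mismatch at this height and invalidates all
+	// entries at or above it before storing the new entry.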
+ altHeader := makeHeader(999, now.Add(1*time.Minute)) + err = cache.Put(height, altHeader) + require.NoError(t, err) + + cache.mu.RLock() + entry, exists = cache.byHeight[height] + cache.mu.RUnlock() + require.True(t, exists, "entry should exist after reorg") + require.Equal( + t, altHeader.BlockHash(), entry.hash, + "headerEntry.hash should update after reorg", + ) +} + +// TestBlockHeaderCacheInvalidConfig tests that invalid configurations return +// errors. +func TestBlockHeaderCacheInvalidConfig(t *testing.T) { + t.Parallel() + + // Test PurgePercentage = 0. + cfg := BlockHeaderCacheConfig{ + MaxSize: 100, + PurgePercentage: 0, + MinSettledBlockDepth: 6, + } + cache, err := NewBlockHeaderCache(cfg) + require.Error(t, err) + require.Nil(t, cache) + require.Contains(t, err.Error(), "invalid PurgePercentage") + + // Test PurgePercentage > 100. + cfg.PurgePercentage = 101 + cache, err = NewBlockHeaderCache(cfg) + require.Error(t, err) + require.Nil(t, cache) + require.Contains(t, err.Error(), "invalid PurgePercentage") + + // Test valid edge cases. + cfg.PurgePercentage = 1 + cache, err = NewBlockHeaderCache(cfg) + require.NoError(t, err) + require.NotNil(t, cache) + + cfg.PurgePercentage = 100 + cache, err = NewBlockHeaderCache(cfg) + require.NoError(t, err) + require.NotNil(t, cache) +} diff --git a/lndservices/chain_bridge.go b/lndservices/chain_bridge.go index 122bce3e4..181a1a1df 100644 --- a/lndservices/chain_bridge.go +++ b/lndservices/chain_bridge.go @@ -10,7 +10,6 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/lndclient" - "github.com/lightninglabs/neutrino/cache/lru" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/proof" @@ -21,12 +20,6 @@ import ( ) const ( - // maxNumBlocksInCache is the maximum number of blocks we'll cache - // timestamps for. With 400k blocks we should only take up approximately - // 3200kB of memory (4 bytes for the block height and 4 bytes for the - // timestamp, not including any map/cache overhead). - maxNumBlocksInCache = 400_000 - // medianTimeBlocks is the number of previous blocks which should be // used to calculate the median time used to validate block timestamps. medianTimeBlocks = 11 @@ -38,40 +31,33 @@ var ( errTxNotFound = fmt.Errorf("transaction not found in proof file") ) -// cacheableTimestamp is a wrapper around an uint32 that can be used as a value -// in an LRU cache. -type cacheableTimestamp uint32 - -// Size returns the size of the cacheable timestamp. Since we scale the cache by -// the number of items and not the total memory size, we can simply return 1 -// here to count each timestamp as 1 item. -func (c cacheableTimestamp) Size() (uint64, error) { - return 1, nil -} - // LndRpcChainBridge is an implementation of the tapgarden.ChainBridge // interface backed by an active remote lnd node. type LndRpcChainBridge struct { + // lnd is the active lnd services client. lnd *lndclient.LndServices - blockTimestampCache *lru.Cache[uint32, cacheableTimestamp] - retryConfig fn.RetryConfig + // retryConfig is the configuration used for retrying operations. + retryConfig fn.RetryConfig + // assetStore is a handle to the asset store. assetStore *tapdb.AssetStore + + // headerCache is a cache for block headers to reduce RPC calls. + headerCache *BlockHeaderCache } // NewLndRpcChainBridge creates a new chain bridge from an active lnd services // client. 
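+// The header cache is expected to be shared with other consumers (see
+// tapcfg/server.go) so that they all observe the same reorg invalidations.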
func NewLndRpcChainBridge(lnd *lndclient.LndServices, - assetStore *tapdb.AssetStore) *LndRpcChainBridge { + assetStore *tapdb.AssetStore, + headerCache *BlockHeaderCache) *LndRpcChainBridge { return &LndRpcChainBridge{ - lnd: lnd, - blockTimestampCache: lru.NewCache[uint32, cacheableTimestamp]( - maxNumBlocksInCache, - ), + lnd: lnd, retryConfig: fn.DefaultRetryConfig(), assetStore: assetStore, + headerCache: headerCache, } } @@ -137,6 +123,11 @@ func (l *LndRpcChainBridge) GetBlock(ctx context.Context, func (l *LndRpcChainBridge) GetBlockHeader(ctx context.Context, hash chainhash.Hash) (*wire.BlockHeader, error) { + // First, check the cache for the requested block header. + if header, ok := l.headerCache.GetByHash(hash); ok { + return &header, nil + } + return fn.RetryFuncN( ctx, l.retryConfig, func() (*wire.BlockHeader, error) { header, err := l.lnd.ChainKit.GetBlockHeader(ctx, hash) @@ -155,6 +146,14 @@ func (l *LndRpcChainBridge) GetBlockHeader(ctx context.Context, func (l *LndRpcChainBridge) GetBlockHeaderByHeight(ctx context.Context, blockHeight int64) (*wire.BlockHeader, error) { + // Convert to uint32 for cache operations. + height := uint32(blockHeight) + + // First, check the cache for the requested block header by height. + if header, ok := l.headerCache.GetByHeight(height); ok { + return &header, nil + } + // First, we need to resolve the block hash at the given height. blockHash, err := fn.RetryFuncN( ctx, l.retryConfig, func() (chainhash.Hash, error) { @@ -190,6 +189,13 @@ func (l *LndRpcChainBridge) GetBlockHeaderByHeight(ctx context.Context, ) } + // Store the retrieved header in the cache. + err = l.headerCache.Put(height, *header) + if err != nil { + return nil, fmt.Errorf("failed to cache block "+ + "header: %w", err) + } + return header, nil }, ) @@ -268,38 +274,20 @@ func (l *LndRpcChainBridge) CurrentHeight(ctx context.Context) (uint32, error) { // GetBlockTimestamp returns the timestamp of the block at the given height. func (l *LndRpcChainBridge) GetBlockTimestamp(ctx context.Context, - height uint32) int64 { + height uint32) (int64, error) { // Shortcut any lookup in case we don't have a valid height in the first // place. if height == 0 { - return 0 + return 0, nil } - cacheTS, err := l.blockTimestampCache.Get(height) - if err == nil { - return int64(cacheTS) - } - - hash, err := fn.RetryFuncN( - ctx, l.retryConfig, func() (chainhash.Hash, error) { - return l.lnd.ChainKit.GetBlockHash(ctx, int64(height)) - }, - ) - if err != nil { - return 0 - } - - // Get block header. 
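+	// Delegate to GetBlockHeaderByHeight so the lookup is served from the
+	// header cache whenever the entry is settled.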
- header, err := l.GetBlockHeader(ctx, hash) + blockHeader, err := l.GetBlockHeaderByHeight(ctx, int64(height)) if err != nil { - return 0 + return 0, fmt.Errorf("unable to fetch block header: %w", err) } - ts := uint32(header.Timestamp.Unix()) - _, _ = l.blockTimestampCache.Put(height, cacheableTimestamp(ts)) - - return int64(ts) + return blockHeader.Timestamp.Unix(), nil } // PublishTransaction attempts to publish a new transaction to the @@ -472,7 +460,14 @@ func (l *ProofChainLookup) MeanBlockTimestamp(ctx context.Context, break } - unixTs := l.chainBridge.GetBlockTimestamp(ctx, blockHeight-i) + unixTs, err := l.chainBridge.GetBlockTimestamp( + ctx, blockHeight-i, + ) + if err != nil { + return time.Time{}, fmt.Errorf("unable to fetch block "+ + "header timestamp: %w", err) + } + if unixTs == 0 { return time.Time{}, fmt.Errorf("couldn't find "+ "timestamp for block height %d", blockHeight) diff --git a/lndservices/daemon_adapters.go b/lndservices/daemon_adapters.go index 4b6394d94..715bd9224 100644 --- a/lndservices/daemon_adapters.go +++ b/lndservices/daemon_adapters.go @@ -51,14 +51,16 @@ type LndFsmDaemonAdapters struct { } // NewLndFsmDaemonAdapters creates a new instance of LndFsmDaemonAdapters. -func NewLndFsmDaemonAdapters(lnd *lndclient.LndServices) *LndFsmDaemonAdapters { +func NewLndFsmDaemonAdapters(lnd *lndclient.LndServices, + headerCache *BlockHeaderCache) *LndFsmDaemonAdapters { + retryConfig := fn.DefaultRetryConfig() msgTransport := NewLndMsgTransportClient(lnd) // Initialize the chain bridge without the asset store, as it is not // needed for the FSM adapters. - chainBridge := NewLndRpcChainBridge(lnd, nil) + chainBridge := NewLndRpcChainBridge(lnd, nil, headerCache) chainBridge.retryConfig = retryConfig return &LndFsmDaemonAdapters{ diff --git a/rpcserver.go b/rpcserver.go index 683e15b44..72d82b872 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1259,9 +1259,15 @@ func (r *rpcServer) MarshalChainAsset(ctx context.Context, a asset.ChainAsset, // Ensure the block timestamp is set if a block height is set. if a.AnchorBlockTimestamp == 0 && a.AnchorBlockHeight > 0 { - a.AnchorBlockTimestamp = r.cfg.ChainBridge.GetBlockTimestamp( + timestamp, err := r.cfg.ChainBridge.GetBlockTimestamp( ctx, a.AnchorBlockHeight, ) + if err != nil { + return nil, fmt.Errorf("unable to fetch block header "+ + "timestamp: %w", err) + } + + a.AnchorBlockTimestamp = timestamp } return rpcutils.MarshalChainAsset( @@ -7743,24 +7749,31 @@ func (r *rpcServer) UniverseStats(ctx context.Context, // marshalAssetSyncSnapshot maps a universe asset sync stat snapshot to the RPC // counterpart. 
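+// An error is returned if the genesis block timestamp lookup fails.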
func (r *rpcServer) marshalAssetSyncSnapshot(ctx context.Context, - a universe.AssetSyncSnapshot) *unirpc.AssetStatsSnapshot { + a universe.AssetSyncSnapshot) (*unirpc.AssetStatsSnapshot, error) { resp := &unirpc.AssetStatsSnapshot{ TotalSyncs: int64(a.TotalSyncs), TotalProofs: int64(a.TotalProofs), GroupSupply: int64(a.GroupSupply), } + + blockTimestamp, err := r.cfg.ChainBridge.GetBlockTimestamp( + ctx, a.GenesisHeight, + ) + if err != nil { + return nil, fmt.Errorf("failed to query block header "+ + "timestamp for genesis height: %w", err) + } + rpcAsset := &unirpc.AssetStatsAsset{ - AssetId: a.AssetID[:], - GenesisPoint: a.GenesisPoint.String(), - AssetName: a.AssetName, - AssetType: taprpc.AssetType(a.AssetType), - TotalSupply: int64(a.TotalSupply), - GenesisHeight: int32(a.GenesisHeight), - GenesisTimestamp: r.cfg.ChainBridge.GetBlockTimestamp( - ctx, a.GenesisHeight, - ), - AnchorPoint: a.AnchorPoint.String(), + AssetId: a.AssetID[:], + GenesisPoint: a.GenesisPoint.String(), + AssetName: a.AssetName, + AssetType: taprpc.AssetType(a.AssetType), + TotalSupply: int64(a.TotalSupply), + GenesisHeight: int32(a.GenesisHeight), + GenesisTimestamp: blockTimestamp, + AnchorPoint: a.AnchorPoint.String(), } decDisplay, err := r.cfg.AddrBook.DecDisplayForAssetID(ctx, a.AssetID) @@ -7777,7 +7790,7 @@ func (r *rpcServer) marshalAssetSyncSnapshot(ctx context.Context, resp.Asset = rpcAsset } - return resp + return resp, nil } // QueryAssetStats returns a set of statistics for a given set of assets. @@ -7821,7 +7834,13 @@ func (r *rpcServer) QueryAssetStats(ctx context.Context, ), } for idx, snapshot := range assetStats.SyncStats { - resp.AssetStats[idx] = r.marshalAssetSyncSnapshot(ctx, snapshot) + rpcSnapshot, err := r.marshalAssetSyncSnapshot(ctx, snapshot) + if err != nil { + return nil, fmt.Errorf("failed to marshal asset "+ + "snapshot: %w", err) + } + + resp.AssetStats[idx] = rpcSnapshot } return resp, nil diff --git a/tapcfg/server.go b/tapcfg/server.go index d3d8b7327..6051c2244 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -112,14 +112,30 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, lndServices, lndservices.WithPsbtMaxFeeRatio(cfg.Wallet.PsbtMaxFeeRatio), ) - chainBridge := lndservices.NewLndRpcChainBridge(lndServices, assetStore) + + // Create a block header cache with default configuration. + headerCache, err := lndservices.NewBlockHeaderCache( + lndservices.DefaultBlockHeaderCacheConfig(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create block header cache: "+ + "%w", err) + } + + chainBridge := lndservices.NewLndRpcChainBridge( + lndServices, assetStore, headerCache, + ) + msgTransportClient := lndservices.NewLndMsgTransportClient(lndServices) lndRouterClient := lndservices.NewLndRouterClient(lndServices) lndInvoicesClient := lndservices.NewLndInvoicesClient(lndServices) lndFeatureBitsVerifier := lndservices.NewLndFeatureBitVerifier( lndServices, ) - lndFsmDaemonAdapters := lndservices.NewLndFsmDaemonAdapters(lndServices) + + lndFsmDaemonAdapters := lndservices.NewLndFsmDaemonAdapters( + lndServices, headerCache, + ) uniDB := tapdb.NewTransactionExecutor( db, func(tx *sql.Tx) tapdb.BaseUniverseStore { diff --git a/tapgarden/interface.go b/tapgarden/interface.go index bea9618f3..9f07369b3 100644 --- a/tapgarden/interface.go +++ b/tapgarden/interface.go @@ -348,7 +348,7 @@ type ChainBridge interface { // GetBlockTimestamp returns the timestamp of the block at the given // height. 
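+	// A zero height is treated as unset and yields a zero timestamp
+	// without an error.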
- GetBlockTimestamp(context.Context, uint32) int64 + GetBlockTimestamp(context.Context, uint32) (int64, error) // GetBlockHeaderByHeight returns a block header given the block height. GetBlockHeaderByHeight(ctx context.Context, diff --git a/tapgarden/mock.go b/tapgarden/mock.go index 1b228dae1..278635ff5 100644 --- a/tapgarden/mock.go +++ b/tapgarden/mock.go @@ -716,8 +716,10 @@ func (m *MockChainBridge) CurrentHeight(_ context.Context) (uint32, error) { return 0, nil } -func (m *MockChainBridge) GetBlockTimestamp(_ context.Context, _ uint32) int64 { - return 0 +func (m *MockChainBridge) GetBlockTimestamp(_ context.Context, _ uint32) (int64, + error) { + + return 0, nil } func (m *MockChainBridge) PublishTransaction(_ context.Context, diff --git a/universe/supplycommit/mock.go b/universe/supplycommit/mock.go index fb45158eb..c19379b54 100644 --- a/universe/supplycommit/mock.go +++ b/universe/supplycommit/mock.go @@ -14,6 +14,7 @@ import ( "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/mssmt" "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/tapgarden" "github.com/lightninglabs/taproot-assets/tapsend" "github.com/lightningnetwork/lnd/chainntnfs" lfn "github.com/lightningnetwork/lnd/fn/v2" @@ -268,10 +269,10 @@ func (m *mockChainBridge) VerifyBlock(ctx context.Context, } func (m *mockChainBridge) GetBlockTimestamp(ctx context.Context, - height uint32) int64 { + height uint32) (int64, error) { args := m.Called(ctx, height) - return args.Get(0).(int64) + return args.Get(0).(int64), args.Error(1) } func (m *mockChainBridge) GenFileChainLookup(f *proof.File) asset.ChainLookup { @@ -289,6 +290,9 @@ func (m *mockChainBridge) GenProofChainLookup( return args.Get(0).(asset.ChainLookup), args.Error(1) } +// Ensure mockChainBridge implements the tapgarden.ChainBridge interface. +var _ tapgarden.ChainBridge = (*mockChainBridge)(nil) + // mockStateMachineStore is a mock implementation of the StateMachineStore // interface. type mockStateMachineStore struct {