Skip to content

Commit e58d60f

Browse files
committed
Add retries for remote store & optimize requests
1 parent 7c85007 commit e58d60f

File tree

1 file changed

+215
-10
lines changed

1 file changed

+215
-10
lines changed

storage/remote/store.go

Lines changed: 215 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ import (
2222
"context"
2323
"encoding/hex"
2424
"fmt"
25+
"math"
26+
"math/rand"
2527
"strconv"
2628
"strings"
29+
"sync"
30+
"time"
2731

2832
lru "github.com/hashicorp/golang-lru/v2"
2933
"github.com/onflow/flow-go/engine/common/rpc/convert"
@@ -45,6 +49,171 @@ import (
4549
"github.com/onflow/flow-emulator/types"
4650
)
4751

52+
// Retry and circuit breaker configuration
53+
const (
54+
maxRetries = 5
55+
baseDelay = 100 * time.Millisecond
56+
maxDelay = 30 * time.Second
57+
jitterFactor = 0.1
58+
circuitTimeout = 30 * time.Second // Circuit breaker timeout
59+
)
60+
61+
// isRateLimitError checks if the error is a rate limiting error
62+
func isRateLimitError(err error) bool {
63+
if err == nil {
64+
return false
65+
}
66+
67+
st, ok := status.FromError(err)
68+
if !ok {
69+
return false
70+
}
71+
72+
// Check for common rate limiting gRPC codes
73+
switch st.Code() {
74+
case codes.ResourceExhausted:
75+
return true
76+
case codes.Unavailable:
77+
// Sometimes rate limits are returned as unavailable
78+
return strings.Contains(st.Message(), "rate") ||
79+
strings.Contains(st.Message(), "limit") ||
80+
strings.Contains(st.Message(), "throttle")
81+
case codes.Aborted:
82+
// Some services return rate limits as aborted
83+
return strings.Contains(st.Message(), "rate") ||
84+
strings.Contains(st.Message(), "limit")
85+
}
86+
87+
return false
88+
}
89+
90+
// exponentialBackoffWithJitter calculates delay with exponential backoff and jitter
91+
func exponentialBackoffWithJitter(attempt int) time.Duration {
92+
if attempt <= 0 {
93+
return baseDelay
94+
}
95+
96+
// Calculate exponential delay: baseDelay * 2^(attempt-1)
97+
delay := float64(baseDelay) * math.Pow(2, float64(attempt-1))
98+
99+
// Cap at maxDelay
100+
if delay > float64(maxDelay) {
101+
delay = float64(maxDelay)
102+
}
103+
104+
// Add jitter: ±10% random variation
105+
jitter := delay * jitterFactor * (2*rand.Float64() - 1)
106+
delay += jitter
107+
108+
// Ensure minimum delay
109+
if delay < float64(baseDelay) {
110+
delay = float64(baseDelay)
111+
}
112+
113+
return time.Duration(delay)
114+
}
115+
116+
// retryWithBackoff executes a function with exponential backoff retry on rate limit errors
117+
func (s *Store) retryWithBackoff(ctx context.Context, operation string, fn func() error) error {
118+
var lastErr error
119+
120+
for attempt := 0; attempt <= maxRetries; attempt++ {
121+
// Check circuit breaker first
122+
if !s.circuitBreaker.canMakeRequest() {
123+
s.logger.Debug().
124+
Str("operation", operation).
125+
Msg("Circuit breaker is open, skipping request")
126+
return fmt.Errorf("circuit breaker is open")
127+
}
128+
129+
err := fn()
130+
if err == nil {
131+
s.circuitBreaker.recordSuccess()
132+
return nil
133+
}
134+
135+
lastErr = err
136+
137+
// Only record failures for rate limit errors
138+
if isRateLimitError(err) {
139+
s.circuitBreaker.recordFailure()
140+
}
141+
142+
// Check if this is a rate limit error
143+
if isRateLimitError(err) {
144+
s.logger.Info().
145+
Str("operation", operation).
146+
Msg("Rate limit detected, will retry with backoff")
147+
}
148+
149+
// For all errors (including rate limits), continue with retry logic
150+
if attempt == maxRetries {
151+
s.logger.Warn().
152+
Str("operation", operation).
153+
Int("attempt", attempt+1).
154+
Err(err).
155+
Msg("Request failed after max attempts")
156+
return err
157+
}
158+
159+
// Calculate delay and wait
160+
delay := exponentialBackoffWithJitter(attempt)
161+
s.logger.Debug().
162+
Str("operation", operation).
163+
Int("attempt", attempt+1).
164+
Dur("delay", delay).
165+
Err(err).
166+
Msg("Request failed, retrying with backoff")
167+
168+
select {
169+
case <-ctx.Done():
170+
return ctx.Err()
171+
case <-time.After(delay):
172+
// Continue to next attempt
173+
}
174+
}
175+
176+
return lastErr
177+
}
178+
179+
// circuitBreaker implements a simple circuit breaker pattern
180+
type circuitBreaker struct {
181+
mu sync.RWMutex
182+
failures int
183+
lastFail time.Time
184+
timeout time.Duration
185+
}
186+
187+
// canMakeRequest checks if requests can be made (circuit breaker is closed)
188+
func (cb *circuitBreaker) canMakeRequest() bool {
189+
cb.mu.RLock()
190+
defer cb.mu.RUnlock()
191+
192+
// If we've had recent failures, check timeout
193+
if cb.failures > 0 && time.Since(cb.lastFail) < cb.timeout {
194+
return false
195+
}
196+
197+
return true
198+
}
199+
200+
// recordFailure records a failure and opens the circuit breaker
201+
func (cb *circuitBreaker) recordFailure() {
202+
cb.mu.Lock()
203+
defer cb.mu.Unlock()
204+
205+
cb.failures++
206+
cb.lastFail = time.Now()
207+
}
208+
209+
// recordSuccess records a success and closes the circuit breaker
210+
func (cb *circuitBreaker) recordSuccess() {
211+
cb.mu.Lock()
212+
defer cb.mu.Unlock()
213+
214+
cb.failures = 0 // Reset on success
215+
}
216+
48217
type Store struct {
49218
*sqlite.Store
50219
executionClient executiondata.ExecutionDataAPIClient
@@ -54,6 +223,7 @@ type Store struct {
54223
chainID flowgo.ChainID
55224
forkHeight uint64
56225
logger *zerolog.Logger
226+
circuitBreaker *circuitBreaker
57227
// COMPATIBILITY SHIM: Account Key Deduplication Migration
58228
// TODO: Remove after Flow release - this shim provides backward compatibility
59229
// for pre-migration networks by synthesizing migrated registers from legacy data
@@ -111,6 +281,9 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St
111281
store := &Store{
112282
Store: provider,
113283
logger: logger,
284+
circuitBreaker: &circuitBreaker{
285+
timeout: circuitTimeout,
286+
},
114287
}
115288

116289
for _, opt := range options {
@@ -136,7 +309,12 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St
136309
store.accessClient = access.NewAccessAPIClient(conn)
137310
}
138311

139-
params, err := store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{})
312+
var params *access.GetNetworkParametersResponse
313+
err := store.retryWithBackoff(context.Background(), "GetNetworkParameters", func() error {
314+
var err error
315+
params, err = store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{})
316+
return err
317+
})
140318
if err != nil {
141319
return nil, fmt.Errorf("could not get network parameters: %w", err)
142320
}
@@ -182,7 +360,12 @@ func (s *Store) initializeStartBlock(ctx context.Context) error {
182360

183361
// use the current latest block from the rpc host if no height was provided
184362
if s.forkHeight == 0 {
185-
resp, err := s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true})
363+
var resp *access.BlockHeaderResponse
364+
err := s.retryWithBackoff(ctx, "GetLatestBlockHeader", func() error {
365+
var err error
366+
resp, err = s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true})
367+
return err
368+
})
186369
if err != nil {
187370
return fmt.Errorf("could not get last block height: %w", err)
188371
}
@@ -216,7 +399,12 @@ func (s *Store) BlockByID(ctx context.Context, blockID flowgo.Identifier) (*flow
216399
if err == nil {
217400
height = block.Height
218401
} else if errors.Is(err, storage.ErrNotFound) {
219-
heightRes, err := s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]})
402+
var heightRes *access.BlockHeaderResponse
403+
err := s.retryWithBackoff(ctx, "GetBlockHeaderByID", func() error {
404+
var err error
405+
heightRes, err = s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]})
406+
return err
407+
})
220408
if err != nil {
221409
return nil, err
222410
}
@@ -256,7 +444,12 @@ func (s *Store) BlockByHeight(ctx context.Context, height uint64) (*flowgo.Block
256444
return nil, &types.BlockNotFoundByHeightError{Height: height}
257445
}
258446

259-
blockRes, err := s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height})
447+
var blockRes *access.BlockHeaderResponse
448+
err = s.retryWithBackoff(ctx, "GetBlockHeaderByHeight", func() error {
449+
var err error
450+
blockRes, err = s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height})
451+
return err
452+
})
260453
if err != nil {
261454
return nil, err
262455
}
@@ -313,10 +506,16 @@ func (s *Store) LedgerByHeight(
313506
}
314507

315508
registerID := convert.RegisterIDToMessage(flowgo.RegisterID{Key: id.Key, Owner: id.Owner})
316-
response, err := s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{
317-
BlockHeight: lookupHeight,
318-
RegisterIds: []*entities.RegisterID{registerID},
509+
var response *executiondata.GetRegisterValuesResponse
510+
511+
err = s.retryWithBackoff(ctx, "GetRegisterValues", func() error {
512+
response, err = s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{
513+
BlockHeight: lookupHeight,
514+
RegisterIds: []*entities.RegisterID{registerID},
515+
})
516+
return err
319517
})
518+
320519
if err != nil {
321520
if status.Code(err) != codes.NotFound {
322521
return nil, err
@@ -484,10 +683,16 @@ func (s *Store) fetchRemoteRegisters(ctx context.Context, owner string, keys []s
484683
registerIDs[i] = convert.RegisterIDToMessage(flowgo.RegisterID{Key: key, Owner: owner})
485684
}
486685

487-
response, err := s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{
488-
BlockHeight: height,
489-
RegisterIds: registerIDs,
686+
var response *executiondata.GetRegisterValuesResponse
687+
err := s.retryWithBackoff(ctx, "GetRegisterValuesBatch", func() error {
688+
var err error
689+
response, err = s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{
690+
BlockHeight: height,
691+
RegisterIds: registerIDs,
692+
})
693+
return err
490694
})
695+
491696
if err != nil {
492697
if status.Code(err) == codes.NotFound {
493698
return make(map[string][]byte), nil

0 commit comments

Comments
 (0)