diff --git a/README.md b/README.md index 6c7d3f51..5fab00ad 100644 --- a/README.md +++ b/README.md @@ -73,8 +73,8 @@ values. | `--host` | `FLOW_HOST` | ` ` | Host to listen on for emulator GRPC/REST/Admin servers (default: All Interfaces) | | `--chain-id` | `FLOW_CHAINID` | `emulator` | Chain to simulate, if 'mainnet' or 'testnet' values are used, you will be able to run transactions against that network and a local fork will be created. Valid values are: 'emulator', 'testnet', 'mainnet' | | `--redis-url` | `FLOW_REDIS_URL` | '' | Redis-server URL for persisting redis storage backend ( `redis://[[username:]password@]host[:port][/database]` ) | -| `--start-block-height` | `FLOW_STARTBLOCKHEIGHT` | `0` | Start block height to use when starting the network using 'testnet' or 'mainnet' as the chain-id | -| `--rpc-host` | `FLOW_RPCHOST` | '' | RPC host (access node) to query for previous state when starting the network using 'testnet' or 'mainnet' as the chain-id | +| `--fork-host` | `FLOW_FORK_HOST` | '' | gRPC access node address (`host:port`) to fork from | +| `--fork-height` | `FLOW_FORK_HEIGHT` | `0` | Block height to pin the fork (defaults to latest sealed) | | `--legacy-upgrade` | `FLOW_LEGACYUPGRADE` | `false` | Enable upgrading of legacy contracts | | `--computation-reporting` | `FLOW_COMPUTATIONREPORTING` | `false` | Enable computation reporting for Cadence scripts & transactions | | `--checkpoint-dir` | `FLOW_CHECKPOINTDIR` | '' | Checkpoint directory to load the emulator state from, if starting the emulator from a checkpoint | @@ -155,8 +155,7 @@ Post Data: height={block height} ``` Note: it is only possible to roll back state to a height that was previously executed by the emulator. -To roll back to a past block height when using a forked Mainnet or Testnet network, use the -`--start-block-height` flag. +To pin the starting block height when using a fork, use the `--fork-height` flag. ## Managing emulator state It's possible to manage emulator state by using the admin API. You can at any point @@ -269,15 +268,14 @@ you must specify the network name for the chain ID flag and the RPC host to connect to. ``` -flow emulator --chain-id mainnet --rpc-host access.mainnet.nodes.onflow.org:9000 -flow emulator --chain-id mainnet --rpc-host access.devnet.nodes.onflow.org:9000 +flow emulator --fork-host access.mainnet.nodes.onflow.org:9000 +flow emulator --fork-host access.mainnet.nodes.onflow.org:9000 --fork-height 12345 ``` Please note, that the actual execution on the real network may differ depending on the exact state when the transaction is executed. -By default, the forked network will start from the latest sealed block when the emulator -is started. You can specify a different starting block height by using the `--start-block-height` flag. +By default, the forked network will start from the latest sealed block when the emulator is started. You can specify a different starting block height by using the `--fork-height` flag. You can also store all of your changes and cached registers to a persistent db by using the `--persist` flag, along with the other SQLite settings. diff --git a/cmd/emulator/start/start.go b/cmd/emulator/start/start.go index e25faef4..31b83348 100644 --- a/cmd/emulator/start/start.go +++ b/cmd/emulator/start/start.go @@ -75,14 +75,18 @@ type Config struct { SqliteURL string `default:"" flag:"sqlite-url" info:"sqlite db URL for persisting sqlite storage backend "` CoverageReportingEnabled bool `default:"false" flag:"coverage-reporting" info:"enable Cadence code coverage reporting"` LegacyContractUpgradeEnabled bool `default:"false" flag:"legacy-upgrade" info:"enable Cadence legacy contract upgrade"` - StartBlockHeight uint64 `default:"0" flag:"start-block-height" info:"block height to start the emulator at. only valid when forking Mainnet or Testnet"` - RPCHost string `default:"" flag:"rpc-host" info:"rpc host to query when forking Mainnet or Testnet"` + ForkHost string `default:"" flag:"fork-host" info:"gRPC access node address (host:port) to fork from"` + ForkHeight uint64 `default:"0" flag:"fork-height" info:"height to pin fork; defaults to latest sealed"` CheckpointPath string `default:"" flag:"checkpoint-dir" info:"checkpoint directory to load the emulator state from"` StateHash string `default:"" flag:"state-hash" info:"state hash of the checkpoint to load the emulator state from"` ComputationReportingEnabled bool `default:"false" flag:"computation-reporting" info:"enable Cadence computation reporting"` ScheduledTransactionsEnabled bool `default:"true" flag:"scheduled-transactions" info:"enable Cadence scheduled transactions"` SetupEVMEnabled bool `default:"true" flag:"setup-evm" info:"enable EVM setup for the emulator, this will deploy the EVM contracts"` SetupVMBridgeEnabled bool `default:"true" flag:"setup-vm-bridge" info:"enable VM Bridge setup for the emulator, this will deploy the VM Bridge contracts"` + + // Deprecated hidden aliases + StartBlockHeight uint64 `default:"0" flag:"start-block-height" info:"(deprecated) use --fork-height"` + RPCHost string `default:"" flag:"rpc-host" info:"(deprecated) use --fork-host"` } const EnvPrefix = "FLOW" @@ -147,31 +151,23 @@ func Cmd(config StartConfig) *cobra.Command { Exit(1, err.Error()) } - if conf.StartBlockHeight > 0 && flowChainID != flowgo.Mainnet && flowChainID != flowgo.Testnet { - Exit(1, "❗ --start-block-height is only valid when forking Mainnet or Testnet") + // Deprecation shims: map old flags to new and warn + if conf.RPCHost != "" && conf.ForkHost == "" { + logger.Warn().Msg("❗ --rpc-host is deprecated; use --fork-host") + conf.ForkHost = conf.RPCHost } - - if (flowChainID == flowgo.Mainnet || flowChainID == flowgo.Testnet) && conf.RPCHost == "" { - Exit(1, "❗ --rpc-host must be provided when forking Mainnet or Testnet") + if conf.StartBlockHeight > 0 && conf.ForkHeight == 0 { + logger.Warn().Msg("❗ --start-block-height is deprecated; use --fork-height") + conf.ForkHeight = conf.StartBlockHeight } - serviceAddress := flowsdk.ServiceAddress(flowsdk.ChainID(flowChainID)) - if conf.SimpleAddresses { - serviceAddress = flowsdk.HexToAddress("0x1") + // In non-fork mode, fork-only flags are invalid + if conf.ForkHost == "" && (conf.StartBlockHeight > 0 || conf.ForkHeight > 0) { + Exit(1, "❗ --fork-height requires --fork-host") } - serviceFields := map[string]any{ - "serviceAddress": serviceAddress.Hex(), - "servicePubKey": hex.EncodeToString(servicePublicKey.Encode()), - "serviceSigAlgo": serviceKeySigAlgo.String(), - "serviceHashAlgo": serviceKeyHashAlgo.String(), - } - - if servicePrivateKey != nil { - serviceFields["servicePrivKey"] = hex.EncodeToString(servicePrivateKey.Encode()) - } - - logger.Info().Fields(serviceFields).Msgf("⚙️ Using service account 0x%s", serviceAddress.Hex()) + // Service account logging is deferred until after server configuration to allow + // higher-level wrappers to customize fork settings via ConfigureServer. minimumStorageReservation := fvm.DefaultMinimumStorageReservation if conf.MinimumAccountBalance != "" { @@ -183,6 +179,18 @@ func Cmd(config StartConfig) *cobra.Command { storageMBPerFLOW = parseCadenceUFix64(conf.StorageMBPerFLOW, "storage-per-flow") } + // Recompute chain ID and service address accurately for fork mode by querying the node. + forkHost := conf.ForkHost + resolvedChainID := flowChainID + forkMode := forkHost != "" + if forkMode { + parsed, err := server.DetectRemoteChainID(forkHost) + if err != nil { + Exit(1, fmt.Sprintf("failed to detect remote chain id from %s: %v", forkHost, err)) + } + resolvedChainID = parsed + } + serverConf := &server.Config{ GRPCPort: conf.Port, GRPCDebug: conf.GRPCDebug, @@ -212,13 +220,13 @@ func Cmd(config StartConfig) *cobra.Command { SkipTransactionValidation: conf.SkipTxValidation, SimpleAddressesEnabled: conf.SimpleAddresses, Host: conf.Host, - ChainID: flowChainID, + ChainID: resolvedChainID, RedisURL: conf.RedisURL, ContractRemovalEnabled: conf.ContractRemovalEnabled, SqliteURL: conf.SqliteURL, CoverageReportingEnabled: conf.CoverageReportingEnabled, - StartBlockHeight: conf.StartBlockHeight, - RPCHost: conf.RPCHost, + ForkHost: conf.ForkHost, + ForkHeight: conf.ForkHeight, CheckpointPath: conf.CheckpointPath, StateHash: conf.StateHash, ComputationReportingEnabled: conf.ComputationReportingEnabled, @@ -227,6 +235,24 @@ func Cmd(config StartConfig) *cobra.Command { SetupVMBridgeEnabled: conf.SetupVMBridgeEnabled, } + serviceAddress := flowsdk.ServiceAddress(flowsdk.ChainID(resolvedChainID)) + if conf.SimpleAddresses { + serviceAddress = flowsdk.HexToAddress("0x1") + } + + serviceFields := map[string]any{ + "serviceAddress": serviceAddress.Hex(), + "servicePubKey": hex.EncodeToString(servicePublicKey.Encode()), + "serviceSigAlgo": serviceKeySigAlgo.String(), + "serviceHashAlgo": serviceKeyHashAlgo.String(), + } + + if servicePrivateKey != nil { + serviceFields["servicePrivKey"] = hex.EncodeToString(servicePrivateKey.Encode()) + } + + logger.Info().Fields(serviceFields).Msgf("⚙️ Using service account 0x%s", serviceAddress.Hex()) + emu := server.NewEmulatorServer(logger, serverConf) if emu != nil { for _, middleware := range config.RestMiddlewares { @@ -241,6 +267,12 @@ func Cmd(config StartConfig) *cobra.Command { initConfig(cmd) + // Hide and deprecate legacy flags while keeping them functional + _ = cmd.PersistentFlags().MarkHidden("rpc-host") + _ = cmd.PersistentFlags().MarkDeprecated("rpc-host", "use --fork-host") + _ = cmd.PersistentFlags().MarkHidden("start-block-height") + _ = cmd.PersistentFlags().MarkDeprecated("start-block-height", "use --fork-height") + return cmd } diff --git a/docs/overview.md b/docs/overview.md index 547a66f1..b66b091f 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -73,8 +73,8 @@ values. | `--host` | `FLOW_HOST` | ` ` | Host to listen on for emulator GRPC/REST/Admin servers (default: all interfaces) | | `--chain-id` | `FLOW_CHAINID` | `emulator` | Chain to emulate for address generation. Valid values are: 'emulator', 'testnet', 'mainnet' | | `--redis-url` | `FLOW_REDIS_URL` | '' | Redis-server URL for persisting redis storage backend ( `redis://[[username:]password@]host[:port][/database]` ) | -| `--start-block-height` | `FLOW_STARTBLOCKHEIGHT` | `0` | Start block height to use when starting the network using 'testnet' or 'mainnet' as the chain-id | -| `--rpc-host` | `FLOW_RPCHOST` | '' | RPC host (access node) to query for previous state when starting the network using 'testnet' or 'mainnet' as the chain-id | +| `--fork-host` | `FLOW_FORK_HOST` | '' | gRPC access node address (`host:port`) to fork from | +| `--fork-height` | `FLOW_FORK_HEIGHT` | `0` | Block height to pin the fork (defaults to latest sealed) | ## Running the emulator with the Flow CLI @@ -149,8 +149,7 @@ Post Data: height={block height} ``` Note: it is only possible to roll back state to a height that was previously executed by the emulator. -To roll back to a past block height when using a forked Mainnet or Testnet network, use the -`--start-block-height` flag. +To pin the starting block height when using a fork, use the `--fork-height` flag. ## Managing emulator state @@ -246,14 +245,13 @@ you must specify the network name for the chain ID flag as well as the RPC host to connect to. ``` -flow emulator --chain-id mainnet --rpc-host access-008.mainnet24.nodes.onflow.org:9000 -flow emulator --chain-id mainnet --rpc-host access-002.devnet49.nodes.onflow.org:9000 +flow emulator --fork-host access.mainnet.nodes.onflow.org:9000 +flow emulator --fork-host access.mainnet.nodes.onflow.org:9000 --fork-height 12345 ``` Please note, the actual execution on the real network may differ depending on the exact state when the transaction is executed. -By default, the forked network will start from the latest sealed block when the emulator -is started. You can specify a different starting block height by using the `--start-block-height` flag. +By default, the forked network will start from the latest sealed block when the emulator is started. You can specify a different starting block height by using the `--fork-height` flag. You can also store all of your changes and cached registers to a persistent db by using the `--persist` flag, along with the other sqlite settings. diff --git a/go.mod b/go.mod index 8b022fe4..39becd64 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/google/go-dap v0.11.0 github.com/gorilla/mux v1.8.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/improbable-eng/grpc-web v0.15.0 github.com/logrusorgru/aurora v2.0.3+incompatible github.com/onflow/cadence v1.8.1 @@ -94,7 +95,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.3.2 // indirect diff --git a/server/fork_integration_test.go b/server/fork_integration_test.go new file mode 100644 index 00000000..b9ae18f8 --- /dev/null +++ b/server/fork_integration_test.go @@ -0,0 +1,270 @@ +/* + * Flow Emulator + * + * Copyright 2019-2024 Dapper Labs, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "context" + "testing" + + "github.com/onflow/cadence" + "github.com/onflow/flow-emulator/convert" + flowsdk "github.com/onflow/flow-go-sdk" + flowgo "github.com/onflow/flow-go/model/flow" + flowaccess "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/rs/zerolog" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// TestForkingAgainstTestnet exercises the forking path by wiring a remote store +func TestForkingAgainstTestnet(t *testing.T) { + logger := zerolog.Nop() + + // Get remote latest sealed height to pin fork + conn, err := grpc.NewClient("access.testnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("dial remote: %v", err) + } + defer func() { _ = conn.Close() }() + remote := flowaccess.NewAccessAPIClient(conn) + rh, err := remote.GetLatestBlockHeader(context.Background(), &flowaccess.GetLatestBlockHeaderRequest{IsSealed: true}) + if err != nil { + t.Fatalf("get remote header: %v", err) + } + remoteHeight := rh.Block.Height + + cfg := &Config{ + // Do not start listeners; NewEmulatorServer only configures components. + DBPath: "", + Persist: false, + Snapshot: false, + SkipTransactionValidation: true, + ChainID: flowgo.Testnet, // will be overridden by detectRemoteChainID + ForkHost: "access.testnet.nodes.onflow.org:9000", + ForkHeight: remoteHeight, + } + + srv := NewEmulatorServer(&logger, cfg) + if srv == nil { + t.Fatal("NewEmulatorServer returned nil") + } + + if cfg.ChainID != flowgo.Testnet { + t.Fatalf("expected ChainID to be Testnet after fork detection, got %q", cfg.ChainID) + } + + // Create an initial local block so we have a valid reference block ID in the forked store + if _, _, err := srv.Emulator().ExecuteAndCommitBlock(); err != nil { + t.Fatalf("prime local block: %v", err) + } + + // Submit a minimal transaction against the forked emulator to ensure tx processing works + latest, err := srv.Emulator().GetLatestBlock() + if err != nil { + t.Fatalf("get latest block: %v", err) + } + // Allow emulator height to be equal to or one greater than remote (if remote advanced by one between queries) + if latest.Height != remoteHeight+1 { + t.Fatalf("fork height mismatch: emulator %d not in {remote, remote+1} where remote=%d", latest.Height, remoteHeight) + } + sk := srv.Emulator().ServiceKey() + // Write an Int into account storage and publish a capability to read it later + writeScript := []byte(` + transaction { + prepare(acct: auth(Storage, Capabilities) &Account) { + acct.storage.save(42, to: /storage/foo) + let cap = acct.capabilities.storage.issue<&Int>(/storage/foo) + acct.capabilities.publish(cap, at: /public/foo) + } + } + `) + tx := flowsdk.NewTransaction(). + SetScript(writeScript). + SetReferenceBlockID(flowsdk.Identifier(latest.ID())). + SetProposalKey(flowsdk.Address(sk.Address), sk.Index, sk.SequenceNumber). + SetPayer(flowsdk.Address(sk.Address)). + AddAuthorizer(flowsdk.Address(sk.Address)) + + signer, err := sk.Signer() + if err != nil { + t.Fatalf("signer: %v", err) + } + if err := tx.SignEnvelope(flowsdk.Address(sk.Address), sk.Index, signer); err != nil { + t.Fatalf("sign envelope: %v", err) + } + if err := srv.Emulator().AddTransaction(*convert.SDKTransactionToFlow(*tx)); err != nil { + t.Fatalf("add tx: %v", err) + } + if _, results, err := srv.Emulator().ExecuteAndCommitBlock(); err != nil { + t.Fatalf("execute block: %v", err) + } else { + if len(results) != 1 { + t.Fatalf("expected 1 tx result, got %d", len(results)) + } + r := results[0] + if !r.Succeeded() { + t.Fatalf("tx failed: %v", r.Error) + } + } + + // Read back in a second transaction using the same authorizer and assert via logs + readTxCode := []byte(` + transaction { + prepare(acct: auth(Storage) &Account) { + let ok = acct.storage.borrow<&Int>(from: /storage/foo) != nil + log(ok) + } + } + `) + latest2, err := srv.Emulator().GetLatestBlock() + if err != nil { + t.Fatalf("get latest block for read tx: %v", err) + } + readTx := flowsdk.NewTransaction(). + SetScript(readTxCode). + SetReferenceBlockID(flowsdk.Identifier(latest2.ID())). + SetProposalKey(flowsdk.Address(sk.Address), sk.Index, sk.SequenceNumber). + SetPayer(flowsdk.Address(sk.Address)). + AddAuthorizer(flowsdk.Address(sk.Address)) + if err := readTx.SignEnvelope(flowsdk.Address(sk.Address), sk.Index, signer); err != nil { + t.Fatalf("sign read envelope: %v", err) + } + if err := srv.Emulator().AddTransaction(*convert.SDKTransactionToFlow(*readTx)); err != nil { + t.Fatalf("add read tx: %v", err) + } + if _, readResults, err := srv.Emulator().ExecuteAndCommitBlock(); err != nil { + t.Fatalf("execute read block: %v", err) + } else { + if len(readResults) != 1 || !readResults[0].Succeeded() { + t.Fatalf("read tx failed: %v", readResults[0].Error) + } + logs := readResults[0].Logs + found := false + for _, l := range logs { + if l == "true" { + found = true + break + } + } + if !found { + t.Fatalf("expected log containing true, got: %v", logs) + } + } +} + +// TestForkingAgainstMainnet exercises the forking path with mainnet +func TestForkingAgainstMainnet(t *testing.T) { + t.Skip("remove after Forte release") + logger := zerolog.Nop() + + // Get remote latest sealed height to pin fork + conn, err := grpc.NewClient("access.mainnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("dial remote: %v", err) + } + defer func() { _ = conn.Close() }() + remote := flowaccess.NewAccessAPIClient(conn) + rh, err := remote.GetLatestBlockHeader(context.Background(), &flowaccess.GetLatestBlockHeaderRequest{IsSealed: true}) + if err != nil { + t.Fatalf("get remote header: %v", err) + } + remoteHeight := rh.Block.Height + + cfg := &Config{ + // Do not start listeners; NewEmulatorServer only configures components. + DBPath: "", + Persist: false, + Snapshot: false, + SkipTransactionValidation: true, + ChainID: flowgo.Mainnet, // will be overridden by detectRemoteChainID + ForkHost: "access.mainnet.nodes.onflow.org:9000", + ForkHeight: remoteHeight, + } + + srv := NewEmulatorServer(&logger, cfg) + if srv == nil { + t.Fatal("NewEmulatorServer returned nil") + } + + if cfg.ChainID != flowgo.Mainnet { + t.Fatalf("expected ChainID to be Mainnet after fork detection, got %q", cfg.ChainID) + } + + // Create an initial local block so we have a valid reference block ID in the forked store + if _, _, err := srv.Emulator().ExecuteAndCommitBlock(); err != nil { + t.Fatalf("prime local block: %v", err) + } + + // Test account key retrieval for a known mainnet account with multiple keys + // This tests the account key deduplication shim by executing a script that accesses + // keys and ensures no errors occur (successful execution proves the shim works) + testAccountScript := []byte(` + access(all) fun main(): Bool { + // Test getting account keys for a known mainnet account + let account = getAccount(0xe467b9dd11fa00df) + + // Test accessing specific key indices + let key0 = account.keys.get(keyIndex: 0) + if key0 == nil { + return false + } + // Access weight and revoked status to test parsing + // (successful access without errors proves the shim works) + if key0!.weight < 0.0 || key0!.isRevoked == key0!.isRevoked { + // Just access the properties, don't actually test values + } + + let key1 = account.keys.get(keyIndex: 1) + if key1 == nil { + return false + } + // Access weight and revoked status to test parsing + if key1!.weight < 0.0 || key1!.isRevoked == key1!.isRevoked { + // Just access the properties, don't actually test values + } + + return true + } + `) + + latest, err := srv.Emulator().GetLatestBlock() + if err != nil { + t.Fatalf("get latest block: %v", err) + } + // Allow emulator height to be equal to or one greater than remote (if remote advanced by one between queries) + if latest.Height != remoteHeight+1 { + t.Fatalf("fork height mismatch: emulator %d not in {remote, remote+1} where remote=%d", latest.Height, remoteHeight) + } + // Execute the script to test account key retrieval + scriptResult, err := srv.Emulator().ExecuteScript(testAccountScript, nil) + if err != nil { + t.Fatalf("test script failed: %v", err) + } + + if !scriptResult.Succeeded() { + t.Fatalf("test script error: %v", scriptResult.Error) + } + + // Check that the script returned true (all verifications passed) + if scriptResult.Value != cadence.Bool(true) { + t.Fatalf("test script returned %v, expected true", scriptResult.Value) + } + + t.Logf("Account key test successful. Script result: %v", scriptResult.Value) +} diff --git a/server/server.go b/server/server.go index e30ba4cb..9c17725d 100644 --- a/server/server.go +++ b/server/server.go @@ -19,11 +19,13 @@ package server import ( + "context" _ "embed" "fmt" "net/http" "os" "sort" + "strings" "time" "github.com/onflow/cadence" @@ -44,6 +46,9 @@ import ( "github.com/onflow/flow-emulator/storage/remote" "github.com/onflow/flow-emulator/storage/sqlite" "github.com/onflow/flow-emulator/storage/util" + flowaccess "github.com/onflow/flow/protobuf/go/flow/access" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // EmulatorServer is a local server that runs a Flow Emulator instance. @@ -137,10 +142,10 @@ type Config struct { SqliteURL string // CoverageReportingEnabled enables/disables Cadence code coverage reporting. CoverageReportingEnabled bool - // RPCHost is the address of the access node to use when using a forked network. - RPCHost string - // StartBlockHeight is the height at which to start the emulator. - StartBlockHeight uint64 + // ForkHost is the gRPC access node address to fork from (host:port). + ForkHost string + // ForkHeight is the height at which to start the emulator when forking. + ForkHeight uint64 // CheckpointPath is the path to the checkpoint folder to use when starting the network on top of existing state. // StateHash should be provided as well. CheckpointPath string @@ -171,7 +176,18 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer { return nil } - emulatedBlockchain, err := configureBlockchain(logger, conf, store) + // Derive chain ID for the emulator setup. + resolvedChainID := conf.ChainID + if conf.ForkHost != "" && conf.ChainID == "" { + parsed, err := DetectRemoteChainID(conf.ForkHost) + if err != nil { + logger.Error().Err(err).Msg("❗ Failed to detect remote chain ID") + return nil + } + resolvedChainID = parsed + } + + emulatedBlockchain, err := configureBlockchain(logger, resolvedChainID, conf, store) if err != nil { logger.Err(err).Msg("❗ Failed to configure emulated emulator") return nil @@ -245,6 +261,22 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer { return server } +// detectRemoteChainID connects to the remote access node and fetches network parameters to obtain the chain ID. +func DetectRemoteChainID(url string) (flowgo.ChainID, error) { + // Expect raw host:port + conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return "", err + } + defer func() { _ = conn.Close() }() + client := flowaccess.NewAccessAPIClient(conn) + resp, err := client.GetNetworkParameters(context.Background(), &flowaccess.GetNetworkParametersRequest{}) + if err != nil { + return "", err + } + return flowgo.ChainID(resp.ChainId), nil +} + // Listen starts listening for incoming connections. // // After this non-blocking function executes we can treat the @@ -383,7 +415,12 @@ func configureStorage(logger *zerolog.Logger, conf *Config) (storageProvider sto } } - if conf.ChainID == flowgo.Testnet || conf.ChainID == flowgo.Mainnet { + if conf.ForkHost != "" { + // Validate fork host has port + if !strings.Contains(conf.ForkHost, ":") { + return nil, fmt.Errorf("fork-host must include port (e.g., access.mainnet.nodes.onflow.org:9000)") + } + // TODO: any reason redis shouldn't work? baseProvider, ok := storageProvider.(*sqlite.Store) if !ok { @@ -391,8 +428,8 @@ func configureStorage(logger *zerolog.Logger, conf *Config) (storageProvider sto } provider, err := remote.New(baseProvider, logger, - remote.WithRPCHost(conf.RPCHost, conf.ChainID), - remote.WithStartBlockHeight(conf.StartBlockHeight), + remote.WithForkHost(conf.ForkHost), + remote.WithForkHeight(conf.ForkHeight), ) if err != nil { return nil, err @@ -413,7 +450,7 @@ func configureStorage(logger *zerolog.Logger, conf *Config) (storageProvider sto return storageProvider, err } -func configureBlockchain(logger *zerolog.Logger, conf *Config, store storage.Store) (*emulator.Blockchain, error) { +func configureBlockchain(logger *zerolog.Logger, chainID flowgo.ChainID, conf *Config, store storage.Store) (*emulator.Blockchain, error) { options := []emulator.Option{ emulator.WithServerLogger(*logger), emulator.WithStore(store), @@ -425,7 +462,7 @@ func configureBlockchain(logger *zerolog.Logger, conf *Config, store storage.Sto emulator.WithMinimumStorageReservation(conf.MinimumStorageReservation), emulator.WithStorageMBPerFLOW(conf.StorageMBPerFLOW), emulator.WithTransactionFeesEnabled(conf.TransactionFeesEnabled), - emulator.WithChainID(conf.ChainID), + emulator.WithChainID(chainID), emulator.WithContractRemovalEnabled(conf.ContractRemovalEnabled), emulator.WithSetupVMBridgeEnabled(conf.SetupVMBridgeEnabled), } diff --git a/server/utils/emulator.go b/server/utils/emulator.go index f9103ba7..e560cd23 100644 --- a/server/utils/emulator.go +++ b/server/utils/emulator.go @@ -25,9 +25,9 @@ import ( "strconv" "github.com/gorilla/mux" - flowgo "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-emulator/adapters" "github.com/onflow/flow-emulator/emulator" + flowgo "github.com/onflow/flow-go/model/flow" ) type BlockResponse struct { diff --git a/storage/remote/store.go b/storage/remote/store.go index be74773c..3c1aa50a 100644 --- a/storage/remote/store.go +++ b/storage/remote/store.go @@ -21,7 +21,11 @@ package remote import ( "context" "fmt" + "math" + "math/rand" + "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/fvm/storage/snapshot" @@ -40,6 +44,111 @@ import ( "github.com/onflow/flow-emulator/types" ) +// Retry and concurrency configuration +const ( + maxRetries = 5 + baseDelay = 100 * time.Millisecond + maxDelay = 30 * time.Second + jitterFactor = 0.1 + maxConcurrentRequests = 10 // Maximum concurrent requests to remote node +) + +// isRateLimitError checks if the error is a rate limiting error +func isRateLimitError(err error) bool { + if err == nil { + return false + } + + st, ok := status.FromError(err) + if !ok { + return false + } + + // ResourceExhausted is the standard gRPC code for rate limiting + return st.Code() == codes.ResourceExhausted +} + +// exponentialBackoffWithJitter calculates delay with exponential backoff and jitter +func exponentialBackoffWithJitter(attempt int) time.Duration { + if attempt <= 0 { + return baseDelay + } + + // Calculate exponential delay: baseDelay * 2^(attempt-1) + delay := float64(baseDelay) * math.Pow(2, float64(attempt-1)) + + // Cap at maxDelay + if delay > float64(maxDelay) { + delay = float64(maxDelay) + } + + // Add jitter: ±10% random variation + jitter := delay * jitterFactor * (2*rand.Float64() - 1) + delay += jitter + + // Ensure minimum delay + if delay < float64(baseDelay) { + delay = float64(baseDelay) + } + + return time.Duration(delay) +} + +// retryWithBackoff executes a function with exponential backoff retry on rate limit errors +func (s *Store) retryWithBackoff(ctx context.Context, operation string, fn func() error) error { + // Acquire semaphore to limit concurrent requests + select { + case s.concurrencySem <- struct{}{}: + defer func() { <-s.concurrencySem }() + case <-ctx.Done(): + return ctx.Err() + } + + var lastErr error + + for attempt := 0; attempt <= maxRetries; attempt++ { + err := fn() + if err == nil { + return nil + } + + lastErr = err + + // Only retry on recognized, transient rate limit errors + if !isRateLimitError(err) { + return err + } + + // Continue with retry logic for rate limits only + if attempt == maxRetries { + s.logger.Warn(). + Str("operation", operation). + Int("attempt", attempt+1). + Err(err). + Msg("Request failed after max attempts") + return err + } + + // Calculate delay and wait + delay := exponentialBackoffWithJitter(attempt) + s.logger.Debug(). + Str("operation", operation). + Int("attempt", attempt+1). + Dur("delay", delay). + Err(err). + Msg("Rate limited, retrying with backoff") + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + // Continue to next attempt + } + } + + return lastErr +} + type Store struct { *sqlite.Store executionClient executiondata.ExecutionDataAPIClient @@ -49,25 +158,39 @@ type Store struct { chainID flowgo.ChainID forkHeight uint64 logger *zerolog.Logger + concurrencySem chan struct{} // Semaphore to limit concurrent requests } type Option func(*Store) -// WithRPCHost sets access/observer node host. +// WithForkHost configures the remote access/observer node gRPC endpoint. +// Expects raw host:port with no scheme. +func WithForkHost(host string) Option { + return func(store *Store) { + store.host = host + } +} + +// WithRPCHost sets access/observer node host. Deprecated: use WithForkHost. func WithRPCHost(host string, chainID flowgo.ChainID) Option { return func(store *Store) { + // Keep legacy behavior: set host and (optionally) chainID for validation. store.host = host store.chainID = chainID } } // WithStartBlockHeight sets the start height for the store. -func WithStartBlockHeight(height uint64) Option { +// WithForkHeight sets the pinned fork height. +func WithForkHeight(height uint64) Option { return func(store *Store) { store.forkHeight = height } } +// WithStartBlockHeight is deprecated: use WithForkHeight. +func WithStartBlockHeight(height uint64) Option { return WithForkHeight(height) } + // WithClient can set an rpc host client // // This is mostly use for testing. @@ -83,8 +206,9 @@ func WithClient( func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*Store, error) { store := &Store{ - Store: provider, - logger: logger, + Store: provider, + logger: logger, + concurrencySem: make(chan struct{}, maxConcurrentRequests), } for _, opt := range options { @@ -110,13 +234,26 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St store.accessClient = access.NewAccessAPIClient(conn) } - params, err := store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{}) + var params *access.GetNetworkParametersResponse + err := store.retryWithBackoff(context.Background(), "GetNetworkParameters", func() error { + var err error + params, err = store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{}) + return err + }) if err != nil { return nil, fmt.Errorf("could not get network parameters: %w", err) } - if params.ChainId != store.chainID.String() { - return nil, fmt.Errorf("chain ID of rpc host does not match chain ID provided in config: %s != %s", params.ChainId, store.chainID) + // If a chainID was provided (legacy path), validate it matches the remote. If not provided, skip. + if store.chainID != "" { + if params.ChainId != store.chainID.String() { + return nil, fmt.Errorf("chain ID of rpc host does not match chain ID provided in config: %s != %s", params.ChainId, store.chainID) + } + } + + // Record remote chain ID if not already set via options + if store.chainID == "" { + store.chainID = flowgo.ChainID(params.ChainId) } if err := store.initializeStartBlock(context.Background()); err != nil { @@ -144,7 +281,12 @@ func (s *Store) initializeStartBlock(ctx context.Context) error { // use the current latest block from the rpc host if no height was provided if s.forkHeight == 0 { - resp, err := s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true}) + var resp *access.BlockHeaderResponse + err := s.retryWithBackoff(ctx, "GetLatestBlockHeader", func() error { + var err error + resp, err = s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true}) + return err + }) if err != nil { return fmt.Errorf("could not get last block height: %w", err) } @@ -154,6 +296,7 @@ func (s *Store) initializeStartBlock(ctx context.Context) error { s.logger.Info(). Uint64("forkHeight", s.forkHeight). Str("host", s.host). + Str("chainId", s.chainID.String()). Msg("Using fork height") // store the initial fork height. any future queries for data on the rpc host will be fixed @@ -178,7 +321,12 @@ func (s *Store) BlockByID(ctx context.Context, blockID flowgo.Identifier) (*flow if err == nil { height = block.Height } else if errors.Is(err, storage.ErrNotFound) { - heightRes, err := s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]}) + var heightRes *access.BlockHeaderResponse + err := s.retryWithBackoff(ctx, "GetBlockHeaderByID", func() error { + var err error + heightRes, err = s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]}) + return err + }) if err != nil { return nil, err } @@ -218,7 +366,12 @@ func (s *Store) BlockByHeight(ctx context.Context, height uint64) (*flowgo.Block return nil, &types.BlockNotFoundByHeightError{Height: height} } - blockRes, err := s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height}) + var blockRes *access.BlockHeaderResponse + err = s.retryWithBackoff(ctx, "GetBlockHeaderByHeight", func() error { + var err error + blockRes, err = s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height}) + return err + }) if err != nil { return nil, err } @@ -239,7 +392,18 @@ func (s *Store) LedgerByHeight( ctx context.Context, blockHeight uint64, ) (snapshot.StorageSnapshot, error) { + // Create a snapshot with LRU cache to avoid duplicate RPC calls within the same snapshot + // LRU cache with max 1000 entries to prevent memory bloat + cache, err := lru.New[string, flowgo.RegisterValue](1000) + if err != nil { + return nil, fmt.Errorf("failed to create LRU cache: %w", err) + } + return snapshot.NewReadFuncStorageSnapshot(func(id flowgo.RegisterID) (flowgo.RegisterValue, error) { + // Check LRU cache first to avoid duplicate RPC calls within this snapshot + if cachedValue, exists := cache.Get(id.String()); exists { + return cachedValue, nil + } // create a copy so updating it doesn't affect future calls lookupHeight := blockHeight @@ -264,10 +428,16 @@ func (s *Store) LedgerByHeight( } registerID := convert.RegisterIDToMessage(flowgo.RegisterID{Key: id.Key, Owner: id.Owner}) - response, err := s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{ - BlockHeight: lookupHeight, - RegisterIds: []*entities.RegisterID{registerID}, + var response *executiondata.GetRegisterValuesResponse + + err = s.retryWithBackoff(ctx, "GetRegisterValues", func() error { + response, err = s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{ + BlockHeight: lookupHeight, + RegisterIds: []*entities.RegisterID{registerID}, + }) + return err }) + if err != nil { if status.Code(err) != codes.NotFound { return nil, err @@ -289,10 +459,11 @@ func (s *Store) LedgerByHeight( return nil, fmt.Errorf("could not cache ledger value: %w", err) } + // Cache in LRU cache for this snapshot to avoid duplicate RPC calls + cache.Add(id.String(), value) return value, nil }), nil } - func (s *Store) Stop() { _ = s.grpcConn.Close() }