diff --git a/Makefile b/Makefile index d8895c3..ce773fc 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ build: ## Build the HTTP server go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/sender-proxy cmd/sender-proxy/main.go go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/receiver-proxy cmd/receiver-proxy/main.go go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-orderflow-sender cmd/test-tx-sender/main.go + go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-e2e-latency cmd/test-e2e-latency/main.go .PHONY: build-receiver-proxy build-receiver-proxy: ## Build only the receiver-proxy diff --git a/cmd/receiver-proxy/main.go b/cmd/receiver-proxy/main.go index 50a6c6f..d4b0644 100644 --- a/cmd/receiver-proxy/main.go +++ b/cmd/receiver-proxy/main.go @@ -212,16 +212,19 @@ func runMain(cCtx *cli.Context) error { maxUserRPS := cCtx.Int(flagMaxUserRPS) proxyConfig := &proxy.ReceiverProxyConfig{ - ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{Log: log, FlashbotsSignerAddress: flashbotsSignerAddress}, - BuilderConfigHubEndpoint: builderConfigHubEndpoint, - ArchiveEndpoint: archiveEndpoint, - ArchiveConnections: connectionsPerPeer, - LocalBuilderEndpoint: builderEndpoint, - EthRPC: rpcEndpoint, - MaxRequestBodySizeBytes: maxRequestBodySizeBytes, - ConnectionsPerPeer: connectionsPerPeer, - MaxUserRPS: maxUserRPS, - ArchiveWorkerCount: archiveWorkerCount, + ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{ + Log: log, + FlashbotsSignerAddress: flashbotsSignerAddress, + LocalBuilderEndpoint: builderEndpoint, + }, + BuilderConfigHubEndpoint: builderConfigHubEndpoint, + ArchiveEndpoint: archiveEndpoint, + ArchiveConnections: connectionsPerPeer, + EthRPC: rpcEndpoint, + MaxRequestBodySizeBytes: maxRequestBodySizeBytes, + ConnectionsPerPeer: connectionsPerPeer, + MaxUserRPS: maxUserRPS, + ArchiveWorkerCount: archiveWorkerCount, } instance, err := proxy.NewReceiverProxy(*proxyConfig) diff --git a/cmd/test-e2e-latency/main.go b/cmd/test-e2e-latency/main.go new file mode 100644 index 0000000..f26afcf --- /dev/null +++ b/cmd/test-e2e-latency/main.go @@ -0,0 +1,231 @@ +package main + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "log/slog" + "net/http" + "os" + "slices" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/flashbots/go-utils/rpcclient" + "github.com/flashbots/go-utils/signature" + "github.com/urfave/cli/v2" // imports as package "cli" +) + +var flags []cli.Flag = []cli.Flag{ + // input and output + &cli.StringFlag{ + Name: "local-orderflow-endpoint", + Value: "http://127.0.0.1", + Usage: "address to send orderflow to", + EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"}, + }, + &cli.IntFlag{ + Name: "local-receiver-server-port", + Value: 8646, + Usage: "address to send orderflow to", + EnvVars: []string{"LOCAL_RECEIVER_SERVER__PORT"}, + }, + &cli.IntFlag{ + Name: "num-senders", + Value: 50, + Usage: "Number of senders", + EnvVars: []string{"NUM_SENDERS"}, + }, + &cli.IntFlag{ + Name: "num-requests", + Value: 50000, + Usage: "Number of requests", + EnvVars: []string{"NUM_REQUESTS"}, + }, +} + +// test tx + +func main() { + app := &cli.App{ + Name: "test-tx-sender", + Usage: "send test transactions", + Flags: flags, + Action: func(cCtx *cli.Context) error { + orderflowSigner, err := signature.NewRandomSigner() + if err != nil { + return err + } + slog.Info("Ordeflow signing address", "address", orderflowSigner.Address()) + + localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint") + client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{ + Signer: orderflowSigner, + }) + slog.Info("Created client") + + receiverPort := cCtx.Int("local-receiver-server-port") + + senders := cCtx.Int("num-senders") + requests := cCtx.Int("num-requests") + + return runE2ELatencyTest(client, receiverPort, senders, requests) + }, + } + + if err := app.Run(os.Args); err != nil { + log.Fatal(err) + } +} + +type sharedState struct { + sentAt map[uint64]time.Time + receivedAt map[uint64]time.Time + mu sync.Mutex +} + +func (s *sharedState) ServeHTTP(w http.ResponseWriter, r *http.Request) { + receivedAt := time.Now() + body, _ := io.ReadAll(r.Body) + + // serve builderhub API + if r.URL.Path == "/api/l1-builder/v1/register_credentials/orderflow_proxy" { + w.WriteHeader(http.StatusOK) + return + } else if r.URL.Path == "/api/l1-builder/v1/builders" { + res, err := json.Marshal([]int{}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(res) + return + } + + resp, err := json.Marshal(struct{}{}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + _, _ = w.Write(resp) + + // forwarded request received + type jsonRPCRequest struct { + Params []hexutil.Bytes `json:"params"` + } + + var request jsonRPCRequest + err = json.Unmarshal(body, &request) + if err != nil { + return + } + if len(request.Params) != 1 { + return + } + + decoded := binary.BigEndian.Uint64(request.Params[0]) + + s.mu.Lock() + s.receivedAt[decoded] = receivedAt + s.mu.Unlock() +} + +func (s *sharedState) RunSender(client rpcclient.RPCClient, start, count int, wg *sync.WaitGroup) { + defer wg.Done() + for i := start; i < start+count; i += 1 { + b := make([]byte, 8) + //nolint:gosec + binary.BigEndian.PutUint64(b, uint64(i)) + request := hexutil.Bytes(b) + + s.mu.Lock() + //nolint:gosec + s.sentAt[uint64(i)] = time.Now() + s.mu.Unlock() + // send eth_sendRawTransactions + resp, err := client.Call(context.Background(), "eth_sendRawTransaction", request) + if err != nil { + slog.Error("RPC request failed", "error", err) + continue + } + if resp.Error != nil { + slog.Error("RPC returned error", "error", resp.Error) + continue + } + } +} + +func runE2ELatencyTest(client rpcclient.RPCClient, receiverPort, senders, requests int) error { + state := &sharedState{ + sentAt: make(map[uint64]time.Time), + receivedAt: make(map[uint64]time.Time), + mu: sync.Mutex{}, + } + + //nolint:gosec + receiverServer := &http.Server{ + Addr: fmt.Sprintf("0.0.0.0:%d", receiverPort), + Handler: state, + } + + go func() { + if err := receiverServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("Failed while listening to server", "error", err) + os.Exit(1) + } + }() + + countPerSender := requests / senders + + slog.Info("Waiting for startup") + time.Sleep(time.Second * 5) + slog.Info("Sending started") + + start := time.Now() + offset := 0 + var wg sync.WaitGroup + for range senders { + wg.Add(1) + go state.RunSender(client, offset, countPerSender, &wg) + offset += countPerSender + } + wg.Wait() + sendingTime := time.Since(start) + + slog.Info("Waiting to finish") + time.Sleep(time.Second * 1) + + state.mu.Lock() + defer state.mu.Unlock() + + missedRequests := 0 + totalRequests := len(state.sentAt) + rps := float64(totalRequests) / (float64(sendingTime.Milliseconds()) / 1000.0) + + values := make([]float64, 0, totalRequests) + for request, sentAt := range state.sentAt { + receivedAt, ok := state.receivedAt[request] + if !ok { + missedRequests += 1 + continue + } + diff := float64(receivedAt.Sub(sentAt).Microseconds()) / 1000.0 + values = append(values, diff) + } + slices.Sort(values) + p50 := values[(len(values)*50)/100] + p99 := values[(len(values)*99)/100] + p100 := values[len(values)-1] + + missedPercentage := float64(missedRequests) / float64(totalRequests) * 100.0 + + slog.Info("Results", "p50", p50, "p99", p99, "p100", p100, "totalReq", totalRequests, "missedRequests", missedRequests, "missedPercent", missedPercentage, "averageRPS", rps) + + return nil +} diff --git a/cmd/test-tx-sender/main.go b/cmd/test-tx-sender/main.go index 6397e1f..2b9a60b 100644 --- a/cmd/test-tx-sender/main.go +++ b/cmd/test-tx-sender/main.go @@ -3,13 +3,12 @@ package main import ( "context" "errors" - "io" "log" "log/slog" - "net/http" "os" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/flashbots/go-utils/rpcclient" "github.com/flashbots/go-utils/rpctypes" "github.com/flashbots/go-utils/signature" "github.com/flashbots/tdx-orderflow-proxy/proxy" @@ -21,16 +20,10 @@ var flags []cli.Flag = []cli.Flag{ // input and output &cli.StringFlag{ Name: "local-orderflow-endpoint", - Value: "https://127.0.0.1:443", + Value: "http://127.0.0.1", Usage: "address to send orderflow to", EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"}, }, - &cli.StringFlag{ - Name: "cert-endpoint", - Value: "http://127.0.0.1:14727", - Usage: "address that serves certifiate on /cert endpoint", - EnvVars: []string{"CERT_ENDPOINT"}, - }, &cli.StringFlag{ Name: "signer-private-key", Value: "0x52da2727dd1180b547258c9ca7deb7f9576b2768f3f293b67f36505c85b2ddd0", @@ -57,7 +50,6 @@ func main() { Flags: flags, Action: func(cCtx *cli.Context) error { localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint") - certEndpoint := cCtx.String("cert-endpoint") signerPrivateKey := cCtx.String("signer-private-key") orderflowSigner, err := signature.NewSignerFromHexPrivateKey(signerPrivateKey) @@ -66,16 +58,9 @@ func main() { } slog.Info("Ordeflow signing address", "address", orderflowSigner.Address()) - cert, err := fetchCertificate(certEndpoint + "/cert") - if err != nil { - return err - } - slog.Info("Fetched certificate") - - client, err := proxy.RPCClientWithCertAndSigner(localOrderflowEndpoint, cert, orderflowSigner, 1) - if err != nil { - return err - } + client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{ + Signer: orderflowSigner, + }) slog.Info("Created client") rpcEndpoint := cCtx.String("rpc-endpoint") @@ -188,17 +173,3 @@ func main() { log.Fatal(err) } } - -func fetchCertificate(endpoint string) ([]byte, error) { - resp, err := http.Get(endpoint) //nolint:gosec - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - return body, nil -} diff --git a/e2e-latenc-test.sh b/e2e-latenc-test.sh new file mode 100644 index 0000000..f30ef73 --- /dev/null +++ b/e2e-latenc-test.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +make build + +./build/receiver-proxy --pprof --user-listen-addr 0.0.0.0:9976 --builder-endpoint http://127.0.0.1:7890 --builder-confighub-endpoint http://127.0.0.1:7890 --orderflow-archive-endpoint http://127.0.0.1:7890 --connections-per-peer 100 > /tmp/log.txt 2>&1 & +PROXY_PID=$! + +./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:9976 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000 + +# uncomment to send orderflow without proxy to see test setup overhead +#./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:7890 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000 + +#sleep 10 +kill $PROXY_PID diff --git a/go.mod b/go.mod index 110a24b..29ea8f6 100644 --- a/go.mod +++ b/go.mod @@ -9,15 +9,18 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/ethereum/go-ethereum v1.15.5 github.com/flashbots/go-utils v0.14.0 + github.com/goccy/go-json v0.10.5 github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/stretchr/testify v1.10.0 github.com/urfave/cli/v2 v2.27.5 - golang.org/x/net v0.34.0 + github.com/valyala/fasthttp v1.62.0 + golang.org/x/net v0.40.0 golang.org/x/time v0.9.0 ) require ( + github.com/andybalholm/brotli v1.1.1 // indirect github.com/bits-and-blooms/bitset v1.17.0 // indirect github.com/consensys/bavard v0.1.22 // indirect github.com/consensys/gnark-crypto v0.14.0 // indirect @@ -28,19 +31,20 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/ethereum/go-verkle v0.2.2 // indirect - github.com/goccy/go-json v0.10.5 // indirect github.com/holiman/uint256 v1.3.2 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/supranational/blst v0.3.14 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect - golang.org/x/crypto v0.32.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/sync v0.14.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.25.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/go.sum b/go.sum index 959ddfe..dfe3110 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/VictoriaMetrics/metrics v1.35.1 h1:o84wtBKQbzLdDy14XeskkCZih6anG+veZ1SwJHFGwrU= github.com/VictoriaMetrics/metrics v1.35.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/bits-and-blooms/bitset v1.17.0 h1:1X2TS7aHz1ELcC0yU1y2stUs/0ig5oMU6STFZGrhvHI= github.com/bits-and-blooms/bitset v1.17.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= @@ -49,6 +51,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/holiman/uint256 v1.3.2 h1:a9EgMPSC1AAaj1SZL5zIQD3WbwTuHrMGOerLjGmM/TA= github.com/holiman/uint256 v1.3.2/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -84,22 +88,28 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.62.0 h1:8dKRBX/y2rCzyc6903Zu1+3qN0H/d2MsxPPmVNamiH0= +github.com/valyala/fasthttp v1.62.0/go.mod h1:FCINgr4GKdKqV8Q0xv8b+UxPV+H/O5nNFo3D+r54Htg= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/proxy/receiver_api.go b/proxy/receiver_api.go index e988046..6b6a198 100644 --- a/proxy/receiver_api.go +++ b/proxy/receiver_api.go @@ -80,7 +80,7 @@ func (prx *ReceiverProxy) UserJSONRPCHandler(maxRequestBodySizeBytes int64) (*rp // readyHandler calls /readyz on rbuilder func (prx *ReceiverProxy) readyHandler(w http.ResponseWriter, r *http.Request) error { - resp, err := http.Get(prx.localBuilderEndpoint + "/readyz") + resp, err := http.Get(prx.LocalBuilderEndpoint + "/readyz") if err != nil { prx.Log.Warn("Failed to check builder readiness", slog.Any("error", err)) http.Error(w, "not ready", http.StatusServiceUnavailable) @@ -374,6 +374,9 @@ type ParsedRequest struct { ethCancelBundle *rpctypes.EthCancelBundleArgs ethSendRawTransaction *rpctypes.EthSendRawTransactionArgs bidSubsidiseBlock *rpctypes.BidSubsisideBlockArgs + + serializedJSONRPCRequest []byte + signatureHeader string } func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error { @@ -408,10 +411,21 @@ func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "rate_limiting") startAt = time.Now() - select { - case <-ctx.Done(): - prx.Log.Error("Shared queue is stalling") - case prx.shareQueue <- &parsedRequest: + err := SerializeParsedRequestForSharing(&parsedRequest, prx.OrderflowSigner) + if err != nil { + prx.Log.Warn("Failed to serialize request for sharing", slog.Any("error", err)) + } + + incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "serialize_parsed_request") + startAt = time.Now() + + // since we send to local builder while handling the request we can skip sharing request + if !parsedRequest.systemEndpoint { + select { + case <-ctx.Done(): + prx.Log.Error("Shared queue is stalling") + case prx.shareQueue <- &parsedRequest: + } } incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "share_queue") @@ -426,5 +440,14 @@ func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest } incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "archive_queue") + startAt = time.Now() + // since we always send to local builder we do it here to avoid queue entirery + + err = prx.localBuilderSender.SendRequest(&parsedRequest) + if err != nil { + prx.Log.Debug("Failed to send request to a local builder", slog.Any("error", err)) + } + + incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "local_builder") return nil } diff --git a/proxy/receiver_proxy.go b/proxy/receiver_proxy.go index 22d3798..54d3ddc 100644 --- a/proxy/receiver_proxy.go +++ b/proxy/receiver_proxy.go @@ -39,9 +39,6 @@ type ReceiverProxy struct { OrderflowSigner *signature.Signer - localBuilder rpcclient.RPCClient - localBuilderEndpoint string - UserHandler http.Handler SystemHandler http.Handler @@ -61,6 +58,8 @@ type ReceiverProxy struct { peerUpdaterClose chan struct{} userAPIRateLimiter *rate.Limiter + + localBuilderSender LocalBuilderSender } type ReceiverProxyConstantConfig struct { @@ -68,6 +67,7 @@ type ReceiverProxyConstantConfig struct { // Name is optional field and it used to distringuish multiple proxies when running in the same process in tests Name string FlashbotsSignerAddress common.Address + LocalBuilderEndpoint string } type ReceiverProxyConfig struct { @@ -76,7 +76,6 @@ type ReceiverProxyConfig struct { BuilderConfigHubEndpoint string ArchiveEndpoint string ArchiveConnections int - LocalBuilderEndpoint string // EthRPC should support eth_blockNumber API EthRPC string @@ -94,26 +93,23 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) { return nil, err } - localCl := HTTPClientLocalhost(DefaultLocalhostMaxIdleConn) - - localBuilder := rpcclient.NewClientWithOpts(config.LocalBuilderEndpoint, &rpcclient.RPCClientOpts{ - HTTPClient: localCl, - }) - limit := rate.Limit(config.MaxUserRPS) if config.MaxUserRPS == 0 { limit = rate.Inf } userAPIRateLimiter := rate.NewLimiter(limit, config.MaxUserRPS) + localBuilderSender, err := NewLocalBuilderSender(config.Log, config.LocalBuilderEndpoint, config.ConnectionsPerPeer) + if err != nil { + return nil, err + } prx := &ReceiverProxy{ ReceiverProxyConstantConfig: config.ReceiverProxyConstantConfig, ConfigHub: NewBuilderConfigHub(config.Log, config.BuilderConfigHubEndpoint), OrderflowSigner: orderflowSigner, - localBuilder: localBuilder, - localBuilderEndpoint: config.LocalBuilderEndpoint, requestUniqueKeysRLU: expirable.NewLRU[uuid.UUID, struct{}](requestsRLUSize, nil, requestsRLUTTL), replacementNonceRLU: expirable.NewLRU[replacementNonceKey, int](replacementNonceSize, nil, replacementNonceTTL), userAPIRateLimiter: userAPIRateLimiter, + localBuilderSender: localBuilderSender, } maxRequestBodySizeBytes := DefaultMaxRequestBodySizeBytes if config.MaxRequestBodySizeBytes != 0 { @@ -136,12 +132,12 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) { updatePeersCh := make(chan []ConfighubBuilder) prx.shareQueue = shareQeueuCh prx.updatePeers = updatePeersCh + queue := ShareQueue{ name: prx.Name, log: prx.Log, queue: shareQeueuCh, updatePeers: updatePeersCh, - localBuilder: prx.localBuilder, signer: prx.OrderflowSigner, workersPerPeer: config.ConnectionsPerPeer, } diff --git a/proxy/receiver_proxy_test.go b/proxy/receiver_proxy_test.go index 5afab08..ea416f9 100644 --- a/proxy/receiver_proxy_test.go +++ b/proxy/receiver_proxy_test.go @@ -252,11 +252,11 @@ func createProxy(localBuilder, name, certPath, certKeyPath string) *ReceiverProx Log: log, Name: name, FlashbotsSignerAddress: flashbotsSigner.Address(), + LocalBuilderEndpoint: localBuilder, }, BuilderConfigHubEndpoint: builderHub.URL, ArchiveEndpoint: archiveServer.URL, - LocalBuilderEndpoint: localBuilder, EthRPC: "eth-rpc-not-set", MaxUserRPS: 10, }) diff --git a/proxy/sender_proxy.go b/proxy/sender_proxy.go index a3d323c..1867f0e 100644 --- a/proxy/sender_proxy.go +++ b/proxy/sender_proxy.go @@ -70,7 +70,6 @@ func NewSenderProxy(config SenderProxyConfig) (*SenderProxy, error) { log: prx.Log, queue: prx.shareQueue, updatePeers: prx.updatePeers, - localBuilder: nil, signer: prx.OrderflowSigner, workersPerPeer: config.ConnectionsPerPeer, } diff --git a/proxy/sharing.go b/proxy/sharing.go index 6f78f7d..d53a4d6 100644 --- a/proxy/sharing.go +++ b/proxy/sharing.go @@ -2,16 +2,23 @@ package proxy import ( "context" + "errors" "log/slog" + "net/http" "time" + "github.com/flashbots/go-utils/jsonrpc" "github.com/flashbots/go-utils/rpcclient" "github.com/flashbots/go-utils/signature" + "github.com/goccy/go-json" + "github.com/valyala/fasthttp" ) var ( ShareWorkerQueueSize = 10000 requestTimeout = time.Second * 10 + + errUnknownRequestType = errors.New("unknown request type for sharing") ) const ( @@ -19,29 +26,30 @@ const ( ) type ShareQueue struct { - name string - log *slog.Logger - queue chan *ParsedRequest - updatePeers chan []ConfighubBuilder - localBuilder rpcclient.RPCClient - signer *signature.Signer + name string + log *slog.Logger + queue chan *ParsedRequest + updatePeers chan []ConfighubBuilder + signer *signature.Signer // if > 0 share queue will spawn multiple senders per peer workersPerPeer int } type shareQueuePeer struct { - ch chan *ParsedRequest - name string - client rpcclient.RPCClient - conf ConfighubBuilder + ch chan *ParsedRequest + name string + client *fasthttp.Client + conf ConfighubBuilder + endpoint string } -func newShareQueuePeer(name string, client rpcclient.RPCClient, conf ConfighubBuilder) shareQueuePeer { +func newShareQueuePeer(name string, client *fasthttp.Client, conf ConfighubBuilder, endpoint string) shareQueuePeer { return shareQueuePeer{ - ch: make(chan *ParsedRequest, ShareWorkerQueueSize), - name: name, - client: client, - conf: conf, + ch: make(chan *ParsedRequest, ShareWorkerQueueSize), + name: name, + client: client, + conf: conf, + endpoint: endpoint, } } @@ -63,18 +71,7 @@ func (sq *ShareQueue) Run() { if sq.workersPerPeer > 0 { workersPerPeer = sq.workersPerPeer } - var ( - localBuilder *shareQueuePeer - peers []shareQueuePeer - ) - if sq.localBuilder != nil { - builderPeer := newShareQueuePeer("local-builder", sq.localBuilder, ConfighubBuilder{}) - localBuilder = &builderPeer - for worker := range workersPerPeer { - go sq.proxyRequests(localBuilder, worker) - } - defer localBuilder.Close() - } + var peers []shareQueuePeer for { select { case req, more := <-sq.queue: @@ -83,9 +80,6 @@ func (sq *ShareQueue) Run() { sq.log.Info("Share queue closing, queue channel closed") return } - if localBuilder != nil { - localBuilder.SendRequest(sq.log, req) - } if !req.systemEndpoint { for _, peer := range peers { peer.SendRequest(sq.log, req) @@ -133,14 +127,15 @@ func (sq *ShareQueue) Run() { if info.OrderflowProxy.EcdsaPubkeyAddress == sq.signer.Address() { continue } - client, err := RPCClientWithCertAndSigner(info.SystemAPIAddress(), []byte(info.TLSCert()), sq.signer, workersPerPeer) + client, err := NewFastHTTPClient([]byte(info.TLSCert()), workersPerPeer) if err != nil { - sq.log.Error("Failed to create a peer client", slog.Any("error", err)) + sq.log.Error("Failed to create a peer client3", slog.Any("error", err)) shareQueueInternalErrors.Inc() continue } + sq.log.Info("Created client for peer", slog.String("peer", info.Name), slog.String("name", sq.name)) - newPeer := newShareQueuePeer(info.Name, client, info) + newPeer := newShareQueuePeer(info.Name, client, info, info.SystemAPIAddress()) peers = append(peers, newPeer) for worker := range workersPerPeer { go sq.proxyRequests(&newPeer, worker) @@ -150,6 +145,83 @@ func (sq *ShareQueue) Run() { } } +type LocalBuilderSender struct { + logger *slog.Logger + client *fasthttp.Client + endpoint string +} + +func NewLocalBuilderSender(logger *slog.Logger, endpoint string, maxOpenConnections int) (LocalBuilderSender, error) { + logger = logger.With(slog.String("peer", "local-builder")) + + client, err := NewFastHTTPClient(nil, maxOpenConnections) + if err != nil { + return LocalBuilderSender{}, err + } + + return LocalBuilderSender{ + logger, client, endpoint, + }, nil +} + +func (s *LocalBuilderSender) SendRequest(req *ParsedRequest) error { + request := fasthttp.AcquireRequest() + request.SetRequestURI(s.endpoint) + request.Header.SetMethod(http.MethodPost) + request.Header.SetContentTypeBytes([]byte("application/json")) + defer fasthttp.ReleaseRequest(request) + + return sendShareRequest(s.logger, req, request, s.client, "local-builder") +} + +func sendShareRequest(logger *slog.Logger, req *ParsedRequest, request *fasthttp.Request, client *fasthttp.Client, peerName string) error { + if req.serializedJSONRPCRequest == nil { + logger.Debug("Skip sharing request that is not serialized properly") + return nil + } + + timeInQueue := time.Since(req.receivedAt) + + request.Header.Set(signature.HTTPHeader, req.signatureHeader) + request.SetBodyRaw(req.serializedJSONRPCRequest) + + resp := fasthttp.AcquireResponse() + start := time.Now() + err := client.DoTimeout(request, resp, requestTimeout) + requestDuration := time.Since(start) + timeE2E := timeInQueue + requestDuration + + // in background update metrics and handle response + go func() { + isBig := req.size >= bigRequestSize + timeShareQueuePeerQueueDuration(peerName, timeInQueue, req.method, req.systemEndpoint, isBig) + timeShareQueuePeerRPCDuration(peerName, requestDuration.Milliseconds(), isBig) + timeShareQueuePeerE2EDuration(peerName, timeE2E, req.method, req.systemEndpoint, isBig) + + logSendErrorLevel := slog.LevelDebug + if peerName == "local-builder" { + logSendErrorLevel = slog.LevelWarn + } + if err != nil { + logger.Log(context.Background(), logSendErrorLevel, "Error while proxying request", slog.Any("error", err)) + incShareQueuePeerRPCErrors(peerName) + } else { + var parsedResp jsonrpc.JSONRPCResponse + err = json.Unmarshal(resp.Body(), &parsedResp) + if err != nil { + logger.Log(context.Background(), logSendErrorLevel, "Error parsing response while proxying", slog.Any("error", err)) + incShareQueuePeerRPCErrors(peerName) + } else if parsedResp.Error != nil { + logger.Log(context.Background(), logSendErrorLevel, "Error returned from target while proxying", slog.Any("error", parsedResp.Error)) + incShareQueuePeerRPCErrors(peerName) + } + } + fasthttp.ReleaseResponse(resp) + }() + + return nil +} + func (sq *ShareQueue) proxyRequests(peer *shareQueuePeer, worker int) { proxiedRequestCount := 0 logger := sq.log.With(slog.String("peer", peer.name), slog.String("name", sq.name), slog.Int("worker", worker)) @@ -157,56 +229,74 @@ func (sq *ShareQueue) proxyRequests(peer *shareQueuePeer, worker int) { defer func() { logger.Info("Stopped proxying requets to peer", slog.Int("proxiedRequestCount", proxiedRequestCount)) }() + + request := fasthttp.AcquireRequest() + request.SetRequestURI(peer.endpoint) + request.Header.SetMethod(http.MethodPost) + request.Header.SetContentTypeBytes([]byte("application/json")) + defer fasthttp.ReleaseRequest(request) + for { req, more := <-peer.ch if !more { return } - var ( - method string - data any - ) - isBig := req.size > bigRequestSize - if req.ethSendBundle != nil { - method = EthSendBundleMethod - data = req.ethSendBundle - } else if req.mevSendBundle != nil { - method = MevSendBundleMethod - data = req.mevSendBundle - } else if req.ethCancelBundle != nil { - method = EthCancelBundleMethod - data = req.ethCancelBundle - } else if req.ethSendRawTransaction != nil { - method = EthSendRawTransactionMethod - data = req.ethSendRawTransaction - } else if req.bidSubsidiseBlock != nil { - method = BidSubsidiseBlockMethod - data = req.bidSubsidiseBlock - } else { - logger.Error("Unknown request type", slog.String("method", req.method)) - shareQueueInternalErrors.Inc() + + if req.serializedJSONRPCRequest == nil { + logger.Debug("Skip sharing request that is not serialized properly") continue } - timeShareQueuePeerQueueDuration(peer.name, time.Since(req.receivedAt), method, req.systemEndpoint, isBig) - start := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) - resp, err := peer.client.Call(ctx, method, data) - cancel() - timeShareQueuePeerRPCDuration(peer.name, time.Since(start).Milliseconds(), isBig) - timeShareQueuePeerE2EDuration(peer.name, time.Since(req.receivedAt), method, req.systemEndpoint, isBig) - logSendErrorLevel := slog.LevelDebug - if peer.name == "local-builder" { - logSendErrorLevel = slog.LevelWarn - } + + err := sendShareRequest(logger, req, request, peer.client, peer.name) if err != nil { - logger.Log(context.Background(), logSendErrorLevel, "Error while proxying request", slog.Any("error", err)) - incShareQueuePeerRPCErrors(peer.name) - } - if resp != nil && resp.Error != nil { - logger.Log(context.Background(), logSendErrorLevel, "Error returned from target while proxying", slog.Any("error", resp.Error)) - incShareQueuePeerRPCErrors(peer.name) + logger.Debug("Failed to proxy a request", slog.Any("error", err)) } + proxiedRequestCount += 1 logger.Debug("Message proxied") } } + +func SerializeParsedRequestForSharing(req *ParsedRequest, signer *signature.Signer) error { + var ( + method string + data any + ) + if req.ethSendBundle != nil { + method = EthSendBundleMethod + data = req.ethSendBundle + } else if req.mevSendBundle != nil { + method = MevSendBundleMethod + data = req.mevSendBundle + } else if req.ethCancelBundle != nil { + method = EthCancelBundleMethod + data = req.ethCancelBundle + } else if req.ethSendRawTransaction != nil { + method = EthSendRawTransactionMethod + data = req.ethSendRawTransaction + } else if req.bidSubsidiseBlock != nil { + method = BidSubsidiseBlockMethod + data = req.bidSubsidiseBlock + } else { + return errUnknownRequestType + } + + request := rpcclient.NewRequestWithID(0, method, data) + + ser, err := json.Marshal(request) + if err != nil { + return err + } + + req.serializedJSONRPCRequest = ser + + if signer != nil { + header, err := signer.Create(ser) + if err != nil { + return err + } + req.signatureHeader = header + } + + return nil +} diff --git a/proxy/utils.go b/proxy/utils.go index 23c7a75..8495f27 100644 --- a/proxy/utils.go +++ b/proxy/utils.go @@ -15,6 +15,7 @@ import ( "github.com/flashbots/go-utils/cli" "github.com/flashbots/go-utils/rpcclient" "github.com/flashbots/go-utils/signature" + "github.com/valyala/fasthttp" "golang.org/x/net/http2" ) @@ -52,6 +53,28 @@ func HTTPClientWithMaxConnections(maxOpenConnections int) *http.Client { } } +func NewFastHTTPClient(certPEM []byte, maxOpenConnections int) (*fasthttp.Client, error) { + var tlsConfig *tls.Config + if certPEM != nil { + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM(certPEM); !ok { + return nil, errCertificate + } + tlsConfig = &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + } + } + + return &fasthttp.Client{ + MaxIdleConnDuration: time.Minute * 2, + NoDefaultUserAgentHeader: true, + DisableHeaderNamesNormalizing: true, + // DisablePathNormalizing: true, + TLSConfig: tlsConfig, + }, nil +} + func HTTPClientLocalhost(maxOpenConnections int) *http.Client { localTransport := &http.Transport{ MaxIdleConnsPerHost: maxOpenConnections,