Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ build: ## Build the HTTP server
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/sender-proxy cmd/sender-proxy/main.go
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/receiver-proxy cmd/receiver-proxy/main.go
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-orderflow-sender cmd/test-tx-sender/main.go
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-e2e-latency cmd/test-e2e-latency/main.go

.PHONY: build-receiver-proxy
build-receiver-proxy: ## Build only the receiver-proxy
Expand Down
23 changes: 13 additions & 10 deletions cmd/receiver-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,19 @@ func runMain(cCtx *cli.Context) error {
maxUserRPS := cCtx.Int(flagMaxUserRPS)

proxyConfig := &proxy.ReceiverProxyConfig{
ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{Log: log, FlashbotsSignerAddress: flashbotsSignerAddress},
BuilderConfigHubEndpoint: builderConfigHubEndpoint,
ArchiveEndpoint: archiveEndpoint,
ArchiveConnections: connectionsPerPeer,
LocalBuilderEndpoint: builderEndpoint,
EthRPC: rpcEndpoint,
MaxRequestBodySizeBytes: maxRequestBodySizeBytes,
ConnectionsPerPeer: connectionsPerPeer,
MaxUserRPS: maxUserRPS,
ArchiveWorkerCount: archiveWorkerCount,
ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{
Log: log,
FlashbotsSignerAddress: flashbotsSignerAddress,
LocalBuilderEndpoint: builderEndpoint,
},
BuilderConfigHubEndpoint: builderConfigHubEndpoint,
ArchiveEndpoint: archiveEndpoint,
ArchiveConnections: connectionsPerPeer,
EthRPC: rpcEndpoint,
MaxRequestBodySizeBytes: maxRequestBodySizeBytes,
ConnectionsPerPeer: connectionsPerPeer,
MaxUserRPS: maxUserRPS,
ArchiveWorkerCount: archiveWorkerCount,
}

instance, err := proxy.NewReceiverProxy(*proxyConfig)
Expand Down
231 changes: 231 additions & 0 deletions cmd/test-e2e-latency/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package main

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"os"
"slices"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/go-utils/rpcclient"
"github.com/flashbots/go-utils/signature"
"github.com/urfave/cli/v2" // imports as package "cli"
)

var flags []cli.Flag = []cli.Flag{
// input and output
&cli.StringFlag{
Name: "local-orderflow-endpoint",
Value: "http://127.0.0.1",
Usage: "address to send orderflow to",
EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"},
},
&cli.IntFlag{
Name: "local-receiver-server-port",
Value: 8646,
Usage: "address to send orderflow to",
EnvVars: []string{"LOCAL_RECEIVER_SERVER__PORT"},
},
&cli.IntFlag{
Name: "num-senders",
Value: 50,
Usage: "Number of senders",
EnvVars: []string{"NUM_SENDERS"},
},
&cli.IntFlag{
Name: "num-requests",
Value: 50000,
Usage: "Number of requests",
EnvVars: []string{"NUM_REQUESTS"},
},
}

// test tx

func main() {
app := &cli.App{
Name: "test-tx-sender",
Usage: "send test transactions",
Flags: flags,
Action: func(cCtx *cli.Context) error {
orderflowSigner, err := signature.NewRandomSigner()
if err != nil {
return err
}
slog.Info("Ordeflow signing address", "address", orderflowSigner.Address())

localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint")
client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{
Signer: orderflowSigner,
})
slog.Info("Created client")

receiverPort := cCtx.Int("local-receiver-server-port")

senders := cCtx.Int("num-senders")
requests := cCtx.Int("num-requests")

return runE2ELatencyTest(client, receiverPort, senders, requests)
},
}

if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}

type sharedState struct {
sentAt map[uint64]time.Time
receivedAt map[uint64]time.Time
mu sync.Mutex
}

func (s *sharedState) ServeHTTP(w http.ResponseWriter, r *http.Request) {
receivedAt := time.Now()
body, _ := io.ReadAll(r.Body)

// serve builderhub API
if r.URL.Path == "/api/l1-builder/v1/register_credentials/orderflow_proxy" {
w.WriteHeader(http.StatusOK)
return
} else if r.URL.Path == "/api/l1-builder/v1/builders" {
res, err := json.Marshal([]int{})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(res)
return
}

resp, err := json.Marshal(struct{}{})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
_, _ = w.Write(resp)

// forwarded request received
type jsonRPCRequest struct {
Params []hexutil.Bytes `json:"params"`
}

var request jsonRPCRequest
err = json.Unmarshal(body, &request)
if err != nil {
return
}
if len(request.Params) != 1 {
return
}

decoded := binary.BigEndian.Uint64(request.Params[0])

s.mu.Lock()
s.receivedAt[decoded] = receivedAt
s.mu.Unlock()
}

func (s *sharedState) RunSender(client rpcclient.RPCClient, start, count int, wg *sync.WaitGroup) {
defer wg.Done()
for i := start; i < start+count; i += 1 {
b := make([]byte, 8)
//nolint:gosec
binary.BigEndian.PutUint64(b, uint64(i))
request := hexutil.Bytes(b)

s.mu.Lock()
//nolint:gosec
s.sentAt[uint64(i)] = time.Now()
s.mu.Unlock()
// send eth_sendRawTransactions
resp, err := client.Call(context.Background(), "eth_sendRawTransaction", request)
if err != nil {
slog.Error("RPC request failed", "error", err)
continue
}
if resp.Error != nil {
slog.Error("RPC returned error", "error", resp.Error)
continue
}
}
}

func runE2ELatencyTest(client rpcclient.RPCClient, receiverPort, senders, requests int) error {
state := &sharedState{
sentAt: make(map[uint64]time.Time),
receivedAt: make(map[uint64]time.Time),
mu: sync.Mutex{},
}

//nolint:gosec
receiverServer := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", receiverPort),
Handler: state,
}

go func() {
if err := receiverServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
slog.Error("Failed while listening to server", "error", err)
os.Exit(1)
}
}()

countPerSender := requests / senders

slog.Info("Waiting for startup")
time.Sleep(time.Second * 5)
slog.Info("Sending started")

start := time.Now()
offset := 0
var wg sync.WaitGroup
for range senders {
wg.Add(1)
go state.RunSender(client, offset, countPerSender, &wg)
offset += countPerSender
}
wg.Wait()
sendingTime := time.Since(start)

slog.Info("Waiting to finish")
time.Sleep(time.Second * 1)

state.mu.Lock()
defer state.mu.Unlock()

missedRequests := 0
totalRequests := len(state.sentAt)
rps := float64(totalRequests) / (float64(sendingTime.Milliseconds()) / 1000.0)

values := make([]float64, 0, totalRequests)
for request, sentAt := range state.sentAt {
receivedAt, ok := state.receivedAt[request]
if !ok {
missedRequests += 1
continue
}
diff := float64(receivedAt.Sub(sentAt).Microseconds()) / 1000.0
values = append(values, diff)
}
slices.Sort(values)
p50 := values[(len(values)*50)/100]
p99 := values[(len(values)*99)/100]
p100 := values[len(values)-1]

missedPercentage := float64(missedRequests) / float64(totalRequests) * 100.0

slog.Info("Results", "p50", p50, "p99", p99, "p100", p100, "totalReq", totalRequests, "missedRequests", missedRequests, "missedPercent", missedPercentage, "averageRPS", rps)

return nil
}
39 changes: 5 additions & 34 deletions cmd/test-tx-sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package main
import (
"context"
"errors"
"io"
"log"
"log/slog"
"net/http"
"os"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/flashbots/go-utils/rpcclient"
"github.com/flashbots/go-utils/rpctypes"
"github.com/flashbots/go-utils/signature"
"github.com/flashbots/tdx-orderflow-proxy/proxy"
Expand All @@ -21,16 +20,10 @@ var flags []cli.Flag = []cli.Flag{
// input and output
&cli.StringFlag{
Name: "local-orderflow-endpoint",
Value: "https://127.0.0.1:443",
Value: "http://127.0.0.1",
Usage: "address to send orderflow to",
EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"},
},
&cli.StringFlag{
Name: "cert-endpoint",
Value: "http://127.0.0.1:14727",
Usage: "address that serves certifiate on /cert endpoint",
EnvVars: []string{"CERT_ENDPOINT"},
},
&cli.StringFlag{
Name: "signer-private-key",
Value: "0x52da2727dd1180b547258c9ca7deb7f9576b2768f3f293b67f36505c85b2ddd0",
Expand All @@ -57,7 +50,6 @@ func main() {
Flags: flags,
Action: func(cCtx *cli.Context) error {
localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint")
certEndpoint := cCtx.String("cert-endpoint")
signerPrivateKey := cCtx.String("signer-private-key")

orderflowSigner, err := signature.NewSignerFromHexPrivateKey(signerPrivateKey)
Expand All @@ -66,16 +58,9 @@ func main() {
}
slog.Info("Ordeflow signing address", "address", orderflowSigner.Address())

cert, err := fetchCertificate(certEndpoint + "/cert")
if err != nil {
return err
}
slog.Info("Fetched certificate")

client, err := proxy.RPCClientWithCertAndSigner(localOrderflowEndpoint, cert, orderflowSigner, 1)
if err != nil {
return err
}
client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{
Signer: orderflowSigner,
})
slog.Info("Created client")

rpcEndpoint := cCtx.String("rpc-endpoint")
Expand Down Expand Up @@ -188,17 +173,3 @@ func main() {
log.Fatal(err)
}
}

func fetchCertificate(endpoint string) ([]byte, error) {
resp, err := http.Get(endpoint) //nolint:gosec
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}
14 changes: 14 additions & 0 deletions e2e-latenc-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash

make build

./build/receiver-proxy --pprof --user-listen-addr 0.0.0.0:9976 --builder-endpoint http://127.0.0.1:7890 --builder-confighub-endpoint http://127.0.0.1:7890 --orderflow-archive-endpoint http://127.0.0.1:7890 --connections-per-peer 100 > /tmp/log.txt 2>&1 &
PROXY_PID=$!

./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:9976 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000

# uncomment to send orderflow without proxy to see test setup overhead
#./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:7890 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000

#sleep 10
kill $PROXY_PID
Loading