Skip to content

Commit aec9dc2

Browse files
authored
E2e latency fix simple (#52)
## 📝 Summary * serialize request only once before sharing * use fasthttp for requests when sharing * share to builder immediately * add a simple benchmark ## ⛱ Motivation and Context <!--- Why is this change required? What problem does it solve? --> ## 📚 References <!-- Any interesting external links to documentation, articles, tweets which add value to the PR --> --- ## ✅ I have run these commands * [ ] `make lint` * [ ] `make test` * [ ] `go mod tidy`
1 parent 1762106 commit aec9dc2

File tree

13 files changed

+518
-153
lines changed

13 files changed

+518
-153
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ build: ## Build the HTTP server
2626
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/sender-proxy cmd/sender-proxy/main.go
2727
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/receiver-proxy cmd/receiver-proxy/main.go
2828
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-orderflow-sender cmd/test-tx-sender/main.go
29+
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-e2e-latency cmd/test-e2e-latency/main.go
2930

3031
.PHONY: build-receiver-proxy
3132
build-receiver-proxy: ## Build only the receiver-proxy

cmd/receiver-proxy/main.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,19 @@ func runMain(cCtx *cli.Context) error {
212212
maxUserRPS := cCtx.Int(flagMaxUserRPS)
213213

214214
proxyConfig := &proxy.ReceiverProxyConfig{
215-
ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{Log: log, FlashbotsSignerAddress: flashbotsSignerAddress},
216-
BuilderConfigHubEndpoint: builderConfigHubEndpoint,
217-
ArchiveEndpoint: archiveEndpoint,
218-
ArchiveConnections: connectionsPerPeer,
219-
LocalBuilderEndpoint: builderEndpoint,
220-
EthRPC: rpcEndpoint,
221-
MaxRequestBodySizeBytes: maxRequestBodySizeBytes,
222-
ConnectionsPerPeer: connectionsPerPeer,
223-
MaxUserRPS: maxUserRPS,
224-
ArchiveWorkerCount: archiveWorkerCount,
215+
ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{
216+
Log: log,
217+
FlashbotsSignerAddress: flashbotsSignerAddress,
218+
LocalBuilderEndpoint: builderEndpoint,
219+
},
220+
BuilderConfigHubEndpoint: builderConfigHubEndpoint,
221+
ArchiveEndpoint: archiveEndpoint,
222+
ArchiveConnections: connectionsPerPeer,
223+
EthRPC: rpcEndpoint,
224+
MaxRequestBodySizeBytes: maxRequestBodySizeBytes,
225+
ConnectionsPerPeer: connectionsPerPeer,
226+
MaxUserRPS: maxUserRPS,
227+
ArchiveWorkerCount: archiveWorkerCount,
225228
}
226229

227230
instance, err := proxy.NewReceiverProxy(*proxyConfig)

cmd/test-e2e-latency/main.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"log"
11+
"log/slog"
12+
"net/http"
13+
"os"
14+
"slices"
15+
"sync"
16+
"time"
17+
18+
"github.com/ethereum/go-ethereum/common/hexutil"
19+
"github.com/flashbots/go-utils/rpcclient"
20+
"github.com/flashbots/go-utils/signature"
21+
"github.com/urfave/cli/v2" // imports as package "cli"
22+
)
23+
24+
var flags []cli.Flag = []cli.Flag{
25+
// input and output
26+
&cli.StringFlag{
27+
Name: "local-orderflow-endpoint",
28+
Value: "http://127.0.0.1",
29+
Usage: "address to send orderflow to",
30+
EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"},
31+
},
32+
&cli.IntFlag{
33+
Name: "local-receiver-server-port",
34+
Value: 8646,
35+
Usage: "address to send orderflow to",
36+
EnvVars: []string{"LOCAL_RECEIVER_SERVER__PORT"},
37+
},
38+
&cli.IntFlag{
39+
Name: "num-senders",
40+
Value: 50,
41+
Usage: "Number of senders",
42+
EnvVars: []string{"NUM_SENDERS"},
43+
},
44+
&cli.IntFlag{
45+
Name: "num-requests",
46+
Value: 50000,
47+
Usage: "Number of requests",
48+
EnvVars: []string{"NUM_REQUESTS"},
49+
},
50+
}
51+
52+
// test tx
53+
54+
func main() {
55+
app := &cli.App{
56+
Name: "test-tx-sender",
57+
Usage: "send test transactions",
58+
Flags: flags,
59+
Action: func(cCtx *cli.Context) error {
60+
orderflowSigner, err := signature.NewRandomSigner()
61+
if err != nil {
62+
return err
63+
}
64+
slog.Info("Ordeflow signing address", "address", orderflowSigner.Address())
65+
66+
localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint")
67+
client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{
68+
Signer: orderflowSigner,
69+
})
70+
slog.Info("Created client")
71+
72+
receiverPort := cCtx.Int("local-receiver-server-port")
73+
74+
senders := cCtx.Int("num-senders")
75+
requests := cCtx.Int("num-requests")
76+
77+
return runE2ELatencyTest(client, receiverPort, senders, requests)
78+
},
79+
}
80+
81+
if err := app.Run(os.Args); err != nil {
82+
log.Fatal(err)
83+
}
84+
}
85+
86+
type sharedState struct {
87+
sentAt map[uint64]time.Time
88+
receivedAt map[uint64]time.Time
89+
mu sync.Mutex
90+
}
91+
92+
func (s *sharedState) ServeHTTP(w http.ResponseWriter, r *http.Request) {
93+
receivedAt := time.Now()
94+
body, _ := io.ReadAll(r.Body)
95+
96+
// serve builderhub API
97+
if r.URL.Path == "/api/l1-builder/v1/register_credentials/orderflow_proxy" {
98+
w.WriteHeader(http.StatusOK)
99+
return
100+
} else if r.URL.Path == "/api/l1-builder/v1/builders" {
101+
res, err := json.Marshal([]int{})
102+
if err != nil {
103+
w.WriteHeader(http.StatusInternalServerError)
104+
return
105+
}
106+
w.WriteHeader(http.StatusOK)
107+
_, _ = w.Write(res)
108+
return
109+
}
110+
111+
resp, err := json.Marshal(struct{}{})
112+
if err != nil {
113+
w.WriteHeader(http.StatusInternalServerError)
114+
return
115+
}
116+
_, _ = w.Write(resp)
117+
118+
// forwarded request received
119+
type jsonRPCRequest struct {
120+
Params []hexutil.Bytes `json:"params"`
121+
}
122+
123+
var request jsonRPCRequest
124+
err = json.Unmarshal(body, &request)
125+
if err != nil {
126+
return
127+
}
128+
if len(request.Params) != 1 {
129+
return
130+
}
131+
132+
decoded := binary.BigEndian.Uint64(request.Params[0])
133+
134+
s.mu.Lock()
135+
s.receivedAt[decoded] = receivedAt
136+
s.mu.Unlock()
137+
}
138+
139+
func (s *sharedState) RunSender(client rpcclient.RPCClient, start, count int, wg *sync.WaitGroup) {
140+
defer wg.Done()
141+
for i := start; i < start+count; i += 1 {
142+
b := make([]byte, 8)
143+
//nolint:gosec
144+
binary.BigEndian.PutUint64(b, uint64(i))
145+
request := hexutil.Bytes(b)
146+
147+
s.mu.Lock()
148+
//nolint:gosec
149+
s.sentAt[uint64(i)] = time.Now()
150+
s.mu.Unlock()
151+
// send eth_sendRawTransactions
152+
resp, err := client.Call(context.Background(), "eth_sendRawTransaction", request)
153+
if err != nil {
154+
slog.Error("RPC request failed", "error", err)
155+
continue
156+
}
157+
if resp.Error != nil {
158+
slog.Error("RPC returned error", "error", resp.Error)
159+
continue
160+
}
161+
}
162+
}
163+
164+
func runE2ELatencyTest(client rpcclient.RPCClient, receiverPort, senders, requests int) error {
165+
state := &sharedState{
166+
sentAt: make(map[uint64]time.Time),
167+
receivedAt: make(map[uint64]time.Time),
168+
mu: sync.Mutex{},
169+
}
170+
171+
//nolint:gosec
172+
receiverServer := &http.Server{
173+
Addr: fmt.Sprintf("0.0.0.0:%d", receiverPort),
174+
Handler: state,
175+
}
176+
177+
go func() {
178+
if err := receiverServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
179+
slog.Error("Failed while listening to server", "error", err)
180+
os.Exit(1)
181+
}
182+
}()
183+
184+
countPerSender := requests / senders
185+
186+
slog.Info("Waiting for startup")
187+
time.Sleep(time.Second * 5)
188+
slog.Info("Sending started")
189+
190+
start := time.Now()
191+
offset := 0
192+
var wg sync.WaitGroup
193+
for range senders {
194+
wg.Add(1)
195+
go state.RunSender(client, offset, countPerSender, &wg)
196+
offset += countPerSender
197+
}
198+
wg.Wait()
199+
sendingTime := time.Since(start)
200+
201+
slog.Info("Waiting to finish")
202+
time.Sleep(time.Second * 1)
203+
204+
state.mu.Lock()
205+
defer state.mu.Unlock()
206+
207+
missedRequests := 0
208+
totalRequests := len(state.sentAt)
209+
rps := float64(totalRequests) / (float64(sendingTime.Milliseconds()) / 1000.0)
210+
211+
values := make([]float64, 0, totalRequests)
212+
for request, sentAt := range state.sentAt {
213+
receivedAt, ok := state.receivedAt[request]
214+
if !ok {
215+
missedRequests += 1
216+
continue
217+
}
218+
diff := float64(receivedAt.Sub(sentAt).Microseconds()) / 1000.0
219+
values = append(values, diff)
220+
}
221+
slices.Sort(values)
222+
p50 := values[(len(values)*50)/100]
223+
p99 := values[(len(values)*99)/100]
224+
p100 := values[len(values)-1]
225+
226+
missedPercentage := float64(missedRequests) / float64(totalRequests) * 100.0
227+
228+
slog.Info("Results", "p50", p50, "p99", p99, "p100", p100, "totalReq", totalRequests, "missedRequests", missedRequests, "missedPercent", missedPercentage, "averageRPS", rps)
229+
230+
return nil
231+
}

cmd/test-tx-sender/main.go

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ package main
33
import (
44
"context"
55
"errors"
6-
"io"
76
"log"
87
"log/slog"
9-
"net/http"
108
"os"
119

1210
"github.com/ethereum/go-ethereum/common/hexutil"
11+
"github.com/flashbots/go-utils/rpcclient"
1312
"github.com/flashbots/go-utils/rpctypes"
1413
"github.com/flashbots/go-utils/signature"
1514
"github.com/flashbots/tdx-orderflow-proxy/proxy"
@@ -21,16 +20,10 @@ var flags []cli.Flag = []cli.Flag{
2120
// input and output
2221
&cli.StringFlag{
2322
Name: "local-orderflow-endpoint",
24-
Value: "https://127.0.0.1:443",
23+
Value: "http://127.0.0.1",
2524
Usage: "address to send orderflow to",
2625
EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"},
2726
},
28-
&cli.StringFlag{
29-
Name: "cert-endpoint",
30-
Value: "http://127.0.0.1:14727",
31-
Usage: "address that serves certifiate on /cert endpoint",
32-
EnvVars: []string{"CERT_ENDPOINT"},
33-
},
3427
&cli.StringFlag{
3528
Name: "signer-private-key",
3629
Value: "0x52da2727dd1180b547258c9ca7deb7f9576b2768f3f293b67f36505c85b2ddd0",
@@ -57,7 +50,6 @@ func main() {
5750
Flags: flags,
5851
Action: func(cCtx *cli.Context) error {
5952
localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint")
60-
certEndpoint := cCtx.String("cert-endpoint")
6153
signerPrivateKey := cCtx.String("signer-private-key")
6254

6355
orderflowSigner, err := signature.NewSignerFromHexPrivateKey(signerPrivateKey)
@@ -66,16 +58,9 @@ func main() {
6658
}
6759
slog.Info("Ordeflow signing address", "address", orderflowSigner.Address())
6860

69-
cert, err := fetchCertificate(certEndpoint + "/cert")
70-
if err != nil {
71-
return err
72-
}
73-
slog.Info("Fetched certificate")
74-
75-
client, err := proxy.RPCClientWithCertAndSigner(localOrderflowEndpoint, cert, orderflowSigner, 1)
76-
if err != nil {
77-
return err
78-
}
61+
client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{
62+
Signer: orderflowSigner,
63+
})
7964
slog.Info("Created client")
8065

8166
rpcEndpoint := cCtx.String("rpc-endpoint")
@@ -188,17 +173,3 @@ func main() {
188173
log.Fatal(err)
189174
}
190175
}
191-
192-
func fetchCertificate(endpoint string) ([]byte, error) {
193-
resp, err := http.Get(endpoint) //nolint:gosec
194-
if err != nil {
195-
return nil, err
196-
}
197-
defer resp.Body.Close()
198-
199-
body, err := io.ReadAll(resp.Body)
200-
if err != nil {
201-
return nil, err
202-
}
203-
return body, nil
204-
}

e2e-latenc-test.sh

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/bin/bash
2+
3+
make build
4+
5+
./build/receiver-proxy --pprof --user-listen-addr 0.0.0.0:9976 --builder-endpoint http://127.0.0.1:7890 --builder-confighub-endpoint http://127.0.0.1:7890 --orderflow-archive-endpoint http://127.0.0.1:7890 --connections-per-peer 100 > /tmp/log.txt 2>&1 &
6+
PROXY_PID=$!
7+
8+
./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:9976 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000
9+
10+
# uncomment to send orderflow without proxy to see test setup overhead
11+
#./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:7890 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000
12+
13+
#sleep 10
14+
kill $PROXY_PID

0 commit comments

Comments
 (0)