Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion proxy/receiver_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) {
return nil, err
}

localBuilder := rpcclient.NewClient(config.LocalBuilderEndpoint)
localCl := HTTPClientLocalhost(DefaultLocalhostMaxIdleConn)

localBuilder := rpcclient.NewClientWithOpts(config.LocalBuilderEndpoint, &rpcclient.RPCClientOpts{
HTTPClient: localCl,
})

limit := rate.Limit(config.MaxUserRPS)
if config.MaxUserRPS == 0 {
Expand Down
3 changes: 3 additions & 0 deletions proxy/receiver_servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
HTTPDefaultIdleTimeout = time.Duration(cli.GetEnvInt("HTTP_IDLE_TIMEOUT_SEC", 3600)) * time.Second
HTTP2DefaultMaxUploadPerConnection = int32(cli.GetEnvInt("HTTP2_MAX_UPLOAD_PER_CONN", 32<<20)) // 32MiB
HTTP2DefaultMaxUploadPerStream = int32(cli.GetEnvInt("HTTP2_MAX_UPLOAD_PER_STREAM", 8<<20)) // 8MiB
HTTP2DefaultMaxConcurrentStreams = uint32(cli.GetEnvInt("HTTP2_MAX_CONCURRENT_STREAMS", 4096))
)

type ReceiverProxyServers struct {
Expand All @@ -35,6 +36,7 @@ func StartReceiverServers(proxy *ReceiverProxy, userListenAddress, systemListenA
IdleTimeout: HTTPDefaultIdleTimeout,
}
userH2 := http2.Server{
MaxConcurrentStreams: HTTP2DefaultMaxConcurrentStreams,
MaxUploadBufferPerConnection: HTTP2DefaultMaxUploadPerConnection,
MaxUploadBufferPerStream: HTTP2DefaultMaxUploadPerStream,
}
Expand All @@ -54,6 +56,7 @@ func StartReceiverServers(proxy *ReceiverProxy, userListenAddress, systemListenA
IdleTimeout: HTTPDefaultIdleTimeout,
}
systemH2 := http2.Server{
MaxConcurrentStreams: HTTP2DefaultMaxConcurrentStreams,
MaxUploadBufferPerConnection: HTTP2DefaultMaxUploadPerConnection,
MaxUploadBufferPerStream: HTTP2DefaultMaxUploadPerStream,
}
Expand Down
49 changes: 43 additions & 6 deletions proxy/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,32 @@ import (
"github.com/flashbots/go-utils/cli"
"github.com/flashbots/go-utils/rpcclient"
"github.com/flashbots/go-utils/signature"
"golang.org/x/net/http2"
)

var (
DefaultOrderflowProxyPublicPort = "5544"
DefaultHTTPCLientWriteBuffer = cli.GetEnvInt("HTTP_CLIENT_WRITE_BUFFER", 64<<10) // 64 KiB
)

const DefaultLocalhostMaxIdleConn = 1000

var errCertificate = errors.New("failed to add certificate to pool")

func createTransportForSelfSignedCert(certPEM []byte) (*http.Transport, error) {
func createTransportForSelfSignedCert(certPEM []byte, maxOpenConnections int) (*http.Transport, error) {
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certPEM); !ok {
return nil, errCertificate
}
return &http.Transport{
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
},
}, nil
MaxConnsPerHost: maxOpenConnections * 2,
}

return tr, nil
}

func HTTPClientWithMaxConnections(maxOpenConnections int) *http.Client {
Expand All @@ -46,14 +52,45 @@ func HTTPClientWithMaxConnections(maxOpenConnections int) *http.Client {
}
}

func HTTPClientLocalhost(maxOpenConnections int) *http.Client {
localTransport := &http.Transport{
MaxIdleConnsPerHost: maxOpenConnections,
MaxIdleConns: maxOpenConnections,
DisableCompression: true,
IdleConnTimeout: time.Minute * 2,
// ---- kill delayed-ACK/Nagle ----
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
c, err := (&net.Dialer{
LocalAddr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)},
KeepAlive: 30 * time.Second,
Timeout: 1 * time.Second,
}).DialContext(ctx, network, addr)
if err == nil {
tcp, ok := c.(*net.TCPConn)
if ok {
err = tcp.SetNoDelay(true) // <-- ACK immediately
}
}
return c, err
},
Proxy: nil,
}
_ = http2.ConfigureTransport(localTransport)
localCl := http.Client{
Transport: localTransport,
Timeout: 10 * time.Second,
}
return &localCl
}

//nolint:ireturn
func RPCClientWithCertAndSigner(endpoint string, certPEM []byte, signer *signature.Signer, maxOpenConnections int) (rpcclient.RPCClient, error) {
transport, err := createTransportForSelfSignedCert(certPEM)
transport, err := createTransportForSelfSignedCert(certPEM, maxOpenConnections)
if err != nil {
return nil, err
}
transport.MaxIdleConns = maxOpenConnections
transport.MaxIdleConnsPerHost = maxOpenConnections
transport.MaxIdleConns = maxOpenConnections * 2
transport.MaxIdleConnsPerHost = maxOpenConnections * 2
transport.WriteBufferSize = DefaultHTTPCLientWriteBuffer

client := rpcclient.NewClientWithOpts(endpoint, &rpcclient.RPCClientOpts{
Expand Down