diff --git a/proxy/receiver_proxy.go b/proxy/receiver_proxy.go index 7a97e0c..d51bc29 100644 --- a/proxy/receiver_proxy.go +++ b/proxy/receiver_proxy.go @@ -113,7 +113,11 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) { return nil, err } - localBuilder := rpcclient.NewClient(config.LocalBuilderEndpoint) + localCl := HTTPClientLocalhost(DefaultLocalhostMaxIdleConn) + + localBuilder := rpcclient.NewClientWithOpts(config.LocalBuilderEndpoint, &rpcclient.RPCClientOpts{ + HTTPClient: localCl, + }) limit := rate.Limit(config.MaxUserRPS) if config.MaxUserRPS == 0 { diff --git a/proxy/receiver_servers.go b/proxy/receiver_servers.go index edd11c3..bc61933 100644 --- a/proxy/receiver_servers.go +++ b/proxy/receiver_servers.go @@ -16,6 +16,7 @@ var ( HTTPDefaultIdleTimeout = time.Duration(cli.GetEnvInt("HTTP_IDLE_TIMEOUT_SEC", 3600)) * time.Second HTTP2DefaultMaxUploadPerConnection = int32(cli.GetEnvInt("HTTP2_MAX_UPLOAD_PER_CONN", 32<<20)) // 32MiB HTTP2DefaultMaxUploadPerStream = int32(cli.GetEnvInt("HTTP2_MAX_UPLOAD_PER_STREAM", 8<<20)) // 8MiB + HTTP2DefaultMaxConcurrentStreams = uint32(cli.GetEnvInt("HTTP2_MAX_CONCURRENT_STREAMS", 4096)) ) type ReceiverProxyServers struct { @@ -35,6 +36,7 @@ func StartReceiverServers(proxy *ReceiverProxy, userListenAddress, systemListenA IdleTimeout: HTTPDefaultIdleTimeout, } userH2 := http2.Server{ + MaxConcurrentStreams: HTTP2DefaultMaxConcurrentStreams, MaxUploadBufferPerConnection: HTTP2DefaultMaxUploadPerConnection, MaxUploadBufferPerStream: HTTP2DefaultMaxUploadPerStream, } @@ -54,6 +56,7 @@ func StartReceiverServers(proxy *ReceiverProxy, userListenAddress, systemListenA IdleTimeout: HTTPDefaultIdleTimeout, } systemH2 := http2.Server{ + MaxConcurrentStreams: HTTP2DefaultMaxConcurrentStreams, MaxUploadBufferPerConnection: HTTP2DefaultMaxUploadPerConnection, MaxUploadBufferPerStream: HTTP2DefaultMaxUploadPerStream, } diff --git a/proxy/utils.go b/proxy/utils.go index 3f5391e..428c102 100644 --- a/proxy/utils.go +++ b/proxy/utils.go @@ -15,6 +15,7 @@ import ( "github.com/flashbots/go-utils/cli" "github.com/flashbots/go-utils/rpcclient" "github.com/flashbots/go-utils/signature" + "golang.org/x/net/http2" ) var ( @@ -22,19 +23,24 @@ var ( DefaultHTTPCLientWriteBuffer = cli.GetEnvInt("HTTP_CLIENT_WRITE_BUFFER", 64<<10) // 64 KiB ) +const DefaultLocalhostMaxIdleConn = 1000 + var errCertificate = errors.New("failed to add certificate to pool") -func createTransportForSelfSignedCert(certPEM []byte) (*http.Transport, error) { +func createTransportForSelfSignedCert(certPEM []byte, maxOpenConnections int) (*http.Transport, error) { certPool := x509.NewCertPool() if ok := certPool.AppendCertsFromPEM(certPEM); !ok { return nil, errCertificate } - return &http.Transport{ + tr := &http.Transport{ TLSClientConfig: &tls.Config{ RootCAs: certPool, MinVersion: tls.VersionTLS12, }, - }, nil + MaxConnsPerHost: maxOpenConnections * 2, + } + + return tr, nil } func HTTPClientWithMaxConnections(maxOpenConnections int) *http.Client { @@ -46,14 +52,45 @@ func HTTPClientWithMaxConnections(maxOpenConnections int) *http.Client { } } +func HTTPClientLocalhost(maxOpenConnections int) *http.Client { + localTransport := &http.Transport{ + MaxIdleConnsPerHost: maxOpenConnections, + MaxIdleConns: maxOpenConnections, + DisableCompression: true, + IdleConnTimeout: time.Minute * 2, + // ---- kill delayed-ACK/Nagle ---- + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + c, err := (&net.Dialer{ + LocalAddr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}, + KeepAlive: 30 * time.Second, + Timeout: 1 * time.Second, + }).DialContext(ctx, network, addr) + if err == nil { + tcp, ok := c.(*net.TCPConn) + if ok { + err = tcp.SetNoDelay(true) // <-- ACK immediately + } + } + return c, err + }, + Proxy: nil, + } + _ = http2.ConfigureTransport(localTransport) + localCl := http.Client{ + Transport: localTransport, + Timeout: 10 * time.Second, + } + return &localCl +} + //nolint:ireturn func RPCClientWithCertAndSigner(endpoint string, certPEM []byte, signer *signature.Signer, maxOpenConnections int) (rpcclient.RPCClient, error) { - transport, err := createTransportForSelfSignedCert(certPEM) + transport, err := createTransportForSelfSignedCert(certPEM, maxOpenConnections) if err != nil { return nil, err } - transport.MaxIdleConns = maxOpenConnections - transport.MaxIdleConnsPerHost = maxOpenConnections + transport.MaxIdleConns = maxOpenConnections * 2 + transport.MaxIdleConnsPerHost = maxOpenConnections * 2 transport.WriteBufferSize = DefaultHTTPCLientWriteBuffer client := rpcclient.NewClientWithOpts(endpoint, &rpcclient.RPCClientOpts{