-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Fix Race Condition in fasthttpadaptor v1.66.0 #2069
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fix Race Condition in fasthttpadaptor v1.66.0 #2069
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a race condition in fasthttpadaptor v1.66.0 where a recycled netHTTPResponseWriter
could be re-closed while still in use, causing concurrent access issues during resource cleanup.
- Implements synchronization using
sync.WaitGroup
to ensure proper coordination between concurrent operations - Removes multiple mutex locks that were causing deadlocks and race conditions
- Adds comprehensive test coverage to prevent future race condition regressions
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
fasthttpadaptor/adaptor.go | Implements WaitGroup-based synchronization, removes problematic mutexes, and restructures resource cleanup logic |
fasthttpadaptor/adaptor_test.go | Adds new test functions to verify race condition fixes and resource recycling under load |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
mode := <-w.modeCh | ||
// Wait for the net/http handler to complete before releasing. | ||
// (e.g. wait for hijacked connection) | ||
w.wg.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This creates a potential deadlock. The goroutine is calling w.wg.Wait()
while it's part of the same WaitGroup that was incremented with w.wg.Add(1)
at line 63. The goroutine cannot complete because it's waiting for itself to finish.
w.wg.Wait() |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the comment isn't clear about what it means by waiting for the handler to complete. The WaitGroup is meant to track the lifetime of the reference to w
in each mode type.
The w.wg.Wait()
is necessary here to avoid recycling the writer while w
is still in use (in SendStreamWriter, or the handlerConn still hasn't been closed).
w.r, w.w = io.Pipe() | ||
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) { | ||
// Signal that streaming completed on return. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pipe is created after the WaitGroup is decremented in the modeDone
case but before it's used in streaming. This could cause a race condition where the pipe is accessed before it's initialized. The pipe creation should happen before the goroutine starts or be properly synchronized.
w.r, w.w = io.Pipe() | |
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) { | |
// Signal that streaming completed on return. | |
// Create the pipe before starting the stream writer to avoid race conditions. | |
w.r, w.w = io.Pipe() | |
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) { |
Copilot uses AI. Check for mistakes.
fasthttpadaptor/adaptor.go
Outdated
w.closeOnce.Do(func() { | ||
if w.w != nil { | ||
_ = w.w.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Close()
method only closes the writer part of the pipe but not the reader (w.r
). This could lead to resource leaks. Both w.r
and w.w
should be closed, or the full pipe should be properly cleaned up.
} | |
} | |
if w.r != nil { | |
_ = w.r.Close() | |
} |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Close()
method is from the perspective of the net/http handler.
When the net/http handler closes the writer, this should close the writer end w.w
of the streaming mode. The reader end that is used by fasthttpw.r
will then return an EOF error and be closed along with the client connection separately.
If any other mode occurs, the Close() operation does nothing since no internal writers/readers are managed by fasthttp at that point. Hijack mode gives complete control to the user to control w.handlerConn
, while done mode only uses a recycled buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
} | ||
|
||
// Start streaming mode on return. | ||
w.r, w.w = io.Pipe() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The io.Pipe() creation should be moved to the reset() method or initialization to ensure pipes are always available when needed, rather than creating them only in flush mode.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pipes are only needed specifically during streaming (flush) mode.
Since there is no need to allocate these pips in any other situation, I had updated to code to create the pipes only during stream mode since w.r
and w.w
are only used in streaming mode. This way, we don't allocate pipes unnecessarily.
Co-authored-by: Copilot <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
w.w = nil | ||
w.r = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting pipe reader/writer to nil in reset() while concurrent operations might still be using them could cause nil pointer dereferences. These should only be reset after ensuring all operations have completed.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A reset only occurs when the writer is acquired. The writer at this stage should have completed all operations as expected.
The new wait group should ensure this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
@erikdubbelboer I've made some changes to address the rare reset race condition. I also moved the Please let me know your thoughts on these changes and the AI hints. This should address the issue in a more robust way, since the release of the writer is now located in a single location for the adaptor. |
I'm sorry but these changes are too many and too messy for me to follow. The initial flushing support PR was already super hard to properly review. I'm strongly considering reverting flushing support completely for now. Then we can have a look at re-doing the whole thing in a different way. |
You’re right, I’m sorry for the large PR. I included extra changes, such as documentation updates, renames, and optimizing field allocation to enhance readability and add unit tests. However, I realize that this has made the PR difficult to follow. I’m more than happy to revert these unnecessary changes and only focus on the essential ones that address the race condition in the PR. Or if it's best, I can close the PR and we can look into starting over to ensure a cleaner implementation. If we decide to proceed with the current PR after removing the extra changes, the diff would include approximately 51 new lines and 50 removed lines for adaptor.go +52 -51diff --git a/fasthttpadaptor/adaptor.go b/fasthttpadaptor/adaptor.go
index 48cd0e3..5833074 100644
--- a/fasthttpadaptor/adaptor.go
+++ b/fasthttpadaptor/adaptor.go
@@ -60,18 +60,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
w := acquireNetHTTPResponseWriter(ctx)
// Concurrently serve the net/http handler.
+ w.wg.Add(1)
go func() {
h.ServeHTTP(w, r.WithContext(ctx))
- select {
- case w.modeCh <- modeDone:
- default:
- }
- _ = w.Close()
- }()
+ w.chOnce.Do(func() {
+ w.modeCh <- modeDone
+ })
+ w.Close()
- mode := <-w.modeCh
+ // Wait for the net/http handler to complete before releasing.
+ // (e.g. wait for hijacked connection)
+ w.wg.Wait()
+ releaseNetHTTPResponseWriter(w)
+ }()
- switch mode {
+ switch <-w.modeCh {
case modeDone:
// No flush occurred before the handler returned.
// Send the data as one chunk.
@@ -87,6 +90,7 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
}
}
+ w.responseMutex.Lock()
if !haveContentType {
// From net/http.ResponseWriter.Write:
// If the Header does not contain a Content-Type line, Write adds a Content-Type set
@@ -99,14 +103,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
}
- w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
ctx.Response.SetBody(*w.responseBody)
}
w.responseMutex.Unlock()
- // Release after sending response.
- releaseNetHTTPResponseWriter(w)
+ // Signal that the net/http -> fasthttp copy is complete.
+ w.wg.Done()
case modeFlushed:
// Flush occurred before handler returned.
@@ -145,8 +148,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
}
- // Start streaming mode on return.
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
+ // Signal that streaming completed on return.
+ defer func() {
+ w.r.Close()
+ w.wg.Done()
+ }()
+
// Stream the first chunk.
if len(*w.responseBody) > 0 {
_, _ = bw.Write(*w.responseBody)
@@ -163,16 +171,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
// as data comes in.
chunk := acquireBuffer()
*chunk = (*chunk)[:minBufferSize]
+ defer releaseBuffer(chunk)
for {
// Read net/http handler chunk.
n, err := w.r.Read(*chunk)
if err != nil {
// Handler ended due to an io.EOF
// or an error occurred.
- //
- // Release the response writer for reuse.
- releaseBuffer(chunk)
- releaseNetHTTPResponseWriter(w)
return
}
@@ -182,10 +187,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
- //
- // Release the response writer for reuse.
- releaseBuffer(chunk)
- releaseNetHTTPResponseWriter(w)
return
}
@@ -193,10 +194,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
- //
- // Release the response writer for reuse.
- releaseBuffer(chunk)
- releaseNetHTTPResponseWriter(w)
return
}
}
@@ -213,13 +210,12 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
// The net/http handler called w.Hijack().
// Copy data bidirectionally between the
// net/http and fasthttp connections.
- var wg sync.WaitGroup
- wg.Add(2)
+ w.hijackedWg.Add(2)
// Note: It is safe to assume that net.Conn automatically
// flushes data while copying.
go func() {
- defer wg.Done()
+ defer w.hijackedWg.Done()
_, _ = io.Copy(ctx.Conn(), w.handlerConn)
// Close the fasthttp connection when
@@ -227,17 +223,18 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
_ = ctx.Conn().Close()
}()
go func() {
- defer wg.Done()
+ defer w.hijackedWg.Done()
_, _ = io.Copy(w.handlerConn, ctx.Conn())
// Note: Only the net/http handler
// should close the connection.
}()
// Wait for the net/http handler to finish
- // writing to the hijacked connection prior to releasing
- // the writer into the writer pool.
- wg.Wait()
- releaseNetHTTPResponseWriter(w)
+ // writing to the hijacked connection.
+ w.hijackedWg.Wait()
+
+ // Signal that the hijacked connection was closed.
+ w.wg.Done()
}
}
}
@@ -259,17 +256,17 @@ var writerPool = &sync.Pool{
h: make(http.Header),
r: pr,
w: pw,
- modeCh: make(chan ModeType),
+ modeCh: make(chan modeType),
responseBody: acquireBuffer(),
streamCond: sync.NewCond(&sync.Mutex{}),
}
},
}
-type ModeType int
+type modeType int
const (
- modeUnknown ModeType = iota
+ modeUnknown modeType = iota //nolint:unused
modeDone
modeFlushed
modeHijacked
@@ -281,15 +278,18 @@ type netHTTPResponseWriter struct {
h http.Header
r *io.PipeReader
w *io.PipeWriter
- modeCh chan ModeType
+ modeCh chan modeType
responseBody *[]byte
streamCond *sync.Cond
statusCode int
- once sync.Once
+ chOnce sync.Once
+ closeOnce sync.Once
statusMutex sync.Mutex
responseMutex sync.Mutex
connMutex sync.Mutex
isStreaming bool
+ wg sync.WaitGroup
+ hijackedWg sync.WaitGroup
}
func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter {
@@ -305,7 +305,6 @@ func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWrit
func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) {
releaseBuffer(w.responseBody)
- w.Close()
writerPool.Put(w)
}
@@ -355,7 +354,7 @@ func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {
}
// Streaming mode is off.
- // Write to the first chunk for flushing later.
+ // Write to the response body buffer for flushing later.
w.responseMutex.Lock()
*w.responseBody = append(*w.responseBody, p...)
w.responseMutex.Unlock()
@@ -364,7 +363,7 @@ func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {
func (w *netHTTPResponseWriter) Flush() {
// Trigger streaming mode setup.
- w.once.Do(func() {
+ w.chOnce.Do(func() {
w.modeCh <- modeFlushed
})
@@ -377,7 +376,7 @@ func (w *netHTTPResponseWriter) Flush() {
}
func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
- // Hijack assumes control of the connection, so we need to prevent fasthttp from closing it or
+ // Prevent fasthttp from closing it or
// doing anything else with it.
w.ctx.HijackSetNoResponse(true)
@@ -385,7 +384,7 @@ func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
w.handlerConn = fasthttpConn
// Trigger hijacked mode.
- w.once.Do(func() {
+ w.chOnce.Do(func() {
w.modeCh <- modeHijacked
})
@@ -402,14 +401,16 @@ func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
}
func (w *netHTTPResponseWriter) Close() error {
- _ = w.w.Close()
- _ = w.r.Close()
+ w.closeOnce.Do(func() {
+ _ = w.w.Close()
+ _ = w.r.Close()
- w.connMutex.Lock()
- if w.handlerConn != nil {
- _ = w.handlerConn.Close()
- }
- w.connMutex.Unlock()
+ w.connMutex.Lock()
+ if w.handlerConn != nil {
+ _ = w.handlerConn.Close()
+ }
+ w.connMutex.Unlock()
+ })
return nil
}
@@ -422,7 +423,6 @@ func (w *netHTTPResponseWriter) reset() {
w.connMutex.Unlock()
w.statusCode = 0
- // Open new bidirectional pipes
pr, pw := io.Pipe()
w.r = pr
w.w = pw
@@ -435,7 +435,8 @@ func (w *netHTTPResponseWriter) reset() {
// Get a new buffer for the response body
w.responseBody = acquireBuffer()
- w.once = sync.Once{}
+ w.chOnce = sync.Once{}
+ w.closeOnce = sync.Once{}
w.streamCond.L.Lock()
w.isStreaming = false
w.streamCond.L.Unlock() And for adaptor_test.go +317 -0diff --git a/fasthttpadaptor/adaptor_test.go b/fasthttpadaptor/adaptor_test.go
index e5b6f8c..b21cb3b 100644
--- a/fasthttpadaptor/adaptor_test.go
+++ b/fasthttpadaptor/adaptor_test.go
@@ -2,9 +2,13 @@ package fasthttpadaptor
import (
"bufio"
+ "bytes"
+ "fmt"
"io"
"net"
"net/http"
+ "net/http/httptest"
+ "net/http/httputil"
"net/url"
"reflect"
"testing"
@@ -140,6 +144,132 @@ func TestNewFastHTTPHandler(t *testing.T) {
}
}
+func TestNewFastHTTPHandlerFunc(t *testing.T) {
+ t.Parallel()
+
+ expectedMethod := fasthttp.MethodPost
+ expectedProto := "HTTP/1.1"
+ expectedProtoMajor := 1
+ expectedProtoMinor := 1
+ expectedRequestURI := "/foo/bar?baz=123"
+ expectedBody := "<!doctype html><html>"
+ expectedContentLength := len(expectedBody)
+ expectedHost := "foobar.com"
+ expectedRemoteAddr := "1.2.3.4:6789"
+ expectedHeader := map[string]string{
+ "Foo-Bar": "baz",
+ "Abc": "defg",
+ "XXX-Remote-Addr": "123.43.4543.345",
+ }
+ expectedURL, err := url.ParseRequestURI(expectedRequestURI)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ expectedContextKey := "contextKey"
+ expectedContextValue := "contextValue"
+ expectedContentType := "text/html; charset=utf-8"
+
+ callsCount := 0
+ nethttpH := func(w http.ResponseWriter, r *http.Request) {
+ callsCount++
+ if r.Method != expectedMethod {
+ t.Fatalf("unexpected method %q. Expecting %q", r.Method, expectedMethod)
+ }
+ if r.Proto != expectedProto {
+ t.Fatalf("unexpected proto %q. Expecting %q", r.Proto, expectedProto)
+ }
+ if r.ProtoMajor != expectedProtoMajor {
+ t.Fatalf("unexpected protoMajor %d. Expecting %d", r.ProtoMajor, expectedProtoMajor)
+ }
+ if r.ProtoMinor != expectedProtoMinor {
+ t.Fatalf("unexpected protoMinor %d. Expecting %d", r.ProtoMinor, expectedProtoMinor)
+ }
+ if r.RequestURI != expectedRequestURI {
+ t.Fatalf("unexpected requestURI %q. Expecting %q", r.RequestURI, expectedRequestURI)
+ }
+ if r.ContentLength != int64(expectedContentLength) {
+ t.Fatalf("unexpected contentLength %d. Expecting %d", r.ContentLength, expectedContentLength)
+ }
+ if len(r.TransferEncoding) != 0 {
+ t.Fatalf("unexpected transferEncoding %q. Expecting []", r.TransferEncoding)
+ }
+ if r.Host != expectedHost {
+ t.Fatalf("unexpected host %q. Expecting %q", r.Host, expectedHost)
+ }
+ if r.RemoteAddr != expectedRemoteAddr {
+ t.Fatalf("unexpected remoteAddr %q. Expecting %q", r.RemoteAddr, expectedRemoteAddr)
+ }
+ body, err := io.ReadAll(r.Body)
+ r.Body.Close()
+ if err != nil {
+ t.Fatalf("unexpected error when reading request body: %v", err)
+ }
+ if string(body) != expectedBody {
+ t.Fatalf("unexpected body %q. Expecting %q", body, expectedBody)
+ }
+ if !reflect.DeepEqual(r.URL, expectedURL) {
+ t.Fatalf("unexpected URL: %#v. Expecting %#v", r.URL, expectedURL)
+ }
+ if r.Context().Value(expectedContextKey) != expectedContextValue {
+ t.Fatalf("unexpected context value for key %q. Expecting %q", expectedContextKey, expectedContextValue)
+ }
+
+ for k, expectedV := range expectedHeader {
+ v := r.Header.Get(k)
+ if v != expectedV {
+ t.Fatalf("unexpected header value %q for key %q. Expecting %q", v, k, expectedV)
+ }
+ }
+
+ w.Header().Set("Header1", "value1")
+ w.Header().Set("Header2", "value2")
+ w.WriteHeader(http.StatusBadRequest)
+ w.Write(body) //nolint:errcheck
+ }
+ fasthttpH := NewFastHTTPHandlerFunc(nethttpH)
+ fasthttpH = setContextValueMiddleware(fasthttpH, expectedContextKey, expectedContextValue)
+
+ var ctx fasthttp.RequestCtx
+ var req fasthttp.Request
+
+ req.Header.SetMethod(expectedMethod)
+ req.SetRequestURI(expectedRequestURI)
+ req.Header.SetHost(expectedHost)
+ req.BodyWriter().Write([]byte(expectedBody)) //nolint:errcheck
+ for k, v := range expectedHeader {
+ req.Header.Set(k, v)
+ }
+
+ remoteAddr, err := net.ResolveTCPAddr("tcp", expectedRemoteAddr)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ ctx.Init(&req, remoteAddr, nil)
+
+ fasthttpH(&ctx)
+
+ if callsCount != 1 {
+ t.Fatalf("unexpected callsCount: %d. Expecting 1", callsCount)
+ }
+
+ resp := &ctx.Response
+ if resp.StatusCode() != fasthttp.StatusBadRequest {
+ t.Fatalf("unexpected statusCode: %d. Expecting %d", resp.StatusCode(), fasthttp.StatusBadRequest)
+ }
+ if string(resp.Header.Peek("Header1")) != "value1" {
+ t.Fatalf("unexpected header value: %q. Expecting %q", resp.Header.Peek("Header1"), "value1")
+ }
+ if string(resp.Header.Peek("Header2")) != "value2" {
+ t.Fatalf("unexpected header value: %q. Expecting %q", resp.Header.Peek("Header2"), "value2")
+ }
+ if string(resp.Body()) != expectedBody {
+ t.Fatalf("unexpected response body %q. Expecting %q", resp.Body(), expectedBody)
+ }
+ if string(resp.Header.Peek("Content-Type")) != expectedContentType {
+ t.Fatalf("unexpected response content-type %q. Expecting %q", string(resp.Header.Peek("Content-Type")), expectedContentType)
+ }
+}
+
func TestNewFastHTTPHandlerWithCookies(t *testing.T) {
expectedMethod := fasthttp.MethodPost
expectedRequestURI := "/foo/bar?baz=123"
@@ -430,3 +560,193 @@ func TestHijackFlush(t *testing.T) {
t.Fatal("timeout")
}
}
+
+func TestNetHTTPResponseWriter_Reset(t *testing.T) {
+ t.Parallel()
+
+ var ctx fasthttp.RequestCtx
+ w := acquireNetHTTPResponseWriter(&ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ w.Header().Set("Content-Type", "text/plain")
+ _, _ = w.Write([]byte("test"))
+
+ w.reset()
+
+ if w.StatusCode() != http.StatusOK {
+ t.Fatalf("expected status code to be reset to %d, got %d", http.StatusOK, w.StatusCode())
+ }
+ if len(w.Header()) != 0 {
+ t.Fatalf("expected headers to be cleared, got %v", w.Header())
+ }
+ if len(*w.responseBody) != 0 {
+ t.Fatalf("expected response body to be cleared, got %q", *w.responseBody)
+ }
+ if w.isStreaming {
+ t.Fatalf("expected isStreaming to be false, got true")
+ }
+ if w.streamCond == nil {
+ t.Fatalf("expected streamCond to be initialized, got nil")
+ }
+ if w.handlerConn != nil {
+ t.Fatalf("expected handlerConn to be nil, got %q", w.handlerConn)
+ }
+}
+
+func TestNetHTTPResponseWriter_Pool(t *testing.T) {
+ t.Parallel()
+
+ ctx := new(fasthttp.RequestCtx)
+ w := acquireNetHTTPResponseWriter(ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ if w.ctx != ctx {
+ t.Fatalf("Passed in context did not match the current context.")
+ }
+
+ if w.StatusCode() != http.StatusOK {
+ t.Fatalf("expected status code to be reset to %d, got %d", http.StatusOK, w.StatusCode())
+ }
+ if len(w.Header()) != 0 {
+ t.Fatalf("expected headers to be cleared, got %v", w.Header())
+ }
+ if len(*w.responseBody) != 0 {
+ t.Fatalf("expected response body to be cleared, got %q", *w.responseBody)
+ }
+ if w.isStreaming {
+ t.Fatalf("expected isStreaming to be false, got true")
+ }
+ if w.streamCond == nil {
+ t.Fatalf("expected streamCond to be initialized, got nil")
+ }
+ if w.handlerConn != nil {
+ t.Fatalf("expected handlerConn to be nil, got %q", w.handlerConn)
+ }
+}
+
+func TestNetHTTPResponseWriter_Write(t *testing.T) {
+ t.Parallel()
+
+ var ctx fasthttp.RequestCtx
+ w := acquireNetHTTPResponseWriter(&ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ data := []byte("Hello, World!")
+ n, err := w.Write(data)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if n != len(data) {
+ t.Fatalf("expected %d bytes written, got %d", len(data), n)
+ }
+ if !bytes.Equal(*w.responseBody, data) {
+ t.Fatalf("expected response body %q, got %q", data, *w.responseBody)
+ }
+}
+
+func TestNetHTTPResponseWriter_WriteHeader(t *testing.T) {
+ t.Parallel()
+
+ var ctx fasthttp.RequestCtx
+ w := acquireNetHTTPResponseWriter(&ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ statusCode := http.StatusNotFound
+ w.WriteHeader(statusCode)
+
+ if w.StatusCode() != statusCode {
+ t.Fatalf("expected status code %d, got %d", statusCode, w.StatusCode())
+ }
+}
+
+func TestNetHTTPResponseWriter_Header(t *testing.T) {
+ t.Parallel()
+
+ var ctx fasthttp.RequestCtx
+ w := acquireNetHTTPResponseWriter(&ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ w.Header().Set("Content-Type", "application/json")
+ if w.Header().Get("Content-Type") != "application/json" {
+ t.Fatalf("expected Content-Type header to be 'application/json', got %q", w.Header().Get("Content-Type"))
+ }
+}
+
+func TestNetHTTPResponseWriter_Flush(t *testing.T) {
+ t.Parallel()
+
+ var ctx fasthttp.RequestCtx
+ w := acquireNetHTTPResponseWriter(&ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ done := make(chan struct{})
+
+ // Start waiting for modeCh in the main thread to avoid infinite blocking.
+ go func() {
+ w.Flush()
+ close(done)
+ }()
+
+ // Wait for flush to start running.
+ select {
+ case <-w.modeCh:
+ case <-time.After(1 * time.Second):
+ t.Fatal("timeout waiting for modeCh signal")
+ }
+
+ select {
+ case <-done:
+ t.Fatal("Flush completed too early")
+ default:
+ }
+
+ // Signal streaming mode.
+ w.streamCond.L.Lock()
+ w.isStreaming = true
+ w.streamCond.Broadcast()
+ w.streamCond.L.Unlock()
+
+ select {
+ case <-done:
+ case <-time.After(1 * time.Second):
+ t.Fatal("Flush did not complete in time")
+ }
+}
+
+func TestNetHTTPResponseWriter_Hijack(t *testing.T) {
+ t.Parallel()
+
+ var ctx fasthttp.RequestCtx
+ w := acquireNetHTTPResponseWriter(&ctx)
+ defer releaseNetHTTPResponseWriter(w)
+
+ data := []byte("Hijacked data")
+ go func() {
+ conn, bufRW, _ := w.Hijack()
+ defer conn.Close()
+
+ _, _ = bufRW.Write(data)
+ _ = bufRW.Flush()
+ }()
+
+ select {
+ case <-w.modeCh:
+ case <-time.After(1 * time.Second):
+ t.Fatalf("timeout waiting for modeCh signal")
+ }
+
+ // Verify that hijack's returned connection can read from the bufRW.
+ readBuf := make([]byte, len(data))
+ _ = w.handlerConn.SetReadDeadline(time.Now().Add(1 * time.Second))
+ n, err := w.handlerConn.Read(readBuf)
+ if err != nil {
+ t.Fatalf("unexpected error reading from connection: %v", err)
+ }
+ if n != len(data) {
+ t.Fatalf("expected to read %d bytes, got %d", len(data), n)
+ }
+ if !bytes.Equal(readBuf, data) {
+ t.Fatalf("expected to read %q, got %q", data, readBuf)
+ }
+} I greatly appreciate your time and help reviewing my PRs. I hope we can still continue to support flushing in fasthttpadaptor as it would helpful for users that rely on |
Description
In gofiber/fiber#3741, there is a response writer race condition error triggered within their unit tests. The error seems to be a race condition due to trying to re-close a recycled netHTTPResponseWriter in fasthttpadaptor/adaptor.go:69.
Fixes #2068
Solution
The stem of the issue seems to be when releasing the writer while still having a reference to it (since Close() and reset() are both being called at the same time).
This PR uses a sync.WaitGroup to ensure wait until both the ServeHTTP() and/or SetBodyStreamWriter() and other concurrent tasks are done, then recycles the writer.
TODOsCompletedw
.fasthttpadaptor
.