Skip to content

Conversation

grivera64
Copy link
Contributor

@grivera64 grivera64 commented Sep 18, 2025

Description

In gofiber/fiber#3741, there is a response writer race condition error triggered within their unit tests. The error seems to be a race condition due to trying to re-close a recycled netHTTPResponseWriter in fasthttpadaptor/adaptor.go:69.

Fixes #2068

Solution

The stem of the issue seems to be when releasing the writer while still having a reference to it (since Close() and reset() are both being called at the same time).

This PR uses a sync.WaitGroup to ensure wait until both the ServeHTTP() and/or SetBodyStreamWriter() and other concurrent tasks are done, then recycles the writer.

TODOs Completed

  • Use sync.WaitGroup to ensure no references are still using w.
  • Add concrete unit tests to prevent future race condition related regressions for fasthttpadaptor.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes a race condition in fasthttpadaptor v1.66.0 where a recycled netHTTPResponseWriter could be re-closed while still in use, causing concurrent access issues during resource cleanup.

  • Implements synchronization using sync.WaitGroup to ensure proper coordination between concurrent operations
  • Removes multiple mutex locks that were causing deadlocks and race conditions
  • Adds comprehensive test coverage to prevent future race condition regressions

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
fasthttpadaptor/adaptor.go Implements WaitGroup-based synchronization, removes problematic mutexes, and restructures resource cleanup logic
fasthttpadaptor/adaptor_test.go Adds new test functions to verify race condition fixes and resource recycling under load

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

mode := <-w.modeCh
// Wait for the net/http handler to complete before releasing.
// (e.g. wait for hijacked connection)
w.wg.Wait()
Copy link
Preview

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a potential deadlock. The goroutine is calling w.wg.Wait() while it's part of the same WaitGroup that was incremented with w.wg.Add(1) at line 63. The goroutine cannot complete because it's waiting for itself to finish.

Suggested change
w.wg.Wait()

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the comment isn't clear about what it means by waiting for the handler to complete. The WaitGroup is meant to track the lifetime of the reference to w in each mode type.

The w.wg.Wait() is necessary here to avoid recycling the writer while w is still in use (in SendStreamWriter, or the handlerConn still hasn't been closed).

Comment on lines +149 to +151
w.r, w.w = io.Pipe()
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
// Signal that streaming completed on return.
Copy link
Preview

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipe is created after the WaitGroup is decremented in the modeDone case but before it's used in streaming. This could cause a race condition where the pipe is accessed before it's initialized. The pipe creation should happen before the goroutine starts or be properly synchronized.

Suggested change
w.r, w.w = io.Pipe()
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
// Signal that streaming completed on return.
// Create the pipe before starting the stream writer to avoid race conditions.
w.r, w.w = io.Pipe()
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {

Copilot uses AI. Check for mistakes.

w.closeOnce.Do(func() {
if w.w != nil {
_ = w.w.Close()
}
Copy link
Preview

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Close() method only closes the writer part of the pipe but not the reader (w.r). This could lead to resource leaks. Both w.r and w.w should be closed, or the full pipe should be properly cleaned up.

Suggested change
}
}
if w.r != nil {
_ = w.r.Close()
}

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Close() method is from the perspective of the net/http handler.

When the net/http handler closes the writer, this should close the writer end w.w of the streaming mode. The reader end that is used by fasthttpw.r will then return an EOF error and be closed along with the client connection separately.

If any other mode occurs, the Close() operation does nothing since no internal writers/readers are managed by fasthttp at that point. Hijack mode gives complete control to the user to control w.handlerConn, while done mode only uses a recycled buffer.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

}

// Start streaming mode on return.
w.r, w.w = io.Pipe()
Copy link
Preview

Copilot AI Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The io.Pipe() creation should be moved to the reset() method or initialization to ensure pipes are always available when needed, rather than creating them only in flush mode.

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipes are only needed specifically during streaming (flush) mode.

Since there is no need to allocate these pips in any other situation, I had updated to code to create the pipes only during stream mode since w.r and w.w are only used in streaming mode. This way, we don't allocate pipes unnecessarily.

@grivera64 grivera64 requested a review from Copilot September 19, 2025 23:54
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +188 to +189
w.w = nil
w.r = nil
Copy link
Preview

Copilot AI Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting pipe reader/writer to nil in reset() while concurrent operations might still be using them could cause nil pointer dereferences. These should only be reset after ensuring all operations have completed.

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A reset only occurs when the writer is acquired. The writer at this stage should have completed all operations as expected.

The new wait group should ensure this.

@grivera64 grivera64 marked this pull request as ready for review September 19, 2025 23:58
@grivera64 grivera64 requested a review from Copilot September 22, 2025 20:33
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@grivera64
Copy link
Contributor Author

@erikdubbelboer I've made some changes to address the rare reset race condition. I also moved the netHTTPResponseWriter (now called responseWriter) code into response_writer.go and added a few test cases for its methods as well.

Please let me know your thoughts on these changes and the AI hints. This should address the issue in a more robust way, since the release of the writer is now located in a single location for the adaptor.

@erikdubbelboer
Copy link
Collaborator

I'm sorry but these changes are too many and too messy for me to follow. The initial flushing support PR was already super hard to properly review.

I'm strongly considering reverting flushing support completely for now. Then we can have a look at re-doing the whole thing in a different way.

@grivera64
Copy link
Contributor Author

grivera64 commented Sep 25, 2025

I'm sorry but these changes are too many and too messy for me to follow. The initial flushing support PR was already super hard to properly review.

I'm strongly considering reverting flushing support completely for now. Then we can have a look at re-doing the whole thing in a different way.

You’re right, I’m sorry for the large PR. I included extra changes, such as documentation updates, renames, and optimizing field allocation to enhance readability and add unit tests. However, I realize that this has made the PR difficult to follow.

I’m more than happy to revert these unnecessary changes and only focus on the essential ones that address the race condition in the PR. Or if it's best, I can close the PR and we can look into starting over to ensure a cleaner implementation.

If we decide to proceed with the current PR after removing the extra changes, the diff would include approximately 51 new lines and 50 removed lines for adaptor.go. This should be more manageable.

adaptor.go +52 -51
diff --git a/fasthttpadaptor/adaptor.go b/fasthttpadaptor/adaptor.go
index 48cd0e3..5833074 100644
--- a/fasthttpadaptor/adaptor.go
+++ b/fasthttpadaptor/adaptor.go
@@ -60,18 +60,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 		w := acquireNetHTTPResponseWriter(ctx)
 
 		// Concurrently serve the net/http handler.
+		w.wg.Add(1)
 		go func() {
 			h.ServeHTTP(w, r.WithContext(ctx))
-			select {
-			case w.modeCh <- modeDone:
-			default:
-			}
-			_ = w.Close()
-		}()
+			w.chOnce.Do(func() {
+				w.modeCh <- modeDone
+			})
+			w.Close()
 
-		mode := <-w.modeCh
+			// Wait for the net/http handler to complete before releasing.
+			// (e.g. wait for hijacked connection)
+			w.wg.Wait()
+			releaseNetHTTPResponseWriter(w)
+		}()
 
-		switch mode {
+		switch <-w.modeCh {
 		case modeDone:
 			// No flush occurred before the handler returned.
 			// Send the data as one chunk.
@@ -87,6 +90,7 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 				}
 			}
 
+			w.responseMutex.Lock()
 			if !haveContentType {
 				// From net/http.ResponseWriter.Write:
 				// If the Header does not contain a Content-Type line, Write adds a Content-Type set
@@ -99,14 +103,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 				ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
 			}
 
-			w.responseMutex.Lock()
 			if len(*w.responseBody) > 0 {
 				ctx.Response.SetBody(*w.responseBody)
 			}
 			w.responseMutex.Unlock()
 
-			// Release after sending response.
-			releaseNetHTTPResponseWriter(w)
+			// Signal that the net/http -> fasthttp copy is complete.
+			w.wg.Done()
 
 		case modeFlushed:
 			// Flush occurred before handler returned.
@@ -145,8 +148,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 				ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
 			}
 
-			// Start streaming mode on return.
 			ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
+				// Signal that streaming completed on return.
+				defer func() {
+					w.r.Close()
+					w.wg.Done()
+				}()
+
 				// Stream the first chunk.
 				if len(*w.responseBody) > 0 {
 					_, _ = bw.Write(*w.responseBody)
@@ -163,16 +171,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 				// as data comes in.
 				chunk := acquireBuffer()
 				*chunk = (*chunk)[:minBufferSize]
+				defer releaseBuffer(chunk)
 				for {
 					// Read net/http handler chunk.
 					n, err := w.r.Read(*chunk)
 					if err != nil {
 						// Handler ended due to an io.EOF
 						// or an error occurred.
-						//
-						// Release the response writer for reuse.
-						releaseBuffer(chunk)
-						releaseNetHTTPResponseWriter(w)
 						return
 					}
 
@@ -182,10 +187,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 						if err != nil {
 							// Handler ended due to an io.ErrPipeClosed
 							// or an error occurred.
-							//
-							// Release the response writer for reuse.
-							releaseBuffer(chunk)
-							releaseNetHTTPResponseWriter(w)
 							return
 						}
 
@@ -193,10 +194,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 						if err != nil {
 							// Handler ended due to an io.ErrPipeClosed
 							// or an error occurred.
-							//
-							// Release the response writer for reuse.
-							releaseBuffer(chunk)
-							releaseNetHTTPResponseWriter(w)
 							return
 						}
 					}
@@ -213,13 +210,12 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 			// The net/http handler called w.Hijack().
 			// Copy data bidirectionally between the
 			// net/http and fasthttp connections.
-			var wg sync.WaitGroup
-			wg.Add(2)
+			w.hijackedWg.Add(2)
 
 			// Note: It is safe to assume that net.Conn automatically
 			// flushes data while copying.
 			go func() {
-				defer wg.Done()
+				defer w.hijackedWg.Done()
 				_, _ = io.Copy(ctx.Conn(), w.handlerConn)
 
 				// Close the fasthttp connection when
@@ -227,17 +223,18 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
 				_ = ctx.Conn().Close()
 			}()
 			go func() {
-				defer wg.Done()
+				defer w.hijackedWg.Done()
 				_, _ = io.Copy(w.handlerConn, ctx.Conn())
 				// Note: Only the net/http handler
 				// should close the connection.
 			}()
 
 			// Wait for the net/http handler to finish
-			// writing to the hijacked connection prior to releasing
-			// the writer into the writer pool.
-			wg.Wait()
-			releaseNetHTTPResponseWriter(w)
+			// writing to the hijacked connection.
+			w.hijackedWg.Wait()
+
+			// Signal that the hijacked connection was closed.
+			w.wg.Done()
 		}
 	}
 }
@@ -259,17 +256,17 @@ var writerPool = &sync.Pool{
 			h:            make(http.Header),
 			r:            pr,
 			w:            pw,
-			modeCh:       make(chan ModeType),
+			modeCh:       make(chan modeType),
 			responseBody: acquireBuffer(),
 			streamCond:   sync.NewCond(&sync.Mutex{}),
 		}
 	},
 }
 
-type ModeType int
+type modeType int
 
 const (
-	modeUnknown ModeType = iota
+	modeUnknown modeType = iota //nolint:unused
 	modeDone
 	modeFlushed
 	modeHijacked
@@ -281,15 +278,18 @@ type netHTTPResponseWriter struct {
 	h             http.Header
 	r             *io.PipeReader
 	w             *io.PipeWriter
-	modeCh        chan ModeType
+	modeCh        chan modeType
 	responseBody  *[]byte
 	streamCond    *sync.Cond
 	statusCode    int
-	once          sync.Once
+	chOnce        sync.Once
+	closeOnce     sync.Once
 	statusMutex   sync.Mutex
 	responseMutex sync.Mutex
 	connMutex     sync.Mutex
 	isStreaming   bool
+	wg            sync.WaitGroup
+	hijackedWg    sync.WaitGroup
 }
 
 func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter {
@@ -305,7 +305,6 @@ func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWrit
 
 func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) {
 	releaseBuffer(w.responseBody)
-	w.Close()
 	writerPool.Put(w)
 }
 
@@ -355,7 +354,7 @@ func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {
 	}
 
 	// Streaming mode is off.
-	// Write to the first chunk for flushing later.
+	// Write to the response body buffer for flushing later.
 	w.responseMutex.Lock()
 	*w.responseBody = append(*w.responseBody, p...)
 	w.responseMutex.Unlock()
@@ -364,7 +363,7 @@ func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {
 
 func (w *netHTTPResponseWriter) Flush() {
 	// Trigger streaming mode setup.
-	w.once.Do(func() {
+	w.chOnce.Do(func() {
 		w.modeCh <- modeFlushed
 	})
 
@@ -377,7 +376,7 @@ func (w *netHTTPResponseWriter) Flush() {
 }
 
 func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
-	// Hijack assumes control of the connection, so we need to prevent fasthttp from closing it or
+	// Prevent fasthttp from closing it or
 	// doing anything else with it.
 	w.ctx.HijackSetNoResponse(true)
 
@@ -385,7 +384,7 @@ func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
 	w.handlerConn = fasthttpConn
 
 	// Trigger hijacked mode.
-	w.once.Do(func() {
+	w.chOnce.Do(func() {
 		w.modeCh <- modeHijacked
 	})
 
@@ -402,14 +401,16 @@ func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
 }
 
 func (w *netHTTPResponseWriter) Close() error {
-	_ = w.w.Close()
-	_ = w.r.Close()
+	w.closeOnce.Do(func() {
+		_ = w.w.Close()
+		_ = w.r.Close()
 
-	w.connMutex.Lock()
-	if w.handlerConn != nil {
-		_ = w.handlerConn.Close()
-	}
-	w.connMutex.Unlock()
+		w.connMutex.Lock()
+		if w.handlerConn != nil {
+			_ = w.handlerConn.Close()
+		}
+		w.connMutex.Unlock()
+	})
 	return nil
 }
 
@@ -422,7 +423,6 @@ func (w *netHTTPResponseWriter) reset() {
 	w.connMutex.Unlock()
 	w.statusCode = 0
 
-	// Open new bidirectional pipes
 	pr, pw := io.Pipe()
 	w.r = pr
 	w.w = pw
@@ -435,7 +435,8 @@ func (w *netHTTPResponseWriter) reset() {
 	// Get a new buffer for the response body
 	w.responseBody = acquireBuffer()
 
-	w.once = sync.Once{}
+	w.chOnce = sync.Once{}
+	w.closeOnce = sync.Once{}
 	w.streamCond.L.Lock()
 	w.isStreaming = false
 	w.streamCond.L.Unlock()

And for adaptor_test.go, there would be 8 new unit tests (that were originally added in this PR under response_writer_test.go).

adaptor_test.go +317 -0
diff --git a/fasthttpadaptor/adaptor_test.go b/fasthttpadaptor/adaptor_test.go
index e5b6f8c..b21cb3b 100644
--- a/fasthttpadaptor/adaptor_test.go
+++ b/fasthttpadaptor/adaptor_test.go
@@ -2,9 +2,13 @@ package fasthttpadaptor
 
 import (
 	"bufio"
+	"bytes"
+	"fmt"
 	"io"
 	"net"
 	"net/http"
+	"net/http/httptest"
+	"net/http/httputil"
 	"net/url"
 	"reflect"
 	"testing"
@@ -140,6 +144,132 @@ func TestNewFastHTTPHandler(t *testing.T) {
 	}
 }
 
+func TestNewFastHTTPHandlerFunc(t *testing.T) {
+	t.Parallel()
+
+	expectedMethod := fasthttp.MethodPost
+	expectedProto := "HTTP/1.1"
+	expectedProtoMajor := 1
+	expectedProtoMinor := 1
+	expectedRequestURI := "/foo/bar?baz=123"
+	expectedBody := "<!doctype html><html>"
+	expectedContentLength := len(expectedBody)
+	expectedHost := "foobar.com"
+	expectedRemoteAddr := "1.2.3.4:6789"
+	expectedHeader := map[string]string{
+		"Foo-Bar":         "baz",
+		"Abc":             "defg",
+		"XXX-Remote-Addr": "123.43.4543.345",
+	}
+	expectedURL, err := url.ParseRequestURI(expectedRequestURI)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	expectedContextKey := "contextKey"
+	expectedContextValue := "contextValue"
+	expectedContentType := "text/html; charset=utf-8"
+
+	callsCount := 0
+	nethttpH := func(w http.ResponseWriter, r *http.Request) {
+		callsCount++
+		if r.Method != expectedMethod {
+			t.Fatalf("unexpected method %q. Expecting %q", r.Method, expectedMethod)
+		}
+		if r.Proto != expectedProto {
+			t.Fatalf("unexpected proto %q. Expecting %q", r.Proto, expectedProto)
+		}
+		if r.ProtoMajor != expectedProtoMajor {
+			t.Fatalf("unexpected protoMajor %d. Expecting %d", r.ProtoMajor, expectedProtoMajor)
+		}
+		if r.ProtoMinor != expectedProtoMinor {
+			t.Fatalf("unexpected protoMinor %d. Expecting %d", r.ProtoMinor, expectedProtoMinor)
+		}
+		if r.RequestURI != expectedRequestURI {
+			t.Fatalf("unexpected requestURI %q. Expecting %q", r.RequestURI, expectedRequestURI)
+		}
+		if r.ContentLength != int64(expectedContentLength) {
+			t.Fatalf("unexpected contentLength %d. Expecting %d", r.ContentLength, expectedContentLength)
+		}
+		if len(r.TransferEncoding) != 0 {
+			t.Fatalf("unexpected transferEncoding %q. Expecting []", r.TransferEncoding)
+		}
+		if r.Host != expectedHost {
+			t.Fatalf("unexpected host %q. Expecting %q", r.Host, expectedHost)
+		}
+		if r.RemoteAddr != expectedRemoteAddr {
+			t.Fatalf("unexpected remoteAddr %q. Expecting %q", r.RemoteAddr, expectedRemoteAddr)
+		}
+		body, err := io.ReadAll(r.Body)
+		r.Body.Close()
+		if err != nil {
+			t.Fatalf("unexpected error when reading request body: %v", err)
+		}
+		if string(body) != expectedBody {
+			t.Fatalf("unexpected body %q. Expecting %q", body, expectedBody)
+		}
+		if !reflect.DeepEqual(r.URL, expectedURL) {
+			t.Fatalf("unexpected URL: %#v. Expecting %#v", r.URL, expectedURL)
+		}
+		if r.Context().Value(expectedContextKey) != expectedContextValue {
+			t.Fatalf("unexpected context value for key %q. Expecting %q", expectedContextKey, expectedContextValue)
+		}
+
+		for k, expectedV := range expectedHeader {
+			v := r.Header.Get(k)
+			if v != expectedV {
+				t.Fatalf("unexpected header value %q for key %q. Expecting %q", v, k, expectedV)
+			}
+		}
+
+		w.Header().Set("Header1", "value1")
+		w.Header().Set("Header2", "value2")
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(body) //nolint:errcheck
+	}
+	fasthttpH := NewFastHTTPHandlerFunc(nethttpH)
+	fasthttpH = setContextValueMiddleware(fasthttpH, expectedContextKey, expectedContextValue)
+
+	var ctx fasthttp.RequestCtx
+	var req fasthttp.Request
+
+	req.Header.SetMethod(expectedMethod)
+	req.SetRequestURI(expectedRequestURI)
+	req.Header.SetHost(expectedHost)
+	req.BodyWriter().Write([]byte(expectedBody)) //nolint:errcheck
+	for k, v := range expectedHeader {
+		req.Header.Set(k, v)
+	}
+
+	remoteAddr, err := net.ResolveTCPAddr("tcp", expectedRemoteAddr)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	ctx.Init(&req, remoteAddr, nil)
+
+	fasthttpH(&ctx)
+
+	if callsCount != 1 {
+		t.Fatalf("unexpected callsCount: %d. Expecting 1", callsCount)
+	}
+
+	resp := &ctx.Response
+	if resp.StatusCode() != fasthttp.StatusBadRequest {
+		t.Fatalf("unexpected statusCode: %d. Expecting %d", resp.StatusCode(), fasthttp.StatusBadRequest)
+	}
+	if string(resp.Header.Peek("Header1")) != "value1" {
+		t.Fatalf("unexpected header value: %q. Expecting %q", resp.Header.Peek("Header1"), "value1")
+	}
+	if string(resp.Header.Peek("Header2")) != "value2" {
+		t.Fatalf("unexpected header value: %q. Expecting %q", resp.Header.Peek("Header2"), "value2")
+	}
+	if string(resp.Body()) != expectedBody {
+		t.Fatalf("unexpected response body %q. Expecting %q", resp.Body(), expectedBody)
+	}
+	if string(resp.Header.Peek("Content-Type")) != expectedContentType {
+		t.Fatalf("unexpected response content-type %q. Expecting %q", string(resp.Header.Peek("Content-Type")), expectedContentType)
+	}
+}
+
 func TestNewFastHTTPHandlerWithCookies(t *testing.T) {
 	expectedMethod := fasthttp.MethodPost
 	expectedRequestURI := "/foo/bar?baz=123"
@@ -430,3 +560,193 @@ func TestHijackFlush(t *testing.T) {
 		t.Fatal("timeout")
 	}
 }
+
+func TestNetHTTPResponseWriter_Reset(t *testing.T) {
+	t.Parallel()
+
+	var ctx fasthttp.RequestCtx
+	w := acquireNetHTTPResponseWriter(&ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	w.WriteHeader(http.StatusMethodNotAllowed)
+	w.Header().Set("Content-Type", "text/plain")
+	_, _ = w.Write([]byte("test"))
+
+	w.reset()
+
+	if w.StatusCode() != http.StatusOK {
+		t.Fatalf("expected status code to be reset to %d, got %d", http.StatusOK, w.StatusCode())
+	}
+	if len(w.Header()) != 0 {
+		t.Fatalf("expected headers to be cleared, got %v", w.Header())
+	}
+	if len(*w.responseBody) != 0 {
+		t.Fatalf("expected response body to be cleared, got %q", *w.responseBody)
+	}
+	if w.isStreaming {
+		t.Fatalf("expected isStreaming to be false, got true")
+	}
+	if w.streamCond == nil {
+		t.Fatalf("expected streamCond to be initialized, got nil")
+	}
+	if w.handlerConn != nil {
+		t.Fatalf("expected handlerConn to be nil, got %q", w.handlerConn)
+	}
+}
+
+func TestNetHTTPResponseWriter_Pool(t *testing.T) {
+	t.Parallel()
+
+	ctx := new(fasthttp.RequestCtx)
+	w := acquireNetHTTPResponseWriter(ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	if w.ctx != ctx {
+		t.Fatalf("Passed in context did not match the current context.")
+	}
+
+	if w.StatusCode() != http.StatusOK {
+		t.Fatalf("expected status code to be reset to %d, got %d", http.StatusOK, w.StatusCode())
+	}
+	if len(w.Header()) != 0 {
+		t.Fatalf("expected headers to be cleared, got %v", w.Header())
+	}
+	if len(*w.responseBody) != 0 {
+		t.Fatalf("expected response body to be cleared, got %q", *w.responseBody)
+	}
+	if w.isStreaming {
+		t.Fatalf("expected isStreaming to be false, got true")
+	}
+	if w.streamCond == nil {
+		t.Fatalf("expected streamCond to be initialized, got nil")
+	}
+	if w.handlerConn != nil {
+		t.Fatalf("expected handlerConn to be nil, got %q", w.handlerConn)
+	}
+}
+
+func TestNetHTTPResponseWriter_Write(t *testing.T) {
+	t.Parallel()
+
+	var ctx fasthttp.RequestCtx
+	w := acquireNetHTTPResponseWriter(&ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	data := []byte("Hello, World!")
+	n, err := w.Write(data)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if n != len(data) {
+		t.Fatalf("expected %d bytes written, got %d", len(data), n)
+	}
+	if !bytes.Equal(*w.responseBody, data) {
+		t.Fatalf("expected response body %q, got %q", data, *w.responseBody)
+	}
+}
+
+func TestNetHTTPResponseWriter_WriteHeader(t *testing.T) {
+	t.Parallel()
+
+	var ctx fasthttp.RequestCtx
+	w := acquireNetHTTPResponseWriter(&ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	statusCode := http.StatusNotFound
+	w.WriteHeader(statusCode)
+
+	if w.StatusCode() != statusCode {
+		t.Fatalf("expected status code %d, got %d", statusCode, w.StatusCode())
+	}
+}
+
+func TestNetHTTPResponseWriter_Header(t *testing.T) {
+	t.Parallel()
+
+	var ctx fasthttp.RequestCtx
+	w := acquireNetHTTPResponseWriter(&ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	w.Header().Set("Content-Type", "application/json")
+	if w.Header().Get("Content-Type") != "application/json" {
+		t.Fatalf("expected Content-Type header to be 'application/json', got %q", w.Header().Get("Content-Type"))
+	}
+}
+
+func TestNetHTTPResponseWriter_Flush(t *testing.T) {
+	t.Parallel()
+
+	var ctx fasthttp.RequestCtx
+	w := acquireNetHTTPResponseWriter(&ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	done := make(chan struct{})
+
+	// Start waiting for modeCh in the main thread to avoid infinite blocking.
+	go func() {
+		w.Flush()
+		close(done)
+	}()
+
+	// Wait for flush to start running.
+	select {
+	case <-w.modeCh:
+	case <-time.After(1 * time.Second):
+		t.Fatal("timeout waiting for modeCh signal")
+	}
+
+	select {
+	case <-done:
+		t.Fatal("Flush completed too early")
+	default:
+	}
+
+	// Signal streaming mode.
+	w.streamCond.L.Lock()
+	w.isStreaming = true
+	w.streamCond.Broadcast()
+	w.streamCond.L.Unlock()
+
+	select {
+	case <-done:
+	case <-time.After(1 * time.Second):
+		t.Fatal("Flush did not complete in time")
+	}
+}
+
+func TestNetHTTPResponseWriter_Hijack(t *testing.T) {
+	t.Parallel()
+
+	var ctx fasthttp.RequestCtx
+	w := acquireNetHTTPResponseWriter(&ctx)
+	defer releaseNetHTTPResponseWriter(w)
+
+	data := []byte("Hijacked data")
+	go func() {
+		conn, bufRW, _ := w.Hijack()
+		defer conn.Close()
+
+		_, _ = bufRW.Write(data)
+		_ = bufRW.Flush()
+	}()
+
+	select {
+	case <-w.modeCh:
+	case <-time.After(1 * time.Second):
+		t.Fatalf("timeout waiting for modeCh signal")
+	}
+
+	// Verify that hijack's returned connection can read from the bufRW.
+	readBuf := make([]byte, len(data))
+	_ = w.handlerConn.SetReadDeadline(time.Now().Add(1 * time.Second))
+	n, err := w.handlerConn.Read(readBuf)
+	if err != nil {
+		t.Fatalf("unexpected error reading from connection: %v", err)
+	}
+	if n != len(data) {
+		t.Fatalf("expected to read %d bytes, got %d", len(data), n)
+	}
+	if !bytes.Equal(readBuf, data) {
+		t.Fatalf("expected to read %q, got %q", data, readBuf)
+	}
+}

I greatly appreciate your time and help reviewing my PRs. I hope we can still continue to support flushing in fasthttpadaptor as it would helpful for users that rely on net/http based libraries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Race Condition in fasthttpadaptor v1.66.0
2 participants