Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 53 additions & 80 deletions fasthttpadaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
w := acquireNetHTTPResponseWriter(ctx)

// Concurrently serve the net/http handler.
w.wg.Add(1)
go func() {
h.ServeHTTP(w, r.WithContext(ctx))
select {
case w.modeCh <- modeDone:
default:
}
_ = w.Close()
}()
w.chOnce.Do(func() {
w.modeCh <- modeDone
})
w.Close()

mode := <-w.modeCh
// Wait for the net/http handler to complete before releasing.
// (e.g. wait for hijacked connection)
w.wg.Wait()
Copy link
Preview

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a potential deadlock. The goroutine is calling w.wg.Wait() while it's part of the same WaitGroup that was incremented with w.wg.Add(1) at line 63. The goroutine cannot complete because it's waiting for itself to finish.

Suggested change
w.wg.Wait()

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the comment isn't clear about what it means by waiting for the handler to complete. The WaitGroup is meant to track the lifetime of the reference to w in each mode type.

The w.wg.Wait() is necessary here to avoid recycling the writer while w is still in use (in SendStreamWriter, or the handlerConn still hasn't been closed).

releaseNetHTTPResponseWriter(w)
}()

switch mode {
switch <-w.modeCh {
case modeDone:
// No flush occurred before the handler returned.
// Send the data as one chunk.
Expand Down Expand Up @@ -99,14 +102,12 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
}

w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
ctx.Response.SetBody(*w.responseBody)
}
w.responseMutex.Unlock()

// Release after sending response.
releaseNetHTTPResponseWriter(w)
// Signal that the net/http -> fasthttp copy is complete.
w.wg.Done()

case modeFlushed:
// Flush occurred before handler returned.
Expand All @@ -132,7 +133,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {

// Lock the current response body until
// it is sent in the StreamWriter function.
w.responseMutex.Lock()
if !haveContentType {
// From net/http.ResponseWriter.Write:
// If the Header does not contain a Content-Type line, Write adds a Content-Type set
Expand All @@ -146,15 +146,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
}

// Start streaming mode on return.
w.r, w.w = io.Pipe()
Copy link
Preview

Copilot AI Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The io.Pipe() creation should be moved to the reset() method or initialization to ensure pipes are always available when needed, rather than creating them only in flush mode.

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipes are only needed specifically during streaming (flush) mode.

Since there is no need to allocate these pips in any other situation, I had updated to code to create the pipes only during stream mode since w.r and w.w are only used in streaming mode. This way, we don't allocate pipes unnecessarily.

ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
// Signal that streaming completed on return.
Comment on lines +147 to +149
Copy link
Preview

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipe is created after the WaitGroup is decremented in the modeDone case but before it's used in streaming. This could cause a race condition where the pipe is accessed before it's initialized. The pipe creation should happen before the goroutine starts or be properly synchronized.

Suggested change
w.r, w.w = io.Pipe()
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
// Signal that streaming completed on return.
// Create the pipe before starting the stream writer to avoid race conditions.
w.r, w.w = io.Pipe()
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {

Copilot uses AI. Check for mistakes.

defer func() {
w.r.Close()
w.wg.Done()
}()

// Stream the first chunk.
if len(*w.responseBody) > 0 {
_, _ = bw.Write(*w.responseBody)
_ = bw.Flush()
}
// The current response body is no longer used
// past this point.
w.responseMutex.Unlock()

// Stream the rest of the data that is read
// from the net/http handler in 32 KiB chunks.
Expand All @@ -163,16 +169,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
// as data comes in.
chunk := acquireBuffer()
*chunk = (*chunk)[:minBufferSize]
defer releaseBuffer(chunk)
for {
// Read net/http handler chunk.
n, err := w.r.Read(*chunk)
if err != nil {
// Handler ended due to an io.EOF
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}

Expand All @@ -182,21 +185,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}

err = bw.Flush()
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}
}
Expand All @@ -213,31 +208,28 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
// The net/http handler called w.Hijack().
// Copy data bidirectionally between the
// net/http and fasthttp connections.
var wg sync.WaitGroup
wg.Add(2)
w.hijackedWg.Add(2)

// Note: It is safe to assume that net.Conn automatically
// flushes data while copying.
go func() {
defer wg.Done()
defer w.hijackedWg.Done()
_, _ = io.Copy(ctx.Conn(), w.handlerConn)

// Close the fasthttp connection when
// the net/http connection closes.
_ = ctx.Conn().Close()
}()
go func() {
defer wg.Done()
defer w.hijackedWg.Done()
_, _ = io.Copy(w.handlerConn, ctx.Conn())
// Note: Only the net/http handler
// should close the connection.
}()
w.hijackedWg.Wait()

// Wait for the net/http handler to finish
// writing to the hijacked connection prior to releasing
// the writer into the writer pool.
wg.Wait()
releaseNetHTTPResponseWriter(w)
// Signal that the hijacked connection was closed.
w.wg.Done()
}
}
}
Expand Down Expand Up @@ -276,20 +268,20 @@ const (
)

type netHTTPResponseWriter struct {
handlerConn net.Conn
ctx *fasthttp.RequestCtx
h http.Header
r *io.PipeReader
w *io.PipeWriter
modeCh chan ModeType
responseBody *[]byte
streamCond *sync.Cond
statusCode int
once sync.Once
statusMutex sync.Mutex
responseMutex sync.Mutex
connMutex sync.Mutex
isStreaming bool
handlerConn net.Conn
ctx *fasthttp.RequestCtx
h http.Header
r *io.PipeReader
w *io.PipeWriter
modeCh chan ModeType
responseBody *[]byte
streamCond *sync.Cond
statusCode int
chOnce sync.Once
closeOnce sync.Once
isStreaming bool
wg sync.WaitGroup
hijackedWg sync.WaitGroup
}

func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter {
Expand All @@ -305,7 +297,6 @@ func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWrit

func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) {
releaseBuffer(w.responseBody)
w.Close()
writerPool.Put(w)
}

Expand All @@ -324,9 +315,6 @@ func releaseBuffer(buf *[]byte) {
}

func (w *netHTTPResponseWriter) StatusCode() int {
w.statusMutex.Lock()
defer w.statusMutex.Unlock()

if w.statusCode == 0 {
return http.StatusOK
}
Expand All @@ -338,9 +326,6 @@ func (w *netHTTPResponseWriter) Header() http.Header {
}

func (w *netHTTPResponseWriter) WriteHeader(statusCode int) {
w.statusMutex.Lock()
defer w.statusMutex.Unlock()

w.statusCode = statusCode
}

Expand All @@ -356,15 +341,13 @@ func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {

// Streaming mode is off.
// Write to the first chunk for flushing later.
w.responseMutex.Lock()
*w.responseBody = append(*w.responseBody, p...)
w.responseMutex.Unlock()
return len(p), nil
}

func (w *netHTTPResponseWriter) Flush() {
// Trigger streaming mode setup.
w.once.Do(func() {
w.chOnce.Do(func() {
w.modeCh <- modeFlushed
})

Expand All @@ -385,47 +368,38 @@ func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
w.handlerConn = fasthttpConn

// Trigger hijacked mode.
w.once.Do(func() {
w.chOnce.Do(func() {
w.modeCh <- modeHijacked
})

bufRW := bufio.NewReadWriter(bufio.NewReader(netHTTPConn), bufio.NewWriter(netHTTPConn))

// Write any unflushed body to the hijacked connection buffer.
w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
_, _ = bufRW.Write(*w.responseBody)
_ = bufRW.Flush()
}
w.responseMutex.Unlock()
return netHTTPConn, bufRW, nil
}

func (w *netHTTPResponseWriter) Close() error {
_ = w.w.Close()
_ = w.r.Close()

w.connMutex.Lock()
if w.handlerConn != nil {
_ = w.handlerConn.Close()
}
w.connMutex.Unlock()
w.closeOnce.Do(func() {
if w.w != nil {
_ = w.w.Close()
}
Copy link
Preview

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Close() method only closes the writer part of the pipe but not the reader (w.r). This could lead to resource leaks. Both w.r and w.w should be closed, or the full pipe should be properly cleaned up.

Suggested change
}
}
if w.r != nil {
_ = w.r.Close()
}

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Close() method is from the perspective of the net/http handler.

When the net/http handler closes the writer, this should close the writer end w.w of the streaming mode. The reader end that is used by fasthttpw.r will then return an EOF error and be closed along with the client connection separately.

If any other mode occurs, the Close() operation does nothing since no internal writers/readers are managed by fasthttp at that point. Hijack mode gives complete control to the user to control w.handlerConn, while done mode only uses a recycled buffer.

})
return nil
}

func (w *netHTTPResponseWriter) reset() {
// Note: reset() must only run after a fasthttp handler finishes
// proxying the full net/http handler response to ensure no data races.
w.ctx = nil
w.connMutex.Lock()
w.handlerConn = nil
w.connMutex.Unlock()
w.statusCode = 0

// Open new bidirectional pipes
pr, pw := io.Pipe()
w.r = pr
w.w = pw
w.w = nil
w.r = nil
w.handlerConn = nil

// Clear the http Header
for key := range w.h {
Expand All @@ -435,8 +409,7 @@ func (w *netHTTPResponseWriter) reset() {
// Get a new buffer for the response body
w.responseBody = acquireBuffer()

w.once = sync.Once{}
w.streamCond.L.Lock()
w.chOnce = sync.Once{}
w.closeOnce = sync.Once{}
w.isStreaming = false
w.streamCond.L.Unlock()
}
Loading
Loading