-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Fix Race Condition in fasthttpadaptor v1.66.0 #2069
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 11 commits
59f21b3
6c2101e
5312c11
df1f51e
0b76855
cfeccbf
864629c
2c9a72d
065b350
f4bb3a7
199a009
09bdd43
ab8a223
67ec2d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,13 +1,11 @@ | ||||||||||||||||
// Package fasthttpadaptor provides helper functions for converting net/http | ||||||||||||||||
// Package fasthttpadaptor provides helper functions for converting net/httpG | ||||||||||||||||
// request handlers to fasthttp request handlers. | ||||||||||||||||
package fasthttpadaptor | ||||||||||||||||
|
||||||||||||||||
import ( | ||||||||||||||||
"bufio" | ||||||||||||||||
"io" | ||||||||||||||||
"net" | ||||||||||||||||
"net/http" | ||||||||||||||||
"sync" | ||||||||||||||||
|
||||||||||||||||
"github.com/valyala/fasthttp" | ||||||||||||||||
) | ||||||||||||||||
|
@@ -57,22 +55,25 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
w := acquireNetHTTPResponseWriter(ctx) | ||||||||||||||||
w := acquireResponseWriter(ctx) | ||||||||||||||||
|
||||||||||||||||
// Concurrently serve the net/http handler. | ||||||||||||||||
w.wg.Add(1) | ||||||||||||||||
go func() { | ||||||||||||||||
h.ServeHTTP(w, r.WithContext(ctx)) | ||||||||||||||||
select { | ||||||||||||||||
case w.modeCh <- modeDone: | ||||||||||||||||
default: | ||||||||||||||||
} | ||||||||||||||||
_ = w.Close() | ||||||||||||||||
}() | ||||||||||||||||
w.chOnce.Do(func() { | ||||||||||||||||
w.modeCh <- modeCopy | ||||||||||||||||
}) | ||||||||||||||||
w.close() | ||||||||||||||||
|
||||||||||||||||
mode := <-w.modeCh | ||||||||||||||||
// Wait for the net/http handler to complete before releasing. | ||||||||||||||||
// (e.g. wait for hijacked connection) | ||||||||||||||||
w.wg.Wait() | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This creates a potential deadlock. The goroutine is calling
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe the comment isn't clear about what it means by waiting for the handler to complete. The WaitGroup is meant to track the lifetime of the reference to The |
||||||||||||||||
releaseResponseWriter(w) | ||||||||||||||||
}() | ||||||||||||||||
|
||||||||||||||||
switch mode { | ||||||||||||||||
case modeDone: | ||||||||||||||||
switch <-w.modeCh { | ||||||||||||||||
case modeCopy: | ||||||||||||||||
// No flush occurred before the handler returned. | ||||||||||||||||
// Send the data as one chunk. | ||||||||||||||||
ctx.SetStatusCode(w.StatusCode()) | ||||||||||||||||
|
@@ -99,16 +100,14 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l])) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
if len(*w.responseBody) > 0 { | ||||||||||||||||
ctx.Response.SetBody(*w.responseBody) | ||||||||||||||||
} | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
// Release after sending response. | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
// Signal that the net/http -> fasthttp copy is complete. | ||||||||||||||||
w.wg.Done() | ||||||||||||||||
|
||||||||||||||||
case modeFlushed: | ||||||||||||||||
case modeStream: | ||||||||||||||||
// Flush occurred before handler returned. | ||||||||||||||||
// Send the first 512 bytes and start streaming | ||||||||||||||||
// the rest of the first chunk and new data as it arrives. | ||||||||||||||||
|
@@ -132,7 +131,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
|
||||||||||||||||
// Lock the current response body until | ||||||||||||||||
// it is sent in the StreamWriter function. | ||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
if !haveContentType { | ||||||||||||||||
// From net/http.ResponseWriter.Write: | ||||||||||||||||
// If the Header does not contain a Content-Type line, Write adds a Content-Type set | ||||||||||||||||
|
@@ -146,15 +144,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Start streaming mode on return. | ||||||||||||||||
w.r, w.w = io.Pipe() | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] The io.Pipe() creation should be moved to the reset() method or initialization to ensure pipes are always available when needed, rather than creating them only in flush mode. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pipes are only needed specifically during streaming (flush) mode. Since there is no need to allocate these pips in any other situation, I had updated to code to create the pipes only during stream mode since |
||||||||||||||||
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) { | ||||||||||||||||
// Signal that streaming completed on return. | ||||||||||||||||
Comment on lines
+147
to
+149
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pipe is created after the WaitGroup is decremented in the
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||
defer func() { | ||||||||||||||||
w.r.Close() | ||||||||||||||||
w.wg.Done() | ||||||||||||||||
}() | ||||||||||||||||
|
||||||||||||||||
// Stream the first chunk. | ||||||||||||||||
if len(*w.responseBody) > 0 { | ||||||||||||||||
_, _ = bw.Write(*w.responseBody) | ||||||||||||||||
_ = bw.Flush() | ||||||||||||||||
} | ||||||||||||||||
// The current response body is no longer used | ||||||||||||||||
// past this point. | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
// Stream the rest of the data that is read | ||||||||||||||||
// from the net/http handler in 32 KiB chunks. | ||||||||||||||||
|
@@ -163,16 +167,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
// as data comes in. | ||||||||||||||||
chunk := acquireBuffer() | ||||||||||||||||
*chunk = (*chunk)[:minBufferSize] | ||||||||||||||||
defer releaseBuffer(chunk) | ||||||||||||||||
for { | ||||||||||||||||
// Read net/http handler chunk. | ||||||||||||||||
n, err := w.r.Read(*chunk) | ||||||||||||||||
if err != nil { | ||||||||||||||||
// Handler ended due to an io.EOF | ||||||||||||||||
// or an error occurred. | ||||||||||||||||
// | ||||||||||||||||
// Release the response writer for reuse. | ||||||||||||||||
releaseBuffer(chunk) | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
|
@@ -182,21 +183,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
if err != nil { | ||||||||||||||||
// Handler ended due to an io.ErrPipeClosed | ||||||||||||||||
// or an error occurred. | ||||||||||||||||
// | ||||||||||||||||
// Release the response writer for reuse. | ||||||||||||||||
releaseBuffer(chunk) | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
err = bw.Flush() | ||||||||||||||||
if err != nil { | ||||||||||||||||
// Handler ended due to an io.ErrPipeClosed | ||||||||||||||||
// or an error occurred. | ||||||||||||||||
// | ||||||||||||||||
// Release the response writer for reuse. | ||||||||||||||||
releaseBuffer(chunk) | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
@@ -209,234 +202,32 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
w.streamCond.Signal() | ||||||||||||||||
w.streamCond.L.Unlock() | ||||||||||||||||
|
||||||||||||||||
case modeHijacked: | ||||||||||||||||
case modeHijack: | ||||||||||||||||
// The net/http handler called w.Hijack(). | ||||||||||||||||
// Copy data bidirectionally between the | ||||||||||||||||
// net/http and fasthttp connections. | ||||||||||||||||
var wg sync.WaitGroup | ||||||||||||||||
wg.Add(2) | ||||||||||||||||
w.hijackedWg.Add(2) | ||||||||||||||||
|
||||||||||||||||
// Note: It is safe to assume that net.Conn automatically | ||||||||||||||||
// flushes data while copying. | ||||||||||||||||
go func() { | ||||||||||||||||
defer wg.Done() | ||||||||||||||||
defer w.hijackedWg.Done() | ||||||||||||||||
_, _ = io.Copy(ctx.Conn(), w.handlerConn) | ||||||||||||||||
|
||||||||||||||||
// Close the fasthttp connection when | ||||||||||||||||
// the net/http connection closes. | ||||||||||||||||
_ = ctx.Conn().Close() | ||||||||||||||||
}() | ||||||||||||||||
go func() { | ||||||||||||||||
defer wg.Done() | ||||||||||||||||
defer w.hijackedWg.Done() | ||||||||||||||||
_, _ = io.Copy(w.handlerConn, ctx.Conn()) | ||||||||||||||||
// Note: Only the net/http handler | ||||||||||||||||
// should close the connection. | ||||||||||||||||
}() | ||||||||||||||||
w.hijackedWg.Wait() | ||||||||||||||||
|
||||||||||||||||
// Wait for the net/http handler to finish | ||||||||||||||||
// writing to the hijacked connection prior to releasing | ||||||||||||||||
// the writer into the writer pool. | ||||||||||||||||
wg.Wait() | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Use a minimum buffer size of 32 KiB. | ||||||||||||||||
const minBufferSize = 32 * 1024 | ||||||||||||||||
|
||||||||||||||||
var bufferPool = &sync.Pool{ | ||||||||||||||||
New: func() any { | ||||||||||||||||
b := make([]byte, minBufferSize) | ||||||||||||||||
return &b | ||||||||||||||||
}, | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
var writerPool = &sync.Pool{ | ||||||||||||||||
New: func() any { | ||||||||||||||||
pr, pw := io.Pipe() | ||||||||||||||||
return &netHTTPResponseWriter{ | ||||||||||||||||
h: make(http.Header), | ||||||||||||||||
r: pr, | ||||||||||||||||
w: pw, | ||||||||||||||||
modeCh: make(chan ModeType), | ||||||||||||||||
responseBody: acquireBuffer(), | ||||||||||||||||
streamCond: sync.NewCond(&sync.Mutex{}), | ||||||||||||||||
// Signal that the hijacked connection was closed. | ||||||||||||||||
w.wg.Done() | ||||||||||||||||
} | ||||||||||||||||
}, | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
type ModeType int | ||||||||||||||||
|
||||||||||||||||
const ( | ||||||||||||||||
modeUnknown ModeType = iota | ||||||||||||||||
modeDone | ||||||||||||||||
modeFlushed | ||||||||||||||||
modeHijacked | ||||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
type netHTTPResponseWriter struct { | ||||||||||||||||
handlerConn net.Conn | ||||||||||||||||
ctx *fasthttp.RequestCtx | ||||||||||||||||
h http.Header | ||||||||||||||||
r *io.PipeReader | ||||||||||||||||
w *io.PipeWriter | ||||||||||||||||
modeCh chan ModeType | ||||||||||||||||
responseBody *[]byte | ||||||||||||||||
streamCond *sync.Cond | ||||||||||||||||
statusCode int | ||||||||||||||||
once sync.Once | ||||||||||||||||
statusMutex sync.Mutex | ||||||||||||||||
responseMutex sync.Mutex | ||||||||||||||||
connMutex sync.Mutex | ||||||||||||||||
isStreaming bool | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter { | ||||||||||||||||
w, ok := writerPool.Get().(*netHTTPResponseWriter) | ||||||||||||||||
if !ok { | ||||||||||||||||
panic("fasthttpadaptor: cannot get *netHTTPResponseWriter from writerPool") | ||||||||||||||||
} | ||||||||||||||||
w.reset() | ||||||||||||||||
|
||||||||||||||||
w.ctx = ctx | ||||||||||||||||
return w | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) { | ||||||||||||||||
releaseBuffer(w.responseBody) | ||||||||||||||||
w.Close() | ||||||||||||||||
writerPool.Put(w) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func acquireBuffer() *[]byte { | ||||||||||||||||
buf, ok := bufferPool.Get().(*[]byte) | ||||||||||||||||
if !ok { | ||||||||||||||||
panic("fasthttpadaptor: cannot get *[]byte from bufferPool") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
*buf = (*buf)[:0] | ||||||||||||||||
return buf | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func releaseBuffer(buf *[]byte) { | ||||||||||||||||
bufferPool.Put(buf) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) StatusCode() int { | ||||||||||||||||
w.statusMutex.Lock() | ||||||||||||||||
defer w.statusMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
if w.statusCode == 0 { | ||||||||||||||||
return http.StatusOK | ||||||||||||||||
} | ||||||||||||||||
return w.statusCode | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Header() http.Header { | ||||||||||||||||
return w.h | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) WriteHeader(statusCode int) { | ||||||||||||||||
w.statusMutex.Lock() | ||||||||||||||||
defer w.statusMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
w.statusCode = statusCode | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Write(p []byte) (int, error) { | ||||||||||||||||
w.streamCond.L.Lock() | ||||||||||||||||
defer w.streamCond.L.Unlock() | ||||||||||||||||
|
||||||||||||||||
if w.isStreaming { | ||||||||||||||||
// Streaming mode is on. | ||||||||||||||||
// Stream directly to the conn writer. | ||||||||||||||||
return w.w.Write(p) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Streaming mode is off. | ||||||||||||||||
// Write to the first chunk for flushing later. | ||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
*w.responseBody = append(*w.responseBody, p...) | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
return len(p), nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Flush() { | ||||||||||||||||
// Trigger streaming mode setup. | ||||||||||||||||
w.once.Do(func() { | ||||||||||||||||
w.modeCh <- modeFlushed | ||||||||||||||||
}) | ||||||||||||||||
|
||||||||||||||||
// Wait for streaming mode. | ||||||||||||||||
w.streamCond.L.Lock() | ||||||||||||||||
defer w.streamCond.L.Unlock() | ||||||||||||||||
for !w.isStreaming { | ||||||||||||||||
w.streamCond.Wait() | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { | ||||||||||||||||
// Hijack assumes control of the connection, so we need to prevent fasthttp from closing it or | ||||||||||||||||
// doing anything else with it. | ||||||||||||||||
w.ctx.HijackSetNoResponse(true) | ||||||||||||||||
|
||||||||||||||||
netHTTPConn, fasthttpConn := net.Pipe() | ||||||||||||||||
w.handlerConn = fasthttpConn | ||||||||||||||||
|
||||||||||||||||
// Trigger hijacked mode. | ||||||||||||||||
w.once.Do(func() { | ||||||||||||||||
w.modeCh <- modeHijacked | ||||||||||||||||
}) | ||||||||||||||||
|
||||||||||||||||
bufRW := bufio.NewReadWriter(bufio.NewReader(netHTTPConn), bufio.NewWriter(netHTTPConn)) | ||||||||||||||||
|
||||||||||||||||
// Write any unflushed body to the hijacked connection buffer. | ||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
if len(*w.responseBody) > 0 { | ||||||||||||||||
_, _ = bufRW.Write(*w.responseBody) | ||||||||||||||||
_ = bufRW.Flush() | ||||||||||||||||
} | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
return netHTTPConn, bufRW, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Close() error { | ||||||||||||||||
_ = w.w.Close() | ||||||||||||||||
_ = w.r.Close() | ||||||||||||||||
|
||||||||||||||||
w.connMutex.Lock() | ||||||||||||||||
if w.handlerConn != nil { | ||||||||||||||||
_ = w.handlerConn.Close() | ||||||||||||||||
} | ||||||||||||||||
w.connMutex.Unlock() | ||||||||||||||||
return nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) reset() { | ||||||||||||||||
// Note: reset() must only run after a fasthttp handler finishes | ||||||||||||||||
// proxying the full net/http handler response to ensure no data races. | ||||||||||||||||
w.ctx = nil | ||||||||||||||||
w.connMutex.Lock() | ||||||||||||||||
w.handlerConn = nil | ||||||||||||||||
w.connMutex.Unlock() | ||||||||||||||||
w.statusCode = 0 | ||||||||||||||||
|
||||||||||||||||
// Open new bidirectional pipes | ||||||||||||||||
pr, pw := io.Pipe() | ||||||||||||||||
w.r = pr | ||||||||||||||||
w.w = pw | ||||||||||||||||
|
||||||||||||||||
// Clear the http Header | ||||||||||||||||
for key := range w.h { | ||||||||||||||||
delete(w.h, key) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Get a new buffer for the response body | ||||||||||||||||
w.responseBody = acquireBuffer() | ||||||||||||||||
|
||||||||||||||||
w.once = sync.Once{} | ||||||||||||||||
w.streamCond.L.Lock() | ||||||||||||||||
w.isStreaming = false | ||||||||||||||||
w.streamCond.L.Unlock() | ||||||||||||||||
} |
Uh oh!
There was an error while loading. Please reload this page.