-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Fix Race Condition in fasthttpadaptor v1.66.0 #2069
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
59f21b3
6c2101e
5312c11
df1f51e
0b76855
cfeccbf
864629c
2c9a72d
065b350
f4bb3a7
199a009
09bdd43
ab8a223
67ec2d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -60,18 +60,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
w := acquireNetHTTPResponseWriter(ctx) | ||||||||||||||||
|
||||||||||||||||
// Concurrently serve the net/http handler. | ||||||||||||||||
w.wg.Add(1) | ||||||||||||||||
go func() { | ||||||||||||||||
h.ServeHTTP(w, r.WithContext(ctx)) | ||||||||||||||||
select { | ||||||||||||||||
case w.modeCh <- modeDone: | ||||||||||||||||
default: | ||||||||||||||||
} | ||||||||||||||||
_ = w.Close() | ||||||||||||||||
}() | ||||||||||||||||
w.chOnce.Do(func() { | ||||||||||||||||
w.modeCh <- modeDone | ||||||||||||||||
}) | ||||||||||||||||
w.Close() | ||||||||||||||||
|
||||||||||||||||
mode := <-w.modeCh | ||||||||||||||||
// Wait for the net/http handler to complete before releasing. | ||||||||||||||||
// (e.g. wait for hijacked connection) | ||||||||||||||||
w.wg.Wait() | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
}() | ||||||||||||||||
|
||||||||||||||||
switch mode { | ||||||||||||||||
switch <-w.modeCh { | ||||||||||||||||
case modeDone: | ||||||||||||||||
// No flush occurred before the handler returned. | ||||||||||||||||
// Send the data as one chunk. | ||||||||||||||||
|
@@ -99,14 +102,12 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l])) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
if len(*w.responseBody) > 0 { | ||||||||||||||||
ctx.Response.SetBody(*w.responseBody) | ||||||||||||||||
} | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
// Release after sending response. | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
// Signal that the net/http -> fasthttp copy is complete. | ||||||||||||||||
w.wg.Done() | ||||||||||||||||
|
||||||||||||||||
case modeFlushed: | ||||||||||||||||
// Flush occurred before handler returned. | ||||||||||||||||
|
@@ -132,7 +133,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
|
||||||||||||||||
// Lock the current response body until | ||||||||||||||||
// it is sent in the StreamWriter function. | ||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
if !haveContentType { | ||||||||||||||||
// From net/http.ResponseWriter.Write: | ||||||||||||||||
// If the Header does not contain a Content-Type line, Write adds a Content-Type set | ||||||||||||||||
|
@@ -146,15 +146,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Start streaming mode on return. | ||||||||||||||||
w.r, w.w = io.Pipe() | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] The io.Pipe() creation should be moved to the reset() method or initialization to ensure pipes are always available when needed, rather than creating them only in flush mode. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pipes are only needed specifically during streaming (flush) mode. Since there is no need to allocate these pips in any other situation, I had updated to code to create the pipes only during stream mode since |
||||||||||||||||
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) { | ||||||||||||||||
// Signal that streaming completed on return. | ||||||||||||||||
Comment on lines
+147
to
+149
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pipe is created after the WaitGroup is decremented in the
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||||||||
defer func() { | ||||||||||||||||
w.r.Close() | ||||||||||||||||
w.wg.Done() | ||||||||||||||||
}() | ||||||||||||||||
|
||||||||||||||||
// Stream the first chunk. | ||||||||||||||||
if len(*w.responseBody) > 0 { | ||||||||||||||||
_, _ = bw.Write(*w.responseBody) | ||||||||||||||||
_ = bw.Flush() | ||||||||||||||||
} | ||||||||||||||||
// The current response body is no longer used | ||||||||||||||||
// past this point. | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
// Stream the rest of the data that is read | ||||||||||||||||
// from the net/http handler in 32 KiB chunks. | ||||||||||||||||
|
@@ -163,16 +169,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
// as data comes in. | ||||||||||||||||
chunk := acquireBuffer() | ||||||||||||||||
*chunk = (*chunk)[:minBufferSize] | ||||||||||||||||
defer releaseBuffer(chunk) | ||||||||||||||||
for { | ||||||||||||||||
// Read net/http handler chunk. | ||||||||||||||||
n, err := w.r.Read(*chunk) | ||||||||||||||||
if err != nil { | ||||||||||||||||
// Handler ended due to an io.EOF | ||||||||||||||||
// or an error occurred. | ||||||||||||||||
// | ||||||||||||||||
// Release the response writer for reuse. | ||||||||||||||||
releaseBuffer(chunk) | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
|
@@ -182,21 +185,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
if err != nil { | ||||||||||||||||
// Handler ended due to an io.ErrPipeClosed | ||||||||||||||||
// or an error occurred. | ||||||||||||||||
// | ||||||||||||||||
// Release the response writer for reuse. | ||||||||||||||||
releaseBuffer(chunk) | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
err = bw.Flush() | ||||||||||||||||
if err != nil { | ||||||||||||||||
// Handler ended due to an io.ErrPipeClosed | ||||||||||||||||
// or an error occurred. | ||||||||||||||||
// | ||||||||||||||||
// Release the response writer for reuse. | ||||||||||||||||
releaseBuffer(chunk) | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
@@ -213,31 +208,28 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler { | |||||||||||||||
// The net/http handler called w.Hijack(). | ||||||||||||||||
// Copy data bidirectionally between the | ||||||||||||||||
// net/http and fasthttp connections. | ||||||||||||||||
var wg sync.WaitGroup | ||||||||||||||||
wg.Add(2) | ||||||||||||||||
w.hijackedWg.Add(2) | ||||||||||||||||
|
||||||||||||||||
// Note: It is safe to assume that net.Conn automatically | ||||||||||||||||
// flushes data while copying. | ||||||||||||||||
go func() { | ||||||||||||||||
defer wg.Done() | ||||||||||||||||
defer w.hijackedWg.Done() | ||||||||||||||||
_, _ = io.Copy(ctx.Conn(), w.handlerConn) | ||||||||||||||||
|
||||||||||||||||
// Close the fasthttp connection when | ||||||||||||||||
// the net/http connection closes. | ||||||||||||||||
_ = ctx.Conn().Close() | ||||||||||||||||
}() | ||||||||||||||||
go func() { | ||||||||||||||||
defer wg.Done() | ||||||||||||||||
defer w.hijackedWg.Done() | ||||||||||||||||
_, _ = io.Copy(w.handlerConn, ctx.Conn()) | ||||||||||||||||
// Note: Only the net/http handler | ||||||||||||||||
// should close the connection. | ||||||||||||||||
}() | ||||||||||||||||
w.hijackedWg.Wait() | ||||||||||||||||
|
||||||||||||||||
// Wait for the net/http handler to finish | ||||||||||||||||
// writing to the hijacked connection prior to releasing | ||||||||||||||||
// the writer into the writer pool. | ||||||||||||||||
wg.Wait() | ||||||||||||||||
releaseNetHTTPResponseWriter(w) | ||||||||||||||||
// Signal that the hijacked connection was closed. | ||||||||||||||||
w.wg.Done() | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
@@ -276,20 +268,20 @@ const ( | |||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
type netHTTPResponseWriter struct { | ||||||||||||||||
handlerConn net.Conn | ||||||||||||||||
ctx *fasthttp.RequestCtx | ||||||||||||||||
h http.Header | ||||||||||||||||
r *io.PipeReader | ||||||||||||||||
w *io.PipeWriter | ||||||||||||||||
modeCh chan ModeType | ||||||||||||||||
responseBody *[]byte | ||||||||||||||||
streamCond *sync.Cond | ||||||||||||||||
statusCode int | ||||||||||||||||
once sync.Once | ||||||||||||||||
statusMutex sync.Mutex | ||||||||||||||||
responseMutex sync.Mutex | ||||||||||||||||
connMutex sync.Mutex | ||||||||||||||||
isStreaming bool | ||||||||||||||||
handlerConn net.Conn | ||||||||||||||||
ctx *fasthttp.RequestCtx | ||||||||||||||||
h http.Header | ||||||||||||||||
r *io.PipeReader | ||||||||||||||||
w *io.PipeWriter | ||||||||||||||||
modeCh chan ModeType | ||||||||||||||||
responseBody *[]byte | ||||||||||||||||
streamCond *sync.Cond | ||||||||||||||||
statusCode int | ||||||||||||||||
chOnce sync.Once | ||||||||||||||||
closeOnce sync.Once | ||||||||||||||||
isStreaming bool | ||||||||||||||||
wg sync.WaitGroup | ||||||||||||||||
hijackedWg sync.WaitGroup | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter { | ||||||||||||||||
|
@@ -305,7 +297,6 @@ func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWrit | |||||||||||||||
|
||||||||||||||||
func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) { | ||||||||||||||||
releaseBuffer(w.responseBody) | ||||||||||||||||
w.Close() | ||||||||||||||||
writerPool.Put(w) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
|
@@ -324,9 +315,6 @@ func releaseBuffer(buf *[]byte) { | |||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) StatusCode() int { | ||||||||||||||||
w.statusMutex.Lock() | ||||||||||||||||
defer w.statusMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
if w.statusCode == 0 { | ||||||||||||||||
return http.StatusOK | ||||||||||||||||
} | ||||||||||||||||
|
@@ -338,9 +326,6 @@ func (w *netHTTPResponseWriter) Header() http.Header { | |||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) WriteHeader(statusCode int) { | ||||||||||||||||
w.statusMutex.Lock() | ||||||||||||||||
defer w.statusMutex.Unlock() | ||||||||||||||||
|
||||||||||||||||
w.statusCode = statusCode | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
|
@@ -356,15 +341,13 @@ func (w *netHTTPResponseWriter) Write(p []byte) (int, error) { | |||||||||||||||
|
||||||||||||||||
// Streaming mode is off. | ||||||||||||||||
// Write to the first chunk for flushing later. | ||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
*w.responseBody = append(*w.responseBody, p...) | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
return len(p), nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Flush() { | ||||||||||||||||
// Trigger streaming mode setup. | ||||||||||||||||
w.once.Do(func() { | ||||||||||||||||
w.chOnce.Do(func() { | ||||||||||||||||
w.modeCh <- modeFlushed | ||||||||||||||||
}) | ||||||||||||||||
|
||||||||||||||||
|
@@ -385,47 +368,38 @@ func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { | |||||||||||||||
w.handlerConn = fasthttpConn | ||||||||||||||||
|
||||||||||||||||
// Trigger hijacked mode. | ||||||||||||||||
w.once.Do(func() { | ||||||||||||||||
w.chOnce.Do(func() { | ||||||||||||||||
w.modeCh <- modeHijacked | ||||||||||||||||
}) | ||||||||||||||||
|
||||||||||||||||
bufRW := bufio.NewReadWriter(bufio.NewReader(netHTTPConn), bufio.NewWriter(netHTTPConn)) | ||||||||||||||||
|
||||||||||||||||
// Write any unflushed body to the hijacked connection buffer. | ||||||||||||||||
w.responseMutex.Lock() | ||||||||||||||||
if len(*w.responseBody) > 0 { | ||||||||||||||||
_, _ = bufRW.Write(*w.responseBody) | ||||||||||||||||
_ = bufRW.Flush() | ||||||||||||||||
} | ||||||||||||||||
w.responseMutex.Unlock() | ||||||||||||||||
return netHTTPConn, bufRW, nil | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func (w *netHTTPResponseWriter) Close() error { | ||||||||||||||||
_ = w.w.Close() | ||||||||||||||||
_ = w.r.Close() | ||||||||||||||||
|
||||||||||||||||
w.connMutex.Lock() | ||||||||||||||||
if w.handlerConn != nil { | ||||||||||||||||
_ = w.handlerConn.Close() | ||||||||||||||||
} | ||||||||||||||||
w.connMutex.Unlock() | ||||||||||||||||
w.closeOnce.Do(func() { | ||||||||||||||||
if w.w != nil { | ||||||||||||||||
_ = w.w.Close() | ||||||||||||||||
} | ||||||||||||||||
|
} | |
} | |
if w.r != nil { | |
_ = w.r.Close() | |
} |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Close()
method is from the perspective of the net/http handler.
When the net/http handler closes the writer, this should close the writer end w.w
of the streaming mode. The reader end that is used by fasthttpw.r
will then return an EOF error and be closed along with the client connection separately.
If any other mode occurs, the Close() operation does nothing since no internal writers/readers are managed by fasthttp at that point. Hijack mode gives complete control to the user to control w.handlerConn
, while done mode only uses a recycled buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This creates a potential deadlock. The goroutine is calling
w.wg.Wait()
while it's part of the same WaitGroup that was incremented withw.wg.Add(1)
at line 63. The goroutine cannot complete because it's waiting for itself to finish.Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the comment isn't clear about what it means by waiting for the handler to complete. The WaitGroup is meant to track the lifetime of the reference to
w
in each mode type.The
w.wg.Wait()
is necessary here to avoid recycling the writer whilew
is still in use (in SendStreamWriter, or the handlerConn still hasn't been closed).