Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 31 additions & 240 deletions fasthttpadaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ package fasthttpadaptor
import (
"bufio"
"io"
"net"
"net/http"
"sync"

"github.com/valyala/fasthttp"
)
Expand Down Expand Up @@ -57,22 +55,25 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
return
}

w := acquireNetHTTPResponseWriter(ctx)
w := acquireResponseWriter(ctx)

// Concurrently serve the net/http handler.
w.wg.Add(1)
go func() {
h.ServeHTTP(w, r.WithContext(ctx))
select {
case w.modeCh <- modeDone:
default:
}
_ = w.Close()
}()
w.chOnce.Do(func() {
w.modeCh <- modeCopy
})
w.close()

mode := <-w.modeCh
// Wait for the net/http handler to complete before releasing.
// (e.g. wait for hijacked connection)
w.wg.Wait()
releaseResponseWriter(w)
}()

switch mode {
case modeDone:
switch <-w.modeCh {
case modeCopy:
// No flush occurred before the handler returned.
// Send the data as one chunk.
ctx.SetStatusCode(w.StatusCode())
Expand All @@ -99,16 +100,14 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
}

w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
ctx.Response.SetBody(*w.responseBody)
}
w.responseMutex.Unlock()

// Release after sending response.
releaseNetHTTPResponseWriter(w)
// Signal that the net/http -> fasthttp copy is complete.
w.wg.Done()

case modeFlushed:
case modeStream:
// Flush occurred before handler returned.
// Send the first 512 bytes and start streaming
// the rest of the first chunk and new data as it arrives.
Expand All @@ -132,7 +131,6 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {

// Lock the current response body until
// it is sent in the StreamWriter function.
w.responseMutex.Lock()
if !haveContentType {
// From net/http.ResponseWriter.Write:
// If the Header does not contain a Content-Type line, Write adds a Content-Type set
Expand All @@ -146,15 +144,21 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
}

// Start streaming mode on return.
w.r, w.w = io.Pipe()
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
// Signal that streaming completed on return.
defer func() {
w.r.Close()
w.wg.Done()
}()

// Stream the first chunk.
if len(*w.responseBody) > 0 {
_, _ = bw.Write(*w.responseBody)
_ = bw.Flush()
}
// The current response body is no longer used
// past this point.
w.responseMutex.Unlock()

// Stream the rest of the data that is read
// from the net/http handler in 32 KiB chunks.
Expand All @@ -163,16 +167,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
// as data comes in.
chunk := acquireBuffer()
*chunk = (*chunk)[:minBufferSize]
defer releaseBuffer(chunk)
for {
// Read net/http handler chunk.
n, err := w.r.Read(*chunk)
if err != nil {
// Handler ended due to an io.EOF
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}

Expand All @@ -182,21 +183,13 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}

err = bw.Flush()
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}
}
Expand All @@ -209,234 +202,32 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
w.streamCond.Signal()
w.streamCond.L.Unlock()

case modeHijacked:
case modeHijack:
// The net/http handler called w.Hijack().
// Copy data bidirectionally between the
// net/http and fasthttp connections.
var wg sync.WaitGroup
wg.Add(2)
w.hijackedWg.Add(2)

// Note: It is safe to assume that net.Conn automatically
// flushes data while copying.
go func() {
defer wg.Done()
defer w.hijackedWg.Done()
_, _ = io.Copy(ctx.Conn(), w.handlerConn)

// Close the fasthttp connection when
// the net/http connection closes.
_ = ctx.Conn().Close()
}()
go func() {
defer wg.Done()
defer w.hijackedWg.Done()
_, _ = io.Copy(w.handlerConn, ctx.Conn())
// Note: Only the net/http handler
// should close the connection.
}()
w.hijackedWg.Wait()

// Wait for the net/http handler to finish
// writing to the hijacked connection prior to releasing
// the writer into the writer pool.
wg.Wait()
releaseNetHTTPResponseWriter(w)
}
}
}

// Use a minimum buffer size of 32 KiB.
const minBufferSize = 32 * 1024

var bufferPool = &sync.Pool{
New: func() any {
b := make([]byte, minBufferSize)
return &b
},
}

var writerPool = &sync.Pool{
New: func() any {
pr, pw := io.Pipe()
return &netHTTPResponseWriter{
h: make(http.Header),
r: pr,
w: pw,
modeCh: make(chan ModeType),
responseBody: acquireBuffer(),
streamCond: sync.NewCond(&sync.Mutex{}),
// Signal that the hijacked connection was closed.
w.wg.Done()
}
},
}

type ModeType int

const (
modeUnknown ModeType = iota
modeDone
modeFlushed
modeHijacked
)

type netHTTPResponseWriter struct {
handlerConn net.Conn
ctx *fasthttp.RequestCtx
h http.Header
r *io.PipeReader
w *io.PipeWriter
modeCh chan ModeType
responseBody *[]byte
streamCond *sync.Cond
statusCode int
once sync.Once
statusMutex sync.Mutex
responseMutex sync.Mutex
connMutex sync.Mutex
isStreaming bool
}

func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter {
w, ok := writerPool.Get().(*netHTTPResponseWriter)
if !ok {
panic("fasthttpadaptor: cannot get *netHTTPResponseWriter from writerPool")
}
w.reset()

w.ctx = ctx
return w
}

func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) {
releaseBuffer(w.responseBody)
w.Close()
writerPool.Put(w)
}

func acquireBuffer() *[]byte {
buf, ok := bufferPool.Get().(*[]byte)
if !ok {
panic("fasthttpadaptor: cannot get *[]byte from bufferPool")
}

*buf = (*buf)[:0]
return buf
}

func releaseBuffer(buf *[]byte) {
bufferPool.Put(buf)
}

func (w *netHTTPResponseWriter) StatusCode() int {
w.statusMutex.Lock()
defer w.statusMutex.Unlock()

if w.statusCode == 0 {
return http.StatusOK
}
return w.statusCode
}

func (w *netHTTPResponseWriter) Header() http.Header {
return w.h
}

func (w *netHTTPResponseWriter) WriteHeader(statusCode int) {
w.statusMutex.Lock()
defer w.statusMutex.Unlock()

w.statusCode = statusCode
}

func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {
w.streamCond.L.Lock()
defer w.streamCond.L.Unlock()

if w.isStreaming {
// Streaming mode is on.
// Stream directly to the conn writer.
return w.w.Write(p)
}

// Streaming mode is off.
// Write to the first chunk for flushing later.
w.responseMutex.Lock()
*w.responseBody = append(*w.responseBody, p...)
w.responseMutex.Unlock()
return len(p), nil
}

func (w *netHTTPResponseWriter) Flush() {
// Trigger streaming mode setup.
w.once.Do(func() {
w.modeCh <- modeFlushed
})

// Wait for streaming mode.
w.streamCond.L.Lock()
defer w.streamCond.L.Unlock()
for !w.isStreaming {
w.streamCond.Wait()
}
}

func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
// Hijack assumes control of the connection, so we need to prevent fasthttp from closing it or
// doing anything else with it.
w.ctx.HijackSetNoResponse(true)

netHTTPConn, fasthttpConn := net.Pipe()
w.handlerConn = fasthttpConn

// Trigger hijacked mode.
w.once.Do(func() {
w.modeCh <- modeHijacked
})

bufRW := bufio.NewReadWriter(bufio.NewReader(netHTTPConn), bufio.NewWriter(netHTTPConn))

// Write any unflushed body to the hijacked connection buffer.
w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
_, _ = bufRW.Write(*w.responseBody)
_ = bufRW.Flush()
}
w.responseMutex.Unlock()
return netHTTPConn, bufRW, nil
}

func (w *netHTTPResponseWriter) Close() error {
_ = w.w.Close()
_ = w.r.Close()

w.connMutex.Lock()
if w.handlerConn != nil {
_ = w.handlerConn.Close()
}
w.connMutex.Unlock()
return nil
}

func (w *netHTTPResponseWriter) reset() {
// Note: reset() must only run after a fasthttp handler finishes
// proxying the full net/http handler response to ensure no data races.
w.ctx = nil
w.connMutex.Lock()
w.handlerConn = nil
w.connMutex.Unlock()
w.statusCode = 0

// Open new bidirectional pipes
pr, pw := io.Pipe()
w.r = pr
w.w = pw

// Clear the http Header
for key := range w.h {
delete(w.h, key)
}

// Get a new buffer for the response body
w.responseBody = acquireBuffer()

w.once = sync.Once{}
w.streamCond.L.Lock()
w.isStreaming = false
w.streamCond.L.Unlock()
}
Loading
Loading