diff --git a/bench_test.go b/bench_test.go index 8b3ad93..c84c3d0 100644 --- a/bench_test.go +++ b/bench_test.go @@ -10,10 +10,7 @@ func BenchmarkBuffer(b *testing.B) { noop := func([]int) {} b.Run("push only", func(b *testing.B) { - sut := buffer.New( - noop, - buffer.WithSize(uint(b.N)+1), - ) + sut := buffer.New(uint(b.N)+1, noop) defer sut.Close() for b.Loop() { @@ -25,10 +22,7 @@ func BenchmarkBuffer(b *testing.B) { }) b.Run("push and flush", func(b *testing.B) { - sut := buffer.New( - noop, - buffer.WithSize(1), - ) + sut := buffer.New(1, noop) defer sut.Close() for b.Loop() { diff --git a/buffer.go b/buffer.go index 163af8e..68c4a0b 100644 --- a/buffer.go +++ b/buffer.go @@ -2,6 +2,7 @@ package buffer import ( "errors" + "fmt" "io" "time" ) @@ -17,33 +18,44 @@ type ( // Buffer represents a data buffer that is asynchronously flushed, either manually or automatically. Buffer[T any] struct { io.Closer - flushFunc func([]T) - dataCh chan T - flushCh chan struct{} - closeCh chan struct{} - doneCh chan struct{} - options *Options + size uint + flusher func([]T) + options *Options + dataCh chan T + flushCh chan struct{} + closeCh chan struct{} + doneCh chan struct{} } ) -// New creates a new buffer instance with the provided flush function and options. -// It panics if provided with a nil flush function. -func New[T any](flushFunc func([]T), opts ...Option) *Buffer[T] { - if flushFunc == nil { - panic("flush function cannot be nil") +// New creates a new buffer instance with the provided size, flush function and options. +// It returns an error if provided with invalid arguments. +func New[T any](size uint, flusher func([]T), opts ...Option) (*Buffer[T], error) { + if size == 0 { + return nil, fmt.Errorf("size cannot be zero") + } + + if flusher == nil { + return nil, fmt.Errorf("flush function cannot be nil") + } + + options, err := resolveOptions(opts...) + if err != nil { + return nil, err } buffer := &Buffer[T]{ - flushFunc: flushFunc, - dataCh: make(chan T), - flushCh: make(chan struct{}), - closeCh: make(chan struct{}), - doneCh: make(chan struct{}), - options: resolveOptions(opts...), + size: size, + flusher: flusher, + options: options, + dataCh: make(chan T), + flushCh: make(chan struct{}), + closeCh: make(chan struct{}), + doneCh: make(chan struct{}), } go buffer.consume() - return buffer + return buffer, nil } // Push appends an item to the end of the buffer. @@ -122,7 +134,7 @@ func (buffer *Buffer[T]) closed() bool { func (buffer *Buffer[T]) consume() { count := 0 - items := make([]T, buffer.options.Size) + items := make([]T, buffer.size) mustFlush := false ticker, stopTicker := newTicker(buffer.options.FlushInterval) @@ -144,10 +156,10 @@ func (buffer *Buffer[T]) consume() { if mustFlush { stopTicker() - buffer.flushFunc(items[:count]) + buffer.flusher(items[:count]) count = 0 - items = make([]T, buffer.options.Size) + items = make([]T, buffer.size) mustFlush = false ticker, stopTicker = newTicker(buffer.options.FlushInterval) } diff --git a/buffer_test.go b/buffer_test.go index d509466..541b1a7 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -19,10 +19,7 @@ var _ = Describe("Buffer", func() { Context("Constructor", func() { It("creates a new Buffer instance", func() { // act - sut := buffer.New( - flusher.Flush, - buffer.WithSize(10), - ) + sut := buffer.New(10, flusher.Flush) // assert Expect(sut).NotTo(BeNil()) @@ -30,60 +27,38 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid flusher", func() { Expect(func() { - buffer.New[string]( - nil, - buffer.WithSize(1), - ) + buffer.New[string](1, nil) }).To(Panic()) }) Context("invalid options", func() { It("panics when provided an invalid size", func() { Expect(func() { - buffer.New( - flusher.Flush, - buffer.WithSize(0), - ) + buffer.New(0, flusher.Flush) }).To(Panic()) }) It("panics when provided an invalid flush interval", func() { Expect(func() { - buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithFlushInterval(-1), - ) + buffer.New(1, flusher.Flush, buffer.WithFlushInterval(-1)) }).To(Panic()) }) It("panics when provided an invalid push timeout", func() { Expect(func() { - buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithPushTimeout(-1), - ) + buffer.New(1, flusher.Flush, buffer.WithPushTimeout(-1)) }).To(Panic()) }) It("panics when provided an invalid flush timeout", func() { Expect(func() { - buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithFlushTimeout(-1), - ) + buffer.New(1, flusher.Flush, buffer.WithFlushTimeout(-1)) }).To(Panic()) }) It("panics when provided an invalid close timeout", func() { Expect(func() { - buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithCloseTimeout(-1), - ) + buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(-1)) }).To(Panic()) }) }) @@ -92,10 +67,7 @@ var _ = Describe("Buffer", func() { Context("Pushing", func() { It("pushes items into the buffer when Push is called", func() { // arrange - sut := buffer.New( - flusher.Flush, - buffer.WithSize(3), - ) + sut := buffer.New(3, flusher.Flush) // act err1 := sut.Push("a") @@ -111,11 +83,7 @@ var _ = Describe("Buffer", func() { It("fails when Push cannot execute in a timely fashion", func() { // arrange flusher.Func = func() { select {} } - sut := buffer.New( - flusher.Flush, - buffer.WithSize(2), - buffer.WithPushTimeout(time.Second), - ) + sut := buffer.New(2, flusher.Flush, buffer.WithPushTimeout(time.Second)) // act err1 := sut.Push("a") @@ -130,10 +98,7 @@ var _ = Describe("Buffer", func() { It("fails when the buffer is closed", func() { // arrange - sut := buffer.New( - flusher.Flush, - buffer.WithSize(2), - ) + sut := buffer.New(2, flusher.Flush) _ = sut.Close() // act @@ -147,10 +112,7 @@ var _ = Describe("Buffer", func() { Context("Flushing", func() { It("flushes the buffer when it fills up", func(done Done) { // arrange - sut := buffer.New( - flusher.Flush, - buffer.WithSize(5), - ) + sut := buffer.New(5, flusher.Flush) // act _ = sut.Push("a") @@ -169,11 +131,7 @@ var _ = Describe("Buffer", func() { // arrange interval := 3 * time.Second start := time.Now() - sut := buffer.New( - flusher.Flush, - buffer.WithSize(5), - buffer.WithFlushInterval(interval), - ) + sut := buffer.New(5, flusher.Flush, buffer.WithFlushInterval(interval)) // act _ = sut.Push("a") @@ -187,10 +145,7 @@ var _ = Describe("Buffer", func() { It("flushes the buffer when Flush is called", func(done Done) { // arrange - sut := buffer.New( - flusher.Flush, - buffer.WithSize(3), - ) + sut := buffer.New(3, flusher.Flush) _ = sut.Push("a") _ = sut.Push("b") @@ -207,11 +162,7 @@ var _ = Describe("Buffer", func() { It("fails when Flush cannot execute in a timely fashion", func() { // arrange flusher.Func = func() { time.Sleep(3 * time.Second) } - sut := buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithFlushTimeout(time.Second), - ) + sut := buffer.New(1, flusher.Flush, buffer.WithFlushTimeout(time.Second)) _ = sut.Push("a") // act @@ -223,10 +174,7 @@ var _ = Describe("Buffer", func() { It("fails when the buffer is closed", func() { // arrange - sut := buffer.New[string]( - flusher.Flush, - buffer.WithSize(2), - ) + sut := buffer.New[string](2, flusher.Flush) _ = sut.Close() // act @@ -240,10 +188,7 @@ var _ = Describe("Buffer", func() { Context("Closing", func() { It("flushes the buffer and closes it when Close is called", func(done Done) { // arrange - sut := buffer.New( - flusher.Flush, - buffer.WithSize(3), - ) + sut := buffer.New(3, flusher.Flush) _ = sut.Push("a") _ = sut.Push("b") @@ -261,11 +206,7 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } - sut := buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithCloseTimeout(time.Second), - ) + sut := buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(time.Second)) _ = sut.Push("a") // act @@ -279,11 +220,7 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } - sut := buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithCloseTimeout(time.Second), - ) + sut := buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(time.Second)) _ = sut.Close() // act @@ -297,11 +234,7 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } - sut := buffer.New( - flusher.Flush, - buffer.WithSize(1), - buffer.WithCloseTimeout(time.Second), - ) + sut := buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(time.Second)) _ = sut.Push("a") // act diff --git a/go.mod b/go.mod index 5d58926..ceac68b 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/globocom/go-buffer/v3 +module github.com/globocom/go-buffer/v4 retract ( v3.0.0 // Published prematurely @@ -7,6 +7,10 @@ retract ( go 1.24 +retract ( + v3 +) + require ( github.com/onsi/ginkgo v1.13.0 github.com/onsi/gomega v1.10.1 diff --git a/options.go b/options.go index 794b64e..a037243 100644 --- a/options.go +++ b/options.go @@ -1,7 +1,6 @@ package buffer import ( - "errors" "fmt" "time" ) @@ -15,7 +14,6 @@ const ( type ( // Configuration options. Options struct { - Size uint FlushInterval time.Duration PushTimeout time.Duration FlushTimeout time.Duration @@ -26,13 +24,6 @@ type ( Option func(*Options) ) -// WithSize sets the size of the buffer. -func WithSize(size uint) Option { - return func(options *Options) { - options.Size = size - } -} - // WithFlushInterval sets the interval between automatic flushes. func WithFlushInterval(interval time.Duration) Option { return func(options *Options) { @@ -62,9 +53,6 @@ func WithCloseTimeout(timeout time.Duration) Option { } func validateOptions(options *Options) error { - if options.Size == 0 { - return errors.New(invalidSize) - } if options.FlushInterval < 0 { return fmt.Errorf(invalidInterval, "FlushInterval") } @@ -81,9 +69,8 @@ func validateOptions(options *Options) error { return nil } -func resolveOptions(opts ...Option) *Options { +func resolveOptions(opts ...Option) (*Options, error) { options := &Options{ - Size: 0, FlushInterval: 0, PushTimeout: time.Second, FlushTimeout: time.Second, @@ -95,8 +82,8 @@ func resolveOptions(opts ...Option) *Options { } if err := validateOptions(options); err != nil { - panic(err) + return nil, err } - return options + return options, nil } diff --git a/options_test.go b/options_test.go index 18a52d6..45b6e4d 100644 --- a/options_test.go +++ b/options_test.go @@ -10,17 +10,6 @@ import ( ) var _ = Describe("Options", func() { - It("sets up size", func() { - // arrange - opts := &buffer.Options{} - - // act - buffer.WithSize(10)(opts) - - // assert - Expect(opts.Size).To(BeIdenticalTo(uint(10))) - }) - It("sets up flush interval", func() { // arrange opts := &buffer.Options{}