Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ func BenchmarkBuffer(b *testing.B) {
noop := func([]int) {}

b.Run("push only", func(b *testing.B) {
sut := buffer.New(
noop,
buffer.WithSize(uint(b.N)+1),
)
sut := buffer.New(uint(b.N)+1, noop)
defer sut.Close()

for b.Loop() {
Expand All @@ -25,10 +22,7 @@ func BenchmarkBuffer(b *testing.B) {
})

b.Run("push and flush", func(b *testing.B) {
sut := buffer.New(
noop,
buffer.WithSize(1),
)
sut := buffer.New(1, noop)
defer sut.Close()

for b.Loop() {
Expand Down
54 changes: 33 additions & 21 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package buffer

import (
"errors"
"fmt"
"io"
"time"
)
Expand All @@ -17,33 +18,44 @@ type (
// Buffer represents a data buffer that is asynchronously flushed, either manually or automatically.
Buffer[T any] struct {
io.Closer
flushFunc func([]T)
dataCh chan T
flushCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
options *Options
size uint
flusher func([]T)
options *Options
dataCh chan T
flushCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
}
)

// New creates a new buffer instance with the provided flush function and options.
// It panics if provided with a nil flush function.
func New[T any](flushFunc func([]T), opts ...Option) *Buffer[T] {
if flushFunc == nil {
panic("flush function cannot be nil")
// New creates a new buffer instance with the provided size, flush function and options.
// It returns an error if provided with invalid arguments.
func New[T any](size uint, flusher func([]T), opts ...Option) (*Buffer[T], error) {
if size == 0 {
return nil, fmt.Errorf("size cannot be zero")
}

if flusher == nil {
return nil, fmt.Errorf("flush function cannot be nil")
}

options, err := resolveOptions(opts...)
if err != nil {
return nil, err
}

buffer := &Buffer[T]{
flushFunc: flushFunc,
dataCh: make(chan T),
flushCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
options: resolveOptions(opts...),
size: size,
flusher: flusher,
options: options,
dataCh: make(chan T),
flushCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
go buffer.consume()

return buffer
return buffer, nil
}

// Push appends an item to the end of the buffer.
Expand Down Expand Up @@ -122,7 +134,7 @@ func (buffer *Buffer[T]) closed() bool {

func (buffer *Buffer[T]) consume() {
count := 0
items := make([]T, buffer.options.Size)
items := make([]T, buffer.size)
mustFlush := false
ticker, stopTicker := newTicker(buffer.options.FlushInterval)

Expand All @@ -144,10 +156,10 @@ func (buffer *Buffer[T]) consume() {

if mustFlush {
stopTicker()
buffer.flushFunc(items[:count])
buffer.flusher(items[:count])

count = 0
items = make([]T, buffer.options.Size)
items = make([]T, buffer.size)
mustFlush = false
ticker, stopTicker = newTicker(buffer.options.FlushInterval)
}
Expand Down
105 changes: 19 additions & 86 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,46 @@ var _ = Describe("Buffer", func() {
Context("Constructor", func() {
It("creates a new Buffer instance", func() {
// act
sut := buffer.New(
flusher.Flush,
buffer.WithSize(10),
)
sut := buffer.New(10, flusher.Flush)

// assert
Expect(sut).NotTo(BeNil())
})

It("panics when provided an invalid flusher", func() {
Expect(func() {
buffer.New[string](
nil,
buffer.WithSize(1),
)
buffer.New[string](1, nil)
}).To(Panic())
})

Context("invalid options", func() {
It("panics when provided an invalid size", func() {
Expect(func() {
buffer.New(
flusher.Flush,
buffer.WithSize(0),
)
buffer.New(0, flusher.Flush)
}).To(Panic())
})

It("panics when provided an invalid flush interval", func() {
Expect(func() {
buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithFlushInterval(-1),
)
buffer.New(1, flusher.Flush, buffer.WithFlushInterval(-1))
}).To(Panic())
})

It("panics when provided an invalid push timeout", func() {
Expect(func() {
buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithPushTimeout(-1),
)
buffer.New(1, flusher.Flush, buffer.WithPushTimeout(-1))
}).To(Panic())
})

It("panics when provided an invalid flush timeout", func() {
Expect(func() {
buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithFlushTimeout(-1),
)
buffer.New(1, flusher.Flush, buffer.WithFlushTimeout(-1))
}).To(Panic())
})

It("panics when provided an invalid close timeout", func() {
Expect(func() {
buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithCloseTimeout(-1),
)
buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(-1))
}).To(Panic())
})
})
Expand All @@ -92,10 +67,7 @@ var _ = Describe("Buffer", func() {
Context("Pushing", func() {
It("pushes items into the buffer when Push is called", func() {
// arrange
sut := buffer.New(
flusher.Flush,
buffer.WithSize(3),
)
sut := buffer.New(3, flusher.Flush)

// act
err1 := sut.Push("a")
Expand All @@ -111,11 +83,7 @@ var _ = Describe("Buffer", func() {
It("fails when Push cannot execute in a timely fashion", func() {
// arrange
flusher.Func = func() { select {} }
sut := buffer.New(
flusher.Flush,
buffer.WithSize(2),
buffer.WithPushTimeout(time.Second),
)
sut := buffer.New(2, flusher.Flush, buffer.WithPushTimeout(time.Second))

// act
err1 := sut.Push("a")
Expand All @@ -130,10 +98,7 @@ var _ = Describe("Buffer", func() {

It("fails when the buffer is closed", func() {
// arrange
sut := buffer.New(
flusher.Flush,
buffer.WithSize(2),
)
sut := buffer.New(2, flusher.Flush)
_ = sut.Close()

// act
Expand All @@ -147,10 +112,7 @@ var _ = Describe("Buffer", func() {
Context("Flushing", func() {
It("flushes the buffer when it fills up", func(done Done) {
// arrange
sut := buffer.New(
flusher.Flush,
buffer.WithSize(5),
)
sut := buffer.New(5, flusher.Flush)

// act
_ = sut.Push("a")
Expand All @@ -169,11 +131,7 @@ var _ = Describe("Buffer", func() {
// arrange
interval := 3 * time.Second
start := time.Now()
sut := buffer.New(
flusher.Flush,
buffer.WithSize(5),
buffer.WithFlushInterval(interval),
)
sut := buffer.New(5, flusher.Flush, buffer.WithFlushInterval(interval))

// act
_ = sut.Push("a")
Expand All @@ -187,10 +145,7 @@ var _ = Describe("Buffer", func() {

It("flushes the buffer when Flush is called", func(done Done) {
// arrange
sut := buffer.New(
flusher.Flush,
buffer.WithSize(3),
)
sut := buffer.New(3, flusher.Flush)
_ = sut.Push("a")
_ = sut.Push("b")

Expand All @@ -207,11 +162,7 @@ var _ = Describe("Buffer", func() {
It("fails when Flush cannot execute in a timely fashion", func() {
// arrange
flusher.Func = func() { time.Sleep(3 * time.Second) }
sut := buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithFlushTimeout(time.Second),
)
sut := buffer.New(1, flusher.Flush, buffer.WithFlushTimeout(time.Second))
_ = sut.Push("a")

// act
Expand All @@ -223,10 +174,7 @@ var _ = Describe("Buffer", func() {

It("fails when the buffer is closed", func() {
// arrange
sut := buffer.New[string](
flusher.Flush,
buffer.WithSize(2),
)
sut := buffer.New[string](2, flusher.Flush)
_ = sut.Close()

// act
Expand All @@ -240,10 +188,7 @@ var _ = Describe("Buffer", func() {
Context("Closing", func() {
It("flushes the buffer and closes it when Close is called", func(done Done) {
// arrange
sut := buffer.New(
flusher.Flush,
buffer.WithSize(3),
)
sut := buffer.New(3, flusher.Flush)
_ = sut.Push("a")
_ = sut.Push("b")

Expand All @@ -261,11 +206,7 @@ var _ = Describe("Buffer", func() {
// arrange
flusher.Func = func() { time.Sleep(2 * time.Second) }

sut := buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithCloseTimeout(time.Second),
)
sut := buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(time.Second))
_ = sut.Push("a")

// act
Expand All @@ -279,11 +220,7 @@ var _ = Describe("Buffer", func() {
// arrange
flusher.Func = func() { time.Sleep(2 * time.Second) }

sut := buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithCloseTimeout(time.Second),
)
sut := buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(time.Second))
_ = sut.Close()

// act
Expand All @@ -297,11 +234,7 @@ var _ = Describe("Buffer", func() {
// arrange
flusher.Func = func() { time.Sleep(2 * time.Second) }

sut := buffer.New(
flusher.Flush,
buffer.WithSize(1),
buffer.WithCloseTimeout(time.Second),
)
sut := buffer.New(1, flusher.Flush, buffer.WithCloseTimeout(time.Second))
_ = sut.Push("a")

// act
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/globocom/go-buffer/v3
module github.com/globocom/go-buffer/v4

retract (
v3.0.0 // Published prematurely
Expand All @@ -7,6 +7,10 @@ retract (

go 1.24

retract (
v3
)

require (
github.com/onsi/ginkgo v1.13.0
github.com/onsi/gomega v1.10.1
Expand Down
Loading
Loading