Skip to content

Commit c93af7f

Browse files
authored
Consolidate service worker channels and goroutines (#417)
Instead of having every service worker report a failure error and the done status of each goroutine within the service, use a single channel to have all service report on their status and completion. Fixes #415
1 parent 9950e7d commit c93af7f

File tree

18 files changed

+381
-309
lines changed

18 files changed

+381
-309
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"skip32bit": true
3+
}

analytics/analytics.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"gorm.io/gorm/clause"
2020
)
2121

22+
const flushInterval = time.Hour
23+
2224
var Enabled = true
2325

2426
var logger = log.Logger("analytics")
@@ -184,11 +186,14 @@ func (c *Collector) Flush() error {
184186
}
185187

186188
func (c *Collector) Start(ctx context.Context) {
189+
timer := time.NewTimer(flushInterval)
190+
defer timer.Stop()
187191
for {
188192
select {
189193
case <-ctx.Done():
190194
return
191-
case <-time.After(time.Hour):
195+
case <-timer.C:
196+
timer.Reset(flushInterval)
192197
}
193198
//nolint:contextcheck
194199
c.Flush()

api/api.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,10 @@ var logger = logging.Logger("api")
402402
// 3. Completion of analytics event flushing.
403403
// - A channel (service.Fail) that reports errors that occur while the server is running.
404404
// - An error if there is an issue during the initialization phase, otherwise nil.
405-
func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
405+
func (s Server) Start(ctx context.Context, exitErr chan<- error) error {
406406
err := analytics.Init(ctx, s.db)
407407
if err != nil {
408-
return nil, nil, errors.WithStack(err)
408+
return errors.WithStack(err)
409409
}
410410
e := echo.New()
411411
e.Debug = true
@@ -453,16 +453,18 @@ func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error)
453453
e.Listener = s.listener
454454

455455
done := make(chan struct{})
456-
fail := make(chan error)
456+
eventsFlushed := make(chan struct{})
457+
457458
go func() {
458459
err := e.Start("")
459-
if err != nil {
460-
select {
461-
case <-ctx.Done():
462-
case fail <- err:
463-
}
460+
<-eventsFlushed
461+
<-done
462+
463+
if exitErr != nil {
464+
exitErr <- err
464465
}
465466
}()
467+
466468
go func() {
467469
defer close(done)
468470
<-ctx.Done()
@@ -477,21 +479,18 @@ func (s Server) Start(ctx context.Context) ([]service.Done, service.Fail, error)
477479
if err != nil {
478480
logger.Errorw("failed to close database connection", "err", err)
479481
}
480-
}()
481-
hostDone := make(chan struct{})
482-
go func() {
483-
defer close(hostDone)
484-
<-ctx.Done()
482+
485483
s.host.Close()
486484
}()
487-
eventsFlushed := make(chan struct{})
485+
488486
go func() {
489487
defer close(eventsFlushed)
490488
analytics.Default.Start(ctx)
491489
//nolint:contextcheck
492490
analytics.Default.Flush()
493491
}()
494-
return []service.Done{done, hostDone, eventsFlushed}, fail, nil
492+
493+
return nil
495494
}
496495

497496
func isIntKind(kind reflect.Kind) bool {

cmd/testutil.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ func CalculateCommp(t *testing.T, content []byte, targetPieceSize uint64) string
171171
func WaitForServerReady(ctx context.Context, url string) error {
172172
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
173173
defer cancel()
174+
var timer *time.Timer
174175
for {
175176
if ctx.Err() != nil {
176177
return ctx.Err()
@@ -180,10 +181,16 @@ func WaitForServerReady(ctx context.Context, url string) error {
180181
if err == nil && resp != nil && resp.StatusCode == http.StatusOK {
181182
return nil
182183
}
184+
if timer == nil {
185+
timer = time.NewTimer(100 * time.Millisecond)
186+
defer timer.Stop()
187+
} else {
188+
timer.Reset(100 * time.Millisecond)
189+
}
183190
select {
184191
case <-ctx.Done():
185192
return ctx.Err()
186-
case <-time.After(100 * time.Millisecond):
193+
case <-timer.C:
187194
}
188195
}
189196
}

service/contentprovider/bitswap.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55

66
"github.com/cockroachdb/errors"
7-
"github.com/data-preservation-programs/singularity/service"
87
"github.com/data-preservation-programs/singularity/store"
98
"github.com/data-preservation-programs/singularity/util"
109
nilrouting "github.com/ipfs/go-ipfs-routing/none"
@@ -51,25 +50,25 @@ func (BitswapServer) Name() string {
5150
// It sets up the necessary routing and networking components,
5251
// and starts serving Bitswap requests.
5352
// It returns channels that signal when the service has stopped or encountered an error.
54-
func (s BitswapServer) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
53+
func (s BitswapServer) Start(ctx context.Context, exitErr chan<- error) error {
5554
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
5655
if err != nil {
57-
return nil, nil, errors.WithStack(err)
56+
return errors.WithStack(err)
5857
}
5958

6059
net := bsnetwork.NewFromIpfsHost(s.host, nilRouter)
6160
bs := &store.FileReferenceBlockStore{DBNoContext: s.dbNoContext}
6261
bsserver := server.New(ctx, net, bs)
6362
net.Start(bsserver)
64-
done := make(chan struct{})
65-
fail := make(chan error)
6663

6764
go func() {
68-
defer close(done)
6965
<-ctx.Done()
7066
net.Stop()
7167
bsserver.Close()
7268
s.host.Close()
69+
if exitErr != nil {
70+
exitErr <- nil
71+
}
7372
}()
74-
return []service.Done{done}, fail, nil
73+
return nil
7574
}

service/contentprovider/bitswap_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@ func TestBitswapServer(t *testing.T) {
2121
host: h,
2222
}
2323
require.Equal(t, "Bitswap", s.Name())
24+
25+
exitErr := make(chan error, 1)
2426
ctx, cancel := context.WithCancel(ctx)
25-
done, _, err := s.Start(ctx)
27+
err = s.Start(ctx, exitErr)
2628
require.NoError(t, err)
2729
time.Sleep(200 * time.Millisecond)
2830
cancel()
2931
select {
3032
case <-time.After(1 * time.Second):
3133
t.Fatal("bitswap server did not stop")
32-
case <-done[0]:
34+
case err = <-exitErr:
35+
require.NoError(t, err)
3336
}
3437
})
3538
}

service/contentprovider/http.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/cockroachdb/errors"
1212
"github.com/cockroachdb/errors/oserror"
1313
"github.com/data-preservation-programs/singularity/model"
14-
"github.com/data-preservation-programs/singularity/service"
1514
"github.com/data-preservation-programs/singularity/storagesystem"
1615
"github.com/data-preservation-programs/singularity/store"
1716
"github.com/data-preservation-programs/singularity/util"
@@ -49,7 +48,7 @@ func (*HTTPServer) Name() string {
4948
// - A Done channel slice that are closed when the server has stopped.
5049
// - A Fail channel that receives an error if the server fails to start or stop.
5150
// - An error if the server fails to start.
52-
func (s *HTTPServer) Start(ctx context.Context) ([]service.Done, service.Fail, error) {
51+
func (s *HTTPServer) Start(ctx context.Context, exitErr chan<- error) error {
5352
e := echo.New()
5453
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{}))
5554
e.Use(
@@ -98,27 +97,35 @@ func (s *HTTPServer) Start(ctx context.Context) ([]service.Done, service.Fail, e
9897
e.GET("/health", func(c echo.Context) error {
9998
return c.String(http.StatusOK, "ok")
10099
})
101-
done := make(chan struct{})
102-
fail := make(chan error)
100+
101+
forceShutdown := make(chan struct{})
102+
shutdownErr := make(chan error, 1)
103+
103104
go func() {
104105
err := e.Start(s.bind)
105-
if err != nil {
106-
select {
107-
case <-ctx.Done():
108-
case fail <- err:
106+
if errors.Is(err, http.ErrServerClosed) {
107+
err = nil
108+
}
109+
close(forceShutdown)
110+
closeErr := <-shutdownErr
111+
if exitErr != nil {
112+
if err == nil {
113+
err = closeErr
109114
}
115+
exitErr <- err
110116
}
111117
}()
118+
112119
go func() {
113-
defer close(done)
114-
<-ctx.Done()
115-
//nolint:contextcheck
116-
err := e.Shutdown(context.Background())
117-
if err != nil {
118-
fail <- err
120+
select {
121+
case <-ctx.Done():
122+
case <-forceShutdown:
119123
}
124+
//nolint:contextcheck
125+
shutdownErr <- e.Shutdown(context.Background())
120126
}()
121-
return []service.Done{done}, fail, nil
127+
128+
return nil
122129
}
123130

124131
func getPieceMetadata(ctx context.Context, db *gorm.DB, car model.Car) (*PieceMetadata, error) {

service/contentprovider/http_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ func TestHTTPServerStart(t *testing.T) {
2727
bind: "127.0.0.1:65432",
2828
}
2929
require.Equal(t, "HTTPServer", s.Name())
30+
exitErr := make(chan error, 1)
3031
ctx, cancel := context.WithCancel(ctx)
31-
done, _, err := s.Start(ctx)
32+
err := s.Start(ctx, exitErr)
3233
require.NoError(t, err)
3334
time.Sleep(200 * time.Millisecond)
3435
gorequest.New().Get("http://127.0.0.1:65432/health").End()
@@ -37,7 +38,8 @@ func TestHTTPServerStart(t *testing.T) {
3738
select {
3839
case <-time.After(1 * time.Second):
3940
t.Fatal("http server did not stop")
40-
case <-done[0]:
41+
case err = <-exitErr:
42+
require.NoError(t, err)
4143
}
4244
})
4345
}

0 commit comments

Comments
 (0)