Skip to content

Commit 9726eb1

Browse files
committed
feat: retry upon checking for success
1 parent 0fac65a commit 9726eb1

File tree

1 file changed

+28
-5
lines changed

1 file changed

+28
-5
lines changed

mc2mc/internal/client/odps.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
5454
c.logger.Info("context cancelled, terminating task instance")
5555
err := taskIns.Terminate()
5656
return e.Join(ctx.Err(), err)
57-
case err := <-wait(ctx, c.logger, taskIns):
57+
case err := <-c.wait(ctx, taskIns):
5858
return errors.WithStack(err)
5959
}
6060
}
@@ -105,34 +105,39 @@ func (c *odpsClient) GetOrderedColumns(tableID string) ([]string, error) {
105105
}
106106

107107
// wait waits for the task instance to finish on a separate goroutine
108-
func wait(ctx context.Context, l *slog.Logger, taskIns *odps.Instance) <-chan error {
108+
func (c *odpsClient) wait(ctx context.Context, taskIns *odps.Instance) <-chan error {
109109
errChan := make(chan error)
110110
done := make(chan uint8)
111111
// progress log
112112
go func() {
113113
for {
114114
select {
115115
case <-done:
116-
l.Info(fmt.Sprintf("execution finished with status: %s", taskIns.Status()))
116+
c.logger.Info(fmt.Sprintf("execution finished with status: %s", taskIns.Status()))
117117
return
118118
case <-ctx.Done():
119119
return
120120
default:
121-
l.Info("execution in progress...")
121+
c.logger.Info("execution in progress...")
122122
time.Sleep(time.Second * 60)
123123
}
124124
}
125125
}()
126126
// wait for task instance to finish
127127
go func(errChan chan<- error) {
128128
defer close(errChan)
129-
err := taskIns.WaitForSuccess()
129+
err := c.retry(taskIns.WaitForSuccess)
130130
errChan <- errors.WithStack(err)
131131
done <- 0
132132
}(errChan)
133133
return errChan
134134
}
135135

136+
// retry retries the given function with exponential backoff
137+
func (c *odpsClient) retry(f func() error) error {
138+
return retry(c.logger, 3, 1000, f)
139+
}
140+
136141
func addHints(additionalHints map[string]string, query string) map[string]string {
137142
hints := make(map[string]string)
138143
for k, v := range additionalHints {
@@ -174,3 +179,21 @@ func getTable(client *odps.Odps, tableID string) (*odps.Table, error) {
174179
}
175180
return table, nil
176181
}
182+
183+
func retry(l *slog.Logger, retryMax int, retryBackoffMs int64, f func() error) error {
184+
var err error
185+
sleepTime := int64(1)
186+
187+
for i := range retryMax {
188+
err = f()
189+
if err == nil {
190+
return nil
191+
}
192+
193+
l.Warn(fmt.Sprintf("retry: %d, error: %v", i, err))
194+
sleepTime *= 1 << i
195+
time.Sleep(time.Duration(sleepTime*retryBackoffMs) * time.Millisecond)
196+
}
197+
198+
return err
199+
}

0 commit comments

Comments
 (0)