Skip to content

Commit bbce743

Browse files
committed
feat: improve multi query logs
1 parent a6c4cc5 commit bbce743

File tree

1 file changed

+31
-9
lines changed

1 file changed

+31
-9
lines changed

mc2mc/mc2mc.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
e "errors"
66
"fmt"
7+
"log/slog"
78
"os"
89
"os/signal"
910
"strings"
@@ -162,35 +163,53 @@ func mc2mc(envs []string) error {
162163

163164
// only support concurrent execution for REPLACE method
164165
if cfg.LoadMethod == "REPLACE" {
165-
return executeConcurrently(ctx, c, cfg.Concurrency, queriesToExecute, cfg.AdditionalHints)
166+
return executeConcurrently(ctx, l, c, cfg.Concurrency, queriesToExecute, cfg.AdditionalHints)
166167
}
167168
// otherwise execute sequentially
168-
return execute(ctx, c, queriesToExecute, cfg.AdditionalHints)
169+
return execute(ctx, l, c, queriesToExecute, cfg.AdditionalHints)
169170
}
170171

171-
func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, queriesToExecute []string, additionalHints map[string]string) error {
172+
func executeConcurrently(ctx context.Context, l *slog.Logger, c *client.Client, concurrency int, queriesToExecute []string, additionalHints map[string]string) error {
172173
// execute query concurrently
173174
sem := make(chan uint8, concurrency)
174175
wg := sync.WaitGroup{}
175176
wg.Add(len(queriesToExecute))
176177
errChan := make(chan error, len(queriesToExecute))
178+
ids := sync.Map{} // id to boolean map to track running ids
177179

178180
for i, queryToExecute := range queriesToExecute {
179181
sem <- 0
180-
executeFn := c.ExecuteFn(i + 1)
181-
go func(queryToExecute string, errChan chan error) {
182+
id := i + 1
183+
ids.Store(id, false)
184+
executeFn := c.ExecuteFn(id)
185+
go func(id int, queryToExecute string, errChan chan error) {
186+
defer func() {
187+
wg.Done()
188+
<-sem
189+
ids.Delete(id)
190+
// logs the remaining running ids
191+
remainingIds := []int{}
192+
ids.Range(func(key, value any) bool {
193+
remainingIds = append(remainingIds, key.(int))
194+
return true
195+
})
196+
if len(remainingIds) > 0 {
197+
l.Info(fmt.Sprintf("remaining running ids: %v", remainingIds))
198+
l.Info(fmt.Sprintf("waiting for %d other queries to finish...", len(remainingIds)))
199+
}
200+
}()
182201
err := executeFn(ctx, queryToExecute, additionalHints)
183202
if err != nil {
184203
errChan <- errors.WithStack(err)
185204
}
186-
wg.Done()
187-
<-sem
188-
}(queryToExecute, errChan)
205+
}(id, queryToExecute, errChan)
189206
}
190207

191208
wg.Wait()
192209
close(errChan)
193210

211+
l.Info("all queries have been processed")
212+
194213
// check error
195214
var errs error
196215
for err := range errChan {
@@ -201,14 +220,17 @@ func executeConcurrently(ctx context.Context, c *client.Client, concurrency int,
201220
return errs
202221
}
203222

204-
func execute(ctx context.Context, c *client.Client, queriesToExecute []string, additionalHints map[string]string) error {
223+
func execute(ctx context.Context, l *slog.Logger, c *client.Client, queriesToExecute []string, additionalHints map[string]string) error {
205224
for i, queryToExecute := range queriesToExecute {
225+
l.Info(fmt.Sprintf("processing query %d of %d", i+1, len(queriesToExecute)))
206226
executeFn := c.ExecuteFn(i + 1)
207227
err := executeFn(ctx, queryToExecute, additionalHints)
208228
if err != nil {
209229
return errors.WithStack(err)
210230
}
211231
}
232+
233+
l.Info("all queries have been processed")
212234
return nil
213235
}
214236

0 commit comments

Comments
 (0)