Skip to content

Commit d22aa3a

Browse files
authored
feat: add query identifier and pass it as hints (#84)
* feat: add query identifier and pass it as hints; fix comment
* fix call terminate function
* log job hints
* fix: provide additionalHints in query execution instead
* move hints to string function
1 parent a088cdf commit d22aa3a

File tree

4 files changed

+48
-41
lines changed

4 files changed

+48
-41
lines changed

mc2mc/internal/client/client.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ import (
99
"github.com/pkg/errors"
1010
)
1111

12+
const (
13+
SqlScriptSequenceHint = "goto.sql.script.sequence"
14+
)
15+
1216
type OdpsClient interface {
13-
ExecSQL(ctx context.Context, query string) error
17+
ExecSQL(ctx context.Context, query string, hints map[string]string) error
1418
SetDefaultProject(project string)
1519
SetLogViewRetentionInDays(days int)
16-
SetAdditionalHints(hints map[string]string)
1720
SetDryRun(dryRun bool)
1821
}
1922

@@ -47,13 +50,21 @@ func (c *Client) Close() error {
4750
return errors.WithStack(err)
4851
}
4952

50-
func (c *Client) Execute(ctx context.Context, query string) error {
51-
// execute query with odps client
52-
c.logger.Info(fmt.Sprintf("query to execute:\n%s", query))
53-
if err := c.OdpsClient.ExecSQL(ctx, query); err != nil {
54-
return errors.WithStack(err)
55-
}
53+
func (c *Client) ExecuteFn(id int) func(context.Context, string, map[string]string) error {
54+
return func(ctx context.Context, query string, additionalHints map[string]string) error {
55+
// execute query with odps client
56+
c.logger.Info(fmt.Sprintf("[sequence: %d] query to execute:\n%s", id, query))
57+
// Merge additionalHints with the id
58+
if additionalHints == nil {
59+
additionalHints = make(map[string]string)
60+
}
61+
additionalHints[SqlScriptSequenceHint] = fmt.Sprintf("%d", id)
5662

57-
c.logger.Info("execution done")
58-
return nil
63+
if err := c.OdpsClient.ExecSQL(ctx, query, additionalHints); err != nil {
64+
return errors.WithStack(err)
65+
}
66+
67+
c.logger.Info(fmt.Sprintf("[sequence: %d] execution done", id))
68+
return nil
69+
}
5970
}

mc2mc/internal/client/odps.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ type odpsClient struct {
1717
client *odps.Odps
1818

1919
logViewRetentionInDays int
20-
additionalHints map[string]string
2120
isDryRun bool
2221
}
2322

@@ -33,12 +32,14 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient {
3332
// ExecSQL executes the given query in synchronous mode (blocking)
3433
// with capability to do graceful shutdown by terminating task instance
3534
// when context is cancelled.
36-
func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
35+
func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints map[string]string) error {
3736
if c.isDryRun {
3837
c.logger.Info("dry run mode, skipping execution")
3938
return nil
4039
}
41-
hints := addHints(c.additionalHints, query)
40+
41+
hints := addHints(additionalHints, query)
42+
4243
taskIns, err := c.client.ExecSQlWithHints(query, hints)
4344
if err != nil {
4445
return errors.WithStack(err)
@@ -50,24 +51,19 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
5051
err = e.Join(err, taskIns.Terminate())
5152
return errors.WithStack(err)
5253
}
53-
c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s", taskIns.Id(), url))
54+
c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s, hints: (%s)", taskIns.Id(), url, getHintsString(hints)))
5455

5556
// wait execution success
5657
select {
5758
case <-ctx.Done():
5859
c.logger.Info("context cancelled, terminating task instance")
59-
err := taskIns.Terminate()
60+
err := c.terminate(taskIns)
6061
return e.Join(ctx.Err(), err)
6162
case err := <-c.wait(taskIns):
6263
return errors.WithStack(err)
6364
}
6465
}
6566

66-
// SetAdditionalHints sets the additional hints for the odps client
67-
func (c *odpsClient) SetAdditionalHints(hints map[string]string) {
68-
c.additionalHints = hints
69-
}
70-
7167
// SetLogViewRetentionInDays sets the log view retention in days
7268
func (c *odpsClient) SetLogViewRetentionInDays(days int) {
7369
c.logViewRetentionInDays = days
@@ -217,3 +213,14 @@ func retry(l *slog.Logger, retryMax int, retryBackoffMs int64, f func() error) e
217213

218214
return err
219215
}
216+
217+
func getHintsString(hints map[string]string) string {
218+
if hints == nil {
219+
return ""
220+
}
221+
var hintsStr []string
222+
for k, v := range hints {
223+
hintsStr = append(hintsStr, fmt.Sprintf("%s: %s", k, v))
224+
}
225+
return strings.Join(hintsStr, ", ")
226+
}

mc2mc/internal/client/setup.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,6 @@ func SetupDryRun(dryRun bool) SetupFn {
2020
}
2121
}
2222

23-
func SetupAdditionalHints(hints map[string]string) SetupFn {
24-
return func(c *Client) error {
25-
if c.OdpsClient == nil {
26-
return errors.New("odps client is required")
27-
}
28-
if hints != nil {
29-
c.OdpsClient.SetAdditionalHints(hints)
30-
}
31-
return nil
32-
}
33-
}
34-
3523
func SetUpLogViewRetentionInDays(days int) SetupFn {
3624
return func(c *Client) error {
3725
if c.OdpsClient == nil {

mc2mc/mc2mc.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ func mc2mc(envs []string) error {
4444
client.SetupODPSClient(cfg.GenOdps()),
4545
client.SetupDefaultProject(cfg.ExecutionProject),
4646
client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays),
47-
client.SetupAdditionalHints(cfg.AdditionalHints),
4847
client.SetupDryRun(cfg.DryRun),
4948
)
5049
if err != nil {
@@ -162,23 +161,24 @@ func mc2mc(envs []string) error {
162161

163162
// only support concurrent execution for REPLACE method
164163
if cfg.LoadMethod == "REPLACE" {
165-
return executeConcurrently(ctx, c, cfg.Concurrency, queriesToExecute)
164+
return executeConcurrently(ctx, c, cfg.Concurrency, queriesToExecute, cfg.AdditionalHints)
166165
}
167166
// otherwise execute sequentially
168-
return execute(ctx, c, queriesToExecute)
167+
return execute(ctx, c, queriesToExecute, cfg.AdditionalHints)
169168
}
170169

171-
func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, queriesToExecute []string) error {
170+
func executeConcurrently(ctx context.Context, c *client.Client, concurrency int, queriesToExecute []string, additionalHints map[string]string) error {
172171
// execute query concurrently
173172
sem := make(chan uint8, concurrency)
174173
wg := sync.WaitGroup{}
175174
wg.Add(len(queriesToExecute))
176175
errChan := make(chan error, len(queriesToExecute))
177176

178-
for _, queryToExecute := range queriesToExecute {
177+
for i, queryToExecute := range queriesToExecute {
179178
sem <- 0
179+
executeFn := c.ExecuteFn(i + 1)
180180
go func(queryToExecute string, errChan chan error) {
181-
err := c.Execute(ctx, queryToExecute)
181+
err := executeFn(ctx, queryToExecute, additionalHints)
182182
if err != nil {
183183
errChan <- errors.WithStack(err)
184184
}
@@ -200,9 +200,10 @@ func executeConcurrently(ctx context.Context, c *client.Client, concurrency int,
200200
return errs
201201
}
202202

203-
func execute(ctx context.Context, c *client.Client, queriesToExecute []string) error {
204-
for _, queryToExecute := range queriesToExecute {
205-
err := c.Execute(ctx, queryToExecute)
203+
func execute(ctx context.Context, c *client.Client, queriesToExecute []string, additionalHints map[string]string) error {
204+
for i, queryToExecute := range queriesToExecute {
205+
executeFn := c.ExecuteFn(i + 1)
206+
err := executeFn(ctx, queryToExecute, additionalHints)
206207
if err != nil {
207208
return errors.WithStack(err)
208209
}

0 commit comments

Comments
 (0)