Skip to content

Commit 8561f9d

Browse files
committed
feat(mc2mc): add additional hints on query execution
1 parent 484e037 commit 8561f9d

File tree

5 files changed

+42
-19
lines changed

5 files changed

+42
-19
lines changed

mc2mc/internal/client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type OdpsClient interface {
1313
ExecSQL(ctx context.Context, query string) error
1414
SetDefaultProject(project string)
1515
SetLogViewRetentionInDays(days int)
16+
SetAdditionalHints(hints map[string]string)
1617
}
1718

1819
type Client struct {

mc2mc/internal/client/odps.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type odpsClient struct {
1616
client *odps.Odps
1717

1818
logViewRetentionInDays int
19+
additionalHints map[string]string
1920
}
2021

2122
// NewODPSClient creates a new odpsClient instance
@@ -31,7 +32,7 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient {
3132
// with capability to do graceful shutdown by terminating task instance
3233
// when context is cancelled.
3334
func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
34-
hints := addHints(query)
35+
hints := addHints(c.additionalHints, query)
3536
taskIns, err := c.client.ExecSQlWithHints(query, hints)
3637
if err != nil {
3738
return errors.WithStack(err)
@@ -57,6 +58,11 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string) error {
5758
}
5859
}
5960

61+
// SetAdditionalHints sets the additional hints for the odps client
62+
func (c *odpsClient) SetAdditionalHints(hints map[string]string) {
63+
c.additionalHints = hints
64+
}
65+
6066
// SetLogViewRetentionInDays sets the log view retention in days
6167
func (c *odpsClient) SetLogViewRetentionInDays(days int) {
6268
c.logViewRetentionInDays = days
@@ -108,15 +114,17 @@ func wait(taskIns *odps.Instance) <-chan error {
108114
return errChan
109115
}
110116

111-
func addHints(query string) map[string]string {
117+
func addHints(additionalHints map[string]string, query string) map[string]string {
118+
hints := make(map[string]string)
119+
for k, v := range additionalHints {
120+
hints[k] = v
121+
}
112122
multisql := strings.Contains(query, ";")
113123
if multisql {
114-
return map[string]string{
115-
"odps.sql.submit.mode": "script",
116-
}
124+
hints["odps.sql.submit.mode"] = "script"
117125
}
118126

119-
return nil
127+
return hints
120128
}
121129

122130
// getTable returns the table with the given tableID

mc2mc/internal/client/setup.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ import (
1010

1111
type SetupFn func(c *Client) error
1212

13+
func SetupAdditionalHints(hints map[string]string) SetupFn {
14+
return func(c *Client) error {
15+
if c.OdpsClient == nil {
16+
return errors.New("odps client is required")
17+
}
18+
if hints != nil {
19+
c.OdpsClient.SetAdditionalHints(hints)
20+
}
21+
return nil
22+
}
23+
}
24+
1325
func SetUpLogViewRetentionInDays(days int) SetupFn {
1426
return func(c *Client) error {
1527
if c.OdpsClient == nil {

mc2mc/internal/config/config.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,20 @@ import (
99

1010
// ConfigEnv is a mc configuration for the component.
1111
type ConfigEnv struct {
12-
LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"`
13-
OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"`
14-
OtelAttributes string `env:"OTEL_ATTRIBUTES"`
15-
MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"`
16-
LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"`
17-
QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
18-
DestinationTableID string `env:"DESTINATION_TABLE_ID"`
19-
DStart string `env:"DSTART"`
20-
DEnd string `env:"DEND"`
21-
ExecutionProject string `env:"EXECUTION_PROJECT"`
22-
Concurrency int `env:"CONCURRENCY" envDefault:"7"`
23-
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
24-
DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"`
12+
LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"`
13+
OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"`
14+
OtelAttributes string `env:"OTEL_ATTRIBUTES"`
15+
MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"`
16+
LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"`
17+
QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
18+
DestinationTableID string `env:"DESTINATION_TABLE_ID"`
19+
DStart string `env:"DSTART"`
20+
DEnd string `env:"DEND"`
21+
ExecutionProject string `env:"EXECUTION_PROJECT"`
22+
Concurrency int `env:"CONCURRENCY" envDefault:"7"`
23+
AdditionalHints map[string]string `env:"ADDITIONAL_HINTS" envKeyValSeparator:"=" envSeparator:","`
24+
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
25+
DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"`
2526
// TODO: delete this
2627
DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"`
2728
DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"`

mc2mc/mc2mc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func mc2mc(envs []string) error {
4444
client.SetupODPSClient(cfg.GenOdps()),
4545
client.SetupDefaultProject(cfg.ExecutionProject),
4646
client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays),
47+
client.SetupAdditionalHints(cfg.AdditionalHints),
4748
)
4849
if err != nil {
4950
return errors.WithStack(err)

0 commit comments

Comments
 (0)