Skip to content

Commit ce215d8

Browse files
authored
feat: add support for auto partition table (#47)
feat: add support for auto partition using flag toggle
1 parent 9d84622 commit ce215d8

File tree

5 files changed

+64
-3
lines changed

5 files changed

+64
-3
lines changed

mc2mc/internal/client/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type Client struct {
3333

3434
// TODO: remove this temporary capability after 15 nov 2024
3535
enablePartitionValue bool
36+
enableAutoPartition bool
3637
}
3738

3839
func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) {
@@ -64,7 +65,7 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
6465
if err != nil {
6566
return errors.WithStack(err)
6667
}
67-
if c.enablePartitionValue {
68+
if c.enablePartitionValue && !c.enableAutoPartition {
6869
queryRaw = addPartitionValueColumn(queryRaw)
6970
}
7071

@@ -76,7 +77,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
7677

7778
// prepare query
7879
queryToExec := c.Loader.GetQuery(tableID, string(queryRaw))
79-
if len(partitionNames) > 0 {
80+
if len(partitionNames) > 0 && !c.enableAutoPartition {
81+
// when the table is partitioned and auto partition is disabled, we need to specify the partition columns explicitly
8082
c.logger.Info(fmt.Sprintf("table %s is partitioned by %s", tableID, strings.Join(partitionNames, ", ")))
8183
queryToExec = c.Loader.GetPartitionedQuery(tableID, string(queryRaw), partitionNames)
8284
}

mc2mc/internal/client/client_test.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestExecute(t *testing.T) {
6060
})
6161
t.Run("should return nil when everything is successful", func(t *testing.T) {
6262
// arrange
63-
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("APPEND"))
63+
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("REPLACE"))
6464
require.NoError(t, err)
6565
client.OdpsClient = &mockOdpsClient{
6666
partitionResult: func() ([]string, error) {
@@ -70,6 +70,42 @@ func TestExecute(t *testing.T) {
7070
return nil
7171
},
7272
}
73+
client.Loader = &mockLoader{
74+
getQueryFunc: func(tableID, query string) string {
75+
return "INSERT OVERWRITE TABLE project_test.table_test SELECT * FROM table;"
76+
},
77+
getPartitionedQueryFunc: func(tableID, query string, partitionNames []string) string {
78+
assert.True(t, true, "should be called")
79+
return "INSERT OVERWRITE TABLE project_test.table_test PARTITION(event_date) SELECT * FROM table;"
80+
},
81+
}
82+
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
83+
// act
84+
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
85+
// assert
86+
assert.NoError(t, err)
87+
})
88+
t.Run("should return nil when everything is successful with enable auto partition", func(t *testing.T) {
89+
// arrange
90+
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("REPLACE"), client.EnableAutoPartition(true))
91+
require.NoError(t, err)
92+
client.OdpsClient = &mockOdpsClient{
93+
partitionResult: func() ([]string, error) {
94+
return []string{"_partition_value"}, nil
95+
},
96+
execSQLResult: func() error {
97+
return nil
98+
},
99+
}
100+
client.Loader = &mockLoader{
101+
getQueryFunc: func(tableID, query string) string {
102+
return "INSERT OVERWRITE TABLE project_test.table_test SELECT * FROM table;"
103+
},
104+
getPartitionedQueryFunc: func(tableID, query string, partitionNames []string) string {
105+
assert.False(t, true, "should not be called")
106+
return "INSERT OVERWRITE TABLE project_test.table_test PARTITION(event_date) SELECT * FROM table;"
107+
},
108+
}
73109
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
74110
// act
75111
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
@@ -90,3 +126,16 @@ func (m *mockOdpsClient) GetPartitionNames(ctx context.Context, tableID string)
90126
func (m *mockOdpsClient) ExecSQL(ctx context.Context, query string) error {
91127
return m.execSQLResult()
92128
}
129+
130+
type mockLoader struct {
131+
getQueryFunc func(tableID, query string) string
132+
getPartitionedQueryFunc func(tableID, query string, partitionNames []string) string
133+
}
134+
135+
func (m *mockLoader) GetQuery(tableID, query string) string {
136+
return m.getQueryFunc(tableID, query)
137+
}
138+
139+
func (m *mockLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string {
140+
return m.getPartitionedQueryFunc(tableID, query, partitionNames)
141+
}

mc2mc/internal/client/setup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,10 @@ func EnablePartitionValue(enabled bool) SetupFn {
5959
return nil
6060
}
6161
}
62+
63+
func EnableAutoPartition(enabled bool) SetupFn {
64+
return func(c *Client) error {
65+
c.enableAutoPartition = enabled
66+
return nil
67+
}
68+
}

mc2mc/internal/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Config struct {
1818
ScheduledTime string
1919
// TODO: remove this temporary support after 15 nov 2024
2020
DevEnablePartitionValue bool
21+
DevEnableAutoPartition bool
2122
}
2223

2324
type maxComputeCredentials struct {
@@ -41,6 +42,7 @@ func NewConfig() (*Config, error) {
4142
ScheduledTime: getEnv("SCHEDULED_TIME", ""),
4243
// TODO: delete this after 15 nov 2024
4344
DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true",
45+
DevEnableAutoPartition: getEnv("DEV__ENABLE_AUTO_PARTITION", "false") == "true",
4446
}
4547
// ali-odps-go-sdk related config
4648
scvAcc := getEnv("MC_SERVICE_ACCOUNT", "")

mc2mc/mc2mc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func mc2mc() error {
3131
client.SetupODPSClient(cfg.GenOdps()),
3232
client.SetupLoader(cfg.LoadMethod),
3333
client.EnablePartitionValue(cfg.DevEnablePartitionValue),
34+
client.EnableAutoPartition(cfg.DevEnableAutoPartition),
3435
)
3536
if err != nil {
3637
return errors.WithStack(err)

0 commit comments

Comments
 (0)