Skip to content

Commit 3c07b96

Browse files
committed
feat: add temporary support for adding _partitionvalue field during insertion
1 parent ed0cddf commit 3c07b96

File tree

4 files changed

+23
-0
lines changed

4 files changed

+23
-0
lines changed

mc2mc/internal/client/client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ type Client struct {
2828
appCtx context.Context
2929
logger *slog.Logger
3030
shutdownFns []func() error
31+
32+
// TODO: remove this temporary capability after 15 nov
33+
enablePartitionValue bool
3134
}
3235

3336
func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) {
@@ -59,6 +62,9 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
5962
if err != nil {
6063
return errors.WithStack(err)
6164
}
65+
if c.enablePartitionValue {
66+
queryRaw = addPartitionValueColumn(queryRaw)
67+
}
6268

6369
// check if table is partitioned
6470
partitionNames, err := c.OdpsClient.GetPartitionNames(ctx, tableID)
@@ -82,3 +88,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
8288
c.logger.Info("execution done")
8389
return errors.WithStack(err)
8490
}
91+
92+
// TODO: remove this temporary support after 15 nov
93+
func addPartitionValueColumn(rawQuery []byte) []byte {
94+
return []byte(fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s);", rawQuery))
95+
}

mc2mc/internal/client/setup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,10 @@ func SetupLoader(loadMethod string) SetupFn {
5151
return nil
5252
}
5353
}
54+
55+
func EnablePartitionValue(enabled bool) SetupFn {
56+
return func(c *Client) error {
57+
c.enablePartitionValue = enabled
58+
return nil
59+
}
60+
}

mc2mc/internal/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type Config struct {
1616
OtelCollectorGRPCEndpoint string
1717
JobName string
1818
ScheduledTime string
19+
// TODO: remove this temporary support after 15 nov 2024
20+
DevEnablePartitionValue bool
1921
}
2022

2123
type maxComputeCredentials struct {
@@ -37,6 +39,8 @@ func NewConfig() (*Config, error) {
3739
OtelCollectorGRPCEndpoint: getEnv("OTEL_COLLECTOR_GRPC_ENDPOINT", ""),
3840
JobName: getJobName(),
3941
ScheduledTime: getEnv("SCHEDULED_TIME", ""),
42+
// TODO: delete this after 15 nov
43+
DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true",
4044
}
4145
// ali-odps-go-sdk related config
4246
scvAcc := getEnv("SERVICE_ACCOUNT", "")

mc2mc/mc2mc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func mc2mc() error {
2929
client.SetupOTelSDK(cfg.OtelCollectorGRPCEndpoint, cfg.JobName, cfg.ScheduledTime),
3030
client.SetupODPSClient(cfg.GenOdps()),
3131
client.SetupLoader(cfg.LoadMethod),
32+
client.EnablePartitionValue(cfg.DevEnablePartitionValue),
3233
)
3334
if err != nil {
3435
return errors.WithStack(err)

0 commit comments

Comments
 (0)