Skip to content

Commit 484e037

Browse files
authored
feat(mc2mc): capability to run query with actual ordering (#78)
feat: capability to run query with proper ordering
1 parent 5c8313e commit 484e037

File tree

4 files changed

+242
-50
lines changed

4 files changed

+242
-50
lines changed

mc2mc/internal/query/builder.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,21 @@ func (b *Builder) Build() (string, error) {
6060
return "", errors.New("query is required")
6161
}
6262

63-
// separate headers, variables and udfs from the query
64-
hr, query := SeparateHeadersAndQuery(b.query)
65-
varsAndUDFs, query := SeparateVariablesUDFsAndQuery(query)
66-
drops, query := SeparateDropsAndQuery(query)
67-
6863
if b.method == MERGE {
69-
queries := semicolonPattern.Split(query, -1)
64+
// split query components
65+
hrs, vars, queries := SplitQueryComponents(b.query)
7066
if len(queries) <= 1 {
7167
return b.query, nil
7268
}
73-
query = b.constructMergeQuery(hr, drops, varsAndUDFs, queries)
69+
query := b.constructMergeQuery(hrs, vars, queries)
7470
return query, nil
7571
}
7672

73+
// separate headers, variables and udfs from the query
74+
hr, query := SeparateHeadersAndQuery(b.query)
75+
varsAndUDFs, query := SeparateVariablesUDFsAndQuery(query)
76+
drops, query := SeparateDropsAndQuery(query)
77+
7778
// destination table is required for append and replace method
7879
if b.destinationTableID == "" {
7980
return "", errors.New("destination table is required")
@@ -187,21 +188,20 @@ func (b *Builder) constructOverridedValues(query string) (string, error) {
187188
}
188189

189190
// constructMergeQuery constructs the merge query, prefixing each statement with the headers and variables declared before it
190-
func (b *Builder) constructMergeQuery(hr, drops, varsAndUDFs string, queries []string) string {
191+
func (b *Builder) constructMergeQuery(hrs, vars, queries []string) string {
191192
builder := strings.Builder{}
192-
if drops != "" {
193-
builder.WriteString(fmt.Sprintf("%s\n", hr))
194-
builder.WriteString(fmt.Sprintf("%s\n", drops))
195-
builder.WriteString(fmt.Sprintf("%s\n", BREAK_MARKER))
196-
}
197193
for i, q := range queries {
198194
q = strings.TrimSpace(q)
199195
if q == "" || strings.TrimSpace(RemoveComments(q)) == "" {
200196
continue
201197
}
202-
builder.WriteString(fmt.Sprintf("%s\n", hr))
203-
if varsAndUDFs != "" {
204-
builder.WriteString(fmt.Sprintf("%s\n", varsAndUDFs))
198+
headers := JoinSliceString(hrs[:i+1], "\n")
199+
variables := JoinSliceString(vars[:i+1], "\n")
200+
if headers != "" {
201+
builder.WriteString(fmt.Sprintf("%s\n", headers))
202+
}
203+
if variables != "" {
204+
builder.WriteString(fmt.Sprintf("%s\n", variables))
205205
}
206206
builder.WriteString(fmt.Sprintf("%s\n;", q))
207207
if i < len(queries)-1 {

mc2mc/internal/query/builder_test.go

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -540,16 +540,12 @@ SET append_test.id = 2;`
540540
assert.NoError(t, err)
541541
assert.Equal(t, `SET odps.table.append2.enable=true
542542
;
543-
@src := SELECT 1 id
544-
;
545543
CREATE TABLE IF NOT EXISTS append_test (id bigint)
546544
TBLPROPERTIES('table.format.version'='2')
547545
;
548546
--*--optimus-break-marker--*--
549547
SET odps.table.append2.enable=true
550548
;
551-
@src := SELECT 1 id
552-
;
553549
INSERT OVERWRITE TABLE append_test VALUES(0),(1)
554550
;
555551
--*--optimus-break-marker--*--
@@ -601,19 +597,6 @@ SET append_test.id = 2;`
601597
assert.NoError(t, err)
602598
assert.Equal(t, `SET odps.table.append2.enable=true
603599
;
604-
FUNCTION castStringToBoolean (@field STRING) AS CASE
605-
WHEN TOLOWER(@field) = '1.0' THEN true
606-
WHEN TOLOWER(@field) = '0.0' THEN false
607-
WHEN TOLOWER(@field) = '1' THEN true
608-
WHEN TOLOWER(@field) = '0' THEN false
609-
WHEN TOLOWER(@field) = 'true' THEN true
610-
WHEN TOLOWER(@field) = 'false' THEN false
611-
END
612-
;
613-
function my_add(@a BIGINT) as @a + 1
614-
;
615-
@src := SELECT my_add(1) id
616-
;
617600
CREATE TABLE IF NOT EXISTS append_test (id bigint)
618601
TBLPROPERTIES('table.format.version'='2')
619602
;
@@ -631,8 +614,6 @@ END
631614
;
632615
function my_add(@a BIGINT) as @a + 1
633616
;
634-
@src := SELECT my_add(1) id
635-
;
636617
INSERT OVERWRITE TABLE append_test VALUES(0),(1)
637618
;
638619
--*--optimus-break-marker--*--
@@ -697,19 +678,6 @@ SET append_test.id = 2;`
697678
assert.NoError(t, err)
698679
assert.Equal(t, `SET odps.table.append2.enable=true
699680
;
700-
FUNCTION castStringToBoolean (@field STRING) AS CASE
701-
WHEN TOLOWER(@field) = '1.0' THEN true
702-
WHEN TOLOWER(@field) = '0.0' THEN false
703-
WHEN TOLOWER(@field) = '1' THEN true
704-
WHEN TOLOWER(@field) = '0' THEN false
705-
WHEN TOLOWER(@field) = 'true' THEN true
706-
WHEN TOLOWER(@field) = 'false' THEN false
707-
END
708-
;
709-
function my_add(@a BIGINT) as @a + 1
710-
;
711-
@src := SELECT my_add(1) id
712-
;
713681
CREATE TABLE IF NOT EXISTS append_test (id bigint)
714682
TBLPROPERTIES('table.format.version'='2')
715683
;
@@ -727,8 +695,6 @@ END
727695
;
728696
function my_add(@a BIGINT) as @a + 1
729697
;
730-
@src := SELECT my_add(1) id
731-
;
732698
INSERT OVERWRITE TABLE append_test VALUES(0),(1)
733699
;
734700
--*--optimus-break-marker--*--
@@ -752,6 +718,54 @@ USING (SELECT castStringToBoolean(id) FROM @src) source
752718
on append_test.id = source.id
753719
WHEN MATCHED THEN UPDATE
754720
SET append_test.id = 2
721+
;`, query)
722+
})
723+
724+
t.Run("returns query for merge load method with proper variable ordering", func(t *testing.T) {
725+
queryToExecute := `SET odps.table.append2.enable=true;
726+
DROP TABLE IF EXISTS append_tmp;
727+
@src := SELECT 1 id;
728+
729+
CREATE TABLE append_tmp AS SELECT * FROM @src;
730+
731+
@src2 := SELECT id FROM append_tmp;
732+
733+
MERGE INTO append_test
734+
USING (SELECT * FROM @src2) source
735+
on append_test.id = source.id
736+
WHEN MATCHED THEN UPDATE
737+
SET append_test.id = 2;`
738+
odspClient := &mockOdpsClient{}
739+
query, err := query.NewBuilder(
740+
logger.NewDefaultLogger(),
741+
odspClient,
742+
query.WithQuery(queryToExecute),
743+
query.WithMethod(query.MERGE),
744+
).Build()
745+
assert.NoError(t, err)
746+
assert.Equal(t, `SET odps.table.append2.enable=true
747+
;
748+
DROP TABLE IF EXISTS append_tmp
749+
;
750+
--*--optimus-break-marker--*--
751+
SET odps.table.append2.enable=true
752+
;
753+
@src := SELECT 1 id
754+
;
755+
CREATE TABLE append_tmp AS SELECT * FROM @src
756+
;
757+
--*--optimus-break-marker--*--
758+
SET odps.table.append2.enable=true
759+
;
760+
@src := SELECT 1 id
761+
;
762+
@src2 := SELECT id FROM append_tmp
763+
;
764+
MERGE INTO append_test
765+
USING (SELECT * FROM @src2) source
766+
on append_test.id = source.id
767+
WHEN MATCHED THEN UPDATE
768+
SET append_test.id = 2
755769
;`, query)
756770
})
757771
}

mc2mc/internal/query/helper.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,70 @@ var (
2222
stringPattern = regexp.MustCompile(`'[^']*'`) // regex to match SQL strings (anything inside single quotes)
2323
)
2424

25+
func SplitQueryComponents(query string) (headers []string, varsUDFs []string, queries []string) {
26+
query = strings.TrimSpace(query)
27+
28+
// extract all header, variable and query lines
29+
stmts := semicolonPattern.Split(query, -1)
30+
queryIndex := 0
31+
for _, stmt := range stmts {
32+
stmt = strings.TrimSpace(stmt)
33+
if stmt == "" {
34+
continue
35+
}
36+
stmtWithoutComment := RemoveComments(stmt)
37+
if headerPattern.MatchString(strings.TrimSpace(stmtWithoutComment)) {
38+
for len(headers) <= queryIndex {
39+
headers = append(headers, "")
40+
}
41+
headers[queryIndex] += strings.TrimSpace(stmt) + "\n;\n"
42+
} else if variablePattern.MatchString(strings.TrimSpace(stmtWithoutComment)) ||
43+
udfPattern.MatchString(strings.TrimSpace(stmtWithoutComment)) {
44+
for len(varsUDFs) <= queryIndex {
45+
varsUDFs = append(varsUDFs, "")
46+
}
47+
varsUDFs[queryIndex] += strings.TrimSpace(stmt) + "\n;\n"
48+
} else if strings.TrimSpace(stmtWithoutComment) == "" {
49+
// if the statement is empty, it's a comment, then omit it
50+
// since it doesn't make sense to execute this statement
51+
} else {
52+
queries = append(queries, stmt)
53+
queryIndex++
54+
}
55+
}
56+
57+
// fill in empty headers and varsUDFs + clear whitespace
58+
for i := range queries {
59+
if len(headers) == i {
60+
headers = append(headers, "")
61+
}
62+
if len(varsUDFs) == i {
63+
varsUDFs = append(varsUDFs, "")
64+
}
65+
headers[i] = strings.TrimSpace(headers[i])
66+
varsUDFs[i] = strings.TrimSpace(varsUDFs[i])
67+
queries[i] = strings.TrimSpace(queries[i])
68+
}
69+
70+
return headers, varsUDFs, queries
71+
}
72+
73+
// JoinSliceString joins the non-empty elements of slice with delimiter.
//
// Empty strings are skipped entirely, so they never produce a leading or
// doubled delimiter. The joined result is passed through strings.TrimSpace
// before it is returned.
func JoinSliceString(slice []string, delimiter string) string {
	var builder strings.Builder
	for _, s := range slice {
		if s == "" {
			continue
		}
		// Key the delimiter on what has been written so far rather than on the
		// element index; otherwise skipped leading elements would leave a
		// stray delimiter at the front of the result.
		if builder.Len() > 0 {
			builder.WriteString(delimiter)
		}
		builder.WriteString(s)
	}
	return strings.TrimSpace(builder.String())
}
88+
2589
func SeparateHeadersAndQuery(query string) (string, string) {
2690
headers := []string{}
2791
query = strings.TrimSpace(query)

mc2mc/internal/query/helper_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,120 @@ import (
88
"github.com/goto/transformers/mc2mc/internal/query"
99
)
1010

11+
// TestSplitQueryComponents exercises query.SplitQueryComponents with two inputs:
// a single statement without headers or variables, and a multi-statement script
// that mixes SET headers, a DROP, variable assignments and MERGE statements.
// In both cases it checks that the three returned slices have the same length
// and that every entry matches the expected text.
func TestSplitQueryComponents(t *testing.T) {
12+
t.Run("returns query without headers and variables", func(t *testing.T) {
13+
q1 := `select * from playground`
14+
headers, varsUDFs, queries := query.SplitQueryComponents(q1)
15+
assert.Len(t, headers, 1)
16+
assert.Len(t, varsUDFs, 1)
17+
assert.Len(t, queries, 1)
18+
assert.Empty(t, headers[0])
19+
assert.Empty(t, varsUDFs[0])
20+
assert.Equal(t, q1, queries[0])
21+
})
22+
t.Run("returns headers, vars, and queries with proper order", func(t *testing.T) {
23+
q1 := `set odps.sql.allow.fullscan=true;
24+
set odps.sql.python.version=cp37;
25+
DROP TABLE IF EXISTS append_test_tmp;
26+
27+
@src := SELECT 1 id;
28+
@src2 := SELECT id
29+
FROM @src
30+
WHERE id = 1;
31+
CREATE TABLE append_test_tmp AS SELECT * FROM @src2;
32+
33+
MERGE INTO append_test_tmp USING (SELECT * FROM @src) source
34+
on append_test_tmp.id = source.id
35+
WHEN MATCHED THEN UPDATE
36+
SET append_test_tmp.id = 2;
37+
38+
@src3 := SELECT id FROM append_test_tmp WHERE id = 2;
39+
MERGE INTO append_test USING (SELECT * FROM @src3) source
40+
on append_test.id = source.id
41+
WHEN MATCHED THEN UPDATE
42+
SET append_test.id = 3;
43+
44+
MERGE INTO append_test USING (SELECT * FROM @src3) source
45+
on append_test.id = source.id
46+
WHEN MATCHED THEN UPDATE
47+
SET append_test.id = 3;
48+
`
49+
headers, varsUDFs, queries := query.SplitQueryComponents(q1)
50+
assert.Len(t, headers, 5)
51+
assert.Len(t, varsUDFs, 5)
52+
assert.Len(t, queries, 5)
53+
54+
// expected headers: only the first entry is non-empty because both SET statements precede the first query
55+
headersExpected := make([]string, 5)
56+
headersExpected[0] = `set odps.sql.allow.fullscan=true
57+
;
58+
set odps.sql.python.version=cp37
59+
;`
60+
headersExpected[1] = ""
61+
headersExpected[2] = ""
62+
headersExpected[3] = ""
63+
headersExpected[4] = ""
64+
65+
// expected variables: each entry holds the assignments that appear before the query at the same index
66+
varsExpected := make([]string, 5)
67+
varsExpected[0] = ""
68+
varsExpected[1] = `@src := SELECT 1 id
69+
;
70+
@src2 := SELECT id
71+
FROM @src
72+
WHERE id = 1
73+
;`
74+
varsExpected[2] = ""
75+
varsExpected[3] = `@src3 := SELECT id FROM append_test_tmp WHERE id = 2
76+
;`
77+
varsExpected[4] = ""
78+
79+
// expected queries, in script order and without the terminating semicolon
80+
queriesExpected := make([]string, 5)
81+
queriesExpected[0] = "DROP TABLE IF EXISTS append_test_tmp"
82+
queriesExpected[1] = "CREATE TABLE append_test_tmp AS SELECT * FROM @src2"
83+
queriesExpected[2] = `MERGE INTO append_test_tmp USING (SELECT * FROM @src) source
84+
on append_test_tmp.id = source.id
85+
WHEN MATCHED THEN UPDATE
86+
SET append_test_tmp.id = 2`
87+
queriesExpected[3] = `MERGE INTO append_test USING (SELECT * FROM @src3) source
88+
on append_test.id = source.id
89+
WHEN MATCHED THEN UPDATE
90+
SET append_test.id = 3`
91+
queriesExpected[4] = `MERGE INTO append_test USING (SELECT * FROM @src3) source
92+
on append_test.id = source.id
93+
WHEN MATCHED THEN UPDATE
94+
SET append_test.id = 3`
95+
96+
// compare all three slices entry by entry, indexed by query position
for i := range queries {
97+
assert.Equal(t, headersExpected[i], headers[i])
98+
assert.Equal(t, varsExpected[i], varsUDFs[i])
99+
assert.Equal(t, queriesExpected[i], queries[i])
100+
}
101+
})
102+
}
103+
104+
func TestJoinSliceString(t *testing.T) {
105+
t.Run("returns empty string for empty slice", func(t *testing.T) {
106+
slice := []string{}
107+
delimiter := ";"
108+
result := query.JoinSliceString(slice, delimiter)
109+
assert.Empty(t, result)
110+
})
111+
t.Run("returns joined string with delimiter", func(t *testing.T) {
112+
slice := []string{"set odps.sql.allow.fullscan=true", "set odps.sql.python.version=cp37"}
113+
delimiter := ";"
114+
result := query.JoinSliceString(slice, delimiter)
115+
assert.Equal(t, "set odps.sql.allow.fullscan=true;set odps.sql.python.version=cp37", result)
116+
})
117+
t.Run("returns joined string with delimiter and skips empty strings", func(t *testing.T) {
118+
slice := []string{"set odps.sql.allow.fullscan=true", "", "set odps.sql.python.version=cp37"}
119+
delimiter := ";"
120+
result := query.JoinSliceString(slice, delimiter)
121+
assert.Equal(t, "set odps.sql.allow.fullscan=true;set odps.sql.python.version=cp37", result)
122+
})
123+
}
124+
11125
func TestSeparateHeadersAndQuery(t *testing.T) {
12126
t.Run("returns query without macros", func(t *testing.T) {
13127
q1 := `select * from playground`

0 commit comments

Comments
 (0)