Skip to content

Commit 699c0ca

Browse files
authored
Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presence caused error (#357)
* Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presence caused error * Preserved is_deleted column in engine spec, added the same fix for ReplicatedReplacingMergeTree
1 parent ea29fb7 commit 699c0ca

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging {
102102
case eg: String if "ReplacingMergeTree" equalsIgnoreCase eg =>
103103
ReplacingMergeTreeEngineSpec(
104104
engine_clause = engineExpr,
105-
version_column = seqToOption(engineArgs).map(_.asInstanceOf[FieldRef]),
105+
version_column = engineArgs.lift(0).map(_.asInstanceOf[FieldRef]),
106+
is_deleted_column = engineArgs.lift(1).map(_.asInstanceOf[FieldRef]),
106107
_sorting_key = tupleIfNeeded(orderByOpt.toList),
107108
_primary_key = tupleIfNeeded(pkOpt.toList),
108109
_partition_key = tupleIfNeeded(partOpt.toList),
@@ -127,7 +128,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging {
127128
engine_clause = engineExpr,
128129
zk_path = engineArgs.head.asInstanceOf[StringLiteral].value,
129130
replica_name = engineArgs(1).asInstanceOf[StringLiteral].value,
130-
version_column = seqToOption(engineArgs.drop(2)).map(_.asInstanceOf[FieldRef]),
131+
version_column = engineArgs.lift(2).map(_.asInstanceOf[FieldRef]),
132+
is_deleted_column = engineArgs.lift(3).map(_.asInstanceOf[FieldRef]),
131133
_sorting_key = tupleIfNeeded(orderByOpt.toList),
132134
_primary_key = tupleIfNeeded(pkOpt.toList),
133135
_partition_key = tupleIfNeeded(partOpt.toList),

clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ case class ReplicatedMergeTreeEngineSpec(
8888
case class ReplacingMergeTreeEngineSpec(
8989
engine_clause: String,
9090
version_column: Option[FieldRef] = None,
91+
is_deleted_column: Option[FieldRef] = None,
9192
var _sorting_key: TupleExpr = TupleExpr(List.empty),
9293
var _primary_key: TupleExpr = TupleExpr(List.empty),
9394
var _partition_key: TupleExpr = TupleExpr(List.empty),
@@ -109,6 +110,7 @@ case class ReplicatedReplacingMergeTreeEngineSpec(
109110
zk_path: String,
110111
replica_name: String,
111112
version_column: Option[FieldRef] = None,
113+
is_deleted_column: Option[FieldRef] = None,
112114
var _sorting_key: TupleExpr = TupleExpr(List.empty),
113115
var _primary_key: TupleExpr = TupleExpr(List.empty),
114116
var _partition_key: TupleExpr = TupleExpr(List.empty),

clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,21 @@ class SQLParserSuite extends AnyFunSuite {
8383
assert(actual === expected)
8484
}
8585

86+
test("parse ReplacingMergeTree - 3") {
87+
val ddl = "ReplacingMergeTree(ts, is_deleted) " +
88+
"PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192"
89+
val actual = parser.parseEngineClause(ddl)
90+
val expected = ReplacingMergeTreeEngineSpec(
91+
engine_clause = "ReplacingMergeTree(ts, is_deleted)",
92+
version_column = Some(FieldRef("ts")),
93+
is_deleted_column = Some(FieldRef("is_deleted")),
94+
_sorting_key = TupleExpr(FieldRef("id") :: Nil),
95+
_partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))),
96+
_settings = Map("index_granularity" -> "8192")
97+
)
98+
assert(actual === expected)
99+
}
100+
86101
test("parse ReplicatedReplacingMergeTree - 1") {
87102
val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}') " +
88103
"PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192"
@@ -115,6 +130,25 @@ class SQLParserSuite extends AnyFunSuite {
115130
assert(actual === expected)
116131
}
117132

133+
test("parse ReplicatedReplacingMergeTree - 3") {
134+
val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " +
135+
"ts, is_deleted) PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192"
136+
val actual = parser.parseEngineClause(ddl)
137+
val expected = ReplicatedReplacingMergeTreeEngineSpec(
138+
engine_clause =
139+
"ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " +
140+
"ts, is_deleted)",
141+
zk_path = "/clickhouse/tables/{shard}/wj_report/wj_respondent",
142+
replica_name = "{replica}",
143+
version_column = Some(FieldRef("ts")),
144+
is_deleted_column = Some(FieldRef("is_deleted")),
145+
_sorting_key = TupleExpr(FieldRef("id") :: Nil),
146+
_partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))),
147+
_settings = Map("index_granularity" -> "8192")
148+
)
149+
assert(actual === expected)
150+
}
151+
118152
test("parse Distributed - 1") {
119153
val ddl = "Distributed('default', 'wj_report', 'wj_respondent_local')"
120154
val actual = parser.parseEngineClause(ddl)

0 commit comments

Comments
 (0)