diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala
new file mode 100644
index 00000000..03c5b208
--- /dev/null
+++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala
@@ -0,0 +1,15 @@
+package com.clickhouse.spark.spec
+
+import com.clickhouse.spark.ToJson
+
+/**
+ * A single entry of the ClickHouse `system.macros` table: a macro name together
+ * with the text it expands to, e.g. name "cluster" -> substitution "cluster-s2r2".
+ */
+case class MacrosSpec(
+  name: String,
+  substitution: String
+) extends ToJson with Serializable {
+
+  override def toString: String = s"macro: $name, substitution: $substitution"
+}
diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala
index d609b08b..e8f7caf2 100644
--- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala
+++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala
@@ -29,7 +29,39 @@ object TableEngineUtils extends Logging {
     }
   }
 
-  def resolveTableCluster(distributedEngineSpec: DistributedEngineSpec, clusterSpecs: Seq[ClusterSpec]): ClusterSpec =
-    clusterSpecs.find(_.name == distributedEngineSpec.cluster)
-      .getOrElse(throw CHClientException(s"Unknown cluster: ${distributedEngineSpec.cluster}"))
+  def resolveTableCluster(
+    distributedEngineSpec: DistributedEngineSpec,
+    clusterSpecs: Seq[ClusterSpec],
+    macrosSpecs: Seq[MacrosSpec]
+  ): ClusterSpec = {
+    // Expand every "{macro}" occurrence in the Distributed engine's cluster
+    // name using the macros reported by the server (system.macros).
+    val macrosMap = macrosSpecs.map(spec => spec.name -> spec.substitution).toMap
+
+    var resolvedClusterName = distributedEngineSpec.cluster
+    var startPos = resolvedClusterName.indexOf('{')
+    while (startPos >= 0) {
+      val endPos = resolvedClusterName.indexOf('}', startPos)
+      if (endPos < 0) {
+        // An unmatched '{' can never be resolved; fail fast instead of
+        // spinning forever (the previous loop never advanced in this case).
+        throw CHClientException(
+          s"Invalid macro syntax in cluster name: ${distributedEngineSpec.cluster}"
+        )
+      }
+      val macroName = resolvedClusterName.substring(startPos + 1, endPos)
+      val substitution =
+        macrosMap.getOrElse(macroName, throw CHClientException(s"Unknown macro: ${macroName}"))
+      resolvedClusterName = resolvedClusterName.substring(0, startPos) +
+        substitution + resolvedClusterName.substring(endPos + 1)
+      // Resume scanning after the substituted text so a substitution that
+      // itself contains '{' cannot retrigger endless re-processing.
+      startPos = resolvedClusterName.indexOf('{', startPos + substitution.length)
+    }
+
+    clusterSpecs.find(_.name == resolvedClusterName)
+      .getOrElse(throw CHClientException(
+        s"Unknown cluster: resolved name '${resolvedClusterName}' (original: '${distributedEngineSpec.cluster}')"
+      ))
+  }
}
diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala
index 302cbcea..011a986f 100644
--- a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala
+++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala
@@ -16,6 +16,9 @@ package com.clickhouse.spark.spec
 trait NodeSpecHelper {
 
+  // Name of the test cluster; also the expected result of resolving "{cluster}".
+  val cluster_name = "cluster-s2r2"
+
val node_s1r1: NodeSpec = NodeSpec("s1r1")
val node_s1r2: NodeSpec = NodeSpec("s1r2")
val node_s2r1: NodeSpec = NodeSpec("s2r1")
@@ -40,7 +42,7 @@ trait NodeSpecHelper {
val cluster: ClusterSpec =
ClusterSpec(
- name = "cluster-s2r2",
+ name = cluster_name,
shards = Array(shard_s2, shard_s1) // unsorted
)
}
diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala
new file mode 100644
index 00000000..85214128
--- /dev/null
+++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala
@@ -0,0 +1,20 @@
+package com.clickhouse.spark.spec
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class TableEngineUtilsSuite extends AnyFunSuite with NodeSpecHelper {
+
+  test("test resolve table cluster by macro") {
+    val distributeSpec = DistributedEngineSpec(
+      engine_clause = "Distributed('{cluster}', 'wj_report', 'wj_respondent_local')",
+      cluster = "{cluster}",
+      local_db = "wj_report",
+      local_table = "wj_respondent_local",
+      sharding_key = None
+    )
+    val macros = Seq(MacrosSpec("cluster", cluster_name))
+    val resolvedCluster = TableEngineUtils.resolveTableCluster(distributeSpec, Seq(cluster), macros)
+
+    assert(resolvedCluster.name === cluster_name)
+  }
+}
diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml
index 1219e147..57fe725d 100644
--- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml
+++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml
@@ -16,5 +16,6 @@
1
1
+ single_replica
\ No newline at end of file
diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml
index b6a029a0..99fba39d 100644
--- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml
+++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml
@@ -16,5 +16,6 @@
1
2
+ single_replica
\ No newline at end of file
diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml
index 819602e4..53cb017d 100644
--- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml
+++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml
@@ -16,5 +16,6 @@
2
1
+ single_replica
\ No newline at end of file
diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml
index ff46c3c9..27df101c 100644
--- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml
+++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml
@@ -16,5 +16,6 @@
2
2
+ single_replica
\ No newline at end of file
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
index dea17a61..2d8380c1 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
@@ -59,6 +59,7 @@ class ClickHouseCatalog extends TableCatalog
// /////////////////// CLUSTERS //////////////////////
// ///////////////////////////////////////////////////
private var clusterSpecs: Seq[ClusterSpec] = Nil
+ private var macroSpecs: Seq[MacrosSpec] = Nil
private var functionRegistry: FunctionRegistry = _
@@ -82,6 +83,7 @@ class ClickHouseCatalog extends TableCatalog
}
this.clusterSpecs = queryClusterSpecs(nodeSpec)
+ this.macroSpecs = queryMacrosSpec()
val dynamicFunctionRegistry = new DynamicFunctionRegistry
val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs)
@@ -133,7 +135,7 @@ class ClickHouseCatalog extends TableCatalog
val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec)
val tableClusterSpec = tableEngineSpec match {
case distributeSpec: DistributedEngineSpec =>
- Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs))
+ Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs))
case _ => None
}
ClickHouseTable(
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
index 6495d62c..d73572fa 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
@@ -111,6 +111,27 @@ trait ClickHouseHelper extends Logging {
     }.toSeq
   }
 
+  /**
+   * Reads the macro definitions (name -> substitution) exposed by the
+   * connected node via `system.macros`, used to resolve names like "{cluster}".
+   */
+  def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = {
+    val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
+      """ SELECT
+        |   `macro`,        -- String
+        |   `substitution`  -- String
+        | FROM `system`.`macros`
+        |""".stripMargin
+    )
+
+    macrosOutput.records
+      .map { row =>
+        val name = row.get("macro").asText
+        val substitution = row.get("substitution").asText
+        MacrosSpec(name, substitution)
+      }
+  }
+
def queryDatabaseSpec(
database: String,
actionIfNoSuchDatabase: String => Unit = DEFAULT_ACTION_IF_NO_SUCH_DATABASE
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
index 51c2ecb8..7a81d84e 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
@@ -59,6 +59,7 @@ class ClickHouseCatalog extends TableCatalog
// /////////////////// CLUSTERS //////////////////////
// ///////////////////////////////////////////////////
private var clusterSpecs: Seq[ClusterSpec] = Nil
+ private var macroSpecs: Seq[MacrosSpec] = Nil
private var functionRegistry: FunctionRegistry = _
@@ -82,6 +83,7 @@ class ClickHouseCatalog extends TableCatalog
}
this.clusterSpecs = queryClusterSpecs(nodeSpec)
+ this.macroSpecs = queryMacrosSpec()
val dynamicFunctionRegistry = new DynamicFunctionRegistry
val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs)
@@ -134,7 +136,7 @@ class ClickHouseCatalog extends TableCatalog
val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec)
val tableClusterSpec = tableEngineSpec match {
case distributeSpec: DistributedEngineSpec =>
- Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs))
+ Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs))
case _ => None
}
ClickHouseTable(
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
index 68a322fa..e0bf8a97 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
@@ -112,6 +112,27 @@ trait ClickHouseHelper extends Logging {
     }.toSeq
   }
 
+  /**
+   * Reads the macro definitions (name -> substitution) exposed by the
+   * connected node via `system.macros`, used to resolve names like "{cluster}".
+   */
+  def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = {
+    val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
+      """ SELECT
+        |   `macro`,        -- String
+        |   `substitution`  -- String
+        | FROM `system`.`macros`
+        |""".stripMargin
+    )
+
+    macrosOutput.records
+      .map { row =>
+        val name = row.get("macro").asText
+        val substitution = row.get("substitution").asText
+        MacrosSpec(name, substitution)
+      }
+  }
+
def queryDatabaseSpec(
database: String,
actionIfNoSuchDatabase: String => Unit = DEFAULT_ACTION_IF_NO_SUCH_DATABASE
diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala
index 44fe1ff2..834b6bba 100644
--- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala
+++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala
@@ -86,6 +86,26 @@ class ClickHouseClusterReadSuite extends SparkClickHouseClusterTest {
     }
   }
 
+  test("push down aggregation - distributed table with cluster macros") {
+    withSimpleDistTableUsingMacro("{cluster}", "single_replica", "db_agg_col", "t_dist", true) { (_, db, tbl_dist, _) =>
+      checkAnswer(spark.sql(s"SELECT COUNT(id) FROM $db.$tbl_dist"), Seq(Row(4)))
+      checkAnswer(spark.sql(s"SELECT MIN(id) FROM $db.$tbl_dist"), Seq(Row(1)))
+      checkAnswer(spark.sql(s"SELECT MAX(id) FROM $db.$tbl_dist"), Seq(Row(4)))
+
+      val distinctCountPerGroup = Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(4, 1))
+      checkAnswer(
+        spark.sql(s"SELECT m, COUNT(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"),
+        distinctCountPerGroup
+      )
+
+      val distinctSumPerGroup = Seq(Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4))
+      checkAnswer(
+        spark.sql(s"SELECT m, SUM(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"),
+        distinctSumPerGroup
+      )
+    }
+  }
+
test("runtime filter - distributed table") {
withSimpleDistTable("single_replica", "runtime_db", "runtime_tbl", true) { (_, db, tbl_dist, _) =>
spark.sql("set spark.clickhouse.read.runtimeFilter.enabled=false")
diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala
index c8de2044..fba80e06 100644
--- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala
+++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala
@@ -99,6 +99,56 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
     }
   }
 
+  /**
+   * Variant of `withSimpleDistTable` that issues the CREATE TABLE DDL using a
+   * cluster macro (e.g. "{cluster}") while cleanup targets the actual resolved
+   * cluster name, so end-to-end macro resolution can be exercised.
+   */
+  def withSimpleDistTableUsingMacro(
+    cluster_macro: String,
+    actual_cluster: String,
+    db: String,
+    tbl_dist: String,
+    writeData: Boolean = false
+  )(f: (String, String, String, String) => Unit): Unit =
+    autoCleanupDistTable(actual_cluster, db, tbl_dist) { (cluster, db, tbl_dist, tbl_local) =>
+      runClickHouseSQL(
+        s"""CREATE TABLE IF NOT EXISTS $db.$tbl_local ON CLUSTER '$cluster_macro' (
+           | create_time DateTime64(3),
+           | y int COMMENT 'shard key',
+           | m int COMMENT 'part key',
+           | id Int64 COMMENT 'sort key',
+           | value Nullable(String)
+           |) engine MergeTree()
+           | PARTITION BY (m)
+           | ORDER BY (id)
+           | SETTINGS index_granularity = 8192;
+           |""".stripMargin
+      )
+      runClickHouseSQL(
+        s"""CREATE TABLE IF NOT EXISTS $db.$tbl_dist ON CLUSTER '$cluster_macro' (
+           | create_time DateTime64(3),
+           | y int COMMENT 'shard key',
+           | m int COMMENT 'part key',
+           | id Int64 COMMENT 'sort key',
+           | value Nullable(String)
+           |) engine Distributed('$cluster_macro', '$db', '$tbl_local', y)
+           |""".stripMargin
+      )
+      if (writeData) {
+        runClickHouseSQL(
+          s"""INSERT INTO $db.$tbl_dist (create_time, y, m, id, value)
+             | VALUES
+             |('2021-01-01 10:10:10', 2021, 1, 1, '1'),
+             |('2022-02-02 10:10:10', 2022, 2, 2, '2'),
+             |('2023-03-03 10:10:10', 2023, 3, 3, '3'),
+             |('2024-04-04 10:10:10', 2024, 4, 4, '4');
+             |""".stripMargin
+        )
+      }
+      f(cluster, db, tbl_dist, tbl_local)
+    }
+
def withSimpleDistTable(
cluster: String,
db: String,
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
index d8437b59..3ef01f7f 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala
@@ -67,6 +67,7 @@ class ClickHouseCatalog extends TableCatalog
// /////////////////// CLUSTERS //////////////////////
// ///////////////////////////////////////////////////
private var clusterSpecs: Seq[ClusterSpec] = Nil
+ private var macroSpecs: Seq[MacrosSpec] = Nil
private var functionRegistry: FunctionRegistry = _
@@ -90,6 +91,7 @@ class ClickHouseCatalog extends TableCatalog
}
this.clusterSpecs = queryClusterSpecs(nodeSpec)
+ this.macroSpecs = queryMacrosSpec()
val dynamicFunctionRegistry = new DynamicFunctionRegistry
val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs)
@@ -142,7 +144,7 @@ class ClickHouseCatalog extends TableCatalog
val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec)
val tableClusterSpec = tableEngineSpec match {
case distributeSpec: DistributedEngineSpec =>
- Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs))
+ Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs))
case _ => None
}
ClickHouseTable(
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
index f859b117..69956612 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala
@@ -80,6 +80,27 @@ trait ClickHouseHelper extends Logging {
     )
   }
 
+  /**
+   * Reads the macro definitions (name -> substitution) exposed by the
+   * connected node via `system.macros`, used to resolve names like "{cluster}".
+   */
+  def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = {
+    val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
+      """ SELECT
+        |   `macro`,        -- String
+        |   `substitution`  -- String
+        | FROM `system`.`macros`
+        |""".stripMargin
+    )
+
+    macrosOutput.records
+      .map { row =>
+        val name = row.get("macro").asText
+        val substitution = row.get("substitution").asText
+        MacrosSpec(name, substitution)
+      }
+  }
+
def queryClusterSpecs(nodeSpec: NodeSpec)(implicit nodeClient: NodeClient): Seq[ClusterSpec] = {
val clustersOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
""" SELECT