From c4a507b068e665a73d4dc2a8c5bc4001851c8386 Mon Sep 17 00:00:00 2001 From: Minkin Aleksei Date: Fri, 25 Jul 2025 16:28:04 +0200 Subject: [PATCH 1/4] allow macros in cluster name in Distributed engine --- .../clickhouse/spark/spec/MacrosSpec.scala | 11 +++++ .../spark/spec/TableEngineUtils.scala | 31 ++++++++++++- .../spark/spec/NodeSpecHelper.scala | 4 +- .../spark/spec/TableEngineUtilsSuite.scala | 20 ++++++++ .../conf/clickhouse-cluster/s1r1/macros.xml | 1 + .../conf/clickhouse-cluster/s1r2/macros.xml | 1 + .../conf/clickhouse-cluster/s2r1/macros.xml | 1 + .../conf/clickhouse-cluster/s2r2/macros.xml | 1 + .../cluster/ClickHouseClusterReadSuite.scala | 39 ++++++++++++++++ .../cluster/SparkClickHouseClusterTest.scala | 46 +++++++++++++++++++ .../clickhouse/spark/ClickHouseCatalog.scala | 4 +- .../clickhouse/spark/ClickHouseHelper.scala | 17 +++++++ 12 files changed, 172 insertions(+), 4 deletions(-) create mode 100644 clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala create mode 100644 clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala new file mode 100644 index 00000000..03c5b208 --- /dev/null +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala @@ -0,0 +1,11 @@ +package com.clickhouse.spark.spec + +import com.clickhouse.spark.ToJson + +case class MacrosSpec( + name: String, + substitution: String +) extends ToJson with Serializable { + + override def toString: String = s"macro: $name, substitution: $substitution" +} diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala index d609b08b..e1c02c98 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala +++ 
b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala @@ -29,7 +29,34 @@ object TableEngineUtils extends Logging { } } - def resolveTableCluster(distributedEngineSpec: DistributedEngineSpec, clusterSpecs: Seq[ClusterSpec]): ClusterSpec = - clusterSpecs.find(_.name == distributedEngineSpec.cluster) + def resolveTableCluster( + distributedEngineSpec: DistributedEngineSpec, + clusterSpecs: Seq[ClusterSpec], + macrosSpecs: Seq[MacrosSpec] + ): ClusterSpec = { + val clusterName = if (distributedEngineSpec.cluster.contains("{")) { + val macrosMap = macrosSpecs.map(spec => (spec.name, spec.substitution)).toMap + + var clusterName = distributedEngineSpec.cluster + var startPos = clusterName.indexOf('{') + while (startPos >= 0) { + val endPos = clusterName.indexOf('}', startPos) + if (endPos > startPos) { + val macroName = clusterName.substring(startPos + 1, endPos) + val substitution = macrosMap.getOrElse(macroName, throw CHClientException(s"Unknown macro: ${macroName}")) + clusterName = clusterName + .substring(0, startPos) + .concat(substitution) + .concat(clusterName.substring(endPos + 1)) + } + startPos = clusterName.indexOf('{') + } + clusterName + } else { + distributedEngineSpec.cluster + } + + clusterSpecs.find(_.name == clusterName) .getOrElse(throw CHClientException(s"Unknown cluster: ${distributedEngineSpec.cluster}")) + } } diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala index 302cbcea..011a986f 100644 --- a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala @@ -16,6 +16,8 @@ package com.clickhouse.spark.spec trait NodeSpecHelper { + val cluster_name = "cluster-s2r2" + val node_s1r1: NodeSpec = NodeSpec("s1r1") val node_s1r2: NodeSpec = NodeSpec("s1r2") val node_s2r1: NodeSpec = NodeSpec("s2r1") @@ -40,7 
+42,7 @@ trait NodeSpecHelper { val cluster: ClusterSpec = ClusterSpec( - name = "cluster-s2r2", + name = cluster_name, shards = Array(shard_s2, shard_s1) // unsorted ) } diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala new file mode 100644 index 00000000..85214128 --- /dev/null +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala @@ -0,0 +1,20 @@ +package com.clickhouse.spark.spec + +import org.scalatest.funsuite.AnyFunSuite + +class TableEngineUtilsSuite extends AnyFunSuite with NodeSpecHelper { + + test("test resolve table cluster by macro") { + val distributeSpec = DistributedEngineSpec( + engine_clause = "Distributed('{cluster}', 'wj_report', 'wj_respondent_local')", + cluster = "{cluster}", + local_db = "wj_report", + local_table = "wj_respondent_local", + sharding_key = None + ) + val clusterName = TableEngineUtils + .resolveTableCluster(distributeSpec, Seq(cluster), Seq(MacrosSpec("cluster", cluster_name))) + + assert(clusterName.name === cluster_name) + } +} diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml index 1219e147..57fe725d 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml @@ -16,5 +16,6 @@ 1 1 + single_replica \ No newline at end of file diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml index b6a029a0..99fba39d 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml @@ -16,5 +16,6 @@ 1 2 + single_replica \ No newline at end of file diff 
--git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml index 819602e4..53cb017d 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml @@ -16,5 +16,6 @@ 2 1 + single_replica \ No newline at end of file diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml index ff46c3c9..27df101c 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml @@ -16,5 +16,6 @@ 2 2 + single_replica \ No newline at end of file diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala index 44fe1ff2..834b6bba 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala @@ -86,6 +86,45 @@ class ClickHouseClusterReadSuite extends SparkClickHouseClusterTest { } } + test("push down aggregation - distributed table with cluster macros") { + withSimpleDistTableUsingMacro("{cluster}", "single_replica", "db_agg_col", "t_dist", true) { (_, db, tbl_dist, _) => + checkAnswer( + spark.sql(s"SELECT COUNT(id) FROM $db.$tbl_dist"), + Seq(Row(4)) + ) + + checkAnswer( + spark.sql(s"SELECT MIN(id) FROM $db.$tbl_dist"), + Seq(Row(1)) + ) + + checkAnswer( + spark.sql(s"SELECT MAX(id) FROM $db.$tbl_dist"), + Seq(Row(4)) + ) + + checkAnswer( + spark.sql(s"SELECT m, COUNT(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"), 
+ Seq( + Row(1, 1), + Row(2, 1), + Row(3, 1), + Row(4, 1) + ) + ) + + checkAnswer( + spark.sql(s"SELECT m, SUM(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"), + Seq( + Row(1, 1), + Row(2, 2), + Row(3, 3), + Row(4, 4) + ) + ) + } + } + test("runtime filter - distributed table") { withSimpleDistTable("single_replica", "runtime_db", "runtime_tbl", true) { (_, db, tbl_dist, _) => spark.sql("set spark.clickhouse.read.runtimeFilter.enabled=false") diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala index c8de2044..fba80e06 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala @@ -99,6 +99,52 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn { } } + def withSimpleDistTableUsingMacro( + cluster_macro: String, + actual_cluster: String, + db: String, + tbl_dist: String, + writeData: Boolean = false + )(f: (String, String, String, String) => Unit): Unit = + autoCleanupDistTable(actual_cluster, db, tbl_dist) { (cluster, db, tbl_dist, tbl_local) => + runClickHouseSQL( + s"""CREATE TABLE IF NOT EXISTS $db.$tbl_local ON CLUSTER '$cluster_macro' ( + | create_time DateTime64(3), + | y int COMMENT 'shard key', + | m int COMMENT 'part key', + | id Int64 COMMENT 'sort key', + | value Nullable(String) + |) engine MergeTree() + | PARTITION BY (m) + | ORDER BY (id) + | SETTINGS index_granularity = 8192; + |""".stripMargin + ) + runClickHouseSQL( + s"""CREATE TABLE IF NOT EXISTS $db.$tbl_dist ON CLUSTER '$cluster_macro' ( + | create_time DateTime64(3), + | y int COMMENT 'shard key', + | m int COMMENT 'part key', + | id Int64 COMMENT 'sort key', + | value 
Nullable(String) + |) engine Distributed('$cluster_macro', '$db', '$tbl_local', y) + |""".stripMargin + ) + if (writeData) { + + runClickHouseSQL( + s"""INSERT INTO $db.$tbl_dist (create_time, y, m, id, value) + | VALUES + |('2021-01-01 10:10:10', 2021, 1, 1, '1'), + |('2022-02-02 10:10:10', 2022, 2, 2, '2'), + |('2023-03-03 10:10:10', 2023, 3, 3, '3'), + |('2024-04-04 10:10:10', 2024, 4, 4, '4'); + |""".stripMargin + ) + } + f(cluster, db, tbl_dist, tbl_local) + } + def withSimpleDistTable( cluster: String, db: String, diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index d8437b59..3ef01f7f 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -67,6 +67,7 @@ class ClickHouseCatalog extends TableCatalog // /////////////////// CLUSTERS ////////////////////// // /////////////////////////////////////////////////// private var clusterSpecs: Seq[ClusterSpec] = Nil + private var macroSpecs: Seq[MacrosSpec] = Nil private var functionRegistry: FunctionRegistry = _ @@ -90,6 +91,7 @@ class ClickHouseCatalog extends TableCatalog } this.clusterSpecs = queryClusterSpecs(nodeSpec) + this.macroSpecs = queryMacrosSpec() val dynamicFunctionRegistry = new DynamicFunctionRegistry val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs) @@ -142,7 +144,7 @@ class ClickHouseCatalog extends TableCatalog val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec) val tableClusterSpec = tableEngineSpec match { case distributeSpec: DistributedEngineSpec => - Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs)) + Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs)) case _ => None } ClickHouseTable( diff --git 
a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala index f859b117..69956612 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala @@ -80,6 +80,23 @@ trait ClickHouseHelper extends Logging { ) } + def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = { + val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( + """ SELECT + | `macro`, -- String + | `substitution` -- String + | FROM `system`.`macros` + |""".stripMargin + ) + + macrosOutput.records + .map { row => + val name = row.get("macro").asText + val substitution = row.get("substitution").asText + MacrosSpec(name, substitution) + } + } + def queryClusterSpecs(nodeSpec: NodeSpec)(implicit nodeClient: NodeClient): Seq[ClusterSpec] = { val clustersOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( """ SELECT From 9a7532e29c3500188ed73f2c30912600d379f577 Mon Sep 17 00:00:00 2001 From: Minkin Aleksei Date: Thu, 31 Jul 2025 15:22:04 +0200 Subject: [PATCH 2/4] Update clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala index e1c02c98..6ae35993 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala @@ -57,6 +57,6 @@ object TableEngineUtils extends Logging { } clusterSpecs.find(_.name == clusterName) - .getOrElse(throw 
CHClientException(s"Unknown cluster: ${distributedEngineSpec.cluster}")) + .getOrElse(throw CHClientException(s"Unknown cluster: resolved name '${clusterName}' (original: '${distributedEngineSpec.cluster}')")) } } From 794703edd1b571a5850d60dcd573027a3347ef6d Mon Sep 17 00:00:00 2001 From: Minkin Aleksei Date: Thu, 31 Jul 2025 17:01:05 +0200 Subject: [PATCH 3/4] Update clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../clickhouse/spark/spec/TableEngineUtils.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala index 6ae35993..e8f7caf2 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala @@ -37,21 +37,21 @@ object TableEngineUtils extends Logging { val clusterName = if (distributedEngineSpec.cluster.contains("{")) { val macrosMap = macrosSpecs.map(spec => (spec.name, spec.substitution)).toMap - var clusterName = distributedEngineSpec.cluster - var startPos = clusterName.indexOf('{') + var resolvedClusterName = distributedEngineSpec.cluster + var startPos = resolvedClusterName.indexOf('{') while (startPos >= 0) { - val endPos = clusterName.indexOf('}', startPos) + val endPos = resolvedClusterName.indexOf('}', startPos) if (endPos > startPos) { - val macroName = clusterName.substring(startPos + 1, endPos) + val macroName = resolvedClusterName.substring(startPos + 1, endPos) val substitution = macrosMap.getOrElse(macroName, throw CHClientException(s"Unknown macro: ${macroName}")) - clusterName = clusterName + resolvedClusterName = resolvedClusterName .substring(0, startPos) .concat(substitution) - .concat(clusterName.substring(endPos + 1)) + 
.concat(resolvedClusterName.substring(endPos + 1)) } - startPos = clusterName.indexOf('{') + startPos = resolvedClusterName.indexOf('{') } - clusterName + resolvedClusterName } else { distributedEngineSpec.cluster } From b1f34f54afbc6d977e53b7e99bdc9536496e0994 Mon Sep 17 00:00:00 2001 From: Minkin Aleksei Date: Thu, 21 Aug 2025 17:26:56 +0200 Subject: [PATCH 4/4] implement macros resolution for 3.3 and 3.4 spark --- .../clickhouse/spark/ClickHouseCatalog.scala | 4 +++- .../com/clickhouse/spark/ClickHouseHelper.scala | 17 +++++++++++++++++ .../clickhouse/spark/ClickHouseCatalog.scala | 4 +++- .../com/clickhouse/spark/ClickHouseHelper.scala | 17 +++++++++++++++++ 4 files changed, 40 insertions(+), 2 deletions(-) diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index dea17a61..2d8380c1 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -59,6 +59,7 @@ class ClickHouseCatalog extends TableCatalog // /////////////////// CLUSTERS ////////////////////// // /////////////////////////////////////////////////// private var clusterSpecs: Seq[ClusterSpec] = Nil + private var macroSpecs: Seq[MacrosSpec] = Nil private var functionRegistry: FunctionRegistry = _ @@ -82,6 +83,7 @@ class ClickHouseCatalog extends TableCatalog } this.clusterSpecs = queryClusterSpecs(nodeSpec) + this.macroSpecs = queryMacrosSpec() val dynamicFunctionRegistry = new DynamicFunctionRegistry val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs) @@ -133,7 +135,7 @@ class ClickHouseCatalog extends TableCatalog val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec) val tableClusterSpec = tableEngineSpec match { case distributeSpec: DistributedEngineSpec => -
Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs)) + Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs)) case _ => None } ClickHouseTable( diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala index 6495d62c..d73572fa 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala @@ -111,6 +111,23 @@ trait ClickHouseHelper extends Logging { }.toSeq } + def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = { + val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( + """ SELECT + | `macro`, -- String + | `substitution` -- String + | FROM `system`.`macros` + |""".stripMargin + ) + + macrosOutput.records + .map { row => + val name = row.get("macro").asText + val substitution = row.get("substitution").asText + MacrosSpec(name, substitution) + } + } + def queryDatabaseSpec( database: String, actionIfNoSuchDatabase: String => Unit = DEFAULT_ACTION_IF_NO_SUCH_DATABASE diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index 51c2ecb8..7a81d84e 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -59,6 +59,7 @@ class ClickHouseCatalog extends TableCatalog // /////////////////// CLUSTERS ////////////////////// // /////////////////////////////////////////////////// private var clusterSpecs: Seq[ClusterSpec] = Nil + private var macroSpecs: Seq[MacrosSpec] = Nil private var functionRegistry: FunctionRegistry = _ @@ -82,6 +83,7 @@ class ClickHouseCatalog extends 
TableCatalog } this.clusterSpecs = queryClusterSpecs(nodeSpec) + this.macroSpecs = queryMacrosSpec() val dynamicFunctionRegistry = new DynamicFunctionRegistry val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs) @@ -134,7 +136,7 @@ class ClickHouseCatalog extends TableCatalog val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec) val tableClusterSpec = tableEngineSpec match { case distributeSpec: DistributedEngineSpec => - Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs)) + Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs)) case _ => None } ClickHouseTable( diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala index 68a322fa..e0bf8a97 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala @@ -112,6 +112,23 @@ trait ClickHouseHelper extends Logging { }.toSeq } + def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = { + val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( + """ SELECT + | `macro`, -- String + | `substitution` -- String + | FROM `system`.`macros` + |""".stripMargin + ) + + macrosOutput.records + .map { row => + val name = row.get("macro").asText + val substitution = row.get("substitution").asText + MacrosSpec(name, substitution) + } + } + def queryDatabaseSpec( database: String, actionIfNoSuchDatabase: String => Unit = DEFAULT_ACTION_IF_NO_SUCH_DATABASE