diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala new file mode 100644 index 00000000..03c5b208 --- /dev/null +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/MacrosSpec.scala @@ -0,0 +1,11 @@ +package com.clickhouse.spark.spec + +import com.clickhouse.spark.ToJson + +case class MacrosSpec( + name: String, + substitution: String +) extends ToJson with Serializable { + + override def toString: String = s"macro: $name, substitution: $substitution" +} diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala index d609b08b..e8f7caf2 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineUtils.scala @@ -29,7 +29,34 @@ object TableEngineUtils extends Logging { } } - def resolveTableCluster(distributedEngineSpec: DistributedEngineSpec, clusterSpecs: Seq[ClusterSpec]): ClusterSpec = - clusterSpecs.find(_.name == distributedEngineSpec.cluster) - .getOrElse(throw CHClientException(s"Unknown cluster: ${distributedEngineSpec.cluster}")) + def resolveTableCluster( + distributedEngineSpec: DistributedEngineSpec, + clusterSpecs: Seq[ClusterSpec], + macrosSpecs: Seq[MacrosSpec] + ): ClusterSpec = { + val clusterName = if (distributedEngineSpec.cluster.contains("{")) { + val macrosMap = macrosSpecs.map(spec => (spec.name, spec.substitution)).toMap + + var resolvedClusterName = distributedEngineSpec.cluster + var startPos = resolvedClusterName.indexOf('{') + while (startPos >= 0) { + val endPos = resolvedClusterName.indexOf('}', startPos) + if (endPos > startPos) { + val macroName = resolvedClusterName.substring(startPos + 1, endPos) + val substitution = macrosMap.getOrElse(macroName, throw CHClientException(s"Unknown macro: 
${macroName}")) + resolvedClusterName = resolvedClusterName + .substring(0, startPos) + .concat(substitution) + .concat(resolvedClusterName.substring(endPos + 1)) + } + startPos = if (endPos < 0) throw CHClientException(s"Invalid macro syntax in cluster name: ${distributedEngineSpec.cluster}") else resolvedClusterName.indexOf('{') + } + resolvedClusterName + } else { + distributedEngineSpec.cluster + } + + clusterSpecs.find(_.name == clusterName) + .getOrElse(throw CHClientException(s"Unknown cluster: resolved name '${clusterName}' (original: '${distributedEngineSpec.cluster}')")) + } } diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala index 302cbcea..011a986f 100644 --- a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/NodeSpecHelper.scala @@ -16,6 +16,8 @@ package com.clickhouse.spark.spec trait NodeSpecHelper { + val cluster_name = "cluster-s2r2" + val node_s1r1: NodeSpec = NodeSpec("s1r1") val node_s1r2: NodeSpec = NodeSpec("s1r2") val node_s2r1: NodeSpec = NodeSpec("s2r1") @@ -40,7 +42,7 @@ trait NodeSpecHelper { val cluster: ClusterSpec = ClusterSpec( - name = "cluster-s2r2", + name = cluster_name, shards = Array(shard_s2, shard_s1) // unsorted ) } diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala new file mode 100644 index 00000000..85214128 --- /dev/null +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/spec/TableEngineUtilsSuite.scala @@ -0,0 +1,20 @@ +package com.clickhouse.spark.spec + +import org.scalatest.funsuite.AnyFunSuite + +class TableEngineUtilsSuite extends AnyFunSuite with NodeSpecHelper { + + test("test resolve table cluster by macro") { + val distributeSpec = DistributedEngineSpec( + engine_clause = "Distributed('{cluster}', 'wj_report', 'wj_respondent_local')", + cluster = "{cluster}", + local_db = 
"wj_report", + local_table = "wj_respondent_local", + sharding_key = None + ) + val clusterName = TableEngineUtils + .resolveTableCluster(distributeSpec, Seq(cluster), Seq(MacrosSpec("cluster", cluster_name))) + + assert(clusterName.name === cluster_name) + } +} diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml index 1219e147..57fe725d 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r1/macros.xml @@ -16,5 +16,6 @@ 1 1 + single_replica \ No newline at end of file diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml index b6a029a0..99fba39d 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s1r2/macros.xml @@ -16,5 +16,6 @@ 1 2 + single_replica \ No newline at end of file diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml index 819602e4..53cb017d 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r1/macros.xml @@ -16,5 +16,6 @@ 2 1 + single_replica \ No newline at end of file diff --git a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml index ff46c3c9..27df101c 100644 --- a/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml +++ b/clickhouse-core/src/testFixtures/conf/clickhouse-cluster/s2r2/macros.xml @@ -16,5 +16,6 @@ 2 2 + single_replica \ No newline at end of file diff --git 
a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index dea17a61..2d8380c1 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -59,6 +59,7 @@ class ClickHouseCatalog extends TableCatalog // /////////////////// CLUSTERS ////////////////////// // /////////////////////////////////////////////////// private var clusterSpecs: Seq[ClusterSpec] = Nil + private var macroSpecs: Seq[MacrosSpec] = Nil private var functionRegistry: FunctionRegistry = _ @@ -82,6 +83,7 @@ class ClickHouseCatalog extends TableCatalog } this.clusterSpecs = queryClusterSpecs(nodeSpec) + this.macroSpecs = queryMacrosSpec() val dynamicFunctionRegistry = new DynamicFunctionRegistry val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs) @@ -133,7 +135,7 @@ class ClickHouseCatalog extends TableCatalog val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec) val tableClusterSpec = tableEngineSpec match { case distributeSpec: DistributedEngineSpec => - Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs)) + Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs)) case _ => None } ClickHouseTable( diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala index 6495d62c..d73572fa 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala @@ -111,6 +111,23 @@ trait ClickHouseHelper extends Logging { }.toSeq } + def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = { + val macrosOutput = 
nodeClient.syncQueryAndCheckOutputJSONEachRow( + """ SELECT + | `macro`, -- String + | `substitution` -- String + | FROM `system`.`macros` + |""".stripMargin + ) + + macrosOutput.records + .map { row => + val name = row.get("macro").asText + val substitution = row.get("substitution").asText + MacrosSpec(name, substitution) + } + } + def queryDatabaseSpec( database: String, actionIfNoSuchDatabase: String => Unit = DEFAULT_ACTION_IF_NO_SUCH_DATABASE diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index 51c2ecb8..7a81d84e 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -59,6 +59,7 @@ class ClickHouseCatalog extends TableCatalog // /////////////////// CLUSTERS ////////////////////// // /////////////////////////////////////////////////// private var clusterSpecs: Seq[ClusterSpec] = Nil + private var macroSpecs: Seq[MacrosSpec] = Nil private var functionRegistry: FunctionRegistry = _ @@ -82,6 +83,7 @@ class ClickHouseCatalog extends TableCatalog } this.clusterSpecs = queryClusterSpecs(nodeSpec) + this.macroSpecs = queryMacrosSpec() val dynamicFunctionRegistry = new DynamicFunctionRegistry val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs) @@ -134,7 +136,7 @@ class ClickHouseCatalog extends TableCatalog val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec) val tableClusterSpec = tableEngineSpec match { case distributeSpec: DistributedEngineSpec => - Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs)) + Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs)) case _ => None } ClickHouseTable( diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala 
b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala index 68a322fa..e0bf8a97 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala @@ -112,6 +112,23 @@ trait ClickHouseHelper extends Logging { }.toSeq } + def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = { + val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( + """ SELECT + | `macro`, -- String + | `substitution` -- String + | FROM `system`.`macros` + |""".stripMargin + ) + + macrosOutput.records + .map { row => + val name = row.get("macro").asText + val substitution = row.get("substitution").asText + MacrosSpec(name, substitution) + } + } + def queryDatabaseSpec( database: String, actionIfNoSuchDatabase: String => Unit = DEFAULT_ACTION_IF_NO_SUCH_DATABASE diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala index 44fe1ff2..834b6bba 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClickHouseClusterReadSuite.scala @@ -86,6 +86,45 @@ class ClickHouseClusterReadSuite extends SparkClickHouseClusterTest { } } + test("push down aggregation - distributed table with cluster macros") { + withSimpleDistTableUsingMacro("{cluster}", "single_replica", "db_agg_col", "t_dist", true) { (_, db, tbl_dist, _) => + checkAnswer( + spark.sql(s"SELECT COUNT(id) FROM $db.$tbl_dist"), + Seq(Row(4)) + ) + + checkAnswer( + spark.sql(s"SELECT MIN(id) FROM $db.$tbl_dist"), + Seq(Row(1)) + ) + + checkAnswer( + spark.sql(s"SELECT MAX(id) FROM $db.$tbl_dist"), + Seq(Row(4)) + 
) + + checkAnswer( + spark.sql(s"SELECT m, COUNT(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"), + Seq( + Row(1, 1), + Row(2, 1), + Row(3, 1), + Row(4, 1) + ) + ) + + checkAnswer( + spark.sql(s"SELECT m, SUM(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"), + Seq( + Row(1, 1), + Row(2, 2), + Row(3, 3), + Row(4, 4) + ) + ) + } + } + test("runtime filter - distributed table") { withSimpleDistTable("single_replica", "runtime_db", "runtime_tbl", true) { (_, db, tbl_dist, _) => spark.sql("set spark.clickhouse.read.runtimeFilter.enabled=false") diff --git a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala index c8de2044..fba80e06 100644 --- a/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala +++ b/spark-3.5/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/SparkClickHouseClusterTest.scala @@ -99,6 +99,52 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn { } } + def withSimpleDistTableUsingMacro( + cluster_macro: String, + actual_cluster: String, + db: String, + tbl_dist: String, + writeData: Boolean = false + )(f: (String, String, String, String) => Unit): Unit = + autoCleanupDistTable(actual_cluster, db, tbl_dist) { (cluster, db, tbl_dist, tbl_local) => + runClickHouseSQL( + s"""CREATE TABLE IF NOT EXISTS $db.$tbl_local ON CLUSTER '$cluster_macro' ( + | create_time DateTime64(3), + | y int COMMENT 'shard key', + | m int COMMENT 'part key', + | id Int64 COMMENT 'sort key', + | value Nullable(String) + |) engine MergeTree() + | PARTITION BY (m) + | ORDER BY (id) + | SETTINGS index_granularity = 8192; + |""".stripMargin + ) + runClickHouseSQL( + s"""CREATE TABLE IF NOT EXISTS $db.$tbl_dist ON CLUSTER '$cluster_macro' ( + | create_time DateTime64(3), + | y int COMMENT 
'shard key', + | m int COMMENT 'part key', + | id Int64 COMMENT 'sort key', + | value Nullable(String) + |) engine Distributed('$cluster_macro', '$db', '$tbl_local', y) + |""".stripMargin + ) + if (writeData) { + + runClickHouseSQL( + s"""INSERT INTO $db.$tbl_dist (create_time, y, m, id, value) + | VALUES + |('2021-01-01 10:10:10', 2021, 1, 1, '1'), + |('2022-02-02 10:10:10', 2022, 2, 2, '2'), + |('2023-03-03 10:10:10', 2023, 3, 3, '3'), + |('2024-04-04 10:10:10', 2024, 4, 4, '4'); + |""".stripMargin + ) + } + f(cluster, db, tbl_dist, tbl_local) + } + def withSimpleDistTable( cluster: String, db: String, diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala index d8437b59..3ef01f7f 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala @@ -67,6 +67,7 @@ class ClickHouseCatalog extends TableCatalog // /////////////////// CLUSTERS ////////////////////// // /////////////////////////////////////////////////// private var clusterSpecs: Seq[ClusterSpec] = Nil + private var macroSpecs: Seq[MacrosSpec] = Nil private var functionRegistry: FunctionRegistry = _ @@ -90,6 +91,7 @@ class ClickHouseCatalog extends TableCatalog } this.clusterSpecs = queryClusterSpecs(nodeSpec) + this.macroSpecs = queryMacrosSpec() val dynamicFunctionRegistry = new DynamicFunctionRegistry val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs) @@ -142,7 +144,7 @@ class ClickHouseCatalog extends TableCatalog val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec) val tableClusterSpec = tableEngineSpec match { case distributeSpec: DistributedEngineSpec => - Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs)) + Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, 
macroSpecs)) case _ => None } ClickHouseTable( diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala index f859b117..69956612 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala @@ -80,6 +80,23 @@ trait ClickHouseHelper extends Logging { ) } + def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = { + val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( + """ SELECT + | `macro`, -- String + | `substitution` -- String + | FROM `system`.`macros` + |""".stripMargin + ) + + macrosOutput.records + .map { row => + val name = row.get("macro").asText + val substitution = row.get("substitution").asText + MacrosSpec(name, substitution) + } + } + def queryClusterSpecs(nodeSpec: NodeSpec)(implicit nodeClient: NodeClient): Seq[ClusterSpec] = { val clustersOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow( """ SELECT