@@ -0,0 +1,11 @@
package com.clickhouse.spark.spec

import com.clickhouse.spark.ToJson

case class MacrosSpec(
  name: String,
  substitution: String
) extends ToJson with Serializable {

  override def toString: String = s"macro: $name, substitution: $substitution"
}
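For context, the new spec is a plain value class whose instances feed the macro lookup built in TableEngineUtils below; a minimal sketch of that usage (the values mirror the test configs later in this PR):

  // Sketch: how MacrosSpec rows feed the name -> substitution lookup
  // used during cluster-name resolution.
  val specs = Seq(MacrosSpec("shard", "1"), MacrosSpec("cluster", "single_replica"))
  val macrosMap = specs.map(s => (s.name, s.substitution)).toMap
  // macrosMap("cluster") == "single_replica"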
@@ -29,7 +29,34 @@ object TableEngineUtils extends Logging {
}
}

  def resolveTableCluster(distributedEngineSpec: DistributedEngineSpec, clusterSpecs: Seq[ClusterSpec]): ClusterSpec =
    clusterSpecs.find(_.name == distributedEngineSpec.cluster)
  def resolveTableCluster(
      distributedEngineSpec: DistributedEngineSpec,
      clusterSpecs: Seq[ClusterSpec],
      macrosSpecs: Seq[MacrosSpec]
  ): ClusterSpec = {
    val clusterName = if (distributedEngineSpec.cluster.contains("{")) {
      val macrosMap = macrosSpecs.map(spec => (spec.name, spec.substitution)).toMap

      // Expand each {macro} in the cluster name using substitutions from system.macros.
      var resolved = distributedEngineSpec.cluster
      var startPos = resolved.indexOf('{')
      while (startPos >= 0) {
        val endPos = resolved.indexOf('}', startPos)
        if (endPos < 0) {
          // Fail fast on an unclosed brace instead of looping forever.
          throw CHClientException(s"Invalid macro in cluster name: ${distributedEngineSpec.cluster}")
        }
        val macroName = resolved.substring(startPos + 1, endPos)
        val substitution =
          macrosMap.getOrElse(macroName, throw CHClientException(s"Unknown macro: $macroName"))
        resolved = resolved.substring(0, startPos) + substitution + resolved.substring(endPos + 1)
        startPos = resolved.indexOf('{')
      }
      resolved
    } else {
      distributedEngineSpec.cluster
    }

    clusterSpecs.find(_.name == clusterName)
      .getOrElse(throw CHClientException(s"Unknown cluster: $clusterName"))
  }
}
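For illustration, resolution rewrites one macro per loop pass, left to right; a hedged sketch of a call site (all values are hypothetical, and the shard-less ClusterSpec is only for demonstration, mirroring the named-argument construction in NodeSpecHelper):

  // Given system.macros rows cluster -> "prod" and shard -> "1":
  val spec = DistributedEngineSpec(
    engine_clause = "Distributed('{cluster}_s{shard}', 'db', 'tbl_local')",
    cluster = "{cluster}_s{shard}",
    local_db = "db",
    local_table = "tbl_local",
    sharding_key = None
  )
  val macros = Seq(MacrosSpec("cluster", "prod"), MacrosSpec("shard", "1"))
  val prodS1 = ClusterSpec(name = "prod_s1", shards = Array.empty)
  // pass 1: "{cluster}_s{shard}" -> "prod_s{shard}"
  // pass 2: "prod_s{shard}"      -> "prod_s1"
  val resolved = TableEngineUtils.resolveTableCluster(spec, Seq(prodS1), macros)
  assert(resolved.name == "prod_s1")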
@@ -16,6 +16,8 @@ package com.clickhouse.spark.spec

trait NodeSpecHelper {

val cluster_name = "cluster-s2r2"

val node_s1r1: NodeSpec = NodeSpec("s1r1")
val node_s1r2: NodeSpec = NodeSpec("s1r2")
val node_s2r1: NodeSpec = NodeSpec("s2r1")
@@ -40,7 +42,7 @@ trait NodeSpecHelper {

  val cluster: ClusterSpec =
    ClusterSpec(
      name = "cluster-s2r2",
      name = cluster_name,
      shards = Array(shard_s2, shard_s1) // unsorted
    )
}
@@ -0,0 +1,20 @@
package com.clickhouse.spark.spec

import org.scalatest.funsuite.AnyFunSuite

class TableEngineUtilsSuite extends AnyFunSuite with NodeSpecHelper {

  test("test resolve table cluster by macro") {
    val distributeSpec = DistributedEngineSpec(
      engine_clause = "Distributed('{cluster}', 'wj_report', 'wj_respondent_local')",
      cluster = "{cluster}",
      local_db = "wj_report",
      local_table = "wj_respondent_local",
      sharding_key = None
    )
    val resolvedCluster = TableEngineUtils
      .resolveTableCluster(distributeSpec, Seq(cluster), Seq(MacrosSpec("cluster", cluster_name)))

    assert(resolvedCluster.name === cluster_name)
  }
}
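A companion negative test could pin down the failure mode (a sketch, assuming CHClientException is importable into the test scope from the connector's main sources):

  test("test resolve table cluster fails on unknown macro") {
    val distributeSpec = DistributedEngineSpec(
      engine_clause = "Distributed('{typo}', 'db', 'tbl_local')",
      cluster = "{typo}",
      local_db = "db",
      local_table = "tbl_local",
      sharding_key = None
    )
    // No "typo" macro is registered, so resolution should fail fast.
    intercept[CHClientException] {
      TableEngineUtils.resolveTableCluster(distributeSpec, Seq(cluster), Seq.empty)
    }
  }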
@@ -16,5 +16,6 @@
<macros>
<shard>1</shard>
<replica>1</replica>
<cluster>single_replica</cluster>
</macros>
</yandex>
@@ -16,5 +16,6 @@
<macros>
<shard>1</shard>
<replica>2</replica>
<cluster>single_replica</cluster>
</macros>
</yandex>
@@ -16,5 +16,6 @@
<macros>
<shard>2</shard>
<replica>1</replica>
<cluster>single_replica</cluster>
</macros>
</yandex>
@@ -16,5 +16,6 @@
<macros>
<shard>2</shard>
<replica>2</replica>
<cluster>single_replica</cluster>
</macros>
</yandex>
@@ -86,6 +86,45 @@ class ClickHouseClusterReadSuite extends SparkClickHouseClusterTest {
}
}

test("push down aggregation - distributed table with cluster macros") {
withSimpleDistTableUsingMacro("{cluster}", "single_replica", "db_agg_col", "t_dist", true) { (_, db, tbl_dist, _) =>
checkAnswer(
spark.sql(s"SELECT COUNT(id) FROM $db.$tbl_dist"),
Seq(Row(4))
)

checkAnswer(
spark.sql(s"SELECT MIN(id) FROM $db.$tbl_dist"),
Seq(Row(1))
)

checkAnswer(
spark.sql(s"SELECT MAX(id) FROM $db.$tbl_dist"),
Seq(Row(4))
)

checkAnswer(
spark.sql(s"SELECT m, COUNT(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"),
Seq(
Row(1, 1),
Row(2, 1),
Row(3, 1),
Row(4, 1)
)
)

checkAnswer(
spark.sql(s"SELECT m, SUM(DISTINCT id) FROM $db.$tbl_dist GROUP BY m"),
Seq(
Row(1, 1),
Row(2, 2),
Row(3, 3),
Row(4, 4)
)
)
}
}

test("runtime filter - distributed table") {
withSimpleDistTable("single_replica", "runtime_db", "runtime_tbl", true) { (_, db, tbl_dist, _) =>
spark.sql("set spark.clickhouse.read.runtimeFilter.enabled=false")
@@ -99,6 +99,52 @@ trait SparkClickHouseClusterTest extends SparkTest with ClickHouseClusterMixIn {
}
}

  def withSimpleDistTableUsingMacro(
      cluster_macro: String,
      actual_cluster: String,
      db: String,
      tbl_dist: String,
      writeData: Boolean = false
  )(f: (String, String, String, String) => Unit): Unit =
    autoCleanupDistTable(actual_cluster, db, tbl_dist) { (cluster, db, tbl_dist, tbl_local) =>
      runClickHouseSQL(
        s"""CREATE TABLE IF NOT EXISTS $db.$tbl_local ON CLUSTER '$cluster_macro' (
           | create_time DateTime64(3),
           | y int COMMENT 'shard key',
           | m int COMMENT 'part key',
           | id Int64 COMMENT 'sort key',
           | value Nullable(String)
           |) engine MergeTree()
           | PARTITION BY (m)
           | ORDER BY (id)
           | SETTINGS index_granularity = 8192;
           |""".stripMargin
      )
      runClickHouseSQL(
        s"""CREATE TABLE IF NOT EXISTS $db.$tbl_dist ON CLUSTER '$cluster_macro' (
           | create_time DateTime64(3),
           | y int COMMENT 'shard key',
           | m int COMMENT 'part key',
           | id Int64 COMMENT 'sort key',
           | value Nullable(String)
           |) engine Distributed('$cluster_macro', '$db', '$tbl_local', y)
           |""".stripMargin
      )
      if (writeData) {
        runClickHouseSQL(
          s"""INSERT INTO $db.$tbl_dist (create_time, y, m, id, value)
             | VALUES
             |('2021-01-01 10:10:10', 2021, 1, 1, '1'),
             |('2022-02-02 10:10:10', 2022, 2, 2, '2'),
             |('2023-03-03 10:10:10', 2023, 3, 3, '3'),
             |('2024-04-04 10:10:10', 2024, 4, 4, '4');
             |""".stripMargin
        )
      }
      f(cluster, db, tbl_dist, tbl_local)
    }

  def withSimpleDistTable(
      cluster: String,
      db: String,
@@ -67,6 +67,7 @@ class ClickHouseCatalog extends TableCatalog
// /////////////////// CLUSTERS //////////////////////
// ///////////////////////////////////////////////////
private var clusterSpecs: Seq[ClusterSpec] = Nil
private var macroSpecs: Seq[MacrosSpec] = Nil

private var functionRegistry: FunctionRegistry = _

@@ -90,6 +91,7 @@ class ClickHouseCatalog extends TableCatalog
}

this.clusterSpecs = queryClusterSpecs(nodeSpec)
this.macroSpecs = queryMacrosSpec()

val dynamicFunctionRegistry = new DynamicFunctionRegistry
val xxHash64ShardFunc = new ClickHouseXxHash64Shard(clusterSpecs)
@@ -142,7 +144,7 @@
    val tableEngineSpec = TableEngineUtils.resolveTableEngine(tableSpec)
    val tableClusterSpec = tableEngineSpec match {
      case distributeSpec: DistributedEngineSpec =>
        Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs))
        Some(TableEngineUtils.resolveTableCluster(distributeSpec, clusterSpecs, macroSpecs))
      case _ => None
    }
ClickHouseTable(
@@ -80,6 +80,23 @@ trait ClickHouseHelper extends Logging {
)
}

  def queryMacrosSpec()(implicit nodeClient: NodeClient): Seq[MacrosSpec] = {
    val macrosOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
      """ SELECT
        | `macro`,        -- String
        | `substitution`  -- String
        | FROM `system`.`macros`
        |""".stripMargin
    )

    macrosOutput.records
      .map { row =>
        val name = row.get("macro").asText
        val substitution = row.get("substitution").asText
        MacrosSpec(name, substitution)
      }
  }
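Given the node configs above, each server's system.macros now exposes three rows; on the shard-1/replica-1 node the helper would return something like (illustrative values only):

  Seq(
    MacrosSpec("shard", "1"),
    MacrosSpec("replica", "1"),
    MacrosSpec("cluster", "single_replica")
  )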

  def queryClusterSpecs(nodeSpec: NodeSpec)(implicit nodeClient: NodeClient): Seq[ClusterSpec] = {
    val clustersOutput = nodeClient.syncQueryAndCheckOutputJSONEachRow(
      """ SELECT