Commit 7a06a13

Spark: Support read with settings (#367)
* allow read with settings
* make optional
* update doc
* update doc to note that the default value is None

Co-authored-by: Hua Shi <[email protected]>
Parent: 35088ca

File tree: 7 files changed (+37, -3 lines)


docs/configurations/02_sql_configurations.md

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
 spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
 spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
 spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
+spark.clickhouse.read.settings|None|Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
 spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
 spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
 spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0
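
A minimal usage sketch for the new option, assuming a SparkSession already configured with the ClickHouse catalog; the catalog and table names below are illustrative, not part of this commit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("clickhouse-read-settings-demo")
      .getOrCreate()

    // Unset by default (None); when set, the value is appended to every
    // generated read query as a SETTINGS clause.
    spark.conf.set("spark.clickhouse.read.settings", "final=1, max_execution_time=5")

    // Hypothetical catalog/table names, for illustration only.
    val df = spark.sql("SELECT * FROM clickhouse.default.events")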

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record](
       |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
       |${scanJob.groupByClause.getOrElse("")}
       |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+      |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
       |""".stripMargin
   }
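
To make the interpolation above concrete, here is a small standalone Scala sketch (the table name and WHERE clause are placeholders) showing how the Option collapses to an empty string when the conf is unset:

    // Standalone sketch; `db`.`tbl` and the filter are placeholders.
    val readSettings: Option[String] = Some("final=1, max_execution_time=5")

    val query =
      s"""SELECT * FROM `db`.`tbl`
         |WHERE (1 = 1)
         |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
         |""".stripMargin

    // With the conf set, the query ends in: SETTINGS final=1, max_execution_time=5
    // With None, the line renders empty and the query is unchanged.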

spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala

Lines changed: 10 additions & 1 deletion
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 import com.clickhouse.spark.exception.ClickHouseErrCode._
 
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
     .stringConf
     .transform(_.toLowerCase)
     .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
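
Because the entry is built with `createOptional`, `conf.getConf(READ_SETTINGS)` yields an `Option[String]` rather than a default value, which is why the docs table lists the default as None. A rough plain-Scala model of that behavior (no Spark internals; `OptionalEntry` is a made-up stand-in, not the real `OptionalConfigEntry` API):

    // Made-up stand-in for Spark's OptionalConfigEntry, for illustration only.
    final case class OptionalEntry(key: String, transform: String => String) {
      def get(conf: Map[String, String]): Option[String] = conf.get(key).map(transform)
    }

    val readSettings = OptionalEntry("spark.clickhouse.read.settings", _.toLowerCase)

    readSettings.get(Map.empty)
    // => None, the documented default: no SETTINGS clause is emitted

    readSettings.get(Map("spark.clickhouse.read.settings" -> "FINAL=1, max_execution_time=5"))
    // => Some("final=1, max_execution_time=5")

Note that `.transform(_.toLowerCase)` lowercases the whole settings string, so setting names and values are normalized together. The same change is applied verbatim to the spark-3.4 and spark-3.5 modules below.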

spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record](
       |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
       |${scanJob.groupByClause.getOrElse("")}
       |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+      |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
       |""".stripMargin
   }

spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala

Lines changed: 10 additions & 1 deletion
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 import com.clickhouse.spark.exception.ClickHouseErrCode._
 
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
     .stringConf
     .transform(_.toLowerCase)
     .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }

spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record](
       |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
       |${scanJob.groupByClause.getOrElse("")}
      |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+      |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
       |""".stripMargin
   }

spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala

Lines changed: 10 additions & 1 deletion
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 import com.clickhouse.spark.exception.ClickHouseErrCode._
 
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
     .stringConf
     .transform(_.toLowerCase)
     .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
