
Commit 68f4e02

hoaihuongbk and huong.vuong authored

Add Spark SQL connector (#277)
* Add SparkSQL connector
* Fix SparkSQL constructor
* Add query as mandatory field
* Add SparkSQLConnector test cases
* Move spark session init into test

Co-authored-by: huong.vuong <[email protected]>
1 parent a170c57 commit 68f4e02

File tree

5 files changed: +135 -0 lines changed


src/main/java/io/github/setl/enums/Storage.java

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ public enum Storage {
   JDBC("io.github.setl.storage.connector.JDBCConnector"),
   STRUCTURED_STREAMING("io.github.setl.storage.connector.StructuredStreamingConnector"),
   HUDI("io.github.setl.storage.connector.HudiConnector"),
+  SPARK_SQL("io.github.setl.storage.connector.SparkSQLConnector"),
   OTHER(null);

   private String connectorName;
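
Each constant pairs a storage type with the fully qualified class name of its connector, which suggests connectors are resolved reflectively. A minimal sketch of such a lookup, assuming a hypothetical getConnectorName() accessor (only the private connectorName field is visible in this diff):

import io.github.setl.enums.Storage

// Hypothetical accessor: the enum's connectorName field exposed as a getter
val className = Storage.SPARK_SQL.getConnectorName
val connectorClass = Class.forName(className) // loads SparkSQLConnector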
src/main/scala/io/github/setl/storage/connector/SparkSQLConnector.scala

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
package io.github.setl.storage.connector

import com.typesafe.config.Config
import io.github.setl.config.Conf
import io.github.setl.enums.Storage
import io.github.setl.util.TypesafeConfigUtils
import org.apache.spark.sql.DataFrame

class SparkSQLConnector(val query: String) extends Connector {
  override val storage: Storage = Storage.SPARK_SQL

  def this(conf: Conf) = this(conf.get("query", ""))
  def this(config: Config) = this(
    query = TypesafeConfigUtils.getAs[String](config, "query").getOrElse("")
  )

  require(query.nonEmpty, "query is not defined")

  /**
   * Read data from the data source
   *
   * @return a [[DataFrame]]
   */
  @throws[org.apache.spark.sql.AnalysisException](s"$query is invalid")
  override def read(): DataFrame = spark.sql(query)

  /**
   * Write a [[DataFrame]] into the data storage
   *
   * @param t      a [[DataFrame]] to be saved
   * @param suffix for data connectors that support suffix (e.g. [[FileConnector]]),
   *               add the given suffix to the save path
   */
  override def write(t: DataFrame, suffix: Option[String]): Unit = {
    if (suffix.isDefined) logWarning("suffix is not supported in SparkSQLConnector")
    write(t)
  }

  /**
   * Write a [[DataFrame]] into the data storage
   *
   * @param t a [[DataFrame]] to be saved
   */
  override def write(t: DataFrame): Unit = {
    logWarning("write is not supported in SparkSQLConnector")
  }
}
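
For reference, a minimal usage sketch of the three constructors and the read path (not part of the commit; it assumes an active SparkSession, since read() delegates to the Connector trait's spark handle):

import com.typesafe.config.ConfigFactory
import io.github.setl.config.Conf
import io.github.setl.storage.connector.SparkSQLConnector

// From a plain query string
val fromString = new SparkSQLConnector("SELECT 1 AS id")

// From a Conf map; an empty or missing "query" fails the require(...) check
val fromConf = new SparkSQLConnector(Conf.fromMap(Map("query" -> "SELECT 1 AS id")))

// From a Typesafe Config
val fromConfig = new SparkSQLConnector(ConfigFactory.parseString("query = \"SELECT 1 AS id\""))

val df = fromString.read() // runs spark.sql(query); invalid SQL raises AnalysisException
fromString.write(df)       // read-only connector: logs a warning and does nothing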

src/test/resources/application.conf

Lines changed: 6 additions & 0 deletions
@@ -263,3 +263,9 @@ hudi {
     hoodie.datasource.write.table.type = "MERGE_ON_READ"
   }
 }
+
+sparkSQL {
+  test {
+    query = "SELECT * FROM schema.table"
+  }
+}
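
A sketch of loading this test entry and handing it to the new connector, assuming the standard Typesafe ConfigFactory.load() resolution of application.conf from the classpath:

import com.typesafe.config.{Config, ConfigFactory}
import io.github.setl.storage.connector.SparkSQLConnector

// Resolves the sparkSQL.test block added above
val sparkSQLConf: Config = ConfigFactory.load().getConfig("sparkSQL.test")

val connector = new SparkSQLConnector(sparkSQLConf)
assert(connector.query == "SELECT * FROM schema.table")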

src/test/scala/io/github/setl/config/Properties.scala

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ object Properties {
   val jdbcConfig: Config = cl.getConfig("psql.test")

   val hudiConfig: Config = cl.getConfig("hudi.test")
+  val sparkSQLConfig: Config = cl.getConfig("sparkSQL.test")

   val excelConfigConnector: Config = cl.getConfig("connector.excel")
   val cassandraConfigConnector: Config = cl.getConfig("connector.cassandra")
src/test/scala/io/github/setl/storage/connector/SparkSQLConnectorSuite.scala

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
package io.github.setl.storage.connector

import io.github.setl.config.{Conf, Properties}
import io.github.setl.{SparkSessionBuilder, TestObject}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class SparkSQLConnectorSuite extends AnyFunSuite {

  val query: String =
    """
      | SELECT (ones.n1 + tens.n2 * 10) as user_id
      | FROM (
      |   SELECT 0 AS n1
      |   UNION SELECT 1 AS n1
      |   UNION SELECT 2 AS n1
      |   UNION SELECT 3 AS n1
      |   UNION SELECT 4 AS n1
      |   UNION SELECT 5 AS n1
      |   UNION SELECT 6 AS n1
      |   UNION SELECT 7 AS n1
      |   UNION SELECT 8 AS n1
      |   UNION SELECT 9 AS n1
      | ) ones
      | CROSS JOIN
      | (
      |   SELECT 0 AS n2
      |   UNION SELECT 1 AS n2
      |   UNION SELECT 2 AS n2
      |   UNION SELECT 3 AS n2
      |   UNION SELECT 4 AS n2
      |   UNION SELECT 5 AS n2
      |   UNION SELECT 6 AS n2
      |   UNION SELECT 7 AS n2
      |   UNION SELECT 8 AS n2
      |   UNION SELECT 9 AS n2
      | ) tens
      |""".stripMargin

  val testTable: Seq[TestObject] = Seq(
    TestObject(1, "p1", "c1", 1L),
    TestObject(2, "p2", "c2", 2L),
    TestObject(3, "p3", "c3", 3L)
  )

  val options: Map[String, String] = Map(
    "query" -> query
  )

  test("Instantiation of constructors") {
    val connector = new SparkSQLConnector(query)
    assert(connector.query === query)

    val testConfig = Properties.sparkSQLConfig
    val connector2 = new SparkSQLConnector(testConfig)
    assert(connector2.query === "SELECT * FROM schema.table")

    val connector3 = new SparkSQLConnector(Conf.fromMap(options))
    assert(connector3.query === query)

    assertThrows[IllegalArgumentException](new SparkSQLConnector(""))
    assertThrows[IllegalArgumentException](new SparkSQLConnector(Conf.fromMap(Map.empty)))
    assertThrows[IllegalArgumentException](new SparkSQLConnector(testConfig.withoutPath("query")))
  }

  test("Read/Write of SparkSQLConnector") {
    val spark: SparkSession = SparkSession.builder().config(new SparkConf()).master("local[*]").getOrCreate()
    import spark.implicits._

    val connector = new SparkSQLConnector(query)
    assert(connector.read().collect().length == 100)

    // Should log warning & do nothing
    val testDF = testTable.toDF()
    connector.write(testDF)
    connector.write(testDF, Some("any_"))
  }
}
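
The generator query UNIONs ten literal rows into ones and ten into tens; the CROSS JOIN multiplies their cardinalities, so (n1 + n2 * 10) enumerates user_id 0 through 99 and read() returns exactly 100 rows. A sketch of the same construction with the DataFrame API, assuming a local SparkSession as in the suite:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Two 10-row tables; a cross join yields 10 x 10 = 100 combinations
val ones = spark.range(10).toDF("n1")
val tens = spark.range(10).toDF("n2")
val ids = ones.crossJoin(tens).selectExpr("n1 + n2 * 10 AS user_id")
assert(ids.count() == 100) // user_id covers 0..99 exactly once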
