-
Notifications
You must be signed in to change notification settings - Fork 286
Expand file tree
/
Copy pathExternalNonRectangular.scala
More file actions
191 lines (165 loc) · 6.2 KB
/
ExternalNonRectangular.scala
File metadata and controls
191 lines (165 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources._
import scala.collection.mutable.{ArrayBuffer, HashMap}
//
// Demonstrate the Spark SQL external data source API, but for
// simplicity don't connect to an external system -- just create a
// synthetic table whose size and partitioning are determined by
// configuration parameters.
//
//
// The FilterInterpreter class contains everything
// we need to interpret the filter language, both for constructing the query to
// the back-end data engine and for filtering the rows it gives us.
//
class FilterInterpreter2(allFilters: Array[Filter]) {
//
// First organize the filters into a map according to the columns they apply to
//
private val allAttrToFilters: Map[String, Array[Filter]] = allFilters
.map(f => (getFilterAttribute(f), f))
.groupBy(attrFilter => attrFilter._1)
.mapValues(a => a.map(p => p._2))
//
// pull out the parts of the filters we'll push out to the back-and data engine
//
val (min, max, otherKeyFilters) = splitKeyFilter
//
// and tidy up the remaining folters that we'll apply to records returned
//
private val attrToFilters = allAttrToFilters - "val" + ("val" -> otherKeyFilters)
//
// Apply the filters to a returned record
//
def apply(r: Map[String, Int]): Boolean = {
r.forall({
case (attr, v) => {
val filters = attrToFilters.getOrElse(attr, new Array[Filter](0))
satisfiesAll(v, filters)
}
})
}
private def splitKeyFilter: (Option[Int], Option[Int], Array[Filter]) = {
val keyFilters = allAttrToFilters.getOrElse("val", new Array[Filter](0))
var min: Option[Int] = None
var max: Option[Int] = None
val others = new ArrayBuffer[Filter](0)
keyFilters.foreach({
case GreaterThan(attr, v) => min = Some(v.asInstanceOf[Int] + 1)
case LessThan(attr, v) => max = Some(v.asInstanceOf[Int] - 1)
case GreaterThanOrEqual(attr, v) => min = Some(v.asInstanceOf[Int])
case LessThanOrEqual(attr, v) => max = Some(v.asInstanceOf[Int])
case _ => others.++=: _
})
(min, max, others.toArray)
}
private def getFilterAttribute(f: Filter): String = {
f match {
case EqualTo(attr, v) => attr
case GreaterThan(attr, v) => attr
case LessThan(attr, v) => attr
case GreaterThanOrEqual(attr, v) => attr
case LessThanOrEqual(attr, v) => attr
case In(attr, vs) => attr
}
}
private def satisfiesAll(value: Int, filters: Array[Filter]): Boolean = {
filters.forall({
case EqualTo(attr, v) => value == v.asInstanceOf[Int]
case GreaterThan(attr, v) => value > v.asInstanceOf[Int]
case LessThan(attr, v) => value < v.asInstanceOf[Int]
case GreaterThanOrEqual(attr, v) => value >= v.asInstanceOf[Int]
case LessThanOrEqual(attr, v) => value <= v.asInstanceOf[Int]
case In(attr, vs) => vs.exists(v => value == v.asInstanceOf[Int])
})
}
}
//
// Extending TableScan allows us to describe the schema and
// provide the rows when requested
//
case class MyPFTableScan2(count: Int, partitions: Int)
(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan {
// instantiate the (fake) back-end storage engine
val db = new RangeDB(count)
val schema: StructType = StructType(Seq(
StructField("val", IntegerType, nullable = false),
StructField("data", StructType(Seq(
StructField("squared", IntegerType, nullable = false),
StructField("cubed", IntegerType, nullable = false)
)))
))
// massage a back-end row into a map for uniformity
private def makeMap(rec: RangeDBRecord): Map[String, Int] = {
val m = new HashMap[String, Int]()
m += ("val" -> rec.key)
m += ("squared" -> rec.squared)
m += ("cubed" -> rec.cubed)
m.toMap
}
// project down to the required columns in the right order and wrap up as a Row
private def projectAndWrapRow(m: Map[String, Int],
requiredColumns: Array[String]): Row = {
//val l = requiredColumns.map(c => m(c))
//val r = Row.fromSeq(l)
val r = Row(m("val"), Row(m("squared"), m("cubed")))
r
}
// Get the data, filter and project it, and return as an RDD
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// organzie the filters
val filterInterpreter = new FilterInterpreter2(filters)
// get the data, pushing as much filtering to the back-end as possible
// (in this case, not much)
val rowIterator = db.getRecords(filterInterpreter.min, filterInterpreter.max)
val rows = rowIterator
.map(rec => makeMap(rec))
.filter(r => filterInterpreter.apply(r))
.map(r => projectAndWrapRow(r, requiredColumns))
sqlContext.sparkContext.parallelize(rows.toSeq, partitions)
}
}
//
// Extending RelationProvider allows us to route configuration parameters into
// our implementation -- this is the class we specify below when registering
// temporary tables.
//
class CustomPFRP2 extends RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
MyPFTableScan2(parameters("rows").toInt,
parameters("partitions").toInt)(sqlContext)
}
}
object ExternalNonRectangular {
def main(args: Array[String]) {
val spark =
SparkSession.builder()
.appName("SQL-ExternalNonRectangular")
.master("local[4]")
.getOrCreate()
// register it as a temporary table to be queried
// (could register several of these with different parameter values)
spark.sql(
s"""
|CREATE TEMPORARY VIEW dataTable
|USING sql.CustomPFRP2
|OPTIONS (partitions '9', rows '50')
""".stripMargin)
// query the table we registered, using its column names
// NOTE: requests the columns val, cubed, squared in that order!
val data =
spark.sql(
s"""
|SELECT val, data.cubed
|FROM dataTable
|WHERE val <= 40 AND data.squared >= 900
|ORDER BY val
""".stripMargin)
data.foreach(r => println(r))
}
}