Apache Spark is a cluster computing framework designed to work on massive amounts of data. The Spark driver program splits the overall query into tasks and sends these tasks to executor processes on different nodes of the cluster. To improve query performance, one strategy is to reduce the amount of data that is transferred from the data storage to these executors.
One way to prevent loading data that is not actually needed is filter pushdown (sometimes also referred to as predicate pushdown), which enables the execution of certain filters at the data source before the data is loaded to an executor process. This becomes even more important if the executors are not on the same physical machine as the data.
In many cases, filter pushdown is automatically applied by Spark without explicit commands or input from the user. But in certain cases, which we will look at in this article, users have to provide specific information or even implement certain functionality themselves, especially when creating custom data sources, for example for unsupported database types or unsupported file types.
We use Scala for our code examples since the Spark classes and traits used are also written in Scala. However, implementing this functionality in other languages like Java is also possible. The full source code of our example project is available here: https://github.com/dynatrace-research/filter-pushdown-examples
Getting started
We use the Spark SQL module, which performs filter pushdown and other optimizations. This module improves query performance by using schema information about the underlying data, which is available through Spark DataFrames. The Spark SQL operations are accessed via a SparkSession, which we can create using a builder:
import org.apache.spark.sql.SparkSession

val session = SparkSession
  .builder
  .config("spark.master", "local[*]")
  .getOrCreate()
We run Spark in local mode for our examples, as the configuration in the code above shows. With the local[N] setting, Spark runs locally with N worker threads, while local[*] uses one worker thread for each logical core of the machine the program is running on.
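As a small illustration of the local[N] setting, the following sketch starts a session that is limited to two worker threads. The variable name localSession and the application name are assumptions made for this example and do not come from the example project.

```scala
import org.apache.spark.sql.SparkSession

// Local session limited to two worker threads
val localSession = SparkSession
  .builder
  .appName("filter-pushdown-sketch") // hypothetical application name
  .master("local[2]")
  .getOrCreate()

// Prints the master URL the underlying SparkContext was started with
println(localSession.sparkContext.master)
```

Note that getOrCreate returns an already running session if one exists in the same JVM, in which case the master setting of that existing session stays in effect.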
Running example: Dataset
To illustrate the examples in this text, we will use the following simple table of people.
How to get filters to the data source
For our first example, we read the above table from a CSV file and select all “testers” from the list. When a query that contains such a filter is executed without filter pushdown, the executors evaluate the filter. Looking at our example dataset above, we can see that the filter removes the majority of rows from the table, and only two of the ten people are returned as a result of the query.
val dataFramePosition = session
.read.option("header", value = true)
.csv("Filter/src/main/resources/data.csv")
.filter(col("position") === "tester")
dataFramePosition.show()
The second and third lines read the data from the CSV file and use the column names in the first line to create the dataset’s schema. In the filter command, we define that the value in column position has to be equal to “tester.” Finally, the show method in the last line prints the result to the console.
+---+------+---+--------+
| id|  name|age|position|
+---+------+---+--------+
|  1|   Bob| 35|  tester|
|  6|Maggie| 20|  tester|
+---+------+---+--------+
When using a DataFrame, Spark can execute this filter at the data source already; in other words, the filter is pushed down to the data source. We can confirm the filter pushdown by analyzing the execution plan for the DataFrame using the explain method:
dataFramePosition.explain()
The query above gives the following output:
== Physical Plan ==
*(1) Project [id#16, name#17, age#18, position#19]
+- *(1) Filter (isnotnull(position#19) AND (position#19 = tester))
   +- FileScan csv (…) PushedFilters: [IsNotNull(position), EqualTo(position,tester)], ReadSchema: struct<id:string,name:string,age:string,position:string>
We can see that the position filter is listed as a pushed filter, together with an additional one that checks that the field is not null. To prevent all rows from being loaded into the Spark executors, the data source has to evaluate the filters and exclude the rows that do not match. The Spark CSV reader loads only the rows that satisfy the filters into the executor, but this is not the case for all data sources. A custom data source, for example, has to implement the handling of pushed-down filters itself to benefit from this optimization.
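The pushed filters can also be checked in code instead of reading the console output. The following is a minimal sketch and not part of the example project: it writes a small temporary CSV file and searches the textual physical plan for the pushed filter. The names spark, csvPath, testers, planText, and filterWasPushed as well as the two sample rows are assumptions made for illustration, and the sketch assumes a Spark 3 runtime where the plan text has the form shown above.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Illustrative sample rows; the temporary file stands in for data.csv
val csvPath = Files.createTempFile("people", ".csv")
Files.write(csvPath,
  "id,name,age,position\n0,Alice,20,developer\n1,Bob,35,tester\n"
    .getBytes(StandardCharsets.UTF_8))

val testers = spark
  .read.option("header", value = true)
  .csv(csvPath.toString)
  .filter(col("position") === "tester")

// The string form of the executed plan is the tree that explain() prints
val planText = testers.queryExecution.executedPlan.toString
val filterWasPushed = planText.contains("EqualTo(position,tester)")
println(s"Filter pushed down: $filterWasPushed")
```

Matching on the plan text is fragile because its format is not a stable interface, so a check like this is better suited to exploratory tests than to production code.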
Filters containing casts
Not all filters are pushed down to the data source. One notable example is any filter that requires casting the content of a field. To demonstrate this, we can change the filter from our previous example to only select people up to the age of 25.
val dataFrameAge = session
.read.option("header", value = true)
.csv("Filter/src/main/resources/data.csv")
.filter(col("age") <= 25)
The show and explain methods can again be used to get the final result and a description of the query execution plan:
+---+------+---+---------+
| id|  name|age| position|
+---+------+---+---------+
|  0| Alice| 20|developer|
|  3| Marge| 23|developer|
|  6|Maggie| 20|   tester|
+---+------+---+---------+
(…) Filter (isnotnull(age#63) AND (cast(age#63 as int) <= 25))
(…) PushedFilters: [IsNotNull(age)], ReadSchema: struct<id:string,name:string,age:string,position:string>
When looking at the output, we see that the correct rows are returned, but the filter was not actually pushed down. The default schema for the dataset prevents the filter pushdown because it sets the type of all columns to StringType. As a result, the filter requires a cast to integer, which is an operation that is not pushed down.
There are two ways to avoid this problem:
The first way is to use the Spark option inferSchema to infer the schema of the data in a CSV file. However, this requires the file to be read twice: a first time to infer the schema from the data and a second time to actually load the data.
The second way is to explicitly define the schema for the CSV file that is opened. Since we already know the structure of the data, we will define the schema and add it to the query. In this new schema we define the age column (and additionally the id column) to have an integer type:
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema: id and age are integers, so no cast is needed in the filter
val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("position", StringType, nullable = true)))
val dataFrameSchema = session
  .read.option("header", value = true)
  .schema(schema)
  .csv("Filter/src/main/resources/data.csv")
  .filter(col("age") <= 25)
The output of the explain method for this DataFrame shows that the filter is pushed down again, and no cast is applied.
(…) PushedFilters: [IsNotNull(age), LessThanOrEqual(age,25)], ReadSchema: struct<id:int,name:string,age:int,position:string>
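For comparison, the first approach (schema inference) could look roughly like the following sketch. It is not taken from the example project: the temporary CSV file, the two sample rows, and the names spark, csvPath, and dataFrameInferred are assumptions made so that the snippet runs on its own.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Illustrative sample rows; the temporary file stands in for data.csv
val csvPath = Files.createTempFile("people", ".csv")
Files.write(csvPath,
  "id,name,age,position\n0,Alice,20,developer\n1,Bob,35,tester\n"
    .getBytes(StandardCharsets.UTF_8))

val dataFrameInferred = spark
  .read.option("header", value = true)
  .option("inferSchema", value = true) // costs an extra pass over the file
  .csv(csvPath.toString)
  .filter(col("age") <= 25)

// The age column is now an integer column, so the comparison needs no cast
dataFrameInferred.printSchema()
dataFrameInferred.explain()
```

Inference is convenient for exploration, while an explicit schema avoids the additional read and keeps the column types stable if the data changes.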
Implementation for a custom DataFrame
As discussed above, pushing the filters down to the data source gives no benefit if the data source does not evaluate the filters. This is especially important to keep in mind when creating your own data source to connect to an unsupported database or when using unsupported file types, for example.
One way to create a new data source is to extend BaseRelation and create a DataFrame from this new class, which we name MyBaseRelation in our case. When extending BaseRelation, one should also implement one of the following Scanner traits, which all contain a method called buildScan (with different parameters) to load the actual data items:
- TableScan: The most basic trait that does not allow any filtering or selection of columns at the data source. The selection at the data source with this trait is equivalent to the SQL query “SELECT * FROM tableA”.
- PrunedScan: The second trait indicates that the data source can select a subset of columns and change their order, which is equivalent to “SELECT fieldB, fieldA FROM tableA”.
- PrunedFilteredScan: With this trait it is possible to both select columns and additionally apply specific filters to the rows at the data source. Since we want to use our filters in the BaseRelation, we will go for this trait.
- CatalystScan: The final Scanner trait also supports selecting columns and filtering rows. However, it uses expressions from the query plan directly instead of filters generated from them. As a result, some expressions that are not transformed into filters, for example array expressions like array_contains or size, are available with the CatalystScan trait but not with PrunedFilteredScan. On the other hand, working with common expressions becomes a lot more tedious. This trait is also still marked as “Experimental” and unstable in Spark 3.
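To make the difference between the traits more tangible, here is a minimal sketch of the simplest variant, a relation that implements TableScan. The class name SimpleTableRelation, its two hard-coded rows, and the variable names are hypothetical and only serve as an illustration; the relation used in the rest of this article implements PrunedFilteredScan instead.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical relation: TableScan always returns every row and every column
class SimpleTableRelation(session: SparkSession) extends BaseRelation with TableScan {
  override def sqlContext: SQLContext = session.sqlContext

  override def schema: StructType = StructType(Array(
    StructField("id", IntegerType, nullable = true),
    StructField("name", StringType, nullable = true)))

  // No parameters: the data source receives neither columns nor filters
  override def buildScan(): RDD[Row] =
    session.sparkContext.parallelize(Seq(Row(0, "Alice"), Row(1, "Bob")))
}

val spark = SparkSession.builder.master("local[*]").getOrCreate()
val people = spark.baseRelationToDataFrame(new SimpleTableRelation(spark))

// The column selection is applied by Spark after buildScan has returned all data
val names = people.select("name").collect().map(_.getString(0))
println(names.mkString(", "))
```

With TableScan, any column selection or filter in the query is evaluated by Spark on the complete data, which is exactly the overhead that the other traits allow a data source to avoid.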
To make this simple example a bit more concise, we replaced the CSV file with a hard-coded sequence of Rows, which is stored in the field DATA. In addition, we have to define the structure of these rows in the schema method. We use the same schema as in the previous example.
Filter pushdown doesn’t mean that the filter must be executed during loading of the data items. By default, the filters are executed again on the result set in a later step. Therefore, pushed down filters that are only executed partly or that are not executed at all don’t lead to wrong results.
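The repeated evaluation is tied to the unhandledFilters method of BaseRelation, which by default returns all filters it receives. A relation that evaluates some filter types completely could report only the remaining ones. The following sketch shows just that selection logic; the helper name notFullyHandled and the choice of handled filter types are assumptions for illustration and are not part of the example project.

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull, LessThanOrEqual, StringContains}

// Hypothetical helper: returns the filters that Spark still has to evaluate itself.
// It assumes the relation fully evaluates IsNotNull, EqualTo, and LessThanOrEqual.
def notFullyHandled(filters: Array[Filter]): Array[Filter] = filters.filter {
  case _: IsNotNull | _: EqualTo | _: LessThanOrEqual => false
  case _ => true
}

val pushed: Array[Filter] = Array(
  IsNotNull("age"),
  LessThanOrEqual("age", 25),
  StringContains("name", "gg"))

// Only the StringContains filter remains for Spark to evaluate
val remaining = notFullyHandled(pushed)
println(remaining.mkString(", "))
```

Inside a relation, such a helper could be returned from an override of unhandledFilters(filters: Array[Filter]): Array[Filter]. This is only safe if every filter that is left out really is evaluated completely at the data source, because Spark no longer checks it afterwards.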
In contrast, the selection of the required columns (selection pushdown) must be performed in the BaseRelation, if the selected trait supports it. We therefore must implement a basic mapping for the expected columns given in the parameter requiredColumns.
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val seq = for (row <- DATA if passFilters(row, filters))
    yield Row.fromSeq(for (column <- requiredColumns)
      yield row.get(schema.fieldIndex(column)))
  println("Number of returned rows: " + seq.size)
  sparkSession.sparkContext.parallelize(seq)
}
The buildScan method creates a new Row object from each of the rows stored in DATA, if the original row passes all the filters. If the row is not filtered out, only the columns requested in requiredColumns are collected and the new Row object is added to the collection seq. The method then prints the number of rows that are returned.
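The column mapping inside buildScan can be looked at in isolation. The following sketch uses a hypothetical helper pruneColumns and a single sample row to show that the result contains only the requested columns, in the requested order; it repeats the schema so that it runs on its own.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("position", StringType, nullable = true)))

// Hypothetical helper: keeps only the requested columns, in the requested order
def pruneColumns(row: Row, requiredColumns: Array[String]): Row =
  Row.fromSeq(requiredColumns.toSeq.map(column => row.get(schema.fieldIndex(column))))

val fullRow = Row(6, "Maggie", 20, "tester")
val pruned = pruneColumns(fullRow, Array("position", "name"))
println(pruned) // the row now holds position first, then name
```

The order matters because Spark interprets the returned values by position according to requiredColumns, not according to the schema of the relation.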
Similar to filter pushdown, selection pushdown also aims to reduce the data that is loaded to the executors. This technique ensures that only those columns that are present in the result or that are needed to calculate the result are loaded, for example, a column used in a filter.
In the column selection code, we also add the call to the filter handling and a log message for the number of returned rows. From the output of the explain method we know that three types of filters are pushed down for our queries so far: IsNotNull, EqualTo, and LessThanOrEqual. The IsNotNull filter is needed because the fields in our schema are declared as nullable (nullable = true). Setting nullable to false in the schema for the columns in question removes these filters. Our passFilters method, which is called in the code above, applies the filters used in our example:
def passFilters(row: Row, filters: Array[Filter]): Boolean = {
  for (filter <- filters) {
    filter match {
      case IsNotNull(x) =>
        if (row.get(schema.fieldIndex(x)) == null) return false
      case EqualTo(attribute, value) =>
        // fieldIndex throws an IllegalArgumentException for unknown columns,
        // so a check for index -1 is not needed
        val index = schema.fieldIndex(attribute)
        if (!row.get(index).equals(value)) return false
      case LessThanOrEqual(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a <= b)) return false
      case LessThan(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a < b)) return false
      case GreaterThanOrEqual(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a >= b)) return false
      case GreaterThan(attribute, value) =>
        if (!checkInequality(row, attribute, value, (a: Double, b: Double) => a > b)) return false
      case _ => // ignore other filters
    }
  }
  true
}
For the inequality filters (<=, <, >= and >) we only handle integer and double values. As mentioned above, all filters that are pushed down to MyBaseRelation are executed again at a later point. Letting some rows pass that will be removed in a later step is therefore not a problem. We also do not have to perform null checks for the field values, because of the explicit IsNotNull filters.
def checkInequality(row: Row, attribute: String, value: Any, comparison: (Double, Double) => Boolean): Boolean = {
  val index = schema.fieldIndex(attribute)
  val doubleVal = schema.fields(index).dataType match {
    case _: IntegerType => row.get(index).asInstanceOf[Integer].doubleValue()
    case _: DoubleType => row.get(index).asInstanceOf[Double]
    case _ => return true
  }
  value match {
    case i: Integer => comparison(doubleVal, i.doubleValue())
    case d: Double => comparison(doubleVal, d)
    case _ => true
  }
}
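The filter handling above ignores composite filters. As a possible extension, the following sketch evaluates And and Or filters recursively. The function name passes and the sample row are assumptions for illustration, and the code is not part of the example project. Unknown filters again let the row pass, which is safe as long as Spark re-applies the filters afterwards. A Not filter is deliberately left out: negating an unknown inner filter that "passes" would wrongly drop rows.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull, Or}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("position", StringType, nullable = true)))

// Hypothetical recursive evaluation; unsupported filters let the row pass
def passes(row: Row, filter: Filter): Boolean = filter match {
  case And(left, right)     => passes(row, left) && passes(row, right)
  case Or(left, right)      => passes(row, left) || passes(row, right)
  case IsNotNull(attribute) => !row.isNullAt(schema.fieldIndex(attribute))
  case EqualTo(attribute, value) => row.get(schema.fieldIndex(attribute)) == value
  case _ => true
}

val maggie = Row(6, "Maggie", 20, "tester")
val testerOrDeveloper = Or(EqualTo("position", "tester"), EqualTo("position", "developer"))
val testerNamedBob = And(EqualTo("position", "tester"), EqualTo("name", "Bob"))

println(passes(maggie, testerOrDeveloper)) // true: the position matches the first branch
println(passes(maggie, testerNamedBob))    // false: the name does not match
```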
In this example, we create a new DataFrame from an instance of MyBaseRelation and apply both filters from the previous examples to it.
val dataFrameBaseRelation = session
  .baseRelationToDataFrame(new MyBaseRelation(session))
  .filter(col("position") === "tester")
  .filter(col("age") <= 25)
dataFrameBaseRelation.show()
dataFrameBaseRelation.explain()
Checking the output of the experiment, we can see that the filters are pushed down and executed correctly. There is only one tester that is also at most 25 years old and the log statement from the buildScan method confirms that there is only one row that is not removed by the filters.
Number of returned rows: 1
+---+------+---+--------+
| id|  name|age|position|
+---+------+---+--------+
|  6|Maggie| 20|  tester|
+---+------+---+--------+
Conclusion
Filter pushdown and selection pushdown are two techniques that can greatly improve the performance of queries. While many of the standard technologies (such as the Spark readers for CSV or Parquet files) already implement these strategies, we still have to implement the handling ourselves in some cases, for example when writing a custom database connector or supporting a specific data structure. Also, keep in mind that not every filter operation is pushed down to the data source, and small changes, for example in the schema of the dataset, might have a huge performance impact.