Friday, June 30, 2017

Diving into Spark and Parquet Workloads, by Example

Topic: In this post you can find a few simple examples illustrating important features of Spark when reading partitioned tables stored in Parquet, in particular with a focus on performance investigations. The main topics covered are:
  • Partition pruning
  • Column projection
  • Predicate/filter push-down
  • Tools for investigating Parquet metadata
  • Tools for measuring Spark metrics

Motivations: The combination of Spark and Parquet currently is a very popular foundation for building scalable analytics platforms. At least this is what we find in several projects at the CERN Hadoop and Spark service. In particular performance, scalability and ease of use are key elements of this solution that make it very appealing to our users. This also motivates the work described in this post on exploring the technology and in particular the performance characteristics of Spark workloads and internals of Parquet to better understand what happens under the hood, what works well and what are some of the current limitations.

The lab environment

 
This post is based on examples and short pieces of code that you should be able to try out in your test environments, to experiment on why something works (or why it doesn't). I have used in this post a table from the schema of the TPCDS benchmark. The setup instructions for TPCDS on Spark can be found on GitHub at: "Spark SQL Performance Tests".

When testing the examples and measuring performance for this post, I have mostly used Spark on a YARN/Hadoop cluster of 12 nodes. However, this is not a hard dependency: you can run your tests with similar results using the local filesystem and/or Spark in local mode. I have run most of the examples using spark-shell; however, the examples use Spark SQL, so in most cases they will run unchanged on PySpark and/or in a notebook environment.
The labs discussed in this post have been tested using Spark version 2.2.0 (Release Candidate) and on Spark 2.1.1. Spark 2.2.0 depends on Parquet libraries version 1.8.2 (Spark 2.1.1 uses Parquet 1.8.1).

Test table in Parquet

The examples presented here use the TPCDS schema created with scale 1500 GB. Only one table is used in the examples: I have chosen the largest fact table, STORE_SALES. The table is partitioned and, after the schema installation, is physically stored as a collection of Parquet files organized under a root directory.
The total size is 185 GB in my lab environment. It adds up to 556 GB considering the 3-fold HDFS replication factor. This can be measured with:

$ hdfs dfs -du -h -s TPCDS/tpcds_1500/store_sales
185.3 G  556.0 G  TPCDS/tpcds_1500/store_sales

The partitioning structure is visible by listing the content of the top-level directory. There are 1824 partitions in the test schema I used. Each partition is stored in a separate folder, and the folder name contains the partition key. The files are compressed with snappy. The size of each individual file varies depending on the amount of data in the given partition. Here is an example of the path and size of one of the files that constitute the store_sales table in my test schema:

Name:  TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
Bytes: 208004821
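The folder naming convention shown above (key=value directories under the table root) is what allows the partition key to be recovered from the path alone. The following is a minimal sketch in plain Python (no Spark needed) of that idea; the helper name partition_values is hypothetical and this is not Spark's actual partition discovery code.

```python
def partition_values(path):
    """Return the key=value pairs encoded in the directory names of a path."""
    values = {}
    # Every component except the last one (the file name) is a directory.
    for component in path.split("/")[:-1]:
        if "=" in component:
            key, _, value = component.partition("=")
            values[key] = value
    return values


path = ("TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/"
        "part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet")
print(partition_values(path))  # {'ss_sold_date_sk': '2452621'}
```

Note that the value comes back as a string: the sketch does not attempt the type inference that a real reader would need to apply.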
  

Spark and Parquet

Partitioning is a feature of many databases and data processing frameworks and it is key to making Spark jobs work at scale. Spark deals in a straightforward manner with partitioned tables in Parquet. The STORE_SALES table from the TPCDS schema described in the previous paragraph is an example of how partitioning is implemented on a filesystem (HDFS in that case). Spark can read tables stored in Parquet and performs partition discovery with a straightforward API. This is an example of how to read the STORE_SALES table into a Spark DataFrame:

val df = spark.read.
              format("parquet").
              load("TPCDS/tpcds_1500/store_sales")

This is an example of how to write a Spark DataFrame df into Parquet files preserving the partitioning (following the example of table STORE_SALES the partitioning key is ss_sold_date_sk): 

df.write.
  partitionBy("ss_sold_date_sk").
  parquet("TPCDS/tpcds_1500/store_sales_copy")
   

Partition pruning

 
Let's start the exploration with something simple: partition pruning. This feature, common to most systems implementing partitioning, can speed up your workloads considerably by reducing the amount of I/O needed to process your query/data access code. The underlying idea behind partition pruning, at least in its simplest form for single-table access as in the example discussed here, is to read data only from a list of partitions, based on a filter on the partitioning key, skipping the rest. An example to illustrate the concept is query (2) (see below if you want to peek ahead). Before getting to that, however, I want to introduce a "baseline workload" with the goal of having a reference point for comparing the results of all the optimizations that we are going to test in this post.

1. Baseline workload: full scan of the table, no partition pruning

I have used spark-shell on a YARN cluster in my tests. You can also reproduce this in local mode (i.e. using --master local[*]) and/or use pyspark or a notebook to run the tests if you prefer. An example of how to start spark-shell (customize as relevant for your environment) is:

$ spark-shell --num-executors 12 --executor-cores 4 --executor-memory 4g 

The next step is to use the Spark DataFrame API to lazily read the files from Parquet and register the resulting DataFrame as a temporary view in Spark.

spark.read.format("parquet").load("TPCDS/tpcds_1500/store_sales").createOrReplaceTempView("store_sales")

This Spark SQL command causes the full scan of all partitions of the table store_sales and we are going to use it as a "baseline workload" for the purposes of this post.

// query (1), this is a full scan of the table store_sales
spark.sql("select * from store_sales where ss_sales_price=-1.0").collect()

The query reads about 185 GB of data and 4.3 billion rows in my tests. You can find this type of performance metrics from the Spark Web UI (look for stage metrics: Input Size / Records), or with the tool sparkMeasure discussed later in this post. Here are the key metrics measured on a test of query (1):
  • Total Time Across All Tasks: 59 min
  • Locality Level Summary: Node local: 1675
  • Input Size / Records: 185.3 GB / 4319943621
  • Duration: 1.3 min            
The execution plan of the query shows how Spark executes the query: it distributes the reading of all partitions/files on HDFS to the executors, then filters for "ss_sales_price = -1", and finally collects the result set into the driver. Note that the execution plan can be found in the Web UI or by using the explain method on the Spark DataFrame.


Note: at this stage, you can ignore the where clause "ss_sales_price = -1.0" in the test query. It is there so that an empty result set is returned by the query (no sales price is negative), rather than returning 185 GB of data and flooding the driver! What happens when executing query (1) is that Spark has to scan all the table partitions (files) from Parquet before applying the filter "ss_sales_price = -1.0" so this example is still a valid illustration of the baseline workload of table full scan. Later in this post, you will find more details about how and why this works in the discussion on predicate push down.
   
2. Query with a filter on the partitioning key and using partition pruning

This is an example of a query where Spark SQL can use partition pruning. The query is similar to the baseline query (1) but with the notable change of an additional filter on the partition key. The query can be executed by reading only one partition of the STORE_SALES table.

// query (2), example of partition pruning
spark.sql("select * from store_sales where ss_sold_date_sk=2452621 and ss_sales_price=-1.0").collect()

The execution of query (2) is much faster than the full scan case of query (1). Obviously, the text of the query and the filters are different, but the main point I want to stress is that only one partition needs to be read in the case of query (2), and this makes a huge difference. Web UI metrics show that query (2) needs to read only 198.6 MB of data (4502609 records).
  • Total Time Across All Tasks: 6 s
  • Locality Level Summary: Node local: 48
  • Input Size / Records: 198.6 MB / 4502609 
  • Duration: 3 s                        
The execution plan confirms that the filter on the partitioning key "ss_sold_date_sk=2452621" has been "pushed down" to the Parquet reader, which can use the information to read just the file containing data for that partition. Note: as in the previous example, the additional filter "ss_sales_price = -1" is there to return an empty set as opposed to filling the driver with the result set. As previously commented, this can be ignored for the purposes of comparing full scan vs. partition pruning, as the filter is only evaluated after Spark has read the partition.


To recap, this section has shown two examples of reading with Spark a partitioned table stored in Parquet. The first example (query 1) is the baseline workload, performing a full scan of the entire table. The second example (query 2) shows the I/O reduction when a filter on the partitioning key allows Spark to use partition pruning. If Spark did not use partition pruning, the second query would also have to scan the entire table.
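To make the idea concrete, here is a minimal sketch in plain Python of partition pruning over a directory listing. It is only an illustration of the concept, not how Spark implements it: the function name prune_partitions is hypothetical, and the two neighbouring partition values (2452620 and 2452622) are made up for the example.

```python
def prune_partitions(directories, key, wanted):
    """Keep only the directories whose name is exactly key=wanted."""
    target = f"{key}={wanted}"
    return [d for d in directories if d.rsplit("/", 1)[-1] == target]


# Hypothetical listing of three partition folders of the table.
directories = [
    "store_sales/ss_sold_date_sk=2452620",
    "store_sales/ss_sold_date_sk=2452621",
    "store_sales/ss_sold_date_sk=2452622",
]
to_read = prune_partitions(directories, "ss_sold_date_sk", 2452621)
print(to_read)  # ['store_sales/ss_sold_date_sk=2452621']
print(f"skipped {len(directories) - len(to_read)} of {len(directories)} partitions")
```

The decision is taken on directory names only, before any data file is opened, which is why the I/O saving grows with the number of partitions that the filter excludes.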


Column projection

 
The idea behind this feature is simple: just read the data for columns that the query needs to process and skip the rest of the data. Column-oriented data formats like Parquet can implement this feature quite naturally. This contrasts with row-oriented data formats, typically used in relational databases and/or systems where optimizing for single-row inserts and updates is at a premium.
Column projection can provide an important reduction of the work needed to read the table and result in performance gains. The actual performance gain depends on the query, in particular on the fraction of the data/columns that need to be read to answer the business problem behind the query.
For example, the table "store_sales" used for the example queries (1) and (2) has 23 columns. For queries that don't need to retrieve the values of all the columns of the table, but rather a subset of the full schema, Spark and Parquet can optimize the I/O path and reduce the amount of data read from storage.
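As a rough illustration of why a columnar layout makes this cheap, the sketch below models a tiny table stored column by column and counts how many values a reader has to touch. It is plain Python with made-up values and a hypothetical helper (values_read); it says nothing about Parquet's actual encoding or compression.

```python
# Toy table stored column by column: values are made up for illustration.
columns = {
    "ss_item_sk":     [101, 102, 103, 104],
    "ss_quantity":    [5, 1, 7, 3],
    "ss_list_price":  [9.99, 4.50, 20.00, 1.25],
    "ss_sales_price": [8.99, 4.50, 18.00, 1.00],
}


def values_read(columns, needed):
    """Count the values a reader touches when it loads only `needed` columns."""
    return sum(len(columns[name]) for name in needed)


full_scan = values_read(columns, columns.keys())
projected = values_read(columns, ["ss_item_sk", "ss_sales_price"])
print(full_scan, projected)  # 16 8
```

In a row-oriented layout the reader would have to go through all 16 values to extract the two columns of interest, while here the other two columns are never touched.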
This command shows that "store_sales" has 23 columns and displays their names and data type:

spark.sql("desc store_sales").show(50)

+--------------------+------------+-------+
|            col_name|   data_type|comment|
+--------------------+------------+-------+
|     ss_sold_time_sk|         int|   null|
|          ss_item_sk|         int|   null|
|      ss_customer_sk|         int|   null|
|         ss_cdemo_sk|         int|   null|
|         ss_hdemo_sk|         int|   null|
|          ss_addr_sk|         int|   null|
|         ss_store_sk|         int|   null|
|         ss_promo_sk|         int|   null|
|    ss_ticket_number|         int|   null|
|         ss_quantity|         int|   null|
|   ss_wholesale_cost|decimal(7,2)|   null|
|       ss_list_price|decimal(7,2)|   null|
|      ss_sales_price|decimal(7,2)|   null|
| ss_ext_discount_amt|decimal(7,2)|   null|
|  ss_ext_sales_price|decimal(7,2)|   null|
|ss_ext_wholesale_...|decimal(7,2)|   null|
|   ss_ext_list_price|decimal(7,2)|   null|
|          ss_ext_tax|decimal(7,2)|   null|
|       ss_coupon_amt|decimal(7,2)|   null|
|         ss_net_paid|decimal(7,2)|   null|
| ss_net_paid_inc_tax|decimal(7,2)|   null|
|       ss_net_profit|decimal(7,2)|   null|
|     ss_sold_date_sk|         int|   null|
+--------------------+------------+-------+

The following example, query (3), is similar to the baseline query (1) discussed earlier, with the notable change that query (3) references 4 columns only: ss_sold_date_sk, ss_item_sk, ss_list_price with a filter on ss_sales_price. This allows Spark to reduce the amount of I/O performed compared to queries that need to read the entire table as query (1).

// query (3), an example of column pruning
spark.sql("select ss_sold_date_sk, ss_item_sk, ss_list_price from store_sales where ss_sales_price=-1.0").collect()

From the Web UI, after executing the query on a test system I find that it has read only 22.4 GB:

  • Total Time Across All Tasks: 13 min
  • Locality Level Summary: Node local: 1675
  • Input Size / Records: 22.4 GB / 4319943621
  • Duration: 18 s                        
Compare this with the baseline query (1), which accessed 185 GB of data instead.
The execution plan confirms that the query only needs to read data for the columns ss_sold_date_sk, ss_item_sk, ss_list_price.

    

Predicate push down

 
Predicate push down is another feature of Spark and Parquet that can improve query performance by reducing the amount of data read from Parquet files. Predicate push down works by evaluating filtering predicates in the query against metadata stored in the Parquet files. Parquet can optionally store statistics (in particular the minimum and maximum value for a column chunk) in the relevant metadata section of its files and can use that information to make decisions, for example, to skip reading chunks of data if the provided filter predicate value in the query is outside the range of values stored for a given column. This is a simplified explanation: there are many more details and exceptions that it does not capture, but it should give you the gist of what is happening under the hood. You will find more details later in this section and further in this post in the paragraph discussing Parquet internals.
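The min/max check described above can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions: the statistics are hypothetical, only an equality filter is considered, and details such as null handling are ignored. The function name may_contain is made up for the example.

```python
def may_contain(stats, value):
    """True if `value` lies within the [min, max] range of a row group."""
    return stats["min"] <= value <= stats["max"]


# Hypothetical min/max statistics of one column for three row groups.
row_groups = [
    {"id": 0, "min": 1, "max": 50},
    {"id": 1, "min": 40, "max": 100},
    {"id": 2, "min": 60, "max": 100},
]

for value in (-1, 45):
    to_read = [rg["id"] for rg in row_groups if may_contain(rg, value)]
    print(f"filter = {value}: read row groups {to_read}")
```

With the filter value -1 no row group qualifies, so no data needs to be read. With the value 45 two of the three row groups must still be read, because statistics can only exclude data, never confirm that a matching row exists.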

An example where predicate push down can improve query performance


Here is an example where predicate push down is used to significantly improve the performance of a Spark query on Parquet.

// query (4), example of predicate push down
spark.sql("select * from store_sales where ss_quantity=-1").collect()

Query (4) is specially crafted with the filter predicate "where ss_quantity = -1". The predicate is never true in the STORE_SALES table (ss_quantity is a positive value). A powerful optimization can kick in here: Spark can push the filter to Parquet and Parquet can evaluate it against its metadata. As a consequence, Spark and Parquet can skip performing I/O on data altogether, with an important reduction in the workload and increase in performance. To be precise, Spark/Parquet still need to access all the files that make up the table to read the metadata, but this is orders of magnitude faster than reading the data. You can see that by comparing the execution metrics of query (4) with the baseline query (1):
  • Total Time Across All Tasks: 1.0 min
  • Locality Level Summary: Node local: 1675
  • Input Size / Records: 16.2 MB / 0
  • Duration: 3 s               
The execution plan shows that the filter is indeed pushed down to Parquet.



There are exceptions when predicate push down cannot be used

This example brings us back to query (1):

// query(1), baseline with full scan of store_sales
spark.sql("select * from store_sales where ss_sales_price=-1.0").collect()

Question: if you compare query (1) and query (4), you will find that they look similar, however, their performance is quite different. Why?
The reason is that predicate push down does not happen for all datatypes in Parquet. In particular, with the current version of Spark+Parquet (that is, Parquet version 1.8.2 for the tests reported here with Spark 2.2.0 RC), predicates on columns of type DECIMAL are not pushed down, while INT (integer) values are pushed down (see also PARQUET-281). The reference query (1) has a filter on ss_sales_price of type decimal(7,2), while query (4) has a predicate on ss_quantity, which is of type INT.

Note on the execution plan: the Spark physical execution plan for query (1) reports that the predicate "ss_sales_price=-1.0" is actually pushed down, similarly to what you can see for query (2). This can be misleading, as Parquet is not actually able to push down this value. The performance metrics, elapsed time and amount of data read, confirm that the table is scanned fully in the case of query (1).

Another important point is that only predicates using certain operators can be pushed down as filters to Parquet. In the example of query (4) you can see a filter with an equality predicate being pushed down.
Other operators that can be pushed down are "<", "<=", ">" and ">=". More details on the datatypes and operators that Spark can push down as Parquet filters can be found in the source code: follow the link to ParquetFilters.scala for the relevant piece.
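To see why these particular operators lend themselves to push down, the following sketch (plain Python, not the Spark or Parquet implementation) shows how each of them can be decided from the minimum and maximum of a chunk alone. The function name can_skip and the range 1..100 are assumptions made for the example, and null values are ignored.

```python
def can_skip(op, value, col_min, col_max):
    """True if no row in a chunk with range [col_min, col_max] can satisfy
    `column <op> value`, so the chunk does not need to be read."""
    if op == "=":
        return value < col_min or value > col_max
    if op == "<":
        return col_min >= value
    if op == "<=":
        return col_min > value
    if op == ">":
        return col_max <= value
    if op == ">=":
        return col_max < value
    raise ValueError(f"operator not supported in this sketch: {op}")


# Hypothetical chunk whose values range from 1 to 100.
print(can_skip("=", -1, 1, 100))    # True
print(can_skip("<", 1, 1, 100))     # True
print(can_skip("<=", 1, 1, 100))    # False
print(can_skip(">", 100, 1, 100))   # True
print(can_skip(">=", 100, 1, 100))  # False
```

Each branch answers the same question: is there any value in the range that could satisfy the predicate? If not, the chunk can be skipped without reading it.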

An example where predicate push down kicks in but with minimal gains in performance

Here is another example that can be useful to understand some of the limitations around filter push down. When executing query (5), Spark can push down the predicate to Parquet, but the end result is only a small reduction in data read and therefore a minimal impact on performance.

// query (5), another example of predicate push down
// This time, however, with small gains in performance, due to data distribution and filter value
spark.sql("select * from store_sales where ss_ticket_number=348594569").collect()

The stats are:
  • Total Time Across All Tasks: 50 min
  • Locality Level Summary: Node local: 1675
  • Input Size / Records: 173.7 GB / 4007845204  
  • Duration: 1.1 min             



It is useful to understand the different behavior between query (4) and query (5). The reason is with the data: the value used for the filter "ss_ticket_number=348594569" has been cherry-picked to be inside the range between minimum and maximum for ss_ticket_number for most of the table partitions. To be more accurate, the granularity at which Parquet stores metadata that can be used for predicate push down is called "row group" and is a subset of a Parquet file. More on this in the section on Parquet internals and diagnostic tools.
Due to the nature of the data and the value of the filter predicate, Parquet finds that the filter value is in the range of minimum-to-maximum value for most of the row groups. Therefore Parquet libraries end up reading the vast majority of the table in this example. For some partitions, predicate push down kicks in and the actual amount of data read is a bit lower than the full table scan value in this example: 173 GB in query (5) vs. 185 GB in query (1).
The main takeaway from this example is that filter push down does not guarantee performance improvements in all cases: the actual gains in I/O reduction due to predicate push down depend on data distribution and on predicate values.
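The dependency on data distribution can be reproduced with a toy model. The sketch below (plain Python, with made-up data and hypothetical helper names) stores the same 100 values in two different orders, computes min/max per row group of 10 values, and counts how many row groups an equality filter would have to read in each case.

```python
def row_group_stats(values, group_size):
    """Split `values` into consecutive row groups and return (min, max) per group."""
    groups = [values[i:i + group_size] for i in range(0, len(values), group_size)]
    return [(min(g), max(g)) for g in groups]


def groups_to_read(stats, value):
    """Number of row groups whose [min, max] range contains `value`."""
    return sum(1 for lo, hi in stats if lo <= value <= hi)


sorted_values = list(range(1, 101))  # 1..100, clustered by value
# Same values, interleaved so that every row group spans almost the full range.
interleaved = [v for start in range(1, 11) for v in range(start, 101, 10)]

for name, values in (("sorted", sorted_values), ("interleaved", interleaved)):
    stats = row_group_stats(values, group_size=10)
    print(name, groups_to_read(stats, 55), "of", len(stats), "row groups read")
```

When the values are clustered, only 1 of the 10 row groups has to be read for the filter value 55. When the same values are spread across all row groups, the min/max ranges overlap and all 10 row groups have to be read, which is analogous to the situation of query (5).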

Comments:

You can also expect that as Parquet matures more functionality will be added to predicate push down. A notable example of this trend is PARQUET-384 "Add Dictionary Based Filtering to Filter2 API", an improvement pushed to Parquet 1.9.0.

Parquet filter push down is enabled by default in Spark. If you want to experiment further with it, you can use the following parameter to turn the feature <on|off>: spark.sql.parquet.filterPushdown=<true|false>

In conclusion, predicate push down is a feature of Spark and Parquet that can help increase the performance of your queries by reducing the amount of data read, based on metadata stored in Parquet about the minimum and maximum values stored in columns and aggregated per row group. However, this feature has limitations: notably, it can only be used with certain data types and operators as implemented in Parquet and Spark. Moreover, even when filters are pushed down, the actual reduction of I/O and relative increase in performance vary: the results depend on the provided filter values and data distribution in the source table.
 

Drill down into Parquet metadata with diagnostic tools

 
The discussion on predicate push down in the previous section has touched on some important facts about Parquet internals. At this stage you may want to explore the internals of Parquet files and their metadata structure to further understand the performance of your queries. One good resource for that is the documentation at https://parquet.apache.org/documentation/latest
Also the Parquet source code has many additional details in the form of comments to the code. See for example: ParquetOutputFormat.java
Some of the main points about Parquet internals that I want to highlight are:
  • Hierarchically, a Parquet file consists of one or more "row groups". A row group contains data grouped in "column chunks", one per column. Column chunks are structured in pages. Each column chunk contains one or more pages.
  • Parquet files have several metadata structures, containing among others the schema, the list of columns and details about the data stored there, such as name and datatype of the columns, their size, number of records and basic statistics such as minimum and maximum value (for datatypes where support for this is available, as discussed in the previous section).
  • Parquet can use compression and encoding. The user can choose the compression algorithm used, if any. By default Spark uses snappy. 
  • Parquet can store complex data types and support nested structures. This is quite a powerful feature and it goes beyond the simple examples presented in this post.
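The hierarchy described in the first point above (file, row groups, column chunks, pages) can be pictured with a few Python dataclasses. This is only a mental model with hypothetical class names and made-up sizes; it does not correspond to the classes of any Parquet library.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Page:
    num_values: int


@dataclass
class ColumnChunk:
    column: str
    pages: List[Page] = field(default_factory=list)

    @property
    def num_values(self):
        # A column chunk holds the values of all of its pages.
        return sum(p.num_values for p in self.pages)


@dataclass
class RowGroup:
    column_chunks: List[ColumnChunk] = field(default_factory=list)


@dataclass
class ParquetFile:
    row_groups: List[RowGroup] = field(default_factory=list)


def make_row_group():
    # Made-up row group: 2 columns, 2 pages per column chunk.
    return RowGroup([
        ColumnChunk("ss_item_sk", [Page(1000), Page(500)]),
        ColumnChunk("ss_quantity", [Page(1000), Page(500)]),
    ])


parquet_file = ParquetFile([make_row_group(), make_row_group()])
for i, rg in enumerate(parquet_file.row_groups):
    for chunk in rg.column_chunks:
        print(i, chunk.column, len(chunk.pages), chunk.num_values)
```

In this picture, the min/max statistics discussed in the previous section would be attached to each column chunk, which is why the row group is the unit that can be skipped.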
There are a few tools and utilities that I find useful to investigate Parquet files. Using such tools against test Parquet data has helped me to better understand how Parquet works. In the rest of this section you can find some relevant examples.

Parquet-tools:

Parquet-tools is part of the main Apache Parquet repository, you can download it from  https://github.com/apache/parquet-mr/releases
The tests described here are based on Parquet version 1.8.2, released in January 2017. Note: Parquet version 1.9.0 has also been out since October 2016, but it is not used by Spark, at least up to Spark version 2.2.0.

Tip: you can build and package the jar for parquet-tools with:
cd parquet-mr-apache-parquet-1.8.2/parquet-tools
mvn package

You can use parquet-tools to examine the metadata of a Parquet file on HDFS using: "hadoop jar <path_to_jar> meta <path_to_Parquet_file>". Other commands available with parquet-tools, besides "meta", include: cat, head, schema, dump. Just run parquet-tools with the -h option to see the syntax.
This is an example of how to use the tool to read files located on HDFS:

$ echo "read metadata from a Parquet file in HDFS"
$ hadoop jar parquet-mr-apache-parquet-1.8.2/parquet-tools/target/parquet-tools-1.8.2.jar meta TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet

The output lists several details of the file metadata:
file path, parquet version used to write the file (1.8.2 in this case), additional info (Spark Row Metadata in this case):

file:                  hdfs://XXX.XXX.XXX/user/YYY/TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
creator:               parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
extra:                 org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"ss_sold_time_sk","type":"integer","nullable":true,"metadata":{}},{"name":"ss_item_sk","type":"integer","nullable":true,"metadata":{}},
...omitted in the interest of space... 
{"name":"ss_net_profit","type":"decimal(7,2)","nullable":true,"metadata":{}}]}

Additionally metadata about the schema:

file schema:           spark_schema
--------------------------------------------------------------
ss_sold_time_sk:       OPTIONAL INT32 R:0 D:1
ss_item_sk:            OPTIONAL INT32 R:0 D:1
ss_customer_sk:        OPTIONAL INT32 R:0 D:1
ss_cdemo_sk:           OPTIONAL INT32 R:0 D:1
ss_hdemo_sk:           OPTIONAL INT32 R:0 D:1
ss_addr_sk:            OPTIONAL INT32 R:0 D:1
ss_store_sk:           OPTIONAL INT32 R:0 D:1
ss_promo_sk:           OPTIONAL INT32 R:0 D:1
ss_ticket_number:      OPTIONAL INT32 R:0 D:1
ss_quantity:           OPTIONAL INT32 R:0 D:1
ss_wholesale_cost:     OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_list_price:         OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_sales_price:        OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_discount_amt:   OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_sales_price:    OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_wholesale_cost: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_list_price:     OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_tax:            OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_coupon_amt:         OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_net_paid:           OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_net_paid_inc_tax:   OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_net_profit:         OPTIONAL INT32 O:DECIMAL R:0 D:1

Metadata about the row groups:

Note: If you want to investigate further, you can also dump information down to the page level using the parquet-tools command "dump --disable-data" on the Parquet file of interest.

Parquet_reader

Parquet_reader is another utility that can help you navigate the internals and metadata of Parquet files. In particular, it displays the statistics associated with Parquet columns and is useful to understand predicate push down.
Parquet_reader is distributed with the Parquet-cpp project.
You can download it from https://github.com/apache/parquet-cpp/releases
The tests reported here have been run using version 1.1.0 released in May 2017.
Tips: You can build the project with: "cmake ." followed by "make". After that you can find the utility parquet_reader in the folder build/latest.

This is an example of how to use parquet_reader to browse file metadata. The tool works on filesystem data, so I have copied the parquet file from HDFS to local filesystem before running this:

./parquet_reader --only-metadata part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet

File metadata: similarly to the case of parquet-tools, you can find the list of columns and their data types. Note, however, that DECIMAL columns are not identified as such (they are listed as INT32).

File Name: part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Total rows: 2840100
Number of RowGroups: 2
Number of Real Columns: 22
Number of Columns: 22
Number of Selected Columns: 22
Column 0: ss_sold_time_sk (INT32)
Column 1: ss_item_sk (INT32)
Column 2: ss_customer_sk (INT32)
Column 3: ss_cdemo_sk (INT32)
Column 4: ss_hdemo_sk (INT32)
Column 5: ss_addr_sk (INT32)
Column 6: ss_store_sk (INT32)
Column 7: ss_promo_sk (INT32)
Column 8: ss_ticket_number (INT32)
Column 9: ss_quantity (INT32)
Column 10: ss_wholesale_cost (INT32)
Column 11: ss_list_price (INT32)
Column 12: ss_sales_price (INT32)
Column 13: ss_ext_discount_amt (INT32)
Column 14: ss_ext_sales_price (INT32)
Column 15: ss_ext_wholesale_cost (INT32)
Column 16: ss_ext_list_price (INT32)
Column 17: ss_ext_tax (INT32)
Column 18: ss_coupon_amt (INT32)
Column 19: ss_net_paid (INT32)
Column 20: ss_net_paid_inc_tax (INT32)
Column 21: ss_net_profit (INT32)

Row group metadata: here is a snippet of the metadata relative to the first row group. It contains the total size in bytes and the number of rows.

--- Row Group 0 ---
--- Total Bytes 154947976 ---
  Rows: 2840100---

Column chunk metadata: similarly to the case of parquet-tools, you can find details on the number of rows and the compressed/uncompressed size. In addition, parquet_reader shows the statistics of minimum and maximum values. The number of null values is also reported, while the number of distinct values appears to be 0 (not populated).

Column 0
, Values: 2840100, Null Values: 66393, Distinct Values: 0
  Max: 75599, Min: 28800
  Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
  Uncompressed Size: 5886233, Compressed Size: 2419027
Column 1
, Values: 2840100, Null Values: 0, Distinct Values: 0
  Max: 32000, Min: 1
  Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
  Uncompressed Size: 5040503, Compressed Size: 5040853
Column 2
, Values: 2840100, Null Values: 66684, Distinct Values: 0
  Max: 4599961, Min: 15
  Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
  Uncompressed Size: 7168827, Compressed Size: 4200678
...

Notably, there are no statistics for columns of type DECIMAL. This has implications for filter push down, as discussed earlier in this post. See for example:

...
Column 10
, Values: 2840100  Statistics Not Set
  Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
  Uncompressed Size: 5113188, Compressed Size: 5036313
Column 11
, Values: 2840100  Statistics Not Set
  Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
  Uncompressed Size: 5500119, Compressed Size: 5422519
...
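To connect this output with predicate push down, the sketch below takes the min/max values reported above for the first three columns, together with one DECIMAL column that has no statistics, and checks whether an equality filter would require reading the column chunk. It is plain Python, the function name must_read is hypothetical, and the filter values are made up for the example.

```python
# Min/max statistics copied from the parquet_reader output above;
# None marks a column chunk reported as "Statistics Not Set".
stats = {
    "ss_sold_time_sk": (28800, 75599),
    "ss_item_sk": (1, 32000),
    "ss_customer_sk": (15, 4599961),
    "ss_wholesale_cost": None,
}


def must_read(column, value):
    """True if the column chunk has to be read for the filter column = value."""
    min_max = stats[column]
    if min_max is None:
        return True  # no statistics: the reader cannot rule anything out
    low, high = min_max
    return low <= value <= high


print(must_read("ss_item_sk", 40000))      # False
print(must_read("ss_item_sk", 100))        # True
print(must_read("ss_wholesale_cost", -1))  # True
```

The last case mirrors what happens with query (1): without statistics on the DECIMAL column there is nothing to compare the filter value against, so the data has to be read.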

sparkMeasure for measuring Spark metrics

 
Spark performance metrics are exposed via the Web UI and REST API. In addition to that, sparkMeasure is a small custom-developed tool to measure Spark Task Metrics and SQL metrics. I have described it in the blog post "On Measuring Apache Spark Workload Metrics for Performance Troubleshooting".
To use sparkMeasure, download the jar or point to its coordinates on Maven Central as in this example: bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11
This is an example of how to measure metrics for Spark SQL using sparkMeasure:

val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select * from store_sales where ss_sales_price=-1").collect())

Scheduling mode = FIFO
Spark Context default degree of parallelism = 48
Aggregated Spark stage metrics:
numStages => 1
sum(numTasks) => 1675
elapsedTime => 75684 (1.3 min)
sum(stageDuration) => 75684 (1.3 min)
sum(executorRunTime) => 3525951 (59 min)
sum(executorCpuTime) => 1006093 (17 min)
sum(executorDeserializeTime) => 4410 (4 s)
sum(executorDeserializeCpuTime) => 2106 (2 s)
sum(resultSerializationTime) => 69 (69 ms)
sum(jvmGCTime) => 811933 (14 min)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
max(resultSize) => 2346124 (2.0 MB)
sum(numUpdatedBlockStatuses) => 48
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 4319943621
sum(bytesRead) => 198992404103 (185.0 GB)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 0 (0 Bytes)
sum(shuffleTotalBlocksFetched) => 0
sum(shuffleLocalBlocksFetched) => 0
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 0 (0 Bytes)
sum(shuffleRecordsWritten) => 0


scala> stageMetrics.printAccumulables

Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

executorCpuTime => 1006093 (17 min)
executorDeserializeCpuTime => 2106 (2 s)
executorDeserializeTime => 4410 (4 s)
executorRunTime => 3525951 (59 min)
input.bytesRead => 198992404103 (185.0 GB)
input.recordsRead => 4319943621
jvmGCTime => 811933 (14 min)
resultSerializationTime => 69 (69 ms)
resultSize => 2346124 (2.0 MB)

SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]

256, number of output rows => 4319943621
259, scan time total => 3327453 (55 min)
260, duration total => 3522020 (59 min)
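A few derived figures can help when reading a report like the one above. The following sketch, in plain Python, only does arithmetic on values copied from the sparkMeasure output; the choice of which ratios to compute is mine and is not something sparkMeasure prints in this form.

```python
# Values copied from the sparkMeasure report above (times in milliseconds).
elapsed_ms = 75684
executor_run_ms = 3525951
executor_cpu_ms = 1006093
jvm_gc_ms = 811933
bytes_read = 198992404103

# GB here means 1024**3 bytes, as in the report.
throughput_gb_s = bytes_read / 1024**3 / (elapsed_ms / 1000)
avg_running_tasks = executor_run_ms / elapsed_ms
cpu_fraction = executor_cpu_ms / executor_run_ms
gc_fraction = jvm_gc_ms / executor_run_ms

print(f"read throughput:       {throughput_gb_s:.1f} GB/s")
print(f"average running tasks: {avg_running_tasks:.1f}")
print(f"CPU time / run time:   {cpu_fraction:.0%}")
print(f"GC time / run time:    {gc_fraction:.0%}")
```

The average number of running tasks comes out close to the default degree of parallelism of 48 reported above, which suggests that the available cores were kept busy for most of the run, while CPU time accounts for less than a third of the executor run time.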

Conclusions

 
Spark and Parquet are currently the core technology for many analytics platforms. This post covers some of the basic features and workloads by example that highlight how Spark + Parquet can be useful when handling large partitioned tables, a typical use case for data warehousing and analytics. In particular partition discovery, partition pruning, data compression, column projection and filter push down are covered in this post. In addition this post shows some examples of diagnostic tools for exploring Parquet metadata (parquet-tools, parquet_reader) and tools to measure Spark workloads (Spark WebUI and a custom tool sparkMeasure). This only scratches the surface of what is there to explore and experiment with Spark and Parquet, I hope the readers will have much more fun with their own investigations.
 

References

Spark programming guide: https://spark.apache.org/docs/latest/sql-programming-guide.html 
Spark + Parquet In Depth, Spark Summit East talk: https://www.youtube.com/watch?v=_0Wpwj_gvzg  
Recent work on and around Parquet can be followed at https://twitter.com/ApacheParquet
For the lab setup with the TPCDS schema for Spark see https://github.com/databricks/spark-sql-perf and also https://github.com/gregrahn/tpcds-kit

Acknowledgements

   
This work has been developed in the context of the CERN Hadoop and Spark service and is built thanks to the contributions of several of my colleagues there.
  

Bonus material

 
I have some spill-over material for this post that I add here in the form of a few additional questions related to reading Parquet with Spark. The idea is that you may be interested in using these questions as a learning exercise. It would take too long to discuss the questions and answers in detail; for the moment I have only added some hints.

Question 1: At first thought, a simple implementation of the baseline workload for a table scan would be to just count the elements in the Parquet table (DataFrame in Spark). I mean something like spark.sql("select count(*) from store_sales").collect(). It appears simpler than the SQL proposed in query (1) above; moreover you may be used to running a count(*) query in other database systems to cause a full scan of the table. Can you explain what is wrong with using count() to measure the performance of reading from Parquet files?

Question 2: Why does the partitioning column "ss_sold_date_sk" of the table store_sales not appear in the Parquet metadata dumps performed with parquet-tools and parquet_reader?

Question 3: The table store_sales has 1824 partitions in my test sample. However, after reading it into Spark, the Spark DataFrame has only 1675 partitions in my tests (note that the number of partitions in a DataFrame df can be found with the command "df.rdd.partitions.size"). You should also find a similar difference in your test systems. Can you explain the difference between the number of partitions in Parquet and in Spark?
Can you describe the rules for populating Spark partitions when reading Parquet files?

Question 4: What happens if you write a Spark DataFrame to Parquet without specifying the partitionBy clause? Will the resulting table be partitioned?

Question 5: How can you configure the size of the row group in parquet? Experiment with different sizes and see the results by dumping the metadata.


Hints to the questions:

Hint 1: Look at Parquet metadata and where to find the information on row count.
Hint 2: The partitioning column is instead picked up from the directory structure when doing partition discovery.
Hint 3: Experiment also with setting spark.sql.files.maxPartitionBytes (defaults to 128 MB, which is also the default for Parquet block size)
Hint 4: Examine the resulting directory structure on the filesystem.
Hint 5: Experiment with changing the Parquet block size using: sc.hadoopConfiguration.setInt("parquet.block.size",256*1024*1024)

   

Wednesday, March 29, 2017

On Measuring Apache Spark Workload Metrics for Performance Troubleshooting

Topic: This post is about measuring Apache Spark workload metrics for performance investigations. In particular you can find the description of some practical techniques and a simple tool that can help you with Spark workload metrics collection and performance analysis. The post is released with accompanying code on GitHub: sparkMeasure

Introduction to Spark performance instrumentation

  
The problem we are trying to solve: From recent experience I find that scalability and performance are some of the key motivating factors that drive people to use Apache Spark. In that context, my colleagues and I have been involved in a few development projects around Spark lately and found the need to collect workload metrics and instrumentation for performance analysis and troubleshooting. Spark (I refer to Spark 2.1.0 in this post) comes with many instrumentation points, however I find that it is not always easy or fast to identify and collect all the needed data, possibly from multiple sources, as needed for root-cause and performance analysis. That's why this post covers some ideas, tools and additional custom development on the topic of collecting Spark performance metrics that I find useful for troubleshooting and analysis of Spark workloads.

Before discussing custom development and tools in the next paragraphs, I want to cover some of the common and most basic approaches to measuring performance with Spark. Elapsed time is probably the first and easiest metric one can measure: you just need to instrument your code with time measurements at the beginning and end of the code you want to measure. For example you can do this by calling System.nanoTime (Scala) or time.time() (Python). When using Spark with Scala, a way to execute code and measure its elapsed time is by using: spark.time(<code to measure>), for example: 

scala> spark.time(sql("select count(*) from range(1e4) cross join range(1e4)").show)
+---------+
| count(1)|
+---------+
|100000000|
+---------+

Time taken: 1157 ms
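
For Python users, the same idea can be sketched with a small context manager built on time.time(). This is only an illustration: the helper name elapsed_ms is hypothetical, and the summing loop is a stand-in for the Spark action you would actually want to measure.

```python
import time
from contextlib import contextmanager


@contextmanager
def elapsed_ms(label, results):
    """Record the wall-clock time of the enclosed block, in milliseconds."""
    start = time.time()
    try:
        yield
    finally:
        # Store the result even if the measured code raises an exception
        results[label] = (time.time() - start) * 1000.0


timings = {}
with elapsed_ms("sum of squares", timings):
    # Stand-in workload; in pyspark this would be e.g. spark.sql("...").show()
    total = sum(i * i for i in range(100000))

print("Time taken: %.0f ms" % timings["sum of squares"])
```

As with spark.time, this reports only the elapsed time and says nothing about where that time was spent.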

The problem with investigating performance by just measuring elapsed time is that this approach often does not provide insights on why the system performs in a certain way. Many are familiar with a related pitfall that comes from using "black box" benchmark workloads. It is often found that the results of a benchmark based on measuring latency of a reference workload do not generalize well to production use cases. Typically you need to dig into root-cause analysis and find what is happening under the hood. This is valid in general when doing performance investigations/drill-down; in this post we apply these ideas to Spark investigations.

The Spark WebUI is the next obvious place to go to for additional information and measurements when troubleshooting, or just monitoring job execution. For those of you new to Spark, the WebUI is normally accessible by pointing your browser to port 4040 of the driver node. The WebUI is OK for interactive troubleshooting, however it lacks flexibility for performing custom aggregations and metrics visualizations, among others. The next stop is Spark's REST API (see also Spark documentation "Monitoring and Instrumentation"), which exposes the information shown in the WebUI through a REST interface. This opens the possibility to write custom analysis on the captured metrics. Moreover the API exposes a list of metrics, including CPU usage, that in some cases goes beyond what is exposed from the web pages of the WebUI (as of Spark version 2.1.0).
For completeness I want to mention also Spark's metrics system, which can be used to send metrics data to several sinks, including Graphite, for monitoring purposes.
Note: if you are new to Spark, before reading further I advise you to get an overview of the Spark execution model (see for example "Job Scheduling") and make sure you have a practical understanding of what Jobs, Stages and Tasks are.
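
As a rough sketch of the kind of custom analysis the REST API makes possible, the Python code below sums run time and CPU time over the stages of an application. Several things here are assumptions rather than facts taken from this post: the helper names fetch_stages and summarize_stages are hypothetical, the field names executorRunTime and executorCpuTime should be checked against the JSON returned by your Spark version, and the sample records are made up so that the example runs without a cluster.

```python
import json
from urllib.request import urlopen


def fetch_stages(base_url, app_id):
    """Download the list of stages of one application from the REST API.

    base_url is something like "http://<driver host>:4040" (not called below).
    """
    url = "%s/api/v1/applications/%s/stages" % (base_url, app_id)
    with urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


def summarize_stages(stages):
    """Sum two metrics over all stages; the field names are assumptions."""
    return {
        "numStages": len(stages),
        "executorRunTime": sum(s.get("executorRunTime", 0) for s in stages),
        "executorCpuTime": sum(s.get("executorCpuTime", 0) for s in stages),
    }


# Made-up records standing in for the output of fetch_stages(...)
sample_stages = json.loads(
    '[{"stageId": 0, "executorRunTime": 100, "executorCpuTime": 90},'
    ' {"stageId": 1, "executorRunTime": 200, "executorCpuTime": 150}]'
)
summary = summarize_stages(sample_stages)
print(summary)
```

Separating the download from the aggregation keeps the analysis code usable on metrics that were saved to a file earlier.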

A practical introduction to Spark Listeners

  
Spark listeners are the main source of monitoring information in Spark: the WebUI and the rest of the instrumentation in Spark employ a variety of "Listeners" to collect performance data.
For the scope of this post you just need to know that listeners are implemented as Scala classes and used by the Spark engine to "trigger" code execution on particular events; notably one can use the listeners to collect metrics information at each job/stage/task start and end event. There is more to it than this simple explanation, but this should be enough to help you understand the following examples if you are new to this topic (see the references section of this post for additional links to more detailed explanations).

1. A basic example that you can test using spark-shell (the Scala REPL for Spark) should help illustrate how the instrumentation with listeners works (see this Github gist):


What you can get with this simple instrumentation example is the executor run time and CPU time, aggregated by Stage. For example when running the simple SQL with a cartesian join used in the example of the previous paragraph, you should find that the CustomListener emits log warning messages with workload metrics information similar to the following:

WARN CustomListener: Stage completed, runTime: 8, cpuTime: 5058939
WARN CustomListener: Stage completed, runTime: 8157, cpuTime: 7715929885
WARN CustomListener: Stage completed, runTime: 1, cpuTime: 1011061

Note that "run time" is measured in milliseconds, while "CPU time" is measured in nanoseconds. The purpose of the example is to illustrate that you can get interesting metrics from Spark execution using custom listeners. There are more metrics available: they are exposed to the code in the custom listener via the stageInfo.taskMetrics class. This is just a first step, you will see more in the following. As a recap, the proof-of-concept code of this basic example:
  • creates the class CustomListener extending SparkListener
  • defines a method that overrides onStageCompleted to collect the metrics at the end of each stage
  • instantiates the class and "attaches it" to the active Spark Context using sc.addSparkListener(myListener)
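
Because the two metrics use different units, it helps to convert them before comparing. The following Python sketch parses the three log lines shown above and expresses CPU time in milliseconds; the helper name parse_stage_line and the regular expression are illustrative assumptions based only on the message format shown in this post.

```python
import re

# Matches the metrics part of the CustomListener messages shown above
LOG_PATTERN = re.compile(r"runTime: (\d+), cpuTime: (\d+)")


def parse_stage_line(line):
    """Return (run time in ms, CPU time in ms), or None if the line has no metrics."""
    match = LOG_PATTERN.search(line)
    if match is None:
        return None
    run_time_ms = int(match.group(1))        # already in milliseconds
    cpu_time_ms = int(match.group(2)) / 1e6  # nanoseconds -> milliseconds
    return run_time_ms, cpu_time_ms


log_lines = [
    "WARN CustomListener: Stage completed, runTime: 8, cpuTime: 5058939",
    "WARN CustomListener: Stage completed, runTime: 8157, cpuTime: 7715929885",
    "WARN CustomListener: Stage completed, runTime: 1, cpuTime: 1011061",
]

stages = [parse_stage_line(line) for line in log_lines]
for run_ms, cpu_ms in stages:
    print("runTime: %d ms, cpuTime: %.1f ms" % (run_ms, cpu_ms))
```

Once both values are in milliseconds, the second stage shows about 7716 ms of CPU time against 8157 ms of run time, which makes the comparison between the two metrics straightforward.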

2. Dig deeper into how the Spark Listeners work by cloning a listener from the WebUI and then examine the metrics values from the cloned object. This is how you can do it from the spark-shell command line:

scala> val myConf = new org.apache.spark.SparkConf()
scala> val myListener = new org.apache.spark.ui.jobs.JobProgressListener(myConf)
scala> sc.addSparkListener(myListener)

The custom listener, called myListener, is a clone of JobProgressListener on which you have full control. After adding it to the Spark Context it starts collecting information. You can read the details of the collected metrics directly from the instantiated object. For example you can print the executor run time and CPU time for all the completed stages with this example code:

myListener.completedStages.foreach(si => (
  println("runTime: " + si.taskMetrics.executorRunTime +
          ", cpuTime: " + si.taskMetrics.executorCpuTime)))

A recap of the lessons learned from experimenting with Spark listeners:
  • Spark listeners are used to implement monitoring and instrumentation in Spark.
  • This provides a programmatic interface to collect metrics from Spark job/stage/task executions.
  • User programs can extend listeners and gather monitoring information.
  • Metrics are provided by the Spark execution engine for each task. Metrics are also provided in aggregated form at higher levels, notably at the stage level.
  • One of the key structures providing metrics data is the TaskMetrics class that reports for example run time, CPU time, shuffle metrics, I/O metrics and others.

Key learning point: it is possible to attach a listener to an active Spark Context, using sc.addSparkListener.
For completeness, there is another method to attach listeners to the Spark Context, using --conf spark.extraListeners; this will be discussed later in this post.

It's time to write some code: sparkMeasure

  
The rest of this post covers a custom tool I have developed in Scala to collect Spark workload/performance metrics and ease their analysis: sparkMeasure.

Some of the key features of sparkMeasure are:
  • the tool can be used to collect Spark metrics data both from Scala and Python 
  • the user can choose to collect data (a) aggregated at the end of each Stage of execution and/or (b) performance metrics data for each Task
  • data collected by sparkMeasure can be exported to a Spark DataFrame for workload exploration and/or can be saved for further analysis
  • sparkMeasure can also be used in "Flight Recorder" mode, recording all metrics in a file for later processing.
How to use sparkMeasure:
  • To use sparkMeasure, download the jar or point to its coordinates on Maven Central as in this example: bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11
  • Another option is to compile and package the jar using sbt.
  • run spark-shell/pyspark/spark-submit adding the packaged jar to the "--jars" command line option. Example: spark-shell --jars target/scala-2.11/spark-measure_2.11-0.12-SNAPSHOT.jar


Examples of usage of sparkMeasure

  
Example 1a: A basic example using spark-shell (Scala).
Note this requires sparkMeasure, packaged in a jar file as detailed above:

[bash]$ spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11

This will instantiate the instrumentation, run the test workload and print a short report:

scala> val stageMetrics = new ch.cern.sparkmeasure.StageMetrics(spark)

scala> stageMetrics.runAndMeasure(sql("select count(*) from range(1e4) cross join range(1e4)").show)

The output you should find when you run it should be similar to this:

Scheduling mode = FIFO
Spark Contex default degree of parallelism = 8
Aggregated Spark stage metrics:
numstages = 3
sum(numtasks) = 17
elapsedtime = 1092 (1 s)
sum(stageduration) = 1051 (1 s)
sum(executorruntime) = 7861 (8 s)
sum(executorcputime) = 7647 (8 s)
sum(executordeserializetime) = 68 (68 ms)
sum(executordeserializecputime) = 22 (22 ms)
sum(resultserializationtime) = 0 (0 ms)
....
Note: additional metrics reported by the tool are omitted here, as their values are close to 0 and negligible for this example.

The first conclusion is that the job executes almost entirely on CPU, not causing any significant shuffle and/or disk read/write activity, as expected. You can see in the printed report that the job was executed with 3 stages and that the default degree of parallelism was set to 8. The executor run time and CPU time metrics both report cumulative time values and are expected to be greater than the elapsed time: indeed their value is close to 8 (degree of parallelism) * elapsed (wall clock) time.
A note on what happens with stageMetrics.runAndMeasure:
  • the stageMetrics class works as "wrapper code" to instantiate an instance of the custom listener "StageInfoRecorderListener" 
  • it adds the listener into the active Spark Context, this takes care of recording workload metrics at each Stage end event,
  • finally when the execution of the code (an SQL statement in this case) is finished, runAndMeasure exports the metrics into a Spark DataFrame and prints a cumulative report of the metrics collected.


Example 1b: This is the Python equivalent of the example 1a above (i.e. relevant when using  pyspark). The example code is:

$ pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11

stageMetrics = sc._jvm.ch.cern.sparkmeasure.StageMetrics(spark._jsparkSession)
stageMetrics.begin()
spark.sql("select count(*) from range(1e4) cross join range(1e4)").show()
stageMetrics.end()
stageMetrics.printReport()

Note that the syntax for the Python example is almost the same as for the Scala example 1a, with the notable exceptions of using sc._jvm to access the JVM from Python, and the use of spark._jsparkSession to access the relevant Spark Session. Another difference between Scala and Python is that the method stageMetrics.runAndMeasure used in example 1a does not work in Python: you will need to break down its operations (time measurement and reporting of the results) as detailed in example 1b.
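
If you find yourself repeating the begin/end/printReport sequence often in Python, one possible convenience is a small context manager. This is only a sketch and is not part of sparkMeasure: the name measured is hypothetical, and FakeStageMetrics is a stand-in that lets the example run without Spark. In pyspark you would pass the stageMetrics object created as in example 1b.

```python
from contextlib import contextmanager


@contextmanager
def measured(metrics):
    """Wrap a block of code between metrics.begin() and metrics.end(),
    then print the report, mirroring the steps of example 1b."""
    metrics.begin()
    try:
        yield metrics
    finally:
        # Runs also when the measured code raises an exception
        metrics.end()
        metrics.printReport()


class FakeStageMetrics:
    """Stand-in for the real stageMetrics object: it only records the calls."""

    def __init__(self):
        self.calls = []

    def begin(self):
        self.calls.append("begin")

    def end(self):
        self.calls.append("end")

    def printReport(self):
        self.calls.append("printReport")


fake = FakeStageMetrics()
with measured(fake):
    # In pyspark this would be the workload, e.g. spark.sql("...").show()
    fake.calls.append("workload")

print(fake.calls)
```

The try/finally block is a design choice: it ensures the report is still printed when the measured code fails, which is often when the metrics are most interesting.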


Example 2: This example is about investigating the effect of "task stragglers" by using Task metrics data. The metrics I collected and report here as an example are taken by running on a Spark (on YARN) cluster of 14 nodes, as follows:

$ spark-shell --num-executors 14 --executor-cores 4 --driver-memory 2g  --executor-memory 2g --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11

The test workload for this example is the one previously described in the post "Apache Spark 2.0 Performance Improvements Investigated With Flame Graphs". This is the code for preparing the test tables:

val testNumRows = 1e7.toLong
sql(s"select id from range($testNumRows)").createOrReplaceTempView("t0")
sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().createOrReplaceTempView("t1")
sql("select count(*) from t1").show

This part instantiates the class used to measure Task metrics using custom listeners:

val taskMetrics = new ch.cern.sparkmeasure.TaskMetrics(spark)

This is the code to run the test workload:

taskMetrics.runAndMeasure(sql(
"select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show)

The metrics values collected and aggregated over all the tasks underlying the Spark workload under measurement (one SQL statement execution in this case) are:

Scheduling mode = FIFO
Spark Contex default degree of parallelism = 56
Aggregated Spark task metrics:
numtasks = 312
elapsedtime = 341393 (5.7 min)
sum(duration) = 10397845 (2.9 h)
sum(schedulerdelay) = 3737
sum(executorruntime) = 10353312 (2.9 h)
sum(executorcputime) = 10190371 (2.9 h)
sum(executordeserializetime) = 40691 (40 s)
sum(executordeserializecputime) = 8200 (8 s)
sum(resultserializationtime) = 105 (0.1 s)
sum(jvmgctime) = 21524 (21 s)
sum(shufflefetchwaittime) = 121732 (122 s)
sum(shufflewritetime) = 13101 (13 s)
sum(gettingresulttime) = 0 (0 ms)
max(resultsize) = 6305
sum(numupdatedblockstatuses) = 76
sum(diskbytesspilled) = 0
sum(memorybytesspilled) = 0
max(peakexecutionmemory) = 42467328
sum(recordsread) = 1120
sum(bytesread) = 74702928 (74.7 MB)
sum(recordswritten) = 0
sum(byteswritten) = 0
sum(shuffletotalbytesread) = 171852053 (171.8 MB)
sum(shuffletotalblocksfetched) = 13888
sum(shufflelocalblocksfetched) = 1076
sum(shuffleremoteblocksfetched) = 12812
sum(shufflebyteswritten) = 171852053 (171.8 MB)
sum(shufflerecordswritten) = 20769230


The main points to note from the output of the aggregated metrics are:
  • The workload/SQL execution takes about 5.7 minutes of elapsed time (wall-clock time, as observed by the user launching the query).
  • The workload is CPU-bound: the reported values for "run time" and "CPU time" metrics are almost equal, moreover the reported values of other time-related metrics are close to 0 and negligible for this workload. This behavior was expected from the results of the analysis discussed at this link
  • The total time spent executing the SQL summing the time spent by all the tasks is about 3 hours.
  • The amount of CPU "cores" used concurrently, on average over the elapsed time of the SQL execution, can be estimated with this formula:   sum(executorcputime) / elapsedtime  = 10190371 / 341393 ~ 30
  • The number of allocated cores by Spark executors is 56 (see also the reported value of default parallelism). Compare the 56 allocated cores to the calculated average CPU core utilization of 30. This points to the fact that the allocated CPUs were not fully utilized on average and it's worth additional investigations (more about this in the following)
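
The estimate in the list above can be reproduced with a few lines of Python. The input values are copied from the report of this example (both times are in milliseconds); the function name average_cores_used is just an illustrative choice.

```python
def average_cores_used(executor_cpu_time_ms, elapsed_time_ms):
    """Average number of cores busy on CPU over the elapsed time."""
    return float(executor_cpu_time_ms) / elapsed_time_ms


# Values taken from the aggregated task metrics reported above
executor_cpu_time_ms = 10190371
elapsed_time_ms = 341393
allocated_cores = 56

avg_cores = average_cores_used(executor_cpu_time_ms, elapsed_time_ms)
utilization = avg_cores / allocated_cores

print("average cores in use: %.1f" % avg_cores)
print("fraction of allocated cores in use: %.2f" % utilization)
```

In other words, on average only a little more than half of the 56 allocated cores were busy during the execution of the query.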
Workload metrics show that the execution was CPU-bound but also that not all the potentially available CPU cycles on the executors were used. Why the low efficiency? The idea is to drill down on this performance-related question using the metrics collected by the TaskMetrics class and TaskInfoRecorderListener, which detail the behavior of each executed task. As a reference, the following piece of code can be used to export all the collected metrics into a DataFrame and also to save them to a file for further analysis:

// export task metrics information into a Spark DataFrame for analysis 
// if needed, also save them to disk
val df = taskMetrics.createTaskMetricsDF()
taskMetrics.saveData(df, "myPerfTaskMetrics1")

Note: It is also useful to note the start and end time of the execution of the code of interest. When using taskMetrics.runAndMeasure those values can be retrieved by printing taskMetrics.beginSnapshot and taskMetrics.endSnapshot; another option is to run System.currentTimeMillis() at the start and end of the workload of interest.

The plot of the "Number of running Spark tasks vs. Time" (see below) can give you more clues on why the allocated CPUs were not fully utilized during the workload execution. You can see that in the first 150 seconds of the workload execution the system uses all the available cores, after that it starts to slowly "ramp down", and finally a significant amount of time is spent on a long tail with some "straggler tasks". This provides additional information on why and how the SQL query under study was not able to use all the available CPUs all the time, as discussed above: we find that some of the available CPUs were idle for a considerable amount of time. It is worth recalling that this particular workload is CPU-bound (i.e. no significant time is spent on I/O or other activities). For the purpose of this post we can stop the analysis here. You can find the code for this analysis, with plots and additional drill down on the collected metrics, in the notebook at this link
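
To give an idea of how a "number of running tasks vs. time" profile can be computed from per-task data, here is a minimal Python sketch. It is not the code of the notebook mentioned above: it assumes that each task is described by a launch time and a finish time in milliseconds, and the four sample tasks are made up for illustration only.

```python
def running_tasks_over_time(tasks, step_ms):
    """Sample the number of concurrently running tasks every step_ms.

    tasks is a list of (launch_time_ms, finish_time_ms) pairs.
    Returns a list of (time since first launch in ms, running tasks).
    """
    start = min(launch for launch, _ in tasks)
    end = max(finish for _, finish in tasks)
    samples = []
    t = start
    while t <= end:
        # A task counts as running from its launch up to (not including) its finish
        running = sum(1 for launch, finish in tasks if launch <= t < finish)
        samples.append((t - start, running))
        t += step_ms
    return samples


# Made-up data: three tasks start together, one of them is a straggler
sample_tasks = [(0, 1000), (0, 1000), (0, 3000), (1000, 2000)]
profile = running_tasks_over_time(sample_tasks, 500)

for offset_ms, running in profile:
    print("%5d ms: %d running" % (offset_ms, running))
```

With real data, a profile that starts at the number of allocated cores and then decays into a long tail of one or a few running tasks is the signature of straggler tasks described above.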






Why is this useful: Analyzing the workload by drilling down into the metrics collected at the task level is of great help to understand why a given workload performs in a certain way and to identify the bottlenecks. The goal is also to derive actionable information to further improve the performance. You may be already familiar with investigating Spark performance using the Event Timeline in the Spark WebUI, which already makes this type of investigation possible.
The techniques discussed in this post allow you to extend and generalize the analysis: the idea is that you can export all the available metrics to your favorite analytics tool (for example a Jupyter notebook running PySpark) and experiment by aggregating and filtering metrics across multiple dimensions. Moreover the analysis can span multiple stages or jobs as needed and can correlate the behavior of all the collected metrics, as relevant (elapsed time, CPU, scheduler delay, shuffle I/O time, I/O time, etc). Another point is that having the metrics stored in a file allows you to compare job performance across systems and/or application releases in a simple way, and also opens the way to automating data analysis tasks.


Example 3: This example is about measuring a complex query taken from the TPC-DS benchmark at scale 1500GB deployed using spark-sql-perf. The query tested is TPCDS_v1.4_query 14a. The amount of I/O and of shuffle needed to support the join operations in this query is quite significant. In this example Spark was run using 14 executors (on a cluster) and a total of 28 cores (2 cores per executor). Spark version: 2.1.0. The example is reported mostly to show that sparkMeasure can be used also for complex and long-running workloads. I postpone the analysis, as that would go beyond the scope of this post. The output metrics of the execution of query TPCDS 14a in the test environment described above are:

Scheduling mode = FIFO
SparkContex default degree of parallelism = 28
numstages = 23
sum(numtasks) = 13580
sum(duration) = 6136302 (1.7 h)
sum(executorruntime) = 54329000 (15.1 h)
sum(executorcputime) = 36956091 (10.3 h)
sum(executordeserializetime) = 52272 (52 s)
sum(executordeserializecputime) = 28390 (28 s)
sum(resultserializationtime) = 757 (0.8 s)
sum(jvmgctime) = 2774103 (46 min)
sum(shufflefetchwaittime) = 6802271 (1.9 h)
sum(shufflewritetime) = 4074881 (1.1 h)
max(resultsize) = 12327247
sum(numupdatedblockstatuses) = 894
sum(diskbytesspilled) = 0
sum(memorybytesspilled) = 1438044651520 (1438.0 GB)
max(peakexecutionmemory) = 379253665280
sum(recordsread) = 22063697280
sum(bytesread) = 446514239001 (446.5 GB)
sum(recordswritten) = 0
sum(byteswritten) = 0
sum(shuffletotalbytesread) = 845480329356 (845.5 GB)
sum(shuffletotalblocksfetched) = 1429271
sum(shufflelocalblocksfetched) = 104503
sum(shuffleremoteblocksfetched) = 1324768
sum(shufflebyteswritten) = 845478036776 (845.5 GB)
sum(shufflerecordswritten) = 11751384039


The flight recorder mode for sparkMeasure

  
Flight recorder mode addresses the cases when you need to instrument a Spark application but do not want (or cannot) add code to your job(s) to instantiate the custom listeners and attach them to the active Spark Context (for example using StageMetrics and/or TaskMetrics class, as was the case in the previous examples). You can deploy the metrics data collection in offline ("flight recorder") mode by adding custom listener code into Spark ListenerBus when starting the Spark Context.
For example using the spark-submit command line you can do that by adding: "--conf spark.extraListeners=...". The code for two listeners suitable for "Flight Mode" is provided with sparkMeasure: FlightRecorderStageMetrics and FlightRecorderTaskMetrics, respectively to measure stage- and task-level metrics. Example:

$ spark-submit --conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11 ...additional jars and/or code

The flight recorder mode will save the results in serialized format to a file in the driver's filesystem. The action of saving the metrics to a file happens at the end of the application and is triggered by intercepting the relevant event using the listener. Additional parameters are available to specify the name of the output files:

--conf spark.executorEnv.taskMetricsFileName=<file path> (defaults to "/tmp/taskMetrics.serialized")
--conf spark.executorEnv.stageMetricsFileName=<file path> (defaults to "/tmp/stageMetrics.serialized")

You will need to post-process the output files produced by the "Flight Recorder" mode. The reason is that the saved files contain the collected metrics in the form of serialized objects. You can read the files and deserialize the objects using the package Utils provided in sparkMeasure. After deserialization the values are stored in a ListBuffer that can be easily transformed into a DataFrame. An example of what all this means in practice:

val taskVals = ch.cern.sparkmeasure.Utils.readSerializedTaskMetrics("<file name>")
val taskMetricsDF = taskVals.toDF

Similarly, when post-processing stage metrics:
val stageVals = ch.cern.sparkmeasure.Utils.readSerializedStageMetrics("<file name>")
val stageMetricsDF = stageVals.toDF


Recap and main points on how and why to use sparkMeasure


  • Use sparkMeasure to measure Spark workload performance. Compile and add the jar of sparkMeasure to your Spark environment
  • Consider sparkMeasure as an alternative and extension of spark.time(<spark code>): instead of just measuring the elapsed time, with stageMetrics.runAndMeasure(<spark code>) or taskMetrics.runAndMeasure(<spark code>) you have the summary of multiple workload metrics
  • Start with measuring at Stage level, as it is more lightweight. Use the Task-level metrics if you need to drill down on task execution details or skew (certain tasks or hosts may behave differently than the average)
  • Export metrics for offline analysis if needed and import them in your tool of choice (for example a notebook environment).


Summary and further work

    
Collecting and analyzing workload metrics beyond simple measurement of the elapsed time is important to drill down on performance investigations with root-cause analysis. sparkMeasure is a tool and proof-of-concept code that can help you collect and analyze workload metrics of Apache Spark jobs.
You can use sparkMeasure to investigate the performance of Spark workloads both for Scala and Python environments. You can use it from the command-line shell (REPL) or Jupyter notebook or as an aid to instrument your code with custom listeners and metrics reports. It is also possible to use sparkMeasure to collect and store metrics for offline analysis.
The available metrics are collected by extending the Spark listener interface, similarly to what is done by the Spark WebUI. The collected metrics are transformed into Spark DataFrames for ease of analysis.
sparkMeasure allows you to collect metrics at the Task level for fine granularity and/or aggregated at Stage level. The collected metrics data come from existing Spark instrumentation. For the case of Spark 2.1.0 this includes execution time, CPU time, time for serialization and deserialization, shuffle read/write time, HDFS I/O metrics and others (see more details in sparkMeasure documentation and code). See also this example analysis of Task Metrics data using a notebook.

In this post you can find some simple examples of how and why to use sparkMeasure to drill down on performance metrics. Ideas for future work in this area include:
  • add more examples to illustrate the meaning and accuracy of Spark instrumentation metrics
  • show further examples where actionable info or insights can be gained by drilling down into Spark performance metrics
  • show limitations of the currently available instrumentation (for example in the area of instrumentation for I/O service time)
  • measure the overhead of the instrumentation using Spark listeners
  • additional insights that can be derived by examining skew in the distribution of performance metrics at the task level


Acknowledgements


This work has been developed in the context of the CERN Hadoop and Spark service: credits go to my colleagues there for collaboration, in particular to Prasanth Kothuri and Zbigniew Baranowski. Thanks to Viktor Khristenko for direct collaboration on this topic and for his original work on the instrumentation of spark-root with Spark listeners.
Other material that has helped me in the development of this work includes Jacek Laskowski's writeup and presentations on the subject of Spark Listeners and the presentation "Making Sense of Spark Performance" by Kay Ousterhout.
The Spark source code and the comments therein have also been very useful for researching this topic. In particular I would like to point to the Scheduler's code for the Spark Listener and the WebUI's JobProgressListener.