In this blog post we share the experience and results of a Spark optimization initiative that we ran with one of our clients.
It had been suggested that many resource-intensive Spark workflows were not using their allocated resources efficiently and could be optimized to save costs. With that objective, a squad of Spark specialists was formed.
To tackle this challenge we agreed on the following strategy:
- identify the top N resource-intensive Spark workflows
- analyze the collective impact of these workflows on the resource pool
- review and improve the performance of the top N resource-intensive workflows
- describe usage patterns that lead to inefficient resource consumption
- write a series of blog posts to share the findings
- embed optimization hints into the existing Spark training
- host a Spark optimization workshop
The first challenge was to understand how to approach the optimization of 3000+ workflows. It was obvious that optimizing everything was impossible with the time and people available.
Execution Cost Service
Together with the same team, we had recently built a service that estimates the cost of workflows running on the on-premise clusters. The cost metrics are the total and average resource consumption (CPU and memory) of a workflow and of each of its steps over a representative period of time: a day, a week, or a month.
Looking at the cumulative distribution function (CDF) of memory consumption per workflow gave us a strong indication that we could reduce the number of workflows in our focus group.
95 percent of Spark workflows allocate no more than 35 GB per day. In the remaining 5 percent we see significant deviations (50 to 1100 GB/day). This outlier group is the one we analyzed further.
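The cut-off for such a focus group can be derived from per-workflow figures with a simple empirical CDF. The following is a minimal sketch only: the function names, workflow names, and GB/day values are invented for illustration and do not come from the Cost Service.

```python
def empirical_cdf(values):
    """Return sorted (value, cumulative_fraction) pairs for the given samples."""
    ordered = sorted(values)
    n = len(ordered)
    return [(v, (i + 1) / n) for i, v in enumerate(ordered)]


def outliers_above_quantile(usage_by_workflow, quantile=0.95):
    """Return the quantile threshold and the workflows that exceed it."""
    cdf = empirical_cdf(usage_by_workflow.values())
    # Smallest observed value at which the CDF reaches the requested quantile.
    threshold = next(v for v, frac in cdf if frac >= quantile)
    outliers = {name: gb for name, gb in usage_by_workflow.items() if gb > threshold}
    return threshold, outliers


# Hypothetical memory consumption in GB/day for 20 workflows.
usage = {
    f"wf_{i:02d}": gb
    for i, gb in enumerate(
        [2, 3, 3, 4, 5, 5, 6, 8, 9, 10, 12, 14, 15, 18, 20, 24, 28, 31, 35, 900]
    )
}

threshold, outliers = outliers_above_quantile(usage)
print("95th percentile threshold (GB/day):", threshold)
print("workflows above the threshold:", outliers)
```

Working with a quantile rather than a fixed GB value keeps the focus group small even when overall consumption grows.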
Identify top resource-intensive workflows
After some investigation we decided to build ad-hoc analytics on top of the Cost Service that monitor resource consumption per workflow and normalize it. We ran the analysis every other week to track how resource consumption changed over time. It was important to exclude workflows that were executing backfill tasks from the target group.
Grouping Spark workflows by CPU consumption in range buckets
How to read the diagram: the first pair of bars, (0, 4], covers Spark workflows that consume more than 0 and up to 4 CPUs per day. The blue bar is the total number of CPUs allocated per day by the workflows in the bucket. The red bar is the number of Spark workflows in the bucket.
From this graph we know that the three buckets (32, 80], (80, 400], (400, 100] together contain 49 Spark workflows, which allocate around 50% of all CPUs allocated by Spark workflows.
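The grouping behind such a diagram can be sketched in a few lines. This is an illustration under assumptions: only the bucket edges (0, 4], (32, 80] and (80, 400] appear in the text, the remaining edges and all workflow figures are hypothetical, and every CPU value is assumed to be greater than 0.

```python
import math
from bisect import bisect_left

# Upper bounds of the (low, high] buckets. Only (0, 4], (32, 80] and (80, 400]
# are taken from the text; the other edges are hypothetical.
UPPER_BOUNDS = [4, 32, 80, 400, math.inf]


def bucket_label(cpus):
    """Return the (low, high] bucket label for a daily CPU figure (cpus > 0)."""
    # bisect_left finds the first bound >= cpus, so a value of 4 falls in (0, 4].
    idx = bisect_left(UPPER_BOUNDS, cpus)
    low = 0 if idx == 0 else UPPER_BOUNDS[idx - 1]
    return f"({low}, {UPPER_BOUNDS[idx]}]"


def summarize(cpus_by_workflow):
    """Per bucket: number of workflows (red bar) and total CPUs (blue bar)."""
    summary = {}
    for cpus in cpus_by_workflow.values():
        entry = summary.setdefault(bucket_label(cpus), {"workflows": 0, "cpus": 0})
        entry["workflows"] += 1
        entry["cpus"] += cpus
    return summary


# Hypothetical CPU consumption per workflow per day.
sample = {"wf_a": 2, "wf_b": 4, "wf_c": 40, "wf_d": 75, "wf_e": 300}
summary = summarize(sample)
for label, entry in summary.items():
    print(label, entry)
```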
Data driven decision
Based on this analysis, we produced an estimate of the theoretical gain. The 49 Spark workflows mentioned above represent around 1% of the total number of Spark workflows. In total, Spark workflows produce 30% of the cluster load. If we optimize this 1% of workflows to consume 50% less CPU, it will cause a 15% reduction of the cluster load. The assumption that we can get a 50% CPU reduction is pretty optimistic. But if we reached at least 25%, which seemed feasible to us, it would still be a big success: a 7% reduction of the cluster load. This estimate was good enough to start the optimization phase.
Two clusters experienced the heaviest load from Spark workflows, so they were chosen as the optimization battlefield.
Design of the clusters:
- YARN resource manager
- HDFS v2 primary data lake
- Hive and Spark processing engines
- Spark and Hadoop centralized History Servers
- Oozie and Airflow workflow schedulers
Technical results: optimized workflows
After identifying the workflows to tackle first, we started investigating them one by one.
In terms of way of working, we followed this approach:
- investigate particular workflows
- propose necessary optimizations
- contact the owner of the workflow to discuss and implement necessary changes
As expected, at the top of the list were Spark workflows that process large volumes of data: on average, datasets ranging from 5 to 27 TB. At these volumes even a small optimization can make a huge impact. After reviewing a couple of workflows, we discovered common patterns suitable for optimization.
At the beginning of the initiative our intention was to touch as little code as possible and to optimize Spark workflows through configuration first. The idea was to get results fast, without extra work (testing, redeployment, data migration) where possible. One of the most useful settings during this process was spark.sql.shuffle.partitions. The default value of 200 is hardly useful for the datasets we were working with. It led to large shuffle partitions, which caused huge spills (memory and disk) and significantly slowed down workflow execution. Spark creates one task per partition, so if a partition is too big it will not fit into the memory of the executor. For some Spark workflows, increasing the number of shuffle partitions alone boosted performance by 30%.
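One way to pick a starting value is to divide the amount of shuffled data by a target partition size. The sketch below is not the method used in the initiative: the helper name, the 128 MiB target, and the 2 TiB shuffle volume are assumptions for illustration, and the right target depends on executor memory.

```python
import math


def estimate_shuffle_partitions(
    shuffle_bytes, target_partition_bytes=128 * 1024**2, minimum=200
):
    """Estimate a value for spark.sql.shuffle.partitions.

    target_partition_bytes is an assumed target size, not a value from the post.
    The result never drops below Spark's default of 200.
    """
    needed = math.ceil(shuffle_bytes / target_partition_bytes)
    return max(minimum, needed)


# Hypothetical stage that shuffles 2 TiB of data.
partitions = estimate_shuffle_partitions(2 * 1024**4)

# Configuration entry as it could be passed to Spark.
spark_conf = {"spark.sql.shuffle.partitions": str(partitions)}
print(spark_conf)
```

The resulting value can be supplied without touching the workflow code, for example with `--conf spark.sql.shuffle.partitions=<value>` on spark-submit, or at runtime with `spark.conf.set("spark.sql.shuffle.partitions", "<value>")`.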
When changing the configuration no longer gave any performance gain or resource savings, the next logical step was to check the code itself. Most workflows had been created with the goal of getting results as soon as possible, so efficiency mostly took second place. The main things we focused on were repartition statements, the presence of UDFs, join keys, and the complexity of transformations. Spark workflows contained a lot of repartition() statements and, quite commonly, leftover debugging code. A repartition operation can be very useful to reduce unnecessary shuffles and to store results in a layout that suits further processing. But in a lot of cases repartition was applied in the middle of a chain of transformations, which only introduced an extra shuffle stage instead of removing one. Also, repartition without a partition column distributes data non-deterministically, which adds an extra shuffle operation during the execution of the workflow.
The statement df.cache().count(), with its result not assigned to any variable, could be found quite often. It forces Spark to compute the whole dataset and load it into memory. It hardly had any purpose other than showing the dataset size during development. Just removing this statement made a workflow much more efficient.
Most workflows contained a lot of UDFs. The main use case for Spark workflows here is data science, so workflows contain sophisticated logic that is more readable when it is moved into a separate function or package. In practice, however, most Spark workflows also used UDFs for simple transformations such as parsing JSON data, exploding arrays, and applying regex filters. Because Python is the main language for these workflows, using UDFs for every small and large action brought a lot of overhead and drastically decreased performance: rows have to be serialized between the JVM and the Python workers, and the optimizer cannot look inside a UDF. Replacing these UDFs with built-in Spark features such as DSL/SQL statements drastically improved the performance of the workflows.
Not every workflow showed these patterns. Some were exceptionally well written and simply ran for a long time. For those, the next step was to check the data.
Data skew is a tricky problem to solve in general, and it is difficult to identify, especially when a Spark workflow has many stages and every stage processes a couple of terabytes of data on average. If tasks or stages are stuck, CPU utilization is low, and you see a lot of out-of-memory errors, the data might be skewed. The main evidence of skew is a drastic difference in partition sizes. A couple of techniques might help, but in any case it is a long path of adjustments. To mitigate data skew one might use repartitioning, broadcasting, data salting, or a different join key. In general, though, there is no fixed solution for data skew problems.
Of course, you can get a lot of the necessary information about Spark workflows from the Spark UI, but to get a better grasp of resource allocation we used Sparklint extensively.
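The effect of salting, one of the techniques named above, can be illustrated without a cluster. The following sketch simulates hash partitioning in plain Python. It is a model rather than Spark code: the hash function, key names, row counts, and number of salts are all assumptions chosen for the example.

```python
import hashlib
import random
from collections import Counter


def partition_of(key, num_partitions):
    """Deterministic stand-in for hash partitioning of a shuffle key."""
    digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


def salt(keys, num_salts, seed=42):
    """Pair every key with a random salt in [0, num_salts)."""
    rng = random.Random(seed)
    return [(k, rng.randrange(num_salts)) for k in keys]


# Hypothetical skewed join key: one hot customer owns 90% of the rows.
keys = ["hot_customer"] * 9000 + [f"customer_{i}" for i in range(1000)]

plain = partition_sizes(keys, num_partitions=16)
salted = partition_sizes(salt(keys, num_salts=32), num_partitions=16)

print("largest partition without salt:", max(plain))
print("largest partition with salt:   ", max(salted))
```

Without the salt, all rows of the hot key end up in a single partition, which is exactly the "drastically different partition size" symptom. In a real join the other side has to be expanded with every salt value so that the salted keys still match, which is part of why salting counts as a longer adjustment rather than a quick fix.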
As an example, we took a Spark workflow that ran for approximately 6 hours while processing 500 GB of data. After examining the code and the metrics from the Spark UI and Sparklint, the most important problems with this workflow turned out to be data skew and the huge size of partitions during shuffle. The size of shuffle partitions was relatively easy to fix by setting the spark.sql.shuffle.partitions parameter. The data skew was solved by finding the correct join key, which requires knowledge of the structure of the data being processed. After applying the improvements, the optimized version of the Spark workflow takes about an hour to execute, and its resource consumption dropped by 77%.
Another remarkable optimization was achieved by changing the underlying data structure. One of the biggest workflows was using ORC as the storage format. After adjusting bucketing and partitioning, the performance of the workflow improved 100 times.
Not only Spark
Sometimes optimization had nothing to do with Spark itself, at least not directly. While optimizing Spark workflows we found some that were "eventually successful": they finished successfully only after some number of attempts, different each time. Diving deeper into the failed attempts, we saw that the workflows were failing with a "too many open files" error. Although the error message is quite descriptive, the issue was still tricky to solve, mainly because it was not reproducible: if you reran the Spark workflow, it might finish successfully without any issues.
After investigation, we saw that the issue occurred only on specific nodes in the cluster. Every Spark workflow has at least two retries, and each attempt is assigned to nodes at random, so with enough attempts the workflow eventually finishes successfully. After this finding, the fix was easy: increase the maximum number of open file descriptors to a reasonable value (64K in our case) on the affected nodes. After that, the workflows that had been taking a couple of attempts to finish were finishing on the first attempt. Because this made the workflows more stable, it also reduced the time during which they claimed resources on the cluster.
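Finding the affected nodes comes down to comparing the open-file limit on each node with the required value. The sketch below checks the limit of the current process using Python's standard resource module (Unix only). The function names are hypothetical, and how the check would be run on every node is left out, because the post does not describe the tooling that was used.

```python
import resource  # Unix-only standard library module

REQUIRED_NOFILE = 64 * 1024  # the 64K limit mentioned above


def limit_is_sufficient(soft_limit, required=REQUIRED_NOFILE):
    """Return True if a soft RLIMIT_NOFILE value allows `required` open files."""
    return soft_limit == resource.RLIM_INFINITY or soft_limit >= required


def check_current_process(required=REQUIRED_NOFILE):
    """Read the soft and hard open-file limits of this process and compare."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return {"soft": soft, "hard": hard, "ok": limit_is_sufficient(soft, required)}


print(check_current_process())
```

The limit that matters is the one inherited by the process that launches the Spark containers, so the check is only meaningful when it runs under the same user and service configuration.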
Share & Coach
Communication with Spark users was very important during this process for two reasons. First, new Spark workflows are developed every day, so it is important to share findings and best practices in order to improve code standards and quality. Second, changing the configuration of existing workflows affects their performance and maintenance, so the changes have to be aligned with business objectives and therefore with the workflow owners. Although we were focusing on the technical side, one of the objectives of the initiative was to make the knowledge we gained available to Spark users through documentation, training materials, and workshops.
The initiative was active for 4 months and resulted in a reduction of about 7% in memory and CPU usage across all Spark workflows.
The initiative resonated strongly among Spark users. In some cases it was enough to give Spark users the proposed optimization, while in other cases a deep dive into the internals of Spark was required. To spread the knowledge we published several blog posts with our findings and prepared a workshop. All this collaboration resulted in significant changes in the culture and approach to implementing Spark workflows.
It is always important to assess the results of the work that was done. Although this initiative had clear, measurable outcomes in terms of reduced resource allocation and a cultural shift, it also revealed the need for a clear and structured approach to the problem of optimization. One of the main outcomes was the understanding that technical improvements should go hand in hand with changes in mindset. Clarifying and emphasizing the need to write optimal Spark workflows was as important as the optimization itself.