Yongjun Zhang | Software Engineer, Ang Zhang | Engineering Manager, Shaowen Wang | Software Engineer, Batch Processing Platform Team
Pinterest’s Batch Processing Platform, Monarch, runs most of the batch processing workflows of the company. At the scale shown in Table 1, it is important to manage the platform resources to provide quality of service (QoS) while achieving cost efficiency. This article shares how we do that and future work.
Table 1: Scale of Monarch Batch Processing Platform
Introduction of Monarch
Figure 1 shows what Pinterest’s data system looks like at a high level. When users are using Pinterest applications on their mobile or desktop devices, they generate various logs that are ingested to our system via Singer + Kafka (see Scalable and reliable data ingestion at Pinterest) and the resulting data is stored to S3. Then the data is processed and analyzed by various workflows like sanitization, analytics, and machine learning data preparation. The results of the workflows are typically stored back to S3. There are essentially two types of processing platforms: batch and streaming. This blog is about the batch processing platform named Monarch. See this blog for more information about the streaming platform.
As an in-house big data platform, Monarch provides the infrastructure, services, and tools to help users develop, build, deploy, and troubleshoot their batch processing applications (mostly in the form of workflows) at scale. Monarch consists of more than 20 Hadoop YARN clusters built entirely in the Cloud utilizing AWS EC2, and we use many different instance types offered by EC2. The actual EC2 instance type we employ at a cluster depends on its workload; some clusters are more optimized for computing, while others have more memory or disk capacity.
User workflows can be submitted to Monarch from Spinner (an internal workflow platform built on top of Airflow) and other UI based workflow orchestration tools via Job Submission Service, or JSS (see Figure 2). The user workflow source code typically specifies the cluster and queue in which the workflow should run.
Figure 1. Pinterest Data System and the Batch Processing Platform (Monarch).
Figure 2. Pinterest Job Submission Service. See more description in the text.
Resource Management Challenges
Hadoop YARN is used to manage the cluster resources and task scheduling. The cluster resources are represented as a tree of queues. All the resources of the cluster, or all the EC2 instances the cluster has, are represented as the “root” of the tree, and the leaf nodes of the tree are where applications run. The weight configuration of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the resources allocated to the parent. How much resource a child gets is based on the ratio of this child’s weight over the sum of the weights of all sibling nodes. By setting the node weight, we can control how many EC2 instances are assigned to any given queue. YARN supports multiple schedulers, and the Fair Scheduler is used in Monarch.
Figure 3. YARN’s resource allocation: Tree of Queues of Adhoc Structure.
The goal of using a tree of queues to represent resource allocation is to achieve resource isolation between workflows that run in different queues. However, Monarch initially didn’t have a consistent queue structure, as shown in Figure 3. Some queues were allocated to specific projects, some were for organizations, and others for workflows of a certain priority. As a result, there was severe interference between different workflows running in the same queue — more critical workflows were often impacted by non-critical ones.
There were mainly two reasons for interference:
- Workflows running in the same queue are treated the same. With no notion of priority, the scheduler has no way to give more resources to more critical workflows.
- There is a parameter maxRunningApps to control how many applications can concurrently run in a given queue. This prevents too many applications competing for resources, in which situation no application can make good progress. However, if lower priority workflows are submitted first and saturate the maxRunningApps, then critical workflows submitted later can be stuck for a long time without being scheduled.
To address these issues, we introduced workflow tiering and changed the resource allocation queues to be tier-ed accordingly.
Workflow Tiering and Hierarchical Queue Structure
The workloads on Monarch are typically in the form of workflows. Workflow is represented as a Directed Acyclic Graph (DAG) of multiple jobs to process input data and generate output. The jobs in the same workflow run in parallel or sequentially depending on whether there is dependency on each other. We took two main steps to provide QoS for workflows while achieving cost efficiency.
Firstly, we added tiering to distinguish critical workflows from non-critical ones. The critical workflows typically have higher requirements on the finishing time. We decided to classify workflows into three tiers: tier1, tier2, and tier3 (tier1 has the highest importance). Then we worked with user teams to define the tiering and runtime service level objective (SLO) of all workflows that run on the Monarch platform.
Secondly, we changed the resource queue structure across all clusters to have the notion of tiering, project, and organization. Given that each workflow is associated with a project, each project belongs to a team, and each team belongs to a larger organization, we decided to create a three level hierarchical queue structure: organization, project, and tier. See Figure 4 for an example (“default” is used in place of tier3, for historical reason).
Figure 4. Hierarchical Queues with Organization, Project and Tiering.
Some of the most important configurations of the queues are:
- Weight: The weight of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the parent’s resources based on the relative ratio of their weights.
- MaxRunningApps: The maximum number of applications that can run concurrently within the queue. This prevents from having too many applications running in the same queue of limited resources, meaning no applications can make good progress.
- Preemption:
- preemption: whether to enable preemption
- fairSharePreemptionTimeout: number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues.
- fairSharePreemptionThreshold: the fair share preemption threshold for the queue. If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues.
- allowPreemptionFrom: determines whether the scheduler is allowed to preempt resources from the queue.
We configure tier1 queues to not allow preemption and also configure the other two parameters (fairSharePreemptionTimeout and fairSharePreemptionThreshold) to smaller values than for tier2 and tier3 queues. This allows tier1 queues to acquire resources faster when they are not getting their fair share of resources.
Because Monarch has many clusters, and the workflows running on different clusters could change from time to time, it’s not practical or efficient to manually create the queue structures. We developed a tool that analyzes the historical data of the workflows on the clusters, generates the queue structure, and updates the settings automatically and periodically.
Besides the preemption configuration described above, two of the most important configurations are the queue weight and maxRunningApps. In the next section, we will share more details on the algorithm we use to generate these settings.
Resource Allocation Algorithm
The workflows running in a queue have different requirements at different times. To ensure QoS of the critical workflows, we designed an algorithm to assign queue weight based on historical run data, namely, the Percentile Algorithm.
Figure 5. The Percentile Resource Allocation Algorithm.
The algorithm looks at the historical run data within the most recent time window, such as 30 days, to see how much resource is needed for a given queue. Below is what it does:
- Step 1: The queue may be used at some times and may be vacant at other times. When the queue is being used, sometimes X EC2 instances are being used and sometimes Y EC2 instances are being used. The algorithm divides the time window into time units; each unit is a timespan that the same number of EC2 instances are used. The time unit is represented as <timeLength, instanceUsed>. (See the left side of Figure 5)
- Step 2: Excluding the time units in which the queue is vacant, sort the time units by the number of instances used in the time unit (see the right side of Figure 5) from smallest to largest.
- Step 3: Determine the minimum number of instances to allocate to the queue to make sure a pre-specified time length percentage threshold is met. This threshold means, given the total length of time units (TTIU) that the queue is in use, the allocated resource needs to be enough to satisfy the percentage of TTIU. For example, for a tier1 queue that is used for 240 hours in total within a 30-day window (vacant other times), we’d like to guarantee the resources for 95% of the time, thus it’s 228 hours. The algorithm finds out the number of instances being used at the sorted results from Step 2. For example, tu0 + tu4 + tu7 + tu2 is about 95% of the whole time length in use, then the number of instances used in tu2 is the number of instances to be allocated to this queue. If we were to allocate the number of instances used in tu5, which is larger than used in tu2, it would potentially cause waste because tu2 is only 5% of the whole time the queue is in use.
The 95% threshold above is just an example. We evaluated the resource usage of different tiers and came up with different thresholds based on the size of the clusters and resources used by those workflows. The thresholds are also adjusted from time to time when the percentages of resources used by different tiers change.
There are several reasons we don’t have to guarantee 100% of the resources required at the peak usage time of a given tier1/tier2 queue, thus avoiding waste.
- The workflow tiering has a rough distribution such that ~10% workflows are tier1, 20–30% tier2, and 60–70% tier3.
- Not all queues are busy at the same time, and the YARN scheduler allows workflows to use resources available at other queues.
- Higher tier queues can preempt resources faster.
We measure the resource headroom of a queue by a metric called usage/capacity ratio. The capacity of a queue is the number of instances allocated to the queue times the length of the time window being measured. The usage is measured by YARN as instance-hours. E.g., if the queue uses X instances for Y hours, the resource usage is X * Y instance-hours. In addition, we also measure vcore-hours and memory-hours usage/capacity ratio in a similar fashion to see how balanced the vcore and memory resource usage is. Notice that YARN reported vcore-hours and memory-hours, and we use the dominant-resource (DR) method to calculate the instance-hours here.
The algorithm ensures the percentage threshold is set in a decreasing order from tier1 to tier3 queues, while it also ensures that the usage/capacity ratio is in an increasing order. This means the head room is the largest for tier1, second for tier2, and smallest for tier3.
The resource allocation algorithm also looks at historical run data to determine the maxRunningApps setting and sets this configuration with some headroom for each queue.
Comparing with Autoscaling
Autoscaling is another common approach to save cost in the Cloud, scaling up the cluster when needed and scaling down when peak demand has passed. Because Cloud providers normally charge much higher rates for on-demand capacity than reserved instances, users normally reserve the capacity that is always required and use on-demand instances for the autoscaling.
Autoscaling works well for online services at Pinterest, but we found it is not as cost efficient for batch processing for the following reasons:
- Tasks from large scale batch processing can run for hours, and the two options to scale down the cluster are wasteful. Scaling down gracefully and waiting for running tasks to finish (i.e. draining the instances before terminating them) potentially wastes a significant amount of resources because the instances may not be fully utilized. Scaling down by terminating instances forcefully even when tasks are still running on them means unfinished computing is wasted (and longer runtime for the involved jobs) and extra resources are needed to rerun the terminated tasks.
- In order for autoscaling using on-demand instances to make economical sense when compared with reserved instances, we estimated the percentage of time of peak consumption of the cluster using on-demand instances will need to be less than 30% for certain instance types. Considering the time it takes to scale down, the percentage would be a lot smaller. However, it’s hard to control this percentage, and resources can easily be wasted if the percentage goes higher.
- At Pinterest’s big data processing scale, using autoscaling would require getting hundreds or more instances of desired instance types during peak hours, which is not always possible. Not getting enough resources to run critical workflows could affect the business in a significant way.
By utilizing the resource allocation algorithm described above and workflow tiering, we were able to utilize good reserved instance pricing while still guaranteeing enough resources for critical workflows when needed.
Please note that in this blog, we focus on production workflows, not adhoc workloads like Spark SQL queries from Querybook or PySpark jobs from Jupyter notebooks. On adhoc clusters, we do utilize autoscaling with Spot instances because the peak usage only lasts 2–3 hours on business days.
Workflow Performance Monitoring
When allocating resources for a workflow, the runtime SLO is an important factor to consider. For example, if the workflow uses X instances-hours resources, and the runtime SLO is 12 hours, then the number of instances needed to run this workflow is X / 12.
With the resource allocation being in effect, we need a way to monitor the overall workflow runtime performance. We developed a dashboard to show how each tier workflows are performing in various clusters.
Within a time window of a certain size, for any given workflow, if it is run for X times and Y runs meet SLO, its SLO success ratio is defined as Y/X. It’s ideal if this ratio is 100% for any given workflow, but it’s not feasible for many reasons. As a compromise, we define a workflow as SLO-successful if its SLO success ratio is no less than 90%.
As mentioned earlier, we classified workflows into three tiers. For workflows of each tier, we measure the percentage of workflows that are SLO-successful. Our goal is to have this percentage higher than 90%.
Figure 6 is a snapshot of the dashboard that measures the performance of the 30-day time window. Before the project, the tier1 workflow’s success percentage was around 70%. It has been improved to and stabilized around 90% now. While we try to make most tier1 workflows successful, the same metrics of other tiers are not sacrificed too much because they have less stringent SLO requirements.
Figure 6. Workflow performance monitoring: runtime SLO success ratio of each tier.
Cluster Resource Usage Monitoring
The workflow requirement is not static and may change from time to time. A daily report is done for each cluster on the following metrics:
- Total, tier1, tier2, and tier3 usage/capacity ratio (including instance, vcore, memory)
- Number of all tier1, tier2, and tier3 workflows running in the cluster (there may be new workflows onboarded, or re-tiering and SLO change of existing workflows)
Based on these metrics, we determine if the cluster is over or under utilized and take actions by either adding more resources to the cluster (organic growth), downsizing the cluster to save cost, or keeping it as is.
Cross-Cluster Routing And Load Balancing
As mentioned earlier, different workflows have different resource needs — some require more memory, some more CPU, and others more disk IO or storage. Their needs may change over time. Additionally, some clusters may become full while others are underutilized over time. Through monitoring resource consumption, we may find better home clusters for the workflows than their current ones. To ask users to change their source code to move the workflow is a tedious process, as we also have to adjust the resource allocation when we move the workflow.
We developed a cross-cluster routing (CCR) capability to change the target cluster of the workflows without the need of users to change settings. To implement this, we added instrumentation logic in the JSS component that can redirect jobs to another cluster as we need.
We also developed a workflow to periodically analyze the cluster usage and choose candidate workflows to move to other clusters to keep improving the load balancing and cost efficiency.
To enable redirecting jobs, we need to do resource allocation change on the target cluster with the above mentioned algorithm. To achieve this, we automated the resource allocation process such that with a single button click (triggering a workflow), it will do both resource allocation and configure job redirection in one step.
Current and Future Work
At the time of writing, our metrics indicate the vcore and memory usage of a fairly big cluster is not balanced, and a lot of vcores are wasted as a result. We are working on splitting this cluster into two clusters of different instance types with CCR support and migrating the workflows running on the original cluster into one of the resulting clusters. We expect with this change we will be able to not only run the applications more reliably, but also save a lot of cost.
Our clusters are located at different availability zones. When one zone has an issue, we can leverage the CCR feature to move critical workflows to another cluster in a different zone. We are working on making this process smoother.
We are also looking into dynamically route jobs at runtime to different clusters when the current load on the target cluster is full.
Acknowledgement
Thanks to Hengzhe Guo, Bogdan Pisica, Sandeep Kumar from the Batch Processing Platform team who helped further improve the implementations. Thanks to Soam Acharya, Jooseong Kim and Hannah Chen for driving the workflow tiering. Thanks to Jooseong Kim, William Tom, Soam Acharya, Chunyan Wang for the discussions and support along the way. Thanks to the workflow team, our platform user teams for their feedback and support.