Efficient Resource Management at Pinterest’s Batch Processing Platform

Pinterest
Pinterest is a social bookmarking site where users collect and share photos of their favorite events, interests and hobbies. One of the fastest growing social networks online, Pinterest is the third-largest such network behind only Facebook and Twitter.

Yongjun Zhang | Software Engineer, Ang Zhang | Engineering Manager, Shaowen Wang | Software Engineer, Batch Processing Platform Team


Pinterest’s Batch Processing Platform, Monarch, runs most of the company’s batch processing workflows. At the scale shown in Table 1, it is important to manage platform resources carefully to provide quality of service (QoS) while remaining cost efficient. This article describes how we do that and outlines our future work.

Table 1: Scale of Monarch Batch Processing Platform

Introduction of Monarch

Figure 1 shows what Pinterest’s data system looks like at a high level. When users use Pinterest applications on their mobile or desktop devices, they generate various logs that are ingested into our system via Singer + Kafka (see Scalable and reliable data ingestion at Pinterest), and the resulting data is stored in S3. The data is then processed and analyzed by various workflows such as sanitization, analytics, and machine learning data preparation. The results of the workflows are typically stored back in S3. There are essentially two types of processing platforms: batch and streaming. This post is about the batch processing platform, named Monarch. See this blog for more information about the streaming platform.

As an in-house big data platform, Monarch provides the infrastructure, services, and tools to help users develop, build, deploy, and troubleshoot their batch processing applications (mostly in the form of workflows) at scale. Monarch consists of more than 20 Hadoop YARN clusters built entirely in the Cloud on AWS EC2, and we use many of the instance types EC2 offers. The instance type we use for a cluster depends on its workload; some clusters are optimized for computing, while others have more memory or disk capacity.

User workflows can be submitted to Monarch from Spinner (an internal workflow platform built on top of Airflow) and other UI-based workflow orchestration tools via the Job Submission Service, or JSS (see Figure 2). The user workflow source code typically specifies the cluster and queue in which the workflow should run.

Figure 1. Pinterest Data System and the Batch Processing Platform (Monarch).

Figure 2. Pinterest Job Submission Service. See more description in the text.

Resource Management Challenges

Hadoop YARN is used to manage the cluster resources and task scheduling. The cluster resources are represented as a tree of queues. All the resources of the cluster, or all the EC2 instances the cluster has, are represented as the “root” of the tree, and the leaf nodes of the tree are where applications run. The weight configuration of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the resources allocated to the parent. How much resource a child gets is based on the ratio of this child’s weight over the sum of the weights of all sibling nodes. By setting the node weight, we can control how many EC2 instances are assigned to any given queue. YARN supports multiple schedulers, and the Fair Scheduler is used in Monarch.
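The weight rule described above can be illustrated with a small sketch. This is not the Fair Scheduler's implementation; it only computes the steady-state split when every queue is busy, and the queue names, weights, and cluster size are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Queue:
    """A node in the queue tree; leaf nodes are where applications run."""
    name: str
    weight: float = 1.0
    children: list[Queue] = field(default_factory=list)


def allocate(queue: Queue, instances: float, path: str = "") -> dict[str, float]:
    """Split `instances` among leaf queues in proportion to sibling weights."""
    full_name = f"{path}.{queue.name}" if path else queue.name
    if not queue.children:
        return {full_name: instances}
    total_weight = sum(child.weight for child in queue.children)
    shares: dict[str, float] = {}
    for child in queue.children:
        # A child's share is its weight over the sum of all sibling weights.
        child_share = instances * child.weight / total_weight
        shares.update(allocate(child, child_share, full_name))
    return shares


# Hypothetical tree: a 100-instance cluster with two top-level queues.
root = Queue("root", children=[
    Queue("ads", weight=3, children=[
        Queue("tier1", weight=2),
        Queue("tier2", weight=1),
    ]),
    Queue("search", weight=1),
])
shares = allocate(root, 100)
print(shares)
```

In this example the "ads" queue receives 75 of the 100 instances (weight 3 out of 4), which its two children then split 2:1.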

Figure 3. YARN’s resource allocation: Tree of Queues of Adhoc Structure.

The goal of using a tree of queues to represent resource allocation is to achieve resource isolation between workflows that run in different queues. However, Monarch initially didn’t have a consistent queue structure, as shown in Figure 3. Some queues were allocated to specific projects, some were for organizations, and others for workflows of a certain priority. As a result, there was severe interference between different workflows running in the same queue — more critical workflows were often impacted by non-critical ones.

There were mainly two reasons for interference:

  1. Workflows running in the same queue are treated the same. With no notion of priority, the scheduler has no way to give more resources to more critical workflows.
  2. A parameter, maxRunningApps, controls how many applications can run concurrently in a given queue. This prevents too many applications from competing for resources, a situation in which none of them can make good progress. However, if lower-priority workflows are submitted first and use up the maxRunningApps limit, critical workflows submitted later can be stuck for a long time without being scheduled.

To address these issues, we introduced workflow tiering and restructured the resource allocation queues around those tiers.

Workflow Tiering and Hierarchical Queue Structure

The workloads on Monarch are typically in the form of workflows. A workflow is represented as a Directed Acyclic Graph (DAG) of jobs that process input data and generate output. The jobs in the same workflow run in parallel or sequentially, depending on whether they depend on each other. We took two main steps to provide QoS for workflows while achieving cost efficiency.

Firstly, we added tiering to distinguish critical workflows from non-critical ones. The critical workflows typically have higher requirements on the finishing time. We decided to classify workflows into three tiers: tier1, tier2, and tier3 (tier1 has the highest importance). Then we worked with user teams to define the tiering and runtime service level objective (SLO) of all workflows that run on the Monarch platform.

Secondly, we changed the resource queue structure across all clusters to incorporate the notions of tier, project, and organization. Given that each workflow is associated with a project, each project belongs to a team, and each team belongs to a larger organization, we decided to create a three-level hierarchical queue structure: organization, project, and tier. See Figure 4 for an example (“default” is used in place of tier3, for historical reasons).

Figure 4. Hierarchical Queues with Organization, Project and Tiering.
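As a minimal sketch of the three-level structure, the helper below builds the dot-separated name of a leaf queue. The function name and the organization and project names are hypothetical; only the organization/project/tier ordering and the use of "default" for tier3 come from the text.

```python
def queue_path(organization: str, project: str, tier: str) -> str:
    """Build a leaf queue name of the form root.<organization>.<project>.<tier>."""
    if tier not in {"tier1", "tier2", "tier3"}:
        raise ValueError(f"unknown tier: {tier!r}")
    # "default" stands in for tier3, for historical reasons.
    leaf = "default" if tier == "tier3" else tier
    return ".".join(["root", organization, project, leaf])


print(queue_path("ads", "reporting", "tier1"))
print(queue_path("ads", "reporting", "tier3"))
```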

Some of the most important configurations of the queues are:

  • Weight: The weight of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the parent’s resources based on the relative ratio of their weights.
  • MaxRunningApps: The maximum number of applications that can run concurrently within the queue. This prevents too many applications from running in a queue with limited resources, where none of them could make good progress.
  • Preemption:
      1. preemption: whether to enable preemption.
      2. fairSharePreemptionTimeout: the number of seconds the queue stays under its fair share threshold before it tries to preempt containers to take resources from other queues.
      3. fairSharePreemptionThreshold: the fair share preemption threshold for the queue. If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues.
      4. allowPreemptionFrom: whether the scheduler is allowed to preempt resources from the queue.

We configure tier1 queues so that resources cannot be preempted from them, and we set the other two parameters (fairSharePreemptionTimeout and fairSharePreemptionThreshold) to smaller values than for tier2 and tier3 queues. This allows tier1 queues to acquire resources faster when they are not getting their fair share.
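To make these settings concrete, here is a sketch that renders one queue entry for a Fair Scheduler allocation file. The element names are the scheduler's queue properties listed above; the numeric values, the TIER_SETTINGS table, and the build_queue helper are hypothetical and only follow the ordering described in the text (smaller values for tier1).

```python
import xml.etree.ElementTree as ET

# Hypothetical per-tier preemption settings; the real values are not given here.
TIER_SETTINGS = {
    "tier1": {"fairSharePreemptionTimeout": 60,
              "fairSharePreemptionThreshold": 0.5,
              "allowPreemptionFrom": False},
    "tier2": {"fairSharePreemptionTimeout": 300,
              "fairSharePreemptionThreshold": 0.7,
              "allowPreemptionFrom": True},
    "default": {"fairSharePreemptionTimeout": 600,
                "fairSharePreemptionThreshold": 0.9,
                "allowPreemptionFrom": True},
}


def build_queue(name: str, weight: float, max_running_apps: int) -> ET.Element:
    """Create a <queue> element carrying weight, maxRunningApps, and preemption settings."""
    queue = ET.Element("queue", name=name)
    ET.SubElement(queue, "weight").text = str(weight)
    ET.SubElement(queue, "maxRunningApps").text = str(max_running_apps)
    for key, value in TIER_SETTINGS[name].items():
        # XML booleans are written in lowercase ("true" / "false").
        text = str(value).lower() if isinstance(value, bool) else str(value)
        ET.SubElement(queue, key).text = text
    return queue


tier1 = build_queue("tier1", weight=3.0, max_running_apps=20)
print(ET.tostring(tier1, encoding="unicode"))
```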

Because Monarch has many clusters, and the workflows running on different clusters could change from time to time, it’s not practical or efficient to manually create the queue structures. We developed a tool that analyzes the historical data of the workflows on the clusters, generates the queue structure, and updates the settings automatically and periodically.

Besides the preemption configuration described above, two of the most important configurations are the queue weight and maxRunningApps. In the next section, we will share more details on the algorithm we use to generate these settings.

Resource Allocation Algorithm

The workflows running in a queue have different resource requirements at different times. To ensure QoS for the critical workflows, we designed an algorithm, which we call the Percentile Algorithm, that assigns queue weights based on historical run data.

Figure 5. The Percentile Resource Allocation Algorithm.

The algorithm looks at the historical run data within the most recent time window, such as 30 days, to see how much resource a given queue needs. It works as follows:

  • Step 1: A queue may be in use at some times and vacant at others, and while it is in use the number of EC2 instances it uses varies. The algorithm divides the time window into time units, where each unit is a timespan during which the same number of EC2 instances is used. A time unit is represented as <timeLength, instanceUsed>. (See the left side of Figure 5.)
  • Step 2: Excluding the time units in which the queue is vacant, sort the time units by the number of instances used, from smallest to largest. (See the right side of Figure 5.)
  • Step 3: Determine the minimum number of instances to allocate to the queue so that a pre-specified time-length percentage threshold is met. Given the total length of the time units in which the queue is in use (TTIU), the allocated resources must be enough to cover that percentage of TTIU. For example, for a tier1 queue that is in use for 240 hours in total within a 30-day window (and vacant otherwise), we would like to guarantee resources for 95% of that time, which is 228 hours. The algorithm walks through the sorted time units from Step 2 and accumulates their lengths until the threshold is reached. For example, if tu0 + tu4 + tu7 + tu2 add up to about 95% of the total time in use, then the number of instances used in tu2 is the number of instances to allocate to this queue. Allocating the number of instances used in tu5, which is larger than the number used in tu2, would potentially cause waste, because usage above the tu2 level occurs during only about 5% of the time the queue is in use.
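The three steps can be sketched as follows. This is a simplified reading of the description above, not the production tool; the function name and the sample time units are hypothetical, and each time unit is given as a (timeLength in hours, instanceUsed) pair.

```python
def instances_for_percentile(time_units, percentile):
    """Return the instance count that covers `percentile` of the queue's busy time.

    time_units: list of (time_length_hours, instances_used) pairs (Step 1).
    percentile: fraction of busy time to cover, e.g. 0.95.
    """
    # Step 2: drop vacant time units and sort by instances used, ascending.
    busy = sorted((tu for tu in time_units if tu[1] > 0), key=lambda tu: tu[1])
    total_busy = sum(length for length, _ in busy)  # TTIU
    if total_busy == 0:
        return 0
    # Step 3: accumulate time until the threshold share of TTIU is covered.
    target = percentile * total_busy
    covered = 0.0
    for length, instances in busy:
        covered += length
        if covered >= target:
            return instances
    return busy[-1][1]


# Hypothetical 30-day window: 480 vacant hours and 240 busy hours.
history = [(480, 0), (100, 10), (50, 30), (80, 20), (10, 80)]
print(instances_for_percentile(history, 0.95))  # prints 30
print(instances_for_percentile(history, 1.00))  # prints 80
```

In this sample, covering the last 10 busy hours would require 80 instances instead of 30, which is the kind of waste the threshold is meant to avoid.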

The 95% threshold above is just an example. We evaluated the resource usage of different tiers and came up with different thresholds based on the size of the clusters and resources used by those workflows. The thresholds are also adjusted from time to time when the percentages of resources used by different tiers change.

There are several reasons we don’t have to guarantee 100% of the resources required at the peak usage time of a given tier1/tier2 queue, thus avoiding waste.

  1. The workflow tiering has a rough distribution in which ~10% of workflows are tier1, 20–30% are tier2, and 60–70% are tier3.
  2. Not all queues are busy at the same time, and the YARN scheduler allows workflows to use resources that are available in other queues.
  3. Higher tier queues can preempt resources faster.

We measure the resource headroom of a queue with a metric called the usage/capacity ratio. The capacity of a queue is the number of instances allocated to the queue multiplied by the length of the time window being measured. The usage is measured in instance-hours: if the queue uses X instances for Y hours, the resource usage is X * Y instance-hours. We also measure the vcore-hours and memory-hours usage/capacity ratios in a similar fashion to see how balanced the vcore and memory usage is. Note that YARN reports vcore-hours and memory-hours, and we use the dominant-resource (DR) method to derive instance-hours from them.
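A minimal sketch of this metric follows. It assumes that the dominant-resource method means taking the larger of the two per-instance-normalized usages; that interpretation, the function names, and the instance shape (16 vcores, 64 GB) are assumptions made for illustration.

```python
def instance_hours(vcore_hours, memory_gb_hours,
                   vcores_per_instance, memory_gb_per_instance):
    """Convert YARN-reported usage to instance-hours via the dominant resource."""
    by_vcore = vcore_hours / vcores_per_instance
    by_memory = memory_gb_hours / memory_gb_per_instance
    # Assumption: whichever resource needs more instances dominates.
    return max(by_vcore, by_memory)


def usage_capacity_ratio(used_instance_hours, allocated_instances, window_hours):
    """Usage divided by capacity, where capacity = instances * window length."""
    capacity = allocated_instances * window_hours
    return used_instance_hours / capacity


# Hypothetical queue: 10 instances allocated over a 30-day (720-hour) window.
used = instance_hours(vcore_hours=32_000, memory_gb_hours=384_000,
                      vcores_per_instance=16, memory_gb_per_instance=64)
print(used)                                  # memory dominates here
print(usage_capacity_ratio(used, 10, 720))
```

A ratio well below 1 indicates headroom; comparing the vcore-based and memory-based figures shows which resource is the bottleneck.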

The algorithm ensures that the percentage threshold decreases from tier1 to tier3 queues, while the usage/capacity ratio increases. This means the headroom is largest for tier1, second largest for tier2, and smallest for tier3.

The resource allocation algorithm also looks at historical run data to determine the maxRunningApps setting and sets this configuration with some headroom for each queue.

Comparing with Autoscaling

Autoscaling is another common approach to saving cost in the Cloud: the cluster is scaled up when needed and scaled down when peak demand has passed. Because Cloud providers normally charge much higher rates for on-demand capacity than for reserved instances, users normally reserve the capacity that is always required and use on-demand instances for autoscaling.

Autoscaling works well for online services at Pinterest, but we found it is not as cost efficient for batch processing for the following reasons:

  1. Tasks from large scale batch processing can run for hours, and the two options to scale down the cluster are wasteful. Scaling down gracefully and waiting for running tasks to finish (i.e. draining the instances before terminating them) potentially wastes a significant amount of resources because the instances may not be fully utilized. Scaling down by terminating instances forcefully even when tasks are still running on them means unfinished computing is wasted (and longer runtime for the involved jobs) and extra resources are needed to rerun the terminated tasks.
  2. For autoscaling with on-demand instances to make economic sense compared with reserved instances, we estimated that, for certain instance types, the cluster would need to be at peak consumption (and thus using on-demand instances) less than 30% of the time. Taking into account the time it takes to scale down, the percentage would need to be considerably smaller. However, it’s hard to control this percentage, and resources can easily be wasted if it goes higher.
  3. At Pinterest’s big data processing scale, using autoscaling would require getting hundreds or more instances of desired instance types during peak hours, which is not always possible. Not getting enough resources to run critical workflows could affect the business in a significant way.
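The break-even reasoning in the second point can be sketched with simple arithmetic. The model is deliberately simplified and the prices are hypothetical: a reserved instance is assumed to be billed for every hour, an on-demand instance only for the hours it runs, and scale-up and scale-down time is ignored.

```python
def break_even_fraction(reserved_hourly, on_demand_hourly):
    """Fraction of time below which on-demand is cheaper than reserved."""
    return reserved_hourly / on_demand_hourly


def cheaper_option(peak_fraction, reserved_hourly, on_demand_hourly):
    """Compare the effective hourly cost of covering peak demand either way."""
    # On-demand capacity is only paid for while the peak lasts.
    on_demand_cost = peak_fraction * on_demand_hourly
    return "on-demand" if on_demand_cost < reserved_hourly else "reserved"


# Hypothetical prices: $1.00/hour on-demand vs. $0.30/hour effective reserved.
print(break_even_fraction(0.30, 1.00))   # prints 0.3
print(cheaper_option(0.20, 0.30, 1.00))  # prints on-demand
print(cheaper_option(0.50, 0.30, 1.00))  # prints reserved
```

Any time spent draining instances adds to the hours billed at the on-demand rate, which is why the usable fraction in practice is smaller than the break-even value.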

By using the resource allocation algorithm described above together with workflow tiering, we were able to take advantage of reserved instance pricing while still guaranteeing enough resources for critical workflows when needed.

Please note that this post focuses on production workflows, not adhoc workloads like Spark SQL queries from Querybook or PySpark jobs from Jupyter notebooks. On adhoc clusters, we do use autoscaling with Spot instances because the peak usage only lasts 2–3 hours on business days.

Workflow Performance Monitoring

When allocating resources for a workflow, the runtime SLO is an important factor to consider. For example, if the workflow uses X instance-hours of resources and the runtime SLO is 12 hours, then the number of instances needed to run this workflow is X / 12.
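As a small worked example of this estimate, the sketch below rounds the result up to whole instances. It assumes the work can be spread evenly across instances for the whole SLO window, which real workflows only approximate; the function name and numbers are hypothetical.

```python
import math


def instances_needed(instance_hours, slo_hours):
    """Minimum whole instances to finish `instance_hours` of work within the SLO."""
    return math.ceil(instance_hours / slo_hours)


print(instances_needed(1200, 12))  # prints 100
print(instances_needed(1000, 12))  # prints 84
```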

With the resource allocation in effect, we need a way to monitor overall workflow runtime performance. We developed a dashboard to show how the workflows of each tier are performing in the various clusters.

Within a time window of a certain size, if a given workflow runs X times and Y of those runs meet the SLO, its SLO success ratio is defined as Y/X. Ideally this ratio would be 100% for every workflow, but that is not feasible for many reasons. As a compromise, we define a workflow as SLO-successful if its SLO success ratio is no less than 90%.

As mentioned earlier, we classified workflows into three tiers. For workflows of each tier, we measure the percentage of workflows that are SLO-successful. Our goal is to have this percentage higher than 90%.
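The two metrics defined above can be sketched as follows. The function name and the sample data are hypothetical; each workflow is given as a (tier, total runs, runs that met the SLO) tuple for one time window.

```python
from collections import defaultdict


def tier_success_percentages(workflows, threshold=0.9):
    """Percentage of SLO-successful workflows per tier.

    A workflow is SLO-successful if runs_meeting_slo / total_runs >= threshold.
    """
    totals = defaultdict(int)
    successes = defaultdict(int)
    for tier, total_runs, runs_meeting_slo in workflows:
        if total_runs == 0:
            continue  # no runs in the window, so no ratio to compute
        totals[tier] += 1
        if runs_meeting_slo / total_runs >= threshold:
            successes[tier] += 1
    return {tier: 100.0 * successes[tier] / totals[tier] for tier in totals}


# Hypothetical 30-day window.
window = [
    ("tier1", 30, 30),  # ratio 1.00, SLO-successful
    ("tier1", 30, 28),  # ratio 0.93, SLO-successful
    ("tier1", 30, 20),  # ratio 0.67, not SLO-successful
    ("tier2", 10, 10),  # ratio 1.00, SLO-successful
]
result = tier_success_percentages(window)
print(result)
```

Here two of the three tier1 workflows are SLO-successful, so the tier1 percentage is about 67%, below the 90% goal.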

Figure 6 is a snapshot of the dashboard that measures performance over a 30-day time window. Before the project, the success percentage of tier1 workflows was around 70%. It has since improved to, and stabilized at, around 90%. While we try to make most tier1 workflows successful, the same metrics for the other tiers are not sacrificed too much, because those tiers have less stringent SLO requirements.

Figure 6. Workflow performance monitoring: runtime SLO success ratio of each tier.

Cluster Resource Usage Monitoring

Workflow requirements are not static and may change from time to time. A daily report is generated for each cluster with the following metrics:

  1. Total, tier1, tier2, and tier3 usage/capacity ratio (including instance, vcore, memory)
  2. Number of all tier1, tier2, and tier3 workflows running in the cluster (there may be new workflows onboarded, or re-tiering and SLO change of existing workflows)

Based on these metrics, we determine whether the cluster is over- or underutilized and act accordingly: adding more resources to the cluster (organic growth), downsizing the cluster to save cost, or keeping it as is.

Cross-Cluster Routing And Load Balancing

As mentioned earlier, different workflows have different resource needs: some require more memory, some more CPU, and others more disk IO or storage. Their needs may change over time. Additionally, some clusters may become full while others become underutilized. By monitoring resource consumption, we may find better home clusters for workflows than their current ones. Asking users to change their source code to move a workflow is a tedious process, and we also have to adjust the resource allocation whenever we move a workflow.

We developed a cross-cluster routing (CCR) capability to change the target cluster of a workflow without requiring users to change any settings. To implement this, we added logic to the JSS component that can redirect jobs to another cluster as needed.
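Conceptually, the redirection can be pictured as an override table consulted at submission time. The sketch below is purely illustrative and is not the JSS implementation; the Job type, the route function, and the workflow and cluster names are all hypothetical.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Job:
    workflow: str
    cluster: str  # cluster named in the user's workflow source code
    queue: str


def route(job: Job, overrides: dict[str, str]) -> Job:
    """Return the job unchanged, or redirected if its workflow has an override."""
    target = overrides.get(job.workflow)
    if target is None:
        return job
    # The user's source code is untouched; only the submission target changes.
    return replace(job, cluster=target)


# Hypothetical override table maintained by the platform team.
overrides = {"daily_ads_report": "cluster-b"}
job = Job("daily_ads_report", "cluster-a", "root.ads.reporting.tier1")
print(route(job, overrides).cluster)  # prints cluster-b
```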

We also developed a workflow to periodically analyze the cluster usage and choose candidate workflows to move to other clusters to keep improving the load balancing and cost efficiency.

Before jobs can be redirected, the resource allocation on the target cluster has to be updated using the algorithm described above. We automated this process so that a single button click (which triggers a workflow) both updates the resource allocation and configures job redirection in one step.

Current and Future Work

At the time of writing, our metrics indicate that the vcore and memory usage of one fairly large cluster is not balanced, and a lot of vcores are wasted as a result. We are working on splitting this cluster into two clusters of different instance types with CCR support, and migrating each workflow running on the original cluster to one of the resulting clusters. We expect that this change will let us not only run the applications more reliably, but also save a lot of cost.

Our clusters are located in different availability zones. When one zone has an issue, we can use the CCR feature to move critical workflows to another cluster in a different zone. We are working on making this process smoother.

We are also looking into dynamically routing jobs to different clusters at runtime when the target cluster is at capacity.

Acknowledgement

Thanks to Hengzhe Guo, Bogdan Pisica, and Sandeep Kumar from the Batch Processing Platform team, who helped further improve the implementations. Thanks to Soam Acharya, Jooseong Kim, and Hannah Chen for driving the workflow tiering. Thanks to Jooseong Kim, William Tom, Soam Acharya, and Chunyan Wang for the discussions and support along the way. Thanks to the workflow team and our platform user teams for their feedback and support.
