Efficient Resource Management at Pinterest’s Batch Processing Platform

469
Pinterest
Pinterest is a social bookmarking site where users collect and share photos of their favorite events, interests and hobbies. One of the fastest growing social networks online, Pinterest is the third-largest such network behind only Facebook and Twitter.

Yongjun Zhang | Software Engineer, Ang Zhang | Engineering Manager, Shaowen Wang | Software Engineer, Batch Processing Platform Team


Pinterest’s Batch Processing Platform, Monarch, runs most of the batch processing workflows of the company. At the scale shown in Table 1, it is important to manage the platform resources to provide quality of service (QoS) while achieving cost efficiency. This article shares how we do that and future work.

alt_text

Table 1: Scale of Monarch Batch Processing Platform

Introduction of Monarch

Figure 1 shows what Pinterest’s data system looks like at a high level. When users are using Pinterest applications on their mobile or desktop devices, they generate various logs that are ingested to our system via Singer + Kafka (see Scalable and reliable data ingestion at Pinterest) and the resulting data is stored to S3. Then the data is processed and analyzed by various workflows like sanitization, analytics, and machine learning data preparation. The results of the workflows are typically stored back to S3. There are essentially two types of processing platforms: batch and streaming. This blog is about the batch processing platform named Monarch. See this blog for more information about the streaming platform.

As an in-house big data platform, Monarch provides the infrastructure, services, and tools to help users develop, build, deploy, and troubleshoot their batch processing applications (mostly in the form of workflows) at scale. Monarch consists of more than 20 Hadoop YARN clusters built entirely in the Cloud utilizing AWS EC2, and we use many different instance types offered by EC2. The actual EC2 instance type we employ at a cluster depends on its workload; some clusters are more optimized for computing, while others have more memory or disk capacity.

User workflows can be submitted to Monarch from Spinner (an internal workflow platform built on top of Airflow) and other UI based workflow orchestration tools via Job Submission Service, or JSS (see Figure 2). The user workflow source code typically specifies the cluster and queue in which the workflow should run.

alt_text

Figure 1. Pinterest Data System and the Batch Processing Platform (Monarch).

alt_text

Figure 2. Pinterest Job Submission Service. See more description in the text.

Resource Management Challenges

Hadoop YARN is used to manage the cluster resources and task scheduling. The cluster resources are represented as a tree of queues. All the resources of the cluster, or all the EC2 instances the cluster has, are represented as the “root” of the tree, and the leaf nodes of the tree are where applications run. The weight configuration of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the resources allocated to the parent. How much resource a child gets is based on the ratio of this child’s weight over the sum of the weights of all sibling nodes. By setting the node weight, we can control how many EC2 instances are assigned to any given queue. YARN supports multiple schedulers, and the Fair Scheduler is used in Monarch.

alt_text

Figure 3. YARN’s resource allocation: Tree of Queues of Adhoc Structure.

The goal of using a tree of queues to represent resource allocation is to achieve resource isolation between workflows that run in different queues. However, Monarch initially didn’t have a consistent queue structure, as shown in Figure 3. Some queues were allocated to specific projects, some were for organizations, and others for workflows of a certain priority. As a result, there was severe interference between different workflows running in the same queue — more critical workflows were often impacted by non-critical ones.

There were mainly two reasons for interference:

  1. Workflows running in the same queue are treated the same. With no notion of priority, the scheduler has no way to give more resources to more critical workflows.
  2. There is a parameter maxRunningApps to control how many applications can concurrently run in a given queue. This prevents too many applications competing for resources, in which situation no application can make good progress. However, if lower priority workflows are submitted first and saturate the maxRunningApps, then critical workflows submitted later can be stuck for a long time without being scheduled.

To address these issues, we introduced workflow tiering and changed the resource allocation queues to be tier-ed accordingly.

Workflow Tiering and Hierarchical Queue Structure

The workloads on Monarch are typically in the form of workflows. Workflow is represented as a Directed Acyclic Graph (DAG) of multiple jobs to process input data and generate output. The jobs in the same workflow run in parallel or sequentially depending on whether there is dependency on each other. We took two main steps to provide QoS for workflows while achieving cost efficiency.

Firstly, we added tiering to distinguish critical workflows from non-critical ones. The critical workflows typically have higher requirements on the finishing time. We decided to classify workflows into three tiers: tier1, tier2, and tier3 (tier1 has the highest importance). Then we worked with user teams to define the tiering and runtime service level objective (SLO) of all workflows that run on the Monarch platform.

Secondly, we changed the resource queue structure across all clusters to have the notion of tiering, project, and organization. Given that each workflow is associated with a project, each project belongs to a team, and each team belongs to a larger organization, we decided to create a three level hierarchical queue structure: organization, project, and tier. See Figure 4 for an example (“default” is used in place of tier3, for historical reason).

alt_text

Figure 4. Hierarchical Queues with Organization, Project and Tiering.

Some of the most important configurations of the queues are:

  • Weight: The weight of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the parent’s resources based on the relative ratio of their weights.
  • MaxRunningApps: The maximum number of applications that can run concurrently within the queue. This prevents from having too many applications running in the same queue of limited resources, meaning no applications can make good progress.
  • Preemption:
  1. preemption: whether to enable preemption
  2. fairSharePreemptionTimeout: number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues.
  3. fairSharePreemptionThreshold: the fair share preemption threshold for the queue. If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues.
  4. allowPreemptionFrom: determines whether the scheduler is allowed to preempt resources from the queue.

We configure tier1 queues to not allow preemption and also configure the other two parameters (fairSharePreemptionTimeout and fairSharePreemptionThreshold) to smaller values than for tier2 and tier3 queues. This allows tier1 queues to acquire resources faster when they are not getting their fair share of resources.

Because Monarch has many clusters, and the workflows running on different clusters could change from time to time, it’s not practical or efficient to manually create the queue structures. We developed a tool that analyzes the historical data of the workflows on the clusters, generates the queue structure, and updates the settings automatically and periodically.

Besides the preemption configuration described above, two of the most important configurations are the queue weight and maxRunningApps. In the next section, we will share more details on the algorithm we use to generate these settings.

Resource Allocation Algorithm

The workflows running in a queue have different requirements at different times. To ensure QoS of the critical workflows, we designed an algorithm to assign queue weight based on historical run data, namely, the Percentile Algorithm.

alt_text

Figure 5. The Percentile Resource Allocation Algorithm.

The algorithm looks at the historical run data within the most recent time window, such as 30 days, to see how much resource is needed for a given queue. Below is what it does:

  • Step 1: The queue may be used at some times and may be vacant at other times. When the queue is being used, sometimes X EC2 instances are being used and sometimes Y EC2 instances are being used. The algorithm divides the time window into time units; each unit is a timespan that the same number of EC2 instances are used. The time unit is represented as <timeLength, instanceUsed>. (See the left side of Figure 5)
  • Step 2: Excluding the time units in which the queue is vacant, sort the time units by the number of instances used in the time unit (see the right side of Figure 5) from smallest to largest.
  • Step 3: Determine the minimum number of instances to allocate to the queue to make sure a pre-specified time length percentage threshold is met. This threshold means, given the total length of time units (TTIU) that the queue is in use, the allocated resource needs to be enough to satisfy the percentage of TTIU. For example, for a tier1 queue that is used for 240 hours in total within a 30-day window (vacant other times), we’d like to guarantee the resources for 95% of the time, thus it’s 228 hours. The algorithm finds out the number of instances being used at the sorted results from Step 2. For example, tu0 + tu4 + tu7 + tu2 is about 95% of the whole time length in use, then the number of instances used in tu2 is the number of instances to be allocated to this queue. If we were to allocate the number of instances used in tu5, which is larger than used in tu2, it would potentially cause waste because tu2 is only 5% of the whole time the queue is in use.

The 95% threshold above is just an example. We evaluated the resource usage of different tiers and came up with different thresholds based on the size of the clusters and resources used by those workflows. The thresholds are also adjusted from time to time when the percentages of resources used by different tiers change.

There are several reasons we don’t have to guarantee 100% of the resources required at the peak usage time of a given tier1/tier2 queue, thus avoiding waste.

  1. The workflow tiering has a rough distribution such that ~10% workflows are tier1, 20–30% tier2, and 60–70% tier3.
  2. Not all queues are busy at the same time, and the YARN scheduler allows workflows to use resources available at other queues.
  3. Higher tier queues can preempt resources faster.

We measure the resource headroom of a queue by a metric called usage/capacity ratio. The capacity of a queue is the number of instances allocated to the queue times the length of the time window being measured. The usage is measured by YARN as instance-hours. E.g., if the queue uses X instances for Y hours, the resource usage is X * Y instance-hours. In addition, we also measure vcore-hours and memory-hours usage/capacity ratio in a similar fashion to see how balanced the vcore and memory resource usage is. Notice that YARN reported vcore-hours and memory-hours, and we use the dominant-resource (DR) method to calculate the instance-hours here.

The algorithm ensures the percentage threshold is set in a decreasing order from tier1 to tier3 queues, while it also ensures that the usage/capacity ratio is in an increasing order. This means the head room is the largest for tier1, second for tier2, and smallest for tier3.

The resource allocation algorithm also looks at historical run data to determine the maxRunningApps setting and sets this configuration with some headroom for each queue.

Comparing with Autoscaling

Autoscaling is another common approach to save cost in the Cloud, scaling up the cluster when needed and scaling down when peak demand has passed. Because Cloud providers normally charge much higher rates for on-demand capacity than reserved instances, users normally reserve the capacity that is always required and use on-demand instances for the autoscaling.

Autoscaling works well for online services at Pinterest, but we found it is not as cost efficient for batch processing for the following reasons:

  1. Tasks from large scale batch processing can run for hours, and the two options to scale down the cluster are wasteful. Scaling down gracefully and waiting for running tasks to finish (i.e. draining the instances before terminating them) potentially wastes a significant amount of resources because the instances may not be fully utilized. Scaling down by terminating instances forcefully even when tasks are still running on them means unfinished computing is wasted (and longer runtime for the involved jobs) and extra resources are needed to rerun the terminated tasks.
  2. In order for autoscaling using on-demand instances to make economical sense when compared with reserved instances, we estimated the percentage of time of peak consumption of the cluster using on-demand instances will need to be less than 30% for certain instance types. Considering the time it takes to scale down, the percentage would be a lot smaller. However, it’s hard to control this percentage, and resources can easily be wasted if the percentage goes higher.
  3. At Pinterest’s big data processing scale, using autoscaling would require getting hundreds or more instances of desired instance types during peak hours, which is not always possible. Not getting enough resources to run critical workflows could affect the business in a significant way.

By utilizing the resource allocation algorithm described above and workflow tiering, we were able to utilize good reserved instance pricing while still guaranteeing enough resources for critical workflows when needed.

Please note that in this blog, we focus on production workflows, not adhoc workloads like Spark SQL queries from Querybook or PySpark jobs from Jupyter notebooks. On adhoc clusters, we do utilize autoscaling with Spot instances because the peak usage only lasts 2–3 hours on business days.

Workflow Performance Monitoring

When allocating resources for a workflow, the runtime SLO is an important factor to consider. For example, if the workflow uses X instances-hours resources, and the runtime SLO is 12 hours, then the number of instances needed to run this workflow is X / 12.

With the resource allocation being in effect, we need a way to monitor the overall workflow runtime performance. We developed a dashboard to show how each tier workflows are performing in various clusters.

Within a time window of a certain size, for any given workflow, if it is run for X times and Y runs meet SLO, its SLO success ratio is defined as Y/X. It’s ideal if this ratio is 100% for any given workflow, but it’s not feasible for many reasons. As a compromise, we define a workflow as SLO-successful if its SLO success ratio is no less than 90%.

As mentioned earlier, we classified workflows into three tiers. For workflows of each tier, we measure the percentage of workflows that are SLO-successful. Our goal is to have this percentage higher than 90%.

Figure 6 is a snapshot of the dashboard that measures the performance of the 30-day time window. Before the project, the tier1 workflow’s success percentage was around 70%. It has been improved to and stabilized around 90% now. While we try to make most tier1 workflows successful, the same metrics of other tiers are not sacrificed too much because they have less stringent SLO requirements.

alt_text

Figure 6. Workflow performance monitoring: runtime SLO success ratio of each tier.

Cluster Resource Usage Monitoring

The workflow requirement is not static and may change from time to time. A daily report is done for each cluster on the following metrics:

  1. Total, tier1, tier2, and tier3 usage/capacity ratio (including instance, vcore, memory)
  2. Number of all tier1, tier2, and tier3 workflows running in the cluster (there may be new workflows onboarded, or re-tiering and SLO change of existing workflows)

Based on these metrics, we determine if the cluster is over or under utilized and take actions by either adding more resources to the cluster (organic growth), downsizing the cluster to save cost, or keeping it as is.

Cross-Cluster Routing And Load Balancing

As mentioned earlier, different workflows have different resource needs — some require more memory, some more CPU, and others more disk IO or storage. Their needs may change over time. Additionally, some clusters may become full while others are underutilized over time. Through monitoring resource consumption, we may find better home clusters for the workflows than their current ones. To ask users to change their source code to move the workflow is a tedious process, as we also have to adjust the resource allocation when we move the workflow.

We developed a cross-cluster routing (CCR) capability to change the target cluster of the workflows without the need of users to change settings. To implement this, we added instrumentation logic in the JSS component that can redirect jobs to another cluster as we need.

We also developed a workflow to periodically analyze the cluster usage and choose candidate workflows to move to other clusters to keep improving the load balancing and cost efficiency.

To enable redirecting jobs, we need to do resource allocation change on the target cluster with the above mentioned algorithm. To achieve this, we automated the resource allocation process such that with a single button click (triggering a workflow), it will do both resource allocation and configure job redirection in one step.

Current and Future Work

At the time of writing, our metrics indicate the vcore and memory usage of a fairly big cluster is not balanced, and a lot of vcores are wasted as a result. We are working on splitting this cluster into two clusters of different instance types with CCR support and migrating the workflows running on the original cluster into one of the resulting clusters. We expect with this change we will be able to not only run the applications more reliably, but also save a lot of cost.

Our clusters are located at different availability zones. When one zone has an issue, we can leverage the CCR feature to move critical workflows to another cluster in a different zone. We are working on making this process smoother.

We are also looking into dynamically route jobs at runtime to different clusters when the current load on the target cluster is full.

Acknowledgement

Thanks to Hengzhe Guo, Bogdan Pisica, Sandeep Kumar from the Batch Processing Platform team who helped further improve the implementations. Thanks to Soam Acharya, Jooseong Kim and Hannah Chen for driving the workflow tiering. Thanks to Jooseong Kim, William Tom, Soam Acharya, Chunyan Wang for the discussions and support along the way. Thanks to the workflow team, our platform user teams for their feedback and support.

Pinterest
Pinterest is a social bookmarking site where users collect and share photos of their favorite events, interests and hobbies. One of the fastest growing social networks online, Pinterest is the third-largest such network behind only Facebook and Twitter.
Tools mentioned in article
Open jobs at Pinterest
Video Platform Engineer
San Francisco, CA, US

About Pinterest:  

Millions of people across the world come to Pinterest to find new ideas every day. It’s where they get inspiration, dream about new possibilities and plan for what matters most. Our mission is to help those people find their inspiration and create a life they love. In your role, you’ll be challenged to take on work that upholds this mission and pushes Pinterest forward. You’ll grow as a person and leader in your field, all the while helping Pinners make their lives better in the positive corner of the internet.

Video is becoming the most important content format on Pinterest ecosystem. This role will act as an architect for Pinterest video platform, which responsible for the whole lifecycle of a video from uploading, transcoding, delivery and playback. The video architect will oversee Pinterest video platform strategy, owns the direction of what will be our next strategic investment to strengthen our video platform, and land the strategy into major initiatives towards the directions.

What you'll do: 

  • Lead the optimization and improvement in video codec efficiency, encoder rate control, transcode speed, video pre/post-processing and error resilience.
  • Improve end-to-end video experiences on lossy networks in various user scenarios.
  • Identify various opportunities to optimize in video codec, pipeline, error resilience.
  • Define the video optimization roadmap for both low-end and high-end network and devices.
  • Lead the definition and implementation of media processing pipeline.

What we're looking for: 

  • Experience with AWS Elemental
  • Solid knowledge in modern video codecs such as H.264, H.265, VP8/VP9 and AV1. 
  • Deep understanding of adaptive streaming technology especially HLS and MPEG-DASH.
  • Experience in architecting end to end video streaming infrastructure.
  • Experience in building media upload and transcoding pipelines.
  • Proficient in FFmpeg command line tools and libraries.
  • Familiar with popular client side media frameworks such as AVFoundation, Exoplayer, HLS.js, and etc.
  • Experience with streaming quality optimization on mobile devices.
  • Experience collaborating cross-functionally between groups with different video technologies and pipelines.

#LI-EA1

Senior Software Engineer, Data Privacy
Dublin, IE

About Pinterest:  

Millions of people across the world come to Pinterest to find new ideas every day. It’s where they get inspiration, dream about new possibilities and plan for what matters most. Our mission is to help those people find their inspiration and create a life they love. In your role, you’ll be challenged to take on work that upholds this mission and pushes Pinterest forward. You’ll grow as a person and leader in your field, all the while helping Pinners make their lives better in the positive corner of the internet.

The Data Privacy Engineering team builds platforms and works with engineers across Pinterest to help ensure our handling of customer and partner data meets or exceeds their expectations of privacy and security.  We’re a small, and growing, team based in Dublin.  We own three major engineering projects with company-wide impact: expanding and onboarding teams doing big data processing to a new fine-grained data access platform, tracking how data moves and evolves through our systems, and ensuring data is always handled appropriately.  As a Senior Engineer, you’ll take a driving role on one of these projects and responsibility for working with internal teams to understand their needs, designing solutions, and collaborating with teams in Dublin and the US to successfully execute on your plans.  Your work will help ensure the safety of our users’ and partners’ data and help Pinterest be a source of inspiration for millions of users.

What you’ll do:

  • Consult with engineers, product designers, and security experts to design data-handling solutions
  • Review code and designs from across the company to guide teams to secure and private solutions
  • Onboard customers onto platforms and refine our tools to streamline these processes
  • Mentor and coach engineers and grow your technical leadership skills, with engineers in Dublin and other offices.
  • Grow your engineering skills as you work with a range of open-source technologies and engineers across the company, and code across Pinterest’s stack in a variety of languages

What we’re looking for:

  • 5+ years of experience building enterprise-scale backend services in an object-oriented programing language (Java preferred)
  • Experience mentoring junior engineers and driving an engineering culture
  • The ability to drive ambiguous projects to successful outcomes independently
  • Understanding of big-data processing concepts
  • Experience with data querying and analytics techniques
  • Strong advocacy for the customer and their privacy

#LI-KL1

Software Engineer, Key Value Systems
San Francisco, CA, US

About Pinterest:  

Millions of people across the world come to Pinterest to find new ideas every day. It’s where they get inspiration, dream about new possibilities and plan for what matters most. Our mission is to help those people find their inspiration and create a life they love. In your role, you’ll be challenged to take on work that upholds this mission and pushes Pinterest forward. You’ll grow as a person and leader in your field, all the while helping Pinners make their lives better in the positive corner of the internet.

Pinterest brings millions of Pinners the inspiration to create a life they love for everything; whether that be tonight’s dinner, next summer’s vacation, or a dream house down the road. Our Key Value Systems team is responsible for building and owning the systems that store and serve data that powers Pinterest's business-critical applications. These applications range from user-facing features all the way to being integral components of our machine learning processing systems. The mission of the team is to provide storage and serving systems that are not only highly scalable, performant, and reliable, but also a delight to use. Our systems enable our product engineers to move fast and build awesome features rapidly on top of them.

What you’ll do

  • Build, own, and improve Pinterest's next generation key-value platform that will store petabytes of data, handle tens of millions of QPS, and serve hundreds of use cases powering almost all of Pinterest's business-critical applications
  • Contribute to open-source databases like RocksDB and Rocksplicator
  • Own, improve, and contribute to the main key-value storage platform, streaming write architectures using Kafka, and additional derivative
  • RocksDB-based distributed systems
  • Continually improve operability, scalability, efficiency, performance, and reliability of our storage solutions

What we’re looking for:

  • Deep expertise on online distributed storage and key-value stores at consumer Internet scale
  • Strong ability to work cross-functionally with product teams and with the storage SRE/DBA team
  • Fluent in C/C++ and Java
  • Good communication skills and an excellent team player

#LI-KL1

Head of Ads Delivery Engineering
San Francisco, CA, US

About Pinterest:  

Millions of people across the world come to Pinterest to find new ideas every day. It’s where they get inspiration, dream about new possibilities and plan for what matters most. Our mission is to help those people find their inspiration and create a life they love. In your role, you’ll be challenged to take on work that upholds this mission and pushes Pinterest forward. You’ll grow as a person and leader in your field, all the while helping Pinners make their lives better in the positive corner of the internet.

Pinterest is on a mission to help millions of people across the globe to find the inspiration to create a life they love. Within the Ads Quality team, we try to connect the dots between the aspirations of pinners and the products offered by our partners. 

You will lead an ML centric organization that is responsible for the optimization of the ads delivery funnel and Ads marketplace at Pinterest. Using your strong analytical skill sets, thorough understanding of machine learning, online auctions and experience in managing an engineering team you’ll advance the state of the art in ML and auction theory while at the same time unlock Pinterest’s monetization potential.  In short, this is a unique position, where you’ll get the freedom to work across the organization to bring together pinners and partners in this unique marketplace.

What you’ll do: 

  • Manage the ads delivery engineering organization, consisting of managers and engineers with a background in ML, backend development, economics and data science
  • Develop and execute a vision for ads marketplace and ads delivery funnel
  • Build strong XFN relationships with peers in Ads Quality, Monetization and the larger engineering organization, as well as with XFN partners in Product, Data Science, Finance and Sales

What we’re looking for:

  • MSc. or Ph.D. degree in Economics, Statistics, Computer Science or related field
  • 10+ years of relevant industry experience
  • 5+ years of management experience
  • XFN collaborator and a strong communicator
  • Hands-on experience building large-scale ML systems and/or Ads domain knowledge
  • Strong mathematical skills with knowledge of statistical models (RL, DNN)

#LI-TG1

Verified by
Security Software Engineer
Tech Lead, Big Data Platform
Software Engineer
Talent Brand Manager
Sourcer
Software Engineer
You may also like