Manas Realtime — Enabling Changes to Be Searchable in a Blink of an Eye


By Michael Mi | Tech Lead, Core Product Serving Infra


Manas, Pinterest’s in-house search engine, is a generic information retrieval platform. As we discussed in our previous post, Manas was designed as a search framework with high performance, availability, and scalability. Today, Manas powers search for the majority of Pinterest products, including Ads, Search, Homefeed, Related Pins, Visual, and Shopping.

One of the key metrics for a search system is indexing latency, the time it takes for a change to be reflected in the search index. As we keep growing the system's capabilities and onboarding new use cases, the ability to index new documents instantly has become more and more important. Manas already supports incremental indexing, which provides indexing latency on the order of tens of minutes. Unfortunately, this can't meet the growing business requirements from Ads and following feeds. We decided to build a new module within Manas to further reduce indexing latency to a fraction of a second.

In this blog post, we describe the architecture of the system, its key challenges, and the tradeoffs we made.

Challenges

New requirements come with new challenges. Here are several of the major challenges we faced.

Indexing Latency

The tiny-batch approach, also known as near-realtime, is the most popular choice in open source projects like Lucene and Vespa. With this approach, a newly written document is not searchable until an index commit is called, so you have to trade indexing latency against throughput. Unfortunately, we can't leverage this approach to reduce indexing latency to the order of seconds.

Index Refresh Ability

One of the drawbacks of realtime serving is the lack of index refresh agility. For a batch pipeline, it is trivial to rerun the indexing job to pick up all schema changes at once. However, for a realtime serving pipeline, supporting efficient index refreshes becomes complicated.

Scale-up for Constantly Changing Data

To avoid over-provisioning, we employ auto-scaling to adjust the number of replicas based on actual query load. If the index is immutable, it is relatively easy to bring up new replicas: you just copy the index to the new nodes, and you are done. All of the difficulty lies in handling a constantly changing index: how do we ensure that all replicas end up with the same index?

Error Recovery

Manas is a data-intensive service where each host may serve an index of up to several hundred GBs. Manas is also a stateful system; a bad binary could introduce data issues that rollbacks would not be able to fix. We needed to build a system that supports both fault tolerance and error recovery so that it is possible to recover from both binary bugs and data corruption.

Moving from Static to Realtime

Let’s take a brief look at the differences between conventional static serving and realtime serving. As shown in the above diagram, the major work for realtime serving is moving the indexing pipeline from offline to online.

For static serving, the indexes are generated offline with a batch workflow and then copied to the leaf for online serving. With batch workflows, the framework overhead makes it infeasible to build a servable index within a fraction of a second. For realtime serving, instead of using an offline workflow, all writes are handled on the fly within the service. In addition, the realtime indexing pipeline generates the same index format as the static indexing pipeline, allowing us to reuse the entire index read logic. With this in mind, let's continue the journey of understanding how realtime serving works.

Indexing Interface

Instead of using RPC directly, we employed Kafka as our high-throughput write stream. Leaf servers continuously pull mutations from it to build incremental indexes, as sketched in the code after the list below. It turns out this decision dramatically simplified our system in multiple ways:

  • Data replications and write failures are taken care of by Kafka.
  • With the seek back ability, the Kafka queue also serves as WAL.
  • With a strict ordering guarantee in each partition, the system can blindly apply deletions without needing to worry about correctness.
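To make the flow concrete, here is a minimal sketch of what a leaf's ingestion loop could look like. The `Mutation`, `MutationStream`, and `RealtimeSegment` types are hypothetical stand-ins for illustration, not actual Manas or Kafka client APIs.

```cpp
#include <cstdint>
#include <deque>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical mutation record consumed from a Kafka partition.
struct Mutation {
  enum class Type { kAdd, kDelete };
  Type type;
  int64_t doc_id;
  std::string payload;  // serialized document; empty for deletes
  int64_t offset;       // Kafka offset, later persisted as a checkpoint
};

// Stand-in for a Kafka consumer pinned to a single partition.
class MutationStream {
 public:
  void Push(Mutation m) { queue_.push_back(std::move(m)); }
  std::optional<Mutation> Poll() {
    if (queue_.empty()) return std::nullopt;
    Mutation m = std::move(queue_.front());
    queue_.pop_front();
    return m;
  }

 private:
  std::deque<Mutation> queue_;
};

// Stand-in for the mutable in-memory (active realtime) segment.
class RealtimeSegment {
 public:
  void Add(int64_t doc_id, std::string payload) { docs_[doc_id] = std::move(payload); }
  void Delete(int64_t doc_id) { docs_.erase(doc_id); }

 private:
  std::unordered_map<int64_t, std::string> docs_;
};

// The leaf tails its partition and applies mutations in order. Strict
// per-partition ordering means deletes can be applied blindly; the last
// applied offset is returned so it can be persisted as a checkpoint.
int64_t IngestLoop(MutationStream& stream, RealtimeSegment& segment) {
  int64_t last_offset = -1;
  while (auto m = stream.Poll()) {
    if (m->type == Mutation::Type::kAdd) {
      segment.Add(m->doc_id, std::move(m->payload));
    } else {
      segment.Delete(m->doc_id);
    }
    last_offset = m->offset;
  }
  return last_offset;
}
```

Because each partition is strictly ordered, the loop can apply deletes blindly, and remembering the last applied offset is all that is needed to resume after a restart.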

Architecture Overview

Since the serving logic can be reused with a shared index format, we will focus on the indexing data flow.

Essentially, the realtime Manas leaf is an LSM engine: it converts random writes into sequential IO and enables efficient serving for both read-heavy and write-heavy applications. As shown below, the whole indexing process consists of three critical steps. Let's discuss them one by one.

Realtime Segment Build

We introduced realtime segments in addition to the existing static segments. As shown above, there are two types of realtime segments in the system: active realtime segments and sealed realtime segments.

  • The active realtime segment, the only mutable component, accumulates mutations (adds/deletes) pulled from Kafka. It's worth pointing out that once a document is added into a realtime segment, it becomes searchable immediately after the document-level commit.
  • Once the active realtime segment reaches a configurable threshold, it is sealed, becomes immutable, and is put into a flush queue. Meanwhile, a new active realtime segment is created to continue accumulating mutations.

In the case of a service restart, the realtime segments can be reconstructed by replaying messages from Kafka.
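As a rough illustration of this lifecycle, the sketch below accumulates documents into an active segment, seals it once a configurable threshold is reached, and queues the sealed segment for flushing. The class names and the document-count threshold policy are assumptions for illustration, not the actual Manas implementation.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <vector>

// Stand-in for an in-memory realtime segment (names are hypothetical).
struct RealtimeSegment {
  std::vector<std::string> docs;
  bool sealed = false;
  size_t Size() const { return docs.size(); }
};

class SegmentManager {
 public:
  explicit SegmentManager(size_t seal_threshold)
      : seal_threshold_(seal_threshold),
        active_(std::make_shared<RealtimeSegment>()) {}

  // Called by the ingestion loop for every add pulled from Kafka.
  void AddDocument(std::string doc) {
    active_->docs.push_back(std::move(doc));
    if (active_->Size() >= seal_threshold_) {
      // Seal: the segment becomes immutable and is queued for flushing,
      // while a fresh active segment keeps accepting writes.
      active_->sealed = true;
      flush_queue_.push_back(active_);
      active_ = std::make_shared<RealtimeSegment>();
    }
  }

  // Drained by the background flusher, which persists each sealed segment.
  std::shared_ptr<RealtimeSegment> NextToFlush() {
    if (flush_queue_.empty()) return nullptr;
    auto seg = flush_queue_.front();
    flush_queue_.pop_front();
    return seg;
  }

 private:
  size_t seal_threshold_;
  std::shared_ptr<RealtimeSegment> active_;
  std::deque<std::shared_ptr<RealtimeSegment>> flush_queue_;
};
```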

Index Flush

Index flush is the process of persisting in-memory data from a realtime segment into a compact index file. A flush is automatically triggered when a realtime segment gets sealed, and a flush can also be manually triggered using a debug command.

The index flush is a beneficial operation that guarantees data persistence, so we don't need to reconstruct in-memory segments from scratch on restart. In addition, flushing reduces a segment's memory footprint and improves serving efficiency with a compact, immutable index.

Index Compaction

Over time, the accumulation of small generated segments hurts serving performance. To overcome this, we introduced a background compaction thread to merge small segments into bigger ones. Since a deletion just marks a document as deleted instead of physically removing it, the compaction thread also garbage-collects these deleted/expired documents.

After each flush and compaction operation, a new index manifest consisting of all the static segments is generated. Kafka offsets, used as checkpoints, are also added to each manifest. Based on these checkpoints, the service knows where to resume consuming messages after a restart.
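In this spirit, a manifest could be as simple as the list of persisted segment files plus the Kafka offset they cover. The sketch below is a hypothetical layout, not the actual Manas manifest format.

```cpp
#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

// Hypothetical manifest: the set of persisted segments plus the Kafka offset
// (checkpoint) they cover. After a restart, consumption resumes right after it.
struct IndexManifest {
  std::vector<std::string> segment_files;
  int64_t kafka_checkpoint_offset = -1;
};

// Persist the manifest as a trivial text file: the checkpoint first, then one
// segment file name per line. A real implementation would write it atomically
// (write to a temp file, then rename).
void SaveManifest(const IndexManifest& m, const std::string& path) {
  std::ofstream out(path, std::ios::trunc);
  out << m.kafka_checkpoint_offset << "\n";
  for (const auto& f : m.segment_files) out << f << "\n";
}

IndexManifest LoadManifest(const std::string& path) {
  IndexManifest m;
  std::ifstream in(path);
  if (!(in >> m.kafka_checkpoint_offset)) return m;  // no manifest yet
  std::string file;
  while (in >> file) m.segment_files.push_back(file);
  return m;
}

// After a restart:
//   int64_t resume_offset = LoadManifest("manifest.txt").kafka_checkpoint_offset + 1;
```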

Detailed Design

In this section, we will cover several key areas in more detail. Let’s start with the most interesting part, the concurrency model.

Concurrency Model

The realtime segment, as mentioned above, is the only mutable component, where we need to handle reads and writes simultaneously. Unfortunately, the near-realtime approach employed by open source projects can't meet our business requirements. Instead, we chose a different approach that enables us to commit a document immediately after adding it to the index, without waiting for an index flush. For performance, we employed lock-free techniques in data structures tailored to our usage. Now let's open the box!

Realtime Segment

Each realtime segment consists of an inverted index and a forward index. The inverted index is logically a mapping from a term to its posting list, a list of document ids used for retrieval. Meanwhile, the forward index stores an arbitrary binary blob used for full scoring and data fetching. Let's focus only on the realtime inverted index, which is more interesting and challenging than the forward index.

At a high level, the major difference between a realtime segment and a static segment is mutability. For the realtime inverted index, the map from term to posting list needs to be concurrent. This is well supported by open source libraries such as folly's ConcurrentHashMap. What we care more about is the internal representation of the posting list, which has to support our concurrency model efficiently.

Append-only Vector

A single-writer, multiple-readers model is usually more efficient and easier to reason about. Similar to HDFS's append-only model, we chose an append-only, lock-free data structure. Let's take a closer look at how the reader and the writer interact with each other.

  • The writer appends a doc id to the vector, then commits the new size to make it accessible to readers
  • A reader takes a snapshot of the committed size before accessing data

To avoid memory copying overhead as the posting list grows, we internally manage the data as a list of buckets: when we run out of capacity, we simply add a new bucket without touching the old ones. In addition, search engines usually use skip lists to speed up the skip operation. Thanks to this format, it is convenient to support a single-level skip list, which is good enough for the realtime inverted index since it is usually small.
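Below is a minimal sketch of such a bucketed append-only vector with a single writer and lock-free readers. It models only the committed-size protocol and the bucket layout described above; bucket sizing, overflow handling, memory reclamation, and the skip list are omitted, and the class name is ours rather than Manas's.

```cpp
#include <array>
#include <atomic>
#include <cstdint>
#include <memory>

// Single-writer, multiple-readers append-only vector of doc ids.
// Data lives in fixed-size buckets, so existing entries are never moved or
// copied when the vector grows; readers only look at entries below the
// committed size, which the writer publishes with a release store.
class AppendOnlyVector {
 public:
  static constexpr size_t kBucketSize = 1024;  // doc ids per bucket
  static constexpr size_t kMaxBuckets = 4096;  // fixed bucket directory size

  // Writer thread only. Overflow beyond kMaxBuckets is not handled here.
  void Append(uint32_t doc_id) {
    size_t size = committed_size_.load(std::memory_order_relaxed);
    size_t bucket = size / kBucketSize;
    if (size % kBucketSize == 0) {
      // Out of capacity: allocate a new bucket; old buckets stay untouched.
      buckets_[bucket] = std::make_unique<std::array<uint32_t, kBucketSize>>();
    }
    (*buckets_[bucket])[size % kBucketSize] = doc_id;
    // Publish: everything written above becomes visible to readers that
    // observe the new committed size via an acquire load.
    committed_size_.store(size + 1, std::memory_order_release);
  }

  // Reader threads: snapshot the committed size once, then read below it.
  size_t SnapshotSize() const {
    return committed_size_.load(std::memory_order_acquire);
  }
  uint32_t At(size_t i) const {  // requires i < SnapshotSize()
    return (*buckets_[i / kBucketSize])[i % kBucketSize];
  }

 private:
  std::array<std::unique_ptr<std::array<uint32_t, kBucketSize>>, kMaxBuckets> buckets_;
  std::atomic<size_t> committed_size_{0};
};
```

A reader iterating a posting list calls SnapshotSize() once and never reads past it, which is exactly the reader/writer protocol described in the bullets above.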

Document Atomicity

Now, with an append-only vector, we are able to achieve atomicity for a single posting list. However, a document can contain a list of terms, so we could end up returning unexpected documents from a partially updated index. To address this potential issue, we introduced a document-level commit to guarantee document atomicity. In the serving pipeline, an additional filter is used to make sure only committed documents are returned.
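One simple way to realize such a document-level commit, assuming doc ids are assigned sequentially within a segment, is to publish a watermark of the last fully indexed document and have the serving side drop any candidate above it. The sketch below uses our own naming and is only an illustration of the idea.

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

// Hypothetical document-level commit for a realtime segment, assuming doc ids
// are assigned sequentially within the segment. Individual posting lists may
// already contain a partially indexed document, but serving only returns doc
// ids at or below the committed watermark.
class CommitWatermark {
 public:
  // Writer: call after ALL posting lists of `doc_id` have been appended.
  void CommitDocument(int64_t doc_id) {
    committed_.store(doc_id, std::memory_order_release);
  }

  // Serving pipeline: filter out candidates from uncommitted documents.
  std::vector<int64_t> FilterCommitted(const std::vector<int64_t>& candidates) const {
    const int64_t watermark = committed_.load(std::memory_order_acquire);
    std::vector<int64_t> out;
    for (int64_t doc_id : candidates) {
      if (doc_id <= watermark) out.push_back(doc_id);
    }
    return out;
  }

 private:
  std::atomic<int64_t> committed_{-1};  // -1: nothing committed yet
};
```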

Speaking of document atomicity, document updates are another scenario worth mentioning here. We deliberately convert each document update into two operations: adding the new document, then deleting the old one from the index. Although each operation is atomic, together they are not. We think it is acceptable to return either the old version or the new version within a very short time window; nevertheless, we added dedupe logic to the serving pipeline to filter out the old version when both are returned.

Writes Scaling

A question that naturally comes up: if the data structures only support a single-writer, multiple-readers concurrency model, what happens when a single thread can't keep up with all the writes? Blindly adding more shards just to scale write throughput doesn't seem like a good idea. While this is a valid concern, it has already been taken care of in our design.

The single-writer, multiple-readers model used for the data structures doesn't mean we can't use multiple threads for writes. We plan to use a term-sharding strategy to support writes with multiple threads. As shown in the above diagram, for a given document with a list of terms, each term is always mapped to a fixed thread, so all the data structures tailored for a single writer and multiple readers can be reused directly without any limitations.
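A sketch of that term-sharding idea follows: hash each term to a fixed writer shard so every posting list still has exactly one writer. The function names are illustrative, and how per-shard work is handed to threads is left out.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Map each term to a fixed writer shard. Because the mapping is stable, a
// given posting list is only ever appended to by one thread, so the
// single-writer/multiple-readers structures can be reused as-is.
size_t ShardForTerm(const std::string& term, size_t num_writer_threads) {
  return std::hash<std::string>{}(term) % num_writer_threads;
}

// Split one document's terms into per-shard work lists; each list would be
// handed to its dedicated writer thread.
std::vector<std::vector<std::string>> PartitionTerms(
    const std::vector<std::string>& terms, size_t num_writer_threads) {
  std::vector<std::vector<std::string>> per_shard(num_writer_threads);
  for (const auto& term : terms) {
    per_shard[ShardForTerm(term, num_writer_threads)].push_back(term);
  }
  return per_shard;
}
```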

Index Refresh

Index refresh is a critical capability for our products, enabling quick turnaround and improving dev velocity. Generally, two approaches can be used to refresh the index efficiently: backfilling on the fly, and reinstating from an offline-built index.

Backfilling Index

We provide the ability to backfill documents at a reasonable throughput. To avoid impacting production freshness, backfill traffic goes through a separate stream with a lower priority. As a result, it is possible for two versions of a document to be present in the two streams, with the old version overriding the new one. To overcome this, we need to introduce a versioning mechanism and a conflict resolver in the realtime indexing pipeline to decide which version is fresher.
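Such a conflict resolver can be as simple as remembering the highest version applied per document and dropping anything older. The sketch below assumes each mutation carries a monotonically increasing version, for example an update timestamp from the source of truth; names are illustrative.

```cpp
#include <cstdint>
#include <unordered_map>

// Keep the highest version applied per document; drop stale writes that
// arrive late (e.g., a backfilled old version racing with a fresher
// realtime update). Versions are assumed to be monotonically increasing.
class ConflictResolver {
 public:
  // Returns true if the mutation should be applied to the index.
  bool ShouldApply(int64_t doc_id, int64_t version) {
    auto it = latest_version_.find(doc_id);
    if (it != latest_version_.end() && it->second >= version) {
      return false;  // an equal or fresher version is already indexed
    }
    latest_version_[doc_id] = version;
    return true;
  }

 private:
  std::unordered_map<int64_t, int64_t> latest_version_;
};
```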

Reinstating from Offline Built Index

Sometimes, backfilling a full dataset at a given speed would simply take too long. A quicker index refresh approach we also support is building an index offline and then reinstating from it, with a synchronization mechanism between the offline-built index and the Kafka stream.

Failover and Auto-scaling

From time to time, we need to bring up new instances for various reasons, such as failover and auto-scaling. For static serving, it is easy to start a new instance with an immutable index downloaded from the index store. However, it becomes complicated for realtime serving with a constantly changing index. How do we ensure that new instances eventually have the same copy of the index as the others?

We decided to use leader-based replication, as shown in the above diagram. Our process looks like this (a minimal bootstrap sketch follows the list):

  1. The leader periodically takes a new snapshot and uploads it to the durable index store
  2. New instances download the latest snapshot, by default, from the index store
  3. New instances resume consuming messages from Kafka based on the checkpoint from the snapshot index
  4. New instances start serving traffic once they have caught up
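Here is a minimal sketch of that bootstrap sequence. The `IndexStore`, `KafkaPartition`, and `Leaf` types, along with the lag threshold, are illustrative stand-ins for the real components.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>

// Hypothetical stand-ins; names and values are illustrative only.
struct Snapshot {
  std::string index_path;     // downloaded immutable index files
  int64_t checkpoint_offset;  // Kafka offset the snapshot covers
};

struct IndexStore {
  Snapshot DownloadLatest() const { return {"/data/snapshot", 1'000'000}; }
};

struct KafkaPartition {
  int64_t end_offset = 1'000'500;  // current head of the partition
};

struct Leaf {
  int64_t consumed_offset = -1;
  void LoadIndex(const std::string& /*path*/) {}               // mmap static segments
  void ApplyBatch(int64_t up_to) { consumed_offset = up_to; }  // ingest mutations
  void StartServing() {}                                       // join the serving fleet
};

// Bootstrap a new replica:
//   1. (the leader uploads snapshots periodically - not shown here)
//   2. download the latest snapshot from the index store
//   3. resume consuming Kafka from the snapshot's checkpoint
//   4. start serving only once the replication lag is small enough
// batch_size must be > 0 for the catch-up loop to terminate.
void Bootstrap(Leaf& leaf, const IndexStore& store, const KafkaPartition& log,
               int64_t max_lag_to_serve, int64_t batch_size) {
  Snapshot snap = store.DownloadLatest();
  leaf.LoadIndex(snap.index_path);
  leaf.consumed_offset = snap.checkpoint_offset;
  while (log.end_offset - leaf.consumed_offset > max_lag_to_serve) {
    leaf.ApplyBatch(std::min(leaf.consumed_offset + batch_size, log.end_offset));
  }
  leaf.StartServing();
}
```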

There are some key points in the design worth pointing out:

Leader Election

The only responsibility of the leader is to take snapshots and upload the index periodically. This means we can afford to have no leader or have multiple leaders for a short period of time, up to hours. Therefore, we have some flexibility in choosing a leader election algorithm. For simplicity, we chose to use our cluster maintenance job to select a leader statically, where we periodically check if we have a good leader.

Snapshot Upload

Usually, a new instance would just connect to the leader and download the latest snapshot. With that approach, snapshot downloads from new instances could overload the leader, leading to cascading failures. Instead, we chose to upload snapshots periodically to the index store, trading space and freshness for stability. In addition, the uploaded snapshots are useful for error recovery, which we cover shortly.

Error Recovery

As mentioned above, error recovery is another challenge for a realtime serving system. There are some specific data corruption scenarios we need to handle.

Input Data Corruption

We use Kafka as our input write stream; unfortunately, its messages are immutable, since a producer can only append messages to the log but can't modify existing ones. This means that once data corruption is introduced into Kafka messages, it is permanent. Thanks to the uploaded snapshots, we can rewind the index to a point before the corruption, skip the corrupted messages, and then consume new messages with the fix.
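In code, that recovery could look like the sketch below: restore the last snapshot taken before the corruption, then replay the log while skipping the offsets identified as corrupted. The types are the same kind of illustrative stand-ins used in the earlier sketches.

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Minimal model of replaying a Kafka partition from a snapshot checkpoint
// while skipping known corrupted offsets.
struct LogRecord {
  int64_t offset;  // mutation payload omitted for brevity
};

struct Index {
  int64_t checkpoint = -1;       // offset covered by the restored snapshot
  std::vector<int64_t> applied;  // offsets applied after the rewind
  void Apply(const LogRecord& r) { applied.push_back(r.offset); }
};

// Rewind-and-skip recovery: start from the snapshot's checkpoint and replay
// everything after it, except the offsets flagged as corrupted.
void Recover(Index& index, const std::vector<LogRecord>& log,
             const std::set<int64_t>& corrupted_offsets) {
  for (const LogRecord& r : log) {
    if (r.offset <= index.checkpoint) continue;       // already in the snapshot
    if (corrupted_offsets.count(r.offset)) continue;  // permanently bad message
    index.Apply(r);
  }
}
```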

Binary Bugs Caused Data Corruption

Although we have a mature index validation pipeline for static clusters that guarantees there are no issues with a new index or a new binary before swapping in the new version, it is still possible for bugs to sneak into production. Fortunately, for static serving we can fix the issue by rolling back the binary or the index. It becomes much harder for realtime serving, where rolling back the binary can't roll back the errors already baked into the index. Using our snapshot uploading mechanism, we are able to roll back the binary together with a rewound index and then replay messages from Kafka to fix the errors in the index.

What’s next

As more scenarios are onboarded to Manas, we need to keep improving the system’s efficiency, scalability, and capability. Some interesting projects in our roadmap are as follows:

  • Cohosting static and realtime clusters to simplify our serving stack
  • Optimizing the system to support a large dataset
  • Building generic embedding-based retrieval to power advanced scenarios

Acknowledgments: This post summarizes several quarters' work involving multiple teams. Thanks to Tim Koh, Haibin Xie, George Wu, Sheng Chen, Jiacheng Hong and Zheng Liu for their countless contributions. Thanks to Mukund Narasimhan, Angela Sheu, Ang Xu, Chengcheng Hu and Dumitru Daniliuc for many meaningful discussions and feedback. Thanks to Roger Wang and Randall Keller for the great leadership.
