Manas Realtime — Enabling Changes to Be Searchable in a Blink of an Eye


By Michael Mi | Tech Lead, Core Product Serving Infra


Manas, Pinterest’s in-house search engine, is a generic information retrieval platform. As we discussed in our previous post, Manas was designed as a search framework with high performance, availability, and scalability. Today, Manas powers search for the majority of Pinterest products, including Ads, Search, Homefeed, Related Pins, Visual, and Shopping.

One of the key metrics for a search system is indexing latency, the time taken for a change to become visible in the search index. As we keep growing the system's capabilities and onboarding new use cases, the ability to index new documents instantly has become more important. Manas already supports incremental indexing, which provides indexing latency on the order of tens of minutes. Unfortunately, this can't meet the growing business requirements from Ads and following feeds. We decided to build a new module within Manas to reduce indexing latency to a fraction of a second.

In this blog post we describe the architecture of the system and its key challenges, and we provide details about the tradeoffs we made.

Challenges

New requirements come with new challenges. Here are several of the major challenges we faced.

Indexing Latency

The tiny-batch approach, aka near-realtime, is the most popular choice for open source projects like Lucene and Vespa. With this approach, a newly written document is not searchable until an index commit is called. As a result, you need to make a tradeoff between indexing latency and throughput. Unfortunately, we couldn't leverage this approach to reduce indexing latency to the order of seconds.

Index Refresh Ability

One of the drawbacks of realtime serving is the lack of index refresh agility. For a batch pipeline, it is trivial to rerun the indexing job to pick up all schema changes at once. When it comes to a realtime serving pipeline, however, supporting efficient index refreshes becomes complicated.

Scale-up for Constantly Changing Data

To avoid over-provisioning, auto-scaling was employed to adjust replicas based on the actual query load. If the index is immutable, it is relatively easy to bring up new replicas: you just need to copy the index to new nodes, and you are done. All of the difficulty lies in handling the constantly changing index: how to ensure that all replicas end up with the same index?

Error Recovery

Manas is a data-intensive service where each host may serve an index of up to several hundred GBs. Manas is also a stateful system; a bad binary could introduce data issues that rollbacks would not be able to fix. We needed to build a system that supports both fault tolerance and error recovery so that it is possible to recover from both binary bugs and data corruption.

Moving from Static to Realtime

Let’s take a brief look at the differences between conventional static serving and realtime serving. As shown in the above diagram, the major work for realtime serving is moving the indexing pipeline from offline to online.

For static serving, indexes are generated offline with a batch workflow and then copied to leaf nodes for online serving. Due to the high framework overhead of batch workflows, it is hardly possible to build a servable index within a fraction of a second that way. For realtime serving, instead of using an offline workflow, all writes are handled on the fly within the service. In addition, the realtime indexing pipeline handles writes in a way that generates the same index format as the static indexing pipeline, allowing us to reuse the entire index read logic. With this in mind, let's continue the journey of understanding how realtime serving works.

Indexing Interface

Instead of directly using RPC, we employed Kafka as our high-write-throughput stream. Leaf servers continuously pull mutations from it to build incremental indexes (a sketch of this loop follows the list below). It turns out this decision dramatically simplified our system in multiple ways:

  • Data replication and write failures are taken care of by Kafka.
  • With the seek-back ability, the Kafka queue also serves as a WAL.
  • With a strict ordering guarantee within each partition, the system can blindly apply deletions without worrying about correctness.
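
To make the flow concrete, here is a minimal sketch of the pull-apply loop, assuming hypothetical Mutation, KafkaPartitionReader, and RealtimeSegmentWriter types (the real interfaces are internal to Manas):

```cpp
#include <cstdint>
#include <string>

// Hypothetical mutation record decoded from a Kafka message.
struct Mutation {
  enum class Type { kAdd, kDelete };
  Type type;
  std::string doc_id;
  std::string payload;  // serialized document, present for adds
  int64_t offset;       // Kafka offset, later checkpointed in the manifest
};

// Hypothetical boundary interfaces; the real leaf wraps a Kafka consumer.
class KafkaPartitionReader {
 public:
  virtual ~KafkaPartitionReader() = default;
  virtual bool next(Mutation* out) = 0;  // next message in partition order
};

class RealtimeSegmentWriter {
 public:
  virtual ~RealtimeSegmentWriter() = default;
  virtual void addDocument(const Mutation& m) = 0;
  virtual void deleteDocument(const std::string& doc_id) = 0;
};

// Because each Kafka partition is strictly ordered, deletes can be applied
// blindly: a delete always arrives after the add it refers to.
void consumeLoop(KafkaPartitionReader& reader, RealtimeSegmentWriter& writer) {
  Mutation m;
  while (reader.next(&m)) {
    if (m.type == Mutation::Type::kAdd) writer.addDocument(m);
    else writer.deleteDocument(m.doc_id);
  }
}
```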

Architecture Overview

Since the serving logic can be reused with a shared index format, we will focus on the indexing data flow.

Essentially, a realtime Manas leaf is an LSM engine: it converts random writes into sequential IO and enables efficient serving for applications sensitive to both read and write amplification. As shown below, the whole indexing process consists of three critical steps. Let's discuss them one by one.

Realtime Segment Build

In addition to the existing static segments, we introduced realtime segments. As shown above, there are two types of realtime segments in the system: active realtime segments and sealed realtime segments.

  • The active realtime segment, the only mutable component, is used to accumulate mutations (adds/deletes) pulled from Kafka. It's worth pointing out that after a document is added to a realtime segment, it becomes searchable immediately after the document-level commit.
  • Once the active realtime segment reaches a configurable threshold, it is sealed, becomes immutable, and is put into a flush queue. Meanwhile, a new active realtime segment is created to continue accumulating mutations.

In the case of a service restart, the realtime segments can be reconstructed by replaying messages from Kafka.
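
A sketch of the seal-and-swap step might look like the following; SegmentManager, RealtimeSegment, and the threshold handling are hypothetical stand-ins for the real implementation:

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>

struct RealtimeSegment {   // hypothetical in-memory segment
  size_t num_docs = 0;
  bool sealed = false;
};

class SegmentManager {
 public:
  explicit SegmentManager(size_t seal_threshold)
      : seal_threshold_(seal_threshold),
        active_(std::make_shared<RealtimeSegment>()) {}

  // Called from the single indexing thread for every applied mutation.
  void onMutationApplied() {
    if (++active_->num_docs >= seal_threshold_) {
      std::lock_guard<std::mutex> g(mu_);
      active_->sealed = true;            // segment becomes immutable
      flush_queue_.push_back(active_);   // picked up by the flusher thread
      active_ = std::make_shared<RealtimeSegment>();  // fresh active segment
    }
  }

 private:
  const size_t seal_threshold_;
  std::shared_ptr<RealtimeSegment> active_;
  std::mutex mu_;  // guards the hand-off to the background flusher
  std::deque<std::shared_ptr<RealtimeSegment>> flush_queue_;
};
```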

Index Flush

Index flush is the process of persisting in-memory data from a realtime segment into a compact index file. A flush is automatically triggered when a realtime segment gets sealed, and a flush can also be manually triggered using a debug command.

Index flush is a beneficial operation that guarantees data persistence, so that we don't need to reconstruct in-memory segments from scratch during a restart. In addition, flushing reduces a segment's memory footprint and improves serving efficiency with a compact, immutable index.
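
As an illustration only, the sketch below flushes a sealed segment's term-to-postings map into a deliberately simple binary layout; the actual Manas flush emits the same binary format as static segments so the read path can be shared:

```cpp
#include <cstdint>
#include <fstream>
#include <map>
#include <string>
#include <vector>

// Hypothetical sealed segment view: term -> sorted doc ids.
using SealedSegment = std::map<std::string, std::vector<uint32_t>>;

// Writes [term length, term bytes, posting count, doc ids] per term.
void flushSegment(const SealedSegment& seg, const std::string& path) {
  std::ofstream out(path, std::ios::binary);
  for (const auto& [term, postings] : seg) {
    uint32_t term_len = static_cast<uint32_t>(term.size());
    out.write(reinterpret_cast<const char*>(&term_len), sizeof(term_len));
    out.write(term.data(), term_len);
    uint32_t n = static_cast<uint32_t>(postings.size());
    out.write(reinterpret_cast<const char*>(&n), sizeof(n));
    out.write(reinterpret_cast<const char*>(postings.data()),
              n * sizeof(uint32_t));
  }
}
```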

Index Compaction

Over time, the many small generated segments hurt serving performance. To overcome this, we introduced a background compaction thread to merge small segments into bigger ones. Since the deletion operation just marks documents as deleted instead of physically removing them, the compaction thread also garbage-collects these deleted/expired documents, as in the sketch below.
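
Here is a simplified take on that merge, assuming sorted posting lists and a set of deleted doc ids; real compaction also remaps document ids across segments, which is omitted here:

```cpp
#include <algorithm>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Merge the posting lists of several small segments into one,
// dropping documents that were marked deleted or have expired.
std::vector<uint32_t> mergePostings(
    const std::vector<std::vector<uint32_t>>& lists,
    const std::unordered_set<uint32_t>& deleted) {
  std::vector<uint32_t> merged;
  for (const auto& list : lists)
    for (uint32_t doc : list)
      if (deleted.count(doc) == 0) merged.push_back(doc);
  std::sort(merged.begin(), merged.end());
  merged.erase(std::unique(merged.begin(), merged.end()), merged.end());
  return merged;
}
```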

After each flush and compaction operation, a new index manifest consisting of all the static segments is generated. Kafka offsets, used as checkpoints, are also recorded in each manifest. Based on these checkpoints, the service knows where to resume consuming messages after a restart.
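
A manifest can be modeled as a small struct; the field names below are hypothetical:

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct IndexManifest {
  std::vector<std::string> segment_files;        // all persisted static segments
  std::map<int32_t, int64_t> kafka_checkpoints;  // partition -> next offset
};

// On restart, load the newest manifest and resume each partition from its
// checkpointed offset; mutations after that point are replayed to rebuild
// the in-memory realtime segments.
int64_t resumeOffset(const IndexManifest& m, int32_t partition) {
  auto it = m.kafka_checkpoints.find(partition);
  return it == m.kafka_checkpoints.end() ? 0 : it->second;  // 0: from the start
}
```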

Detailed Design

In this section, we will cover several key areas in more detail. Let’s start with the most interesting part, the concurrency model.

Concurrency Model

The realtime segment, as mentioned above, is the only mutable component, where we need to handle reads and writes simultaneously. Unfortunately, the near-realtime approach employed by open source projects can't meet our business requirements. Instead, we chose a different approach that enables us to commit a document immediately after it is added to the index, without waiting for an index flush. For the sake of performance, we employed lock-free techniques for data structures tailored to our usage. Now let's open the box!

Realtime Segment

Each realtime segment consists of an inverted index and a forward index. The inverted index is logically a mapping from term to posting list, a list of document ids used for retrieval. Meanwhile, the forward index stores an arbitrary binary blob used for full scoring and data fetching. Let's focus only on the realtime inverted index, which is more interesting and challenging than the forward index.

At a high level, the major difference between a realtime segment and a static segment is mutability. For the realtime inverted index, the map from term to posting list needs to be concurrent. This is well supported by open-source libraries such as folly's ConcurrentHashMap. What we care about more is the internal representation of the posting list, which needs to support our concurrency model in an efficient way.

Append-only Vector

Usually, a single-writer, multiple-reader model is more efficient and easier to reason about. We chose a data model similar to HDFS's, with an append-only, lock-free data structure. Let's take a closer look at how the reader and the writer interact with each other.

  • The writer appends a doc id to the vector, then commits the new size to make it accessible to readers
  • A reader takes a snapshot up to the committed size before accessing data

To avoid memory-copying overhead as the posting list grows, we internally manage the data as a list of buckets: when we run out of capacity, we just add a new bucket without touching old ones. In addition, search engines usually use skip lists to speed up skip operations. This format makes it convenient to support a single-level skip list, which is good enough for a realtime inverted index since its size is usually small.
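
Below is a minimal sketch of such an append-only vector, with illustrative bucket sizes and a bounded bucket directory (the skip list is omitted). The release store of the size is the writer's commit, and the acquire load is the reader's snapshot:

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>

class AppendOnlyVector {
  static constexpr size_t kBucketSize = 1024;   // illustrative
  static constexpr size_t kMaxBuckets = 4096;   // bound check omitted below
  using Bucket = std::array<uint32_t, kBucketSize>;

 public:
  // Writer only. Publishing the new size with a release store is the commit:
  // readers can never observe an element before it was fully written.
  void append(uint32_t doc_id) {
    size_t n = size_.load(std::memory_order_relaxed);
    if (n % kBucketSize == 0)                                 // out of capacity:
      buckets_[n / kBucketSize] = std::make_unique<Bucket>(); // add a bucket,
                                                              // old ones untouched
    (*buckets_[n / kBucketSize])[n % kBucketSize] = doc_id;
    size_.store(n + 1, std::memory_order_release);            // commit
  }

  // Readers snapshot the committed size first and only index below it.
  size_t snapshotSize() const { return size_.load(std::memory_order_acquire); }
  uint32_t at(size_t i) const {  // precondition: i < snapshotSize()
    return (*buckets_[i / kBucketSize])[i % kBucketSize];
  }

 private:
  std::atomic<size_t> size_{0};
  std::array<std::unique_ptr<Bucket>, kMaxBuckets> buckets_;
};
```

Note that a fixed directory of bucket pointers (rather than a growable vector) is what keeps the reader side lock-free here: readers below the committed size never race with the writer installing a new bucket.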

Document Atomicity

Now, with an append-only vector, we are able to achieve atomicity for a single posting list. However, a document can contain a list of terms, and we might end up returning unexpected documents from a partially updated index. To address this potential issue, we introduced a document-level commit to guarantee document atomicity. In the serving pipeline, an additional filter makes sure only committed documents are returned.
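
A sketch of that filter, assuming doc ids are assigned in increasing order and all of a document's terms are inserted before the commit (CommitWatermark is a hypothetical name):

```cpp
#include <atomic>
#include <cstdint>

class CommitWatermark {
 public:
  void commit(int64_t doc_id) {             // writer: document-level commit,
    max_committed_.store(doc_id,            // called after all terms are added
                         std::memory_order_release);
  }
  bool isCommitted(int64_t doc_id) const {  // serving-side filter
    return doc_id <= max_committed_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<int64_t> max_committed_{-1};  // nothing committed yet
};
```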

Speaking of document atomicity, document updating is another scenario worth mentioning here. For each document update, we deliberately convert it into two operations: adding the new document, then deleting the old one from the index. Although each operation is atomic, together they can't be guaranteed to be atomic. We think it is acceptable to return either the old or the new version within a very short time window, but nevertheless we added dedupe logic to the serving pipeline to filter out the old version when both are returned, as sketched below.
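
A possible shape for that dedupe step, assuming each hit carries a stable document key and a version number (the Hit type is hypothetical):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct Hit {
  std::string doc_key;  // stable external document key
  uint64_t version;     // higher = fresher
  uint32_t internal_id;
};

// If a short window lets both versions of a document match, keep the newest.
std::vector<Hit> dedupe(const std::vector<Hit>& hits) {
  std::unordered_map<std::string, Hit> best;
  for (const auto& h : hits) {
    auto [it, inserted] = best.try_emplace(h.doc_key, h);
    if (!inserted && h.version > it->second.version) it->second = h;
  }
  std::vector<Hit> out;
  out.reserve(best.size());
  for (const auto& kv : best) out.push_back(kv.second);
  return out;
}
```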

Writes Scaling

One question that naturally comes up: if the data structures only support a single-writer, multiple-reader concurrency model, what happens when a single thread can't handle all the writes in time? Blindly adding more shards just to scale write throughput does not seem like a good idea. While this is a valid concern, it has already been taken care of in our design.

The single-writer, multiple-reader model used for the data structures doesn't mean we can't use multiple threads for writes. We plan to use a term-sharding strategy to support writes with multiple threads. As shown in the above diagram, for a given document with a list of terms, each term is always mapped to a fixed thread, so all the data structures tailored for a single writer and multiple readers can be reused directly without any limitations.
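
A sketch of the routing step, with per-thread vectors standing in for the real per-thread queues (e.g. SPSC queues feeding the indexing threads):

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

struct TermPosting {
  std::string term;
  uint32_t doc_id;
};

// Every term is deterministically owned by one indexing thread, so each
// posting list still has exactly one writer.
size_t ownerThread(const std::string& term, size_t num_threads) {
  return std::hash<std::string>{}(term) % num_threads;
}

void shardDocument(uint32_t doc_id, const std::vector<std::string>& terms,
                   std::vector<std::vector<TermPosting>>& per_thread) {
  for (const auto& t : terms)
    per_thread[ownerThread(t, per_thread.size())].push_back({t, doc_id});
}
```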

Index Refresh

Index refresh ability is a critical feature for our products, enabling quick turnaround and improving dev velocity. Generally, two approaches can be used to refresh the index efficiently: backfilling on the fly, and reinstating from an offline-built index.

Backfilling Index

We provide the ability to backfill documents at a reasonable throughput. To avoid impacting production freshness, backfill traffic goes through a separate stream with a lower priority. As a result, it is possible for two versions of a document to be present in both streams, with the old version overriding the new one. To overcome this, we introduced a versioning mechanism and a conflict resolver in the realtime indexing pipeline to decide which version is fresher, as sketched below.
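
A minimal sketch of such a resolver, assuming each mutation carries a monotonically increasing per-document version:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

class ConflictResolver {
 public:
  // Returns true if the mutation should be applied, i.e. it is at least as
  // fresh as what the index already holds for this document.
  bool shouldApply(const std::string& doc_key, uint64_t version) {
    auto it = latest_.find(doc_key);
    if (it != latest_.end() && it->second > version)
      return false;  // a fresher version was already indexed: drop this one
    latest_[doc_key] = version;
    return true;
  }

 private:
  std::unordered_map<std::string, uint64_t> latest_;  // doc -> newest version
};
```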

Reinstating from Offline Built Index

Sometimes, backfilling a full dataset at a given speed would be too time-consuming. Another, quicker index refresh approach we support is building an index offline and then reinstating from it, with a synchronization mechanism between the offline-built index and the Kafka stream.

Failover and Auto-scaling

From time to time, we need to bring up new instances for various reasons, such as failover and auto-scaling. For static serving, it is easy to start a new instance with an immutable index downloaded from the index store. However, it becomes complicated for realtime serving with a constantly changing index. How do we ensure that new instances eventually have the same copy of the index as the others?

We decided to use leader-based replication, as shown in the above diagram. Our process would look like this:

  1. The leader periodically takes a new snapshot and uploads it to the durable index store
  2. New instances download the latest snapshot, by default, from the index store
  3. New instances resume consuming messages from Kafka based on the checkpoint from the snapshot index
  4. New instances start serving traffic once they have caught up
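
Putting the four steps together, a bootstrap sketch might look like this, with IndexStore, LocalIndex, and KafkaReplayer as hypothetical stand-ins for the real components:

```cpp
#include <cstdint>
#include <string>

struct Snapshot {
  std::string index_dir;  // downloaded snapshot location
  int64_t kafka_offset;   // checkpoint embedded in the snapshot's manifest
};

class IndexStore    { public: virtual ~IndexStore() = default;
                              virtual Snapshot downloadLatest() = 0; };
class LocalIndex    { public: virtual ~LocalIndex() = default;
                              virtual void load(const std::string& dir) = 0; };
class KafkaReplayer { public: virtual ~KafkaReplayer() = default;
                              virtual void seek(int64_t offset) = 0;
                              virtual bool caughtUp() const = 0;
                              virtual void applyNext(LocalIndex& index) = 0; };

void bootstrapReplica(IndexStore& store, LocalIndex& index,
                      KafkaReplayer& replayer) {
  Snapshot snap = store.downloadLatest();  // step 2: latest leader snapshot
  index.load(snap.index_dir);
  replayer.seek(snap.kafka_offset);        // step 3: resume from checkpoint
  while (!replayer.caughtUp())             // step 4: serve once caught up
    replayer.applyNext(index);
  // markReadyForTraffic();  // hypothetical: join the serving fleet
}
```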

There are some key points in the design worth pointing out:

Leader Election

The leader's only responsibility is to take snapshots and upload the index periodically. This means we can afford to have no leader, or multiple leaders, for a short period of time, up to hours. Therefore, we have some flexibility in choosing a leader election algorithm. For simplicity, we use our cluster maintenance job to select a leader statically, and we periodically check that we have a healthy leader.

Snapshot Upload

Typically, a new instance would simply connect to the leader and download the latest snapshot. With that approach, however, snapshot downloads from new instances could overload the leader, leading to cascading failures. Instead, we chose to upload snapshots periodically to the index store, trading space and freshness for stability. In addition, the uploaded snapshots are useful for error recovery, which will be covered shortly.

Error Recovery

As mentioned above, error recovery is another challenge for a realtime serving system. There are some specific data corruption scenarios we need to handle.

Input Data Corruption

We use Kafka as our input write stream. Kafka messages are immutable: a producer can only append messages, not change the content of existing ones, so once corruption is introduced into Kafka messages, it is permanent. Thanks to the uploaded snapshots, we can rewind the index to a point before the corruption, skip the corrupted messages, and then consume new messages with the fix.

Data Corruption Caused by Binary Bugs

Although we have a mature indexing validation pipeline for static clusters that guarantees the new index and the new binary are issue-free before swapping in the new version, it is still possible for bugs to sneak into production. For static serving, we can fix such issues by rolling back the binary or the index. This becomes much harder for realtime serving, where rolling back the binary can't roll back errors in the index. With our snapshot uploading mechanism, we are able to roll back the binary together with a rewound index and then replay messages from Kafka to fix the errors in the index.

What’s next

As more scenarios are onboarded to Manas, we need to keep improving the system’s efficiency, scalability, and capability. Some interesting projects in our roadmap are as follows:

  • Cohosting static and realtime clusters to simplify our serving stack
  • Optimizing the system to support a large dataset
  • Building generic embedding-based retrieval to power advanced scenarios

Acknowledgments: This post summarizes several quarters' work involving multiple teams. Thanks to Tim Koh, Haibin Xie, George Wu, Sheng Chen, Jiacheng Hong and Zheng Liu for their countless contributions. Thanks to Mukund Narasimhan, Angela Sheu, Ang Xu, Chengcheng Hu and Dumitru Daniliuc for many meaningful discussions and feedback. Thanks to Roger Wang and Randall Keller for the great leadership.
