By Ping-Min Lin | Software Engineer, Logging Platform
At Pinterest, the Logging Platform team maintains the backbone of our data ingestion infrastructure, which ingests terabytes of data per day. Because these pipelines are so widespread and sit so deep in the stack, it is extremely important that the services powering them are efficient. Along our journey of continuous improvement, we’ve figured out some basic but useful patterns and learnings that apply generally, and hopefully to your systems as well.
MemQ: Achieving memory-efficient batch data delivery using Netty
MemQ is the next-gen data ingestion platform built in-house and recently open-sourced by the Logging Platform team. When designing the service, we tried hard to maximize the efficiency of our resources; specifically, we focused on reducing GC by using off-heap memory. Netty was chosen as our low-level networking framework due to its great balance between flexibility, performance, and sophisticated out-of-the-box features. For example, we used ByteBuf heavily throughout the project. ByteBufs are the building blocks of data within Netty. They are similar to Java NIO ByteBuffers, but give developers much more control over the lifecycle of the objects through a “smart pointer” approach: customized memory management using manual reference counting. By using ByteBufs, we managed to transport messages with a single copy of the data by passing off-heap network buffer pointers, further reducing the cycles spent on garbage collection.
The typical journey of a message in the MemQ broker: Each message received from the network is reconstructed via a length-encoded protocol and allocated into a ByteBuf that lives off the JVM heap (direct memory in Netty terms); this ByteBuf is the only copy of the payload throughout the whole pipeline. The ByteBuf reference is passed into the topic processor and put into a Batch along with other messages that are also waiting to be uploaded to the storage destination. Once the upload constraints are met, because either the time threshold or the size threshold has been reached, the Batch is dispatched. In the case of uploading to a remote object store like S3, the whole batch of messages is kept in a CompositeByteBuf (a virtual wrapper ByteBuf consisting of multiple ByteBufs) and uploaded to the destination using the Reactor Netty library, so no additional copies of the data are created within the processing path. By building on top of ByteBufs and other Netty constructs, we were able to iterate rapidly without sacrificing performance and avoid reinventing the wheel.
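The size-or-time dispatch rule described above can be sketched in plain Java. This is only an illustration under stated assumptions, not MemQ's code: it uses direct java.nio.ByteBuffer objects as a stand-in for Netty ByteBufs, and the class name BatchSketch, its fields, and its thresholds are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: accumulates references to off-heap buffers and reports
// when a size or time threshold says the batch should be dispatched.
final class BatchSketch {
    private final long maxBytes;
    private final long maxAgeMillis;
    private final List<ByteBuffer> messages = new ArrayList<>();
    private long bytes = 0;
    private long firstMessageAtMillis = -1;

    BatchSketch(long maxBytes, long maxAgeMillis) {
        this.maxBytes = maxBytes;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Stores the buffer reference only; the payload itself is never copied.
    void add(ByteBuffer message, long nowMillis) {
        if (messages.isEmpty()) {
            firstMessageAtMillis = nowMillis;
        }
        messages.add(message);
        bytes += message.remaining();
    }

    // True once either upload constraint (size or age) is met.
    boolean readyToDispatch(long nowMillis) {
        if (messages.isEmpty()) {
            return false;
        }
        return bytes >= maxBytes || nowMillis - firstMessageAtMillis >= maxAgeMillis;
    }

    // Hands the accumulated references to the caller and resets the batch.
    List<ByteBuffer> drain() {
        List<ByteBuffer> out = new ArrayList<>(messages);
        messages.clear();
        bytes = 0;
        firstMessageAtMillis = -1;
        return out;
    }
}
```

The clock is passed in as a parameter so the thresholds can be exercised deterministically. A real broker would also need to release each buffer once the upload completes, which this sketch leaves out.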
Singer: Leveraging asynchronous processing to reduce thread overheads
Singer has been around at Pinterest for a long time, reliably delivering messages to PubSub backends. With more and more use cases onboarded to Singer, we started to hit memory bottlenecks that led to frequent OOM issues and incidents. Singer’s memory and CPU resources are constrained on nearly all fleets at Pinterest to avoid impacting the host service, e.g., our API serving layer. After inspecting the code and using debugging tools such as VisualVM, Native Memory Tracking (NMT), and pmap, we identified several potential improvements, most notably reducing the number of threads. Analyzing the NMT results, we noticed how many threads there were and how much memory their stacks consumed (the threads are allocated by the Singer executor and producer thread pools).
Taking a deeper look into the source of these threads, we found that the majority come from the thread pools Singer creates for each Kafka cluster it publishes to. The threads in these pools wait for Kafka to complete writing messages to a partition and then report the status of the writes. While the threads do the job, each thread in the JVM will (by default) allocate 1 MB of memory for its stack.
A Singer NMT report showing the different memory regions a JVM process allocates. The Thread entry represents the thread stack. Arena contains the off-heap/direct memory portion managed outside of the JVM heap.
Even though the underlying operating system allocates stack memory lazily, only once the thread actually uses it, this still quickly adds up to hundreds of MBs of the process’ memory. When there are a lot of log streams publishing to multiple partitions on different clusters, the memory used by thread stacks can easily become comparable to Singer’s 800 MB default heap size and eat into the resources of the application.
Each submission of KafkaWriteTask will occupy a thread. Full code can be found here
By closely examining the usage of these threads, it quickly becomes clear that, apart from waiting on Kafka, most of them only perform non-blocking operations such as updating metrics, which makes them perfectly suited to asynchronous processing with CompletableFuture, available since Java 8. CompletableFuture allows us to eliminate the blocking calls by chaining stages asynchronously, replacing the threads that had to wait until the results came back from Kafka. By using the callback in the KafkaProducer.send(record, callback) method, we let the Kafka producer’s network client take complete control of multiplexing the network I/O.
A brief example of the result code after using CompletableFutures. Full code can be found here
Once we convert the original logic into several chained non-blocking stages, it becomes natural to use a single common thread pool to handle them regardless of the logstream, so we use the common ForkJoinPool that the JVM already puts at our disposal. This dramatically reduces the thread usage for Singer, from a couple of hundred threads to virtually no additional threads. This improvement demonstrates the power of asynchronous processing and how network-bound applications can benefit from it.
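To make the callback-to-future pattern concrete, here is a minimal sketch using only the JDK. It is not Singer's code: the CallbackSender interface is a hypothetical stand-in for a callback-style producer such as KafkaProducer.send(record, callback), and the class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class AsyncWriteSketch {
    // Hypothetical stand-in for a callback-style producer; not the Kafka API itself.
    interface CallbackSender {
        void send(String message, BiConsumer<Long, Exception> callback);
    }

    // Bridges one callback-style send into a CompletableFuture of the offset.
    static CompletableFuture<Long> sendAsync(CallbackSender sender, String message) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        sender.send(message, (offset, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(offset);
            }
        });
        return future;
    }

    // Sends every message and returns a future of the number of acknowledged
    // writes. No thread is parked waiting for each acknowledgement.
    static CompletableFuture<Integer> writeAll(CallbackSender sender, List<String> messages) {
        List<CompletableFuture<Long>> pending = new ArrayList<>();
        for (String message : messages) {
            pending.add(sendAsync(sender, message));
        }
        return CompletableFuture
            .allOf(pending.toArray(new CompletableFuture<?>[0]))
            // Async stages without an explicit executor use the common
            // ForkJoinPool by default.
            .thenApplyAsync(ignored -> pending.size());
    }
}
```

The follow-up stage (here just counting acknowledgements, but it could equally update metrics) runs only after every send has completed, and a failed send surfaces as an exceptionally completed future rather than as an exception thrown on a waiting thread.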
Kafka and Singer: Balancing performance and efficiency with controllable variance
Operating our Kafka clusters has always been a delicate balance between performance, fault tolerance, and efficiency. Our logging agent Singer, at the front line of publishing messages to Kafka, is a crucial component that plays a heavy role in these factors, especially in routing the traffic by deciding which partitions we deliver data to for a topic.
The Default Partitioner: Evenly Distributed Traffic
In Singer, logs from a machine are picked up, routed to the topic they belong to, and published to that topic in Kafka. In the early days, Singer would publish uniformly to all of a topic’s partitions in a round-robin fashion using our default partitioner. For example, if there were 3000 messages on a particular host that needed to be published to a 30-partition topic, each partition would receive roughly 100 messages. This worked pretty well for most use cases and has the nice benefit that all partitions receive the same number of messages, which is great for the consumers of these topics since the workload is evenly distributed amongst them.
DefaultPartitioner: Producers and Partitions are fully connected
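The round-robin example above (3000 messages over 30 partitions) can be reproduced with a few lines of Java. This is a hypothetical sketch of round-robin selection in general, not Singer's default partitioner; the class and method names are made up for illustration.

```java
// Hypothetical sketch of round-robin partition selection; not Singer's code.
final class RoundRobinSketch {
    private final int numPartitions;
    private int next = 0;

    RoundRobinSketch(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Returns partitions 0, 1, ..., numPartitions - 1 and then wraps around.
    int nextPartition() {
        int partition = next;
        next = (next + 1) % numPartitions;
        return partition;
    }

    // Counts how many of the given messages land on each partition.
    static int[] distribute(int messages, int numPartitions) {
        RoundRobinSketch roundRobin = new RoundRobinSketch(numPartitions);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[roundRobin.nextPartition()]++;
        }
        return counts;
    }
}
```

Calling RoundRobinSketch.distribute(3000, 30) yields 100 messages for every partition, at the cost of each host talking to every partition of the topic.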
The Single Partition Partitioner: In Favor of the Law of Large Numbers
SinglePartitionPartitioner: Ideal scenario where connections are evenly distributed
As Pinterest grew, we had fleets expanding to thousands of hosts, and this evenly distributed approach started to cause issues for our Kafka brokers: high connection counts and large numbers of produce requests started to elevate the brokers’ CPU usage, and spreading out the messages meant smaller batch sizes for each partition and therefore less efficient compression, resulting in higher aggregate network traffic. To tackle this, we implemented a new partitioner: the SinglePartitionPartitioner. This partitioner solves the issue by forcing Singer to write to only one random partition per topic per host, reducing the fanout from all brokers to a single broker. This partition remains the same throughout the producer’s lifetime, until Singer restarts.
For pipelines that had a large producer fleet and relatively uniform message rates across hosts, this was extremely effective: the law of large numbers worked in our favor, and statistically, if the number of producers is significantly larger than the number of partitions, each partition will still receive a similar amount of traffic. Connection count went down from (number of brokers serving the topic) times (number of producers) to only (number of producers), which could be up to a hundred times fewer for larger topics. Meanwhile, batching up all messages per producer to a single partition improved compression ratios by at least 10% in most use cases.
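The idea can be sketched in a few lines of Java. This is a simplified illustration rather than Singer's SinglePartitionPartitioner: the class name, the seeded Random, and the helper methods are assumptions chosen so the behaviour is easy to experiment with.

```java
import java.util.Random;

// Hypothetical sketch: each producer picks one random partition and keeps it.
final class SinglePartitionSketch {
    private final int partition;

    // The partition is chosen once and stays fixed for the object's lifetime.
    SinglePartitionSketch(int numPartitions, Random random) {
        this.partition = random.nextInt(numPartitions);
    }

    int partition() {
        return partition;
    }

    // Number of producers that end up on each partition when every producer
    // independently picks one partition at random.
    static int[] producersPerPartition(int producers, int numPartitions, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < producers; i++) {
            counts[new SinglePartitionSketch(numPartitions, random).partition()]++;
        }
        return counts;
    }

    // Connection count when every producer connects to every broker serving
    // the topic; with a single partition per producer it is just `producers`.
    static long fullyConnectedConnections(long brokers, long producers) {
        return brokers * producers;
    }
}
```

Running producersPerPartition with thousands of producers and a few dozen partitions gives roughly even counts, while running it with fewer producers than partitions leaves some partitions with no producer at all.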
SinglePartitionPartitioner: Skewed scenario where there are too few producers vs. partitions
The Fixed Partitions Partitioner: Configurable variance for adjusting trade-offs
Even with this new solution, some pipelines still sat in a middle ground where both approaches are subpar, such as when the number of producers is not large enough to outnumber the partitions. In this case, the SinglePartitionPartitioner introduces significant skew between partitions: some partitions have multiple producers writing to them, and some are assigned very few or even no producers. This skew can cause unbalanced workloads for the downstream consumers, and it also increases the burden on our team of managing the cluster, especially when storage is tight. We thus recently introduced a new partitioner that can be used in these cases, and can even cover the original use cases: the FixedPartitionsPartitioner, which allows us to publish not just to one fixed partition, like the SinglePartitionPartitioner, but randomly across a fixed number of partitions.
This approach is somewhat similar to the concept of virtual nodes in consistent hashing, where we artificially create more “effective producers” to achieve a more continuous distribution. Since the number of partitions for each host can be configured, we can tune it to the sweet spot where efficiency and performance are both at the desired levels. This partitioner can also help with “hot producers” by spreading traffic out while still maintaining a reasonable connection count. Although the concept is simple, it turns out that the ability to configure the degree of variance can be a powerful tool for managing trade-offs.
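A minimal sketch of the idea follows, assuming the fixed subset is drawn by shuffling the partition list once at construction time. That selection strategy, along with the class and parameter names, is an assumption for illustration and not necessarily how Singer's FixedPartitionsPartitioner is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: publish randomly across a fixed subset of partitions.
final class FixedPartitionsSketch {
    private final List<Integer> chosen;
    private final Random random;

    FixedPartitionsSketch(int numPartitions, int fixedCount, Random random) {
        if (fixedCount < 1 || fixedCount > numPartitions) {
            throw new IllegalArgumentException("fixedCount must be in [1, numPartitions]");
        }
        List<Integer> all = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            all.add(p);
        }
        // Pick the subset once; it stays fixed for the producer's lifetime.
        Collections.shuffle(all, random);
        this.chosen = new ArrayList<>(all.subList(0, fixedCount));
        this.random = random;
    }

    // Each message goes to a random member of the fixed subset.
    int nextPartition() {
        return chosen.get(random.nextInt(chosen.size()));
    }

    List<Integer> chosenPartitions() {
        return Collections.unmodifiableList(chosen);
    }
}
```

In this sketch, a fixedCount of 1 behaves like the single-partition approach, and raising it trades a higher connection count for less skew, which is the tuning knob the text describes.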
FixedPartitionsPartitioner: Less skew while still keeping connection count lower than the default
Relative compression ratio and request rate skew with different numbers of fixed partitions on a 120-partition topic on 30 brokers
Conclusion and Acknowledgements
These learnings are just a few examples of improvements the Logging Platform team has been making. Although they seem different in nature, all of these improvements share the same ultimate goal: achieving better results for our team and our customers. We hope that these findings are inspiring and could spark a few ideas for you.
None of the content in this article could have been delivered without the in-depth discussions and candid feedback from Ambud Sharma, Eric Lopez, Henry Cai, Jeff Xiang, and Vahid Hashemian on the Logging Platform team. We also deeply appreciate the external teams that provided support and input on the various improvements we’ve been working on. As we strive for continuous improvement within our architecture, we hope we will be able to share more interesting findings in our pursuit of perfecting our system.