Data scientists and machine learning engineers at Pinterest found themselves hitting major challenges with existing tools. Hive and Presto were readily accessible for large-scale data transformations, but complex logic is difficult to write in SQL. Some engineers wrote complex logic in Cascading or Scala Spark jobs, but these have a steep learning curve, and jobs take significantly more time to build. Furthermore, data scientists and machine learning engineers often trained models in a small-scale notebook environment, but they lacked the tools to perform large-scale inference.
To combat these challenges, we (machine learning and data processing platform engineers) built and productionized a PySpark infrastructure. It gives our users the following capabilities:
- Writing logic using the familiar Python language and libraries, in isolated environments that allow experimenting with new packages.
- Rapid prototyping from our JupyterHub deployment, enabling users to interactively try out feature transformations, model ideas, and data processing jobs.
- Integration with our internal workflow system, so that users can easily productionize their PySpark applications as scheduled workflows.
PySpark on Kubernetes as a minimum viable product (MVP)
Figure 1: An overview of the MVP architecture
The infrastructure consists of Kubernetes pods carrying out different tasks:
- Spark Master managing cluster resources
- Workers — where Spark executors are spawned
- Jupyter servers assigned to each user
This architecture enabled our users to experience the power of PySpark for the first time. Data scientists were able to quickly grasp Python UDFs, transform features, and perform batch inference of TensorFlow models with terabytes of data.
This architecture, however, had some limitations:
- The Jupyter notebook and the PySpark driver share resources because they run in the same pod.
- The driver's port and address are hard-coded in the config.
- Users can launch only one PySpark application per assigned Jupyter server.
- Managing Python dependencies per user or team is difficult.
- Resource management is limited to a FIFO approach across all users (no queues are defined).
As the demand for PySpark grew, we worked on a production-grade PySpark infrastructure based on YARN, Livy, and Sparkmagic.
Production-grade PySpark infrastructure
Figure 2: An overview of the production architecture
In this architecture, each Spark application runs on the YARN cluster. We use Apache Livy to proxy between our internal JupyterHub, the Spark application and the YARN cluster. On Jupyter, Sparkmagic provides a PySpark kernel that forwards the PySpark code to a running Spark application. Conda provides isolated Python environments for each application.
With this architecture, we offer two development approaches.
Interactive development (from a Jupyter notebook):
- A user creates a conda environment zip containing Python packages they need, if any.
- From JupyterHub, they create a notebook with PySpark kernel from Sparkmagic.
- In the notebook, they declare the required resources, the conda environment, and other configuration. Livy then launches a Spark application on the YARN cluster.
- Sparkmagic ships the user’s Jupyter cells (via Livy) to the PySpark application. Livy proxies results back to the Jupyter notebook.
See the Appendix for a fully annotated example of a Jupyter notebook.
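As a rough illustration of the notebook configuration step above, the sketch below builds the text of a Sparkmagic `%%configure` cell. The field names (`queue`, `driverMemory`, `executorMemory`, `numExecutors`, `archives`, `conf`) are Livy session-creation fields; the queue name, S3 path, resource sizes, and the helper `build_configure_cell` are hypothetical, and the interpreter path assumes the conda zip unpacks with `bin/python` at its top level.

```python
import json


def build_configure_cell(queue, env_archive, num_executors=10,
                         executor_memory="8G", driver_memory="4G"):
    """Return the text of a Sparkmagic %%configure cell (illustrative helper)."""
    env_alias = "environment"  # directory name the archive is unpacked under
    session = {
        "queue": queue,
        "driverMemory": driver_memory,
        "executorMemory": executor_memory,
        "numExecutors": num_executors,
        # "<uri>#<alias>" asks YARN to unpack the archive as ./<alias>
        "archives": [f"{env_archive}#{env_alias}"],
        "conf": {
            # Assumes the zip has bin/python at its top level.
            "spark.yarn.appMasterEnv.PYSPARK_PYTHON": f"./{env_alias}/bin/python",
        },
    }
    # -f tells Sparkmagic to recreate the session if one already exists.
    return "%%configure -f\n" + json.dumps(session, indent=2)


# Hypothetical queue name and archive location.
cell = build_configure_cell("ads_adhoc", "s3://example-bucket/envs/tf_env.zip")
print(cell)
```

Pasting the printed text into the first cell of a PySpark-kernel notebook would hand these settings to Livy when the session starts.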
Non-interactive development (ad-hoc and production workflow runs):
- A Pinterest-internal Job Submission Service acts as the gateway to the YARN cluster.
- In development, the user’s local Python code base is packaged into an archive and submitted to launch a PySpark application in YARN.
- In scheduled production runs, the production build’s archive is submitted instead.
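The Job Submission Service is internal, so its packaging step is not shown here. As a minimal sketch of what "packaging a local code base into an archive" can look like, the example below zips the `.py` files of a throwaway project using only the standard library. The helper name `package_code` and the project layout are hypothetical.

```python
import tempfile
import zipfile
from pathlib import Path


def package_code(src_dir, archive_path):
    """Zip every .py file under src_dir, keeping paths relative to src_dir."""
    names = []
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(src_dir.rglob("*.py")):
            arcname = path.relative_to(src_dir).as_posix()
            zf.write(path, arcname)
            names.append(arcname)
    return names


# Demo on a temporary, made-up project layout.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "project"
    (root / "jobs").mkdir(parents=True)
    (root / "jobs" / "__init__.py").write_text("")
    (root / "jobs" / "score.py").write_text("def run():\n    return 'ok'\n")
    (root / "README.md").write_text("not packaged\n")

    archive = Path(tmp) / "code.zip"
    packaged = package_code(root, archive)
    with zipfile.ZipFile(archive) as zf:
        archived = sorted(zf.namelist())

print(packaged)
```

An archive like this is the kind of artifact that `spark-submit --py-files` accepts, which is one plausible way such a submission could make the code importable on the cluster.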
This infrastructure offers us the following benefits:
- No resource sharing between the Jupyter notebook and PySpark drivers
- No hard-coded driver ports and addresses
- Users can launch many PySpark applications
- Efficient resource allocation and isolation, with aggressive dynamic allocation for high resource utilization
- Python dependencies are supported per user
- Resource accountability
- Dr. Elephant for PySpark job analysis
Pinterest JupyterHub Integration: (benefits #1, 2, 3)
We set up Apache Livy, which provides a REST API proxy from Jupyter to the YARN cluster and PySpark applications.
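To make the proxy role more concrete, here is a sketch of the three REST calls a client such as Sparkmagic makes against Livy: create a session, submit a statement, and poll for its result. The requests are only constructed, not sent. The host name is hypothetical (8998 is Livy's default port), and the session and statement ids are placeholders that would normally come from Livy's responses.

```python
import json
from urllib.request import Request

# Hypothetical Livy endpoint; 8998 is Livy's default port.
LIVY_URL = "http://livy.example.internal:8998"


def livy_request(method, path, body=None):
    """Build (but do not send) a JSON request against the Livy REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return Request(
        LIVY_URL + path,
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )


# 1. Start a PySpark session; Livy launches the application on YARN.
create_session = livy_request("POST", "/sessions", {"kind": "pyspark"})

# 2. Ship one notebook cell to the running application.
session_id = 0  # placeholder: read "id" from the create-session response
run_cell = livy_request(
    "POST",
    f"/sessions/{session_id}/statements",
    {"code": "spark.range(10).count()"},
)

# 3. Poll for the statement's output.
statement_id = 0  # placeholder: read "id" from the run-statement response
poll_result = livy_request(
    "GET", f"/sessions/{session_id}/statements/{statement_id}"
)

for req in (create_session, run_cell, poll_result):
    print(req.get_method(), req.full_url)
```

Because the notebook only talks to Livy over HTTP, the driver never runs inside the Jupyter server, and no driver address has to be known in advance.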
A YARN cluster: (benefit #4)
- Efficient resource allocation and isolation. We define a queue structure with the Fair Scheduler so that each queue has dedicated resources. Resources are preemptable under certain conditions (e.g. after a queue has waited for at least 10 minutes), but a portion of non-preemptable resources is held for queues that set minResources. Scheduler and resource manager logs are used to manage cluster resources.
- Aggressive dynamic allocation policy for high resource utilization. We set a policy under which a PySpark application holds at most a fixed number of executors and automatically releases them once they are no longer needed. This policy recycles resources faster, leading to better resource utilization.
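The two policies above map onto standard YARN and Spark settings. The sketch below generates one Fair Scheduler `<queue>` entry and a dynamic-allocation config. The element and property names are real; the queue name, resource sizes, executor cap, and idle timeout are made-up values, not Pinterest's actual settings. The 600-second preemption timeout mirrors the 10-minute example in the text.

```python
import xml.etree.ElementTree as ET


def fair_scheduler_queue(name, min_resources, max_resources, preemption_timeout_s):
    """Build one <queue> element for a Fair Scheduler allocation file."""
    queue = ET.Element("queue", name=name)
    ET.SubElement(queue, "minResources").text = min_resources
    ET.SubElement(queue, "maxResources").text = max_resources
    # Seconds a queue may stay below its fair share before it may preempt.
    ET.SubElement(queue, "fairSharePreemptionTimeout").text = str(preemption_timeout_s)
    return queue


allocations = ET.Element("allocations")
allocations.append(
    fair_scheduler_queue(
        "data_science",            # hypothetical queue name
        "100000 mb,50 vcores",     # illustrative sizes only
        "400000 mb,200 vcores",
        600,                       # 10 minutes
    )
)
xml_text = ET.tostring(allocations, encoding="unicode")
print(xml_text)

# Spark-side settings: cap the executor count and release idle executors quickly.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": "100",        # illustrative cap
    "spark.dynamicAllocation.executorIdleTimeout": "30s",  # illustrative timeout
}
for key, value in dynamic_allocation_conf.items():
    print(f"--conf {key}={value}")
```

A short idle timeout is what makes the policy "aggressive": executors return to the queue soon after a stage finishes instead of sitting idle for the rest of a notebook session.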
Python Dependency Management: (benefit #5)
Users can try various Python libraries (e.g. different ML frameworks) without asking platform engineers to install them. To that end, we created a Jenkins job that packages a conda environment based on a requirements file and archives it as a zip file on S3. PySpark applications are launched with “--archives” to distribute the zip file to the driver and all executors, and they reset both “PYSPARK_PYTHON” (for the driver) and “spark.yarn.appMasterEnv.PYSPARK_PYTHON” (for the executors). That way, each application runs in an isolated Python environment with all the libraries it needs.
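A minimal sketch of such a launch, assuming a plain `spark-submit` in YARN cluster mode: it returns the environment variable and the argument list that carry the two settings named above. The S3 path, application file, and helper name `spark_submit_command` are hypothetical, and the interpreter path assumes the zip unpacks with `bin/python` at its top level.

```python
import shlex


def spark_submit_command(app, env_archive, env_alias="environment"):
    """Return (env, argv) for launching a PySpark app with a shipped conda env."""
    # Path of the interpreter inside the unpacked archive (assumed layout).
    python_bin = f"./{env_alias}/bin/python"
    env = {"PYSPARK_PYTHON": python_bin}
    argv = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        # "<uri>#<alias>" unpacks the zip as ./<alias> in each container.
        "--archives", f"{env_archive}#{env_alias}",
        "--conf", f"spark.yarn.appMasterEnv.PYSPARK_PYTHON={python_bin}",
        app,
    ]
    return env, argv


# Hypothetical archive location and job file.
env, argv = spark_submit_command("job.py", "s3://example-bucket/envs/tf_env.zip")
print(" ".join(f"{k}={v}" for k, v in env.items()), shlex.join(argv))
```

The same pair could be passed to `subprocess.run(argv, env={**os.environ, **env})` on a machine with Spark installed.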
Integrating with Pinterest-internal Job Submission Service (JSS): (benefit #6)
To productionize PySpark applications, users rely on the internal workflow system to schedule them. We provide a workflow template that integrates with the job submission interfaces, where users specify the code location, parameters, and the Python environment artifact to use.
Self-service job performance analysis: (benefit #7)
We forked the open-source Dr. Elephant and added new heuristics that analyze an application's configuration together with various kinds of runtime metrics (executor, job, stage, …). This service provides tuning suggestions and offers guidelines on how to write a Spark job properly. It alleviates users' debugging and troubleshooting pain, which boosts development velocity. Moreover, it reduces resource waste and improves cluster stability. Below is an example of the performance analysis.
Figure 3: An overview of Dr. Elephant
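The heuristics themselves are not described in this post. Purely to illustrate the idea of comparing configuration against runtime metrics, here is a toy heuristic in Python (Dr. Elephant itself is a Java service). The `ExecutorMetrics` type, the 50% threshold, and the advice text are all hypothetical; only the severity labels follow Dr. Elephant's naming.

```python
from dataclasses import dataclass


@dataclass
class ExecutorMetrics:
    allocated_memory_mb: int  # from the application's configuration
    peak_memory_mb: int       # observed at runtime


def executor_memory_heuristic(executors, waste_threshold=0.5):
    """Flag applications whose executors use only a small share of their memory."""
    if not executors:
        return "NONE", "No executor metrics available."
    allocated = sum(e.allocated_memory_mb for e in executors)
    peak = sum(e.peak_memory_mb for e in executors)
    utilization = peak / allocated
    if utilization < waste_threshold:
        highest_peak = max(e.peak_memory_mb for e in executors)
        return (
            "SEVERE",
            f"Executors peak at {utilization:.0%} of allocated memory; "
            f"consider lowering spark.executor.memory toward "
            f"{highest_peak} MB plus headroom.",
        )
    return "NONE", "Executor memory looks reasonably sized."


# Two made-up applications: one over-provisioned, one well sized.
wasteful = [ExecutorMetrics(8192, 2048), ExecutorMetrics(8192, 1024)]
well_sized = [ExecutorMetrics(4096, 3500)]

severity, advice = executor_memory_heuristic(wasteful)
print(severity, advice)
ok_severity, ok_advice = executor_memory_heuristic(well_sized)
print(ok_severity, ok_advice)
```

A real heuristic would draw on many more signals (job and stage metrics, for instance), but the shape is the same: read configuration and metrics, assign a severity, and attach a tuning suggestion.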
PySpark is now used throughout our Product Analytics and Data Science and Ads teams for a wide range of use cases.
- Training: users can train models iteratively with MLlib or any Python machine learning framework (e.g. TensorFlow) on data of any size.
- Inference: users can test and productionize their Python inference code without depending on platform engineers.
- Ad-hoc analyses: users can perform various ad-hoc analyses as needed.
Moreover, our users now have the freedom to explore various Python dependencies and to use Python UDFs on large-scale data.
We thank David Liu (EM, Machine Learning Platform team), Ang Zhang (EM, Data Processing Platform team), Tais (our TPM), the Pinterest Product Analytics and Data Science organization (Sarthak Shah, Grace Huang, Minli Zhang, Dan Lee, Ladi Ositelu), the Compute-Platform team (Harry Zhang, June Liu), the Data Processing Platform team (Zaheen Aziz), and the Jupyter team (Prasun Ghosh, Tech Lead) for their support and collaboration.
Appendix: An example of our use case
Below is an example of how our users train a model, and run inference logic at scale from their Jupyter notebook with PySpark. We leave explanations in each cell.