How Ikigai Labs Serves Interactive AI Workflows at Scale using Ray Serve

Ikigai Labs
10 min read · Jan 28, 2022


by Jaehyun Sim and Amar Shah

What We Do at Ikigai Labs

In short, Ikigai Labs provides AI-charged spreadsheets: an AI-augmented, collaborative, cloud data processing and analytics platform that can be used with the ease of a spreadsheet. Within the platform, powerful AI-enabled, end-to-end automated workflows can be built while staying in the comfort of a spreadsheet. The Ikigai Labs platform offers a collaborative space that brings data integration, processing, visualization, and interactive dashboards to users with unprecedented ease and at scale. Users can use the platform to automate, maintain, and enhance day-to-day mission-critical operations.

While the platform supports various features, they all revolve around the data processing pipeline. The Ikigai data pipeline encapsulates many components such as data wrangling steps, data connectors, predictive functionality, what-if scenario analysis and dashboards. Making the data pipeline scalable and highly interactive by enabling users to inject custom Python code into a traditional data pipelining platform involved solving many technical challenges. This post dives into these challenges and how Ray and Ray Serve provided excellent flexibility in resolving them.

Ikigai Data Pipeline on Ray

The Ikigai data pipeline aims to achieve the following challenging goals simultaneously:

  1. Mission-Critical data pipelines to address any real-world use case of data-centric tasks. By allowing end-users to inject custom Python code on top of traditional data wrangling and preparation features, the data pipeline can adapt to any task requirements.
  2. Highly Interactive data pipelines to bring transparency in every step of the data pipeline. End users can interact with their datasets at any point with a simple spreadsheet or Jupyter notebook view.
  3. Instantly Browsable data pipelines to enable sub-second data browsing of potentially extremely large datasets. The Ikigai data platform offers a unique way of viewing the intermediate state of datasets in the middle of the pipeline, called ‘peeking’.

Most existing data platforms provide either an interactive experience for small datasets (e.g. traditional spreadsheets) or the ability to handle large datasets, but only in an "offline" or "batch processing" environment (e.g. Apache Spark). At Ikigai, we need the data pipeline to achieve data interactivity while maintaining massive scalability in order to accomplish the three missions above. This is what makes engineering at Ikigai Labs extremely challenging. We overcame this challenge by developing a novel computational and data processing architecture with the help of AI, made operational using a distributed execution engine: Ray.

Scalability with Ray Core

Ray is an open-source project that provides a simple, universal API for building distributed applications, with Python as a first-class citizen. It made it easy for a small team of data engineers and data scientists like ours to scale Python applications into distributed applications with minimal code changes, turning code like this:

import pandas as pd

# Business logic
def deduplicate(df):
    df.drop_duplicates(inplace=True)
    return df

def sort(df, target_columns, descending):
    df.sort_values(
        target_columns,
        ascending=not descending,
        inplace=True
    )
    return df

df = pd.DataFrame([[1, 2, 3], [1, 2, 3], [2, 3, 4]])
df = deduplicate(df=df)

to

import pandas as pd
import ray

ray.init()

# Business logic stays unchanged
def deduplicate(df):
    df.drop_duplicates(inplace=True)
    return df

def sort(df, target_columns, descending):
    df.sort_values(
        target_columns,
        ascending=not descending,
        inplace=True
    )
    return df

df = pd.DataFrame([[1, 2, 3], [1, 2, 3], [2, 3, 4]])
# Wrap the unchanged function as a Ray task and run it on the cluster
obj_ref = ray.remote(deduplicate).remote(df=df)
result_df = ray.get(obj_ref)

This shows how we were able to adopt Ray Core for our users' custom Python code and scale it without any modification to their code. With Ray Core, unlike frameworks such as Spark, users do not need to reformulate their code into a specific format or understand complex dependencies to massively scale their data pipeline. While Ray Core can turn arbitrary Python code from a user into a parallelized distributed application, we focused on another library in the Ray ecosystem to achieve data interactivity as well: Ray Serve.

Interactivity with Ray Serve

Ray Serve is an easy-to-use, scalable model serving library built on Ray. Being framework-agnostic, Ray Serve can serve not only deep learning models but also arbitrary Python code in a distributed manner. Since one of the biggest missions of the Ikigai data pipeline is to run users' arbitrary Python code at scale with interactivity, Ray Serve provided answers to many of the challenges we faced by enabling us to serve users' code with real-time interaction.

Ikigai + Ray Serve: Resolving the Challenges We Faced

As the Ikigai data pipeline aims to run arbitrary Python code at scale with data interactivity, we faced a few challenges coming from the nature of arbitrary code.

Conflicting Python Library Dependencies

In the Ikigai data pipeline, it is common for multiple custom Python scripts to be injected into a single pipeline, each requiring different Python libraries to be installed. This becomes an issue when the requirements of different scripts conflict with each other, and it gets even more complicated when the data pipeline operates on a distributed cluster: managing different Python environments can be a nightmare even on a single machine.

As an answer to this challenge, we decided to serve custom Python scripts with Ray Serve. When defining a Serve deployment, Ray Serve lets you specify a customizable Conda environment manifest, which is automatically installed and managed on the Ray cluster. Custom Python scripts are assigned to Serve deployments based on their dependencies, so the data pipeline can handle as many custom Python scripts as needed without worrying about conflicts.

import ray
from ray import serve

ray.init(address="auto")
serve.start(detached=True)  # connect to (or start) Serve on the cluster

conda_env = {
    "channels": ["conda-forge", "defaults"],
    "dependencies": [
        "python=3.7",
        {
            "pip": [
                "numpy==1.20.2",
                "pandas==1.2.4",
                "scipy==1.6.3",
                "boto3==1.7.0"
            ]
        }
    ]
}

@serve.deployment(
    name="custom_service",
    ray_actor_options={"runtime_env": {"conda": conda_env}}
)
class CustomService:

    def deduplicate(self, data):
        data.drop_duplicates(inplace=True)
        return data

    def sort(self, data, target_columns, descending):
        data.sort_values(
            target_columns,
            ascending=not descending,
            inplace=True
        )
        return data

CustomService.deploy()

The Python environment manifest is fed into the serve.deployment decorator, which makes Conda environment management very lightweight.
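
Because each deployment carries its own manifest, scripts with conflicting requirements can coexist on the same cluster. Here is a minimal sketch under that setup; the second service and its pandas 0.25.3 pin are hypothetical, chosen only to conflict with the pandas 1.2.4 pin above.

from ray import serve

legacy_env = {
    "channels": ["conda-forge", "defaults"],
    "dependencies": [
        "python=3.7",
        {"pip": ["pandas==0.25.3"]}  # deliberately conflicts with 1.2.4
    ]
}

@serve.deployment(
    name="legacy_service",
    ray_actor_options={"runtime_env": {"conda": legacy_env}}
)
class LegacyService:

    def describe(self, data):
        # Runs in its own isolated Conda environment, unaffected by
        # custom_service's pandas 1.2.4
        return data.describe()

LegacyService.deploy()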

Task Overhead for ‘Peek’

To achieve the mission of an 'instantly browsable' data pipeline, the Ikigai data pipeline supports sub-second peeking at intermediate datasets.

The data pipeline cluster contains internal Python libraries that hold all of the traditional data preparation functions. Both the Ray cluster and simple Python runner containers use this same functionality in different locations, depending on the type of pipeline task: a scalable run or a peek run. In a scalable run, tasks run on the Ray cluster, using Ray's distributed computing engine.

Peek runs, on the other hand, simply use local Python runner containers. This design decision eliminates the 'task submission overhead' (roughly 10 seconds) incurred when submitting a Python task definition to a Ray cluster. Because peek runs process subsets of the whole dataset on local Python runner containers and never submit tasks to the Ray cluster, they complete in under a second.
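
To make the two execution paths concrete, here is a minimal sketch of such a dispatch; run_task, mode, and PEEK_SAMPLE_SIZE are illustrative names rather than our actual implementation.

import ray

PEEK_SAMPLE_SIZE = 1000  # hypothetical number of rows a peek operates on

def run_task(fn, df, mode):
    # Dispatch a pipeline task to the right execution path (sketch only)
    if mode == "peek":
        # Peek runs execute in the local Python runner process on a small
        # sample, avoiding the task submission overhead entirely
        return fn(df.head(PEEK_SAMPLE_SIZE))
    # Scalable runs submit the same function to the Ray cluster
    return ray.get(ray.remote(fn).remote(df))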

Here, a specific challenge emerges because we serve users' custom Python scripts with Ray Serve. Ray Serve's dependency management is essential in this case, but routing peek runs through it would reintroduce the task submission overhead. So, to leverage Ray Serve without incurring that overhead, we looked into additional features. Thankfully, Ray Serve has adopted FastAPI to expose HTTP endpoints for Serve deployments, which allows us to peek with Ray Serve without incurring additional overhead.

@serve.deployment(name="custom_service")
class CustomService:

    def deduplicate(self, data):
        data.drop_duplicates(inplace=True)
        return data

to

import typing

from fastapi import FastAPI
from ray import serve

app = FastAPI()

@serve.deployment(name="custom_service")
@serve.ingress(app)
class CustomService:

    @app.post("/deduplicate")
    def deduplicate(self, request: typing.Dict):
        data = request["data"]
        columns = request["columns"]

        import pandas as pd
        data = pd.DataFrame(data, columns=columns)

        data.drop_duplicates(inplace=True)
        return data.values.tolist()

With this easy transformation of a Serve deployment into an HTTP-compatible version, simple Python runner containers are able to run custom Python scripts without suffering from the task submission overhead.

import requests

df = requests.post(
    "http://ray-serve-service.ray:8000/custom_service/deduplicate",
    json={
        "data": [[1, 2, 3], [1, 2, 3], [2, 3, 4]],
        "columns": ["a", "b", "c"]
    }
).json()

Handling Concurrent Deployments

Establishing an interactive data pipeline enabled our users to iterate faster on their business logic written in Python. These fast-paced cycles encouraged more frequent updates to their custom Python scripts, which led to our system re-deploying the same Ray Serve deployments repeatedly. The updates often happen concurrently, since the Ikigai data pipeline allows multiple team members to collaborate even within the same pipeline. In that scenario, we encountered race conditions when multiple users tried to update the same Serve deployment at the same time. To resolve this challenge, we collaborated with the Ray Serve team at Anyscale to contribute a patch that lets us set a `prev_version` flag when updating a deployment, allowing us to detect and avoid race conditions.

# Plain deployment
CustomService.deploy()

# Version-aware deployment
CustomService.options(
    version="new_version", prev_version="old_version"
).deploy()

With the version-aware deployment, the Ikigai data pipeline system can now catch unwanted concurrent deployments.
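
For illustration, here is a minimal sketch of how a conflicting deploy can be caught; we assume the version mismatch surfaces as a ValueError, and the recovery step is hypothetical.

try:
    # Attempt a version-aware update on behalf of one user
    CustomService.options(
        version="new_version", prev_version="old_version"
    ).deploy()
except ValueError:
    # A concurrent deploy already changed the live version, so prev_version
    # no longer matches and Ray Serve rejects this update; re-fetch the
    # latest script version and retry instead of overwriting it
    print("Concurrent deployment detected; reloading latest version")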

Skewed Traffic on Serve Instance

Unlike the other challenges introduced above, this one does not break the system or the mission of the Ikigai data pipeline; it is about resource management optimization. Since platform users' behavior is unpredictable, a small subset of Serve deployments received highly concentrated traffic. This pattern showed that if we scaled all Serve deployments equally, we would suffer from a lack of availability for popular deployments while wasting resources on unpopular ones. To tackle this problem, we decided to keep track of the amount of traffic to each Serve deployment and scale each one proportionally to its traffic using Ray Serve's built-in replica count control.

@serve.deployment(
    name="deduplicate_service",
    num_replicas=2
)
class DeduplicateService:

    def deduplicate(self, data):
        data.drop_duplicates(inplace=True)
        return data

@serve.deployment(
    name="sort_service",
    num_replicas=5
)
class SortService:

    def sort(self, data, target_columns, descending):
        data.sort_values(
            target_columns,
            ascending=not descending,
            inplace=True
        )
        return data
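
The replica counts above are static for clarity. The proportional scaling itself might look like the following sketch, where the request counters and the MAX_REPLICAS budget are hypothetical; only the resulting replica counts are handed to Ray Serve.

from ray import serve

# Hypothetical request counts gathered by our own traffic tracking
request_counts = {"deduplicate_service": 120, "sort_service": 480}
total_requests = sum(request_counts.values())
MAX_REPLICAS = 10  # hypothetical replica budget to split across deployments

for name, count in request_counts.items():
    # Scale each deployment proportionally to its share of the traffic
    replicas = max(1, round(MAX_REPLICAS * count / total_requests))
    serve.get_deployment(name).options(num_replicas=replicas).deploy()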

Throughout the adoption of Ray Serve into our system, we were impressed by how well designed the Ray Serve APIs and deployment management are. As software engineers incorporating a new framework into an existing platform, we found the experience very smooth: most of the challenges we faced did not require significant 'patching', and we could find solutions to our problems within Ray Serve's own design.

Future of Ikigai Data Pipeline

With a working scalable and interactive data pipeline built on Ray Core and Ray Serve, we decided to incorporate a few more Ray sub-projects to take our platform further.

Ray Client

Ray Client provides a very simple way to connect a local Python script to a Ray Cluster.

import pandas as pd
import ray
from ray import serve

# Remotely connect to the Ray cluster
ray.client("http://ray-client-service.ray:10001").connect()

@serve.deployment(name="custom_service")
class CustomService:

    def deduplicate(self, data):
        data.drop_duplicates(inplace=True)
        return data

df = pd.DataFrame([[1, 2, 3], [1, 2, 3], [2, 3, 4]])
obj_ref = CustomService.get_handle().deduplicate.remote(data=df)
result_df = ray.get(obj_ref)

Adopting Ray Client into the Ikigai data pipeline cluster will be extremely beneficial, as it will help us eliminate the task submission step completely.

Ray Workflow

Ray Workflow is yet another Ray sub-project, currently under active development. Ray Workflow aims to add fault tolerance to any Ray remote task and to provide recovery of tasks on distributed systems. Embedding Ray Workflow as a backbone of the Ikigai data pipeline will not only improve the user experience by saving time but also optimize the system's resource management.
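
As a rough illustration, here is a minimal sketch against the alpha API available at the time of writing (the load and transform steps are made-up, and the API has been evolving):

from ray import workflow

workflow.init()

@workflow.step
def load(path: str) -> str:
    # Stand-in for reading a dataset from storage
    return "data from " + path

@workflow.step
def transform(data: str) -> str:
    return data.upper()

# Each step's output is checkpointed, so a crashed run can resume from the
# last completed step instead of re-executing the whole pipeline
result = transform.step(load.step("s3://bucket/input.csv")).run(workflow_id="demo")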

Summary

We believe the Ray team is building a truly awesome product. It is helping Ikigai Labs build a very unique data pipeline system. We are excited to evolve our platform even further, alongside the Ray team’s endeavor.

We talked about this topic at Ray Summit 2021 and you can watch the full video here.

Check out some client solutions we were able to build and if you are interested in trying out our product, book a demo here. We are also hiring!

About the Authors

Jaehyun Sim (Director of Engineering)

Jaehyun Sim is the Director of Engineering at Ikigai Labs, where he is building a highly scalable and interactive data pipelining platform for raw data. He holds the CNCF CKA and CKAD certifications and enjoys solving big data problems with cloud-native approaches such as Kubernetes and AWS. He is currently working on making big data more transparent by making data pipelines both massively scalable and easily visualizable. He previously worked at Celect, Inc. as a Data Engineer and has undergraduate degrees in Computer Science and Statistics from UC Berkeley. When he is not sitting in front of his laptop, you might find him training in the woods as a Sergeant in the US Army Reserve.

Amar Shah (Software Engineer Team Lead)

Amar Shah is a Software Engineer Team Lead at Ikigai Labs, where he designs and builds the core infrastructure that supports the company's highly scalable, dynamic data pipelining platform. He holds the CNCF CKAD certification, and his experience ranges from building Kubernetes applications to scaling infrastructure with the latest developments in cloud-native technologies. He previously worked as a Software Engineer at IBM Cloud and is an alumnus of Cornell Computer Science. When he's not at the computer, Amar is an avid cook and landscape photographer.

Original publication date: August 19, 2021 (Anyscale Blog)

Special Thanks

James Oh, Robert Xin, Robbie Jung, Aldreen Venzon @ Ikigai Labs for helping us design the system

Edward Oakes @ Anyscale for introducing Ray Serve to us and his effort of helping us with Ray integration

Yi Cheng, Siyuan (Ryans) Zhuang @ Anyscale for introducing Ray Workflow and detailed guidance

Michael Galarnyk @ Anyscale for helping us organize this post
