Ray AIR is your new best friend for building ML pipelines, but it’s not just about connecting pre-built blocks; it’s about making them talk to each other intelligently.
Let’s see what that looks like in action. Imagine you’ve got some tabular data (a generated DataFrame stands in for a CSV here), you want to train a PyTorch model on it, and then deploy that model using Ray Serve.
# NOTE: this example targets the Ray 2.x AIR API (ray.air.session,
# ray.air.Checkpoint, ray.serve.PredictorDeployment). Later Ray releases moved
# or removed several of these names, so check the docs for your installed version.
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

import ray
from ray import serve
from ray.air import Checkpoint, session
from ray.air.config import RunConfig, ScalingConfig
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.serve import PredictorDeployment
from ray.train.torch import TorchPredictor, TorchTrainer

# Initialize Ray. address="auto" connects to an already-running cluster;
# drop the argument to start a local instance instead.
ray.init(address="auto", ignore_reinit_error=True)


# Dummy data and model for demonstration
def get_dummy_data():
    return pd.DataFrame({
        "feature1": [i * 0.1 for i in range(100)],
        "feature2": [i * 0.5 for i in range(100)],
        "target": [i * 0.1 + i * 0.5 + 0.1 for i in range(100)],
    })


# Defined at module level so Ray can pickle it and ship it to workers and replicas
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(2, 1)

    def forward(self, x):
        return self.linear(x)


def get_dummy_model():
    return SimpleModel()


# Define a training function; Ray runs it on every training worker
def train_func(config):
    # Load data
    data = get_dummy_data()
    features = data[["feature1", "feature2"]].values
    target = data["target"].values.reshape(-1, 1)

    # Convert to PyTorch tensors
    features_tensor = torch.tensor(features, dtype=torch.float32)
    target_tensor = torch.tensor(target, dtype=torch.float32)

    # Initialize model, optimizer, and loss.
    # The features are unscaled (up to ~50), so a small learning rate keeps SGD stable.
    model = get_dummy_model()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
    criterion = nn.MSELoss()

    # Training loop
    epochs = config.get("num_epochs", 10)
    for epoch in range(epochs):
        optimizer.zero_grad()
        outputs = model(features_tensor)
        loss = criterion(outputs, target_tensor)
        loss.backward()
        optimizer.step()

        # Report metrics and a checkpoint together to the Ray AIR session.
        # The state dict is stored under the "model" key, which is where
        # TorchPredictor looks for it when loading a dict checkpoint.
        checkpoint = Checkpoint.from_dict({
            "model": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "loss": loss.item(),
        })
        session.report({"loss": loss.item(), "epoch": epoch}, checkpoint=checkpoint)


# Configure the trainer
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=1),
    # Callbacks such as the MLflow logger are attached through RunConfig
    run_config=RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="ray_air_unified_pipeline")]
    ),
)

# Run the training
result = trainer.fit()

# Get the latest checkpoint reported by the training function
checkpoint = result.checkpoint

# Build a predictor locally as a sanity check.
# The checkpoint holds only a state dict, so we pass a model instance to load it into.
predictor = TorchPredictor.from_checkpoint(checkpoint, model=get_dummy_model())
print(predictor.predict(np.array([[0.1, 0.5]], dtype=np.float32)))

# Deploy the model using Ray Serve.
# PredictorDeployment receives the predictor class and the checkpoint, and builds
# the predictor inside each replica; extra keyword arguments such as model= are
# forwarded to TorchPredictor.from_checkpoint().
serve.run(
    PredictorDeployment.options(name="my_pytorch_model", route_prefix="/predict").bind(
        TorchPredictor,
        checkpoint,
        model=get_dummy_model(),
    )
)
print("Model deployed! You can send requests to it.")

# Example of sending a request (this would typically be done from another client).
# The default HTTP adapter expects a JSON body with an "array" field.
# import requests
# response = requests.post(
#     "http://127.0.0.1:8000/predict",
#     json={"array": [[0.1, 0.5]], "dtype": "float32"},
# )
# print(response.json())

# To shut down the Serve deployment and disconnect from Ray
# serve.shutdown()
# ray.shutdown()
Ray AIR streamlines this by providing a unified Trainer interface that abstracts away the complexities of different distributed training frameworks (like PyTorch, TensorFlow, XGBoost, etc.) and their associated hardware requirements. It treats training jobs, hyperparameter tuning, and model serving as first-class citizens within the Ray ecosystem. The TorchTrainer is just one example; similar trainers exist for the other frameworks, and they all share the same fit() entry point and return the same kind of result object.
The core idea is that you define your training logic in a Python function (like train_func). Ray AIR handles distributing this function across your Ray cluster: it launches the workers, sets up communication between them, shards any datasets you pass to the trainer, and collects results. The session.report() call is crucial: it’s how your training function communicates progress and metrics back to the Ray AIR orchestrator. This is what enables features like real-time monitoring and early stopping.
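To make the reporting loop concrete, here is a minimal pure-Python sketch of the pattern. It is not Ray's implementation: toy_train_func, run_with_early_stopping, patience, and min_delta are hypothetical names, and a generator stands in for the push-style session.report() call. The training function emits one metrics dict per epoch, and the driver decides when to stop.

```python
from typing import Callable, Dict, Iterator, List


def toy_train_func(num_epochs: int) -> Iterator[Dict[str, float]]:
    """Hypothetical training loop that yields one metrics dict per epoch."""
    loss = 1.0
    for epoch in range(num_epochs):
        # Stand-in for an optimization step: the loss halves, then plateaus at 0.1.
        loss = max(loss * 0.5, 0.1)
        yield {"epoch": epoch, "loss": loss}


def run_with_early_stopping(
    train_func: Callable[[int], Iterator[Dict[str, float]]],
    num_epochs: int,
    patience: int = 2,
    min_delta: float = 1e-3,
) -> List[Dict[str, float]]:
    """Consume reported metrics and stop once the loss stops improving."""
    history: List[Dict[str, float]] = []
    best_loss = float("inf")
    stale_epochs = 0
    for metrics in train_func(num_epochs):
        history.append(metrics)
        if best_loss - metrics["loss"] > min_delta:
            best_loss = metrics["loss"]
            stale_epochs = 0
        else:
            stale_epochs += 1
        if stale_epochs >= patience:
            break  # the driver, not the training function, ends the run
    return history


history = run_with_early_stopping(toy_train_func, num_epochs=20)
print(f"stopped after {len(history)} epochs, final loss {history[-1]['loss']:.3f}")
```

The point of the split is that the training function only reports; the stopping policy lives with whoever consumes the reports, so it can be swapped without touching the training code.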
Passing a checkpoint to session.report() alongside the metrics saves the state of your model and optimizer. Ray AIR then makes these checkpoints accessible on the result returned by fit(), allowing you to easily resume training or, as shown here, deploy the trained model. The TorchPredictor is a pre-built component that knows how to load a PyTorch model from a checkpoint and expose a predict method. Ray Serve can then host this predictor as a scalable, fault-tolerant service.
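The checkpoint-to-predictor handoff can be sketched without Ray at all. The example below is an illustration under stated assumptions, not TorchPredictor's code: LinearModel and ToyPredictor are hypothetical classes, and the checkpoint is a plain dict that keeps the model state under a "model" key.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class LinearModel:
    """Tiny stand-in for a trained model: y = w . x + b."""

    weights: List[float]
    bias: float

    def state_dict(self) -> Dict[str, Any]:
        return {"weights": list(self.weights), "bias": self.bias}

    def load_state_dict(self, state: Dict[str, Any]) -> None:
        self.weights = list(state["weights"])
        self.bias = state["bias"]

    def forward(self, row: List[float]) -> float:
        return sum(w * x for w, x in zip(self.weights, row)) + self.bias


class ToyPredictor:
    """Hypothetical predictor: restores a model from a checkpoint, then predicts."""

    def __init__(self, model: LinearModel) -> None:
        self.model = model

    @classmethod
    def from_checkpoint(cls, checkpoint: Dict[str, Any], model: LinearModel) -> "ToyPredictor":
        # The checkpoint stores only state, so the caller supplies an empty model.
        model.load_state_dict(checkpoint["model"])
        return cls(model)

    def predict(self, batch: List[List[float]]) -> List[float]:
        return [self.model.forward(row) for row in batch]


# Weights that match the dummy data above: target = feature1 + feature2 + 0.1
trained = LinearModel(weights=[1.0, 1.0], bias=0.1)
checkpoint = {"model": trained.state_dict(), "loss": 0.0}

predictor = ToyPredictor.from_checkpoint(checkpoint, model=LinearModel([0.0, 0.0], 0.0))
print(predictor.predict([[1.0, 2.0]]))
```

Because the checkpoint carries state rather than code, the same dict can feed a resumed training run or a predictor; only the consumer differs.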
What most people miss is how deeply integrated the checkpointing and reporting mechanisms are with Ray Serve’s deployment. A checkpoint produced during training is the same object the serving side consumes. When you hand TorchPredictor and a checkpoint to Ray Serve’s PredictorDeployment, each replica calls TorchPredictor.from_checkpoint() itself, so the weights are loaded inside the serving process. The deployment is then configured to receive requests, convert their payloads into model inputs, and invoke the predictor’s predict method.
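As a rough picture of what that wrapping involves, the sketch below turns any predict function into a request handler. It is a simplified stand-in rather than Ray Serve code: make_handler and toy_predict are hypothetical, and the "array" and "predictions" field names are assumptions chosen for this sketch.

```python
import json
from typing import Callable, List

Batch = List[List[float]]


def make_handler(predict: Callable[[Batch], List[float]]) -> Callable[[str], str]:
    """Wrap a predict function so it accepts and returns JSON strings."""

    def handle(request_body: str) -> str:
        payload = json.loads(request_body)
        if "array" not in payload:
            # Reject malformed requests instead of passing them to the model.
            return json.dumps({"error": "missing 'array' field"})
        predictions = predict(payload["array"])
        return json.dumps({"predictions": predictions})

    return handle


def toy_predict(batch: Batch) -> List[float]:
    """Hypothetical model: feature1 + feature2 + 0.1, as in the dummy data."""
    return [sum(row) + 0.1 for row in batch]


handle = make_handler(toy_predict)
response = handle(json.dumps({"array": [[1.0, 2.0]]}))
print(response)
```

A real deployment adds what this sketch leaves out: HTTP routing, request batching, replica scaling, and restarts on failure.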
The next step is exploring how Ray Tune integrates with AIR for hyperparameter optimization, allowing you to automatically find the best train_loop_config for your TorchTrainer.