Ray ML Pipelines let you orchestrate complex machine learning workflows, but their real power is in how they decouple training, evaluation, and deployment into distinct, repeatable steps that can run independently.
Here’s a look at a simple pipeline in action:
import os
import tempfile

import ray
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Import paths follow recent Ray 2.x releases, where the configs live in ray.train
import ray.train
from ray.train import Checkpoint, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer, prepare_data_loader, prepare_model

# Initialize Ray by connecting to a running cluster
# (use ray.init() instead to start a local instance)
ray.init(address="auto")


# Dummy data and model for demonstration
class DummyDataset(Dataset):
    def __init__(self, num_samples=1000):
        self.num_samples = num_samples
        self.features = torch.randn(num_samples, 10)
        self.labels = torch.randint(0, 2, (num_samples, 1)).float()

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]


class DummyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        return self.sigmoid(self.linear(x))


# Define a training function; Ray runs one copy of it on each worker
def train_func(config):
    # prepare_model / prepare_data_loader set up distributed training and sharding
    model = prepare_model(DummyModel())
    dataloader = prepare_data_loader(DataLoader(DummyDataset(), batch_size=32))
    # The model already applies a sigmoid, so use BCELoss here;
    # BCEWithLogitsLoss would apply a second sigmoid to the outputs.
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    for epoch in range(1):  # Simplified for demo
        for features, labels in dataloader:
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}, Loss: {loss.item()}")
        # The return value of a training function is discarded, so persist
        # the weights as a checkpoint and report it together with the metrics.
        with tempfile.TemporaryDirectory() as tmpdir:
            unwrapped = getattr(model, "module", model)  # unwrap DDP if present
            torch.save(unwrapped.state_dict(), os.path.join(tmpdir, "model.pt"))
            ray.train.report(
                {"loss": loss.item()},
                checkpoint=Checkpoint.from_directory(tmpdir),
            )


# Configure the trainer (TorchTrainer, since train_func is a plain PyTorch loop)
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=2),
    run_config=RunConfig(
        storage_path="/tmp/ray_results",
        name="my_ml_pipeline_run",
        # Example of adding an MLflow logger
        # You'd need to have MLflow installed and configured, plus:
        # from ray.air.integrations.mlflow import MLflowLoggerCallback
        # callbacks=[MLflowLoggerCallback(experiment_name="RayMLPipelineDemo")]
    ),
)

# Run the training
result = trainer.fit()
print("Training finished.")

# The 'result' object contains information about the training run:
# result.metrics holds the last reported metrics, and result.checkpoint points
# to the last reported checkpoint, which later steps (evaluation, deployment) can load.
# For example, to access the trained weights:
# with result.checkpoint.as_directory() as ckpt_dir:
#     state_dict = torch.load(os.path.join(ckpt_dir, "model.pt"))
The core idea behind Ray ML Pipelines is to define a directed acyclic graph (DAG) of execution steps. Each step in the DAG is a distinct operation, such as data preprocessing, model training, hyperparameter tuning, evaluation, or even model deployment. Ray handles the execution of these steps, managing dependencies and distributing computations across your cluster.
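To make the DAG idea concrete, here is a minimal sketch in plain Python. It is not Ray's scheduler: the step functions, the `STEPS` table, and `run_dag` are hypothetical names used only for illustration, and everything runs in one process. It shows the two things a DAG runner has to do: order steps so that dependencies run first, and hand each step the outputs of the steps it depends on.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


# Hypothetical steps; each receives a dict of its dependencies' outputs.
def preprocess(inputs):
    return [x / 10 for x in range(5)]


def train(inputs):
    data = inputs["preprocess"]
    # Toy "model": the mean of the training data.
    return {"weight": sum(data) / len(data)}


def evaluate(inputs):
    model, data = inputs["train"], inputs["preprocess"]
    errors = [abs(x - model["weight"]) for x in data]
    return {"mean_abs_error": sum(errors) / len(errors)}


# step name -> (function, names of the steps it depends on)
STEPS = {
    "preprocess": (preprocess, []),
    "train": (train, ["preprocess"]),
    "evaluate": (evaluate, ["train", "preprocess"]),
}


def run_dag(steps):
    """Run every step after its dependencies and collect all outputs."""
    graph = {name: set(deps) for name, (_, deps) in steps.items()}
    order = list(TopologicalSorter(graph).static_order())
    outputs = {}
    for name in order:
        func, deps = steps[name]
        outputs[name] = func({dep: outputs[dep] for dep in deps})
    return order, outputs


order, outputs = run_dag(STEPS)
print(order)
print(outputs["evaluate"])
```

In a distributed setting the same structure applies, except that each step would run on cluster workers and independent steps could run in parallel.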
The system is built around a few key components:
Data objects: These represent datasets managed by Ray. They can be distributed and efficiently accessed by different workers. Ray integrates with various data sources and formats.
Predictor and Trainer classes: These are the building blocks for your pipeline steps. Trainer classes (like LightningTrainer, HuggingFaceTrainer, XGBoostTrainer) are used for training models, while Predictor classes are for inference. You can also define custom Python functions for more flexibility.
Pipeline object: This orchestrates the execution of your Trainer or Predictor components. You define the order and dependencies between these components.
RunConfig and ScalingConfig: These configurations control how each step of your pipeline is executed. ScalingConfig determines the compute resources (e.g., number of workers, GPUs per worker), and RunConfig handles aspects like experiment tracking (e.g., MLflow, TensorBoard), checkpointing, and logging.
When you execute trainer.fit(), Ray serializes your training function and its dependencies, distributes it to the specified workers, and manages the training process. The result object returned is crucial; it encapsulates the outcomes of the training run, including any model artifacts, metrics, and logs, which are essential for feeding into the next stage of your pipeline.
A common pattern is to chain these Trainer or Predictor objects. For instance, the output of a Trainer (e.g., a trained model checkpoint) can be passed as input to an evaluation or deployment step that runs next. This creates a continuous flow from raw data to a deployed model.
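The chaining pattern can be sketched without any Ray dependency. In the example below, `StageOutput`, `train_stage`, `evaluate_stage`, and `should_deploy` are hypothetical names, and the "model" is a toy stand-in. The point is the handoff: each stage returns a small record saying where its artifacts live, and the next stage reads from that location rather than sharing in-memory state.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class StageOutput:
    """Hypothetical handoff record: artifact location plus summary metrics."""
    artifact_dir: Path
    metrics: dict


def train_stage(work_dir: Path) -> StageOutput:
    # Stand-in for real training: "learn" the mean of a toy dataset.
    data = [1.0, 2.0, 3.0, 4.0]
    model = {"mean": sum(data) / len(data)}
    artifact_dir = work_dir / "train"
    artifact_dir.mkdir(parents=True, exist_ok=True)
    (artifact_dir / "model.json").write_text(json.dumps(model))
    return StageOutput(artifact_dir, {"num_samples": len(data)})


def evaluate_stage(trained: StageOutput) -> StageOutput:
    # Load the artifact written by the previous stage.
    model = json.loads((trained.artifact_dir / "model.json").read_text())
    holdout = [2.0, 3.0]
    mae = sum(abs(x - model["mean"]) for x in holdout) / len(holdout)
    return StageOutput(trained.artifact_dir, {"mae": mae})


def should_deploy(evaluated: StageOutput, max_mae: float = 1.0) -> bool:
    # Gate deployment on the evaluation metric.
    return evaluated.metrics["mae"] <= max_mae


with tempfile.TemporaryDirectory() as tmp:
    trained = train_stage(Path(tmp))
    evaluated = evaluate_stage(trained)
    deploy = should_deploy(evaluated)

print(evaluated.metrics, deploy)
```

Passing a location instead of a live object keeps stages independent: the evaluation step can be rerun later, or on another machine, as long as the artifact directory is reachable.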
The concept of "actors" is fundamental to how Ray orchestrates these distributed tasks. Each worker processing a part of your pipeline, or a specific component like a Trainer, is essentially an actor. Ray’s actor model provides fault tolerance and efficient communication between these distributed units of computation.
The most surprising thing about Ray ML Pipelines is that the result object returned by a trainer.fit() call isn’t just a summary of metrics; it also carries a handle to the persisted run. Its result.checkpoint attribute points to the last checkpoint directory reported from the training loop, and that directory can hold not just the model weights but any other artifacts (like tokenizers and configs) that the loop explicitly saved there. Subsequent pipeline stages can open that directory and pick up exactly where the previous stage left off, with all necessary components.
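As a rough sketch of how a later stage might collect several artifacts from one checkpoint, the helper below reads every JSON file in a checkpoint directory. `load_artifacts` and `LocalCheckpoint` are hypothetical names. The only assumption about Ray is that the checkpoint exposes `as_directory()` as a context manager yielding a local path, which `ray.train.Checkpoint` does in recent releases; the stand-in class exists only so the example runs without Ray installed.

```python
import json
import os
import tempfile
from contextlib import contextmanager


def load_artifacts(checkpoint):
    """Read every JSON artifact stored in a checkpoint directory.

    `checkpoint` is assumed to expose `as_directory()` as a context manager
    that yields a local directory path.
    """
    artifacts = {}
    with checkpoint.as_directory() as ckpt_dir:
        for filename in sorted(os.listdir(ckpt_dir)):
            if filename.endswith(".json"):
                with open(os.path.join(ckpt_dir, filename)) as f:
                    artifacts[filename] = json.load(f)
    return artifacts


class LocalCheckpoint:
    """Hypothetical stand-in for a checkpoint handle backed by a local folder."""

    def __init__(self, path):
        self.path = path

    @contextmanager
    def as_directory(self):
        yield self.path


with tempfile.TemporaryDirectory() as tmp:
    # Simulate a training loop that saved two side artifacts next to the weights.
    saved = {
        "model_config.json": {"hidden_size": 10},
        "tokenizer.json": {"vocab_size": 3},
    }
    for name, payload in saved.items():
        with open(os.path.join(tmp, name), "w") as f:
            json.dump(payload, f)
    artifacts = load_artifacts(LocalCheckpoint(tmp))

print(sorted(artifacts))
```

With a real run, the same helper would be called as `load_artifacts(result.checkpoint)`, provided the training loop wrote those files into the directory it reported as its checkpoint.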
This entire framework allows you to build robust, reproducible, and scalable ML systems. The next logical step after training is often evaluating the model’s performance on unseen data or deploying it to an inference endpoint.