Ray Workflows lets you build complex, multi-step data processing pipelines as Directed Acyclic Graphs (DAGs).

Let’s see it in action. Imagine you have a series of tasks: fetching data, processing it, and then storing it.

import ray
from ray import workflow

# Start from a clean Ray instance: shut down any existing one, then initialize
if ray.is_initialized():
    ray.shutdown()
ray.init(num_cpus=4)

@ray.remote
def fetch_data(source_url: str) -> dict:
    """Simulates fetching data from a remote source."""
    print(f"Fetching data from {source_url}...")
    # In a real scenario, this would involve HTTP requests, DB queries, etc.
    return {"data": f"raw_data_from_{source_url}", "source": source_url}

@ray.remote
def process_data(data: dict) -> dict:
    """Simulates processing the fetched data."""
    print(f"Processing data: {data['data']}...")
    processed_content = data["data"].upper()
    return {"processed_data": processed_content, "original_source": data["source"]}

@ray.remote
def store_data(processed_data: dict) -> str:
    """Simulates storing the processed data."""
    print(f"Storing processed data: {processed_data['processed_data']}...")
    # In a real scenario, this would write to a database, file system, etc.
    storage_location = f"/storage/{processed_data['original_source']}.json"
    print(f"Data stored at: {storage_location}")
    return storage_location

# Define the workflow DAG
def data_pipeline(source_url: str):
    """Builds the DAG lazily; .bind() records each call without running it."""
    fetched = fetch_data.bind(source_url)
    processed = process_data.bind(fetched)
    return store_data.bind(processed)

# Run the workflow
if __name__ == "__main__":
    source = "http://example.com/data"
    workflow_id = "my_first_pipeline"
    print(f"Starting workflow '{workflow_id}' with source '{source}'...")

    # Execute the workflow
    # This runs the DAG, checkpoints each task, and returns the final result
    final_result = workflow.run(data_pipeline(source), workflow_id=workflow_id)

    print(f"\nWorkflow '{workflow_id}' completed successfully!")
    print(f"Final result (storage path): {final_result}")

    # You can also inspect the workflow status and output
    # print(workflow.get_status(workflow_id))
    # print(workflow.get_output(workflow_id))

Ray Workflows tackles the problem of managing complex, multi-stage computations that might fail, need retries, or have dependencies between steps. It runs your ordinary Python functions as durable, distributed tasks. The core idea is to define your computation as a DAG, where each node is a task and each edge is a dependency. Ray handles the scheduling, execution, fault tolerance, and result management for you.

The .bind() method is key. In Ray 2.x, a workflow task is an ordinary function decorated with @ray.remote; the separate @workflow.step decorator from Ray 1.x is gone, and dependencies are not expressed with yield. Calling fetch_data.bind(source_url) does not run anything. It returns a DAG node that records the function and its arguments. Passing that node as an argument to process_data.bind(...) tells Ray Workflows that process_data depends on the result of fetch_data, so Ray runs fetch_data first and passes its result to process_data when it is ready. The node returned by the last .bind() call represents the whole DAG.
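To make the lazy behavior of .bind() concrete, here is a minimal pure-Python sketch of the idea. It is not Ray's implementation: LazyNode, bind, and execute are hypothetical names invented for this illustration, and everything runs in a single process.

```python
from typing import Any, Callable


class LazyNode:
    """A deferred call: remembers a function and its arguments without running it."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args


def bind(fn: Callable[..., Any], *args: Any) -> LazyNode:
    """Hypothetical stand-in for task.bind(): builds a node, executes nothing."""
    return LazyNode(fn, *args)


def execute(node: LazyNode) -> Any:
    """Resolve upstream nodes first, then call this node's function."""
    resolved = [execute(a) if isinstance(a, LazyNode) else a for a in node.args]
    return node.fn(*resolved)


calls = []  # records execution order so the laziness is observable


def fetch(url: str) -> str:
    calls.append("fetch")
    return f"raw:{url}"


def process(raw: str) -> str:
    calls.append("process")
    return raw.upper()


dag = bind(process, bind(fetch, "example"))
assert calls == []      # building the graph ran nothing
result = execute(dag)   # dependencies run first: fetch, then process
```

The point of the sketch is the separation between describing the computation and running it, which is what lets an orchestrator schedule, checkpoint, and retry each node independently.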

The workflow.run() function is the entry point for executing your DAG. It takes the final DAG node and an optional workflow_id; if you omit the ID, Ray generates one. The workflow_id identifies a specific execution, so you can check its status with workflow.get_status(workflow_id), resume it with workflow.resume(workflow_id), or fetch its result later with workflow.get_output(workflow_id). Ray Workflows schedules the underlying Ray tasks and checkpoints their results, so the workflow can keep making progress even if individual tasks fail and need retries.

When you define a workflow, you’re not just writing a script; you’re building a declarative plan. Ray Workflows translates this plan into a series of Ray tasks. If a task fails, Ray Workflows can be configured to retry it. It automatically handles passing the output of one task as the input to another, managing intermediate results that might be stored in object storage or on disk. This eliminates the need for manual state management and error handling across complex pipelines.
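In Ray itself, retries are typically configured through task options such as max_retries and retry_exceptions on @ray.remote. The sketch below only illustrates the retry semantics in plain Python; run_with_retries and flaky_fetch are hypothetical helpers, not part of Ray.

```python
from typing import Any, Callable


def run_with_retries(fn: Callable[[], Any], max_retries: int = 3) -> Any:
    """Call fn, retrying up to max_retries times after the first failure."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retry budget exhausted: surface the original error
            attempt += 1


attempts = {"count": 0}


def flaky_fetch() -> str:
    """Fails on the first two calls, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "payload"


value = run_with_retries(flaky_fetch, max_retries=3)
```

An orchestrator applies this kind of policy per task, so a transient failure in one step does not force the whole pipeline to restart.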

You can inspect a run with workflow.get_metadata(workflow_id), which returns a dictionary describing the workflow, including its status, timing statistics, and any user-supplied metadata. workflow.list_all() lists the known workflows with their statuses. Both are useful for debugging and for understanding how a computation progressed.

Chaining .bind() calls is not limited to linear pipelines. To fan out, bind several independent tasks and pass their nodes as arguments to one downstream task; Ray can run the upstream tasks in parallel and starts the downstream task once all of them have finished. Because the DAG is built by ordinary Python code, you can use if/else and for loops to decide which nodes to create before the workflow starts. When a decision depends on a value computed inside the workflow, a task can return workflow.continuation(next_task.bind(...)), which tells Ray Workflows to run that sub-DAG next and use its result as the task's output.
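The following pure-Python sketch models both patterns, fan-out/fan-in and a runtime branch, in a single process. It is a toy under stated assumptions rather than Ray code: Deferred and run are hypothetical names, and a step that returns another Deferred plays the role that a continuation plays in a real workflow.

```python
from typing import Any, Callable


class Deferred:
    """A step that has been described but not yet run (hypothetical helper)."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args


def run(step: Any) -> Any:
    """Run a step; if it returns another Deferred, keep going until a plain value."""
    while isinstance(step, Deferred):
        args = [run(a) for a in step.args]  # resolve upstream steps (fan-in)
        step = step.fn(*args)
    return step


def count_rows(n: int) -> int:
    return n


def summarize(*counts: int) -> Any:
    """Branch at runtime: pick the next step based on the computed total."""
    total = sum(counts)
    if total > 10:
        return Deferred(lambda t: f"large:{t}", total)  # hand off to a follow-up step
    return f"small:{total}"


# Fan-out: three independent steps; fan-in: summarize receives all three results.
small = run(Deferred(summarize, *[Deferred(count_rows, n) for n in (1, 2, 3)]))
large = run(Deferred(summarize, *[Deferred(count_rows, n) for n in (5, 6, 7)]))
```

Here the upstream steps run sequentially; a real orchestrator would be free to run them in parallel because none depends on another.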

The most surprising aspect of Ray Workflows, and indeed of many distributed DAG orchestrators, is how much state management it abstracts away. You might think you need to explicitly save intermediate results to disk or a database after each step, but Ray Workflows checkpoints task outputs to its configured storage for you. If a workflow is interrupted, you can resume it with workflow.resume(workflow_id), and Ray Workflows can often pick up where it left off by loading the already computed intermediate results rather than re-executing everything from scratch. workflow.get_status() and workflow.get_output() read from the same persistent state.
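The checkpoint-and-resume idea can be sketched in a few lines of plain Python. This is an illustration of the concept only, not how Ray stores its checkpoints: run_pipeline, the JSON file layout, and the step names are all assumptions made for the example.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[str], str]]


def run_pipeline(steps: List[Step], initial: str, checkpoint_dir: Path, log: List[str]) -> str:
    """Run steps in order, skipping any step whose checkpoint file already exists."""
    value = initial
    for name, fn in steps:
        checkpoint = checkpoint_dir / f"{name}.json"
        if checkpoint.exists():
            value = json.loads(checkpoint.read_text())  # reuse the saved result
            continue
        value = fn(value)
        log.append(name)  # record that this step really executed
        checkpoint.write_text(json.dumps(value))
    return value


def crash(_: str) -> str:
    raise RuntimeError("simulated interruption")


workdir = Path(tempfile.mkdtemp())
executed: List[str] = []

# First run: "fetch" is checkpointed, then "process" fails.
try:
    run_pipeline([("fetch", lambda s: s + "-raw"), ("process", crash)], "src", workdir, executed)
except RuntimeError:
    pass

# Resumed run: "fetch" is loaded from its checkpoint; only "process" executes.
final = run_pipeline([("fetch", lambda s: s + "-raw"), ("process", str.upper)], "src", workdir, executed)
```

After the resumed run, the log shows that "fetch" executed only once, during the first attempt, which is the behavior a durable workflow engine aims for.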

The next step after mastering basic DAG orchestration is exploring advanced features like error handling strategies, custom schedulers, and integrating with external data sources and sinks more deeply.
