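Ray Actors are stateful, remote Python classes: each actor is an instance of a class that runs in its own Ray worker process and keeps its state between method calls.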

Let’s see an actor in action. Imagine we have a simple counter actor that we want to increment remotely and retrieve its value.

import ray

# Initialize Ray locally. In a real deployment you would connect to a
# running cluster instead, e.g. ray.init(address="auto").
# ignore_reinit_error=True makes a repeated ray.init() call a no-op.
ray.init(ignore_reinit_error=True)

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

    def get_count(self):
        return self.count

# Create an actor instance. This deploys the Counter class
# to a Ray worker and returns a handle (proxy object).
counter_actor_handle = Counter.remote()

# Call methods on the actor. These calls are asynchronous.
# The result is an ObjectRef, which is a future.
future_count1 = counter_actor_handle.increment.remote()
future_count2 = counter_actor_handle.increment.remote()
future_count3 = counter_actor_handle.increment.remote()

# Get the current count.
future_current_count = counter_actor_handle.get_count.remote()

# Retrieve the actual results.
results = ray.get([future_count1, future_count2, future_count3, future_current_count])

print(f"Results from increment calls: {results[:3]}")
print(f"Current count: {results[3]}")

# Expected output:
# Results from increment calls: [1, 2, 3]
# Current count: 3

This example demonstrates the core idea: Counter.remote() creates an instance of the Counter class on a Ray worker process, and counter_actor_handle is a proxy object that lets you call its methods remotely. The state (self.count) is maintained across method calls on that specific actor instance.

The problem Ray Actors solve is managing distributed state. Traditional distributed systems often struggle with shared mutable state. If you have multiple processes trying to update the same data, you run into synchronization issues, race conditions, and complex locking mechanisms. Actors encapsulate state within a single process and expose it through well-defined, asynchronous message-passing interfaces (the method calls). This simplifies state management in a distributed environment significantly.

Internally, when you call Counter.remote(), Ray serializes the Counter class definition and ships it to a dedicated worker process, which instantiates the class. When you call a method like counter_actor_handle.increment.remote(), Ray serializes the method call and its arguments, sends it as a message to the worker hosting the actor, and immediately hands the caller an ObjectRef. The worker then executes the method on the actor instance and stores the serialized return value (or exception) as the object that this ObjectRef refers to. The ObjectRef is a future: calling ray.get() on it blocks until the value is available and returns it, or re-raises the exception if the method failed.

The key levers you control are:

  • Actor Creation: YourClass.remote() creates a new instance. You can also pass arguments to the __init__ method: YourClass.remote(arg1, arg2).
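  • Method Invocation: actor_handle.method.remote(*args, **kwargs) sends a message to the actor and immediately returns an ObjectRef. Actor methods cannot be called directly: actor_handle.method(*args, **kwargs) raises a TypeError. To block until a result is ready, wrap the remote call in ray.get(): ray.get(actor_handle.method.remote(*args, **kwargs)).
  • Actor Placement: You can influence where an actor is placed by declaring the resources it needs, either on the class with @ray.remote(num_cpus=..., num_gpus=...) or per instance with YourClass.options(...), for example actor_handle = YourClass.options(resources={"custom_resource": 1}).remote(). Ray only places the actor on a node that can satisfy the request, which is how you manage resources and co-locate actors with data or other compute.
  • Actor Supervision: Ray can restart an actor whose process crashes. You configure this with @ray.remote(max_restarts=...); the default is 0 (no restarts). A restarted actor runs __init__ again, so its in-memory state is rebuilt from scratch rather than recovered.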

When you call actor_handle.method.remote(), the call is queued for that specific actor instance. For a default (synchronous, single-threaded) actor, Ray runs one method at a time, and calls submitted by the same caller execute in the order they were submitted. Because only one method touches the actor's state at any moment, the state stays consistent without explicit locks. Calls from different callers have no ordering guarantee relative to each other, and actors configured for concurrency (async actors, or threaded actors with max_concurrency greater than 1) relax these guarantees. Separate actors process their queues independently, so method calls on multiple actors run in parallel.
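The mailbox idea behind this guarantee can be illustrated without Ray at all. The following is a toy, single-process analogy in plain Python, not Ray's implementation: a hypothetical MailboxActor owns its state in one thread that drains a FIFO queue one message at a time, while several sender threads enqueue messages concurrently. No lock guards count, yet no update is lost, and each sender sees its own results in submission order.

```python
import queue
import threading

class MailboxActor:
    """Toy analogy of an actor (hypothetical, not Ray's code): one thread
    owns the state and processes messages from a FIFO mailbox sequentially."""

    def __init__(self):
        self.count = 0
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            method, reply = self._mailbox.get()
            if method is None:  # sentinel message: stop the loop
                break
            reply.put(method())  # only this thread ever mutates self.count

    def _increment(self):
        self.count += 1
        return self.count

    def increment_async(self):
        # Enqueue a message and return a one-slot queue acting as a future.
        reply = queue.Queue(maxsize=1)
        self._mailbox.put((self._increment, reply))
        return reply

    def stop(self):
        self._mailbox.put((None, None))
        self._thread.join()

actor = MailboxActor()

def sender(out):
    # Each sender records the futures for its own messages, in order.
    for _ in range(250):
        out.append(actor.increment_async())

outs = [[] for _ in range(4)]
threads = [threading.Thread(target=sender, args=(o,)) for o in outs]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Resolve every future; each sender's values come back in increasing order.
per_sender = [[f.get() for f in o] for o in outs]
all_values = sorted(v for vals in per_sender for v in vals)
actor.stop()
print(all_values == list(range(1, 1001)), actor.count)  # True 1000
```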

The most surprising thing about actor method calls is that actor_handle.method.remote() returns an ObjectRef immediately, even though the actual computation might take a significant amount of time and might be running on a machine across the network. This non-blocking nature is fundamental to achieving high throughput in distributed applications, as your main application thread isn’t waiting for each remote operation to complete.

The next concept to explore is how to manage actor lifecycles and handle failures gracefully, especially in larger-scale deployments.
