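Ray serializes objects with cloudpickle, an extension of Python’s pickle module. Pushing a large object through a plain pickle byte stream copies its entire payload, which becomes a performance bottleneck as objects grow.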
Let’s see what happens when we try to serialize a large NumPy array.
import ray
import numpy as np
import time
# Initialize Ray
if ray.is_initialized():
    ray.shutdown()
ray.init()
# Create a large NumPy array
large_array = np.random.rand(10000, 10000)
print(f"Created a large array of shape: {large_array.shape}")
# Measure serialization time
start_time = time.time()
serialized_data = ray.cloudpickle.dumps(large_array)
end_time = time.time()
print(f"Serialization took: {end_time - start_time:.4f} seconds")
print(f"Serialized data size: {len(serialized_data) / (1024 * 1024):.2f} MB")
# Measure deserialization time
start_time = time.time()
deserialized_array = ray.cloudpickle.loads(serialized_data)
end_time = time.time()
print(f"Deserialization took: {end_time - start_time:.4f} seconds")
print(f"Deserialized array shape: {deserialized_array.shape}")
ray.shutdown()
When you run this, you’ll observe measurable time spent in dumps and loads, and it grows with the array size. pickle does not break a NumPy array into per-element Python objects; the array’s data is written as one binary block. The cost comes from copying: dumps copies the full buffer (roughly 763 MB for this float64 array) into a new bytes object, and loads copies it again into a freshly allocated array.
Ray uses cloudpickle by default, which is an extension of the standard pickle module designed to handle more complex Python constructs, including closures and lambdas. For large, buffer-backed data such as NumPy arrays or Pandas DataFrames, however, a direct call to dumps still follows pickle’s default behavior: everything ends up in a single byte stream unless the caller opts into pickle protocol 5 out-of-band buffers. pickle excels at serializing arbitrary Python object graphs, but that generality comes at a cost when the payload is one large binary blob that only needs to be moved, not interpreted.
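To make the copying cost concrete, here is a minimal sketch that uses only NumPy and the standard pickle module (no Ray required, Python 3.8 or later). It compares an in-band pickle with pickle protocol 5 out-of-band buffers. The array size and variable names are chosen purely for illustration.

```python
import pickle

import numpy as np

# Illustrative size only: 1,000,000 float64 values (8 MB of data).
arr = np.arange(1_000_000, dtype=np.float64)

# In-band: the whole data buffer is copied into the pickle byte stream.
in_band = pickle.dumps(arr, protocol=5)

# Out-of-band: the pickle stream holds only metadata. The data buffer is
# handed to buffer_callback as a pickle.PickleBuffer instead of being copied.
buffers = []
out_of_band = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)

# Deserialization needs the same buffers, in the same order.
restored = pickle.loads(out_of_band, buffers=buffers)

print(f"in-band stream:      {len(in_band)} bytes")
print(f"out-of-band stream:  {len(out_of_band)} bytes")
print(f"buffers handed over: {len(buffers)}")
print(f"round trip equal:    {np.array_equal(arr, restored)}")
```

The out-of-band stream stays tiny no matter how large the array is, because the data travels separately as a buffer that the transport layer can place wherever it likes, such as shared memory.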
The key to optimizing this is that Ray can use more efficient serialization methods for specific data types. For NumPy arrays, that means serializing the raw underlying buffer directly rather than embedding it in a pickle stream. Ray has built-in support for this, and for your own types it lets you register custom serializers through ray.util.register_serializer.
Here’s how to get Ray to use a more efficient serialization for NumPy arrays:
First, make sure your Ray version is recent enough to include these optimizations, or consider using ray.data, which is designed for large-scale data processing and handles serialization efficiently. For direct object serialization, Ray’s handling of NumPy arrays has improved over time: in current releases, ray.put() and ray.get() already take the efficient, buffer-based path for NumPy arrays without any extra configuration.
If you’re still seeing significant overhead with large NumPy arrays, it’s often because the data is being passed through the Python object layer unnecessarily, for example after being converted to nested Python lists. The goal is for the ndarray’s reduction logic (its __reduce_ex__ method under pickle protocol 5) to hand the underlying buffer to the serializer instead of copying it into the pickle stream.
Let’s try serializing a large NumPy array again, this time focusing on how ray.put and ray.get might handle it.
import ray
import numpy as np
import time
# Initialize Ray
if ray.is_initialized():
    ray.shutdown()
ray.init()
# Create a large NumPy array
large_array = np.random.rand(10000, 10000)
print(f"Created a large array of shape: {large_array.shape}")
# Measure putting the object into the object store
start_time = time.time()
object_ref = ray.put(large_array)
end_time = time.time()
print(f"ray.put took: {end_time - start_time:.4f} seconds")
# Measure getting the object from the object store
start_time = time.time()
retrieved_array = ray.get(object_ref)
end_time = time.time()
print(f"ray.get took: {end_time - start_time:.4f} seconds")
print(f"Retrieved array shape: {retrieved_array.shape}")
ray.shutdown()
The performance difference you observe between the direct ray.cloudpickle.dumps and ray.put is where the optimization lies. ray.put stages the object into Ray’s distributed object store. For NumPy arrays, Ray’s serialization (built on pickle protocol 5 with out-of-band data) keeps the array’s data buffer out of the pickle stream: a small header with metadata such as the dtype and shape is pickled, and the raw bytes are written to shared memory. When ray.get is called on the same node, the array is reconstructed as a read-only view over that buffer without copying the data, which is far cheaper than rebuilding the array from a pickle byte stream.
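The idea of storing a dtype, a shape, and a raw byte buffer can be illustrated without Ray. The sketch below is not Ray’s implementation; it only shows, with plain NumPy, how an array can be rebuilt from that information without copying the data. The helper names to_header_and_buffer and from_header_and_buffer are hypothetical.

```python
import numpy as np


def to_header_and_buffer(arr):
    """Split an array into small metadata and its raw bytes (hypothetical helper)."""
    arr = np.ascontiguousarray(arr)
    header = {"dtype": arr.dtype.str, "shape": arr.shape}
    return header, arr.tobytes()  # tobytes() makes one copy of the data


def from_header_and_buffer(header, buffer):
    """Rebuild the array as a view over the buffer, without copying it."""
    flat = np.frombuffer(buffer, dtype=np.dtype(header["dtype"]))
    return flat.reshape(header["shape"])


original = np.arange(12, dtype=np.float32).reshape(3, 4)
header, buffer = to_header_and_buffer(original)
rebuilt = from_header_and_buffer(header, buffer)

print(header)
print(np.array_equal(original, rebuilt))
# The view is backed by an immutable bytes object, so it is read-only.
print(rebuilt.flags.writeable)
```

Arrays returned by ray.get are likewise read-only, since they are backed by object store memory. Call .copy() on the result if you need to modify it.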
If you are still experiencing serialization issues with very large objects, consider breaking them down. For instance, a large NumPy array could be split into smaller chunks, serialized individually, and then reassembled on the receiving end. Alternatively, for tabular data, ray.data is highly recommended as it’s built from the ground up to handle distributed datasets and uses efficient, columnar storage and serialization formats like Apache Arrow.
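The chunking approach can be sketched with plain NumPy. This is a minimal illustration under assumed names (split_into_chunks and reassemble are hypothetical helpers) and a deliberately tiny array, not a recommendation for a particular chunk size.

```python
import numpy as np


def split_into_chunks(arr, num_chunks):
    """Split along the first axis into roughly equal pieces (views, no copy)."""
    return np.array_split(arr, num_chunks, axis=0)


def reassemble(chunks):
    """Concatenate the chunks back along the first axis (this does copy)."""
    return np.concatenate(chunks, axis=0)


data = np.arange(10 * 4, dtype=np.float64).reshape(10, 4)
chunks = split_into_chunks(data, num_chunks=3)

print([chunk.shape for chunk in chunks])
print(np.array_equal(reassemble(chunks), data))
```

With Ray, each chunk would be passed to ray.put separately and the resulting references handed to tasks, so that no single object has to be moved in one piece.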
One subtle point about how Ray handles large objects like NumPy arrays is its use of the object store. When you call ray.put(large_object), Ray serializes the object once and places it in the in-memory object store on the local node; nothing is transmitted at that point. When another task needs the object, it receives a reference to it. If the object is already on the worker’s node, it’s read directly from shared memory. If it’s on another node, Ray transfers the already serialized bytes to that node’s object store first. The serialization cost is therefore paid at ray.put, and later movement between nodes or to disk reuses the same bytes, while ray.put/ray.get hide these details behind the object store. The optimization for NumPy arrays is that the format used in the object store and for inter-node transfer is more efficient than a generic in-band pickle of the Python ndarray object.
The next challenge you’ll likely encounter is managing the memory footprint of these large objects across your Ray cluster.