Ray Serve can deliver real-time inference results over gRPC streaming with surprisingly little overhead. Because gRPC runs on HTTP/2, many requests and streams are multiplexed over a single connection, which makes a Serve deployment far more than a simple RPC endpoint.

Let’s see it in action. Imagine a scenario where we have a Ray Serve deployment running a model. We want to stream predictions back to a client as new data arrives.

Here’s a simplified Python example for the Ray Serve deployment:

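import asyncio
import time

import ray
from ray import serve
from ray.serve.config import gRPCOptions

# Assume we have a model and a prediction function
def predict(data):
    # Simulate some heavy computation
    time.sleep(0.1)
    return {"prediction": data * 2}

@serve.deployment
class StreamingModel:
    def __init__(self, model_path=None):
        # Load your model here if needed
        pass

    # Serve dispatches each RPC to the deployment method with the same name,
    # so a real service would name this method after the RPC in the .proto file
    # (the client below assumes StreamPredict).
    async def __call__(self, request):
        # An async generator: each yielded item becomes one message in the stream.
        # For simplicity, we'll simulate streaming data.
        for i in range(10):
            data_to_process = i
            # Note: predict() blocks the event loop while it runs.
            # A real gRPC handler would yield protobuf response messages, not dicts.
            result = predict(data_to_process)
            yield result
            await asyncio.sleep(0.05)  # Simulate data arrival rate

# Initialize Ray if not already running
if not ray.is_initialized():
    ray.init()

# Enable the gRPC proxy. It only starts when servicer functions are registered;
# 'inference_pb2_grpc' is assumed to be generated from your .proto file.
serve.start(
    grpc_options=gRPCOptions(
        port=9000,
        grpc_servicer_functions=["inference_pb2_grpc.add_InferenceServicer_to_server"],
    )
)

# Deploy the model
app = StreamingModel.bind()
serve.run(app)

print("Ray Serve streaming deployment is running.")
print("Access via gRPC on port 9000 (the port configured above).")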

And here’s a conceptual client that would consume this stream:

import asyncio

# Assume 'inference_pb2' and 'inference_pb2_grpc' are generated from a .proto file
# that declares a server-streaming RPC. For this example, we'll imagine they exist.
# import grpc
# import inference_pb2, inference_pb2_grpc

# Placeholder for the generated stub: one request in, a stream of responses out
class MockInferenceStub:
    async def StreamPredict(self, request):
        print("Client: Starting stream...")
        for i in range(request["count"]):
            await asyncio.sleep(0.05)  # Simulate server-side processing time
            # In a real scenario, each item would be a protobuf message
            yield {"prediction": i * 2}  # Simulate protobuf response

class MockChannel:
    # Mimics the async context-manager interface of a grpc.aio channel
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return False

    def __call__(self, *args, **kwargs):
        return MockInferenceStub()

async def run_client():
    # Replace with actual channel and stub creation:
    # async with grpc.aio.insecure_channel("localhost:9000") as channel:
    #     stub = inference_pb2_grpc.InferenceStub(channel)
    #     request = ...  # build the request message defined in your .proto file
    #     async for response in stub.StreamPredict(request):
    #         print(f"Client received: {response}")

    # Using mock objects for demonstration
    async with MockChannel() as channel:
        stub = channel()  # This would be inference_pb2_grpc.InferenceStub(channel)
        request = {"count": 10}  # Simulate protobuf request
        async for response in stub.StreamPredict(request):
            print(f"Client received: {response}")

if __name__ == "__main__":
    asyncio.run(run_client())

The core problem Ray Serve’s gRPC streaming addresses is inefficient communication in scenarios that need continuous or near-continuous data flow. Traditional RPCs are request-response: for real-time updates, you either poll (wasteful) or open many connections (resource intensive). gRPC streaming lets a single, long-lived HTTP/2 connection carry multiple independent streams of messages. Ray Serve builds on this by letting a deployment method be a generator that yields results one at a time. Serve then serializes each yielded item and transmits it to the client as one message of the stream.
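
The generator contract described above can be seen without Ray or gRPC at all. The following is a minimal sketch in plain asyncio; the names `stream_predictions`, `consume`, and `events` are illustrative and not part of any Ray Serve API. It records the order of events to show that each message reaches the consumer before the next one is produced, which is the property that streaming relies on.

```python
import asyncio

events = []  # records the order in which things happen


async def stream_predictions(inputs):
    """Async generator: every yield is one message of the stream."""
    for x in inputs:
        await asyncio.sleep(0)  # hand control back to the event loop
        events.append(f"produced {x * 2}")
        yield {"prediction": x * 2}


async def consume(stream):
    """Handle each message as soon as it arrives."""
    results = []
    async for message in stream:
        events.append(f"consumed {message['prediction']}")
        results.append(message)
    return results


results = asyncio.run(consume(stream_predictions([1, 2, 3])))
print(results)
print(events)
```

Production and consumption alternate in `events`: the consumer never waits for the whole result set before it sees the first message.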

Internally, Ray Serve’s gRPC proxy receives the call and routes it to a replica, where it invokes the deployment method that handles the RPC (__call__ in the example above). When that method is a generator, Serve treats the call as a streaming request: each item the generator yields is serialized and sent over the established gRPC stream to the client. The client, in turn, uses a gRPC stub generated for a server-streaming RPC and iterates over the incoming messages. Ray Serve’s load balancing still applies. Multiple clients can connect, and Serve distributes their streaming requests across its replicas. If a replica goes down, the streams it was serving are interrupted, but Serve routes subsequent requests to healthy replicas, so a client that implements appropriate retry logic can recover.
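
Client-side retry logic for an interrupted stream might look like the sketch below. It is a simulation under stated assumptions, not Ray Serve or gRPC code: `StreamLost` is a hypothetical stand-in for the error a real gRPC client would see when a stream breaks, `server_stream` and `make_opener` fake a server whose first stream fails partway through, and the server is assumed to accept a start index so the client can resume instead of starting over.

```python
import asyncio


class StreamLost(Exception):
    """Hypothetical stand-in for the error raised by a broken gRPC stream."""


async def server_stream(start, total, fail_at=None):
    """Yield (index, message) pairs, optionally failing before item fail_at."""
    for i in range(start, total):
        if i == fail_at:
            raise StreamLost(f"stream lost before item {i}")
        await asyncio.sleep(0)
        yield i, {"prediction": i * 2}


def make_opener(total, fail_first_at):
    """Return open_stream(start); only the first stream it opens fails."""
    opened = 0

    def open_stream(start):
        nonlocal opened
        opened += 1
        fail_at = fail_first_at if opened == 1 else None
        return server_stream(start, total, fail_at)

    return open_stream


async def consume_with_retry(open_stream, total, max_attempts=3):
    """Re-open the stream after a failure, resuming at the next missing item."""
    received = []
    next_index = 0
    attempts = 0
    while next_index < total and attempts < max_attempts:
        attempts += 1
        try:
            async for index, message in open_stream(next_index):
                received.append(message)
                next_index = index + 1
        except StreamLost:
            await asyncio.sleep(0)  # a real client would back off here
    return received, attempts


received, attempts = asyncio.run(
    consume_with_retry(make_opener(total=6, fail_first_at=3), total=6)
)
print(attempts, [m["prediction"] for m in received])
```

Tracking `next_index` on the client is a design choice that avoids duplicate results after a retry. If the server cannot resume from an offset, the client has to restart the stream and discard items it has already seen.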

The key levers you control are primarily within your handler method. Writing it as a generator (a method that yields results instead of returning a single value) enables streaming. How quickly you yield, including any asyncio.sleep calls within the generator, determines how "real-time" the inference feels to the client. You can also configure Ray Serve’s replica counts and autoscaling to handle the load of many concurrent streaming connections. The underlying gRPC server configuration also plays a role. Beyond the port and the registered servicer functions, Ray Serve manages it for you, and for most use cases the defaults are sufficient.
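
One pacing detail is worth a sketch: a blocking model call inside an async generator stalls every other stream that shares the same event loop. The example below is plain asyncio with illustrative names (`paced_stream`, `collect`, and `interval_s` are assumptions, not Ray Serve parameters). It moves the blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+) so two concurrent streams can make progress at the same time.

```python
import asyncio
import time


def predict(data):
    """Stand-in for a blocking model call."""
    time.sleep(0.05)
    return {"prediction": data * 2}


async def paced_stream(inputs, interval_s=0.0):
    """Yield one result per input, pausing interval_s between messages."""
    for x in inputs:
        # Run the blocking call in a worker thread so other streams on the
        # same event loop are not stalled while the model computes.
        result = await asyncio.to_thread(predict, x)
        yield result
        await asyncio.sleep(interval_s)


async def collect(stream):
    """Drain a stream into a list."""
    return [message async for message in stream]


async def main():
    # Two streams served concurrently by one event loop
    return await asyncio.gather(
        collect(paced_stream([1, 2, 3])),
        collect(paced_stream([10, 20, 30])),
    )


first, second = asyncio.run(main())
print(first)
print(second)
```

Raising `interval_s` throttles a stream; calling `predict` directly instead of through `asyncio.to_thread` would make the two streams take turns blocking each other.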

What most people miss is how Ray Serve’s request batching interacts with streaming. If your model can process batches of data, the ray.serve.batch decorator can group calls from concurrent requests into a single invocation of your method. For a streaming handler, each yield then carries one output per request in the batch. This can substantially improve throughput by letting your model run more efficiently on batched inputs. However, it can also increase latency, because a request may have to wait for the batch to fill, or for the wait timeout to expire, before any work starts. You often need to tune the max_batch_size and batch_wait_timeout_s settings of ray.serve.batch to find the sweet spot between latency and throughput for your specific streaming application.
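
The latency and throughput trade-off is easier to reason about with a toy batcher. The sketch below is a standalone asyncio illustration, not Ray Serve's implementation: the `MicroBatcher` class is hypothetical, and only its two parameter names are borrowed from `ray.serve.batch`. A batch is flushed when it is full or when the wait timeout expires, whichever comes first.

```python
import asyncio


class MicroBatcher:
    """Toy batcher that groups concurrent calls into one model invocation."""

    def __init__(self, batch_fn, max_batch_size, batch_wait_timeout_s):
        self.batch_fn = batch_fn
        self.max_batch_size = max_batch_size
        self.batch_wait_timeout_s = batch_wait_timeout_s
        self.pending = []  # (input, future) pairs waiting for a flush
        self.flush_task = None  # timer that flushes a partially filled batch

    async def submit(self, item):
        future = asyncio.get_running_loop().create_future()
        self.pending.append((item, future))
        if len(self.pending) >= self.max_batch_size:
            self._flush()  # batch is full: run it immediately
        elif self.flush_task is None:
            self.flush_task = asyncio.create_task(self._flush_later())
        return await future

    async def _flush_later(self):
        await asyncio.sleep(self.batch_wait_timeout_s)
        self.flush_task = None
        self._flush()  # timeout expired: run whatever has accumulated

    def _flush(self):
        if self.flush_task is not None:
            self.flush_task.cancel()
            self.flush_task = None
        batch, self.pending = self.pending, []
        if not batch:
            return
        outputs = self.batch_fn([item for item, _ in batch])
        for (_, future), output in zip(batch, outputs):
            future.set_result(output)


batch_sizes = []


def double_batch(items):
    """Stand-in for a model that is efficient on batched inputs."""
    batch_sizes.append(len(items))
    return [x * 2 for x in items]


async def main():
    batcher = MicroBatcher(double_batch, max_batch_size=2, batch_wait_timeout_s=0.01)
    return await asyncio.gather(*(batcher.submit(i) for i in range(5)))


outputs = asyncio.run(main())
print(outputs, batch_sizes)
```

With five concurrent calls and a batch size of two, the first four calls run as two full batches right away, while the fifth waits for the timeout. That wait is the latency cost that a larger `max_batch_size` or a longer `batch_wait_timeout_s` trades for throughput.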

The next logical step is to explore client-side streaming and bidirectional streaming. Check the Ray Serve documentation for your version first, since support for these RPC types may be limited.
