The OpenAI Batch API, when used asynchronously, can slash your inference costs by up to 50% by allowing you to process multiple requests in parallel without the overhead of individual API calls.
Let’s see it in action. Imagine you have a list of 1,000 customer reviews and you want to get a sentiment analysis for each.
import openai
import time
import os
# Ensure you have your OpenAI API key set as an environment variable
# openai.api_key = os.environ.get("OPENAI_API_KEY")
# --- Configuration ---
BATCH_SIZE = 100 # Process 100 requests at a time
SLEEP_INTERVAL = 30 # Seconds to wait between checking batch status
MAX_RETRIES = 5 # Max retries for batch submission
# --- Sample Data ---
# In a real scenario, this would come from your database or application
customer_reviews = [
"This product is amazing, I love it!",
"It was okay, nothing special.",
"Terrible experience, would not recommend.",
"Fast shipping and good quality.",
"The battery life is disappointing.",
"I'm very happy with my purchase.",
"It broke after only a week.",
"Great value for money.",
"The user interface is confusing.",
"Excellent customer service.",
] * 100 # Repeat to create 1000 reviews for demonstration
# --- Prepare Batch Input ---
# The Batch API expects a JSONL file where each line is a JSON object
# representing a single API request.
# For sentiment analysis, we'll use the "classify" endpoint with a custom prompt.
batch_input_filename = "batch_input.jsonl"
with open(batch_input_filename, "w") as f:
for i, review in enumerate(customer_reviews):
prompt = f"Classify the sentiment of the following customer review as Positive, Negative, or Neutral:\n\nReview: \"{review}\"\nSentiment:"
request_data = {
"custom_id": f"review_{i}", # Unique identifier for each request
"method": "POST",
"url": "/v1/completions", # Or "/v1/chat/completions" depending on your model
"body": {
"model": "gpt-3.5-turbo-instruct", # Or your preferred model
"prompt": prompt,
"max_tokens": 10,
"temperature": 0
}
}
f.write(f"{request_data}\n")
print(f"Created batch input file: {batch_input_filename}")
# --- Submit Batch ---
batch_id = None
for attempt in range(MAX_RETRIES):
try:
with open(batch_input_filename, "rb") as f:
batch_response = openai.batch.create(
file=f,
endpoint="/v1/completions", # The API endpoint the batch will call
completion_window="24h" # How long the batch will be active
)
batch_id = batch_response.id
print(f"Batch submitted successfully. Batch ID: {batch_id}")
break
except Exception as e:
print(f"Attempt {attempt + 1} failed to submit batch: {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(5) # Wait before retrying
if not batch_id:
print("Failed to submit batch after multiple retries. Exiting.")
exit()
# --- Monitor Batch Status ---
print("Monitoring batch status...")
while True:
try:
batch_status = openai.batch.retrieve(batch_id)
print(f"Batch Status: {batch_status.status} ({batch_status.processed_count}/{batch_status.total_count} processed)")
if batch_status.status in ["failed", "expired", "cancelling", "cancelled"]:
print(f"Batch processing ended with status: {batch_status.status}")
break
elif batch_status.status == "completed":
print("Batch completed successfully!")
break
elif batch_status.status == "validating":
print("Batch is being validated. Waiting...")
elif batch_status.status == "pending":
print("Batch is pending. Waiting...")
elif batch_status.status == "processing":
print("Batch is processing. Waiting...")
time.sleep(SLEEP_INTERVAL)
except Exception as e:
print(f"Error retrieving batch status: {e}")
time.sleep(SLEEP_INTERVAL)
# --- Retrieve Results ---
if batch_status.status == "completed":
print("Retrieving batch results...")
try:
# The results are typically found in a file linked in the batch object
# or directly accessible via a results endpoint. For simplicity,
# we'll assume a direct download or a URL is provided.
# In a real application, you'd handle the file download and parsing.
# Example: results = openai.batch.download_results(batch_id)
print("Batch results are ready. You can download them using the batch ID.")
print(f"Batch ID: {batch_id}")
print("The results file will contain a JSONL with outcomes for each request.")
# To simulate accessing results, let's assume a results file named 'batch_results_YOUR_BATCH_ID.jsonl'
# You would typically download this from the OpenAI dashboard or via the API if available.
# For this example, we'll just print a placeholder message.
print("Please refer to your OpenAI dashboard or API documentation for downloading the results file.")
except Exception as e:
print(f"Error retrieving batch results: {e}")
The core idea is to prepare a batch_input.jsonl file. Each line in this file is a JSON object representing a single API request you would normally make. You specify the method (usually POST), the url (e.g., /v1/completions or /v1/chat/completions), and the body containing your prompt, model, and parameters. You also assign a custom_id to each request, which is crucial for matching responses back to your original inputs.
Once you have this file, you submit it to the Batch API using openai.batch.create(). You provide the file and the endpoint that all requests within the batch will hit. The API then processes these requests in parallel, often leveraging more efficient infrastructure than individual requests. You get a batch_id back, which you use to monitor the progress via openai.batch.retrieve(). The status will cycle through validating, pending, processing, and finally completed (or failed, expired). When complete, you can download the results, which will be another JSONL file containing the outcomes for each custom_id you provided.
The system solves the problem of high latency and per-request overhead associated with making thousands of individual API calls. Instead of paying for the connection setup, authentication, and processing for each of those calls, you pay a single, optimized rate for the batch. This is particularly effective for tasks like processing large datasets of text for classification, summarization, or extraction, where the actual inference time per item is relatively short compared to the overhead of a single API call. The asynchronous nature means you submit the batch and can go do other things, only needing to check back when it’s done, freeing up your application’s main threads.
The most surprising aspect for many is how dramatically the cost reduction scales. It’s not just a small percentage; for very large volumes, the savings can easily exceed 50%, sometimes reaching 70-80%, because you’re essentially amortizing the fixed costs of each API interaction over hundreds or thousands of individual inference tasks. This shifts the cost structure from per-call to per-token processing, which is far more economical for bulk operations.
The completion_window parameter, set to 24h in the example, dictates how long your submitted batch will remain active and available for processing. If the batch isn’t completed within this window (e.g., due to API load or a very large number of requests), it will expire. You need to balance this with your expected processing time to ensure your tasks get done without incurring unnecessary costs for an expired batch.
The next concept to explore is handling the results file, which can be quite large, and how to efficiently parse and integrate that data back into your application workflow.