The most surprising thing about aggregating sharded data is that the database often has to do more total work to aggregate across shards than it would if the data were all in one place.
Let’s watch this in action. Imagine we have a users table sharded by user_id. We want to count how many users are in each country.
Here’s a simplified schema:
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY,
    country VARCHAR(255),
    signup_date DATE
);
And let’s say we have two shards:
- Shard 1: user_id between 1 and 1,000,000
- Shard 2: user_id between 1,000,001 and 2,000,000
If we run a simple GROUP BY on a single shard, say Shard 1:
SELECT country, COUNT(*)
FROM users
WHERE user_id BETWEEN 1 AND 1000000
GROUP BY country;
The database on Shard 1 will:
- Scan the users table (or an index on user_id and country if one exists).
- For each row, extract the country.
- Maintain an in-memory hash table or similar structure, incrementing the count for each country encountered.
- Finally, emit the aggregated counts for each country found on that shard.
This is efficient. The aggregation happens locally.
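The local steps above can be sketched in a few lines of Python. This is only an illustration of hash aggregation, not how any particular database implements it; the function name aggregate_shard and the sample rows are hypothetical.

```python
from collections import defaultdict

def aggregate_shard(rows):
    """Count users per country for the rows held by one shard."""
    counts = defaultdict(int)  # the in-memory hash table
    for user_id, country, signup_date in rows:
        counts[country] += 1   # increment the count for this country
    return dict(counts)

# Hypothetical sample rows: (user_id, country, signup_date)
shard_1_rows = [
    (1, "USA", "2023-02-01"),
    (2, "CAN", "2022-11-15"),
    (3, "USA", "2023-05-20"),
]

shard_1_counts = aggregate_shard(shard_1_rows)
print(shard_1_counts)  # {'USA': 2, 'CAN': 1}
```

Everything here touches only data that already lives on the shard, which is why the single-shard case is cheap.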
Now, let’s try to do this across both shards without any special handling:
SELECT country, COUNT(*)
FROM users
GROUP BY country;
This query, as seen by the application, is a single SQL statement. However, the database system (or the application if it’s manually sharding) needs to coordinate.
Here’s what happens conceptually:
- Query Distribution: The query is sent to a coordinator node or the primary shard.
- Parallel Execution (Shard Level): The coordinator tells each shard to execute its part of the query.
  - Shard 1: Executes SELECT country, COUNT(*) FROM users WHERE user_id BETWEEN 1 AND 1000000 GROUP BY country; and produces results like {'USA': 50000, 'CAN': 10000}.
  - Shard 2: Executes SELECT country, COUNT(*) FROM users WHERE user_id BETWEEN 1000001 AND 2000000 GROUP BY country; and produces results like {'USA': 75000, 'MEX': 5000}.
- Data Gathering & Re-aggregation: The coordinator node receives the partial results from all shards. It then has to perform a second aggregation step: it sums the partial counts for each country across {'USA': 50000, 'CAN': 10000} and {'USA': 75000, 'MEX': 5000} to get the final result: {'USA': 125000, 'CAN': 10000, 'MEX': 5000}.
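The coordinator's second step can be sketched as follows, using the partial results from the example. The function name merge_partials is hypothetical, and a real coordinator would stream results rather than hold plain dictionaries. Note that the partial counts are summed, not counted again.

```python
from collections import Counter

def merge_partials(partials):
    """Sum per-shard partial counts into one global count per country."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts for existing keys
    return dict(total)

# Partial results as returned by each shard in the example above
shard_1 = {"USA": 50000, "CAN": 10000}
shard_2 = {"USA": 75000, "MEX": 5000}

final_counts = merge_partials([shard_1, shard_2])
print(final_counts)  # {'USA': 125000, 'CAN': 10000, 'MEX': 5000}
```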
This re-aggregation step is the core challenge. The coordinator needs to:
- Receive potentially large intermediate result sets from each shard.
- Perform an aggregation on these intermediate results. This might involve its own in-memory structures.
This step can become a bottleneck if the number of unique country values is large, if the number of shards is high, or if the intermediate results themselves are massive.
The problem arises because the GROUP BY key (in this case, country) is not the sharding key (user_id), so rows for the same country are spread across shards. If country were the sharding key, each shard would only have data for a subset of countries, and the aggregation would be much simpler: the coordinator could concatenate the per-shard results, often with no cross-shard re-aggregation for the GROUP BY itself.
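To see why aligning the sharding key with the GROUP BY key helps, here is a minimal sketch that routes rows by a hash of country. The routing function shard_for, the shard count, and the sample rows are all assumptions for illustration; real systems use their own partitioning schemes.

```python
import zlib
from collections import Counter

NUM_SHARDS = 2  # assumed shard count for this sketch

def shard_for(country):
    """Hypothetical routing: a stable hash of the country picks the shard."""
    return zlib.crc32(country.encode("utf-8")) % NUM_SHARDS

# Hypothetical sample rows: (user_id, country)
users = [(1, "USA"), (2, "CAN"), (3, "USA"), (4, "MEX"), (5, "CAN"), (6, "USA")]

# Each shard aggregates only the rows routed to it.
shard_counts = [Counter() for _ in range(NUM_SHARDS)]
for user_id, country in users:
    shard_counts[shard_for(country)][country] += 1

# Every country lives on exactly one shard, so the per-shard results have
# disjoint keys and can simply be concatenated; nothing needs to be summed.
final_counts = {}
for counts in shard_counts:
    assert final_counts.keys().isdisjoint(counts.keys())
    final_counts.update(counts)

print(final_counts)
```

The trade-off is skew: a country with far more users than the others makes its shard hot.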
The mental model to build is that sharding decomposes operations, but aggregation often requires recomposition. When the aggregation key is different from the sharding key, the system must:
- Push down the aggregation to each shard as much as possible (partial aggregation).
- Collect these partial aggregations.
- Perform a final, global aggregation on the partial results.
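The three steps can be tried end to end with in-memory SQLite databases standing in for shards. This is a toy setup under stated assumptions: the helper make_shard, the partial_counts table, and the sample rows are hypothetical, and a real distributed database does this planning for you.

```python
import sqlite3

SCHEMA = "CREATE TABLE users (user_id BIGINT PRIMARY KEY, country VARCHAR(255), signup_date DATE)"
PARTIAL_SQL = "SELECT country, COUNT(*) FROM users GROUP BY country"

def make_shard(rows):
    """Create an in-memory SQLite database that plays the role of one shard."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany("INSERT INTO users VALUES (?, ?, ?)", rows)
    return conn

shards = [
    make_shard([(1, "USA", "2023-01-05"), (2, "CAN", "2023-02-10"), (3, "USA", "2023-03-15")]),
    make_shard([(1000001, "USA", "2023-01-20"), (1000002, "MEX", "2023-04-01")]),
]

# Steps 1 and 2: push the GROUP BY down to each shard, then collect the partials.
coordinator = sqlite3.connect(":memory:")
coordinator.execute("CREATE TABLE partial_counts (country TEXT, cnt INTEGER)")
for shard in shards:
    coordinator.executemany(
        "INSERT INTO partial_counts VALUES (?, ?)", shard.execute(PARTIAL_SQL)
    )

# Step 3: the final aggregation uses SUM over the partial counts, not COUNT.
final_rows = coordinator.execute(
    "SELECT country, SUM(cnt) FROM partial_counts GROUP BY country ORDER BY country"
).fetchall()
print(final_rows)  # [('CAN', 1), ('MEX', 1), ('USA', 3)]
```

Using COUNT(*) in the final step would count partial rows (two for USA) instead of users, which is a common mistake when hand-rolling this pattern.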
The "levers" you control are:
- Sharding Strategy: Can you shard by a key that aligns better with your most common aggregation queries? (e.g., shard by country if you do many GROUP BY country queries, but this might create hot shards if one country has vastly more users).
- Query Design: Can you filter data before aggregation to reduce the size of intermediate results? A predicate such as WHERE signup_date > '2023-01-01', pushed down so that each shard applies it before its GROUP BY, cuts the rows each shard scans and can shrink the partial results it sends back.
- Database Features: Many distributed databases offer specific optimizations for cross-shard aggregations, such as distributed hash aggregation, where the coordinator doesn’t just sum up counts but might distribute the aggregation work further. Understanding your database’s specific capabilities is key.
The one thing most people don’t know is that the network transfer of intermediate aggregation results can be the dominant cost, even if the final aggregated result is small. If you have 100 shards and 100 unique countries, each shard sends at most 100 rows to the coordinator, or at most 10,000 rows in total. But if you have 100 shards and 1 million unique grouping values (imagine grouping by a high-cardinality column instead of country), and each shard has data for 100,000 of them, you’re sending 100 × 100,000 = 10 million intermediate rows across the network to be aggregated again.
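The arithmetic behind those two scenarios is simple enough to check with a small helper. The function name intermediate_rows is hypothetical, and the result is only a row count; it ignores row width and compression.

```python
def intermediate_rows(num_shards, groups_per_shard):
    """Rows sent to the coordinator when every shard returns one row per group it holds."""
    return num_shards * groups_per_shard

print(intermediate_rows(100, 100))      # 10000
print(intermediate_rows(100, 100_000))  # 10000000
```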
The next problem you’ll likely run into is handling ORDER BY clauses on aggregated results, which often involves a similar two-phase process of sorting partial results and then merging/sorting the final set.
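As a preview, here is a minimal sketch of that two-phase pattern for an ORDER BY on the grouping key. It assumes each shard returns its partial counts already sorted by country; the function name merge_sorted_partials is hypothetical.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def merge_sorted_partials(sorted_partials):
    """Merge per-shard (country, count) lists that are each sorted by country."""
    # heapq.merge lazily interleaves the sorted inputs in country order.
    merged = heapq.merge(*sorted_partials, key=itemgetter(0))
    # Equal countries are now adjacent, so their counts can be summed in one pass.
    for country, group in groupby(merged, key=itemgetter(0)):
        yield country, sum(count for _, count in group)

# Partial results from the earlier example, sorted by country on each shard
shard_1 = [("CAN", 10000), ("USA", 50000)]
shard_2 = [("MEX", 5000), ("USA", 75000)]

result = list(merge_sorted_partials([shard_1, shard_2]))
print(result)  # [('CAN', 10000), ('MEX', 5000), ('USA', 125000)]
```

This only works because the sort key is the grouping key. Ordering by the aggregated value itself (for example, ORDER BY COUNT(*) DESC) needs the final counts first, so the sort has to happen after the global aggregation.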