Description
Is your feature request related to a problem or challenge?
DataFusion currently lacks a built-in approximate top-k / heavy hitter aggregate function. Users who need to find the most frequently occurring values in a column must resort to GROUP BY value ORDER BY COUNT(*) DESC LIMIT k, which requires materializing all distinct groups, sorting, and truncating — a process that is both memory-intensive and slow on high-cardinality columns.
This is a common analytical query pattern with well-known streaming approximation algorithms that provide bounded-error results in constant memory. Other query engines already support this:
- ClickHouse: `topK` / `topKWeighted`, which use the Filtered Space-Saving algorithm
- PostgreSQL: available via extensions (e.g., `datasketches`)
- Druid: built-in approximate top-k via DataSketches
Describe the solution you'd like
Add an approx_top_k(expression, k) aggregate function that returns an approximate list of the top k most frequent values along with their estimated counts, using a streaming algorithm that operates in bounded memory.
Algorithm: Filtered Space-Saving
The implementation should use the Space-Saving algorithm (Metwally et al., 2005), which maintains a fixed-size summary of counters. When a new item arrives and the summary is full, the counter with the minimum count is reassigned to the new item, which inherits that count plus one; the inherited count is recorded as the item's maximum overestimation error. This guarantees that every item with frequency above N/capacity is tracked, where N is the number of rows seen.
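The eviction rule above can be sketched in a few lines of Python. This is a minimal illustration, not DataFusion code: the class name `SpaceSaving`, its methods, and the linear scan for the minimum are assumptions made for brevity (production implementations typically keep counters ordered so the minimum is found in constant time).

```python
from typing import Dict, Hashable, List, Tuple


class SpaceSaving:
    """Minimal Space-Saving summary (sketch; names are hypothetical)."""

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        # item -> [estimated count, maximum overestimation error]
        self.counters: Dict[Hashable, List[int]] = {}

    def insert(self, item: Hashable) -> None:
        if item in self.counters:  # already tracked: bump its counter
            self.counters[item][0] += 1
            return
        if len(self.counters) < self.capacity:  # free slot: start an exact counter
            self.counters[item] = [1, 0]
            return
        # Summary is full: evict the minimum-count item (linear scan for brevity).
        victim = min(self.counters, key=lambda key: self.counters[key][0])
        min_count = self.counters.pop(victim)[0]
        # The newcomer inherits the evicted count, so its estimate stays an
        # upper bound on its true frequency; min_count is the possible overshoot.
        self.counters[item] = [min_count + 1, min_count]

    def top(self, k: int) -> List[Tuple[Hashable, int]]:
        ranked = sorted(self.counters.items(), key=lambda kv: kv[1][0], reverse=True)
        return [(item, count) for item, (count, _error) in ranked[:k]]
```

For example, feeding the stream `"aabacadaeaab"` into `SpaceSaving(capacity=3)` and calling `top(1)` gives `[('a', 7)]`: `a` is never the minimum, so its count stays exact, while the final `b` inherits an evicted count and carries a non-zero error.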
The accuracy is further improved with an alpha map (as described in the Filtered Space-Saving variant), which tracks recently evicted items and filters out low-frequency noise before it enters the main summary. This is the same approach used in ClickHouse's topK implementation.
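The alpha-map filter can be sketched by extending the same idea. This follows our reading of the Filtered Space-Saving approach: an untracked item only displaces the current minimum once its hashed alpha slot, plus the current occurrence, reaches the minimum count, and an evicted item leaves its count behind in its own alpha slot. The class name, the six-slots-per-counter sizing, and the modulo hashing are assumptions for illustration, not a specification for DataFusion or a claim about ClickHouse's exact code. Note that Python salts `hash` for strings per process, so slot assignment for strings varies between runs, while integers hash to themselves.

```python
from typing import Dict, Hashable, List


class FilteredSpaceSaving:
    """Sketch of Space-Saving with an alpha map (hypothetical names and sizing)."""

    def __init__(self, capacity: int, alpha_slots_per_counter: int = 6) -> None:
        self.capacity = capacity
        # item -> [estimated count, maximum overestimation error]
        self.counters: Dict[Hashable, List[int]] = {}
        # Hashed counters for untracked items; the size is an assumption.
        self.alpha: List[int] = [0] * (capacity * alpha_slots_per_counter)

    def _slot(self, item: Hashable) -> int:
        return hash(item) % len(self.alpha)

    def insert(self, item: Hashable) -> None:
        if item in self.counters:
            self.counters[item][0] += 1
            return
        if len(self.counters) < self.capacity:
            self.counters[item] = [1, 0]
            return
        victim = min(self.counters, key=lambda key: self.counters[key][0])
        min_count = self.counters[victim][0]
        slot = self._slot(item)
        if self.alpha[slot] + 1 < min_count:
            # Filtered: too rare so far to displace anything; only count it here.
            self.alpha[slot] += 1
            return
        # Promoted: park the victim's count in its own alpha slot, then admit
        # the newcomer starting from its slot's accumulated estimate.
        self.alpha[self._slot(victim)] = min_count
        del self.counters[victim]
        base = self.alpha[slot]
        self.counters[item] = [base + 1, base]
```

With `capacity=2` and counters `{0: 10, 1: 10}`, a single occurrence of `2` is absorbed by the alpha map instead of evicting a heavy item; only after its slot accumulates enough occurrences does `2` enter the summary, carrying the accumulated slot value as its error.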
Key design points:
- `CAPACITY_MULTIPLIER = 3`: The internal summary tracks `k * 3` counters (matching ClickHouse's default). Over-provisioning relative to `k` significantly improves accuracy for skewed distributions.
- Mergeable state: Summaries from different partitions can be merged, enabling parallel / distributed execution.
- Serializable: The summary can be serialized to/from bytes for intermediate state transfer (e.g., proto roundtrip).
- Type support: Works with any hashable scalar type (strings, integers, floats, etc.).
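Mergeability and serialization can be sketched on a plain dictionary representation (`item -> (count, error)`). The merge rule below is one common approach for Space-Saving summaries, not necessarily what DataFusion or ClickHouse would adopt: when a full summary lacks an item, that item may still have occurred up to that summary's minimum count, so the minimum is added to both count and error to keep every estimate an upper bound. The function names `merge`, `to_bytes`, and `from_bytes` are hypothetical, and JSON is used purely for illustration; a real implementation would more likely encode values with DataFusion's own scalar types.

```python
import json
from typing import Dict, Hashable, Tuple

# item -> (estimated count, maximum overestimation error); scalar items only
Summary = Dict[Hashable, Tuple[int, int]]


def merge(a: Summary, b: Summary, capacity: int) -> Summary:
    """Merge two summaries built with the same capacity (sketch)."""
    # An item missing from a full summary may have occurred up to its minimum.
    min_a = min((c for c, _ in a.values()), default=0) if len(a) >= capacity else 0
    min_b = min((c for c, _ in b.values()), default=0) if len(b) >= capacity else 0
    merged: Summary = {}
    for item in a.keys() | b.keys():
        ca, ea = a.get(item, (min_a, min_a))
        cb, eb = b.get(item, (min_b, min_b))
        merged[item] = (ca + cb, ea + eb)
    # Keep only the `capacity` largest counters.
    kept = sorted(merged.items(), key=lambda kv: kv[1][0], reverse=True)[:capacity]
    return dict(kept)


def to_bytes(summary: Summary, capacity: int) -> bytes:
    # Rows of [value, count, error]; JSON keeps str, int and float distinct.
    rows = [[item, count, error] for item, (count, error) in summary.items()]
    return json.dumps({"capacity": capacity, "counters": rows}).encode("utf-8")


def from_bytes(data: bytes) -> Tuple[Summary, int]:
    obj = json.loads(data.decode("utf-8"))
    summary = {item: (count, error) for item, count, error in obj["counters"]}
    return summary, obj["capacity"]
```

For instance, merging `{"x": (10, 0), "y": (5, 0), "z": (2, 0)}` with `{"x": (7, 0), "w": (4, 0), "v": (3, 0)}` at capacity 3 keeps `x` (count 17), `y` (count 8, error 3), and `w` (count 6, error 2). With the proposed `CAPACITY_MULTIPLIER = 3`, `capacity` would be `3 * k`.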
SQL Syntax
-- Returns a list of structs {value, count} for the top k most frequent values
SELECT approx_top_k(column_name, 5) FROM table;
-- With GROUP BY
SELECT group_col, approx_top_k(value_col, 3) FROM table GROUP BY group_col;
-- k defaults to 5 if omitted
SELECT approx_top_k(column_name) FROM table;
Return Type
Returns a List(Struct({value: T, count: UInt64})) ordered by count descending, where T matches the input column type.
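The final step, turning the internal summary into the proposed return value, can be sketched as follows. Python dicts stand in for Arrow structs, `finalize` is a hypothetical name, and the per-item error is dropped because the proposed struct only carries `value` and `count`. Ties keep the summary's insertion order, since the proposal does not specify a tie-break.

```python
from typing import Dict, Hashable, List, Tuple, TypedDict


class TopKEntry(TypedDict):
    value: Hashable
    count: int


def finalize(counters: Dict[Hashable, Tuple[int, int]], k: int) -> List[TopKEntry]:
    """Map item -> (count, error) to a list of {value, count}, highest count first."""
    # sorted() is stable, so equal counts keep the summary's insertion order.
    ranked = sorted(counters.items(), key=lambda kv: kv[1][0], reverse=True)
    return [{"value": item, "count": count} for item, (count, _error) in ranked[:k]]
```

If the summary holds fewer than `k` items, the result is simply shorter than `k`.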
Describe alternatives you've considered
- Exact `GROUP BY ... ORDER BY COUNT(*) DESC LIMIT k`: works, but requires full materialization of all groups, which is prohibitive on high-cardinality columns (millions of distinct values).
- External sketching libraries: users can implement this as a UDAF, but having it built in lowers the barrier and ensures it is well tested and optimized within DataFusion's execution model.
- Count-Min Sketch + Heap: another approximate approach, but Space-Saving provides deterministic error bounds and is simpler to implement correctly for top-k specifically.
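For comparison, the Count-Min Sketch + heap alternative can be sketched as below. Its estimates are upper bounds whose overshoot depends on hash collisions, so its error bound holds only with high probability, which is the contrast with Space-Saving's deterministic bound. The class name, the SHA-256-based row hashing, and the parameters are assumptions; a plain dict scanned for its minimum stands in for the min-heap to keep the sketch short.

```python
import hashlib
from typing import Dict, Hashable, List, Tuple


class CountMinTopK:
    """Count-Min Sketch plus a bounded candidate set (hypothetical sketch)."""

    def __init__(self, width: int, depth: int, k: int) -> None:
        self.width, self.depth, self.k = width, depth, k
        self.table = [[0] * width for _ in range(depth)]
        self.candidates: Dict[Hashable, int] = {}  # item -> estimated count

    def _column(self, row: int, item: Hashable) -> int:
        # Salt the digest with the row number to get a different hash per row.
        digest = hashlib.sha256(f"{row}|{item!r}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "little") % self.width

    def estimate(self, item: Hashable) -> int:
        # Collisions only add, so every row over-counts; the minimum is tightest.
        return min(self.table[r][self._column(r, item)] for r in range(self.depth))

    def insert(self, item: Hashable) -> None:
        for r in range(self.depth):
            self.table[r][self._column(r, item)] += 1
        est = self.estimate(item)
        if item in self.candidates or len(self.candidates) < self.k:
            self.candidates[item] = est
            return
        # Replace the weakest candidate only if the newcomer now beats it.
        weakest = min(self.candidates, key=lambda key: self.candidates[key])
        if est > self.candidates[weakest]:
            del self.candidates[weakest]
            self.candidates[item] = est

    def top(self) -> List[Tuple[Hashable, int]]:
        return sorted(self.candidates.items(), key=lambda kv: kv[1], reverse=True)
```

Unlike Space-Saving, the sketch table has a fixed size regardless of `k`, but an unlucky item that collides with heavy items in every row can be over-estimated enough to enter the candidate set.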
Additional context
- The Space-Saving algorithm guarantees that any item with true frequency ≥ `N/capacity` will appear in the summary.
- The Filtered Space-Saving extension (with alpha map) reduces over-counting of infrequent items, improving result quality in practice.
- This is particularly useful for observability (top error codes, top URLs), analytics (most popular products, most active users), and data profiling (frequent value detection).
- ClickHouse has validated this algorithm at scale in production for years.
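Why the ≥ `N/capacity` guarantee holds for plain Space-Saving (without the alpha map) can be derived in a few lines. Write m for the capacity, c_j for the j-th counter slot, c_min(t) for the smallest counter at time t, and f_x for the true frequency of an item x; the argument also uses the standard invariant that every tracked count is an upper bound on that item's true frequency.

```math
\begin{aligned}
&\text{(i)}\quad \sum_{j=1}^{m} c_j = N \quad \text{(every row increments exactly one counter by 1)}\\
&\text{(ii)}\quad \text{counters never decrease, so } c_{\min} \text{ is non-decreasing over time}\\
&\text{(iii)}\quad x \text{ untracked at the end} \Rightarrow x \text{ was last evicted at some time } t \text{ and never reappeared, so } f_x \le c_x(t) = c_{\min}(t)\\
&\text{(iv)}\quad \text{by (ii), every counter ends} \ge c_{\min}(t) \ge f_x, \text{ and the one reassigned at } t \text{ ends} \ge c_{\min}(t) + 1 \ge f_x + 1\\
&\Rightarrow\quad N = \sum_{j=1}^{m} c_j \ge (m-1)\,f_x + (f_x + 1) = m f_x + 1
\quad\Rightarrow\quad f_x \le \frac{N-1}{m} < \frac{N}{m}
\end{aligned}
```

So any item with f_x ≥ N/m cannot be untracked. With the alpha map, filtered rows never touch the counters, so invariant (i) no longer holds as written and this argument does not carry over unchanged.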
References
- Metwally, A., Agrawal, D., & El Abbadi, A. (2005). Efficient Computation of Frequent and Top-k Elements in Data Streams.
- ClickHouse `topK` documentation
- Filtered Space-Saving description