Conversation

@amerberg
Contributor

@amerberg amerberg commented Aug 20, 2025

This PR introduces a significant performance optimization for pandas MultiIndex validation that reduces both execution time and memory usage.

When validating a pandas MultiIndex, the current implementation calls get_level_values for every level. This is slow and memory intensive because pandas doesn't internally represent a MultiIndex level as a single array of values. Instead, each level is stored as an array of "levels", which holds the unique values, plus an array of "codes", which are integer references to positions in the levels array. Calling get_level_values on an index with many rows therefore forces pandas to allocate and populate a large array from the levels and codes.
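
For context, a small pandas-only illustration of the two representations (this is standard pandas behavior, independent of this PR):

import pandas as pd

# A MultiIndex with only a few unique values per level but many rows.
mi = pd.MultiIndex.from_product(
    [pd.date_range("2024-01-01", periods=3, freq="h"), range(1_000_000)],
    names=["as_of", "row"],
)

# Internal representation: a small array of unique values plus integer codes.
print(len(mi.levels[0]))   # 3 unique timestamps
print(len(mi.codes[0]))    # 3,000,000 integer codes

# get_level_values materializes the full 3,000,000-element array of timestamps.
full = mi.get_level_values("as_of")
print(len(full), full.nunique())   # 3000000 3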

The key idea behind the change proposed here is that many common checks can be run just as well on the array of unique values as on the full array of values. For instance, just by looking at the unique values we can check that a level has the right dtype, that its integer values are all positive, that its strings conform to a maximum length, or that a level of any type contains no nulls.

The approach taken here is to define a new attribute determined_by_unique on the Check class, which can be set to True to indicate that the outcome of a check depends only on the unique values in an array being checked. This attribute is also set on built-in checks as appropriate. The MultiIndexBackend is also updated to validate on the unique levels when all checks on a level have determined_by_unique=True. (In the event that validating on unique values fails, we re-run validation on the fully materialized level to ensure that failure information will be returned correctly.)
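
Stripped of pandera internals, the control flow looks roughly like this (an illustrative sketch rather than the actual backend code; validate_level and check_fn are just placeholder names):

import pandas as pd

def validate_level(mi: pd.MultiIndex, level: str, check_fn, determined_by_unique: bool) -> bool:
    """Run a vectorized check on one MultiIndex level, using the fast path when possible."""
    if determined_by_unique:
        # Fast path: the unique values are enough to decide the outcome.
        unique_values = mi.levels[mi.names.index(level)]
        if check_fn(unique_values).all():
            return True
        # The fast path reported a failure: fall through and re-run on the
        # fully materialized level so failure cases reference the actual rows.
    return bool(check_fn(mi.get_level_values(level)).all())

The real logic lives in the MultiIndexBackend and ties into pandera's error reporting; the sketch just shows why a passing check never needs to materialize the full level.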

This optimization can significantly improve running time and memory usage. For instance, in my local testing, the following benchmark script reported an average running time of 4.5s and peak memory usage of 453.9MB on main; on this branch those dropped to 0.16s and 107.7MB:

import time
import tracemalloc
import pandas as pd
import pandera.pandas as pa
import numpy as np
from datetime import datetime, timedelta

def create_test_data(n_as_of_times=100):
    """Create test data parametrized by number of as_of times."""
    # Generate n_as_of_times distinct as_of times (hourly granularity)
    base_time = datetime(2024, 1, 1, 0, 0, 0)  # Start from Jan 1, 2024
    as_of_times = [base_time + timedelta(hours=i) for i in range(n_as_of_times)]
    
    # Define all possible values
    interval_offsets = list(range(1, 7))  # 1-6 hours (6 offsets)
    locations = [f"LOC_{i:03d}" for i in range(100)]  # 100 locations
    quantiles = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]  # 9 quantiles
    
    # Create all combinations for MultiIndex
    index_data = []
    for as_of in as_of_times:
        for offset in interval_offsets:
            interval_start = as_of + timedelta(hours=offset)
            for location in locations:
                for quantile in quantiles:
                    index_data.append((as_of, interval_start, location, quantile))
    
    # Create MultiIndex
    mi = pd.MultiIndex.from_tuples(
        index_data, 
        names=['as_of', 'interval_start', 'location', 'quantile'],
    )
    mi = mi.set_levels(mi.levels[2].astype(pd.StringDtype()), level=2)
    
    # Create DataFrame with MultiIndex
    df = pd.DataFrame(
        {'temperature_F': np.random.uniform(-100, 100, len(index_data))},
        index=mi
    )
    
    print(f"  Generated {len(df):,} rows from {n_as_of_times} as_of times")
    
    return df


def create_optimized_schema():
    """Create schema with MultiIndex checks that benefit from the optimization."""
    return pa.DataFrameSchema(
        columns={
            'temperature_F': pa.Column(float, pa.Check.greater_than_or_equal_to(-100))
        },
        index=pa.MultiIndex([
            pa.Index(
                'datetime64[ns]',
                checks=[
                    pa.Check.greater_than_or_equal_to(datetime(2024, 1, 1, 0, 0, 0)),
                ],
                name='as_of'
            ),
            pa.Index(
                'datetime64[ns]',
                checks=[
                    pa.Check.greater_than_or_equal_to(datetime(2024, 1, 1, 1, 0, 0)),
                ],
                name='interval_start'
            ),
            pa.Index(
                pd.StringDtype(),
                checks=[
                    pa.Check.str_matches(r'^LOC_\d{3}$'), # Regex pattern check
                ],
                name='location'
            ),
            pa.Index(
                float,
                checks=[
                    pa.Check.isin([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]), # Valid deciles
                ],
                name='quantile'
            ),
        ])
    )


def benchmark_validation(df, schema, warmup_runs=2, time_runs=5, memory_runs=5):
    """Benchmark validation with warmup and multiple runs."""
        
    # Warmup
    print(f"  Running {warmup_runs} warmup runs...")
    for _ in range(warmup_runs):
        schema.validate(df)
    
    times = []
    peak_memories = []

    print("Beginning timed runs...")
    for run in range(time_runs):
        
        start = time.perf_counter()
        _ = schema.validate(df)
        end = time.perf_counter()
                
        run_time = end - start
        times.append(run_time)
        
        print(f"   Run {run+1}: {run_time:.3f}s")
    
    print("Beginning memory runs...")
    for run in range(memory_runs):
        # Clear any existing traces and start fresh memory tracking
        tracemalloc.stop()  # Stop any existing tracking
        tracemalloc.start()
        
        start = time.perf_counter()
        _ = schema.validate(df)
        end = time.perf_counter()
        
        # Get peak memory usage (in bytes)
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        
        run_time = end - start
        peak_memories.append(peak)
        
        print(f"   Run {run+1}: {run_time:.3f}s, Peak Memory: {peak / 1_000_000:.1f} MB")

    print(f"   Average time: {sum(times) / len(times):.3f}s")
    print(f"   Average peak memory: {sum(peak_memories) / len(peak_memories) / 1_000_000:.1f} MB")


def main():    
    # Create test data - each as_of time generates 5,400 rows (6×100×9)
    # 2000 as_of times = 10,800,000 rows
    df = create_test_data(n_as_of_times=2000)
    schema = create_optimized_schema()
    
    benchmark_validation(df, schema)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"  Benchmark failed: {e}")

Signed-off-by: Adam Merberg <[email protected]>
@codecov

codecov bot commented Aug 20, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 93.49%. Comparing base (812b2a8) to head (5b8b3bf).
⚠️ Report is 358 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2118      +/-   ##
==========================================
- Coverage   94.28%   93.49%   -0.79%     
==========================================
  Files          91      135      +44     
  Lines        7013    10796    +3783     
==========================================
+ Hits         6612    10094    +3482     
- Misses        401      702     +301     


@amerberg amerberg marked this pull request as ready for review August 20, 2025 19:30
@amerberg
Contributor Author

@cosmicBboy Would an optimization along the lines proposed here be viable in your view? There are probably a few design details to be worked out, but something like this would be a big performance improvement for us.

@cosmicBboy
Collaborator

cosmicBboy commented Aug 28, 2025

Hi @amerberg, this approach seems reasonable. Some clarifying questions:

  • how is this behavior meant to change failure case reporting? Will only the first instance of each failure case value be reported?
  • it looks like the changes in the PR only touch the MultiIndex component backend. If my reading is correct, this PR won't change the behavior of Column or Index validation, right?
  • where does the determined_by_unique property need to be implemented? At the check level or the schema component level?

@amerberg
Contributor Author

@cosmicBboy

how is this behavior meant to change failure case reporting? Will only the first instance of each failure case value be reported?

As currently written, this implementation won't change failure reporting at all. The approach taken here is to abandon the optimization and switch to validation on the full values as soon as any failure is encountered.

That does mean failed validations will be slightly slower with this change. That includes things like @check_types decorators with union types, and any other situation where we run a validation expecting it to fail. So maybe we need to be more careful about when it makes sense to apply this optimization, or make it opt-in with the trade-off that only the first failure case is reported (rather than falling back to full validation).

it looks like the changes in the PR only touch the MultiIndex component backend. If my reading is correct, this PR won't change the behavior of Column or Index validation, right?

That's right. Series and Index objects don't have the same codes/levels representation as a MultiIndex, so there isn't as much room for improvement there, as far as I know.

where does the determined_by_unique property need to be implemented? At the check-level or the schema component level?

This PR implements it at the check level, and the optimization is then applied to a level at validation time if all of the checks for that schema component have determined_by_unique = True. The rationale was that this makes it easy for users to benefit from the optimization via built-in checks (where the attribute is set by default wherever it's applicable), but that might not be the right approach if failing validations get slower.
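
For a custom check, opting in could look something like this (the exact spelling is still a design detail to be worked out, so treat the snippet below as illustrative rather than final API):

import pandera.pandas as pa

# A check whose outcome depends only on the unique values in a level.
positive = pa.Check(lambda s: s > 0, name="positive")
# Opt in: mark the check so the MultiIndex backend can take the fast path.
positive.determined_by_unique = True

index = pa.Index(int, checks=[positive], name="id")
# If every check on a level is determined_by_unique, the MultiIndexBackend
# validates that level against its unique values only.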

@cosmicBboy cosmicBboy closed this Sep 20, 2025
@cosmicBboy cosmicBboy reopened this Sep 20, 2025