Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ jobs:
uses: astral-sh/setup-uv@v5

- name: Install dependencies
run: uv sync --dev
run: uv sync --dev --all-extras

- name: Check formatting with Ruff
run: uv run ruff format . --check
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.

## [Released] - 2025-11-27

## [1.1.0] - 2025-12-05

### Added

- Added `ParallelExecutor` for parallel check execution with multiprocessing, threading, and joblib backends.
- Added tests for parallel execution in `tests/test_parallel.py`.

### Changed

- Updated `report()` to accept `backend`, `n_jobs`, and `min_checks_for_parallel` parameters for parallel execution control.

## [1.0.1] - 2025-11-27

### Added
35 changes: 23 additions & 12 deletions docs/api/accessor.md
@@ -3,17 +3,13 @@
The `LintAccessor` class provides the `.lint` accessor for pandas DataFrames.

::: lintdata.accessor.LintAccessor
options:
show_root_heading: true
show_source: false
members:
- report
- register_check
- unregister_check
- list_custom_checks
group_by_category: true
show_signature_annotations: true
separate_signature: true
options:
show_root_heading: true
show_source: false
members:
  - report
  - register_check
  - unregister_check
  - list_custom_checks
group_by_category: true
show_signature_annotations: true
separate_signature: true

## Usage Examples

@@ -81,6 +77,16 @@ df.lint.register_check(check_email_format)
report = df.lint.report()
```

### Parallel Execution

```python
# Run checks in parallel
report = df.lint.report(
backend="multiprocessing",
n_jobs=4
)
```

### Validation Pipeline

```python
@@ -131,7 +137,6 @@ result = validate_dataset(df, config)
- `correlation_threshold` (float): Correlation threshold (default: 0.95)
- `foreign_key_mappings` (Dict[str, Union[pd.DataFrame, Tuple[pd.DataFrame, str]]]): Referential integrity mappings


### Column-Specific Parameters

- `negative_value_columns` (List[str]): Columns to check for negatives
@@ -144,6 +149,12 @@ result = validate_dataset(df, config)
- `output` (str): File path to save report
- `return_dict` (bool): Return structured dictionary

### Parallelisation Parameters

- `backend` (str): Parallel backend: `"joblib"`, `"threading"`, or `"multiprocessing"`. Default: `"multiprocessing"`.
- `n_jobs` (int): Number of parallel jobs; `-1` uses all available cores. Default: `1` (serial execution).
- `min_checks_for_parallel` (int): Minimum checks to trigger automatic parallel execution (default: 3)
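To illustrate how these three parameters interact, here is a stdlib-only sketch of the gating logic (an illustration, not lintdata's actual `ParallelExecutor`): parallelism kicks in only when it is requested and enough checks are queued.

```python
from concurrent.futures import ThreadPoolExecutor


def run_checks(check_functions, df, n_jobs=1, min_checks_for_parallel=3):
    """Illustrative sketch: run checks serially unless parallelism is both
    requested (n_jobs != 1) and worthwhile (enough checks queued)."""
    if n_jobs == 1 or len(check_functions) < min_checks_for_parallel:
        return [func(df) for func in check_functions]
    max_workers = None if n_jobs == -1 else n_jobs  # None lets the pool choose
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda func: func(df), check_functions))
```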

## Design Patterns

### Reusable Configuration
12 changes: 12 additions & 0 deletions docs/api/report.md
@@ -113,6 +113,18 @@ data = df.lint.report(return_dict=True)
# }
```

### Parallel Execution

You can run checks in parallel by specifying the `backend` and `n_jobs` parameters in the `report()` method.

```python
report = df.lint.report(
checks_to_run=["missing", "duplicates", "outliers"],
backend="multiprocessing", # Options: "multiprocessing", "threading", "joblib"
n_jobs=4 # Number of parallel jobs
)
```

## Usage Examples

### 1. Run All Checks (Default)
13 changes: 13 additions & 0 deletions docs/changelog/CHANGELOG.md
@@ -7,6 +7,19 @@ hide:

All notable changes to this project will be documented in this file.

## v1.1.0

### Added

- Added `ParallelExecutor` for parallel check execution with multiprocessing, threading, and joblib backends.
- Added tests for parallel execution in `tests/test_parallel.py`.

### Changed

- Updated `report()` to accept `backend`, `n_jobs`, and `min_checks_for_parallel` parameters for parallel execution control.

## v1.0.1

### Added
12 changes: 12 additions & 0 deletions docs/getting-started/quickstart.md
@@ -139,6 +139,18 @@ print(data["issue_count"]) # 4
print(data["issues"]) # List of issue dicts
```

## Parallel Execution

Configure parallel execution to speed up checks on large DataFrames:

```python
# Run checks in parallel
report = df.lint.report(
backend="multiprocessing",
n_jobs=4
)
```
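Note that the `joblib` backend relies on an optional dependency (exposed as a package extra in `pyproject.toml`); the `multiprocessing` and `threading` backends need only the standard library. To use `backend="joblib"`, install the extra:

```shell
pip install "lintdata[joblib]"
```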

## Common Workflows

### Before Analysis
3 changes: 2 additions & 1 deletion docs/index.md
@@ -19,7 +19,8 @@ LintData helps data scientists, analysts, and ML engineers identify data quality
✅ **Highly Configurable** - Customise thresholds and select specific checks
✅ **Multiple Export Formats** - Text, HTML, JSON, and CSV reports
✅ **Custom Checks API** - Extend with your own validation logic
✅ **Pandas Native** - Integrates seamlessly via `.lint` accessor
✅ **Pandas Native** - Integrates seamlessly via `.lint` accessor
✅ **Parallel Execution** - Speed up checks with multiprocessing, threading, or joblib backends

## Why LintData?

5 changes: 4 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "lintdata"
version = "1.0.1"
version = "1.1.0"
description = "A \"linter\" for pandas DataFrames to automate data quality audits."
readme = "README.md"
authors = [{ name = "Heet Patel", email = "heetkpatel30@gmail.com" }]
@@ -23,6 +23,9 @@ Documentation = "https://lintdata.patelheet.com"
Repository = "https://github.com/patelheet30/lintdata"
Issues = "https://github.com/patelheet30/lintdata/issues"

[project.optional-dependencies]
joblib = ["joblib>=1.5.2"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
2 changes: 1 addition & 1 deletion src/lintdata/__init__.py
@@ -2,7 +2,7 @@
LintData: A 'linter' for pandas DataFrames to automate data quality audits.
"""

__version__ = "1.0.1"
__version__ = "1.1.0"

from .accessor import LintAccessor

112 changes: 71 additions & 41 deletions src/lintdata/accessor.py
@@ -5,11 +5,12 @@
import csv
import json
import re
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import pandas as pd

from . import checks
from .parallel import ParallelExecutor
from .report_formatter import HTMLReportFormatter

__all__ = ["LintAccessor"]
@@ -111,6 +112,9 @@ def report(
report_format: str = "text",
output: Optional[str] = None,
return_dict: bool = False,
n_jobs: int = 1,
backend: str = "multiprocessing",
min_checks_for_parallel: int = 3,
) -> Union[str, Dict[str, Any]]:
"""Generate a comprehensive quality report for the DataFrame.

@@ -141,6 +145,9 @@
report_format (str, optional): Output format. Options: 'text', 'html', 'json', 'csv'. Defaults to 'text'.
output (Optional[str], optional): File path to save the report. If None, returns as string. Defaults to None.
return_dict (bool, optional): If True, returns structured dictionary instead of formatted string. Defaults to False.
n_jobs (int, optional): Number of parallel workers. -1: Use all cores, 1: Serial execution, n > 1: Use n workers. Defaults to 1.
backend (str, optional): Parallelisation backend. Options: 'multiprocessing', 'threading', 'joblib'. Defaults to 'multiprocessing'.
min_checks_for_parallel (int, optional): Minimum number of checks required to trigger parallel execution. Defaults to 3.

Raises:
ValueError: If invalid check names are provided or invalid format specified.
@@ -165,6 +172,12 @@

>>> # Get structured data
>>> data = df.lint.report(return_dict=True)

>>> # Parallel execution with all cores
>>> df.lint.report(n_jobs=-1)

>>> # Parallel execution with specific backend
>>> df.lint.report(n_jobs=4, backend='threading')
```
"""
valid_formats = ["text", "html", "json", "csv"]
@@ -182,63 +195,81 @@
result = HTMLReportFormatter.generate((0, 0), [])
elif report_format == "json":
result = json.dumps({"shape": [0, 0], "issues": [], "issue_count": 0}, indent=2)
else: # csv
result = "check,column,severity,message\n"
elif report_format == "csv":
from io import StringIO

output_io = StringIO()
writer = csv.writer(output_io)
writer.writerow(["check", "column", "severity", "message"])
result = output_io.getvalue()

if output:
with open(output, "w", encoding="utf-8") as f:
f.write(result)
f.write(result) # pyright: ignore[reportPossiblyUnboundVariable]

return result
return result # pyright: ignore[reportPossiblyUnboundVariable]

if checks_to_run == "all":
checks_to_run = None
elif isinstance(checks_to_run, str):
checks_to_run = [checks_to_run]
check_functions: List[Callable] = []
check_kwargs: Dict[str, Dict[str, Any]] = {}

available_checks = {
"missing": lambda: checks.check_missing_values(self._df),
"duplicates": lambda: checks.check_duplicate_rows(self._df),
"mixed_types": lambda: checks.check_mixed_types(self._df),
"whitespace": lambda: checks.check_whitespace(self._df),
"constant": lambda: checks.check_constant_columns(self._df),
"unique": lambda: checks.check_unique_columns(self._df, threshold=unique_column_threshold),
"outliers": lambda: checks.check_outliers(self._df, threshold=outlier_threshold),
"missing_patterns": lambda: checks.check_missing_patterns(self._df),
"case": lambda: checks.check_case_consistency(self._df),
"cardinality": lambda: checks.check_cardinality(
self._df, high_threshold=cardinality_high_threshold, low_threshold=cardinality_low_threshold
"missing": (checks.check_missing_values, {}),
"duplicates": (checks.check_duplicate_rows, {}),
"mixed_types": (checks.check_mixed_types, {}),
"whitespace": (checks.check_whitespace, {}),
"constant": (checks.check_constant_columns, {}),
"unique": (checks.check_unique_columns, {"threshold": unique_column_threshold}),
"outliers": (checks.check_outliers, {"threshold": outlier_threshold}),
"missing_patterns": (checks.check_missing_patterns, {}),
"case": (checks.check_case_consistency, {}),
"cardinality": (
checks.check_cardinality,
{"high_threshold": cardinality_high_threshold, "low_threshold": cardinality_low_threshold},
),
"skewness": lambda: checks.check_skewness(self._df, threshold=skewness_threshold),
"duplicate_columns": lambda: checks.check_duplicate_columns(self._df),
"type_consistency": lambda: checks.check_data_type_consistency(self._df),
"negative": lambda: checks.check_negative_values(self._df, columns=negative_value_columns),
"rare_categories": lambda: checks.check_rare_categories(self._df, threshold=rare_category_threshold),
"date_format": lambda: checks.check_date_format_consistency(self._df),
"string_length": lambda: checks.check_string_length_outliers(self._df, threshold=string_length_threshold),
"zero_inflation": lambda: checks.check_zero_inflation(self._df, threshold=zero_inflation_threshold),
"future_dates": lambda: checks.check_future_dates(
self._df, columns=future_date_columns, reference_date=future_date_reference
"skewness": (checks.check_skewness, {"threshold": skewness_threshold}),
"duplicate_columns": (checks.check_duplicate_columns, {}),
"type_consistency": (checks.check_data_type_consistency, {}),
"negative": (checks.check_negative_values, {"columns": negative_value_columns}),
"rare_categories": (checks.check_rare_categories, {"threshold": rare_category_threshold}),
"date_format": (checks.check_date_format_consistency, {}),
"string_length": (checks.check_string_length_outliers, {"threshold": string_length_threshold}),
"zero_inflation": (checks.check_zero_inflation, {"threshold": zero_inflation_threshold}),
"future_dates": (
checks.check_future_dates,
{"columns": future_date_columns, "reference_date": future_date_reference},
),
"special_chars": lambda: checks.check_special_characters(self._df, threshold=special_chars_threshold),
"date_anomalies": lambda: checks.check_date_range_anomalies(
self._df, columns=future_date_columns, threshold_years=threshold_years
"special_chars": (checks.check_special_characters, {"threshold": special_chars_threshold}),
"date_anomalies": (
checks.check_date_range_anomalies,
{"columns": future_date_columns, "threshold_years": threshold_years},
),
"correlation": lambda: checks.check_correlation_warnings(self._df, threshold=correlation_threshold),
"foreign_keys": lambda: checks.check_referential_integrity(self._df, foreign_key_mappings or {}),
"correlation": (checks.check_correlation_warnings, {"threshold": correlation_threshold}),
"foreign_keys": (checks.check_referential_integrity, {"foreign_keys": foreign_key_mappings or {}}),
}

if checks_to_run is None:
checks_to_execute = available_checks.keys()
checks_to_execute = list(available_checks.keys())
else:
invalid_checks = [c for c in checks_to_run if c not in available_checks]
if isinstance(checks_to_run, str) and checks_to_run == "all":
checks_to_execute = list(available_checks.keys())
else:
checks_to_execute = checks_to_run if isinstance(checks_to_run, list) else [checks_to_run]

invalid_checks = [c for c in checks_to_execute if c not in available_checks]
if invalid_checks:
raise ValueError(f"Invalid check(s): {invalid_checks}. Valid options: {list(available_checks.keys())}")
checks_to_execute = checks_to_run

all_warnings: List[str] = []
for check_name in checks_to_execute:
all_warnings.extend(available_checks[check_name]())
func, kwargs = available_checks[check_name]
check_functions.append(func)
check_kwargs[func.__name__] = kwargs

executor = ParallelExecutor(n_jobs=n_jobs, backend=backend, min_checks_for_parallel=min_checks_for_parallel)
results = executor.execute_checks(check_functions, self._df, check_kwargs)

all_warnings: List[str] = []
for func in check_functions:
all_warnings.extend(results[func.__name__])

for check_name, check_func in self._custom_checks.items():
try:
@@ -279,7 +310,6 @@ def report(
elif report_format == "csv":
result = self._format_as_csv(all_warnings)

# Save to file if output path provided
if output:
with open(output, "w", encoding="utf-8") as f:
f.write(result) # pyright: ignore[reportPossiblyUnboundVariable]
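One detail worth calling out in the accessor refactor above: the per-check lambdas were replaced with `(function, kwargs)` tuples, plausibly because the `multiprocessing` backend must pickle each callable to hand it to a worker process, and module-level functions pickle by reference while lambdas do not. A minimal stdlib demonstration of that constraint:

```python
import json
import pickle

# A module-level function pickles by reference to its qualified name,
# so it can be shipped to a multiprocessing worker.
restored = pickle.loads(pickle.dumps(json.dumps))
assert restored is json.dumps

# A lambda has no importable qualified name, so pickling it fails.
try:
    pickle.dumps(lambda df: df)
    lambda_picklable = True
except Exception:
    lambda_picklable = False
assert not lambda_picklable
```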