diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6566c98..06e3fb9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: uses: astral-sh/setup-uv@v5 - name: Install dependencies - run: uv sync --dev + run: uv sync --dev --all-extras - name: Check formatting with Ruff run: uv run ruff format . --check diff --git a/CHANGELOG.md b/CHANGELOG.md index 24f05d8..ff8c956 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. ## [Released] - 2025-11-27 +## [1.1.0] - 2025-12-02 - 2025-12-05 + +### Added + +- Add ParallelExecutor for parallel processing with multiprocessing, threading and joblib backends. +- Added tests for parallel execution in `tests/test_parallel.py`. + +### Changed + +- Updated `report()` method to include `backend` and `n_jobs` parameters for parallel execution control. + ## [1.0.1] - 2025-11-27 ### Added diff --git a/docs/api/accessor.md b/docs/api/accessor.md index ec0e0b1..8f441d6 100644 --- a/docs/api/accessor.md +++ b/docs/api/accessor.md @@ -3,17 +3,13 @@ The `LintAccessor` class provides the `.lint` accessor for pandas DataFrames. 
::: lintdata.accessor.LintAccessor - options: - show_root_heading: true - show_source: false - members: - - report - - register_check - - unregister_check - - list_custom_checks - group_by_category: true - show_signature_annotations: true - separate_signature: true +options: +show_root_heading: true +show_source: false +members: - report - register_check - unregister_check - list_custom_checks +group_by_category: true +show_signature_annotations: true +separate_signature: true ## Usage Examples @@ -81,6 +77,16 @@ df.lint.register_check(check_email_format) report = df.lint.report() ``` +### Parallel Execution + +```python +# Run checks in parallel +report = df.lint.report( + backend="multiprocessing", + n_jobs=4 +) +``` + ### Validation Pipeline ```python @@ -131,7 +137,6 @@ result = validate_dataset(df, config) - `correlation_threshold` (float): Correlation threshold (default: 0.95) - `foreign_key_mappings` (Dict[str, Union[pd.DataFrame, Tuple[pd.DataFrame, str]]]): Referential integrity mappings - ### Column-Specific Parameters - `negative_value_columns` (List[str]): Columns to check for negatives @@ -144,6 +149,12 @@ result = validate_dataset(df, config) - `output` (str): File path to save report - `return_dict` (bool): Return structured dictionary +### Parallelisation Parameters + +- `backend` (str): Parallel backend - `"joblib"`, `"threading"`, or `"multiprocessing"`. Default: `"multiprocessing"`. +- `n_jobs` (int): Number of parallel jobs. Default: `-1` (all cores) +- `min_checks_for_parallel` (int): Minimum checks to trigger automatic parallel execution (default: 3) + ## Design Patterns ### Reusable Configuration diff --git a/docs/api/report.md b/docs/api/report.md index f49c426..4e0b536 100644 --- a/docs/api/report.md +++ b/docs/api/report.md @@ -113,6 +113,18 @@ data = df.lint.report(return_dict=True) # } ``` +### Parallel Execution + +You can run checks in parallel by specifying the `backend` and `n_jobs` parameters in the `report()` method. 
+
+```python
+report = df.lint.report(
+    checks_to_run=["missing", "duplicates", "outliers"],
+    backend="multiprocessing",  # Options: "multiprocessing", "threading", "joblib"
+    n_jobs=4  # Number of parallel jobs
+)
+```
+
 ## Usage Examples
 
 ### 1. Run All Checks (Default)
diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md
index 7fb73e8..c35a582 100644
--- a/docs/changelog/CHANGELOG.md
+++ b/docs/changelog/CHANGELOG.md
@@ -7,6 +7,17 @@ hide:
 
 All notable changes to this project will be documented in this file.
 
+## v1.1.0
+
+### Added
+
+- Add ParallelExecutor for parallel processing with multiprocessing, threading and joblib backends.
+- Added tests for parallel execution in `tests/test_parallel.py`.
+
+### Changed
+
+- Updated `report()` method to include `backend` and `n_jobs` parameters for parallel execution control.
+
 ## v1.0.1
 
 ### Added
diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md
index af7db8d..726edfd 100644
--- a/docs/getting-started/quickstart.md
+++ b/docs/getting-started/quickstart.md
@@ -139,6 +139,18 @@ print(data["issue_count"])  # 4
 print(data["issues"])  # List of issue dicts
 ```
 
+## Parallel Execution
+
+Configure parallel execution to speed up large DataFrame checks:
+
+```python
+# Run checks in parallel
+report = df.lint.report(
+    backend="multiprocessing",
+    n_jobs=4
+)
+```
+
 ## Common Workflows
 
 ### Before Analysis
diff --git a/docs/index.md b/docs/index.md
index 3349de3..41fac99 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -19,7 +19,8 @@ LintData helps data scientists, analysts, and ML engineers identify data quality
 ✅ **Highly Configurable** - Customise thresholds and select specific checks
 ✅ **Multiple Export Formats** - Text, HTML, JSON, and CSV reports
 ✅ **Custom Checks API** - Extend with your own validation logic
-✅ **Pandas Native** - Integrates seamlessly via `.lint` accessor
+✅ **Pandas Native** - Integrates seamlessly via `.lint` accessor
+✅ **Parallel 
Execution** - Speed up checks with multiprocessing, threading, or joblib backends ## Why LintData? diff --git a/pyproject.toml b/pyproject.toml index db48bcd..9ba20aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "lintdata" -version = "1.0.1" +version = "1.1.0" description = "A \"linter\" for pandas DataFrames to automate data quality audits." readme = "README.md" authors = [{ name = "Heet Patel", email = "heetkpatel30@gmail.com" }] @@ -23,6 +23,9 @@ Documentation = "https://lintdata.patelheet.com" Repository = "https://github.com/patelheet30/lintdata" Issues = "https://github.com/patelheet30/lintdata/issues" +[project.optional-dependencies] +joblib = ["joblib>=1.5.2"] + [build-system] requires = ["hatchling"] build-backend = "hatchling.build" diff --git a/src/lintdata/__init__.py b/src/lintdata/__init__.py index c16abc0..dccb855 100644 --- a/src/lintdata/__init__.py +++ b/src/lintdata/__init__.py @@ -2,7 +2,7 @@ LintData: A 'linter' for pandas DataFrames to automate data quality audits. """ -__version__ = "1.0.1" +__version__ = "1.1.0" from .accessor import LintAccessor diff --git a/src/lintdata/accessor.py b/src/lintdata/accessor.py index 7f518c4..cc63d8c 100644 --- a/src/lintdata/accessor.py +++ b/src/lintdata/accessor.py @@ -5,11 +5,12 @@ import csv import json import re -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import pandas as pd from . import checks +from .parallel import ParallelExecutor from .report_formatter import HTMLReportFormatter __all__ = ["LintAccessor"] @@ -111,6 +112,9 @@ def report( report_format: str = "text", output: Optional[str] = None, return_dict: bool = False, + n_jobs: int = 1, + backend: str = "multiprocessing", + min_checks_for_parallel: int = 3, ) -> Union[str, Dict[str, Any]]: """Generate a comprehensive quality report for the DataFrame. 
@@ -141,6 +145,9 @@ def report( report_format (str, optional): Output format. Options: 'text', 'html', 'json', 'csv'. Defaults to 'text'. output (Optional[str], optional): File path to save the report. If None, returns as string. Defaults to None. return_dict (bool, optional): If True, returns structured dictionary instead of formatted string. Defaults to False. + n_jobs (int, optional): Number of parallel workers. -1: Use all cores, 1: Serial execution, n > 1: Use n workers. Defaults to 1. + backend (str, optional): Parallelisation backend. Options: 'multiprocessing', 'threading', 'joblib'. Defaults to 'multiprocessing'. + min_checks_for_parallel (int, optional): Minimum number of checks required to trigger parallel execution. Defaults to 3. Raises: ValueError: If invalid check names are provided or invalid format specified. @@ -165,6 +172,12 @@ def report( >>> # Get structured data >>> data = df.lint.report(return_dict=True) + + >>> # Parallel execution with all cores + >>> df.lint.report(n_jobs=-1) + + >>> # Parallel execution with specific backend + >>> df.lint.report(n_jobs=4, backend='threading') ``` """ valid_formats = ["text", "html", "json", "csv"] @@ -182,63 +195,81 @@ def report( result = HTMLReportFormatter.generate((0, 0), []) elif report_format == "json": result = json.dumps({"shape": [0, 0], "issues": [], "issue_count": 0}, indent=2) - else: # csv - result = "check,column,severity,message\n" + elif report_format == "csv": + from io import StringIO + + output_io = StringIO() + writer = csv.writer(output_io) + writer.writerow(["check", "column", "severity", "message"]) + result = output_io.getvalue() if output: with open(output, "w", encoding="utf-8") as f: - f.write(result) + f.write(result) # pyright: ignore[reportPossiblyUnboundVariable] - return result + return result # pyright: ignore[reportPossiblyUnboundVariable] - if checks_to_run == "all": - checks_to_run = None - elif isinstance(checks_to_run, str): - checks_to_run = [checks_to_run] + 
check_functions: List[Callable] = [] + check_kwargs: Dict[str, Dict[str, Any]] = {} available_checks = { - "missing": lambda: checks.check_missing_values(self._df), - "duplicates": lambda: checks.check_duplicate_rows(self._df), - "mixed_types": lambda: checks.check_mixed_types(self._df), - "whitespace": lambda: checks.check_whitespace(self._df), - "constant": lambda: checks.check_constant_columns(self._df), - "unique": lambda: checks.check_unique_columns(self._df, threshold=unique_column_threshold), - "outliers": lambda: checks.check_outliers(self._df, threshold=outlier_threshold), - "missing_patterns": lambda: checks.check_missing_patterns(self._df), - "case": lambda: checks.check_case_consistency(self._df), - "cardinality": lambda: checks.check_cardinality( - self._df, high_threshold=cardinality_high_threshold, low_threshold=cardinality_low_threshold + "missing": (checks.check_missing_values, {}), + "duplicates": (checks.check_duplicate_rows, {}), + "mixed_types": (checks.check_mixed_types, {}), + "whitespace": (checks.check_whitespace, {}), + "constant": (checks.check_constant_columns, {}), + "unique": (checks.check_unique_columns, {"threshold": unique_column_threshold}), + "outliers": (checks.check_outliers, {"threshold": outlier_threshold}), + "missing_patterns": (checks.check_missing_patterns, {}), + "case": (checks.check_case_consistency, {}), + "cardinality": ( + checks.check_cardinality, + {"high_threshold": cardinality_high_threshold, "low_threshold": cardinality_low_threshold}, ), - "skewness": lambda: checks.check_skewness(self._df, threshold=skewness_threshold), - "duplicate_columns": lambda: checks.check_duplicate_columns(self._df), - "type_consistency": lambda: checks.check_data_type_consistency(self._df), - "negative": lambda: checks.check_negative_values(self._df, columns=negative_value_columns), - "rare_categories": lambda: checks.check_rare_categories(self._df, threshold=rare_category_threshold), - "date_format": lambda: 
checks.check_date_format_consistency(self._df), - "string_length": lambda: checks.check_string_length_outliers(self._df, threshold=string_length_threshold), - "zero_inflation": lambda: checks.check_zero_inflation(self._df, threshold=zero_inflation_threshold), - "future_dates": lambda: checks.check_future_dates( - self._df, columns=future_date_columns, reference_date=future_date_reference + "skewness": (checks.check_skewness, {"threshold": skewness_threshold}), + "duplicate_columns": (checks.check_duplicate_columns, {}), + "type_consistency": (checks.check_data_type_consistency, {}), + "negative": (checks.check_negative_values, {"columns": negative_value_columns}), + "rare_categories": (checks.check_rare_categories, {"threshold": rare_category_threshold}), + "date_format": (checks.check_date_format_consistency, {}), + "string_length": (checks.check_string_length_outliers, {"threshold": string_length_threshold}), + "zero_inflation": (checks.check_zero_inflation, {"threshold": zero_inflation_threshold}), + "future_dates": ( + checks.check_future_dates, + {"columns": future_date_columns, "reference_date": future_date_reference}, ), - "special_chars": lambda: checks.check_special_characters(self._df, threshold=special_chars_threshold), - "date_anomalies": lambda: checks.check_date_range_anomalies( - self._df, columns=future_date_columns, threshold_years=threshold_years + "special_chars": (checks.check_special_characters, {"threshold": special_chars_threshold}), + "date_anomalies": ( + checks.check_date_range_anomalies, + {"columns": future_date_columns, "threshold_years": threshold_years}, ), - "correlation": lambda: checks.check_correlation_warnings(self._df, threshold=correlation_threshold), - "foreign_keys": lambda: checks.check_referential_integrity(self._df, foreign_key_mappings or {}), + "correlation": (checks.check_correlation_warnings, {"threshold": correlation_threshold}), + "foreign_keys": (checks.check_referential_integrity, {"foreign_keys": 
foreign_key_mappings or {}}), } if checks_to_run is None: - checks_to_execute = available_checks.keys() + checks_to_execute = list(available_checks.keys()) else: - invalid_checks = [c for c in checks_to_run if c not in available_checks] + if isinstance(checks_to_run, str) and checks_to_run == "all": + checks_to_execute = list(available_checks.keys()) + else: + checks_to_execute = checks_to_run if isinstance(checks_to_run, list) else [checks_to_run] + + invalid_checks = [c for c in checks_to_execute if c not in available_checks] if invalid_checks: raise ValueError(f"Invalid check(s): {invalid_checks}. Valid options: {list(available_checks.keys())}") - checks_to_execute = checks_to_run - all_warnings: List[str] = [] for check_name in checks_to_execute: - all_warnings.extend(available_checks[check_name]()) + func, kwargs = available_checks[check_name] + check_functions.append(func) + check_kwargs[func.__name__] = kwargs + + executor = ParallelExecutor(n_jobs=n_jobs, backend=backend, min_checks_for_parallel=min_checks_for_parallel) + results = executor.execute_checks(check_functions, self._df, check_kwargs) + + all_warnings: List[str] = [] + for func in check_functions: + all_warnings.extend(results[func.__name__]) for check_name, check_func in self._custom_checks.items(): try: @@ -279,7 +310,6 @@ def report( elif report_format == "csv": result = self._format_as_csv(all_warnings) - # Save to file if output path provided if output: with open(output, "w", encoding="utf-8") as f: f.write(result) # pyright: ignore[reportPossiblyUnboundVariable] diff --git a/src/lintdata/parallel.py b/src/lintdata/parallel.py new file mode 100644 index 0000000..f72e9ec --- /dev/null +++ b/src/lintdata/parallel.py @@ -0,0 +1,222 @@ +import multiprocessing +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed +from typing import Any, Callable, Dict, List, Optional + +import pandas as pd + +try: + from joblib import Parallel, delayed + + JOBLIB_AVAILABLE = True 
+except ImportError: + JOBLIB_AVAILABLE = False + + +class ParallelExecutor: + def __init__(self, n_jobs: int = -1, backend: str = "multiprocessing", min_checks_for_parallel: int = 3): + """Initialise parallel executor. + + Args: + n_jobs (int, optional): Number of worker processes/threads. -1: Use all available cores, 1: Serial Execution, n > 1: Use n workers. Defaults to -1. + backend (str, optional): Parallelisation backend. "multiprocessing": Process Pool (CPU-bound), "threading": Thread Pool (I/O-bound), "joblib": Joblib Parallel. Defaults to "multiprocessing". + min_checks_for_parallel (int, optional): Minimum number of checks required to trigger parallel execution. Defaults to 3. + + Raises: + ImportError: If joblib is not installed and backend is set to "joblib". + ValueError: If n_jobs is not -1 or a positive integer. + """ + self.backend = backend + self.min_checks_for_parallel = min_checks_for_parallel + + if backend == "joblib" and not JOBLIB_AVAILABLE: + raise ImportError( + "joblib is not installed. Please install via 'pip install joblib' to use this backend. " + "Alternatively, choose 'multiprocessing' or 'threading' backend." + ) + + if n_jobs == -1: + self.n_jobs = multiprocessing.cpu_count() + elif n_jobs < 1: + raise ValueError("n_jobs must be -1 or a positive integer.") + else: + self.n_jobs = n_jobs + + def should_parallelise(self, n_checks: int, df_size: int) -> bool: + """Determine if parallelisation would be beneficial. + + Args: + n_checks (int): Number of checks to execute. + df_size (int): Size of DataFrame (number of rows). + + Returns: + bool: Whether parallelisation should be used. 
+ """ + if self.n_jobs == 1: + return False + + if n_checks < self.min_checks_for_parallel: + return False + + return not df_size < 1000 + + def _execute_serial( + self, + check_functions: List[Callable], + df: pd.DataFrame, + check_kwargs: Dict[str, Dict[str, Any]], + ) -> Dict[str, List[str]]: + """Execute checks serially.""" + results = {} + + for func in check_functions: + func_name = func.__name__ + kwargs = check_kwargs.get(func_name, {}) + + try: + warnings = func(df, **kwargs) + results[func_name] = warnings + except Exception as e: + results[func_name] = [f"Check failed with error: {type(e).__name__}: {e!s}"] + + return results + + def _execute_multiprocessing( + self, + check_functions: List[Callable], + df: pd.DataFrame, + check_kwargs: Dict[str, Dict[str, Any]], + ) -> Dict[str, List[str]]: + """Execute checks using multiprocessing.""" + results = {} + + with ProcessPoolExecutor(max_workers=self.n_jobs) as executor: + future_to_func = {} + + for func in check_functions: + func_name = func.__name__ + kwargs = check_kwargs.get(func_name, {}) + future = executor.submit(self._run_check, func, df, kwargs) + future_to_func[future] = func_name + + for future in as_completed(future_to_func): + func_name = future_to_func[future] + try: + warnings = future.result() + results[func_name] = warnings + except Exception as e: + results[func_name] = [f"Check failed with error: {type(e).__name__}: {e!s}"] + + return results + + def _execute_threading( + self, check_functions: List[Callable], df: pd.DataFrame, check_kwargs: Dict[str, Dict[str, Any]] + ) -> Dict[str, List[str]]: + """Execute checks using threading.""" + results = {} + + with ThreadPoolExecutor(max_workers=self.n_jobs) as executor: + future_to_func = {} + for func in check_functions: + func_name = func.__name__ + kwargs = check_kwargs.get(func_name, {}) + future = executor.submit(self._run_check, func, df, kwargs) + future_to_func[future] = func_name + + for future in as_completed(future_to_func): + 
func_name = future_to_func[future] + try: + warnings = future.result() + results[func_name] = warnings + except Exception as e: + results[func_name] = [f"Check failed with error: {type(e).__name__}: {e!s}"] + + return results + + def _execute_joblib( + self, + check_functions: List[Callable], + df: pd.DataFrame, + check_kwargs: Dict[str, Dict[str, Any]], + ) -> Dict[str, List[str]]: + """Execute checks using joblib.""" + if not JOBLIB_AVAILABLE: + raise ImportError("joblib is not installed.") + + tasks = [] + func_names = [] + for func in check_functions: + func_name = func.__name__ + kwargs = check_kwargs.get(func_name, {}) + tasks.append(delayed(self._run_check_safe)(func, df, kwargs)) # type: ignore + func_names.append(func_name) + + parallel_results = Parallel(n_jobs=self.n_jobs, prefer="processes", verbose=0)(tasks) # type: ignore + results = dict(zip(func_names, parallel_results)) + return results # type: ignore + + @staticmethod + def _run_check(func: Callable, df: pd.DataFrame, kwargs: Dict[str, Any]) -> List[str]: + """ + Run a single check function. + + This is a static method to ensure it can be pickled for multiprocessing. + """ + return func(df, **kwargs) + + @staticmethod + def _run_check_safe(func: Callable, df: pd.DataFrame, kwargs: Dict[str, Any]) -> List[str]: + """ + Run a single check function with error handling. + + Used in joblib backend to capture exceptions. + """ + try: + return func(df, **kwargs) + except Exception as e: + return [f"Check failed with error: {type(e).__name__}: {e!s}"] + + def execute_checks( + self, + check_functions: List[Callable], + df: pd.DataFrame, + check_kwargs: Optional[Dict[str, Dict[str, Any]]] = None, + ) -> Dict[str, List[str]]: + """Execute multiple check functions in parallel. + + Args: + check_functions (List[Callable]): List of check functions to execute + df (pd.DataFrame): DataFrame to check + check_kwargs (Optional[Dict[str, Dict[str, Any]]], optional): Keyword arguments for each check function. 
+ + Raises: + ValueError: If an unknown backend is specified. + + Returns: + Dict[str, List[str]]: Dictionary mapping check function names to their warning messages. + """ + check_kwargs = check_kwargs or {} + + if not self.should_parallelise(len(check_functions), len(df)): + return self._execute_serial(check_functions, df, check_kwargs) + + if self.backend == "multiprocessing": + return self._execute_multiprocessing(check_functions, df, check_kwargs) + elif self.backend == "threading": + return self._execute_threading(check_functions, df, check_kwargs) + elif self.backend == "joblib": + return self._execute_joblib(check_functions, df, check_kwargs) + else: + raise ValueError(f"Unknown backend: {self.backend}. Choose from: 'multiprocessing', 'threading', 'joblib'") + + +def get_optimal_workers() -> int: + """Get optimal number of worker processes based on CPU count. + + Returns: + int: Optimal number of worker processes. + """ + cpu_count = multiprocessing.cpu_count() + + if cpu_count > 2: + return cpu_count - 1 + return 1 diff --git a/tests/test_parallel.py b/tests/test_parallel.py new file mode 100644 index 0000000..fc6aa63 --- /dev/null +++ b/tests/test_parallel.py @@ -0,0 +1,303 @@ +"""Tests for parallel execution functionality.""" + +import pandas as pd +import pytest + +from lintdata.parallel import JOBLIB_AVAILABLE, ParallelExecutor, get_optimal_workers + + +@pytest.fixture +def sample_df(): + """Create a sample DataFrame for testing.""" + return pd.DataFrame( + { + "A": [1, 2, None, 4, 5], + "B": ["a", "b", "c", "d", "e"], + "C": [1.0, 2.0, 3.0, 4.0, 5.0], + } + ) + + +@pytest.fixture +def sample_checks(): + """Create sample check functions.""" + + def check_missing(df): + """Check for missing values.""" + missing = df.isna().sum() + if missing.any(): + return [f"Found {missing.sum()} missing values"] + return [] + + def check_duplicates(df): + """Check for duplicate rows.""" + dupes = df.duplicated().sum() + if dupes > 0: + return [f"Found {dupes} 
duplicate rows"]
+            return []
+
+    def check_negatives(df):
+        """Check for negative values."""
+        numeric_cols = df.select_dtypes(include=["number"]).columns
+        has_negatives = (df[numeric_cols] < 0).any().any()
+        if has_negatives:
+            return ["Found negative values"]
+        return []
+
+    return [check_missing, check_duplicates, check_negatives]
+
+
+class TestParallelExecutor:
+    """Test ParallelExecutor class."""
+
+    def test_init_default(self):
+        """Test default initialisation."""
+        executor = ParallelExecutor()
+        assert executor.n_jobs > 0
+        assert executor.backend == "multiprocessing"
+        assert executor.min_checks_for_parallel == 3
+
+    def test_init_custom_workers(self):
+        """Test initialisation with custom n_jobs."""
+        executor = ParallelExecutor(n_jobs=4)
+        assert executor.n_jobs == 4
+
+    def test_init_all_cores(self):
+        """Test initialisation with n_jobs=-1."""
+        executor = ParallelExecutor(n_jobs=-1)
+        assert executor.n_jobs > 0
+
+    def test_init_invalid_workers(self):
+        """Test initialisation with invalid n_jobs."""
+        with pytest.raises(ValueError, match="n_jobs must be -1 or a positive integer"):
+            ParallelExecutor(n_jobs=0)
+
+        with pytest.raises(ValueError, match="n_jobs must be -1 or a positive integer"):
+            ParallelExecutor(n_jobs=-2)
+
+    def test_init_threading_backend(self):
+        """Test initialisation with threading backend."""
+        executor = ParallelExecutor(backend="threading")
+        assert executor.backend == "threading"
+
+    @pytest.mark.skipif(not JOBLIB_AVAILABLE, reason="joblib not installed")
+    def test_init_joblib_backend(self):
+        """Test initialisation with joblib backend."""
+        executor = ParallelExecutor(backend="joblib")
+        assert executor.backend == "joblib"
+
+    @pytest.mark.skipif(JOBLIB_AVAILABLE, reason="joblib is installed")
+    def test_init_joblib_backend_not_installed(self):
+        """Test initialisation with joblib backend when not installed."""
+        with pytest.raises(ImportError, match="joblib is not installed"):
+            ParallelExecutor(backend="joblib")
+
+ def test_should_parallelise_too_few_checks(self): + """Test that parallelisation is disabled for too few checks.""" + executor = ParallelExecutor(min_checks_for_parallel=3) + assert not executor.should_parallelise(n_checks=2, df_size=10000) + + def test_should_parallelise_small_dataframe(self): + """Test that parallelisation is disabled for small DataFrames.""" + executor = ParallelExecutor() + assert not executor.should_parallelise(n_checks=5, df_size=100) + + def test_should_parallelise_serial_mode(self): + """Test that parallelisation is disabled when n_jobs=1.""" + executor = ParallelExecutor(n_jobs=1) + assert not executor.should_parallelise(n_checks=10, df_size=10000) + + def test_should_parallelise_valid_case(self): + """Test that parallelisation is enabled for valid cases.""" + executor = ParallelExecutor(n_jobs=4, min_checks_for_parallel=3) + assert executor.should_parallelise(n_checks=5, df_size=10000) + + def test_execute_serial(self, sample_df, sample_checks): + """Test serial execution.""" + executor = ParallelExecutor(n_jobs=1) + results = executor.execute_checks(sample_checks, sample_df) + + assert "check_missing" in results + assert "check_duplicates" in results + assert "check_negatives" in results + assert len(results["check_missing"]) > 0 # Should find missing values + + def test_execute_multiprocessing(self, sample_df, sample_checks): + """Test multiprocessing execution.""" + executor = ParallelExecutor(n_jobs=2, backend="multiprocessing") + + # Force parallel execution + executor.min_checks_for_parallel = 1 + results = executor.execute_checks(sample_checks, sample_df) + + assert "check_missing" in results + assert "check_duplicates" in results + assert "check_negatives" in results + + def test_execute_threading(self, sample_df, sample_checks): + """Test threading execution.""" + executor = ParallelExecutor(n_jobs=2, backend="threading") + + # Force parallel execution + executor.min_checks_for_parallel = 1 + results = 
executor.execute_checks(sample_checks, sample_df) + + assert "check_missing" in results + assert "check_duplicates" in results + assert "check_negatives" in results + + @pytest.mark.skipif(not JOBLIB_AVAILABLE, reason="joblib not installed") + def test_execute_joblib(self, sample_df, sample_checks): + """Test joblib execution.""" + executor = ParallelExecutor(n_jobs=2, backend="joblib") + + # Force parallel execution + executor.min_checks_for_parallel = 1 + results = executor.execute_checks(sample_checks, sample_df) + + assert "check_missing" in results + assert "check_duplicates" in results + assert "check_negatives" in results + + def test_execute_with_check_kwargs(self, sample_df): + """Test execution with check keyword arguments.""" + + def check_with_threshold(df, threshold=0): + """Check with threshold parameter.""" + numeric_cols = df.select_dtypes(include=["number"]).columns + max_val = df[numeric_cols].max().max() + if max_val > threshold: + return [f"Max value {max_val} exceeds threshold {threshold}"] + return [] + + executor = ParallelExecutor(n_jobs=1) + check_kwargs = {"check_with_threshold": {"threshold": 3}} + + results = executor.execute_checks([check_with_threshold], sample_df, check_kwargs) + assert "check_with_threshold" in results + assert len(results["check_with_threshold"]) > 0 + + def test_execute_with_failing_check(self, sample_df): + """Test that failing checks are isolated and don't crash execution.""" + + def failing_check(df): + """Check that always fails.""" + raise ValueError("Intentional error") + + def passing_check(df): + """Check that passes.""" + return ["Passing check warning"] + + executor = ParallelExecutor(n_jobs=1) + results = executor.execute_checks([failing_check, passing_check], sample_df) + + assert "failing_check" in results + assert "Check failed with error" in results["failing_check"][0] + assert "passing_check" in results + assert results["passing_check"] == ["Passing check warning"] + + @pytest.mark.skipif(not 
JOBLIB_AVAILABLE, reason="joblib not installed") + def test_execute_joblib_with_failing_check(self, sample_df): + """Test that joblib handles failing checks gracefully.""" + + def failing_check(df): + """Check that always fails.""" + raise ValueError("Intentional error") + + def passing_check(df): + """Check that passes.""" + return ["Passing check warning"] + + executor = ParallelExecutor(n_jobs=2, backend="joblib") + executor.min_checks_for_parallel = 1 + results = executor.execute_checks([failing_check, passing_check], sample_df) + + assert "failing_check" in results + assert "Check failed with error" in results["failing_check"][0] + assert "passing_check" in results + assert results["passing_check"] == ["Passing check warning"] + + def test_execute_empty_checks_list(self, sample_df): + """Test execution with empty checks list.""" + executor = ParallelExecutor() + results = executor.execute_checks([], sample_df) + assert results == {} + + def test_auto_parallelise_decision(self, sample_df, sample_checks): + """Test automatic parallelisation decision.""" + # Small DataFrame and few checks -> serial + small_df = pd.DataFrame({"A": [1, 2, 3]}) + executor = ParallelExecutor() + assert not executor.should_parallelise(len(sample_checks), len(small_df)) + + # Large DataFrame and many checks -> parallel + large_df = pd.DataFrame({"A": range(10000)}) + assert executor.should_parallelise(len(sample_checks), len(large_df)) + + +class TestGetOptimalWorkers: + """Test get_optimal_workers function.""" + + def test_get_optimal_workers(self): + """Test that optimal workers is calculated correctly.""" + workers = get_optimal_workers() + assert workers > 0 + assert isinstance(workers, int) + + +class TestParallelIntegration: + """Integration tests with actual LintDataAccessor.""" + + def test_report_with_parallel_true(self): + """Test report() with parallel=True.""" + df = pd.DataFrame( + { + "A": [1, 2, None, 4, 5] * 200, # Make it large enough + "B": ["a", "b", "c", "d", "e"] * 
200, + } + ) + + report = df.lint.report(return_dict=True) + assert isinstance(report, dict) + assert "Missing Values" in report["issues"][0]["check"] + + def test_report_with_parallel_false(self): + """Test report() with parallel=False.""" + df = pd.DataFrame({"A": [1, 2, None, 4, 5], "B": ["a", "b", "c", "d", "e"]}) + + report = df.lint.report(return_dict=True) + assert isinstance(report, dict) + + def test_report_with_auto_parallel(self): + """Test report() with automatic parallelisation.""" + # Small DataFrame - should use serial + small_df = pd.DataFrame({"A": [1, 2, 3]}) + report = small_df.lint.report(return_dict=True) + assert isinstance(report, dict) + + # Large DataFrame - should use parallel (if enough cores) + large_df = pd.DataFrame({"A": range(10000), "B": range(10000)}) + report = large_df.lint.report(return_dict=True) + assert isinstance(report, dict) + + def test_report_with_custom_workers(self): + """Test report() with custom n_jobs.""" + df = pd.DataFrame({"A": range(1000), "B": range(1000)}) + + report = df.lint.report(n_jobs=2, return_dict=True) + assert isinstance(report, dict) + + def test_report_with_threading_backend(self): + """Test report() with threading backend.""" + df = pd.DataFrame({"A": range(1000), "B": range(1000)}) + + report = df.lint.report(backend="threading", return_dict=True) + assert isinstance(report, dict) + + @pytest.mark.skipif(not JOBLIB_AVAILABLE, reason="joblib not installed") + def test_report_with_joblib_backend(self): + """Test report() with joblib backend.""" + df = pd.DataFrame({"A": range(1000), "B": range(1000)}) + + report = df.lint.report(backend="joblib", return_dict=True) + assert isinstance(report, dict) diff --git a/uv.lock b/uv.lock index f9e677a..d9a3596 100644 --- a/uv.lock +++ b/uv.lock @@ -150,14 +150,28 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = 
"sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" }, ] +[[package]] +name = "joblib" +version = "1.5.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e8/5d/447af5ea094b9e4c4054f82e223ada074c552335b9b4b2d14bd9b35a67c4/joblib-1.5.2.tar.gz", hash = "sha256:3faa5c39054b2f03ca547da9b2f52fde67c06240c31853f306aea97f13647b55", size = 331077, upload-time = "2025-08-27T12:15:46.575Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1e/e8/685f47e0d754320684db4425a0967f7d3fa70126bffd76110b7009a0090f/joblib-1.5.2-py3-none-any.whl", hash = "sha256:4e1f0bdbb987e6d843c70cf43714cb276623def372df3c22fe5266b2670bc241", size = 308396, upload-time = "2025-08-27T12:15:45.188Z" }, +] + [[package]] name = "lintdata" -version = "1.0.1" +version = "1.1.0" source = { editable = "." } dependencies = [ { name = "pandas" }, ] +[package.optional-dependencies] +joblib = [ + { name = "joblib" }, +] + [package.dev-dependencies] dev = [ { name = "mkdocs" }, @@ -181,7 +195,11 @@ test = [ ] [package.metadata] -requires-dist = [{ name = "pandas", specifier = ">=2.3.3" }] +requires-dist = [ + { name = "joblib", marker = "extra == 'joblib'", specifier = ">=1.5.2" }, + { name = "pandas", specifier = ">=2.3.3" }, +] +provides-extras = ["joblib"] [package.metadata.requires-dev] dev = [