diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index c6e4dd9e4..101880025 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -51,7 +51,6 @@ from airbyte_cdk.test.models.scenario import ExpectedOutcome -@dataclass class AirbyteEntrypointException(Exception): """Exception raised for errors in the AirbyteEntrypoint execution. diff --git a/airbyte_cdk/test/models/scenario.py b/airbyte_cdk/test/models/scenario.py index 92813101e..033807ef7 100644 --- a/airbyte_cdk/test/models/scenario.py +++ b/airbyte_cdk/test/models/scenario.py @@ -44,12 +44,21 @@ class AcceptanceTestFileTypes(BaseModel): skip_test: bool bypass_reason: str + class AcceptanceTestEmptyStream(BaseModel): + name: str + bypass_reason: str | None = None + + # bypass reason does not affect equality + def __hash__(self) -> int: + return hash(self.name) + config_path: Path | None = None config_dict: dict[str, Any] | None = None _id: str | None = None # Used to override the default ID generation configured_catalog_path: Path | None = None + empty_streams: list[AcceptanceTestEmptyStream] | None = None timeout_seconds: int | None = None expect_records: AcceptanceTestExpectRecords | None = None file_types: AcceptanceTestFileTypes | None = None diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index 3c87281c9..dac656cf4 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -10,20 +10,12 @@ from boltons.typeutils import classproperty -from airbyte_cdk.models import ( - AirbyteMessage, - Type, -) from airbyte_cdk.test import entrypoint_wrapper from airbyte_cdk.test.models import ( ConnectorTestScenario, ) from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite -from airbyte_cdk.utils.connector_paths import ( - ACCEPTANCE_TEST_CONFIG, - find_connector_root, -) if TYPE_CHECKING: from collections.abc import Callable diff --git a/airbyte_cdk/test/standard_tests/docker_base.py b/airbyte_cdk/test/standard_tests/docker_base.py index 68bb4cf11..bc26802dd 100644 --- a/airbyte_cdk/test/standard_tests/docker_base.py +++ b/airbyte_cdk/test/standard_tests/docker_base.py @@ -10,8 +10,7 @@ import warnings from dataclasses import asdict from pathlib import Path -from subprocess import CompletedProcess, SubprocessError -from typing import Literal, cast +from typing import Any, Literal, cast import orjson import pytest @@ -35,7 +34,6 @@ from airbyte_cdk.utils.docker import ( build_connector_image, run_docker_airbyte_command, - run_docker_command, ) @@ -66,13 +64,57 @@ def is_destination_connector(cls) -> bool: return cast(str, cls.connector_name).startswith("destination-") @classproperty - def acceptance_test_config_path(cls) -> Path: - """Get the path to the acceptance test config file.""" - result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG - if result.exists(): - return result + def acceptance_test_config(cls) -> Any: + """Get the contents of acceptance test config file. - raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}") + Also perform some basic validation that the file has the expected structure. + """ + acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG + if not acceptance_test_config_path.exists(): + raise FileNotFoundError( + f"Acceptance test config file not found at: {str(acceptance_test_config_path)}" + ) + + tests_config = yaml.safe_load(acceptance_test_config_path.read_text()) + + if "acceptance_tests" not in tests_config: + raise ValueError( + f"Acceptance tests config not found in {acceptance_test_config_path}." + f" Found only: {str(tests_config)}." + ) + return tests_config + + @staticmethod + def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]: + """ + For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined + a series of more granular scenarios specifying a config_path and empty_streams among other things. + + This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to + take the union of any defined empty_streams, to have high confidence that runnning a read with the + config will not error on the lack of data in the empty streams or lack of permissions to read them. + + """ + deduped_scenarios: list[ConnectorTestScenario] = [] + + for scenario in scenarios: + for existing_scenario in deduped_scenarios: + if scenario.config_path == existing_scenario.config_path: + # If a scenario with the same config_path already exists, we merge the empty streams. + # scenarios are immutable, so we create a new one. + all_empty_streams = (existing_scenario.empty_streams or []) + ( + scenario.empty_streams or [] + ) + merged_scenario = existing_scenario.model_copy( + update={"empty_streams": list(set(all_empty_streams))} + ) + deduped_scenarios.remove(existing_scenario) + deduped_scenarios.append(merged_scenario) + break + else: + # If a scenario does not exist with the config, add the new scenario to the list. + deduped_scenarios.append(scenario) + return deduped_scenarios @classmethod def get_scenarios( @@ -83,9 +125,8 @@ def get_scenarios( This has to be a separate function because pytest does not allow parametrization of fixtures with arguments from the test class itself. """ - categories = ["connection", "spec"] try: - acceptance_test_config_path = cls.acceptance_test_config_path + all_tests_config = cls.acceptance_test_config except FileNotFoundError as e: # Destinations sometimes do not have an acceptance tests file. warnings.warn( @@ -95,15 +136,9 @@ def get_scenarios( ) return [] - all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text()) - if "acceptance_tests" not in all_tests_config: - raise ValueError( - f"Acceptance tests config not found in {cls.acceptance_test_config_path}." - f" Found only: {str(all_tests_config)}." - ) - test_scenarios: list[ConnectorTestScenario] = [] - for category in categories: + # we look in the basic_read section to find any empty streams + for category in ["spec", "connection", "basic_read"]: if ( category not in all_tests_config["acceptance_tests"] or "tests" not in all_tests_config["acceptance_tests"][category] @@ -121,15 +156,11 @@ def get_scenarios( scenario = ConnectorTestScenario.model_validate(test) - if scenario.config_path and scenario.config_path in [ - s.config_path for s in test_scenarios - ]: - # Skip duplicate scenarios based on config_path - continue - test_scenarios.append(scenario) - return test_scenarios + deduped_test_scenarios = cls._dedup_scenarios(test_scenarios) + + return deduped_test_scenarios @pytest.mark.skipif( shutil.which("docker") is None, @@ -332,6 +363,11 @@ def test_docker_image_build_and_read( # If `read_from_streams` is a list, we filter the discovered streams. streams_list = list(set(streams_list) & set(read_from_streams)) + if scenario.empty_streams: + # Filter out streams marked as empty in the scenario. + empty_stream_names = [stream.name for stream in scenario.empty_streams] + streams_list = [s for s in streams_list if s.name not in empty_stream_names] + configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog( streams=[ ConfiguredAirbyteStream( diff --git a/airbyte_cdk/test/standard_tests/source_base.py b/airbyte_cdk/test/standard_tests/source_base.py index f995f4682..faecb03c7 100644 --- a/airbyte_cdk/test/standard_tests/source_base.py +++ b/airbyte_cdk/test/standard_tests/source_base.py @@ -120,6 +120,12 @@ def test_basic_read( if scenario.expected_outcome.expect_exception() and discover_result.errors: # Failed as expected; we're done. return + streams = discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr] + + if scenario.empty_streams: + # Filter out streams marked as empty in the scenario. + empty_stream_names = [stream.name for stream in scenario.empty_streams] + streams = [s for s in streams if s.name not in empty_stream_names] configured_catalog = ConfiguredAirbyteCatalog( streams=[ @@ -128,7 +134,7 @@ def test_basic_read( sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append_dedup, ) - for stream in discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr] + for stream in streams ] ) result = run_test_job( diff --git a/airbyte_cdk/utils/connector_paths.py b/airbyte_cdk/utils/connector_paths.py index fa6f2c2d2..e93d8764c 100644 --- a/airbyte_cdk/utils/connector_paths.py +++ b/airbyte_cdk/utils/connector_paths.py @@ -86,8 +86,6 @@ def _find_in_adjacent_dirs(current_dir: Path) -> Path | None: def resolve_connector_name_and_directory( connector_ref: str | Path | None = None, - *, - connector_directory: Path | None = None, ) -> tuple[str, Path]: """Resolve the connector name and directory. @@ -104,6 +102,7 @@ def resolve_connector_name_and_directory( FileNotFoundError: If the connector directory does not exist or cannot be found. """ connector_name: str | None = None + connector_directory: Path | None = None # Resolve connector_ref to connector_name or connector_directory (if provided) if connector_ref: