Skip to content
64 changes: 64 additions & 0 deletions airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from abc import ABC, abstractmethod

from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class AbstractFileBasedFileTransferReader(ABC):
    """
    Interface describing one remote file for the file transfer flow.

    An instance wraps a single ``RemoteFile``. Concrete subclasses expose the
    file's metadata through the abstract properties below and implement the
    copy of its content to local storage.
    """

    # Largest accepted file size, in bytes (1.5 GB in decimal gigabytes).
    FILE_SIZE_LIMIT = 1_500_000_000

    def __init__(self, remote_file: RemoteFile) -> None:
        """
        Keep a reference to the remote file this reader describes.
        """
        self.remote_file = remote_file

    @property
    @abstractmethod
    def file_id(self) -> str:
        """
        Identifier that uniquely designates the file being transferred.
        """

    @property
    @abstractmethod
    def file_created_at(self) -> str:
        """
        Creation date time of the file.
        """

    @property
    @abstractmethod
    def file_updated_at(self) -> str:
        """
        Date time of the most recent update of the file.
        """

    @property
    @abstractmethod
    def file_size(self) -> int:
        """
        Size of the file, expressed in bytes.
        """

    @property
    @abstractmethod
    def source_file_relative_path(self) -> str:
        """
        Relative path of the file in the source.
        """

    @property
    def file_uri_for_logging(self) -> str:
        """
        URI to show in log messages about this file.

        Defaults to the wrapped remote file's ``uri``.
        """
        return self.remote_file.uri

    @abstractmethod
    def download_to_local_directory(self, local_file_path: str) -> None:
        """
        Copy the file from the remote source to ``local_file_path`` on local storage.
        """
81 changes: 68 additions & 13 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
#

import logging
import time
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from io import IOBase
from os import makedirs, path
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple, Type

from airbyte_protocol_dataclasses.models import FailureType
from wcmatch.glob import GLOBSTAR, globmatch

from airbyte_cdk.models import AirbyteRecordMessageFileReference
Expand All @@ -19,6 +21,10 @@
preserve_directory_structure,
use_file_transfer,
)
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import (
AbstractFileBasedFileTransferReader,
)
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
from airbyte_cdk.sources.file_based.remote_file import RemoteFile

Expand All @@ -37,6 +43,13 @@ class AbstractFileBasedStreamReader(ABC):

def __init__(self) -> None:
    """
    Start with no configuration and verify that file transfer can be supported.

    Raises:
        NotImplementedError: when the reader exposes no ``file_transfer_reader_class``
            and also inherits ``upload`` unchanged from ``AbstractFileBasedStreamReader``.
    """
    self._config = None

    # A reader class is enough on its own: the inherited upload can use it.
    if self.file_transfer_reader_class is not None:
        return

    # Without a reader class, the subclass has to bring its own upload.
    overrides_upload = type(self).upload is not AbstractFileBasedStreamReader.upload
    if not overrides_upload:
        raise NotImplementedError(
            "One of file_transfer_reader_class or upload method must be defined to support file transfer."
        )

@property
def config(self) -> Optional[AbstractFileBasedSpec]:
Expand Down Expand Up @@ -113,16 +126,6 @@ def filter_files_by_globs_and_start_date(
seen.add(file.uri)
yield file

@abstractmethod
def file_size(self, file: RemoteFile) -> int:
    """Return the size of the given remote file.

    Connectors that support writing to files need a real implementation.
    A connector without that capability can implement this as a plain
    `return 0`.
    """

@staticmethod
def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
# Use the GLOBSTAR flag to enable recursive ** matching
Expand Down Expand Up @@ -153,7 +156,10 @@ def include_identities_stream(self) -> bool:
return include_identities_stream(self.config)
return False

@abstractmethod
@property
def file_transfer_reader_class(self) -> Optional[Type[AbstractFileBasedFileTransferReader]]:
    """
    Reader class used by the default ``upload`` implementation.

    The default ``upload`` instantiates this class with the remote file and
    raises ``NotImplementedError`` when it is None.

    Returns:
        None here; a subclass that wants the default ``upload`` supplies an
        ``AbstractFileBasedFileTransferReader`` subclass instead.
    """
    return None

def upload(
self, file: RemoteFile, local_directory: str, logger: logging.Logger
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
Expand All @@ -173,7 +179,56 @@ def upload(
- file_size_bytes (int): The size of the referenced file in bytes.
- source_file_relative_path (str): The relative path to the referenced file in source.
"""
...
if self.file_transfer_reader_class is None:
raise NotImplementedError(
"file_transfer_reader_class must be defined to support default file transfer upload method."
)

file_transfer = self.file_transfer_reader_class(file)
file_size = file_transfer.file_size

if file_size > file_transfer.FILE_SIZE_LIMIT:
message = f"File size exceeds the {file_transfer.FILE_SIZE_LIMIT / 1e9} GB limit."
raise FileSizeLimitError(
message=message, internal_message=message, failure_type=FailureType.config_error
)

file_paths = self._get_file_transfer_paths(
source_file_relative_path=file_transfer.source_file_relative_path,
staging_directory=local_directory,
)
local_file_path = file_paths[self.LOCAL_FILE_PATH]
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
file_name = file_paths[self.FILE_NAME]

logger.info(
f"Starting to download the file {file_transfer.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
)
start_download_time = time.time()

file_transfer.download_to_local_directory(local_file_path)

write_duration = time.time() - start_download_time
logger.info(
f"Finished downloading the file {file_transfer.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
)

file_record_data = FileRecordData(
folder=file_paths[self.FILE_FOLDER],
file_name=file_name,
bytes=file_size,
id=file_transfer.file_id,
mime_type=file.mime_type,
created_at=file_transfer.file_created_at,
updated_at=file_transfer.file_updated_at,
source_uri=file.uri,
)
file_reference = AirbyteRecordMessageFileReference(
staging_file_url=local_file_path,
source_file_relative_path=file_relative_path,
file_size_bytes=file_size,
)
return file_record_data, file_reference

def _get_file_transfer_paths(
self, source_file_relative_path: str, staging_directory: str
Expand Down
90 changes: 90 additions & 0 deletions unit_tests/sources/file_based/test_file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
from io import IOBase
from os import path
from typing import Any, ClassVar, Dict, Iterable, List, Mapping, Optional, Set
from unittest.mock import MagicMock

import pytest
from pydantic.v1 import AnyUrl

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import (
AbstractFileBasedFileTransferReader,
)
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.utils.files_directory import get_files_directory
Expand Down Expand Up @@ -64,6 +69,69 @@
}


class TestFileBasedFileTransferReader(AbstractFileBasedFileTransferReader):
    """
    File transfer reader test double.

    Metadata is fixed, the size comes from the wrapped remote file, and the
    download is a no-op.
    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    @property
    def file_id(self) -> str:
        return "test_file_id"

    @property
    def file_created_at(self) -> str:
        return "2025-05-05"

    @property
    def file_updated_at(self) -> str:
        return "2025-05-06"

    @property
    def file_size(self) -> int:
        # Driven by the remote file so a test can pick the size it needs.
        return self.remote_file.size

    def download_to_local_directory(self, local_file_path: str) -> None:
        # Intentionally does nothing: no file is written.
        pass

    @property
    def source_file_relative_path(self) -> str:
        return "source/path"

    @property
    def file_uri_for_logging(self) -> str:
        return "logging/url"


class TestStreamReaderWithFileTransferClass(AbstractFileBasedStreamReader):
    """
    Stream reader test double that declares a file transfer reader class.

    It defines no ``upload`` of its own, so the inherited one is used. The
    remaining members are stubs.
    """

    # Tell Pytest this is not a Pytest class, despite its name
    __test__: ClassVar[bool] = False

    file_transfer_reader_class = TestFileBasedFileTransferReader

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        return self._config

    @config.setter
    def config(self, value: AbstractFileBasedSpec) -> None:
        self._config = value

    @property
    def file_permissions_schema(self) -> Dict[str, Any]:
        schema = {"type": "object", "properties": {}}
        return schema

    @property
    def identities_schema(self) -> Dict[str, Any]:
        schema = {"type": "object", "properties": {}}
        return schema

    def get_matching_files(self, globs: List[str]) -> Iterable[RemoteFile]:
        # Stub: yields nothing useful.
        return None

    def open_file(self, file: RemoteFile) -> IOBase:
        # Stub: no file is opened.
        return None

    def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        permissions: Dict[str, Any] = {}
        return permissions

    def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
        groups: List[Dict[str, Any]] = [{}]
        return groups


class TestStreamReader(AbstractFileBasedStreamReader):
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name

Expand Down Expand Up @@ -458,3 +526,25 @@ def test_preserve_sub_directories_scenarios(
assert file_paths[AbstractFileBasedStreamReader.LOCAL_FILE_PATH] == expected_local_file_path
assert file_paths[AbstractFileBasedStreamReader.FILE_NAME] == path.basename(source_file_path)
assert file_paths[AbstractFileBasedStreamReader.FILE_FOLDER] == path.dirname(source_file_path)


def test_upload_with_file_transfer_reader():
    """
    Exercise the inherited ``upload`` backed by a file transfer reader class.

    A file within the size limit must produce a record and a file reference
    populated from the transfer reader and the remote file. A file above the
    limit must raise ``FileSizeLimitError``.
    """
    stream_reader = TestStreamReaderWithFileTransferClass()
    logger = logging.getLogger("airbyte")

    remote_file = MagicMock()
    remote_file.size = 200
    remote_file.uri = "test_url"
    remote_file.mime_type = "test_mime_type"

    file_record_data, file_reference = stream_reader.upload(remote_file, "test_directory", logger)

    # Check the values upload copies over, not just that something was returned.
    assert file_record_data.bytes == 200
    assert file_record_data.id == "test_file_id"
    assert file_record_data.source_uri == "test_url"
    assert file_record_data.mime_type == "test_mime_type"
    assert file_reference.file_size_bytes == 200

    oversized_file = MagicMock()
    oversized_file.size = 2_500_000_000
    oversized_file.uri = "test_url"
    oversized_file.mime_type = "test_mime_type"

    with pytest.raises(FileSizeLimitError):
        stream_reader.upload(oversized_file, "test_directory", logger)
Loading