Skip to content
64 changes: 64 additions & 0 deletions airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from abc import ABC, abstractmethod

from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class AbstractFileBasedFileTransferReader(ABC):
FILE_SIZE_LIMIT = 1_500_000_000

def __init__(self, remote_file: RemoteFile) -> None:
self.remote_file = remote_file

@property
@abstractmethod
def file_id(self) -> str:
"""
Unique identifier for the file being transferred.
"""
...

@property
@abstractmethod
def file_created_at(self) -> str:
"""
Date time when the file was created.
"""
...

@property
@abstractmethod
def file_updated_at(self) -> str:
"""
Date time when the file was last updated.
"""
...

@property
@abstractmethod
def file_size(self) -> int:
"""
Returns the file size in bytes.
"""
...

@abstractmethod
def download_to_local_directory(self, local_file_path: str) -> None:
"""
Download the file from remote source to local storage.
"""
...

@property
@abstractmethod
def source_file_relative_path(self) -> str:
"""
Returns the relative path of the source file.
"""
...

@property
def file_uri_for_logging(self):
"""
Returns the URI for the file being logged.
"""
return self.remote_file.uri
68 changes: 66 additions & 2 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import logging
import time
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
Expand All @@ -19,8 +20,13 @@
preserve_directory_structure,
use_file_transfer,
)
from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import (
AbstractFileBasedFileTransferReader,
)
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_protocol_dataclasses.models import FailureType
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError


class FileReadMode(Enum):
Expand All @@ -38,6 +44,14 @@ class AbstractFileBasedStreamReader(ABC):
def __init__(self) -> None:
self._config = None

if (
self.file_transfer_reader_class is None
and self.upload.__func__ == AbstractFileBasedStreamReader.upload
):
raise NotImplementedError(
"One of file_transfer_reader_class or upload method must be defined to support file transfer."
)

@property
def config(self) -> Optional[AbstractFileBasedSpec]:
return self._config
Expand Down Expand Up @@ -153,7 +167,10 @@ def include_identities_stream(self) -> bool:
return include_identities_stream(self.config)
return False

@abstractmethod
@property
def file_transfer_reader_class(self) -> AbstractFileBasedFileTransferReader | None:
return None

def upload(
self, file: RemoteFile, local_directory: str, logger: logging.Logger
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
Expand All @@ -173,7 +190,54 @@ def upload(
- file_size_bytes (int): The size of the referenced file in bytes.
- source_file_relative_path (str): The relative path to the referenced file in source.
"""
...
# if self.file_transfer_reader_class is None and self.upload.__func__ == AbstractFileBasedStreamReader.upload:
# raise NotImplementedError("One of file_transfer_reader_class or upload method must be defined to support file transfer.")

file_transfer = self.file_transfer_reader_class(file)
file_size = file_transfer.file_size

if file_size > file_transfer.FILE_SIZE_LIMIT:
message = "File size exceeds the 1 GB limit."
raise FileSizeLimitError(
message=message, internal_message=message, failure_type=FailureType.config_error
)

file_paths = self._get_file_transfer_paths(
source_file_relative_path=file_transfer.source_file_relative_path,
staging_directory=local_directory,
)
local_file_path = file_paths[self.LOCAL_FILE_PATH]
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
file_name = file_paths[self.FILE_NAME]

logger.info(
f"Starting to download the file {file_transfer.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
)
start_download_time = time.time()

file_transfer.download_to_local_directory(local_file_path)

write_duration = time.time() - start_download_time
logger.info(
f"Finished downloading the file {file_transfer.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
)

file_record_data = FileRecordData(
folder=file_paths[self.FILE_FOLDER],
file_name=file_name,
bytes=file_size,
id=file_transfer.file_id,
mime_type=file.mime_type,
created_at=file_transfer.file_created_at,
updated_at=file_transfer.file_updated_at,
source_uri=file.uri,
)
file_reference = AirbyteRecordMessageFileReference(
staging_file_url=local_file_path,
source_file_relative_path=file_relative_path,
file_size_bytes=file_size,
)
return file_record_data, file_reference

def _get_file_transfer_paths(
self, source_file_relative_path: str, staging_directory: str
Expand Down
Loading