@@ -430,6 +430,74 @@
"required": ["name", "format"]
}
},
"delivery_method": {
"title": "Delivery Method",
"default": "use_records_transfer",
"type": "object",
"order": 7,
"display_type": "radio",
"group": "advanced",
"airbyte_hidden": true,
"oneOf": [
{
"title": "Replicate Records",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_records_transfer",
"const": "use_records_transfer",
"enum": ["use_records_transfer"],
"type": "string"
}
},
"description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.",
"required": ["delivery_type"]
},
{
"title": "Copy Raw Files",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_file_transfer",
"const": "use_file_transfer",
"enum": ["use_file_transfer"],
"type": "string"
},
"preserve_directory_structure": {
"title": "Preserve Sub-Directories in File Paths",
"description": "If enabled, sends subdirectory folder structure along with source file names to the destination. Otherwise, files will be synced by their names only. This option is ignored when file-based replication is not enabled.",
"default": true,
"type": "boolean"
}
},
"description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.",
"required": ["delivery_type"]
},
{
"title": "Replicate Permissions ACL",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_permissions_transfer",
"const": "use_permissions_transfer",
"enum": ["use_permissions_transfer"],
"type": "string"
},
"include_identities_stream": {
"title": "Include Identity Stream",
"description": "This data can be used in downstream systems to recreate permission restrictions mirroring the original source",
"default": true,
"type": "boolean"
}
},
"description": "Sends one identity stream and one for more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source.",
"required": ["delivery_type"]
}
]
},
"credentials": {
"title": "Authentication",
"description": "Credentials for connecting to the Google Cloud Storage API",
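For orientation, the three delivery_method variants above are selected by their mutually exclusive delivery_type constants. A minimal sketch of a connector config choosing the raw-file mode — only the delivery_method block follows the spec shown here; the bucket, credentials, and streams keys follow the rest of the GCS spec, and every literal value is hypothetical:

# Hypothetical source-gcs config selecting the "Copy Raw Files" mode.
raw_file_config = {
    "bucket": "example-bucket",  # hypothetical bucket name
    "credentials": {
        "auth_type": "Service",  # assumed service-account variant of the credentials oneOf
        "service_account": "<service account JSON>",  # placeholder, not a real key
    },
    "streams": [
        # The spec requires "name" and "format" per stream.
        {"name": "documents", "format": {"filetype": "csv"}},
    ],
    "delivery_method": {
        "delivery_type": "use_file_transfer",
        "preserve_directory_structure": True,  # spec default shown above
    },
}
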
5 changes: 4 additions & 1 deletion airbyte-integrations/connectors/source-gcs/metadata.yaml
@@ -11,14 +11,17 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
dockerImageTag: 0.8.30
dockerImageTag: 0.9.0-rc.1
dockerRepository: airbyte/source-gcs
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
githubIssueLabel: source-gcs
icon: gcs.svg
license: ELv2
maxSecondsBetweenMessages: 5400
name: Google Cloud Storage (GCS)
releases:
rolloutConfiguration:
enableProgressiveRollout: true
remoteRegistries:
pypi:
enabled: true
1,850 changes: 1,188 additions & 662 deletions airbyte-integrations/connectors/source-gcs/poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-gcs/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.8.30"
version = "0.9.0-rc.1"
name = "source-gcs"
description = "Source implementation for Gcs."
authors = [ "Airbyte <[email protected]>",]
@@ -17,10 +17,10 @@ include = "source_gcs"

[tool.poetry.dependencies]
python = "^3.10,<3.12"
pytz = "==2024.1"
pytz = "==2024.2"
google-cloud-storage = "==2.12.0"
smart-open = {extras = ["gcs"], version = "==5.1.0"}
airbyte-cdk = {extras = ["file-based"], version = "^5"}
airbyte-cdk = {extras = ["file-based"], version = "^7"}

[tool.poetry.scripts]
source-gcs = "source_gcs.run:run"
airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py
@@ -4,6 +4,7 @@


import json
from typing import Any

from google.cloud import storage
from google.oauth2 import credentials, service_account
@@ -50,4 +51,5 @@ class GCSRemoteFile(RemoteFile):
displayed_uri is used by the Cursor to identify files whose uri attribute contains a temporary local path.
"""

blob: Any
displayed_uri: str = None
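GCSRemoteFile is a pydantic model, so the new blob field is typed Any to let it carry the non-pydantic google.cloud.storage.Blob handle. A sketch of how a listing step could attach it — the helper name and the last_modified choice are illustrative, not part of this PR:

from datetime import timezone

from google.cloud import storage

from source_gcs.helpers import GCSRemoteFile


def to_remote_file(blob: storage.Blob, signed_url: str) -> GCSRemoteFile:
    # Mirrors get_matching_files below: the file extension doubles as the mime type.
    extension = ".".join(blob.name.split(".")[1:])
    return GCSRemoteFile(
        uri=signed_url,
        last_modified=blob.updated.astimezone(timezone.utc),
        mime_type=extension,
        blob=blob,  # kept so file_size() and upload() can reuse the handle
    )
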
12 changes: 11 additions & 1 deletion airbyte-integrations/connectors/source-gcs/source_gcs/source.py
@@ -7,7 +7,12 @@

from airbyte_cdk import emit_configuration_as_airbyte_control_message
from airbyte_cdk.models import AdvancedAuth, AuthFlowType, ConnectorSpecification, OAuthConfigSpecification
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
preserve_directory_structure,
use_file_transfer,
)
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
@@ -67,7 +72,10 @@ def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
)

def _make_default_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
self,
stream_config: FileBasedStreamConfig,
cursor: Optional[AbstractFileBasedCursor],
parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
return GCSStream(
config=stream_config,
@@ -79,4 +87,6 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer(parsed_config),
preserve_directory_structure=preserve_directory_structure(parsed_config),
)
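use_file_transfer and preserve_directory_structure come from the CDK's validate_config_transfer_modes module; judging by their names and the spec earlier in this diff, they inspect the parsed config's delivery_method. A rough functional equivalent, offered as an assumption rather than the CDK's actual implementation:

def _uses_file_transfer(parsed_config) -> bool:
    # True when the user picked the "Copy Raw Files" option.
    delivery = getattr(parsed_config, "delivery_method", None)
    return getattr(delivery, "delivery_type", None) == "use_file_transfer"


def _preserves_directory_structure(parsed_config) -> bool:
    delivery = getattr(parsed_config, "delivery_method", None)
    # The spec defaults preserve_directory_structure to true.
    return getattr(delivery, "preserve_directory_structure", True)
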
airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py
@@ -5,17 +5,22 @@
import json
import logging
import tempfile
import time
import urllib.parse
from datetime import datetime, timedelta
from io import IOBase, StringIO
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Tuple

import pytz
import smart_open
from airbyte_protocol_dataclasses.models import FailureType
from google.cloud import storage
from google.oauth2 import credentials, service_account

from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError, FileSizeLimitError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
from source_gcs.config import Config
from source_gcs.helpers import GCSRemoteFile
from source_gcs.zip_helper import ZipHelper
@@ -35,6 +40,8 @@ class SourceGCSStreamReader(AbstractFileBasedStreamReader):
Stream reader for Google Cloud Storage (GCS).
"""

FILE_SIZE_LIMIT = 1_500_000_000

def __init__(self):
super().__init__()
self._gcs_client = None
@@ -50,6 +57,50 @@ def config(self, value: Config):
assert isinstance(value, Config), "Config must be an instance of the expected Config class."
self._config = value

def file_size(self, file: GCSRemoteFile) -> int:
return file.blob.size

def upload(
self, file: GCSRemoteFile, local_directory: str, logger: logging.Logger
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
file_size = self.file_size(file)

if file_size > self.FILE_SIZE_LIMIT:
message = "File size exceeds the 1.5 GB limit."
raise FileSizeLimitError(message=message, internal_message=message, failure_type=FailureType.config_error)

file_paths = self._get_file_transfer_paths(
source_file_relative_path=urllib.parse.unquote(file.blob.path), staging_directory=local_directory
)
local_file_path = file_paths[self.LOCAL_FILE_PATH]
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
file_name = file_paths[self.FILE_NAME]

logger.info(
f"Starting to download the file {file.uri} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
)
start_download_time = time.time()
file.blob.download_to_filename(local_file_path)
write_duration = time.time() - start_download_time
logger.info(f"Finished downloading the file {file.uri} and saved to {local_file_path} in {write_duration:,.2f} seconds.")

file_record_data = FileRecordData(
folder=file_paths[self.FILE_FOLDER],
file_name=file_name,
bytes=file_size,
id=file.blob.id,
mime_type=file.mime_type,
created_at=file.blob.time_created.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
updated_at=file.blob.updated.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
source_uri=file.uri,
)
file_reference = AirbyteRecordMessageFileReference(
staging_file_url=local_file_path,
source_file_relative_path=file_relative_path,
file_size_bytes=file_size,
)
return file_record_data, file_reference

def _initialize_gcs_client(self):
if self.config is None:
raise ValueError("Source config is missing; cannot create the GCS client.")
@@ -104,7 +155,7 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo
uri = blob.generate_signed_url(expiration=timedelta(days=7), version="v4")

file_extension = ".".join(blob.name.split(".")[1:])
remote_file = GCSRemoteFile(uri=uri, last_modified=last_modified, mime_type=file_extension)
remote_file = GCSRemoteFile(uri=uri, last_modified=last_modified, mime_type=file_extension, blob=blob)

if file_extension == "zip":
yield from ZipHelper(blob, remote_file, self.tmp_dir).get_gcs_remote_files()
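Taken together, file_size(), the FILE_SIZE_LIMIT guard, and upload() let a file-transfer sync stage blobs locally instead of parsing them. A sketch of driving that path by hand, assuming a parsed Config and a staging directory (the CDK's real orchestration differs in detail):

import logging

from source_gcs import SourceGCSStreamReader
from source_gcs.config import Config


def stage_matching_files(config: Config, glob: str, staging_dir: str):
    """Yield a (FileRecordData, AirbyteRecordMessageFileReference) pair per match."""
    logger = logging.getLogger("airbyte")
    reader = SourceGCSStreamReader()
    reader.config = config  # the setter asserts this is a Config instance
    for file in reader.get_matching_files(globs=[glob], prefix=None, logger=logger):
        # upload() raises FileSizeLimitError for blobs over FILE_SIZE_LIMIT.
        yield reader.upload(file, staging_dir, logger)
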
airbyte-integrations/connectors/source-gcs/unit_tests/conftest.py
@@ -3,7 +3,7 @@
import logging
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import Mock
from unittest.mock import MagicMock, Mock

import pytest
from source_gcs import Cursor, SourceGCSStreamReader
@@ -23,7 +23,8 @@ def _file_uri() -> str:

@pytest.fixture
def remote_file():
return GCSRemoteFile(uri=_file_uri(), last_modified=datetime.now(), mime_type="csv")
blob = MagicMock(size=100, id="test/file/id", time_created=datetime.now() - timedelta(hours=1), updated=datetime.now())
return GCSRemoteFile(uri=_file_uri(), last_modified=datetime.now(), mime_type="csv", blob=blob)


@pytest.fixture
airbyte-integrations/connectors/source-gcs/unit_tests/test_stream_reader.py
@@ -46,3 +46,14 @@ def test_open_file_without_compression(remote_file, logger):

with pytest.raises(OSError):
reader.open_file(remote_file, FileReadMode.READ, None, logger)


def test_upload(remote_file, logger):
reader = SourceGCSStreamReader()
reader._gcs_client = Mock()
reader._config = Mock()

file_record_data, file_reference = reader.upload(remote_file, "test_local_directory", logger)

assert file_record_data is not None
assert file_reference is not None