@@ -430,6 +430,53 @@
"required": ["name", "format"]
}
},
"delivery_method": {
"title": "Delivery Method",
"default": "use_records_transfer",
"type": "object",
"order": 7,
"display_type": "radio",
"group": "advanced",
"airbyte_hidden": true,
"oneOf": [
{
"title": "Replicate Records",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_records_transfer",
"const": "use_records_transfer",
"enum": ["use_records_transfer"],
"type": "string"
}
},
"description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.",
"required": ["delivery_type"]
},
{
"title": "Copy Raw Files",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_file_transfer",
"const": "use_file_transfer",
"enum": ["use_file_transfer"],
"type": "string"
},
"preserve_directory_structure": {
"title": "Preserve Sub-Directories in File Paths",
"description": "If enabled, sends subdirectory folder structure along with source file names to the destination. Otherwise, files will be synced by their names only. This option is ignored when file-based replication is not enabled.",
"default": true,
"type": "boolean"
}
},
"description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.",
"required": ["delivery_type"]
}
]
},
"credentials": {
"title": "Authentication",
"description": "Credentials for connecting to the Google Cloud Storage API",
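Read together, the new delivery_method block adds a oneOf choice between the two modes. A sketch of the two config fragments a user could submit against this spec; the bucket name is a placeholder and other required fields are omitted for brevity:

# Sketch only: minimal config fragments exercising the new oneOf.
# "my-bucket" is hypothetical; other required spec fields are omitted.
records_config = {
    "bucket": "my-bucket",
    "delivery_method": {"delivery_type": "use_records_transfer"},
}

raw_files_config = {
    "bucket": "my-bucket",
    "delivery_method": {
        "delivery_type": "use_file_transfer",
        "preserve_directory_structure": True,
    },
}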
5 changes: 4 additions & 1 deletion airbyte-integrations/connectors/source-gcs/metadata.yaml
@@ -11,14 +11,17 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
dockerImageTag: 0.8.31
dockerImageTag: 0.9.0-rc.1
dockerRepository: airbyte/source-gcs
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
githubIssueLabel: source-gcs
icon: gcs.svg
license: ELv2
maxSecondsBetweenMessages: 5400
name: Google Cloud Storage (GCS)
releases:
rolloutConfiguration:
enableProgressiveRollout: true
remoteRegistries:
pypi:
enabled: true
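The version is shipped as a release candidate: with enableProgressiveRollout set, the 0.9.0-rc.1 image is presumably exposed to a small share of existing connections first and promoted only if sync health holds up, which is Airbyte's standard progressive-rollout mechanism; the exact traffic split is not part of this diff.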
1,469 changes: 1,045 additions & 424 deletions airbyte-integrations/connectors/source-gcs/poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-gcs/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.8.31"
version = "0.9.0-rc.1"
name = "source-gcs"
description = "Source implementation for Gcs."
authors = [ "Airbyte <[email protected]>",]
@@ -17,10 +17,10 @@ include = "source_gcs"

[tool.poetry.dependencies]
python = "^3.10,<3.12"
pytz = "==2024.1"
pytz = "==2024.2"
google-cloud-storage = "==2.12.0"
smart-open = {extras = ["gcs"], version = "==5.1.0"}
airbyte-cdk = {extras = ["file-based"], version = "^5"}
airbyte-cdk = {extras = ["file-based"], version = "^7"}

[tool.poetry.scripts]
source-gcs = "source_gcs.run:run"
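Presumably the jump from airbyte-cdk ^5 to ^7 is what brings in UploadableRemoteFile and the file-transfer plumbing the rest of this diff builds on; the pytz pin moves forward as routine maintenance.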
13 changes: 12 additions & 1 deletion airbyte-integrations/connectors/source-gcs/source_gcs/config.py
@@ -7,7 +7,7 @@

from pydantic.v1 import AnyUrl, BaseModel, Field

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec, DeliverRawFiles, DeliverRecords
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig


@@ -71,6 +71,17 @@ class Config(AbstractFileBasedSpec, BaseModel):

bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=2)

delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
title="Delivery Method",
discriminator="delivery_type",
type="object",
order=3,
display_type="radio",
group="advanced",
default="use_records_transfer",
airbyte_hidden=True,
)
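The field relies on pydantic v1 discriminated unions: the value of delivery_type decides which model the payload is parsed into. A self-contained sketch of that mechanism, using simplified stand-ins for the CDK's DeliverRecords and DeliverRawFiles rather than the real classes:

# Minimal sketch of discriminated-union dispatch; these models are
# illustrative stand-ins, not the CDK's actual classes.
from typing import Literal, Union

from pydantic.v1 import BaseModel, Field


class DeliverRecords(BaseModel):
    delivery_type: Literal["use_records_transfer"] = "use_records_transfer"


class DeliverRawFiles(BaseModel):
    delivery_type: Literal["use_file_transfer"] = "use_file_transfer"
    preserve_directory_structure: bool = True


class Spec(BaseModel):
    delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
        default=DeliverRecords(), discriminator="delivery_type"
    )


spec = Spec.parse_obj({"delivery_method": {"delivery_type": "use_file_transfer"}})
assert isinstance(spec.delivery_method, DeliverRawFiles)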

@classmethod
def documentation_url(cls) -> AnyUrl:
"""
airbyte-integrations/connectors/source-gcs/source_gcs/cursor.py
@@ -7,16 +7,16 @@
from datetime import datetime

from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from source_gcs.helpers import GCSRemoteFile
from source_gcs.helpers import GCSUploadableRemoteFile


class Cursor(DefaultFileBasedCursor):
@staticmethod
def get_file_uri(file: GCSRemoteFile) -> str:
def get_file_uri(file: GCSUploadableRemoteFile) -> str:
file_uri = file.displayed_uri if file.displayed_uri else file.uri
return file_uri.split("?")[0]

def add_file(self, file: GCSRemoteFile) -> None:
def add_file(self, file: GCSUploadableRemoteFile) -> None:
uri = self.get_file_uri(file)
self._file_to_datetime_history[uri] = file.last_modified.strftime(self.DATE_TIME_FORMAT)
if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
@@ -29,7 +29,7 @@ def add_file(self, file: GCSRemoteFile) -> None:
"The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
)

def _should_sync_file(self, file: GCSRemoteFile, logger: logging.Logger) -> bool:
def _should_sync_file(self, file: GCSUploadableRemoteFile, logger: logging.Logger) -> bool:
uri = self.get_file_uri(file)
if uri in self._file_to_datetime_history:
# If the file's uri is in the history, we should sync the file if it has been modified since it was synced
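The split("?")[0] in get_file_uri above is what keeps cursor state stable across syncs: v4 signed URLs embed an expiry and signature in the query string, so the raw uri changes on every listing. A tiny illustration with a made-up URL:

# Hypothetical v4 signed URL; the query parameters vary between listings,
# so the cursor keys its history on the path portion only.
signed = "https://storage.googleapis.com/my-bucket/data/a.csv?X-Goog-Expires=604800&X-Goog-Signature=abc123"
assert signed.split("?")[0] == "https://storage.googleapis.com/my-bucket/data/a.csv"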
32 changes: 30 additions & 2 deletions airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py
@@ -4,11 +4,15 @@


import json
import urllib.parse
from datetime import timedelta
from typing import Any, Optional

import pytz
from google.cloud import storage
from google.oauth2 import credentials, service_account

from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.remote_file import UploadableRemoteFile


def get_gcs_client(config):
@@ -44,10 +48,34 @@ def get_stream_name(blob):
return stream_name


class GCSRemoteFile(RemoteFile):
class GCSUploadableRemoteFile(UploadableRemoteFile):
"""
Extends UploadableRemoteFile with a displayed_uri attribute.
displayed_uri is used by the Cursor to identify files whose uri attribute holds a temporary local path.
"""

blob: Any
displayed_uri: Optional[str] = None

def __init__(self, blob: Any, displayed_uri: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.blob = blob
self.displayed_uri = displayed_uri
self.id = self.blob.id
self.created_at = self.blob.time_created.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
self.updated_at = self.blob.updated.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

@property
def size(self) -> int:
return self.blob.size

def download_to_local_directory(self, local_file_path: str) -> None:
self.blob.download_to_filename(local_file_path)

@property
def source_file_relative_path(self) -> str:
return urllib.parse.unquote(self.blob.path)

@property
def file_uri_for_logging(self) -> str:
return urllib.parse.unquote(self.blob.path)
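A hedged sketch of how the new class is constructed during listing; it mirrors, but is not identical to, the stream reader change further down, and it assumes default application credentials and a readable bucket:

# Sketch under stated assumptions, not the connector's exact listing code.
from google.cloud import storage

from source_gcs.helpers import GCSUploadableRemoteFile

client = storage.Client()  # assumes ambient GCP credentials
for blob in client.list_blobs("my-bucket", prefix="data/"):
    remote_file = GCSUploadableRemoteFile(
        uri=blob.public_url,
        blob=blob,
        last_modified=blob.updated.replace(tzinfo=None),
        mime_type=".".join(blob.name.split(".")[1:]),
    )
    # Delegates to Blob.download_to_filename under the hood.
    remote_file.download_to_local_directory("/tmp/" + blob.name.rsplit("/", 1)[-1])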
12 changes: 11 additions & 1 deletion airbyte-integrations/connectors/source-gcs/source_gcs/source.py
@@ -7,7 +7,12 @@

from airbyte_cdk import emit_configuration_as_airbyte_control_message
from airbyte_cdk.models import AdvancedAuth, AuthFlowType, ConnectorSpecification, OAuthConfigSpecification
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
preserve_directory_structure,
use_file_transfer,
)
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
@@ -67,7 +72,10 @@ def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
)

def _make_default_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
self,
stream_config: FileBasedStreamConfig,
cursor: Optional[AbstractFileBasedCursor],
parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
return GCSStream(
config=stream_config,
@@ -79,4 +87,6 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer(parsed_config),
preserve_directory_structure=preserve_directory_structure(parsed_config),
)
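The two new keyword arguments are computed from the parsed spec by small CDK helpers. Roughly, and this is an approximation rather than the CDK's actual code, they reduce to:

# Approximation of airbyte_cdk's validate_config_transfer_modes helpers;
# the real implementations live in the CDK, not in this connector.
def _use_file_transfer(parsed_config) -> bool:
    delivery = getattr(parsed_config, "delivery_method", None)
    return getattr(delivery, "delivery_type", None) == "use_file_transfer"


def _preserve_directory_structure(parsed_config) -> bool:
    # Only meaningful in raw-file mode; defaults to keeping subdirectories.
    delivery = getattr(parsed_config, "delivery_method", None)
    return bool(getattr(delivery, "preserve_directory_structure", True))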
airbyte-integrations/connectors/source-gcs/source_gcs/stream.py
@@ -3,11 +3,11 @@
from typing import Any

from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream
from source_gcs.helpers import GCSRemoteFile
from source_gcs.helpers import GCSUploadableRemoteFile


class GCSStream(DefaultFileBasedStream):
def transform_record(self, record: dict[str, Any], file: GCSRemoteFile, last_updated: str) -> dict[str, Any]:
def transform_record(self, record: dict[str, Any], file: GCSUploadableRemoteFile, last_updated: str) -> dict[str, Any]:
record[self.ab_last_mod_col] = last_updated
record[self.ab_file_name_col] = file.displayed_uri if file.displayed_uri else file.uri
return record
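With made-up values, the effect of transform_record is to stamp each record with the CDK's two bookkeeping columns; for files unpacked from a zip, displayed_uri attributes the record to the remote archive rather than the temporary local member:

# Illustration only: `stream` and `remote_file` are assumed to exist, and
# the column names below are the CDK's standard _ab_* fields.
record = stream.transform_record({"id": 1}, file=remote_file, last_updated="2024-01-01T00:00:00.000000Z")
# record == {
#     "id": 1,
#     "_ab_source_file_last_modified": "2024-01-01T00:00:00.000000Z",
#     "_ab_source_file_url": "gs://my-bucket/data/a.csv",  # hypothetical
# }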
airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py
@@ -5,6 +5,7 @@
import json
import logging
import tempfile
import urllib.parse
from datetime import datetime, timedelta
from io import IOBase, StringIO
from typing import Iterable, List, Optional
@@ -14,10 +15,11 @@
from google.cloud import storage
from google.oauth2 import credentials, service_account

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import DeliverRawFiles
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from source_gcs.config import Config
from source_gcs.helpers import GCSRemoteFile
from source_gcs.helpers import GCSUploadableRemoteFile
from source_gcs.zip_helper import ZipHelper


@@ -77,7 +79,7 @@ def _get_credentials(self):
def gcs_client(self) -> storage.Client:
return self._initialize_gcs_client()

def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[GCSRemoteFile]:
def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[GCSUploadableRemoteFile]:
"""
Retrieve all files matching the specified glob patterns in GCS.
"""
@@ -103,10 +105,11 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo
else:
uri = blob.generate_signed_url(expiration=timedelta(days=7), version="v4")

file_extension = ".".join(blob.name.split(".")[1:])
remote_file = GCSRemoteFile(uri=uri, last_modified=last_modified, mime_type=file_extension)
remote_file = GCSUploadableRemoteFile(
uri=uri, blob=blob, last_modified=last_modified, mime_type=".".join(blob.name.split(".")[1:])
)

if file_extension == "zip":
if remote_file.mime_type == "zip" and self.config.delivery_method.delivery_type != DeliverRawFiles.delivery_type:
yield from ZipHelper(blob, remote_file, self.tmp_dir).get_gcs_remote_files()
else:
yield remote_file
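Note the interaction with the new delivery method: archives are only unpacked when records are being parsed, while in raw-file mode the .zip object itself is shipped untouched. A condensed restatement of the branch as a hypothetical helper:

# Hypothetical helper restating the branch above; "use_file_transfer" is
# the literal behind DeliverRawFiles.delivery_type.
def _files_for(blob, remote_file, config, tmp_dir):
    raw_mode = config.delivery_method.delivery_type == "use_file_transfer"
    if remote_file.mime_type == "zip" and not raw_mode:
        yield from ZipHelper(blob, remote_file, tmp_dir).get_gcs_remote_files()
    else:
        yield remote_file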
@@ -122,7 +125,7 @@ def _handle_file_listing_error(self, exc: Exception, prefix: str, logger: loggin
prefix=prefix,
) from exc

def open_file(self, file: GCSRemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
def open_file(self, file: GCSUploadableRemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
"""
Open and yield a remote file from GCS for reading.
"""
airbyte-integrations/connectors/source-gcs/source_gcs/zip_helper.py
@@ -8,7 +8,7 @@

from google.cloud.storage.blob import Blob

from source_gcs.helpers import GCSRemoteFile
from source_gcs.helpers import GCSUploadableRemoteFile


logger = logging.getLogger("airbyte")
@@ -17,7 +17,7 @@
class ZipHelper:
BUFFER_SIZE_DEFAULT = 1024 * 1024

def __init__(self, blob: Blob, zip_file: GCSRemoteFile, tmp_dir: tempfile.TemporaryDirectory):
def __init__(self, blob: Blob, zip_file: GCSUploadableRemoteFile, tmp_dir: tempfile.TemporaryDirectory):
self._blob = blob
self._size = blob.size
self._tmp_dir = tmp_dir
@@ -42,16 +42,17 @@ def _extract_files_to_tmp_directory(self, object_bytes: bytes) -> None:
with zipfile.ZipFile(bytes_io, "r") as zf:
zf.extractall(self._tmp_dir.name)

def get_gcs_remote_files(self) -> Iterable[GCSRemoteFile]:
def get_gcs_remote_files(self) -> Iterable[GCSUploadableRemoteFile]:
self._extract_files_to_tmp_directory(self._chunk_download())

for unzipped_file in os.listdir(self._tmp_dir.name):
logger.info(f"Picking up file {unzipped_file.split('/')[-1]} from zip archive {self._blob.public_url}.")
file_extension = unzipped_file.split(".")[-1]

yield GCSRemoteFile(
yield GCSUploadableRemoteFile(
uri=os.path.join(self._tmp_dir.name, unzipped_file),  # uri of the temporary local file
last_modified=self._zip_file.last_modified,
mime_type=file_extension,
displayed_uri=self._zip_file.uri,  # uri of the remote .zip file
blob=self._blob,
)
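A short usage sketch, assuming blob is a .zip object and zip_remote_file its wrapped remote file: members inherit the archive's last_modified, their uri points at the extracted temporary file, and displayed_uri keeps the remote .zip URI for cursor bookkeeping.

# Sketch; `blob` and `zip_remote_file` are assumed to be in scope.
import tempfile

from source_gcs.zip_helper import ZipHelper

tmp_dir = tempfile.TemporaryDirectory()
for member in ZipHelper(blob, zip_remote_file, tmp_dir).get_gcs_remote_files():
    print(member.uri, "->", member.displayed_uri)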
18 changes: 10 additions & 8 deletions airbyte-integrations/connectors/source-gcs/unit_tests/conftest.py
@@ -3,11 +3,11 @@
import logging
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import Mock
from unittest.mock import MagicMock, Mock

import pytest
from source_gcs import Cursor, SourceGCSStreamReader
from source_gcs.helpers import GCSRemoteFile
from source_gcs.helpers import GCSUploadableRemoteFile

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig

@@ -23,22 +23,24 @@ def _file_uri() -> str:

@pytest.fixture
def remote_file():
return GCSRemoteFile(uri=_file_uri(), last_modified=datetime.now(), mime_type="csv")
blob = MagicMock(size=100, id="test/file/id", time_created=datetime.now() - timedelta(hours=1), updated=datetime.now())
blob.name.return_value = "file.csv"
return GCSUploadableRemoteFile(uri=_file_uri(), last_modified=datetime.now(), mime_type="csv", blob=blob)


@pytest.fixture
def remote_file_older():
return GCSRemoteFile(uri=_file_uri(), last_modified=datetime.now() - timedelta(days=1))
return GCSUploadableRemoteFile(uri=_file_uri(), last_modified=datetime.now() - timedelta(days=1), blob=MagicMock())


@pytest.fixture
def remote_file_future():
return GCSRemoteFile(uri=_file_uri(), last_modified=datetime.now() + timedelta(days=1))
return GCSUploadableRemoteFile(uri=_file_uri(), last_modified=datetime.now() + timedelta(days=1), blob=MagicMock())


@pytest.fixture
def remote_file_b():
return GCSRemoteFile(uri=_file_uri().replace("a.csv", "b.csv"), last_modified=datetime.now())
return GCSUploadableRemoteFile(uri=_file_uri().replace("a.csv", "b.csv"), last_modified=datetime.now(), blob=MagicMock())


@pytest.fixture
@@ -60,10 +62,10 @@ def mocked_reader():

@pytest.fixture
def zip_file():
return GCSRemoteFile(
return GCSUploadableRemoteFile(
uri=str(Path(__file__).parent / "resource/files/test.csv.zip"),
blob=MagicMock(),
last_modified=datetime.today(),
mime_type=".zip",
displayed_uri="resource/files/test.csv.zip",
)
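The fixture changes follow mechanically from the rename: GCSUploadableRemoteFile requires a blob, so each fixture passes a MagicMock stand-in, and the remote_file fixture stubs the size, id, time_created and updated attributes that the new __init__ and size property read.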
