Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions airbyte-integrations/connectors/source-slack/components.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import logging
from copy import deepcopy
from dataclasses import dataclass
from datetime import timedelta
from functools import partial
Expand All @@ -16,6 +17,7 @@
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
Expand All @@ -35,7 +37,7 @@
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpClient, HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.types import EmptyString, StreamState
from airbyte_cdk.sources.types import Config, EmptyString, StreamState
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse


Expand Down Expand Up @@ -98,7 +100,6 @@ def extract_records(self, response: requests.Response) -> List[Record]:
class ChannelsRetriever(SimpleRetriever):
def __post_init__(self, parameters: Mapping[str, Any]):
super().__post_init__(parameters)
self.stream_slicer = SinglePartitionRouter(parameters={})
self.record_selector.transformations = []

def should_join_to_channel(self, config: Mapping[str, Any], record: Record) -> bool:
Expand Down Expand Up @@ -156,7 +157,7 @@ def read_records(
return


class ThreadsPartitionRouter(SubstreamPartitionRouter):
class ThreadsStateMigration(StateMigration):
"""
The logic for incrementally syncing threads is not very obvious, so buckle up.
To get all messages in a thread, one must specify the channel and timestamp of the parent (first) message of that thread,
Expand All @@ -171,11 +172,19 @@ class ThreadsPartitionRouter(SubstreamPartitionRouter):
Good luck.
"""

def set_initial_state(self, stream_state: StreamState) -> None:
config: Config

def __init__(self, config: Config):
self._config = config

def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return True

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
if not stream_state:
return
return {}

start_date_state = ab_datetime_parse(self.config["start_date"]).timestamp() # start date is required
start_date_state = ab_datetime_parse(self._config["start_date"]).timestamp() # start date is required
# for migrated state
if stream_state.get("states"):
for state in stream_state["states"]:
Expand All @@ -184,13 +193,11 @@ def set_initial_state(self, stream_state: StreamState) -> None:
if stream_state.get("float_ts"):
start_date_state = max(start_date_state, float(stream_state["float_ts"]))

lookback_window = timedelta(days=self.config.get("lookback_window", 0)) # lookback window in days
lookback_window = timedelta(days=self._config.get("lookback_window", 0)) # lookback window in days
final_state = {"float_ts": (ab_datetime_parse(int(start_date_state)) - lookback_window).timestamp()}
# Set state for each parent stream with an incremental dependency
for parent_config in self.parent_stream_configs:
# Migrate child state to parent state format
start_date_state = self._migrate_child_state_to_parent_state(final_state)
parent_config.stream.state = start_date_state.get(parent_config.stream.name, {})
stream_state["parent_state"] = {"channel_messages": final_state}

return stream_state


MESSAGES_AND_THREADS_RATE = Rate(limit=1, interval=timedelta(seconds=60))
Expand Down
9 changes: 4 additions & 5 deletions airbyte-integrations/connectors/source-slack/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,10 @@ definitions:
record_filter:
type: RecordFilter
condition: "{{ record.name in config.channel_filter or not config.channel_filter }}"
$parameters:
transformations: [[]]
paginator:
$ref: "#/definitions/default_paginator"
$parameters:
url_base: https://slack.com/api/
partition_router: []

channels_partition_router:
type: SubstreamPartitionRouter
Expand Down Expand Up @@ -296,6 +293,9 @@ definitions:
primary_key:
- channel_id
- ts
state_migrations:
- type: CustomStateMigration
class_name: "source_declarative_manifest.components.ThreadsStateMigration"
retriever:
$ref: "#/definitions/retriever"
requester:
Expand Down Expand Up @@ -324,8 +324,7 @@ definitions:
paginator:
$ref: "#/definitions/default_paginator"
partition_router:
type: CustomPartitionRouter
class_name: source_declarative_manifest.components.ThreadsPartitionRouter
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
stream:
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-slack/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ data:
hosts:
- slack.com
connectorBuildOptions:
baseImage: docker.io/airbyte/source-declarative-manifest:7.1.1@sha256:e8dd37b6675300a0cc048457435fdd32fb58b806c91fd65367609542d658ed49
baseImage: docker.io/airbyte/source-declarative-manifest:7.1.0@sha256:a46cbb8400b053c5e3de0a2751ed5000915f975f6d2740e1ec1880e306bc6603
connectorSubtype: api
connectorType: source
definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
dockerImageTag: 3.1.1
dockerImageTag: 3.1.2
dockerRepository: airbyte/source-slack
documentationUrl: https://docs.airbyte.com/integrations/sources/slack
githubIssueLabel: source-slack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import pytest

from airbyte_cdk.sources.declarative.retrievers import Retriever
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.state_builder import StateBuilder


Expand All @@ -30,15 +32,15 @@ def _get_manifest_path() -> Path:
sys.path.append(str(_SOURCE_FOLDER_PATH)) # to allow loading custom components


def get_source(config, state=None) -> YamlDeclarativeSource:
catalog = CatalogBuilder().build()
def get_source(config, stream_name=None, state=None) -> YamlDeclarativeSource:
    """Build a ``YamlDeclarativeSource`` from the YAML file at ``_YAML_FILE_PATH``.

    Args:
        config: Connector configuration passed through to the source.
        stream_name: Optional stream name. When truthy, a catalog holding a
            single configured stream with that name is built; otherwise the
            source receives ``catalog=None``.
        state: Optional state. A falsy value is replaced by the result of
            ``StateBuilder().build()``.

    Returns:
        The constructed ``YamlDeclarativeSource``.
    """
    if stream_name:
        configured_stream = ConfiguredAirbyteStreamBuilder().with_name(stream_name)
        catalog = CatalogBuilder().with_stream(configured_stream).build()
    else:
        catalog = None

    if not state:
        state = StateBuilder().build()

    return YamlDeclarativeSource(
        path_to_yaml=str(_YAML_FILE_PATH),
        catalog=catalog,
        config=config,
        state=state,
    )


def get_stream_by_name(stream_name, config, state=None):
state = StateBuilder().build() if not state else state
streams = get_source(config, state).streams(config=config)
streams = get_source(config, stream_name, state).streams(config=config)
for stream in streams:
if stream.name == stream_name:
return stream
Expand Down Expand Up @@ -158,3 +160,7 @@ def joined_channel():
"purpose": {"value": "For widget discussion", "creator": "", "last_set": 0},
"previous_names": [],
}


def get_retriever(stream: DefaultStream) -> Retriever:
    """Return the retriever held by the stream's partition factory.

    Walks the private attribute chain
    ``_stream_partition_generator -> _partition_factory -> _retriever`` so
    tests can reach the retriever without repeating the chain themselves.
    """
    partition_generator = stream._stream_partition_generator
    partition_factory = partition_generator._partition_factory
    return partition_factory._retriever
Loading
Loading