Commit 400cad5

darynaishchenko, octavia-squidington-iii, and katmarkham authored
feat(source-slack): add custom api budget for threads and channel messages streams (#64553)
Co-authored-by: Octavia Squidington III <[email protected]> Co-authored-by: Kat Wilson <[email protected]>
1 parent b803721 commit 400cad5

7 files changed, +240 -34 lines changed

airbyte-integrations/connectors/source-slack/metadata.yaml

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
-  dockerImageTag: 2.2.0-rc.5
+  dockerImageTag: 2.2.0-rc.6
   dockerRepository: airbyte/source-slack
   documentationUrl: https://docs.airbyte.com/integrations/sources/slack
   githubIssueLabel: source-slack

airbyte-integrations/connectors/source-slack/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "2.2.0-rc.5"
+version = "2.2.0-rc.6"
 name = "source-slack"
 description = "Source implementation for Slack."
 authors = [ "Airbyte <[email protected]>",]
Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import Any, Dict, Mapping
+
+from airbyte_cdk.sources.declarative.auth.declarative_authenticator import (
+    NoAuth,
+)
+from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
+    InterpolatedString,
+)
+from airbyte_cdk.sources.declarative.requesters import HttpRequester
+from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
+    InterpolatedRequestOptionsProvider,
+)
+from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
+from airbyte_cdk.sources.streams.call_rate import (
+    APIBudget,
+    HttpRequestMatcher,
+    LimiterMixin,
+    MovingWindowCallRatePolicy,
+    Rate,
+    UnlimitedCallRatePolicy,
+)
+from airbyte_cdk.sources.streams.http import HttpClient
+from airbyte_cdk.sources.types import EmptyString
+
+
+MESSAGES_AND_THREADS_RATE = Rate(limit=1, interval=timedelta(seconds=60))
+
+
+class MessagesAndThreadsApiBudget(APIBudget, LimiterMixin):
+    """
+    Switches to MovingWindowCallRatePolicy 1 request per minute if rate limits were exceeded.
+    """
+
+    def update_from_response(self, request: Any, response: Any) -> None:
+        current_policy = self.get_matching_policy(request)
+        if response.status_code == 429 and isinstance(current_policy, UnlimitedCallRatePolicy):
+            matchers = current_policy._matchers
+            self._policies = [
+                MovingWindowCallRatePolicy(
+                    matchers=matchers,
+                    rates=[MESSAGES_AND_THREADS_RATE],
+                )
+            ]
+
+
+@dataclass
+class MessagesAndThreadsHttpRequester(HttpRequester):
+    """
+    Redefines Custom API Budget to handle rate limits.
+    """
+
+    url_match: str = None
+    # redefine this here to set up in InterpolatedRequestOptionsProvider in __post_init__
+    request_parameters: Dict[str, Any] = None
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self._url = InterpolatedString.create(self.url if self.url else EmptyString, parameters=parameters)
+        # deprecated
+        self._url_base = InterpolatedString.create(self.url_base if self.url_base else EmptyString, parameters=parameters)
+        # deprecated
+        self._path = InterpolatedString.create(self.path if self.path else EmptyString, parameters=parameters)
+        if self.request_options_provider is None:
+            self._request_options_provider = InterpolatedRequestOptionsProvider(
+                config=self.config,
+                parameters=parameters,
+                request_parameters=self.request_parameters,
+            )
+        elif isinstance(self.request_options_provider, dict):
+            self._request_options_provider = InterpolatedRequestOptionsProvider(config=self.config, **self.request_options_provider)
+        else:
+            self._request_options_provider = self.request_options_provider
+        self._authenticator = self.authenticator or NoAuth(parameters=parameters)
+        self._http_method = HttpMethod[self.http_method] if isinstance(self.http_method, str) else self.http_method
+        self.error_handler = self.error_handler
+        self._parameters = parameters
+
+        if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"):
+            backoff_strategies = self.error_handler.backoff_strategies  # type: ignore
+        else:
+            backoff_strategies = None
+
+        self._http_client = HttpClient(
+            name=self.name,
+            logger=self.logger,
+            error_handler=self.error_handler,
+            api_budget=MessagesAndThreadsApiBudget(
+                policies=[
+                    UnlimitedCallRatePolicy(
+                        matchers=[HttpRequestMatcher(url=self.url_match)],
+                    )
+                ]
+            ),
+            authenticator=self._authenticator,
+            use_cache=self.use_cache,
+            backoff_strategy=backoff_strategies,
+            disable_retries=self.disable_retries,
+            message_repository=self.message_repository,
+        )
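
Reviewer note: the budget in this file starts unlimited and tightens only after a 429 response. A minimal, dependency-free sketch of that switch follows. `UnlimitedPolicy`, `WindowPolicy`, and `SwitchingBudget` are hypothetical stand-ins written for illustration, not airbyte_cdk classes, and the URL matching is simplified to string equality.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from typing import List, Union


@dataclass
class UnlimitedPolicy:
    """Hypothetical stand-in: allows every request for `url`."""

    url: str


@dataclass
class WindowPolicy:
    """Hypothetical stand-in: at most `limit` requests per `interval` for `url`."""

    url: str
    limit: int
    interval: timedelta


@dataclass
class SwitchingBudget:
    """Starts unlimited and tightens the first time a 429 is observed."""

    policies: List[Union[UnlimitedPolicy, WindowPolicy]] = field(default_factory=list)

    def update_from_response(self, url: str, status_code: int) -> None:
        current = next((p for p in self.policies if p.url == url), None)
        # Switch only once: a later 429 under WindowPolicy changes nothing.
        if status_code == 429 and isinstance(current, UnlimitedPolicy):
            self.policies = [WindowPolicy(url=url, limit=1, interval=timedelta(seconds=60))]


URL = "https://slack.com/api/conversations.history"
budget = SwitchingBudget(policies=[UnlimitedPolicy(url=URL)])

budget.update_from_response(URL, 200)
still_unlimited = isinstance(budget.policies[0], UnlimitedPolicy)

budget.update_from_response(URL, 429)
now_limited = isinstance(budget.policies[0], WindowPolicy)
```

As far as this diff shows, the switch is one-way: nothing in `update_from_response` restores the unlimited policy once the moving window is in place.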

airbyte-integrations/connectors/source-slack/source_slack/components/slack_backoff_strategy.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

airbyte-integrations/connectors/source-slack/source_slack/manifest.yaml

Lines changed: 30 additions & 3 deletions
@@ -31,6 +31,13 @@ definitions:
     type: BearerAuthenticator
     api_token: "{{ config['credentials']['access_token'] }}"

+  authenticator:
+    type: SelectiveAuthenticator
+    authenticator_selection_path: ["credentials", "option_title"]
+    authenticators:
+      Default OAuth2.0 authorization: "#/definitions/access_token_auth"
+      API Token Credentials: "#/definitions/api_token_auth"
+
   requester:
     type: HttpRequester
     url_base: https://slack.com/api/
@@ -200,15 +207,26 @@ definitions:
     retriever:
       $ref: "#/definitions/retriever"
       requester:
-        $ref: "#/definitions/requester"
-        use_cache: true
+        type: CustomRequester
+        class_name: "source_slack.components.slack_api_budget.MessagesAndThreadsHttpRequester"
+        url_match: "https://slack.com/api/conversations.history?.+"
+        url_base: https://slack.com/api/
+        path: "{{ parameters['path'] }}"
+        http_method: GET
         request_parameters:
           inclusive: "True"
+        request_headers: {}
+        authenticator:
+          $ref: "#/definitions/authenticator"
+        request_body_json: {}
         error_handler:
           type: DefaultErrorHandler
           backoff_strategies:
             - type: "WaitTimeFromHeader"
               header: "retry-after"
+            - type: "WaitTimeFromHeader"
+              header: "Retry-After"
+        use_cache: true
       record_selector:
         $ref: "#/definitions/selector"
       paginator:
@@ -275,9 +293,18 @@ definitions:
     retriever:
       $ref: "#/definitions/retriever"
       requester:
-        $ref: "#/definitions/requester"
+        type: CustomRequester
+        class_name: "source_slack.components.slack_api_budget.MessagesAndThreadsHttpRequester"
+        url_match: "https://slack.com/api/conversations.replies?.+"
+        url_base: https://slack.com/api/
+        path: "{{ parameters['path'] }}"
+        http_method: GET
         request_parameters:
           channel: "{{ stream_partition['parent_slice']['channel'] }}"
+        request_headers: {}
+        authenticator:
+          $ref: "#/definitions/authenticator"
+        request_body_json: {}
         error_handler:
           type: DefaultErrorHandler
           max_retries: 20
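
Reviewer note: the `authenticator` definition added in this manifest picks one of two bearer authenticators based on a value in the connector config. The sketch below illustrates that lookup in plain Python, assuming the selection simply walks `authenticator_selection_path` through the config and uses the result as a key. `select_authenticator` and the sample config are hypothetical and are not the CDK's implementation.

```python
from typing import Any, Mapping, Sequence


def select_authenticator(
    config: Mapping[str, Any],
    selection_path: Sequence[str],
    authenticators: Mapping[str, str],
) -> str:
    """Walk `selection_path` through `config` and return the matching reference."""
    value: Any = config
    for key in selection_path:
        value = value[key]
    return authenticators[value]


# Mapping copied from the manifest hunk; the config values are placeholders.
AUTHENTICATORS = {
    "Default OAuth2.0 authorization": "#/definitions/access_token_auth",
    "API Token Credentials": "#/definitions/api_token_auth",
}
config = {"credentials": {"option_title": "API Token Credentials", "api_token": "placeholder"}}

chosen = select_authenticator(config, ["credentials", "option_title"], AUTHENTICATORS)
```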

airbyte-integrations/connectors/source-slack/unit_tests/test_components.py

Lines changed: 97 additions & 1 deletion
@@ -7,15 +7,17 @@
 from source_slack import SourceSlack
 from source_slack.components.channel_members_extractor import ChannelMembersExtractor
 from source_slack.components.join_channels import ChannelsRetriever, JoinChannelsStream
+from source_slack.components.threads_partition_router import ThreadsPartitionRouter

 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordSelector
 from airbyte_cdk.sources.declarative.requesters import HttpRequester
+from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy, UnlimitedCallRatePolicy
 from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


 def get_stream_by_name(stream_name, config):
-    streams = SourceSlack().streams(config=config)
+    streams = SourceSlack(catalog={}, config=config, state={}).streams(config=config)
     for stream in streams:
         if stream.name == stream_name:
             return stream
@@ -95,3 +97,97 @@ def test_join_channel_read(requests_mock, token_config, joined_channel, caplog,
     assert mocked_request.called
     assert mocked_request.last_request._request.body == b'{"channel": "channel 2"}'
     assert log_message in caplog.text
+
+
+@pytest.mark.parametrize(
+    "threads_stream_state, expected_parent_state",
+    (
+        ({}, {}),
+        (
+            {"float_ts": 7270247822.0},
+            # lookback window applied
+            {"float_ts": 7270161422.0},
+        ),
+        (
+            {
+                "states": [
+                    {
+                        "partition": {"float_ts": "1683104542.931169", "parent_slice": {"channel": "C04KX3KEZ54", "parent_slice": {}}},
+                        "cursor": {"float_ts": "1753263869"},
+                    },
+                    {
+                        "partition": {"float_ts": "1683104590.931169", "parent_slice": {"channel": "C04KX3KEZ54", "parent_slice": {}}},
+                        "cursor": {"float_ts": "1753263870"},
+                    },
+                    {
+                        "partition": {"float_ts": "1683104590.931169", "parent_slice": {"channel": "C04KX3KEZ54", "parent_slice": {}}},
+                        "cursor": {"float_ts": "1753263849"},
+                    },
+                ]
+            },
+            # lookback window applied
+            {"float_ts": 1753177470.0},
+        ),
+    ),
+    ids=["no_state", "old_format_state", "new_format_state"],
+)
+def test_threads_partition_router(token_config, threads_stream_state, expected_parent_state):
+    stream = get_stream_by_name("threads", token_config)
+    threads_partition_router = stream.retriever.stream_slicer._partition_router
+    threads_partition_router.set_initial_state(stream_state=threads_stream_state)
+    assert threads_partition_router.parent_stream_configs[0].stream.state["state"] == expected_parent_state
+
+
+@pytest.mark.parametrize(
+    "response_status_code, api_response, expected_policy",
+    (
+        (
+            429,
+            [
+                # first call rate limited
+                {"headers": {"Retry-After": "1"}, "text": "rate limited", "status_code": 429},
+                # refreshed limits on second call
+                {"json": {"messages": []}, "status_code": 200},
+            ],
+            MovingWindowCallRatePolicy,
+        ),
+        (
+            200,
+            [
+                # no rate limits
+                {"json": {"messages": []}, "status_code": 200},
+            ],
+            UnlimitedCallRatePolicy,
+        ),
+    ),
+    ids=["rate_limited_policy", "no_rate_limits_policy"],
+)
+def test_threads_and_messages_api_budget(response_status_code, api_response, expected_policy, token_config, requests_mock):
+    stream = get_stream_by_name("threads", token_config)
+    assert len(stream.retriever.requester._http_client._api_budget._policies) == 1
+    assert isinstance(stream.retriever.requester._http_client._api_budget._policies[0], UnlimitedCallRatePolicy)
+
+    messages = [{"ts": 1577866844}, {"ts": 1577877406}]
+
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.replies",
+        api_response,
+    )
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
+        [{"json": {"messages": messages}}, {"json": {"messages": []}}],
+    )
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
+        [{"json": {"messages": messages}}, {"json": {"messages": []}}],
+    )
+
+    stream_slice = list(stream.stream_slices(sync_mode=SyncMode.incremental, stream_state={}))[0]
+
+    list(stream.retriever.read_records(records_schema={}, stream_slice=stream_slice))
+
+    assert len(stream.retriever.requester._http_client._api_budget._policies) == 1
+    assert isinstance(stream.retriever.requester._http_client._api_budget._policies[0], expected_policy)
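
Reviewer note: the fixtures in `test_threads_partition_router` are consistent with a one-day lookback (86,400 seconds) subtracted from the largest cursor value. The sketch below reproduces the expected values under that assumption. `expected_parent_state` and `LOOKBACK_SECONDS` are hypothetical names inferred from the test data, not the `ThreadsPartitionRouter` implementation.

```python
from typing import Any, Dict

# Assumption inferred from the fixtures: 7270247822 - 7270161422 = 86400 seconds.
LOOKBACK_SECONDS = 86_400


def expected_parent_state(stream_state: Dict[str, Any]) -> Dict[str, float]:
    """Derive the parent-stream state the test expects from a threads state."""
    if not stream_state:
        return {}
    if "states" in stream_state:
        # New per-partition format: take the most recent cursor across partitions.
        latest = max(float(s["cursor"]["float_ts"]) for s in stream_state["states"])
    else:
        # Old format: a single top-level cursor.
        latest = float(stream_state["float_ts"])
    return {"float_ts": latest - LOOKBACK_SECONDS}


old_format = expected_parent_state({"float_ts": 7270247822.0})
new_format = expected_parent_state(
    {
        "states": [
            {"cursor": {"float_ts": "1753263869"}},
            {"cursor": {"float_ts": "1753263870"}},
            {"cursor": {"float_ts": "1753263849"}},
        ]
    }
)
```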

docs/integrations/sources/slack.md

Lines changed: 9 additions & 0 deletions
@@ -23,6 +23,10 @@ The following instructions guide you through creating a Slack app. Airbyte can o
 If you are using a legacy Slack API Key, you can skip this section.
 :::

+:::warning
+Source Slack applies a separate API budget policy (one request per minute) to the Channel Messages and Threads streams. We suggest creating a separate connection for those streams so that syncs of the other Slack streams are not slowed down. See the [Rate limiting section](https://docs.airbyte.com/integrations/sources/slack#rate-limiting) for more information.
+:::
+
 To create a Slack App, read this [tutorial](https://api.slack.com/tutorials/tracks/getting-a-token) on how to create an app, or follow these instructions.

 1. Go to your [Apps](https://api.slack.com/apps)
@@ -150,10 +154,14 @@ Expand to see details about Slack connector limitations and troubleshooting.

 Slack has [rate limit restrictions](https://api.slack.com/docs/rate-limits).

+###### Rate Limits for Channel Messages and Threads streams:
+Slack now [limits](https://api.slack.com/changelog/2025-05-terms-rate-limit-update-and-faq#what) the [conversations.replies](https://api.slack.com/methods/conversations.replies) and [conversations.history](https://api.slack.com/methods/conversations.history) endpoints to one request per minute. Because Source Slack follows this policy, syncs of the Channel Messages and Threads streams can be slow. To sync the Users, Channels, and Channel Members streams more quickly, we recommend creating one connection for the Channel Messages and Threads streams and a separate connection for the other streams, which follow a different rate limit policy. The Users, Channels, and Channel Members streams are not read under the one-request-per-minute policy, so their read time depends only on how much data is extracted.
+
 ### Troubleshooting

 - Check out common troubleshooting issues for the Slack source connector on our Airbyte Forum [here](https://github.com/airbytehq/airbyte/discussions).

+
 </details>

 </HideInUI>
@@ -166,6 +174,7 @@ Slack has [rate limit restrictions](https://api.slack.com/docs/rate-limits).

 | Version    | Date       | Pull Request                                             | Subject                                                                                                                                                                |
 |:-----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 2.2.0-rc.6 | 2025-08-14 | [64553](https://github.com/airbytehq/airbyte/pull/64553) | Add API budget for Threads and Channel Messages streams. |
 | 2.2.0-rc.5 | 2025-08-06 | [64530](https://github.com/airbytehq/airbyte/pull/64530) | Set use_cache = true for Channels and Channel Messages streams. |
 | 2.2.0-rc.4 | 2025-08-04 | [64486](https://github.com/airbytehq/airbyte/pull/64486) | Add backoff strategy for Channels stream. |
 | 2.2.0-rc.3 | 2025-07-29 | [64107](https://github.com/airbytehq/airbyte/pull/64107) | Add custom partition router. |