Skip to content

Commit 45ea971

Browse files
authored
clean(fai): Refactor shared integration logic into integration_common.py. (#5562)
1 parent aa7ea3e commit 45ea971

File tree

3 files changed

+282
-472
lines changed

3 files changed

+282
-472
lines changed

servers/fai/src/fai/routes/slack_ask_fern.py

Lines changed: 36 additions & 239 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
import asyncio
2-
import hashlib
3-
import hmac
42
import json
5-
import time
63
from datetime import (
74
UTC,
85
datetime,
9-
timedelta,
106
)
117
from typing import Any
12-
from urllib.parse import quote
138
from uuid import uuid4
149

1510
from fastapi import (
@@ -23,11 +18,7 @@
2318
Response,
2419
)
2520
from slack_sdk.web.async_client import AsyncWebClient
26-
from sqlalchemy import (
27-
delete,
28-
select,
29-
)
30-
from sqlalchemy.dialects.postgresql import insert
21+
from sqlalchemy import select
3122
from sqlalchemy.ext.asyncio import AsyncSession
3223
from sqlalchemy.orm import attributes
3324

@@ -36,7 +27,6 @@
3627
from fai.dependencies import (
3728
ask_ai_enabled,
3829
get_db,
39-
strip_domain,
4030
verify_token,
4131
)
4232
from fai.models.api.update_channel_settings import ChannelSettings
@@ -56,6 +46,14 @@
5646
send_error_message,
5747
update_modal,
5848
)
49+
from fai.utils.slack.integration_common import (
50+
cleanup_message_cache,
51+
create_integration,
52+
create_slack_integration_url,
53+
handle_oauth_callback,
54+
is_message_processed,
55+
mark_message_processed,
56+
)
5957
from fai.utils.slack.message_handler import (
6058
get_slack_integration,
6159
get_thread_history,
@@ -64,122 +62,27 @@
6462
)
6563
from fai.utils.slack.response_qa import log_message_for_qa
6664

67-
MESSAGE_CACHE_TTL = 600
68-
69-
70-
async def cleanup_message_cache() -> None:
71-
cutoff_time = datetime.now(UTC) - timedelta(seconds=MESSAGE_CACHE_TTL)
72-
73-
async with async_session_maker() as session:
74-
await session.execute(delete(SlackMessageCacheDb).where(SlackMessageCacheDb.processed_at < cutoff_time))
75-
await session.commit()
76-
77-
78-
async def is_message_processed(team_id: str, message_ts: str) -> bool:
79-
async with async_session_maker() as session:
80-
result = await session.execute(
81-
select(SlackMessageCacheDb).where(
82-
SlackMessageCacheDb.team_id == team_id, SlackMessageCacheDb.message_ts == message_ts
83-
)
84-
)
85-
return result.scalar_one_or_none() is not None
86-
87-
88-
async def mark_message_processed(team_id: str, message_ts: str) -> None:
89-
async with async_session_maker() as session:
90-
stmt = insert(SlackMessageCacheDb).values(
91-
id=str(uuid4()), message_ts=message_ts, team_id=team_id, processed_at=datetime.now(UTC)
92-
)
93-
stmt = stmt.on_conflict_do_nothing(constraint="uq_slack_message_cache_team_message")
94-
await session.execute(stmt)
95-
await session.commit()
96-
97-
98-
def verify_slack_signature(request_body: bytes, timestamp: str, signature: str) -> bool:
99-
if abs(time.time() - float(timestamp)) > 60 * 5:
100-
return False
101-
102-
sig_basestring = f"v0:{timestamp}:{request_body.decode('utf-8')}"
103-
104-
my_signature = (
105-
"v0=" + hmac.new(VARIABLES.SLACK_SIGNING_SECRET.encode(), sig_basestring.encode(), hashlib.sha256).hexdigest()
106-
)
107-
108-
return hmac.compare_digest(my_signature, signature)
109-
11065

11166
async def get_domain_from_slack_team(team_id: str) -> str | None:
67+
"""Get the domain associated with a Slack team ID."""
11268
async with async_session_maker() as session:
11369
result = await session.execute(select(SlackIntegrationDb).where(SlackIntegrationDb.slack_team_id == team_id))
11470
integration = result.scalar_one_or_none()
11571
return integration.domain if integration else None
11672

11773

118-
def create_slack_integration_url(integration_id: str) -> str:
119-
scopes = [
120-
"app_mentions:read",
121-
"channels:history",
122-
"channels:join",
123-
"channels:read",
124-
"chat:write",
125-
"commands",
126-
"groups:history",
127-
"im:history",
128-
"mpim:history",
129-
"reactions:read",
130-
"reactions:write",
131-
"users:read",
132-
"users:read.email",
133-
]
134-
scope_string = ",".join(scopes)
135-
return (
136-
f"https://slack.com/oauth/v2/authorize?"
137-
f"client_id={VARIABLES.SLACK_CLIENT_ID}&"
138-
f"scope={quote(scope_string)}&"
139-
f"state={integration_id}"
140-
)
141-
142-
14374
@fai_app.post("/slack/install", openapi_extra={"x-fern-audiences": ["customers"], "security": [{"bearerAuth": []}]})
14475
async def create_slack_integration(
14576
domain: str, db: AsyncSession = Depends(get_db), _: None = Depends(verify_token), __: None = Depends(ask_ai_enabled)
14677
) -> SlackIntegrationResponse:
147-
try:
148-
stripped_domain = strip_domain(domain)
149-
150-
existing = await db.execute(select(SlackIntegrationDb).where(SlackIntegrationDb.domain == stripped_domain))
151-
existing_record = existing.scalar_one_or_none()
152-
if existing_record:
153-
integration_url = create_slack_integration_url(existing_record.integration_id)
154-
return SlackIntegrationResponse(
155-
integration_id=existing_record.integration_id,
156-
domain=existing_record.domain,
157-
slack_team_id=existing_record.slack_team_id,
158-
slack_team_name=existing_record.slack_team_name,
159-
created_at=existing_record.created_at,
160-
installed_at=existing_record.installed_at,
161-
integration_url=integration_url,
162-
)
163-
164-
new_integration = SlackIntegrationDb(domain=stripped_domain, created_at=datetime.now(UTC))
165-
db.add(new_integration)
166-
await db.commit()
167-
await db.refresh(new_integration)
168-
169-
integration_url = create_slack_integration_url(new_integration.integration_id)
170-
return SlackIntegrationResponse(
171-
integration_id=new_integration.integration_id,
172-
domain=new_integration.domain,
173-
slack_team_id=new_integration.slack_team_id,
174-
slack_team_name=new_integration.slack_team_name,
175-
created_at=new_integration.created_at,
176-
installed_at=new_integration.installed_at,
177-
integration_url=integration_url,
178-
)
179-
180-
except Exception:
181-
LOGGER.error("Failed to create Slack integration")
182-
raise HTTPException(status_code=500, detail="Failed to create integration")
78+
result = await create_integration(
79+
domain=domain,
80+
db=db,
81+
integration_db_model=SlackIntegrationDb,
82+
client_id=VARIABLES.SLACK_CLIENT_ID,
83+
log_prefix="[ASK_FERN]",
84+
)
85+
return SlackIntegrationResponse(**result)
18386

18487

18588
@fai_app.post("/slack/events", openapi_extra={"x-fern-audiences": ["internal"]})
@@ -206,11 +109,11 @@ async def handle_slack_events(request: Request, background_tasks: BackgroundTask
206109

207110
LOGGER.info(f"Received Slack event: {event_type} from team: {team_id}")
208111

209-
await cleanup_message_cache()
112+
await cleanup_message_cache(SlackMessageCacheDb)
210113

211114
message_ts = event.get("ts")
212115
if message_ts:
213-
if await is_message_processed(team_id, message_ts):
116+
if await is_message_processed(team_id, message_ts, SlackMessageCacheDb):
214117
LOGGER.info(f"Skipping duplicate message: {message_ts}")
215118
return JSONResponse(content={"status": "ok"})
216119

@@ -220,7 +123,9 @@ async def handle_slack_events(request: Request, background_tasks: BackgroundTask
220123
return JSONResponse(content={"status": "ok"})
221124

222125
if message_ts:
223-
await mark_message_processed(team_id, message_ts)
126+
await mark_message_processed(
127+
team_id, message_ts, SlackMessageCacheDb, "uq_slack_message_cache_team_message"
128+
)
224129
await handle_app_mention(event, team_id, background_tasks)
225130
elif event_type == "message":
226131
if event.get("bot_id"):
@@ -238,7 +143,9 @@ async def handle_slack_events(request: Request, background_tasks: BackgroundTask
238143
return JSONResponse(content={"status": "ok"})
239144

240145
if message_ts:
241-
await mark_message_processed(team_id, message_ts)
146+
await mark_message_processed(
147+
team_id, message_ts, SlackMessageCacheDb, "uq_slack_message_cache_team_message"
148+
)
242149
await handle_message(event, team_id, background_tasks)
243150
else:
244151
LOGGER.info(f"Unhandled event type: {event_type}")
@@ -1266,79 +1173,14 @@ async def handle_message(event: dict[str, Any], team_id: str, background_tasks:
12661173

12671174
@fai_app.get("/slack/oauth/callback", openapi_extra={"x-fern-audiences": ["internal"]})
12681175
async def handle_slack_oauth_callback(code: str, state: str | None = None) -> JSONResponse:
1269-
try:
1270-
LOGGER.info(f"Received OAuth callback with code: {code[:10]}... and state: {state}")
1271-
1272-
if not state:
1273-
raise HTTPException(status_code=400, detail="Missing integration_id in state parameter")
1274-
1275-
async with async_session_maker() as session:
1276-
result = await session.execute(select(SlackIntegrationDb).where(SlackIntegrationDb.integration_id == state))
1277-
integration = result.scalar_one_or_none()
1278-
1279-
if not integration:
1280-
raise HTTPException(status_code=404, detail="Invalid integration_id")
1281-
1282-
if not VARIABLES.SLACK_CLIENT_ID or not VARIABLES.SLACK_CLIENT_SECRET:
1283-
LOGGER.error("Slack OAuth credentials not configured")
1284-
raise HTTPException(status_code=500, detail="OAuth not configured")
1285-
1286-
client = AsyncWebClient()
1287-
oauth_response = await client.oauth_v2_access(
1288-
client_id=VARIABLES.SLACK_CLIENT_ID, client_secret=VARIABLES.SLACK_CLIENT_SECRET, code=code
1289-
)
1290-
1291-
if not oauth_response.get("ok"):
1292-
LOGGER.error(f"OAuth exchange error: {oauth_response.get('error')}")
1293-
raise HTTPException(status_code=500, detail=oauth_response.get("error", "OAuth failed"))
1294-
1295-
team_id = oauth_response.get("team", {}).get("id")
1296-
1297-
if team_id:
1298-
existing_team_result = await session.execute(
1299-
select(SlackIntegrationDb).where(
1300-
SlackIntegrationDb.slack_team_id == team_id, SlackIntegrationDb.integration_id != state
1301-
)
1302-
)
1303-
existing_team_integration = existing_team_result.scalar_one_or_none()
1304-
1305-
if existing_team_integration:
1306-
LOGGER.info(
1307-
f"Removing team {team_id} from old integration {existing_team_integration.integration_id}"
1308-
)
1309-
existing_team_integration.slack_team_id = None
1310-
existing_team_integration.slack_team_name = None
1311-
existing_team_integration.slack_bot_token = None
1312-
existing_team_integration.slack_bot_user_id = None
1313-
existing_team_integration.slack_app_id = None
1314-
existing_team_integration.installed_at = None
1315-
await session.flush()
1316-
1317-
integration.slack_team_id = team_id
1318-
integration.slack_team_name = oauth_response.get("team", {}).get("name")
1319-
integration.slack_bot_token = oauth_response.get("access_token")
1320-
integration.slack_bot_user_id = oauth_response.get("bot_user_id")
1321-
integration.slack_app_id = oauth_response.get("app_id")
1322-
integration.installed_at = datetime.now(UTC)
1323-
1324-
await session.commit()
1325-
1326-
LOGGER.info(f"Successfully installed Slack app for team: {integration.slack_team_id}")
1327-
1328-
return JSONResponse(
1329-
content={
1330-
"status": "success",
1331-
"message": "Slack app successfully installed",
1332-
"team_id": integration.slack_team_id,
1333-
"domain": integration.domain,
1334-
}
1335-
)
1336-
1337-
except HTTPException:
1338-
raise
1339-
except Exception as e:
1340-
LOGGER.error(f"Error handling Slack OAuth callback: {e}")
1341-
raise HTTPException(status_code=500, detail="OAuth callback failed")
1176+
return await handle_oauth_callback(
1177+
code=code,
1178+
state=state,
1179+
integration_db_model=SlackIntegrationDb,
1180+
client_id=VARIABLES.SLACK_CLIENT_ID,
1181+
client_secret=VARIABLES.SLACK_CLIENT_SECRET,
1182+
log_prefix="[ASK_FERN]",
1183+
)
13421184

13431185

13441186
@fai_app.get("/slack/get-install", openapi_extra={"x-fern-audiences": ["customers"], "security": [{"bearerAuth": []}]})
@@ -1355,6 +1197,8 @@ async def get_slack_install_link(domain: str) -> JSONResponse:
13551197
integration_id = new_integration.integration_id
13561198
LOGGER.info(f"Created new integration {integration_id} for domain {domain}")
13571199

1200+
install_url = create_slack_integration_url(integration_id, VARIABLES.SLACK_CLIENT_ID)
1201+
13581202
scopes = [
13591203
"app_mentions:read",
13601204
"channels:history",
@@ -1371,15 +1215,6 @@ async def get_slack_install_link(domain: str) -> JSONResponse:
13711215
"users:read.email",
13721216
]
13731217

1374-
scope_string = ",".join(scopes)
1375-
1376-
install_url = (
1377-
f"https://slack.com/oauth/v2/authorize?"
1378-
f"client_id={VARIABLES.SLACK_CLIENT_ID}&"
1379-
f"scope={quote(scope_string)}&"
1380-
f"state={integration_id}"
1381-
)
1382-
13831218
return JSONResponse(
13841219
content={
13851220
"integration_id": integration_id,
@@ -1392,41 +1227,3 @@ async def get_slack_install_link(domain: str) -> JSONResponse:
13921227
except Exception as e:
13931228
LOGGER.error(f"Error generating Slack install link: {e}")
13941229
raise HTTPException(status_code=500, detail="Failed to generate install link")
1395-
1396-
1397-
@fai_app.get("/slack/integrations/{domain}", openapi_extra={"x-fern-audiences": ["internal"]})
1398-
async def list_slack_integrations(domain: str) -> JSONResponse:
1399-
try:
1400-
async with async_session_maker() as session:
1401-
result = await session.execute(
1402-
select(SlackIntegrationDb)
1403-
.where(SlackIntegrationDb.domain == domain)
1404-
.order_by(SlackIntegrationDb.created_at.desc())
1405-
)
1406-
integrations = result.scalars().all()
1407-
1408-
integration_list = []
1409-
for integration in integrations:
1410-
integration_list.append(
1411-
{
1412-
"integration_id": integration.integration_id,
1413-
"domain": integration.domain,
1414-
"slack_team_id": integration.slack_team_id,
1415-
"slack_team_name": integration.slack_team_name,
1416-
"created_at": integration.created_at.isoformat() if integration.created_at else None,
1417-
"installed_at": integration.installed_at.isoformat() if integration.installed_at else None,
1418-
"is_installed": integration.slack_team_id is not None,
1419-
}
1420-
)
1421-
1422-
return JSONResponse(
1423-
content={
1424-
"domain": domain,
1425-
"integrations": integration_list,
1426-
"total_count": len(integration_list),
1427-
}
1428-
)
1429-
1430-
except Exception as e:
1431-
LOGGER.error(f"Error listing Slack integrations for domain {domain}: {e}")
1432-
raise HTTPException(status_code=500, detail="Failed to list integrations")

0 commit comments

Comments
 (0)