Skip to content

Commit b48327e

Browse files
authored
Merge pull request #46 from tutorcruncher/async-webhooks
Move webhook sending to async
2 parents 09f893e + b7a2e25 commit b48327e

File tree

5 files changed

+107
-92
lines changed

5 files changed

+107
-92
lines changed

chronos/pydantic_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,4 @@ class RequestData(BaseModel):
7777
response_headers: str = '{"Message": "No response from endpoint"}'
7878
response_body: str = '{"Message": "No response from endpoint"}'
7979
status_code: int = 999
80+
successful_response: str = False

chronos/worker.py

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import hashlib
23
import hmac
34
import json
@@ -7,6 +8,7 @@
78
from celery.app import Celery
89
from fastapi import APIRouter
910
from fastapi_utilities import repeat_at
11+
from httpx import AsyncClient
1012
from sqlalchemy import delete
1113
from sqlmodel import Session, col, select
1214

@@ -22,9 +24,10 @@
2224
celery_app.conf.broker_connection_retry_on_startup = True
2325

2426

25-
def webhook_request(url: str, *, method: str = 'POST', webhook_sig: str, data: dict = None):
27+
async def webhook_request(client: AsyncClient, url: str, *, webhook_sig: str, data: dict = None):
2628
"""
2729
Send a request to TutorCruncher
30+
:param client
2831
:param url: The endpoint supplied by clients when creating an integration in TC2
2932
:param method: We should always be sending POST requests as we are sending data to the endpoints
3033
:param webhook_sig: The signature generated by hashing the payload with the shared key
@@ -39,10 +42,10 @@ def webhook_request(url: str, *, method: str = 'POST', webhook_sig: str, data: d
3942
'webhook-signature': webhook_sig,
4043
}
4144
logfire.debug('TutorCruncher request to url: {url=}: {data=}', url=url, data=data)
42-
with logfire.span('{method=} {url!r}', url=url, method=method):
45+
with logfire.span('{method=} {url!r}', url=url, method='POST'):
4346
r = None
4447
try:
45-
r = session.request(method=method, url=url, json=data, headers=headers, timeout=4)
48+
r = await client.post(url=url, json=data, headers=headers, timeout=4)
4649
except requests.exceptions.HTTPError as httperr:
4750
app_logger.info('HTTP error sending webhook to %s: %s', url, httperr)
4851
except requests.exceptions.ConnectionError as conerr:
@@ -52,18 +55,15 @@ def webhook_request(url: str, *, method: str = 'POST', webhook_sig: str, data: d
5255
except requests.exceptions.RequestException as rerr:
5356
app_logger.info('Request error sending webhook to %s: %s', url, rerr)
5457
else:
55-
app_logger.info('Request method=%s url=%s status_code=%s', method, url, r.status_code, extra={'data': data})
58+
app_logger.info('Request method=%s url=%s status_code=%s', 'POST', url, r.status_code, extra={'data': data})
5659

5760
request_data = RequestData(request_headers=json.dumps(headers), request_body=json.dumps(data))
58-
if r is None:
59-
webhook_was_received = False
60-
else:
61+
if r is not None:
6162
request_data.response_headers = json.dumps(dict(r.headers))
6263
request_data.response_body = json.dumps(r.content.decode())
6364
request_data.status_code = r.status_code
64-
webhook_was_received = True
65-
66-
return request_data, webhook_was_received
65+
request_data.successful_response = True
66+
return request_data
6767

6868

6969
acceptable_url_schemes = ('http', 'https', 'ftp', 'ftps')
@@ -83,28 +83,11 @@ def get_qlength():
8383
return qlength
8484

8585

86-
@celery_app.task
87-
def task_send_webhooks(
88-
payload: str,
89-
url_extension: str = None,
90-
):
91-
"""
92-
Send the webhook to the relevant endpoints
93-
"""
94-
loaded_payload = json.loads(payload)
95-
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
96-
branch_id = loaded_payload['events'][0]['branch']
97-
98-
qlength = get_qlength()
99-
app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
100-
if qlength > 100:
101-
app_logger.error('Queue is too long. Check workers and speeds.')
102-
86+
async def _async_post_webhooks(endpoints, url_extension, payload):
87+
webhook_logs = []
10388
total_success, total_failed = 0, 0
104-
with Session(engine) as db:
105-
# Get all the endpoints for the branch
106-
endpoints_query = select(WebhookEndpoint).where(WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active)
107-
endpoints = db.exec(endpoints_query).all()
89+
async with AsyncClient() as client:
90+
tasks = []
10891
for endpoint in endpoints:
10992
# Check if the webhook URL is valid
11093
if not endpoint.webhook_url.startswith(acceptable_url_schemes):
@@ -122,9 +105,16 @@ def task_send_webhooks(
122105
if url_extension:
123106
url += f'/{url_extension}'
124107
# Send the Webhook to the endpoint
125-
response, webhook_sent = webhook_request(url, webhook_sig=sig_hex, data=loaded_payload)
126108

127-
if not webhook_sent:
109+
loaded_payload = json.loads(payload)
110+
task = asyncio.ensure_future(webhook_request(client, url, webhook_sig=sig_hex, data=loaded_payload))
111+
tasks.append(task)
112+
webhook_responses = await asyncio.gather(*tasks, return_exceptions=True)
113+
for response in webhook_responses:
114+
if not isinstance(response, RequestData):
115+
app_logger.info('No response from endpoint %s: %s. %s', endpoint.id, endpoint.webhook_url, response)
116+
continue
117+
elif not response.successful_response:
128118
app_logger.info('No response from endpoint %s: %s', endpoint.id, endpoint.webhook_url)
129119

130120
if response.status_code in {200, 201, 202, 204}:
@@ -135,16 +125,45 @@ def task_send_webhooks(
135125
total_failed += 1
136126

137127
# Log the response
138-
webhooklog = WebhookLog(
139-
webhook_endpoint_id=endpoint.id,
140-
request_headers=response.request_headers,
141-
request_body=response.request_body,
142-
response_headers=response.response_headers,
143-
response_body=response.response_body,
144-
status=status,
145-
status_code=response.status_code,
128+
webhook_logs.append(
129+
WebhookLog(
130+
webhook_endpoint_id=endpoint.id,
131+
request_headers=response.request_headers,
132+
request_body=response.request_body,
133+
response_headers=response.response_headers,
134+
response_body=response.response_body,
135+
status=status,
136+
status_code=response.status_code,
137+
)
146138
)
147-
db.add(webhooklog)
139+
return webhook_logs, total_success, total_failed
140+
141+
142+
@celery_app.task
143+
def task_send_webhooks(
144+
payload: str,
145+
url_extension: str = None,
146+
):
147+
"""
148+
Send the webhook to the relevant endpoints
149+
"""
150+
loaded_payload = json.loads(payload)
151+
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
152+
branch_id = loaded_payload['events'][0]['branch']
153+
154+
qlength = get_qlength()
155+
app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
156+
if qlength > 100:
157+
app_logger.error('Queue is too long. Check workers and speeds.')
158+
159+
with Session(engine) as db:
160+
# Get all the endpoints for the branch
161+
endpoints_query = select(WebhookEndpoint).where(WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active)
162+
endpoints = db.exec(endpoints_query).all()
163+
164+
webhook_logs, total_success, total_failed = asyncio.run(_async_post_webhooks(endpoints, url_extension, payload))
165+
for webhook_log in webhook_logs:
166+
db.add(webhook_log)
148167
db.commit()
149168
app_logger.info(
150169
'%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s',

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ setuptools~=75.2.0
2020
SQLAlchemy~=2.0.36
2121
sqlmodel~=0.0.22
2222
starlette~=0.41.2
23-
uvicorn~=0.32.0
23+
uvicorn~=0.32.0
24+
respx~=0.21.1

tests/test_helpers.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22

3-
from requests import Request, Response
3+
from httpx import Response
4+
from requests import Request
45

56
from chronos.main import app
67
from chronos.sql_models import WebhookEndpoint, WebhookLog
@@ -116,10 +117,7 @@ def get_successful_response(payload, headers, **kwargs) -> Response:
116117
request = Request()
117118
request.headers = headers
118119
request.body = json.dumps(payload).encode()
119-
response = Response()
120-
response.request = request
121-
response.status_code = 200
122-
response._content = json.dumps(response_dict).encode()
120+
response = Response(status_code=200, request=request, content=json.dumps(response_dict).encode())
123121
return response
124122

125123

@@ -130,8 +128,5 @@ def get_failed_response(payload, headers, **kwargs) -> Response:
130128
request = Request()
131129
request.headers = headers
132130
request.body = json.dumps(payload).encode()
133-
response = Response()
134-
response.request = request
135-
response.status_code = 409
136-
response._content = json.dumps(response_dict).encode()
131+
response = Response(status_code=409, request=request, content=json.dumps(response_dict).encode())
137132
return response

0 commit comments

Comments
 (0)