Skip to content

Commit 1a2edc4

Browse files
authored
Merge pull request #48 from tutorcruncher/extend-timeout-reduce-logging
Handle big loads of connections
2 parents 6e9e2a6 + bb3ca3c commit 1a2edc4

File tree

1 file changed

+23
-12
lines changed

1 file changed

+23
-12
lines changed

chronos/worker.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import json
55
from datetime import datetime, timedelta
66

7+
import httpx
8+
import logfire
79
import requests
810
from celery.app import Celery
911
from fastapi import APIRouter
@@ -41,7 +43,6 @@ async def webhook_request(client: AsyncClient, url: str, endpoint_id: int, *, we
4143
'Content-Type': 'application/json',
4244
'webhook-signature': webhook_sig,
4345
}
44-
logfire.debug('TutorCruncher request to url: {url=}: {data=}', url=url, data=data)
4546
with logfire.span('{method=} {url!r}', url=url, method='POST'):
4647
r = None
4748
try:
@@ -54,8 +55,6 @@ async def webhook_request(client: AsyncClient, url: str, endpoint_id: int, *, we
5455
app_logger.info('Timeout error sending webhook to %s: %s', url, terr)
5556
except requests.exceptions.RequestException as rerr:
5657
app_logger.info('Request error sending webhook to %s: %s', url, rerr)
57-
else:
58-
app_logger.info('Request method=%s url=%s status_code=%s', 'POST', url, r.status_code, extra={'data': data})
5958

6059
request_data = RequestData(
6160
endpoint_id=endpoint_id, request_headers=json.dumps(headers), request_body=json.dumps(data)
@@ -79,6 +78,11 @@ def get_qlength():
7978
qlength = 0
8079
celery_inspector = celery_app.control.inspect()
8180
dict_of_queues = celery_inspector.reserved()
81+
if dict_of_queues and isinstance(dict_of_queues, dict):
82+
for k, v in dict_of_queues.items():
83+
qlength += len(v)
84+
85+
dict_of_queues = celery_inspector.active()
8286
if dict_of_queues and isinstance(dict_of_queues, dict):
8387
for k, v in dict_of_queues.items():
8488
qlength += len(v)
@@ -88,7 +92,9 @@ def get_qlength():
8892
async def _async_post_webhooks(endpoints, url_extension, payload):
8993
webhook_logs = []
9094
total_success, total_failed = 0, 0
91-
async with AsyncClient() as client:
95+
# Temporary fix for the issue with the number of connections caused by a certain client
96+
limits = httpx.Limits(max_connections=250)
97+
async with AsyncClient(limits=limits) as client:
9298
tasks = []
9399
for endpoint in endpoints:
94100
# Check if the webhook URL is valid
@@ -160,15 +166,20 @@ def task_send_webhooks(
160166
if qlength > 100:
161167
app_logger.error('Queue is too long. Check workers and speeds.')
162168

163-
with Session(engine) as db:
164-
# Get all the endpoints for the branch
165-
endpoints_query = select(WebhookEndpoint).where(WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active)
166-
endpoints = db.exec(endpoints_query).all()
169+
with logfire.span('Sending webhooks for branch: {branch_id=}', branch_id=branch_id):
170+
with Session(engine) as db:
171+
# Get all the endpoints for the branch
172+
endpoints_query = select(WebhookEndpoint).where(
173+
WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active
174+
)
175+
endpoints = db.exec(endpoints_query).all()
167176

168-
webhook_logs, total_success, total_failed = asyncio.run(_async_post_webhooks(endpoints, url_extension, payload))
169-
for webhook_log in webhook_logs:
170-
db.add(webhook_log)
171-
db.commit()
177+
webhook_logs, total_success, total_failed = asyncio.run(
178+
_async_post_webhooks(endpoints, url_extension, payload)
179+
)
180+
for webhook_log in webhook_logs:
181+
db.add(webhook_log)
182+
db.commit()
172183
app_logger.info(
173184
'%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s',
174185
total_success + total_failed,

0 commit comments

Comments
 (0)