1- import asyncio
1+ import concurrent . futures
22from collections .abc import Callable
33from typing import Any
44
@@ -20,38 +20,40 @@ def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
2020
2121 if settings .use_docket :
2222 logger .info ("Scheduling task through Docket" )
23- # Schedule task directly in Docket's Redis queue
24- self ._schedule_docket_task (func , * args , ** kwargs )
23+
24+ # Create a wrapper that will handle Docket scheduling in a thread
25+ def docket_wrapper ():
26+ """Wrapper function that schedules the task through Docket"""
27+
28+ def run_in_thread ():
29+ """Run the async Docket operations in a separate thread"""
30+ import asyncio
31+
32+ from docket import Docket
33+
34+ async def schedule_task ():
35+ async with Docket (
36+ name = settings .docket_name ,
37+ url = settings .redis_url ,
38+ ) as docket :
39+ # Schedule task in Docket's queue
40+ await docket .add (func )(* args , ** kwargs )
41+
42+ # Run in a new event loop in this thread
43+ asyncio .run (schedule_task ())
44+
45+ # Execute in a thread pool to avoid event loop conflicts
46+ with concurrent .futures .ThreadPoolExecutor () as executor :
47+ future = executor .submit (run_in_thread )
48+ future .result () # Wait for completion
49+
50+ # Add the wrapper to FastAPI background tasks
51+ super ().add_task (docket_wrapper )
2552 else :
2653 logger .info ("Using FastAPI background tasks" )
2754 # Use FastAPI's background tasks directly
2855 super ().add_task (func , * args , ** kwargs )
2956
30- def _schedule_docket_task (
31- self , func : Callable [..., Any ], * args : Any , ** kwargs : Any
32- ) -> None :
33- """Schedule a task in Docket's Redis queue"""
34-
35- async def schedule_task ():
36- from docket import Docket
37-
38- async with Docket (
39- name = settings .docket_name ,
40- url = settings .redis_url ,
41- ) as docket :
42- # Schedule task in Docket's queue
43- await docket .add (func )(* args , ** kwargs )
44-
45- # Run the async scheduling operation
46- try :
47- # Try to get the current event loop
48- loop = asyncio .get_running_loop ()
49- # If we're in an async context, create a task
50- loop .create_task (schedule_task ())
51- except RuntimeError :
52- # If no event loop is running, run it synchronously
53- asyncio .run (schedule_task ())
54-
5557
5658# Backwards compatibility alias
5759DocketBackgroundTasks = HybridBackgroundTasks
0 commit comments