-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Description
Hi,
I am using FastAPI, Taskiq and PyMongo.
And i am requesting some hints to help me resolve it.
I have an AsyncIOMotorDatabase as dependencies but it's implement getattr magic to manage collection_name resulting in an exception raise through the resolver.
def __bool__(self) -> NoReturn:
raise NotImplementedError(
f"{type(self).__name__} objects do not implement truth "
"value testing or bool(). Please compare "
"with None instead: collection is not None"
)
The issue is in the resolver method :
if getattr(executed_func, "dep_graph", False):
"""
Sync resolver.
This function is used to execute functions
to resolve dependencies.
:param executed_func: function to resolve.
:param initial_cache: cache to build a context if graph was passed.
:raises RuntimeError: if async function is passed as the dependency.
:return: dict with resolved kwargs.
"""
if getattr(executed_func, "dep_graph", False):
ctx = SyncResolveContext(executed_func, self.main_graph, initial_cache)
self.sub_contexts.append(ctx)
sub_result = ctx.resolve_kwargs()
elif inspect.isgenerator(executed_func):
sub_result = next(executed_func)
self.opened_dependencies.append(executed_func)
elif asyncio.iscoroutine(executed_func):
raise RuntimeError(
"Coroutines cannot be used in sync context. "
"Please use async context instead.",
)
elif iscontextmanager(executed_func):
sub_result = executed_func.__enter__()
self.opened_dependencies.append(executed_func)
elif inspect.isasyncgen(executed_func) or isasynccontextmanager(executed_func):
raise RuntimeError(
"Coroutines cannot be used in sync context. "
"Please use async context instead.",
)
else:
sub_result = executed_func
return sub_result`
Metadata
Metadata
Assignees
Labels
No labels