
AsyncRedisSaver: AttributeError: 'Document' object has no attribute 'blob' when calling aget_state_history() #116

Problem

When using AsyncRedisSaver as the checkpointer backend, calling
aget_state_history() (or any function that internally calls _abatch_load_pending_sends)
raises the following error:

  File "/.venv/lib/python3.13/site-packages/langgraph/pregel/main.py", line 1409, in aget_state_history
    for checkpoint_tuple in [
                            ^
    ...<4 lines>...
    ]:
    ^
  File "/.venv/lib/python3.13/site-packages/langgraph/checkpoint/redis/aio.py", line 788, in alist
    pending_sends_map = await self._abatch_load_pending_sends(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        pending_sends_batch_keys
        ^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/.venv/lib/python3.13/site-packages/langgraph/checkpoint/redis/aio.py", line 1759, in _abatch_load_pending_sends
    results_map[batch_key] = [(d.type, d.blob) for d in sorted_docs]
                                       ^^^^^^
AttributeError: 'Document' object has no attribute 'blob'
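
For reference, a minimal reproduction sketch. The graph shape, state schema, thread_id, and Redis URL below are illustrative assumptions, not taken from the failing application:

    import asyncio
    from typing import TypedDict

    from langgraph.graph import StateGraph, START, END
    from langgraph.checkpoint.redis.aio import AsyncRedisSaver


    class State(TypedDict):
        value: int


    def step(state: State) -> State:
        return {"value": state["value"] + 1}


    async def main() -> None:
        async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as saver:
            await saver.asetup()
            builder = StateGraph(State)
            builder.add_node("step", step)
            builder.add_edge(START, "step")
            builder.add_edge("step", END)
            graph = builder.compile(checkpointer=saver)

            config = {"configurable": {"thread_id": "demo"}}
            await graph.ainvoke({"value": 0}, config)

            # Walking the history goes through alist(), which calls
            # _abatch_load_pending_sends() and raises the AttributeError above
            async for snapshot in graph.aget_state_history(config):
                print(snapshot.config["configurable"]["checkpoint_id"])


    asyncio.run(main())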

Proposed Fix

The sync version of the Redis saver already handles this case: it requests the blob through the JSON path $.blob in return_fields and reads it back with a getattr fallback, whereas the async _abatch_load_pending_sends accesses d.blob directly. Applying the same logic to the async version resolves the issue:

# langgraph/checkpoint/redis/__init__.py#L1253
            batch_query = FilterQuery(
                filter_expression=thread_filter
                & ns_filter
                & checkpoint_filter
                & channel_filter,
                return_fields=[
                    "checkpoint_id",
                    "type",
                    "$.blob",
                    "task_path",
                    "task_id",
                    "idx",
                ],
                num_results=1000,  # Increased limit for batch loading
            )

            batch_results = await self.checkpoint_writes_index.search(batch_query)

            # Group results by parent checkpoint ID
            writes_by_checkpoint: Dict[str, List[Any]] = {}
            for doc in batch_results.docs:
                parent_checkpoint_id = from_storage_safe_id(doc.checkpoint_id)
                if parent_checkpoint_id not in writes_by_checkpoint:
                    writes_by_checkpoint[parent_checkpoint_id] = []
                writes_by_checkpoint[parent_checkpoint_id].append(doc)

            # Sort and format results for each parent checkpoint
            for parent_checkpoint_id in parent_checkpoint_ids:
                batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id)
                writes = writes_by_checkpoint.get(parent_checkpoint_id, [])

                # Sort results by task_path, task_id, idx
                sorted_writes = sorted(
                    writes,
                    key=lambda x: (
                        getattr(x, "task_path", ""),
                        getattr(x, "task_id", ""),
                        getattr(x, "idx", 0),
                    ),
                )

                # Extract type and blob pairs
                # Handle both direct attribute access and JSON path access
                results_map[batch_key] = [
                    (
                        getattr(doc, "type", ""),
                        getattr(doc, "$.blob", getattr(doc, "blob", b"")),
                    )
                    for doc in sorted_writes
                ]
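
For context on why the getattr fallback chain is needed: redis-py's search result Document simply setattr()s whatever field names the query returned, so a JSON-path return field such as "$.blob" lands under that literal attribute name and cannot be reached with plain dotted access. A minimal sketch of that behavior (the id and field values are made up):

    from redis.commands.search.document import Document

    # Document stores returned fields as attributes, so a field named
    # "$.blob" can only be read back with getattr().
    doc = Document(id="checkpoint_write:1", **{"type": "msgpack", "$.blob": b"\x81"})

    print(doc.type)                # plain attribute access works for "type"
    print(getattr(doc, "$.blob"))  # doc.$.blob would be a SyntaxError
    # doc.blob raises AttributeError -- exactly the failure in the async path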

I’ve already tested the proposed change locally, and it resolves the issue without side effects.
If possible, I’ll open a pull request soon to contribute the fix.
