From 7e14ae8773773813a9abf140937e38e244b92e2a Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Tue, 21 Oct 2025 08:20:30 -0700 Subject: [PATCH] feat(checkpoint)!: sort list/alist by checkpoint ID descending (#106) Add sort_by parameter to FilterQuery in all list/alist implementations to return checkpoints in descending order (newest first). This matches SQLite checkpointer behavior and enables efficient lookup of recent/ crashed sessions without fetching all checkpoints into memory. Updated implementations: - AsyncRedisSaver.alist() (aio.py:709) - RedisSaver.list() (__init__.py:279) - AsyncShallowRedisSaver.alist() (ashallow.py:384) - ShallowRedisSaver.list() (shallow.py:334) BREAKING CHANGE: Checkpoints are now returned in DESC order (newest first) instead of ASC order (oldest first). Code that accessed the most recent checkpoint with `list(saver.list(config))[-1]` must now use `list(saver.list(config))[0]`. Fixes #106 --- langgraph/checkpoint/redis/__init__.py | 2 + langgraph/checkpoint/redis/aio.py | 2 + langgraph/checkpoint/redis/ashallow.py | 2 + langgraph/checkpoint/redis/shallow.py | 2 + tests/test_alist_sort_order.py | 264 +++++++++++++++++++++++++ tests/test_streaming.py | 6 +- 6 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 tests/test_alist_sort_order.py diff --git a/langgraph/checkpoint/redis/__init__.py b/langgraph/checkpoint/redis/__init__.py index e6206fb..50b4f37 100644 --- a/langgraph/checkpoint/redis/__init__.py +++ b/langgraph/checkpoint/redis/__init__.py @@ -263,6 +263,7 @@ def list( combined_filter &= expr # Construct the Redis query + # Sort by checkpoint_id in descending order to get most recent checkpoints first query = FilterQuery( filter_expression=combined_filter, return_fields=[ @@ -275,6 +276,7 @@ def list( "has_writes", # Include has_writes to optimize pending_writes loading ], num_results=limit or 10000, + sort_by=("checkpoint_id", "DESC"), ) # Execute the query diff --git a/langgraph/checkpoint/redis/aio.py 
b/langgraph/checkpoint/redis/aio.py index ec4b699..f8e87d3 100644 --- a/langgraph/checkpoint/redis/aio.py +++ b/langgraph/checkpoint/redis/aio.py @@ -693,6 +693,7 @@ async def alist( combined_filter &= expr # Construct the Redis query + # Sort by checkpoint_id in descending order to get most recent checkpoints first query = FilterQuery( filter_expression=combined_filter, return_fields=[ @@ -705,6 +706,7 @@ async def alist( "has_writes", # Include has_writes to optimize pending_writes loading ], num_results=limit or 10000, + sort_by=("checkpoint_id", "DESC"), ) # Execute the query asynchronously diff --git a/langgraph/checkpoint/redis/ashallow.py b/langgraph/checkpoint/redis/ashallow.py index b86c4ce..46cfe10 100644 --- a/langgraph/checkpoint/redis/ashallow.py +++ b/langgraph/checkpoint/redis/ashallow.py @@ -368,6 +368,7 @@ async def alist( for expr in query_filter[1:]: combined_filter &= expr + # Sort by checkpoint_id in descending order to get most recent checkpoints first query = FilterQuery( filter_expression=combined_filter, return_fields=[ @@ -380,6 +381,7 @@ async def alist( "ts", ], num_results=limit or 100, # Set higher limit to retrieve more results + sort_by=("checkpoint_id", "DESC"), ) results = await self.checkpoints_index.search(query) diff --git a/langgraph/checkpoint/redis/shallow.py b/langgraph/checkpoint/redis/shallow.py index ef54483..0d63dbe 100644 --- a/langgraph/checkpoint/redis/shallow.py +++ b/langgraph/checkpoint/redis/shallow.py @@ -321,6 +321,7 @@ def list( combined_filter &= expr # Get checkpoint data + # Sort by checkpoint_id in descending order to get most recent checkpoints first query = FilterQuery( filter_expression=combined_filter, return_fields=[ @@ -330,6 +331,7 @@ def list( "$.metadata", ], num_results=limit or 10000, + sort_by=("checkpoint_id", "DESC"), ) # Execute the query diff --git a/tests/test_alist_sort_order.py b/tests/test_alist_sort_order.py new file mode 100644 index 0000000..1dbf703 --- /dev/null +++ 
b/tests/test_alist_sort_order.py @@ -0,0 +1,264 @@ +"""Test for issue #106: alist should sort by checkpoint ID DESC.""" + +import asyncio +import time +from typing import AsyncGenerator, Generator + +import pytest +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import ( + Checkpoint, + CheckpointMetadata, + create_checkpoint, + empty_checkpoint, +) +from ulid import ULID + +from langgraph.checkpoint.redis import RedisSaver +from langgraph.checkpoint.redis.aio import AsyncRedisSaver + + +@pytest.fixture +async def async_saver(redis_url: str) -> AsyncGenerator[AsyncRedisSaver, None]: + """Async saver fixture.""" + saver = AsyncRedisSaver(redis_url) + await saver.asetup() + yield saver + + +@pytest.fixture +def sync_saver(redis_url: str) -> Generator[RedisSaver, None, None]: + """Sync saver fixture.""" + saver = RedisSaver(redis_url) + saver.setup() + yield saver + + +@pytest.mark.asyncio +async def test_alist_sorts_by_checkpoint_id_desc(async_saver: AsyncRedisSaver) -> None: + """Test that alist returns checkpoints sorted by checkpoint ID in descending order. + + This is a reproducer for issue #106: when listing checkpoints, they should be + sorted by checkpoint ID (which embeds timestamp via ULID) in descending order, + so that the most recent checkpoints appear first. This allows users to efficiently + find crashed/unfinished sessions after restart. 
+ """ + thread_id = "test-thread-sort" + checkpoint_ns = "" + + # Create multiple checkpoints with increasing timestamps + # We'll use explicit checkpoint IDs with different timestamps to ensure ordering + checkpoint_ids = [] + + # Create 5 checkpoints with small delays between them to ensure different timestamps + for i in range(5): + # Create a checkpoint with a unique ULID + checkpoint_id = str(ULID()) + checkpoint_ids.append(checkpoint_id) + + config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_id": checkpoint_id, + "checkpoint_ns": checkpoint_ns, + } + } + + checkpoint: Checkpoint = empty_checkpoint() + checkpoint["id"] = checkpoint_id + + metadata: CheckpointMetadata = { + "source": "test", + "step": i, + "writes": {}, + } + + await async_saver.aput(config, checkpoint, metadata, {}) + + # Small delay to ensure different ULID timestamps + # ULID has millisecond precision, so we need to wait at least 1ms + await asyncio.sleep(0.01) + + # Now list all checkpoints for this thread + config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + } + } + + listed_checkpoints = [] + async for checkpoint_tuple in async_saver.alist(config): + listed_checkpoints.append( + checkpoint_tuple.config["configurable"]["checkpoint_id"] + ) + + # Verify we got all checkpoints + assert ( + len(listed_checkpoints) == 5 + ), f"Expected 5 checkpoints, got {len(listed_checkpoints)}" + + # Verify they are sorted in descending order (most recent first) + # Since we created them in chronological order, the last one created should be first + # checkpoint_ids[4] should appear first, then checkpoint_ids[3], etc. 
+ expected_order = checkpoint_ids[::-1] # Reverse the list + + assert listed_checkpoints == expected_order, ( + f"Checkpoints are not sorted in descending order by checkpoint ID.\n" + f"Expected: {expected_order}\n" + f"Got: {listed_checkpoints}" + ) + + +@pytest.mark.asyncio +async def test_alist_sorts_multiple_threads(async_saver: AsyncRedisSaver) -> None: + """Test that alist sorts correctly when filtering by thread_id.""" + # Create checkpoints for two different threads + thread1_ids = [] + thread2_ids = [] + + # Thread 1: Create 3 checkpoints + for i in range(3): + checkpoint_id = str(ULID()) + thread1_ids.append(checkpoint_id) + + config: RunnableConfig = { + "configurable": { + "thread_id": "thread-1-sort", + "checkpoint_id": checkpoint_id, + "checkpoint_ns": "", + } + } + + checkpoint: Checkpoint = empty_checkpoint() + checkpoint["id"] = checkpoint_id + + metadata: CheckpointMetadata = {"source": "test", "step": i, "writes": {}} + await async_saver.aput(config, checkpoint, metadata, {}) + await asyncio.sleep(0.01) + + # Thread 2: Create 3 checkpoints (written after all of thread 1's checkpoints, not interleaved) + for i in range(3): + checkpoint_id = str(ULID()) + thread2_ids.append(checkpoint_id) + + config: RunnableConfig = { + "configurable": { + "thread_id": "thread-2-sort", + "checkpoint_id": checkpoint_id, + "checkpoint_ns": "", + } + } + + checkpoint: Checkpoint = empty_checkpoint() + checkpoint["id"] = checkpoint_id + + metadata: CheckpointMetadata = {"source": "test", "step": i, "writes": {}} + await async_saver.aput(config, checkpoint, metadata, {}) + await asyncio.sleep(0.01) + + # List checkpoints for thread 1 + config1: RunnableConfig = { + "configurable": { + "thread_id": "thread-1-sort", + "checkpoint_ns": "", + } + } + + thread1_listed = [] + async for checkpoint_tuple in async_saver.alist(config1): + thread1_listed.append(checkpoint_tuple.config["configurable"]["checkpoint_id"]) + + # Verify thread 1 checkpoints are in descending order + assert thread1_listed ==
thread1_ids[::-1], ( + f"Thread 1 checkpoints not sorted correctly.\n" + f"Expected: {thread1_ids[::-1]}\n" + f"Got: {thread1_listed}" + ) + + # List checkpoints for thread 2 + config2: RunnableConfig = { + "configurable": { + "thread_id": "thread-2-sort", + "checkpoint_ns": "", + } + } + + thread2_listed = [] + async for checkpoint_tuple in async_saver.alist(config2): + thread2_listed.append(checkpoint_tuple.config["configurable"]["checkpoint_id"]) + + # Verify thread 2 checkpoints are in descending order + assert thread2_listed == thread2_ids[::-1], ( + f"Thread 2 checkpoints not sorted correctly.\n" + f"Expected: {thread2_ids[::-1]}\n" + f"Got: {thread2_listed}" + ) + + +def test_list_sorts_by_checkpoint_id_desc(sync_saver: RedisSaver) -> None: + """Test that list (sync) returns checkpoints sorted by checkpoint ID in descending order. + + This is a sync version of the test for issue #106. + """ + thread_id = "test-thread-sort-sync" + checkpoint_ns = "" + + # Create multiple checkpoints with increasing timestamps + checkpoint_ids = [] + + # Create 5 checkpoints with small delays between them to ensure different timestamps + for i in range(5): + # Create a checkpoint with a unique ULID + checkpoint_id = str(ULID()) + checkpoint_ids.append(checkpoint_id) + + config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_id": checkpoint_id, + "checkpoint_ns": checkpoint_ns, + } + } + + checkpoint: Checkpoint = empty_checkpoint() + checkpoint["id"] = checkpoint_id + + metadata: CheckpointMetadata = { + "source": "test", + "step": i, + "writes": {}, + } + + sync_saver.put(config, checkpoint, metadata, {}) + + # Small delay to ensure different ULID timestamps + time.sleep(0.01) + + # Now list all checkpoints for this thread + config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + } + } + + listed_checkpoints = [] + for checkpoint_tuple in sync_saver.list(config): + listed_checkpoints.append( 
+ checkpoint_tuple.config["configurable"]["checkpoint_id"] + ) + + # Verify we got all checkpoints + assert ( + len(listed_checkpoints) == 5 + ), f"Expected 5 checkpoints, got {len(listed_checkpoints)}" + + # Verify they are sorted in descending order (most recent first) + expected_order = checkpoint_ids[::-1] # Reverse the list + + assert listed_checkpoints == expected_order, ( + f"Checkpoints are not sorted in descending order by checkpoint ID.\n" + f"Expected: {expected_order}\n" + f"Got: {listed_checkpoints}" + ) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 918b14d..4849ef8 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -65,9 +65,10 @@ def test_streaming_values_with_redis_checkpointer(graph_with_redis_checkpointer) assert len(results) == 11 # 5 iterations x 2 nodes + initial state # Check state history from the checkpointer + # Note: states are now sorted DESC (newest first), so states[0] is the most recent states = list(graph_with_redis_checkpointer.get_state_history(thread_config)) assert len(states) > 0 - final_state = states[-1] + final_state = states[0] # First item is now the most recent (DESC order) assert final_state.values["counter"] == 5 assert len(final_state.values["values"]) == 5 @@ -97,9 +98,10 @@ def test_streaming_updates_with_redis_checkpointer(graph_with_redis_checkpointer assert "values" in update["values_node"] # Check state history from the checkpointer + # Note: states are now sorted DESC (newest first), so states[0] is the most recent states = list(graph_with_redis_checkpointer.get_state_history(thread_config)) assert len(states) > 0 - final_state = states[-1] + final_state = states[0] # First item is now the most recent (DESC order) assert final_state.values["counter"] == 5 assert len(final_state.values["values"]) == 5