Description
Checked other resources
- This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
- I added a clear and detailed title that summarizes the issue.
- I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
- I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.
Example Code
import operator
from typing import Annotated, TypedDict, List, Optional

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import Send


class OverallState(TypedDict):
    topics: List[str] = ["fine arts", 'firefighters', 'maths']
    jokes: Annotated[List[str], operator.add]
    all_jokes: str


default_state = OverallState(topics=["fine arts", 'firefighters', 'maths'],
                             jokes=[], all_jokes="")


class JokeState(TypedDict):
    topic: str


def send_topics(state: OverallState):
    print("Sending joke creation")
    return [Send("create_joke", JokeState(topic=topic)) for topic in state["topics"]]


def create_joke(joke_state: JokeState) -> dict:
    return {"jokes": [f'Joke for topic {joke_state["topic"]}']}


def reduce_jokes(state: OverallState) -> dict:
    print("Merge jokes")
    return {"all_jokes": "\n".join(state['jokes'])}


def main():
    print("Creating graph with...")

    # Create graph
    workflow = StateGraph(OverallState)
    workflow.add_node("create_joke", create_joke)
    workflow.add_node("reduce_jokes", reduce_jokes)
    workflow.add_conditional_edges(START, send_topics, {"create_joke"})
    workflow.add_edge("create_joke", "reduce_jokes")
    workflow.add_edge("reduce_jokes", END)

    # Try with Memory checkpointing (this should work)
    print("\nTrying with Memory checkpointing...")
    try:
        checkpointer = MemorySaver()
        app = workflow.compile(checkpointer=checkpointer)
        print("Running graph with Memory checkpointing...")
        result = app.invoke(default_state, config={'configurable': {'thread_id': "1"}})
        print(f"Result: {result}")
    except Exception as e:
        print(f"Error with Memory checkpointing: {e}")
        raise e

    # Try with Postgres checkpointing (this should fail)
    try:
        with PostgresSaver.from_conn_string(
                "postgresql://test_user:test_password@postgres:9999/test_db") as checkpointer:
            checkpointer.setup()
            app = workflow.compile(checkpointer=checkpointer)
            print("\n\nRunning graph with Postgres checkpointing...")
            result = app.invoke(default_state, config={'configurable': {'thread_id': "1"}})
            print(f"Result: {result}")
    except Exception as e:
        print(f"Error with Postgres checkpointing: {e}")
        raise e


if __name__ == "__main__":
    main()

Error Message and Stack Trace (if applicable)
TypeError: Object of type Send is not JSON serializable
Traceback (most recent call last):
File "/app/app.py", line 75, in <module>
main()
File "/app/app.py", line 72, in main
raise e
File "/app/app.py", line 67, in main
result = app.invoke(default_state, config={'configurable': {'thread_id': "1"}})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1927, in invoke
for chunk in self.stream(
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/__init__.py", line 1596, in stream
with SyncPregelLoop(
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/loop.py", line 874, in __exit__
return self.stack.__exit__(exc_type, exc_value, traceback)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/contextlib.py", line 601, in __exit__
raise exc_details[1]
File "/usr/local/lib/python3.11/contextlib.py", line 586, in __exit__
if cb(*exc_details):
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/executor.py", line 107, in __exit__
task.result()
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/executor.py", line 70, in done
task.result()
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/loop.py", line 799, in _checkpointer_put_after_previous
prev.result()
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/executor.py", line 70, in done
task.result()
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/loop.py", line 799, in _checkpointer_put_after_previous
prev.result()
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/executor.py", line 70, in done
task.result()
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/langgraph/pregel/loop.py", line 801, in _checkpointer_put_after_previous
cast(BaseCheckpointSaver, self.checkpointer).put(
File "/usr/local/lib/python3.11/site-packages/langgraph/checkpoint/postgres/__init__.py", line 321, in put
cur.execute(
File "/usr/local/lib/python3.11/site-packages/psycopg/cursor.py", line 93, in execute
self._conn.wait(
File "/usr/local/lib/python3.11/site-packages/psycopg/connection.py", line 453, in wait
return waiting.wait(gen, self.pgconn.socket, interval=interval)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "psycopg_binary/_psycopg/waiting.pyx", line 219, in psycopg_binary._psycopg.wait_c
File "/usr/local/lib/python3.11/site-packages/psycopg/_cursor_base.py", line 194, in _execute_gen
pgq = self._convert_query(query, params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/psycopg/_cursor_base.py", line 453, in _convert_query
pgq.convert(query, params)
File "/usr/local/lib/python3.11/site-packages/psycopg/_queries.py", line 95, in convert
self.dump(vars)
File "/usr/local/lib/python3.11/site-packages/psycopg/_queries.py", line 106, in dump
self.params = self._tx.dump_sequence(params, self._want_formats)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "psycopg_binary/_psycopg/transform.pyx", line 353, in psycopg_binary._psycopg.Transformer.dump_sequence
File "psycopg_binary/_psycopg/transform.pyx", line 404, in psycopg_binary._psycopg.Transformer.dump_sequence
File "/usr/local/lib/python3.11/site-packages/psycopg/types/json.py", line 218, in dump
if isinstance((data := dumps(obj)), str):
^^^^^^^^^^
File "/usr/local/lib/python3.11/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/encoder.py", line 200, in encode
chunks = self.iterencode(o, _one_shot=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/encoder.py", line 258, in iterencode
return _iterencode(o, 0)
^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/encoder.py", line 180, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Send is not JSON serializable

Description
Recently I wanted to persist the checkpoint data of my graph. I imported PostgresSaver (actually AsyncPostgresSaver) and followed the documentation to set it up: https://docs.langchain.com/oss/python/langgraph/add-memory#example-using-postgres-checkpointer.
However, my implementation uses Send, as shown in many LangGraph map-reduce examples (e.g. https://www.youtube.com/watch?v=JQznvlSatPQ), and it raises a TypeError because the Send class is not JSON serializable.
I found some other issues regarding serialization, but none are specific to this scenario.
The code shows that the graph works with MemorySaver, whereas PostgresSaver has to serialize the checkpoint data before writing it to the database, and that data turns out to contain the Send.
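The failure at the bottom of the traceback can be illustrated without a database. The sketch below is only an illustration under stated assumptions: `FakeSend` is a hypothetical stand-in for LangGraph's `Send` (not the real class), and `encode_fallback` is a made-up helper name. It shows that the standard library's `json.dumps`, which the traceback shows psycopg calling while dumping a JSON parameter, rejects any object it has no encoder for, and that a `default=` hook is one generic way to make such an object serializable.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any


@dataclass
class FakeSend:
    """Hypothetical stand-in for langgraph's Send; NOT the real class."""
    node: str
    arg: Any


def encode_fallback(obj: Any) -> Any:
    """Fallback for json.dumps: turn dataclass instances into plain dicts."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    raise TypeError(
        f"Object of type {obj.__class__.__name__} is not JSON serializable"
    )


# A payload shaped loosely like "data that happens to contain a Send".
payload = {"writes": [FakeSend("create_joke", {"topic": "maths"})]}

# Plain json.dumps has no encoder for FakeSend, so it raises TypeError.
try:
    json.dumps(payload)
    plain_error = None
except TypeError as exc:
    plain_error = str(exc)

# With a default= hook, the same payload serializes.
encoded = json.dumps(payload, default=encode_fallback)

if __name__ == "__main__":
    print(plain_error)
    print(encoded)
```

This does not say where the real `Send` ends up in the checkpoint or how the checkpointer is supposed to handle it; it only reproduces the same class of error that the traceback ends with.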
The output shown in the terminal when I run docker compose up for the app container is:
Creating graph with...
Trying with Memory checkpointing...
Running graph with Memory checkpointing...
Sending joke creation
Merge jokes
Result: {'topics': ['fine arts', 'firefighters', 'maths'], 'jokes': ['Joke for topic fine arts', 'Joke for topic firefighters', 'Joke for topic maths'], 'all_jokes': 'Joke for topic fine arts\nJoke for topic firefighters\nJoke for topic maths'}
Running graph with Postgres checkpointing...
Sending joke creation
Merge jokes
Error with Postgres checkpointing: Object of type Send is not JSON serializable

System Info
I am using Docker Compose:
services:
  postgres:
    image: postgres
    restart: always
    environment:
      - POSTGRES_USER=test_user
      - POSTGRES_PASSWORD=test_password
      - POSTGRES_DB=test_db
      - PGPORT=9999
    ports:
      - 9999:9999
    volumes:
      - postgres_data:/var/lib/postgresql/data
  app:
    build: .
    depends_on:
      - postgres
    environment:
      - POSTGRES_HOST=postgres
      - POSTGRES_PORT=9999
      - POSTGRES_DB=test_db
      - POSTGRES_USER=test_user
      - POSTGRES_PASSWORD=test_password
    volumes:
      - .:/app/
volumes:
  postgres_data:

Where the Dockerfile for app is:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]

The requirements:
langchain==0.3.9
langchain-community==0.3.8
langchain-core==0.3.21
langchain-experimental==0.3.3
langgraph==0.2.53
langgraph-checkpoint==2.1.2
langgraph-checkpoint-postgres==3.0.1
langgraph-sdk==0.1.40
psycopg[binary,pool]
psycopg==3.2.12
psycopg-pool==3.2.7
pydantic==2.10.2
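
Since the requirements pin several related packages independently, it can be useful to confirm what is actually installed inside the container. The following is a minimal diagnostic sketch using only the standard library; the `PACKAGES` list and the `installed_versions` helper are names chosen here for illustration and are not part of LangGraph.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names taken from the requirements list above.
PACKAGES = [
    "langgraph",
    "langgraph-checkpoint",
    "langgraph-checkpoint-postgres",
    "psycopg",
]


def installed_versions(names):
    """Return {package: version string, or None if it is not installed}."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(PACKAGES).items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this inside the app container (for example as a separate script next to app.py) prints one line per package, which can then be compared against the pins listed above.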