Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions libs/checkpoint-redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ export class RedisSaver extends BaseCheckpointSaver {
jsonDoc
);

return this.createCheckpointTuple(jsonDoc, checkpoint, pendingWrites);
return await this.createCheckpointTuple(jsonDoc, checkpoint, pendingWrites);
}

async put(
Expand Down Expand Up @@ -388,7 +388,11 @@ export class RedisSaver extends BaseCheckpointSaver {
// Load checkpoint with pending writes and migrate sends
const { checkpoint, pendingWrites } =
await this.loadCheckpointWithWrites(jsonDoc);
yield this.createCheckpointTuple(jsonDoc, checkpoint, pendingWrites);
yield await this.createCheckpointTuple(
jsonDoc,
checkpoint,
pendingWrites
);
yieldedCount++;
}

Expand Down Expand Up @@ -465,7 +469,7 @@ export class RedisSaver extends BaseCheckpointSaver {
// Load checkpoint with pending writes and migrate sends
const { checkpoint, pendingWrites } =
await this.loadCheckpointWithWrites(jsonDoc);
yield this.createCheckpointTuple(
yield await this.createCheckpointTuple(
jsonDoc,
checkpoint,
pendingWrites
Expand Down Expand Up @@ -555,7 +559,11 @@ export class RedisSaver extends BaseCheckpointSaver {
// Load checkpoint with pending writes and migrate sends
const { checkpoint, pendingWrites } =
await this.loadCheckpointWithWrites(jsonDoc);
yield this.createCheckpointTuple(jsonDoc, checkpoint, pendingWrites);
yield await this.createCheckpointTuple(
jsonDoc,
checkpoint,
pendingWrites
);
yieldedCount++;
}
}
Expand Down Expand Up @@ -718,7 +726,16 @@ export class RedisSaver extends BaseCheckpointSaver {

const pendingWrites: Array<[string, string, any]> = [];
for (const writeDoc of writeDocuments) {
pendingWrites.push([writeDoc.task_id, writeDoc.channel, writeDoc.value]);
// Deserialize write value using serde to restore LangChain objects
const deserializedValue = await this.serde.loadsTyped(
"json",
JSON.stringify(writeDoc.value)
);
pendingWrites.push([
writeDoc.task_id,
writeDoc.channel,
deserializedValue,
]);
}

return pendingWrites;
Expand All @@ -729,8 +746,11 @@ export class RedisSaver extends BaseCheckpointSaver {
checkpoint: Checkpoint;
pendingWrites?: Array<[string, string, any]>;
}> {
// Load checkpoint directly from JSON
const checkpoint = { ...jsonDoc.checkpoint };
// Deserialize checkpoint using serde to restore LangChain objects
const checkpoint: Checkpoint = await this.serde.loadsTyped(
"json",
JSON.stringify(jsonDoc.checkpoint)
);

// Migrate pending sends ONLY for OLD checkpoint versions (v < 4) with parents
// Modern checkpoints (v >= 4) should NEVER have pending sends migrated
Expand Down Expand Up @@ -805,15 +825,21 @@ export class RedisSaver extends BaseCheckpointSaver {
}

// Helper method to create checkpoint tuple from json document
private createCheckpointTuple(
private async createCheckpointTuple(
jsonDoc: any,
checkpoint: Checkpoint,
pendingWrites?: Array<[string, string, any]>
): CheckpointTuple {
): Promise<CheckpointTuple> {
// Convert back from "__empty__" to empty string
const checkpointNs =
jsonDoc.checkpoint_ns === "__empty__" ? "" : jsonDoc.checkpoint_ns;

// Deserialize metadata using serde
const metadata = (await this.serde.loadsTyped(
"json",
JSON.stringify(jsonDoc.metadata)
)) as CheckpointMetadata;

return {
config: {
configurable: {
Expand All @@ -823,7 +849,7 @@ export class RedisSaver extends BaseCheckpointSaver {
},
},
checkpoint,
metadata: jsonDoc.metadata,
metadata,
parentConfig: jsonDoc.parent_checkpoint_id
? {
configurable: {
Expand Down
51 changes: 31 additions & 20 deletions libs/checkpoint-redis/src/shallow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
await this.applyTTL(key);
}

// Channel values are stored inline in shallow mode
const checkpoint = {
...jsonDoc.checkpoint,
channel_values: jsonDoc.checkpoint.channel_values || {},
};
// Deserialize checkpoint using serde to restore LangChain objects
const checkpoint: Checkpoint = await this.serde.loadsTyped(
"json",
JSON.stringify(jsonDoc.checkpoint)
);

// Load pending writes if they exist
let pendingWrites: Array<[string, string, any]> | undefined;
Expand All @@ -228,7 +228,7 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
);
}

return this.createCheckpointTuple(jsonDoc, checkpoint, pendingWrites);
return await this.createCheckpointTuple(jsonDoc, checkpoint, pendingWrites);
}

async *list(
Expand Down Expand Up @@ -309,12 +309,12 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
}

// Channel values are inline in shallow mode
const checkpoint = {
...jsonDoc.checkpoint,
channel_values: jsonDoc.checkpoint.channel_values || {},
};
const checkpoint: Checkpoint = await this.serde.loadsTyped(
"json",
JSON.stringify(jsonDoc.checkpoint)
);

yield this.createCheckpointTuple(jsonDoc, checkpoint);
yield await this.createCheckpointTuple(jsonDoc, checkpoint);
yieldCount++;
}
} catch (error: any) {
Expand Down Expand Up @@ -359,12 +359,12 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
}

// Channel values are inline in shallow mode
const checkpoint = {
...jsonDoc.checkpoint,
channel_values: jsonDoc.checkpoint.channel_values || {},
};
const checkpoint: Checkpoint = await this.serde.loadsTyped(
"json",
JSON.stringify(jsonDoc.checkpoint)
);

yield this.createCheckpointTuple(jsonDoc, checkpoint);
yield await this.createCheckpointTuple(jsonDoc, checkpoint);
yieldCount++;
}
return;
Expand Down Expand Up @@ -507,11 +507,17 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
}

// Helper method to create checkpoint tuple from json document
private createCheckpointTuple(
private async createCheckpointTuple(
jsonDoc: any,
checkpoint: Checkpoint,
pendingWrites?: Array<[string, string, any]>
): CheckpointTuple {
): Promise<CheckpointTuple> {
// Deserialize metadata using serde
const metadata = (await this.serde.loadsTyped(
"json",
JSON.stringify(jsonDoc.metadata)
)) as CheckpointMetadata;

return {
config: {
configurable: {
Expand All @@ -521,7 +527,7 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
},
},
checkpoint,
metadata: jsonDoc.metadata,
metadata,
parentConfig: jsonDoc.parent_checkpoint_id
? {
configurable: {
Expand Down Expand Up @@ -572,10 +578,15 @@ export class ShallowRedisSaver extends BaseCheckpointSaver {
for (const writeKey of writeKeys) {
const writeDoc = await this.client.json.get(writeKey);
if (writeDoc) {
// Deserialize write value using serde to restore LangChain objects
const deserializedValue = await this.serde.loadsTyped(
"json",
JSON.stringify(writeDoc.value)
);
pendingWrites.push([
writeDoc.task_id,
writeDoc.channel,
writeDoc.value,
deserializedValue,
]);
}
}
Expand Down