Commit 064de11

Switch README to recipe style.

1 parent ef3f7a4 commit 064de11

1 file changed: 160 additions, 109 deletions

  • foundations/claim_check_pattern_python
<!--
description: Use the Claim Check pattern to keep large payloads out of Temporal Event History by storing them in Redis and referencing them with keys, with optional codec server support for a better Web UI experience.
tags: [foundations, claim-check, python, redis]
priority: 999
-->
# Claim Check Pattern with Temporal

The Claim Check pattern enables efficient handling of large payloads by storing them externally and passing only keys through Temporal workflows and activities. This keeps the Temporal Event History small while preserving transparent access to the full data via a codec.

This recipe includes:

- A `PayloadCodec` that stores large payloads in Redis and replaces them with keys
- A client plugin that wires the codec into the Temporal data converter
- A lightweight codec server for a better Web UI experience
- An AI/RAG example workflow that demonstrates the pattern end-to-end
## How the Claim Check Pattern Works

This recipe implements the pattern as a `PayloadCodec` that:

1. **Encode**: Replaces large payloads with unique keys, storing the original data in external storage (Redis, S3, etc.)
2. **Decode**: Retrieves the original payload using the key when needed

Workflows operate on small, lightweight keys while retaining transparent access to the full data through automatic encoding and decoding.
## Claim Check Codec Implementation

The `ClaimCheckCodec` implements `PayloadCodec` and adds an inline threshold so that small payloads stay inline for debuggability.

*File: claim_check_codec.py*

```python
from typing import Iterable, List

import redis.asyncio as redis  # async client, since encode/decode await Redis calls
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec


class ClaimCheckCodec(PayloadCodec):
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379, max_inline_bytes: int = 20 * 1024):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)
        self.max_inline_bytes = max_inline_bytes

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        out: List[Payload] = []
        for payload in payloads:
            # Small payloads stay inline for easy inspection.
            if len(payload.data or b"") <= self.max_inline_bytes:
                out.append(payload)
                continue
            out.append(await self.encode_payload(payload))
        return out

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        out: List[Payload] = []
        for payload in payloads:
            # Pass through anything this codec did not claim-check.
            if payload.metadata.get("temporal.io/claim-check-codec", b"").decode() != "v1":
                out.append(payload)
                continue
            redis_key = payload.data.decode("utf-8")
            stored_data = await self.redis_client.get(redis_key)
            if stored_data is None:
                raise ValueError(f"Claim check key not found in Redis: {redis_key}")
            out.append(Payload.FromString(stored_data))
        return out
```
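The snippet references an `encode_payload` helper that is not shown here. A minimal sketch consistent with `decode` above, assuming the stored value is the serialized original `Payload` and the key is a random UUID, could look like this:

```python
# Hypothetical helper (not shown in this README excerpt), written as a
# method of ClaimCheckCodec; assumes `import uuid` at module level.
async def encode_payload(self, payload: Payload) -> Payload:
    # Store the serialized payload in Redis under a fresh key...
    redis_key = str(uuid.uuid4())
    await self.redis_client.set(redis_key, payload.SerializeToString())
    # ...and return a stub payload that carries only the key, marked so
    # that decode() recognizes it as claim-checked.
    return Payload(
        metadata={"temporal.io/claim-check-codec": b"v1"},
        data=redis_key.encode("utf-8"),
    )
```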
### Inline payload threshold

Payloads larger than the threshold are claim-checked into Redis; payloads at or below it remain inline.

- Default: 20KB
- Where configured: `ClaimCheckCodec(max_inline_bytes=20 * 1024)` in `claim_check_codec.py`
- Change by passing a different `max_inline_bytes` when constructing `ClaimCheckCodec`, as in the sketch below

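A hypothetical construction with a larger threshold (the values here are illustrative):

```python
from claim_check_codec import ClaimCheckCodec

# Keep payloads up to 64KB inline; claim-check anything larger.
codec = ClaimCheckCodec(redis_host="localhost", redis_port=6379, max_inline_bytes=64 * 1024)
```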
## Claim Check Plugin

The `ClaimCheckPlugin` integrates the codec with the Temporal client configuration and supports plugin chaining.

*File: claim_check_plugin.py*

```python
import os

from temporalio.client import ClientConfig, Plugin
from temporalio.converter import DataConverter

from claim_check_codec import ClaimCheckCodec


class ClaimCheckPlugin(Plugin):
    def __init__(self):
        self.redis_host = os.getenv("REDIS_HOST", "localhost")
        self.redis_port = int(os.getenv("REDIS_PORT", "6379"))
        self._next_plugin = None

    def init_client_plugin(self, next_plugin: Plugin) -> None:
        # Remember the next plugin so configuration calls can be chained.
        self._next_plugin = next_plugin

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        # Keep the default payload converter; add only the claim check codec.
        default_converter_class = config["data_converter"].payload_converter_class
        claim_check_codec = ClaimCheckCodec(self.redis_host, self.redis_port)
        config["data_converter"] = DataConverter(
            payload_converter_class=default_converter_class,
            payload_codec=claim_check_codec,
        )
        return self._next_plugin.configure_client(config) if self._next_plugin else config
```
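A client might be wired up like this (a hedged sketch; the actual wiring lives in this recipe's `worker.py` and `start_workflow.py`, which are not shown here, and it assumes an SDK version whose `Client.connect` accepts a `plugins` argument):

```python
from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin


async def connect_client() -> Client:
    # The plugin installs the claim check codec on the client's data converter.
    return await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])
```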
## Example: AI / RAG Workflow using Claim Check

This example ingests a large text, performs lightweight lexical retrieval, and answers a question with an LLM. Large intermediates (chunks and scores) are kept out of Temporal payloads via the Claim Check codec. Only the small final answer is returned inline.

### Activities

*File: activities/ai_claim_check.py*
```python
from openai import AsyncOpenAI
from rank_bm25 import BM25Okapi
from temporalio import activity

# IngestRequest, IngestResult, RagRequest, RagAnswer and the _split_text
# helper are defined elsewhere in this recipe and omitted here.


@activity.defn
async def ingest_document(req: IngestRequest) -> IngestResult:
    text = req.document_bytes.decode("utf-8", errors="ignore")
    chunks = _split_text(text, req.chunk_size, req.chunk_overlap)
    return IngestResult(
        chunk_texts=chunks,
        metadata={"filename": req.filename, "mime_type": req.mime_type, "chunk_count": len(chunks)},
    )


@activity.defn
async def rag_answer(req: RagRequest, ingest_result: IngestResult) -> RagAnswer:
    # Lexical BM25 retrieval over the ingested chunks.
    tokenized_corpus = [chunk.split() for chunk in ingest_result.chunk_texts]
    bm25 = BM25Okapi(tokenized_corpus)
    tokenized_query = req.question.split()
    scores = bm25.get_scores(tokenized_query)
    top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[: max(1, req.top_k)]
    contexts = [ingest_result.chunk_texts[i] for i in top_indices]
    chat = await AsyncOpenAI(max_retries=0).chat.completions.create(
        model=req.generation_model,
        # Prompt elided in the README; it is presumably built from
        # req.question and the retrieved contexts.
        messages=[{"role": "user", "content": "..."}],
        temperature=0.2,
    )
    return RagAnswer(
        answer=chat.choices[0].message.content.strip(),
        sources=[{"chunk_index": i, "score": float(scores[i])} for i in top_indices],
    )
```
### Workflow

*File: workflows/ai_rag_workflow.py*
```python
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    # Activity functions and request/response models from this recipe
    # (module layout assumed from the file paths above).
    from activities.ai_claim_check import (
        IngestRequest,
        RagAnswer,
        RagRequest,
        ingest_document,
        rag_answer,
    )


@workflow.defn
class AiRagWorkflow:
    @workflow.run
    async def run(self, document_bytes: bytes, filename: str, mime_type: str, question: str) -> RagAnswer:
        ingest = await workflow.execute_activity(
            ingest_document,
            IngestRequest(document_bytes=document_bytes, filename=filename, mime_type=mime_type),
            start_to_close_timeout=timedelta(minutes=10),
            summary="Ingest and embed large document",
        )
        answer = await workflow.execute_activity(
            rag_answer,
            args=[RagRequest(question=question), ingest],
            start_to_close_timeout=timedelta(minutes=5),
            summary="RAG answer using embedded chunks",
        )
        return answer
```
## Configuration

Set environment variables to configure Redis and OpenAI:

```bash
export REDIS_HOST=localhost
export REDIS_PORT=6379
export OPENAI_API_KEY=your_key_here
```
## Prerequisites

- Redis server
- Temporal dev server
- Python 3.9+

## Running

1. Start Redis:
```bash
redis-server
```

2. Start Temporal dev server:
```bash
temporal server start-dev
```

3. Run the worker:
```bash
uv run python -m worker
```

4. Start execution:
```bash
uv run python -m start_workflow
```
### Toggle Claim Check (optional)

To demonstrate payload size failures without claim check, you can disable it in your local wiring (e.g., omit the plugin/codec) and re-run. With claim check disabled, large payloads may exceed Temporal's default payload size limits and fail; one possible toggle is sketched below.

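One hypothetical way to make this switchable with an environment variable, extending the connection sketch above (the `CLAIM_CHECK_ENABLED` flag and this wiring are illustrative; the recipe's actual mechanism is not shown here):

```python
import os

from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin


async def connect_client() -> Client:
    plugins = []
    # Omit the plugin to disable claim check and observe payload size failures.
    if os.getenv("CLAIM_CHECK_ENABLED", "true").lower() != "false":
        plugins.append(ClaimCheckPlugin())
    return await Client.connect("localhost:7233", plugins=plugins)
```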
## Codec Server for Web UI

With claim check enabled, the Web UI would otherwise show opaque Redis keys. The codec server instead shows helpful text with a link to view the raw data on demand.
### Running the Codec Server

```bash
uv run python -m codec_server
```

Then configure the Temporal Web UI to use the codec server. For `temporal server start-dev`, see the [Temporal documentation on configuring codec servers](https://docs.temporal.io/production-deployment/data-encryption#set-your-codec-server-endpoints-with-web-ui-and-cli).
### What it shows

Instead of raw keys:

```
abc123-def4-5678-9abc-def012345678
```

You will see text like:

```
"Claim check data (key: abc123-def4-5678-9abc-def012345678) - View at: http://localhost:8081/view/abc123-def4-5678-9abc-def012345678"
```
### Endpoints

- `POST /decode`: Returns helpful text with Redis key and view URL (no data reads)
- `GET /view/{key}`: Serves raw payload data for inspection

The server also includes CORS handling for the local Web UI.

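For reference, a minimal sketch of what the `/decode` endpoint could look like (assumptions: FastAPI, and the marker/key layout used by `ClaimCheckCodec` above; the recipe's actual `codec_server.py` is not shown here). The Temporal codec server protocol exchanges JSON payloads whose `metadata` values and `data` are base64-encoded:

```python
import base64

from fastapi import FastAPI, Request

app = FastAPI()
VIEW_BASE = "http://localhost:8081/view"


@app.post("/decode")
async def decode(request: Request) -> dict:
    body = await request.json()
    out = []
    for p in body.get("payloads", []):
        marker = base64.b64decode(p.get("metadata", {}).get("temporal.io/claim-check-codec", "")).decode()
        if marker != "v1":
            # Not claim-checked; pass through unchanged.
            out.append(p)
            continue
        key = base64.b64decode(p["data"]).decode("utf-8")
        text = f"Claim check data (key: {key}) - View at: {VIEW_BASE}/{key}"
        out.append({
            # Return a small JSON string instead of reading the large payload.
            "metadata": {"encoding": base64.b64encode(b"json/plain").decode()},
            "data": base64.b64encode(f'"{text}"'.encode()).decode(),
        })
    return {"payloads": out}
```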