---
title: Claim Check Pattern with Temporal
description: Use the Claim Check pattern to efficiently handle large payloads by storing them externally and passing only keys through Temporal.
tags: [foundations, python, patterns]
source: https://github.com/temporalio/ai-cookbook/tree/main/foundations/claim_check_pattern_python
---

The Claim Check pattern enables efficient handling of large payloads by storing
them externally and passing only keys through Temporal workflows and activities.

This pattern is particularly useful when:

- Working with large files, images, or datasets
- Processing bulk data operations
- Handling payloads that exceed Temporal's size limits
- Improving performance by reducing payload serialization overhead

## How the Claim Check Pattern Works

The Claim Check pattern implements a `PayloadCodec` that:

1. **Encode**: Replaces large payloads with unique keys and stores the original
   data in external storage (Redis, S3, etc.)
2. **Decode**: Retrieves the original payload using the key when needed

This allows Temporal workflows to operate with small, lightweight keys instead
of large payloads, while maintaining transparent access to the full data through
automatic encoding/decoding.

## Claim Check Codec Implementation

The `ClaimCheckCodec` class implements the `PayloadCodec` interface to handle
the encoding and decoding of payloads.

```python
import uuid
from typing import Iterable, List

import redis.asyncio as redis

from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec

class ClaimCheckCodec(PayloadCodec):
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Replace large payloads with keys and store original data in Redis."""
        out: List[Payload] = []
        for payload in payloads:
            encoded = await self.encode_payload(payload)
            out.append(encoded)
        return out

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Retrieve original payloads from Redis using stored keys."""
        out: List[Payload] = []
        for payload in payloads:
            if payload.metadata.get("temporal.io/claim-check-codec", b"").decode() != "v1":
                # Not a claim-checked payload, pass through unchanged
                out.append(payload)
                continue

            redis_key = payload.data.decode("utf-8")
            stored_data = await self.redis_client.get(redis_key)
            if stored_data is None:
                raise ValueError(f"Claim check key not found in Redis: {redis_key}")

            original_payload = Payload.FromString(stored_data)
            out.append(original_payload)
        return out

    async def encode_payload(self, payload: Payload) -> Payload:
        """Store payload in Redis and return a key-based payload."""
        key = str(uuid.uuid4())
        serialized_data = payload.SerializeToString()

        # Store the original payload data in Redis
        await self.redis_client.set(key, serialized_data)

        # Return a lightweight payload containing only the key
        return Payload(
            metadata={
                "encoding": b"claim-checked",
                "temporal.io/claim-check-codec": b"v1",
            },
            data=key.encode("utf-8"),
        )
```
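
You can exercise the codec directly as a quick round-trip sanity check before wiring it into a client. A minimal sketch, assuming a Redis instance at the default address:

```python
# Round-trip sanity check for the codec outside of Temporal (a sketch;
# assumes Redis is reachable at localhost:6379).
import asyncio

from temporalio.api.common.v1 import Payload

async def main() -> None:
    codec = ClaimCheckCodec()
    original = Payload(metadata={"encoding": b"json/plain"}, data=b'{"big": "data"}')

    [encoded] = await codec.encode([original])
    print(encoded.data)  # prints a UUID key, not the original bytes

    [decoded] = await codec.decode([encoded])
    assert decoded.data == original.data

asyncio.run(main())
```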

## Claim Check Plugin

The `ClaimCheckPlugin` integrates the codec with Temporal's client configuration.

```python
import os

from temporalio.client import Plugin, ClientConfig
from temporalio.converter import DataConverter

from claim_check_codec import ClaimCheckCodec

class ClaimCheckPlugin(Plugin):
    def __init__(self):
        self.redis_host = os.getenv("REDIS_HOST", "localhost")
        self.redis_port = int(os.getenv("REDIS_PORT", "6379"))

    def get_data_converter(self, config: ClientConfig) -> DataConverter:
        """Configure the data converter with the claim check codec."""
        default_converter_class = config["data_converter"].payload_converter_class
        claim_check_codec = ClaimCheckCodec(self.redis_host, self.redis_port)

        return DataConverter(
            payload_converter_class=default_converter_class,
            payload_codec=claim_check_codec,
        )

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        """Apply the claim check configuration to the client."""
        config["data_converter"] = self.get_data_converter(config)
        return super().configure_client(config)
```
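
Wiring the plugin in is then a one-liner at connection time. A minimal sketch, assuming a Temporal Python SDK version with client plugin support, a local dev server, and a hypothetical module name for the plugin:

```python
# Connect a client with the plugin applied (a sketch; the server address
# is the local dev server default).
from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin  # hypothetical module name

async def get_client() -> Client:
    return await Client.connect(
        "localhost:7233",
        plugins=[ClaimCheckPlugin()],  # installs the claim check data converter
    )
```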

## Example: Large Data Processing Workflow

This example demonstrates processing large datasets using the Claim Check pattern.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow, activity

@dataclass
class LargeDataset:
    """Represents a large dataset that would benefit from the claim check pattern."""
    data: List[dict]
    metadata: dict

@dataclass
class ProcessingResult:
    """Result of processing the large dataset."""
    processed_count: int
    summary: dict
    errors: List[str]

@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    """Process a large dataset - this would normally cause payload size issues.

    This activity demonstrates how the claim check pattern allows processing
    of large datasets without hitting Temporal's payload size limits.

    Note: For production workloads processing very large datasets, consider adding
    activity heartbeats to prevent timeout issues during long-running processing.

    Args:
        dataset: Large dataset to process

    Returns:
        ProcessingResult with processing statistics and any errors
    """
    processed_count = 0
    errors = []
    summary = {"total_items": len(dataset.data)}

    for item in dataset.data:
        try:
            # Simulate processing each item
            if "value" in item:
                item["processed_value"] = item["value"] * 2
                processed_count += 1
            elif "text" in item:
                # Simulate text processing
                item["word_count"] = len(item["text"].split())
                processed_count += 1
        except Exception as e:
            errors.append(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")

    summary["processed_items"] = processed_count
    summary["error_count"] = len(errors)

    return ProcessingResult(
        processed_count=processed_count,
        summary=summary,
        errors=errors,
    )

@workflow.defn
class LargeDataProcessingWorkflow:
    @workflow.run
    async def run(self, dataset: LargeDataset) -> ProcessingResult:
        """Process a large dataset using the claim check pattern."""
        result = await workflow.execute_activity(
            process_large_dataset,
            dataset,
            start_to_close_timeout=timedelta(minutes=10),
            summary="Process large dataset",
        )
        return result
```
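
To see the pattern in action, start the workflow with a payload large enough to be claim checked. A sketch of a starter, reusing `get_client()` from the plugin example; the workflow ID and task queue name are illustrative assumptions:

```python
# Start the workflow with a large dataset (a sketch; id and task queue
# names are illustrative, not the recipe's exact values).
import asyncio

async def main() -> None:
    client = await get_client()
    dataset = LargeDataset(
        data=[{"id": i, "value": i} for i in range(100_000)],
        metadata={"source": "example"},
    )
    result = await client.execute_workflow(
        LargeDataProcessingWorkflow.run,
        dataset,
        id="large-data-processing",
        task_queue="claim-check-task-queue",
    )
    print(result.summary)

asyncio.run(main())
```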

## Configuration

Set environment variables to configure the Redis connection:

```bash
# Configure Redis connection (optional - defaults to localhost:6379)
export REDIS_HOST=localhost
export REDIS_PORT=6379
```

## Prerequisites

- **Redis Server**: Required for external storage of large payloads

## Running the Example

1. Start a Redis server:

   ```bash
   redis-server
   ```

2. Start the Temporal dev server:

   ```bash
   temporal server start-dev
   ```

3. Run the worker:

   ```bash
   uv run python -m worker
   ```

4. Start the workflow:

   ```bash
   uv run python -m start_workflow
   ```
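
The worker module referenced above isn't reproduced in this recipe; a minimal sketch of what it might look like, with module paths and the task queue name as assumptions about the repo layout:

```python
# Sketch of a worker module (module names, task queue, and imports are
# assumptions, not the recipe's exact code).
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from claim_check_plugin import ClaimCheckPlugin  # hypothetical module name
from workflows import LargeDataProcessingWorkflow, process_large_dataset  # hypothetical

async def main() -> None:
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])
    worker = Worker(
        client,
        task_queue="claim-check-task-queue",
        workflows=[LargeDataProcessingWorkflow],
        activities=[process_large_dataset],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```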

## Codec Server for Web UI

When using the Claim Check pattern, the Temporal Web UI will show encoded Redis
keys instead of the actual payload data. This makes debugging and monitoring
difficult since you can't see what data is being passed through your workflows.

### The Problem

Without a codec server, the Web UI displays raw claim check keys like:

```
abc123-def4-5678-9abc-def012345678
```

This provides no context about what data is stored or how to access it, making
workflow debugging and monitoring challenging.

### Our Solution: Lightweight Codec Server

We've designed a codec server that provides helpful information without the
risks of reading large payload data:

#### Design Principles

1. **No Data Reading**: The codec server never reads the actual payload data during decode operations
2. **On-Demand Access**: Full data is available via a separate endpoint when needed
3. **Simple & Safe**: Just provides the Redis key and a link - no assumptions about data content
4. **Performance First**: Zero impact on Web UI performance

#### What It Shows

Instead of raw keys, the Web UI displays:

```
"Claim check data (key: abc123-def4-5678-9abc-def012345678) - View at: http://localhost:8081/view/abc123-def4-5678-9abc-def012345678"
```

This gives you:

- **Clear identification**: You know this is claim check data
- **Redis key**: The actual key used for storage
- **Direct access**: Click the URL to view the full payload data

### Running the Codec Server

1. Start the codec server:

   ```bash
   uv run python -m codec_server
   ```

2. Configure the Temporal Web UI to use the codec server. For `temporal server start-dev`, see the [Temporal documentation on configuring codec servers](https://docs.temporal.io/production-deployment/data-encryption#set-your-codec-server-endpoints-with-web-ui-and-cli) for the appropriate configuration method.

3. Access the Temporal Web UI and you'll see helpful summaries instead of raw keys.

### Why This Approach?

#### Avoiding Common Pitfalls

**❌ What we DON'T do:**

- Parse or analyze payload data (could be huge or malformed)
- Attempt to summarize content (assumes data structure)
- Read data during decode operations (performance impact)

**✅ What we DO:**

- Provide the Redis key for manual inspection
- Offer a direct link to view full data when needed
- Keep the Web UI responsive with minimal information

#### Benefits

- **Performance**: No Redis calls during Web UI operations
- **Safety**: No risk of parsing problematic data
- **Flexibility**: Works with any data type or size
- **Debugging**: Full data accessible when needed via the `/view/{key}` endpoint

### Configuration Details

The codec server implements the Temporal codec server protocol with two endpoints:

- **`/decode`**: Returns helpful text with the Redis key and a view URL
- **`/view/{key}`**: Serves the raw payload data for inspection

When you click the view URL, you'll see the complete payload data as stored in
Redis, formatted appropriately for text or binary content.
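
Here's a minimal sketch of such a codec server using FastAPI (the framework choice, port, and view URL are assumptions, not the recipe's exact code). The `/decode` request and response shapes follow the Temporal codec server protocol: a JSON body of `{"payloads": [...]}` with base64-encoded metadata values and data.

```python
# Minimal codec server sketch (FastAPI and the address are assumptions).
# /decode never touches Redis; /view/{key} reads the stored payload.
import base64
import os

import redis.asyncio as redis
from fastapi import FastAPI, Request, Response

app = FastAPI()
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
)
VIEW_BASE = "http://localhost:8081/view"  # assumed codec server address

@app.post("/decode")
async def decode(request: Request) -> dict:
    body = await request.json()
    out = []
    for p in body["payloads"]:
        tag = base64.b64decode(p.get("metadata", {}).get("temporal.io/claim-check-codec", "")).decode()
        if tag != "v1":
            out.append(p)  # not claim-checked; pass through unchanged
            continue
        key = base64.b64decode(p["data"]).decode()
        # Replace the raw key with a human-readable summary and a view link.
        text = f'"Claim check data (key: {key}) - View at: {VIEW_BASE}/{key}"'
        out.append({
            "metadata": {"encoding": base64.b64encode(b"json/plain").decode()},
            "data": base64.b64encode(text.encode()).decode(),
        })
    return {"payloads": out}

@app.get("/view/{key}")
async def view(key: str) -> Response:
    stored = await redis_client.get(key)
    if stored is None:
        return Response(status_code=404, content=f"No claim check data for key: {key}")
    # Serve the serialized payload bytes for manual inspection.
    return Response(content=stored, media_type="application/octet-stream")
```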

## Considerations

### Performance Trade-offs

- **Benefits**: Reduces payload size, improves workflow performance, enables handling of large data
- **Costs**: Additional network calls to external storage, potential latency increase

### Storage Backend Options

While this example uses Redis, production systems typically use:

- **AWS S3**: For AWS environments (see the sketch after this list)
- **Google Cloud Storage**: For GCP environments
- **Azure Blob Storage**: For Azure environments
- **Redis**: For development and testing
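
Swapping the backend only changes the storage calls in the codec. A sketch of an S3-backed `encode_payload`, assuming the aioboto3 library and a hypothetical bucket name; the decode side would mirror it with `get_object`:

```python
# S3-backed encode_payload sketch (aioboto3 and the bucket name are
# assumptions; credentials come from the usual AWS environment/config).
import uuid

import aioboto3

from temporalio.api.common.v1 import Payload

BUCKET = "my-claim-check-bucket"  # hypothetical bucket

async def encode_payload_s3(payload: Payload) -> Payload:
    key = str(uuid.uuid4())
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        # Store the serialized payload as an object keyed by the UUID
        await s3.put_object(Bucket=BUCKET, Key=key, Body=payload.SerializeToString())

    return Payload(
        metadata={
            "encoding": b"claim-checked",
            "temporal.io/claim-check-codec": b"v1",
        },
        data=key.encode("utf-8"),
    )
```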

### Activity Heartbeats

For production workloads processing very large datasets, consider implementing
activity heartbeats to prevent timeout issues:

```python
@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    total_items = len(dataset.data)

    for i, item in enumerate(dataset.data):
        # Send a heartbeat every 100 items to prevent timeout.
        # Note: activity.heartbeat() is a synchronous call in the Python SDK.
        if i % 100 == 0:
            activity.heartbeat(f"Processed {i}/{total_items} items")

        # Process item...
```

This ensures Temporal knows the activity is still making progress during
long-running operations.

### Error Handling

The codec includes error handling for:

- Missing keys in storage
- Storage connection failures
- Serialization/deserialization errors

### Cleanup

Consider implementing cleanup strategies for stored data:

- TTL (Time To Live) for automatic expiration (see the sketch after this list)
- Manual cleanup workflows
- Lifecycle policies for cloud storage
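
For the Redis backend shown here, TTL-based expiration is the lightest option. A sketch of the change inside `encode_payload`, with the 30-day window as an assumption; note that the TTL must cover your workflow retention period, or `decode` will fail once the key expires:

```python
# TTL-bound write inside ClaimCheckCodec.encode_payload (30 days is an
# assumed retention window; it must outlive your workflow retention
# period, or decode will raise for histories whose keys have expired).
CLAIM_CHECK_TTL_SECONDS = 30 * 24 * 60 * 60

await self.redis_client.set(key, serialized_data, ex=CLAIM_CHECK_TTL_SECONDS)
```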

## Best Practices

1. **Enable globally**: The claim check pattern applies to all payloads when
   enabled, so consider the performance impact across your entire system
2. **Monitor storage**: Track storage usage and costs since all payloads will be
   stored externally
3. **Implement cleanup**: Prevent storage bloat with appropriate cleanup
   strategies
4. **Test thoroughly**: Verify the pattern works correctly with your specific
   data types and doesn't introduce unexpected latency
5. **Consider alternatives**: Evaluate if data compression or other
   optimizations might be sufficient before implementing claim check
