Integrate Cloudflare Workflows with Agents for durable, multi-step background processing while Agents handle real-time communication.
Cloudflare Workflows provide durable, multi-step execution that survives failures, retries automatically, and can pause to wait for external events. They're ideal for:
- Long-running background tasks (data processing, report generation)
- Multi-step pipelines with retry logic
- Human-in-the-loop approval flows
- Tasks that shouldn't block user requests
Agents excel at real-time communication and state management, while Workflows excel at durable execution. Together they provide:
| Feature | Agent | Workflow | Combined |
|---|---|---|---|
| Real-time WebSocket | ✓ | ✗ | Agent handles |
| Long-running tasks | Limited | ✓ | Workflow handles |
| State persistence | ✓ | ✓ | Both |
| Automatic retries | ✗ | ✓ | Workflow handles |
| External event waiting | ✗ | ✓ | Workflow handles |

Use the following table to choose an approach:

| Use Case | Recommendation |
|---|---|
| Chat/messaging | Agent only |
| Quick API calls | Agent only |
| Background processing (< 30s) | Agent queue() |
| Long-running tasks (> 30s) | Agent + Workflow |
| Multi-step pipelines | Workflow |
| Human approval flows | Agent + Workflow |
| Scheduled tasks | Agent schedule() or Workflow |
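
The recommendation table can also be read as a small decision procedure. The sketch below is illustrative only: `TaskProfile` and `recommend` are hypothetical names rather than part of the Agents SDK, scheduled tasks are left out, and treating a task of exactly 30 seconds as long-running is an assumption.

```typescript
// Hypothetical helper that encodes the recommendation table; not an SDK API.
type Recommendation =
  | "Agent only"
  | "Agent queue()"
  | "Agent + Workflow"
  | "Workflow";

type TaskProfile = {
  background: boolean; // runs outside the request/response path
  expectedSeconds: number; // rough duration estimate
  multiStep: boolean; // pipeline of dependent steps
  needsHumanApproval: boolean; // must pause until a person responds
};

function recommend(task: TaskProfile): Recommendation {
  if (task.needsHumanApproval) return "Agent + Workflow";
  if (task.multiStep) return "Workflow";
  if (!task.background) return "Agent only"; // chat, quick API calls
  // Assumption: the 30-second boundary itself counts as long-running.
  return task.expectedSeconds < 30 ? "Agent queue()" : "Agent + Workflow";
}

const quickJob = recommend({
  background: true,
  expectedSeconds: 5,
  multiStep: false,
  needsHumanApproval: false
});

const reportJob = recommend({
  background: true,
  expectedSeconds: 600,
  multiStep: false,
  needsHumanApproval: false
});
```

Here `quickJob` resolves to the queue-based option and `reportJob` to the combined Agent and Workflow option.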
Create a Workflow that extends AgentWorkflow to get typed access to the originating Agent:
// src/workflows/processing.ts
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
import type { MyAgent } from "../agent";
type TaskParams = {
taskId: string;
data: string;
};
export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
const params = event.payload;
// Step 1: Process data
const result = await step.do("process-data", async () => {
// Durable step - will retry on failure
return processData(params.data);
});
// Report progress to Agent (non-durable, lightweight)
await this.reportProgress({
step: "process",
status: "complete",
percent: 0.5
});
// Step 2: Save results
await step.do("save-results", async () => {
// Call Agent method via RPC
await this.agent.saveResult(params.taskId, result);
});
// Broadcast to connected clients (non-durable)
this.broadcastToClients({
type: "task-complete",
taskId: params.taskId
});
// Report completion (durable via step)
await step.reportComplete(result);
return result;
}
}

Use runWorkflow() to start a workflow with automatic tracking:
// src/agent.ts
import { Agent } from "agents";
export class MyAgent extends Agent {
async startTask(taskId: string, data: string) {
// Start workflow - automatically tracked in Agent's database
const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {
taskId,
data
});
return { instanceId };
}
// Called when the workflow reports progress (progress is a typed object)
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown
) {
// Cast to your progress type
const p = progress as { step?: string; status?: string; percent?: number };
console.log(
`Workflow ${workflowName}/${instanceId}: ${p.step} - ${p.status} (${(p.percent ?? 0) * 100}%)`
);
// Broadcast to connected clients
this.broadcast(
JSON.stringify({
type: "workflow-progress",
workflowName,
instanceId,
progress
})
);
}
// Called when workflow completes
async onWorkflowComplete(
workflowName: string,
instanceId: string,
result?: unknown
) {
console.log(`Workflow ${workflowName}/${instanceId} completed:`, result);
}
// Method called by workflow via RPC
async saveResult(taskId: string, result: unknown) {
this
.sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;
}
}

AgentWorkflow is the base class for Workflows that integrate with Agents.
Type Parameters:
- AgentType - The Agent class type (for typed RPC)
- Params - User params passed to the workflow (optional)
- ProgressType - Type for progress reporting (defaults to DefaultProgress)
- Env - Environment type (defaults to Cloudflare.Env)

Properties:
- agent - Typed stub for calling Agent methods via RPC
- instanceId - The workflow instance ID
- workflowName - The workflow binding name
- env - Environment bindings

Methods on this (non-durable, may repeat on retry):
| Method | Description |
|---|---|
| reportProgress(progress) | Report typed progress object to the Agent |
| broadcastToClients(message) | Broadcast message to all WebSocket clients |
| waitForApproval(step, opts?) | Wait for approval event (throws on rejection) |

Methods on step (durable, idempotent, won't repeat on retry):
| Method | Description |
|---|---|
| step.reportComplete(result?) | Report successful completion |
| step.reportError(error) | Report an error |
| step.sendEvent(event) | Send a custom event to the Agent |
| step.updateAgentState(state) | Replace Agent state (broadcasts to clients) |
| step.mergeAgentState(partial) | Merge into Agent state (broadcasts to clients) |
| step.resetAgentState() | Reset Agent state to initialState |

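
The difference between the two tables matters when a workflow is retried. The following is a simplified, synchronous model of that behavior, not the Workflows runtime: `createStep`, `persisted`, and `runWorkflowBody` are hypothetical names, and the real runtime is asynchronous and persists step results for you. It only illustrates why a durable step runs once while a non-durable call such as reportProgress() may run again.

```typescript
// Simplified, synchronous model of step replay. All names are hypothetical.
type StepResults = { [stepName: string]: unknown };

function createStep(results: StepResults) {
  return {
    // Runs fn the first time a step name is seen, then reuses the stored result.
    run<T>(stepName: string, fn: () => T): T {
      if (Object.prototype.hasOwnProperty.call(results, stepName)) {
        return results[stepName] as T;
      }
      const value = fn();
      results[stepName] = value;
      return value;
    }
  };
}

const persisted: StepResults = {}; // stands in for storage that survives retries
const callLog: string[] = [];
let attempt = 0;

function runWorkflowBody(): string {
  attempt++;
  const step = createStep(persisted);

  const data = step.run("process-data", () => {
    callLog.push("durable: process-data executed");
    return "processed";
  });

  // Non-durable call: executes on every attempt.
  callLog.push("non-durable: progress reported");

  if (attempt === 1) {
    throw new Error("simulated failure after the progress report");
  }
  return data;
}

let output = "";
try {
  output = runWorkflowBody();
} catch (_err) {
  output = runWorkflowBody(); // replay from the top
}
```

After the replay, `callLog` contains one durable entry and two non-durable entries, which is why frequent, repeat-safe updates suit the methods on this, while anything that must happen exactly once belongs on step.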
DefaultProgress Type:
type DefaultProgress = {
step?: string;
status?: "pending" | "running" | "complete" | "error";
message?: string;
percent?: number;
[key: string]: unknown; // extensible
};

Methods added to the Agent class:
Use runWorkflow() to start a workflow and track it in the Agent's database.
const instanceId = await this.runWorkflow(
"MY_WORKFLOW",
{ taskId: "123", data: "process this" },
{
id: "custom-id", // optional - auto-generated if not provided
metadata: { userId: "user-456", priority: "high" }, // optional - for querying
agentBinding: "MyAgent" // optional - auto-detected from class name if not provided
}
);

Parameters:
- workflowName - Workflow binding name from env
- params - Params to pass to the workflow
- options.id - Custom workflow ID (auto-generated if not provided)
- options.metadata - Optional metadata stored for querying (not passed to workflow)
- options.agentBinding - Agent binding name (auto-detected from class name if not provided)

Returns: Workflow instance ID
Use sendWorkflowEvent() to send an event to a running workflow.
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "approval",
payload: { approved: true }
});

Use getWorkflowStatus() to get the status of a workflow and update its tracking record.
const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);
// status: { status: 'running', output: null, error: null }

Use getWorkflow() to get a tracked workflow by ID.
const workflow = this.getWorkflow(instanceId);
// { instanceId, workflowName, status, metadata, error, createdAt, ... }

Use getWorkflows() to query tracked workflows with cursor-based pagination. It returns a WorkflowPage with the matching workflows, the total count, and a cursor for the next page.
// Get running workflows (default limit is 50, max is 100)
const { workflows, total } = this.getWorkflows({ status: "running" });
// Get workflows by binding name
const { workflows: processing } = this.getWorkflows({
workflowName: "PROCESSING_WORKFLOW"
});
// Filter by metadata
const { workflows: userWorkflows } = this.getWorkflows({
metadata: { userId: "user-456" }
});
// Pagination example
const page1 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc"
});
console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);
// Get next page using cursor
if (page1.nextCursor) {
const page2 = this.getWorkflows({
status: ["complete", "errored"],
limit: 20,
orderBy: "desc",
cursor: page1.nextCursor
});
}

The WorkflowPage type:
type WorkflowPage = {
workflows: WorkflowInfo[];
total: number; // Total matching workflows
nextCursor: string | null; // null when no more pages
};

Use deleteWorkflow() to delete a single workflow tracking record.
const deleted = this.deleteWorkflow(instanceId);
// true if deleted, false if not found

Use deleteWorkflows() to delete workflow tracking records matching criteria. This is useful for cleanup.
// Delete all completed workflows older than 7 days
const count = this.deleteWorkflows({
status: "complete",
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
});
// Delete all errored and terminated workflows
const failedCount = this.deleteWorkflows({
status: ["errored", "terminated"]
});

Use terminateWorkflow() to terminate a running workflow immediately.
await this.terminateWorkflow(instanceId);

This stops the workflow and sets its status to "terminated". It throws if the workflow is not found in the tracking table. Cloudflare will throw if the workflow is already completed, errored, or terminated.
Note: terminate() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.
Use pauseWorkflow() to pause a running workflow. The workflow can be resumed later with resumeWorkflow().
await this.pauseWorkflow(instanceId);

This throws if the workflow is not running. Cloudflare will throw if the workflow is already paused, completed, errored, or terminated.
Note: pause() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.
Use resumeWorkflow() to resume a paused workflow.
await this.resumeWorkflow(instanceId);

This throws if the workflow is not paused. Cloudflare will throw if the workflow is already running, completed, errored, or terminated.
Note: resume() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.
Use restartWorkflow() to restart a workflow instance from the beginning with the same ID.
// Reset tracking (default) - clears timestamps and error fields
await this.restartWorkflow(instanceId);
// Preserve original timestamps
await this.restartWorkflow(instanceId, { resetTracking: false });

This is useful for re-running failed workflows or retrying from scratch. The resetTracking option (default: true) controls whether to reset the created_at timestamp and clear error fields.
Note: restart() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.
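
Because pause, resume, and terminate each throw when the workflow is in the wrong state, a caller may want to check the tracked status first. The sketch below is one possible pre-flight check, not SDK behavior: `canApply` and `availableActions` are hypothetical names, the rules encode only the preconditions stated above (the platform may enforce more), and restart is omitted because no precondition is documented for it.

```typescript
// Hypothetical pre-flight check based on the preconditions described above.
type WorkflowStatus =
  | "queued"
  | "running"
  | "paused"
  | "waiting"
  | "complete"
  | "errored"
  | "terminated";

type ControlAction = "terminate" | "pause" | "resume";

function canApply(action: ControlAction, current: WorkflowStatus): boolean {
  if (action === "pause") return current === "running";
  if (action === "resume") return current === "paused";
  // terminate: rejected once the workflow has already finished.
  return (
    current !== "complete" && current !== "errored" && current !== "terminated"
  );
}

function availableActions(current: WorkflowStatus): ControlAction[] {
  const all: ControlAction[] = ["terminate", "pause", "resume"];
  return all.filter((action) => canApply(action, current));
}

const whenRunning = availableActions("running"); // ["terminate", "pause"]
const whenPaused = availableActions("paused"); // ["terminate", "resume"]
const whenComplete = availableActions("complete"); // []
```

In an Agent, the current status could be read from the record returned by getWorkflow(instanceId) before calling the control method. The control methods can still throw, so this check reduces errors rather than replacing error handling.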
Override these methods in your Agent to handle workflow events:
class MyAgent extends Agent {
// Called when the workflow reports progress (progress is a typed object)
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown
) {
// Cast to your progress type
const p = progress as { step?: string; percent?: number };
}
// Called when workflow completes successfully
async onWorkflowComplete(
workflowName: string,
instanceId: string,
result?: unknown
) {}
// Called when workflow encounters an error
async onWorkflowError(
workflowName: string,
instanceId: string,
error: string
) {}
// Called when workflow sends a custom event
async onWorkflowEvent(
workflowName: string,
instanceId: string,
event: unknown
) {}
// Handle all callbacks in one place (alternative)
async onWorkflowCallback(callback: WorkflowCallback) {
// Called for all callback types - callback includes workflowName
}
}

The Agent also provides convenience methods for human-in-the-loop approval flows:
class MyAgent extends Agent {
// Approve a waiting workflow
async handleApproval(instanceId: string, userId: string) {
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId }
});
}
// Reject a waiting workflow
async handleRejection(instanceId: string, reason: string) {
await this.rejectWorkflow(instanceId, { reason });
}
}

Workflows started with runWorkflow() are automatically tracked in the Agent's SQLite database.

| Column | Type | Description |
|---|---|---|
| id | TEXT | Internal row ID |
| workflow_id | TEXT | Cloudflare workflow instance ID |
| workflow_name | TEXT | Workflow binding name |
| status | TEXT | Current status |
| metadata | TEXT | JSON metadata (for querying) |
| error_name | TEXT | Error name (if failed) |
| error_message | TEXT | Error message (if failed) |
| created_at | INTEGER | Unix timestamp |
| updated_at | INTEGER | Unix timestamp |
| completed_at | INTEGER | Unix timestamp (when done) |

Note: Workflow params and output are not stored by default. Use metadata to store queryable information, and store large payloads in your own tables if needed.
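
Since the metadata column holds JSON text, code that reads it directly has to parse it and decide what a match means. The sketch below shows one way to do that. It is an assumption-laden illustration: `parseMetadata` and `matchesMetadata` are hypothetical helpers, and the matching rule (strict equality on top-level keys) is a guess at reasonable semantics, not a description of how getWorkflows() filters internally.

```typescript
// Hypothetical helpers; the SDK's own metadata filtering may work differently.
type Metadata = { [key: string]: unknown };

// The metadata column is JSON text (or empty), so parse defensively.
function parseMetadata(raw: string | null): Metadata {
  if (raw === null || raw === "") return {};
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Metadata;
    }
    return {}; // JSON that is not an object is treated as "no metadata"
  } catch (_err) {
    return {}; // malformed JSON is treated as "no metadata"
  }
}

// Assumption: a filter matches when every top-level key is strictly equal.
function matchesMetadata(candidate: Metadata, filter: Metadata): boolean {
  return Object.keys(filter).every((key) => candidate[key] === filter[key]);
}

const storedText = JSON.stringify({ userId: "user-456", priority: "high" });
const parsedMeta = parseMetadata(storedText);

const isUsersWorkflow = matchesMetadata(parsedMeta, { userId: "user-456" }); // true
const isLowPriority = matchesMetadata(parsedMeta, { priority: "low" }); // false
```

Keeping metadata flat, with small scalar values, makes this kind of matching predictable. Larger or nested payloads fit better in a separate table, as the note above suggests.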

Status values:
- queued - Waiting to start
- running - Currently executing
- paused - Paused by user
- waiting - Waiting for event
- complete - Finished successfully
- errored - Failed with error
- terminated - Manually terminated

// Workflow with default progress type
export class DataProcessingWorkflow extends AgentWorkflow<
MyAgent,
ProcessParams
> {
async run(event: AgentWorkflowEvent<ProcessParams>, step: AgentWorkflowStep) {
const params = event.payload;
const items = params.items;
for (let i = 0; i < items.length; i++) {
await step.do(`process-${i}`, async () => {
await processItem(items[i]);
});
// Report progress after each item (non-durable, lightweight)
await this.reportProgress({
step: `process-${i}`,
status: "complete",
percent: (i + 1) / items.length,
message: `Processed ${i + 1}/${items.length}`
});
}
await step.reportComplete({ processed: items.length });
}
}
// Agent
class MyAgent extends Agent {
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown
) {
// Broadcast progress to all connected clients
this.broadcast(
JSON.stringify({
type: "processing-progress",
workflowName,
instanceId,
progress
})
);
}
}

// Workflow using the built-in waitForApproval helper
export class ApprovalWorkflow extends AgentWorkflow<MyAgent, RequestParams> {
async run(event: AgentWorkflowEvent<RequestParams>, step: AgentWorkflowStep) {
const params = event.payload;
// Prepare request
const request = await step.do("prepare", async () => {
return { ...params, preparedAt: Date.now() };
});
// Wait for approval (throws WorkflowRejectedError if rejected)
await this.reportProgress({
step: "approval",
status: "pending",
percent: 0.5,
message: "Awaiting approval"
});
const approvalData = await this.waitForApproval<{ approvedBy: string }>(
step,
{ timeout: "7 days" }
);
console.log("Approved by:", approvalData?.approvedBy);
// Execute approved action
const result = await step.do("execute", async () => {
return executeRequest(request);
});
await step.reportComplete(result);
return result;
}
}
// Agent using the built-in approval methods
class MyAgent extends Agent {
// Approve a waiting workflow
async handleApproval(instanceId: string, userId: string) {
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId }
});
}
// Reject a waiting workflow
async handleRejection(instanceId: string, reason: string) {
await this.rejectWorkflow(instanceId, { reason });
}
}

// Workflow with built-in retry logic
export class ResilientTaskWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
const params = event.payload;
const result = await step.do(
"call-external-api",
{
retries: {
limit: 5,
delay: "10 seconds",
backoff: "exponential"
},
timeout: "5 minutes"
},
async () => {
const response = await fetch("https://api.example.com/process", {
method: "POST",
body: JSON.stringify(params)
});
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
return response.json();
}
);
await step.reportComplete(result);
return result;
}
}

Workflows can update the Agent's state directly (durably via step), which automatically broadcasts to all connected clients:
// Workflow that syncs state to Agent
export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
const params = event.payload;
// Update Agent state (durable, replaces entire state, broadcasts to clients)
await step.updateAgentState({
currentTask: {
id: params.taskId,
status: "processing",
startedAt: Date.now()
}
});
const result = await step.do("process", async () => {
return processTask(params);
});
// Merge partial state (durable, keeps existing fields, broadcasts to clients)
await step.mergeAgentState({
currentTask: {
status: "complete",
result,
completedAt: Date.now()
}
});
await step.reportComplete(result);
return result;
}
}

Define custom progress types for domain-specific reporting:
// Custom progress type for data pipeline
type PipelineProgress = {
stage: "extract" | "transform" | "load";
recordsProcessed: number;
totalRecords: number;
currentTable?: string;
};
// Workflow with custom progress type (3rd type parameter)
export class ETLWorkflow extends AgentWorkflow<
MyAgent,
ETLParams,
PipelineProgress
> {
async run(event: AgentWorkflowEvent<ETLParams>, step: AgentWorkflowStep) {
const params = event.payload;
// Report typed progress (non-durable, lightweight for frequent updates)
await this.reportProgress({
stage: "extract",
recordsProcessed: 0,
totalRecords: 1000,
currentTable: "users"
});
// ... processing
}
}
// Agent receives typed progress
class MyAgent extends Agent {
async onWorkflowProgress(
workflowName: string,
instanceId: string,
progress: unknown
) {
const p = progress as PipelineProgress;
console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);
}
}

From a Workflow to the Agent:
// Direct RPC call (typed)
await this.agent.updateTaskStatus(taskId, "processing");
const data = await this.agent.getData(taskId);
// Non-durable callbacks (may repeat on retry, use for frequent updates)
await this.reportProgress({
step: "process",
percent: 0.5,
message: "Halfway done"
});
this.broadcastToClients({ type: "update", data });
// Durable callbacks via step (idempotent, won't repeat on retry)
await step.reportComplete(result);
await step.reportError("Something went wrong");
await step.sendEvent({ type: "custom", data: {} });
// Durable state synchronization via step (broadcasts to clients)
await step.updateAgentState({ status: "processing" });
await step.mergeAgentState({ progress: 0.5 });

From the Agent to a Workflow:
// Send event to waiting workflow (generic)
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
type: "custom-event",
payload: { action: "proceed" }
});
// Approve/reject workflows using convenience methods
await this.approveWorkflow(instanceId, {
reason: "Approved by admin",
metadata: { approvedBy: userId }
});
await this.rejectWorkflow(instanceId, {
reason: "Request denied"
});
// The workflow waits for approval with:
const approvalData = await this.waitForApproval(step, { timeout: "7 days" });

Best practices:
- Keep workflows focused - One workflow per logical task
- Use meaningful step names - Helps with debugging and observability
- Report progress regularly - Keeps users informed
- Handle errors gracefully - Use step.reportError() before throwing
- Clean up completed workflows - The cf_agents_workflows table can grow unbounded, so implement a retention policy:
// Option 1: Cleanup immediately on completion
async onWorkflowComplete(workflowName: string, instanceId: string, result?: unknown) {
// Process result first, then delete
this.deleteWorkflow(instanceId);
}
// Option 2: Scheduled cleanup (keep recent history)
// Call this periodically via a scheduled task or cron
this.deleteWorkflows({
status: ["complete", "errored"],
createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // 7 days
});
// Option 3: Keep all history for compliance/auditing
// Don't call deleteWorkflows() - query historical data as needed

- Handle workflow binding renames carefully - If you rename a workflow binding in wrangler.jsonc, existing tracked workflows will reference the old name. The agent will warn on startup if it detects this. Use migrateWorkflowBinding() to update them:
// After renaming OLD_WORKFLOW to NEW_WORKFLOW in wrangler.jsonc
async onStart() {
// Migrate any existing tracked workflows to the new binding name
const migrated = this.migrateWorkflowBinding('OLD_WORKFLOW', 'NEW_WORKFLOW');
// You can remove this code after all agents have migrated
}

Limitations:
- Workflows can have at most 1,024 steps
- Maximum 10MB state per workflow
- Events wait for at most 1 year
- No direct WebSocket from workflows (use broadcastToClients())
- Workflow execution time: up to 30 minutes per step
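
The step limit matters for workflows that run one step per item, since a large input could exceed 1,024 steps. One way to stay under the limit is to process items in batches, one step per batch. The helpers below are a hypothetical sketch of that idea, not part of the SDK. The reserve of 24 steps for other work (such as reporting completion) is an arbitrary assumption.

```typescript
// Hypothetical batching helpers for staying under the step limit.
const MAX_STEPS = 1024; // limit quoted in the list above

// Smallest batch size that fits itemCount items into stepBudget steps.
function batchSizeFor(itemCount: number, stepBudget: number): number {
  if (stepBudget < 1) throw new RangeError("stepBudget must be at least 1");
  return Math.max(1, Math.ceil(itemCount / stepBudget));
}

// Split items into consecutive batches of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1 || Math.floor(size) !== size) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Assumption: keep 24 steps in reserve for non-batch steps.
const stepBudget = MAX_STEPS - 24;

const workItems: number[] = [];
for (let i = 0; i < 5000; i++) workItems.push(i);

const perBatch = batchSizeFor(workItems.length, stepBudget); // 5
const workBatches = chunk(workItems, perBatch); // 1000 batches of 5 items
```

Inside a workflow, each entry of `workBatches` would then be handled in its own step.do() call. Larger batches use fewer steps but redo more work when a step is retried, and each step still has to finish within the per-step time limit.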