Skip to content

Latest commit

 

History

History
836 lines (659 loc) · 24.4 KB

File metadata and controls

836 lines (659 loc) · 24.4 KB

Workflows Integration

Integrate Cloudflare Workflows with Agents for durable, multi-step background processing while Agents handle real-time communication.

Quick Links

Introduction

What are Cloudflare Workflows?

Cloudflare Workflows provide durable, multi-step execution that survives failures, retries automatically, and can pause to wait for external events. They're ideal for:

  • Long-running background tasks (data processing, report generation)
  • Multi-step pipelines with retry logic
  • Human-in-the-loop approval flows
  • Tasks that shouldn't block user requests

Why Integrate with Agents?

Agents excel at real-time communication and state management, while Workflows excel at durable execution. Together they provide:

Feature Agent Workflow Combined
Real-time WebSocket Agent handles
Long-running tasks Limited Workflow handles
State persistence Both
Automatic retries Workflow handles
External event waiting Workflow handles

When to Use What

Use Case Recommendation
Chat/messaging Agent only
Quick API calls Agent only
Background processing (< 30s) Agent queue()
Long-running tasks (> 30s) Agent + Workflow
Multi-step pipelines Workflow
Human approval flows Agent + Workflow
Scheduled tasks Agent schedule() or Workflow

Quick Start

1. Define Your Workflow

Create a Workflow that extends AgentWorkflow to get typed access to the originating Agent:

// src/workflows/processing.ts
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
import type { MyAgent } from "../agent";

type TaskParams = {
  taskId: string;
  data: string;
};

export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Step 1: Process data
    const result = await step.do("process-data", async () => {
      // Durable step - will retry on failure
      return processData(params.data);
    });

    // Report progress to Agent (non-durable, lightweight)
    await this.reportProgress({
      step: "process",
      status: "complete",
      percent: 0.5
    });

    // Step 2: Save results
    await step.do("save-results", async () => {
      // Call Agent method via RPC
      await this.agent.saveResult(params.taskId, result);
    });

    // Broadcast to connected clients (non-durable)
    this.broadcastToClients({
      type: "task-complete",
      taskId: params.taskId
    });

    // Report completion (durable via step)
    await step.reportComplete(result);

    return result;
  }
}

2. Start Workflow from Agent

Use runWorkflow() to start a workflow with automatic tracking:

// src/agent.ts
import { Agent } from "agents";

export class MyAgent extends Agent {
  async startTask(taskId: string, data: string) {
    // Start workflow - automatically tracked in Agent's database
    const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {
      taskId,
      data
    });

    return { instanceId };
  }

  // Called when workflow reports progress (progress is typed object)
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    // Cast to your progress type
    const p = progress as { step?: string; status?: string; percent?: number };
    console.log(
      `Workflow ${workflowName}/${instanceId}: ${p.step} - ${p.status} (${(p.percent ?? 0) * 100}%)`
    );

    // Broadcast to connected clients
    this.broadcast(
      JSON.stringify({
        type: "workflow-progress",
        workflowName,
        instanceId,
        progress
      })
    );
  }

  // Called when workflow completes
  async onWorkflowComplete(
    workflowName: string,
    instanceId: string,
    result?: unknown
  ) {
    console.log(`Workflow ${workflowName}/${instanceId} completed:`, result);
  }

  // Method called by workflow via RPC
  async saveResult(taskId: string, result: unknown) {
    this
      .sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;
  }
}

3. Configure Wrangler

// wrangler.jsonc
{
  "name": "my-app",
  "main": "src/index.ts",
  "compatibility_date": "2025-02-11",
  "durable_objects": {
    "bindings": [{ "name": "MY_AGENT", "class_name": "MyAgent" }]
  },
  "workflows": [
    {
      "name": "processing-workflow",
      "binding": "PROCESSING_WORKFLOW",
      "class_name": "ProcessingWorkflow"
    }
  ],
  "migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }]
}

API Reference

AgentWorkflow<AgentType, Params, ProgressType, Env>

Base class for Workflows that integrate with Agents.

Type Parameters:

  • AgentType - The Agent class type (for typed RPC)
  • Params - User params passed to the workflow (optional)
  • ProgressType - Type for progress reporting (defaults to DefaultProgress)
  • Env - Environment type (defaults to Cloudflare.Env)

Properties:

  • agent - Typed stub for calling Agent methods via RPC
  • instanceId - The workflow instance ID
  • workflowName - The workflow binding name
  • env - Environment bindings

Methods on this (non-durable, may repeat on retry):

Method Description
reportProgress(progress) Report typed progress object to the Agent
broadcastToClients(message) Broadcast message to all WebSocket clients
waitForApproval(step, opts?) Wait for approval event (throws on rejection)

Methods on step (durable, idempotent, won't repeat on retry):

Method Description
step.reportComplete(result?) Report successful completion
step.reportError(error) Report an error
step.sendEvent(event) Send a custom event to the Agent
step.updateAgentState(state) Replace Agent state (broadcasts to clients)
step.mergeAgentState(partial) Merge into Agent state (broadcasts to clients)
step.resetAgentState() Reset Agent state to initialState

DefaultProgress Type:

type DefaultProgress = {
  step?: string;
  status?: "pending" | "running" | "complete" | "error";
  message?: string;
  percent?: number;
  [key: string]: unknown; // extensible
};

Agent Workflow Methods

Methods added to the Agent class:

runWorkflow(workflowName, params, options?)

Start a workflow and track it in the Agent's database.

const instanceId = await this.runWorkflow(
  "MY_WORKFLOW",
  { taskId: "123", data: "process this" },
  {
    id: "custom-id", // optional - auto-generated if not provided
    metadata: { userId: "user-456", priority: "high" }, // optional - for querying
    agentBinding: "MyAgent" // optional - auto-detected from class name if not provided
  }
);

Parameters:

  • workflowName - Workflow binding name from env
  • params - Params to pass to the workflow
  • options.id - Custom workflow ID (auto-generated if not provided)
  • options.metadata - Optional metadata stored for querying (not passed to workflow)
  • options.agentBinding - Agent binding name (auto-detected from class name if not provided)

Returns: Workflow instance ID

sendWorkflowEvent(workflowName, instanceId, event)

Send an event to a running workflow.

await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
  type: "approval",
  payload: { approved: true }
});

getWorkflowStatus(workflowName, instanceId)

Get the status of a workflow and update tracking record.

const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);
// status: { status: 'running', output: null, error: null }

getWorkflow(instanceId)

Get a tracked workflow by ID.

const workflow = this.getWorkflow(instanceId);
// { instanceId, workflowName, status, metadata, error, createdAt, ... }

getWorkflows(criteria?)

Query tracked workflows with cursor-based pagination. Returns a WorkflowPage with workflows, total count, and cursor for the next page.

// Get running workflows (default limit is 50, max is 100)
const { workflows, total } = this.getWorkflows({ status: "running" });

// Get workflows by binding name
const { workflows: processing } = this.getWorkflows({
  workflowName: "PROCESSING_WORKFLOW"
});

// Filter by metadata
const { workflows: userWorkflows } = this.getWorkflows({
  metadata: { userId: "user-456" }
});

// Pagination example
const page1 = this.getWorkflows({
  status: ["complete", "errored"],
  limit: 20,
  orderBy: "desc"
});

console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);

// Get next page using cursor
if (page1.nextCursor) {
  const page2 = this.getWorkflows({
    status: ["complete", "errored"],
    limit: 20,
    orderBy: "desc",
    cursor: page1.nextCursor
  });
}

The WorkflowPage type:

type WorkflowPage = {
  workflows: WorkflowInfo[];
  total: number; // Total matching workflows
  nextCursor: string | null; // null when no more pages
};

deleteWorkflow(instanceId)

Delete a single workflow tracking record.

const deleted = this.deleteWorkflow(instanceId);
// true if deleted, false if not found

deleteWorkflows(criteria?)

Delete workflow tracking records matching criteria. Useful for cleanup.

// Delete all completed workflows older than 7 days
const count = this.deleteWorkflows({
  status: "complete",
  createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
});

// Delete all errored and terminated workflows
const count = this.deleteWorkflows({
  status: ["errored", "terminated"]
});

terminateWorkflow(instanceId)

Terminate a running workflow immediately.

await this.terminateWorkflow(instanceId);

This stops the workflow and sets its status to "terminated". Throws if the workflow is not found in the tracking table. Cloudflare will throw if the workflow is already completed, errored, or terminated.

Note: terminate() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.

pauseWorkflow(instanceId)

Pause a running workflow. The workflow can be resumed later with resumeWorkflow().

await this.pauseWorkflow(instanceId);

Throws if the workflow is not running. Cloudflare will throw if the workflow is already paused, completed, errored, or terminated.

Note: pause() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.

resumeWorkflow(instanceId)

Resume a paused workflow.

await this.resumeWorkflow(instanceId);

Throws if the workflow is not paused. Cloudflare will throw if the workflow is already running, completed, errored, or terminated.

Note: resume() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.

restartWorkflow(instanceId, options?)

Restart a workflow instance from the beginning with the same ID.

// Reset tracking (default) - clears timestamps and error fields
await this.restartWorkflow(instanceId);

// Preserve original timestamps
await this.restartWorkflow(instanceId, { resetTracking: false });

This is useful for re-running failed workflows or retrying from scratch. The resetTracking option (default: true) controls whether to reset the created_at timestamp and clear error fields.

Note: restart() is not yet supported in local development with wrangler dev. It works when deployed to Cloudflare. Follow #823 for details and updates.

Lifecycle Callbacks

Override these methods in your Agent to handle workflow events:

class MyAgent extends Agent {
  // Called when workflow reports progress (progress is typed object)
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    // Cast to your progress type
    const p = progress as { step?: string; percent?: number };
  }

  // Called when workflow completes successfully
  async onWorkflowComplete(
    workflowName: string,
    instanceId: string,
    result?: unknown
  ) {}

  // Called when workflow encounters an error
  async onWorkflowError(
    workflowName: string,
    instanceId: string,
    error: string
  ) {}

  // Called when workflow sends a custom event
  async onWorkflowEvent(
    workflowName: string,
    instanceId: string,
    event: unknown
  ) {}

  // Handle all callbacks in one place (alternative)
  async onWorkflowCallback(callback: WorkflowCallback) {
    // Called for all callback types - callback includes workflowName
  }
}

Approval Methods

Convenience methods for human-in-the-loop approval flows:

class MyAgent extends Agent {
  // Approve a waiting workflow
  async handleApproval(instanceId: string, userId: string) {
    await this.approveWorkflow(instanceId, {
      reason: "Approved by admin",
      metadata: { approvedBy: userId }
    });
  }

  // Reject a waiting workflow
  async handleRejection(instanceId: string, reason: string) {
    await this.rejectWorkflow(instanceId, { reason });
  }
}

Workflow Tracking

Workflows started with runWorkflow() are automatically tracked in the Agent's SQLite database.

cf_agents_workflows Table

Column Type Description
id TEXT Internal row ID
workflow_id TEXT Cloudflare workflow instance ID
workflow_name TEXT Workflow binding name
status TEXT Current status
metadata TEXT JSON metadata (for querying)
error_name TEXT Error name (if failed)
error_message TEXT Error message (if failed)
created_at INTEGER Unix timestamp
updated_at INTEGER Unix timestamp
completed_at INTEGER Unix timestamp (when done)

Note: Workflow params and output are not stored by default. Use metadata to store queryable information, and store large payloads in your own tables if needed.

Workflow Status Values

  • queued - Waiting to start
  • running - Currently executing
  • paused - Paused by user
  • waiting - Waiting for event
  • complete - Finished successfully
  • errored - Failed with error
  • terminated - Manually terminated

Patterns

Background Processing with Progress

// Workflow with default progress type
export class DataProcessingWorkflow extends AgentWorkflow<
  MyAgent,
  ProcessParams
> {
  async run(event: AgentWorkflowEvent<ProcessParams>, step: AgentWorkflowStep) {
    const params = event.payload;
    const items = params.items;

    for (let i = 0; i < items.length; i++) {
      await step.do(`process-${i}`, async () => {
        await processItem(items[i]);
      });

      // Report progress after each item (non-durable, lightweight)
      await this.reportProgress({
        step: `process-${i}`,
        status: "complete",
        percent: (i + 1) / items.length,
        message: `Processed ${i + 1}/${items.length}`
      });
    }

    await step.reportComplete({ processed: items.length });
  }
}

// Agent
class MyAgent extends Agent {
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    // Broadcast progress to all connected clients
    this.broadcast(
      JSON.stringify({
        type: "processing-progress",
        workflowName,
        instanceId,
        progress
      })
    );
  }
}

Human-in-the-Loop Approval

// Workflow using the built-in waitForApproval helper
export class ApprovalWorkflow extends AgentWorkflow<MyAgent, RequestParams> {
  async run(event: AgentWorkflowEvent<RequestParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Prepare request
    const request = await step.do("prepare", async () => {
      return { ...params, preparedAt: Date.now() };
    });

    // Wait for approval (throws WorkflowRejectedError if rejected)
    await this.reportProgress({
      step: "approval",
      status: "pending",
      percent: 0.5,
      message: "Awaiting approval"
    });
    const approvalData = await this.waitForApproval<{ approvedBy: string }>(
      step,
      { timeout: "7 days" }
    );

    console.log("Approved by:", approvalData?.approvedBy);

    // Execute approved action
    const result = await step.do("execute", async () => {
      return executeRequest(request);
    });

    await step.reportComplete(result);
    return result;
  }
}

// Agent using the built-in approval methods
class MyAgent extends Agent {
  // Approve a waiting workflow
  async handleApproval(instanceId: string, userId: string) {
    await this.approveWorkflow(instanceId, {
      reason: "Approved by admin",
      metadata: { approvedBy: userId }
    });
  }

  // Reject a waiting workflow
  async handleRejection(instanceId: string, reason: string) {
    await this.rejectWorkflow(instanceId, { reason });
  }
}

Durable Task Queue with Retries

// Workflow with built-in retry logic
export class ResilientTaskWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    const result = await step.do(
      "call-external-api",
      {
        retries: {
          limit: 5,
          delay: "10 seconds",
          backoff: "exponential"
        },
        timeout: "5 minutes"
      },
      async () => {
        const response = await fetch("https://api.example.com/process", {
          method: "POST",
          body: JSON.stringify(params)
        });

        if (!response.ok) {
          throw new Error(`API error: ${response.status}`);
        }

        return response.json();
      }
    );

    await step.reportComplete(result);
    return result;
  }
}

State Synchronization

Workflows can update the Agent's state directly (durably via step), which automatically broadcasts to all connected clients:

// Workflow that syncs state to Agent
export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Update Agent state (durable, replaces entire state, broadcasts to clients)
    await step.updateAgentState({
      currentTask: {
        id: params.taskId,
        status: "processing",
        startedAt: Date.now()
      }
    });

    const result = await step.do("process", async () => {
      return processTask(params);
    });

    // Merge partial state (durable, keeps existing fields, broadcasts to clients)
    await step.mergeAgentState({
      currentTask: {
        status: "complete",
        result,
        completedAt: Date.now()
      }
    });

    await step.reportComplete(result);
    return result;
  }
}

Custom Progress Types

Define custom progress types for domain-specific reporting:

// Custom progress type for data pipeline
type PipelineProgress = {
  stage: "extract" | "transform" | "load";
  recordsProcessed: number;
  totalRecords: number;
  currentTable?: string;
};

// Workflow with custom progress type (3rd type parameter)
export class ETLWorkflow extends AgentWorkflow<
  MyAgent,
  ETLParams,
  PipelineProgress
> {
  async run(event: AgentWorkflowEvent<ETLParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Report typed progress (non-durable, lightweight for frequent updates)
    await this.reportProgress({
      stage: "extract",
      recordsProcessed: 0,
      totalRecords: 1000,
      currentTable: "users"
    });

    // ... processing
  }
}

// Agent receives typed progress
class MyAgent extends Agent {
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    const p = progress as PipelineProgress;
    console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);
  }
}

Bidirectional Communication

Workflow → Agent

// Direct RPC call (typed)
await this.agent.updateTaskStatus(taskId, "processing");
const data = await this.agent.getData(taskId);

// Non-durable callbacks (may repeat on retry, use for frequent updates)
await this.reportProgress({
  step: "process",
  percent: 0.5,
  message: "Halfway done"
});
this.broadcastToClients({ type: "update", data });

// Durable callbacks via step (idempotent, won't repeat on retry)
await step.reportComplete(result);
await step.reportError("Something went wrong");
await step.sendEvent({ type: "custom", data: {} });

// Durable state synchronization via step (broadcasts to clients)
await step.updateAgentState({ status: "processing" });
await step.mergeAgentState({ progress: 0.5 });

Agent → Workflow

// Send event to waiting workflow (generic)
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
  type: "custom-event",
  payload: { action: "proceed" }
});

// Approve/reject workflows using convenience methods
await this.approveWorkflow(instanceId, {
  reason: "Approved by admin",
  metadata: { approvedBy: userId }
});

await this.rejectWorkflow(instanceId, {
  reason: "Request denied"
});

// The workflow waits for approval with:
const approvalData = await this.waitForApproval(step, { timeout: "7 days" });

Best Practices

  1. Keep workflows focused - One workflow per logical task
  2. Use meaningful step names - Helps with debugging and observability
  3. Report progress regularly - Keeps users informed
  4. Handle errors gracefully - Use reportError() before throwing
  5. Clean up completed workflows - The cf_agents_workflows table can grow unbounded, so implement a retention policy:
// Option 1: Cleanup immediately on completion
async onWorkflowComplete(workflowName, instanceId, result) {
  // Process result first, then delete
  this.deleteWorkflow(instanceId);
}

// Option 2: Scheduled cleanup (keep recent history)
// Call this periodically via a scheduled task or cron
this.deleteWorkflows({
  status: ["complete", "errored"],
  createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // 7 days
});

// Option 3: Keep all history for compliance/auditing
// Don't call deleteWorkflows() - query historical data as needed
  1. Handle workflow binding renames carefully - If you rename a workflow binding in wrangler.jsonc, existing tracked workflows will reference the old name. The agent will warn on startup if it detects this. Use migrateWorkflowBinding() to update them:
// After renaming OLD_WORKFLOW to NEW_WORKFLOW in wrangler.toml
async onStart() {
  // Migrate any existing tracked workflows to the new binding name
  const migrated = this.migrateWorkflowBinding('OLD_WORKFLOW', 'NEW_WORKFLOW');
  // You can remove this code after all agents have migrated
}

Limitations

  • Workflows can have at most 1,024 steps
  • Maximum 10MB state per workflow
  • Events wait for at most 1 year
  • No direct WebSocket from workflows (use broadcastToClients())
  • Workflow execution time: up to 30 minutes per step