Cloudflare Workflows
This document describes the Cloudflare Workflows implementation in the bloqr-backend, providing durable execution for compilation, batch processing, cache warming, and health monitoring.
Table of Contents
- Overview
- Benefits over Queue-Based Processing
- Available Workflows
- API Endpoints
- Real-Time Events
- Scheduled Workflows (Cron)
- Workflow Status & Monitoring
- Configuration
- Error Handling & Recovery
Overview
Cloudflare Workflows provide durable execution for long-running operations. Unlike traditional queue-based processing, workflows offer:
- Automatic state persistence between steps
- Crash recovery - resumes from the last successful step
- Built-in retry with configurable policies
- Observable step-by-step progress
- Reliable scheduled execution with cron triggers
Benefits over Queue-Based Processing
| Feature | Queue-Based | Workflows |
|---|---|---|
| State Persistence | Manual (KV) | Automatic |
| Crash Recovery | Re-process entire message | Resume from checkpoint |
| Step Visibility | Limited | Full step-by-step |
| Retry Logic | Custom implementation | Built-in with backoff |
| Long-running Tasks | 30s limit | Up to 15 minutes per step |
| Scheduled Execution | External scheduler | Native cron triggers |
Available Workflows
CompilationWorkflow
Handles single async compilation requests with durable state between steps.
Steps:
- `validate` - Validate configuration
- `compile-sources` - Fetch and compile all sources
- `cache-result` - Compress and store in KV
- `update-metrics` - Update workflow metrics
Parameters:
```typescript
interface CompilationParams {
    requestId: string; // Unique tracking ID
    configuration: IConfiguration; // Filter list config
    preFetchedContent?: Record<string, string>; // Optional pre-fetched content
    benchmark?: boolean; // Include benchmark metrics
    priority?: 'standard' | 'high';
    queuedAt: number; // Timestamp
}
```
API Endpoint: POST /workflow/compile
```bash
curl -X POST http://localhost:8787/workflow/compile \
  -H "Content-Type: application/json" \
  -d '{
    "configuration": {
      "name": "My Filter List",
      "sources": [
        {"source": "https://easylist.to/easylist/easylist.txt", "name": "EasyList"}
      ],
      "transformations": ["Deduplicate", "RemoveEmptyLines"]
    },
    "priority": "high"
  }'
```
Response:
```json
{
  "success": true,
  "message": "Compilation workflow started",
  "workflowId": "wf-compile-abc123",
  "workflowType": "compilation",
  "requestId": "wf-compile-abc123",
  "configName": "My Filter List"
}
```
BatchCompilationWorkflow
Processes multiple compilations with per-chunk durability and crash recovery.
Steps:
- `validate-batch` - Validate all configurations
- `compile-chunk-N` - Process chunks of 3 compilations in parallel
- `update-batch-metrics` - Update aggregate metrics
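The chunking in the steps above can be sketched as follows. This is a minimal illustration, not the repository's implementation: `chunk`, `compileInChunks`, and the `compileOne` callback are hypothetical names, and the chunk size of 3 comes from the step description. In the actual workflow, each chunk would presumably run inside its own `compile-chunk-N` step so that completed chunks survive a crash.

```typescript
// Split a list into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
    if (!Number.isInteger(size) || size < 1) {
        throw new RangeError('size must be a positive integer');
    }
    const chunks: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        chunks.push(items.slice(i, i + size));
    }
    return chunks;
}

// Process chunks one after another; items inside a chunk run in parallel.
// `compileOne` is a hypothetical stand-in for a single compilation.
async function compileInChunks<T, R>(
    requests: readonly T[],
    compileOne: (request: T) => Promise<R>,
    chunkSize = 3,
): Promise<R[]> {
    const results: R[] = [];
    for (const group of chunk(requests, chunkSize)) {
        results.push(...(await Promise.all(group.map(compileOne))));
    }
    return results;
}
```

With five requests and the default chunk size, this yields one chunk of three items followed by one chunk of two.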
Parameters:
```typescript
interface BatchCompilationParams {
    batchId: string;
    requests: Array<{
        id: string;
        configuration: IConfiguration;
        preFetchedContent?: Record<string, string>;
        benchmark?: boolean;
    }>;
    priority?: 'standard' | 'high';
    queuedAt: number;
}
```
API Endpoint: POST /workflow/batch
```bash
curl -X POST http://localhost:8787/workflow/batch \
  -H "Content-Type: application/json" \
  -d '{
    "requests": [
      {
        "id": "request-1",
        "configuration": {
          "name": "EasyList",
          "sources": [{"source": "https://easylist.to/easylist/easylist.txt"}]
        }
      },
      {
        "id": "request-2",
        "configuration": {
          "name": "EasyPrivacy",
          "sources": [{"source": "https://easylist.to/easylist/easyprivacy.txt"}]
        }
      }
    ],
    "priority": "standard"
  }'
```
CacheWarmingWorkflow
Pre-populates the cache with popular filter lists. Runs on schedule or manual trigger.
Steps:
- `check-cache-status` - Identify configurations needing refresh
- `warm-chunk-N` - Compile and cache configurations in chunks
- `update-warming-metrics` - Track warming statistics
Default Popular Configurations:
- EasyList
- EasyPrivacy
- AdGuard Base
Parameters:
```typescript
interface CacheWarmingParams {
    runId: string;
    configurations: IConfiguration[]; // Empty = use defaults
    scheduled: boolean;
}
```
API Endpoint: POST /workflow/cache-warm
```bash
# Trigger with default configurations
curl -X POST http://localhost:8787/workflow/cache-warm \
  -H "Content-Type: application/json" \
  -d '{}'

# Trigger with custom configurations
curl -X POST http://localhost:8787/workflow/cache-warm \
  -H "Content-Type: application/json" \
  -d '{
    "configurations": [
      {
        "name": "Custom List",
        "sources": [{"source": "https://example.com/filters.txt"}]
      }
    ]
  }'
```
Cron Schedule: Every 6 hours (`0 */6 * * *`)
HealthMonitoringWorkflow
Monitors filter source availability and alerts on failures.
Steps:
- `load-health-history` - Load recent health check history
- `check-source-N` - Check each source individually
- `analyze-results` - Detect consecutive failures for alerting
- `send-alerts` - Send alerts if threshold exceeded
- `store-results` - Persist health data
Default Sources Monitored:
- EasyList (expected: 50,000+ rules)
- EasyPrivacy (expected: 10,000+ rules)
- AdGuard Base (expected: 30,000+ rules)
- AdGuard Tracking Protection (expected: 10,000+ rules)
- Peter Lowe’s List (expected: 2,000+ rules)
Health Thresholds:
- Max response time: 30 seconds
- Failure threshold: 3 consecutive failures before alerting
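The two thresholds above can be expressed as a small sketch. Only the 30-second limit and the 3-failure threshold come from this document; `CheckResult`, `isHealthy`, `consecutiveFailures`, and `shouldAlert` are hypothetical names, and treating only HTTP 200 as healthy is an assumption.

```typescript
// Thresholds from the list above.
const MAX_RESPONSE_TIME_MS = 30 * 1000;
const FAILURE_THRESHOLD = 3;

interface CheckResult {
    statusCode: number;
    responseTimeMs: number;
    ruleCount: number;
}

// Assumption: a check is healthy when the source answered 200 in time and
// returned at least the expected number of rules (if one is configured).
function isHealthy(result: CheckResult, expectedMinRules = 0): boolean {
    return result.statusCode === 200 &&
        result.responseTimeMs <= MAX_RESPONSE_TIME_MS &&
        result.ruleCount >= expectedMinRules;
}

// `history` is ordered oldest to newest; true means the check was healthy.
// Counts how many of the most recent checks failed in a row.
function consecutiveFailures(history: readonly boolean[]): number {
    let count = 0;
    for (let i = history.length - 1; i >= 0 && !history[i]; i--) {
        count++;
    }
    return count;
}

// Alert only once the trailing failure streak reaches the threshold.
function shouldAlert(history: readonly boolean[]): boolean {
    return consecutiveFailures(history) >= FAILURE_THRESHOLD;
}
```

Counting only the trailing streak means a single healthy check resets the alert condition, which matches the "consecutive failures" wording.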
Parameters:
```typescript
interface HealthMonitoringParams {
    runId: string;
    sources: Array<{
        name: string;
        url: string;
        expectedMinRules?: number;
    }>; // Empty = use defaults
    alertOnFailure: boolean;
}
```
API Endpoint: POST /workflow/health-check
```bash
# Trigger with default sources
curl -X POST http://localhost:8787/workflow/health-check \
  -H "Content-Type: application/json" \
  -d '{"alertOnFailure": true}'

# Check custom sources
curl -X POST http://localhost:8787/workflow/health-check \
  -H "Content-Type: application/json" \
  -d '{
    "sources": [
      {"name": "My Source", "url": "https://example.com/filters.txt", "expectedMinRules": 100}
    ],
    "alertOnFailure": true
  }'
```
Cron Schedule: Every hour (`0 * * * *`)
DynamicCompilationWorkflow
Durable compilation workflow where each step runs in an isolated V8 Worker loaded at runtime via @cloudflare/dynamic-workflows. Introduced 2026-05-01.
Key properties vs CompilationWorkflow:
- Each step source is an inline JavaScript string (`VALIDATE_STEP_SOURCE`, `TRANSFORM_STEP_SOURCE`, `CACHE_STEP_SOURCE`)
- Steps run in sandboxed V8 isolates with zero ambient authority (`globalOutbound: null`)
- Requires the `DYNAMIC_WORKFLOW_LOADER` binding (Cloudflare beta)
Steps:
- `validate` - Load and execute inline validator Worker; validates configuration structure
- `transform` - Load and execute inline transform Worker; processes `preFetchedContent` and produces compiled filter rules
- `cache` - Load and execute inline cache Worker; derives SHA-256 cache key from the compiled rules (not the configuration JSON)
ZTA properties:
- `clerkUserId` sourced exclusively from verified JWT (`authContext.userId`), never from request body
- `globalOutbound: null` enforced on all dynamic step isolates
- Workflow instance ID equals `requestId`; `event.instanceId` used for all WorkflowEvents and analytics calls
Parameters:
```typescript
interface DynamicCompilationParams {
    clerkUserId: string; // ZTA: from authContext only
    requestId: string;
    configuration: IConfiguration;
    preFetchedContent: Record<string, string>; // required: isolates run with globalOutbound: null
    benchmark?: boolean;
    priority: 'high' | 'standard'; // defaults to 'standard'
    queuedAt: number; // Unix ms, for queue-latency tracking
}
```
API Endpoint: POST /workflow/dynamic-compile
```bash
curl -X POST http://localhost:8787/workflow/dynamic-compile \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  -d '{
    "configuration": {
      "name": "My Filter List",
      "sources": [
        { "source": "https://easylist.to/easylist/easylist.txt", "name": "EasyList" }
      ]
    },
    "preFetchedContent": {
      "EasyList": "! EasyList\n||example.com^"
    },
    "priority": "high"
  }'
```
Note: `preFetchedContent` is required for `POST /workflow/dynamic-compile`. Each dynamic step isolate runs with `globalOutbound: null`, so it has no outbound network access and cannot fetch filter list sources at runtime. The caller must pre-fetch and provide all source content before creating the workflow. When Cloudflare Access is configured, this endpoint returns 403 Forbidden if the CF Access JWT is absent or invalid.
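Because the isolates cannot fetch anything themselves, a caller may want to check that every source has content before submitting the request. The sketch below is one way to do that; `SourceRef` and `findMissingContent` are hypothetical names, and keying the content by source name (falling back to the URL) is an assumption inferred from the request example, not a documented contract.

```typescript
interface SourceRef {
    source: string; // URL of the filter list
    name?: string;  // Optional display name
}

// Return the lookup keys of sources that have no pre-fetched content.
// Assumption: content is keyed by source name, falling back to the URL.
function findMissingContent(
    sources: readonly SourceRef[],
    preFetchedContent: Readonly<Record<string, string>>,
): string[] {
    return sources
        .map((s) => s.name ?? s.source)
        .filter((key) => !Object.prototype.hasOwnProperty.call(preFetchedContent, key));
}
```

An empty result means the request carries content for every listed source; anything else should be fetched by the caller first.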
Flow diagram:
```mermaid
flowchart TD
    Start([DynamicCompilationWorkflow.run]) --> ValidateLoad[load VALIDATE_STEP_SOURCE\ninto isolated V8 Worker]
    ValidateLoad --> ValidateRun[handle.run payload\n→ validated config]
    ValidateRun --> TransformLoad[load TRANSFORM_STEP_SOURCE\ninto isolated V8 Worker]
    TransformLoad --> TransformRun[handle.run validated config + preFetchedContent\n→ compiled rules]
    TransformRun --> CacheLoad[load CACHE_STEP_SOURCE\ninto isolated V8 Worker]
    CacheLoad --> CacheRun[handle.run compiled result\n→ SHA-256 cache key]
    CacheRun --> Complete[Return WorkflowCompilationResult\nwith rules + ruleCount + cacheKey]
    style ValidateLoad fill:#1a237e,color:#fff
    style TransformLoad fill:#b8860b,color:#fff
    style CacheLoad fill:#b84000,color:#fff
    style Complete fill:#1b5e20,color:#fff
```
API Endpoints
Workflow Management
| Method | Endpoint | Description |
|---|---|---|
| POST | /workflow/compile | Start compilation workflow |
| POST | /workflow/batch | Start batch compilation workflow |
| POST | /workflow/cache-warm | Trigger cache warming |
| POST | /workflow/health-check | Trigger health monitoring |
| POST | /workflow/dynamic-compile | Start isolated dynamic compilation |
| GET | /workflow/status/:type/:id | Get workflow instance status |
| GET | /workflow/events/:id | Get real-time progress events |
| GET | /workflow/metrics | Get aggregate workflow metrics |
| GET | /health/latest | Get latest health check results |
Status Endpoint
Get the status of a running or completed workflow:
```bash
curl http://localhost:8787/workflow/status/compilation/wf-compile-abc123
```
Response:
```json
{
  "success": true,
  "workflowType": "compilation",
  "workflowId": "wf-compile-abc123",
  "status": "complete",
  "output": {
    "success": true,
    "requestId": "wf-compile-abc123",
    "configName": "My Filter List",
    "ruleCount": 45000,
    "totalDurationMs": 2500
  }
}
```
Workflow Status Values:
- `queued` - Waiting to start
- `running` - Currently executing
- `paused` - Manually paused
- `complete` - Successfully finished
- `errored` - Failed with error
- `terminated` - Manually stopped
- `unknown` - Status unavailable
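Clients that poll the status endpoint need to know when to stop. A minimal sketch, assuming `complete`, `errored`, and `terminated` are the only final states and that `unknown` is worth retrying; `WorkflowStatus`, `TERMINAL_STATUSES`, and `isTerminal` are illustrative names, not part of the API:

```typescript
// Union of the status values listed above.
type WorkflowStatus =
    | 'queued' | 'running' | 'paused'
    | 'complete' | 'errored' | 'terminated' | 'unknown';

// Assumption: after these statuses the instance makes no further progress.
const TERMINAL_STATUSES: ReadonlySet<WorkflowStatus> = new Set<WorkflowStatus>([
    'complete', 'errored', 'terminated',
]);

function isTerminal(status: WorkflowStatus): boolean {
    return TERMINAL_STATUSES.has(status);
}
```

A polling loop can then keep requesting the status while `isTerminal` returns false.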
Metrics Endpoint
Get aggregate metrics for all workflows:
```bash
curl http://localhost:8787/workflow/metrics
```
Response:
```json
{
  "compilation": {
    "totalRuns": 150,
    "successfulRuns": 145,
    "failedRuns": 5,
    "avgDurationMs": 3200,
    "lastRunAt": "2024-01-15T10:30:00Z"
  },
  "batch": {
    "totalRuns": 25,
    "totalCompilations": 100,
    "avgDurationMs": 15000
  },
  "cacheWarming": {
    "totalRuns": 48,
    "scheduledRuns": 46,
    "manualRuns": 2,
    "totalConfigsWarmed": 144
  },
  "health": {
    "totalChecks": 168,
    "totalSourcesChecked": 840,
    "totalHealthy": 820,
    "alertsTriggered": 3
  }
}
```
Latest Health Results
Get the most recent health check results:
```bash
curl http://localhost:8787/health/latest
```
Response:
```json
{
  "success": true,
  "timestamp": "2024-01-15T10:00:00Z",
  "runId": "cron-health-abc123",
  "results": [
    {
      "name": "EasyList",
      "url": "https://easylist.to/easylist/easylist.txt",
      "healthy": true,
      "statusCode": 200,
      "responseTimeMs": 450,
      "ruleCount": 72500
    },
    {
      "name": "EasyPrivacy",
      "url": "https://easylist.to/easylist/easyprivacy.txt",
      "healthy": true,
      "statusCode": 200,
      "responseTimeMs": 380,
      "ruleCount": 18200
    }
  ],
  "summary": {
    "total": 5,
    "healthy": 5,
    "unhealthy": 0
  }
}
```
Workflow Events (Real-Time Progress)
Get real-time progress events for a running workflow:
```bash
# Get all events for a workflow
curl http://localhost:8787/workflow/events/wf-compile-abc123

# Get events since a specific timestamp (for polling)
curl "http://localhost:8787/workflow/events/wf-compile-abc123?since=2024-01-15T10:30:00.000Z"
```
Response:
```json
{
  "success": true,
  "workflowId": "wf-compile-abc123",
  "workflowType": "compilation",
  "startedAt": "2024-01-15T10:30:00.000Z",
  "completedAt": "2024-01-15T10:30:05.000Z",
  "progress": 100,
  "isComplete": true,
  "events": [
    {
      "type": "workflow:started",
      "workflowId": "wf-compile-abc123",
      "workflowType": "compilation",
      "timestamp": "2024-01-15T10:30:00.000Z",
      "data": {"configName": "My Filter List", "sourceCount": 2}
    },
    {
      "type": "workflow:step:started",
      "workflowId": "wf-compile-abc123",
      "workflowType": "compilation",
      "timestamp": "2024-01-15T10:30:00.100Z",
      "step": "validate"
    },
    {
      "type": "workflow:progress",
      "workflowId": "wf-compile-abc123",
      "workflowType": "compilation",
      "timestamp": "2024-01-15T10:30:00.500Z",
      "progress": 25,
      "message": "Configuration validated"
    },
    {
      "type": "workflow:completed",
      "workflowId": "wf-compile-abc123",
      "workflowType": "compilation",
      "timestamp": "2024-01-15T10:30:05.000Z",
      "data": {"ruleCount": 45000, "totalDurationMs": 5000}
    }
  ]
}
```
Event Types:
| Type | Description |
|---|---|
| `workflow:started` | Workflow execution began |
| `workflow:step:started` | A workflow step started |
| `workflow:step:completed` | A workflow step finished successfully |
| `workflow:step:failed` | A workflow step failed |
| `workflow:progress` | Progress update with percentage and message |
| `workflow:completed` | Workflow finished successfully |
| `workflow:failed` | Workflow failed with error |
| `source:fetch:started` | Source fetch operation started |
| `source:fetch:completed` | Source fetch completed with rule count |
| `transformation:started` | Transformation step started |
| `transformation:completed` | Transformation completed |
| `cache:stored` | Result cached to KV |
| `health:check:started` | Health check started for a source |
| `health:check:completed` | Health check completed |
Polling for Real-Time Updates:
To monitor workflow progress in real-time, poll the events endpoint:
```javascript
async function pollWorkflowEvents(workflowId) {
    let lastTimestamp = null;

    while (true) {
        const url = `/workflow/events/${workflowId}`;
        const params = lastTimestamp ? `?since=${encodeURIComponent(lastTimestamp)}` : '';

        const response = await fetch(url + params);
        // Stop on HTTP errors instead of trying to parse an error body
        if (!response.ok) {
            throw new Error(`Events request failed with status ${response.status}`);
        }
        const data = await response.json();

        // Log new events and remember the newest timestamp for the next poll
        if (data.events?.length > 0) {
            for (const event of data.events) {
                console.log(`[${event.type}] ${event.message || event.step || ''}`);
                lastTimestamp = event.timestamp;
            }
        }

        if (data.isComplete) {
            console.log('Workflow completed!');
            break;
        }

        // Wait 2 seconds before polling again
        await new Promise(resolve => setTimeout(resolve, 2000));
    }
}
```
Scheduled Workflows (Cron)
Workflows can be triggered automatically via cron schedules defined in wrangler.toml:
```toml
[triggers]
crons = [
    "0 */6 * * *",  # Cache warming: every 6 hours
    "0 * * * *",    # Health monitoring: every hour
]
```
The `scheduled()` handler routes cron events to the appropriate workflow:
| Cron Pattern | Workflow | Purpose |
|---|---|---|
| `0 */6 * * *` | CacheWarmingWorkflow | Pre-warm popular filter list caches |
| `0 * * * *` | HealthMonitoringWorkflow | Monitor source availability |
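The routing in the table can be sketched as a lookup on the cron pattern, which a Worker's `scheduled()` handler receives as `controller.cron`. This is an illustration under assumptions, not the handler used in this repository: `CRON_ROUTES` and `routeCron` are hypothetical names, and the real handler would go on to create an instance on the matching workflow binding.

```typescript
type ScheduledWorkflow = 'CacheWarmingWorkflow' | 'HealthMonitoringWorkflow';

// Cron patterns copied from the table above.
const CRON_ROUTES: Readonly<Record<string, ScheduledWorkflow>> = {
    '0 */6 * * *': 'CacheWarmingWorkflow',
    '0 * * * *': 'HealthMonitoringWorkflow',
};

// Returns the workflow for a cron pattern, or undefined for an unknown pattern.
function routeCron(cron: string): ScheduledWorkflow | undefined {
    return Object.prototype.hasOwnProperty.call(CRON_ROUTES, cron)
        ? CRON_ROUTES[cron]
        : undefined;
}
```

Matching on the exact pattern string keeps the routing in one place, but it also means the strings here must stay identical to the entries in `wrangler.toml`.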
Configuration
wrangler.toml
```toml
# Workflow bindings
[[workflows]]
name = "compilation-workflow"
binding = "COMPILATION_WORKFLOW"
class_name = "CompilationWorkflow"

[[workflows]]
name = "batch-compilation-workflow"
binding = "BATCH_COMPILATION_WORKFLOW"
class_name = "BatchCompilationWorkflow"

[[workflows]]
name = "cache-warming-workflow"
binding = "CACHE_WARMING_WORKFLOW"
class_name = "CacheWarmingWorkflow"

[[workflows]]
name = "health-monitoring-workflow"
binding = "HEALTH_MONITORING_WORKFLOW"
class_name = "HealthMonitoringWorkflow"

# Cron triggers
[triggers]
crons = [
    "0 */6 * * *",
    "0 * * * *",
]
```
Step Configuration
Each workflow step can have custom retry and timeout settings:
```typescript
await step.do('step-name', {
    retries: {
        limit: 3, // Max retries
        delay: '30 seconds', // Initial delay
        backoff: 'exponential', // Backoff strategy
    },
    timeout: '5 minutes', // Step timeout
}, async () => {
    // Step logic
});
```
Error Handling & Recovery
Automatic Retry
Each step has configurable retry policies:
- Compilation steps: 2 retries with 30s exponential backoff, 5 minute timeout
- Cache steps: 2 retries with 2s delay
- Health checks: 2 retries with 5s delay, 2 minute timeout
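To make the "30s exponential backoff" policy concrete, the sketch below lists the wait before each retry, assuming the delay doubles on every attempt. The exact schedule Cloudflare applies (including any jitter or cap) is not stated in this document, so treat the doubling rule and the `retryDelays` helper as assumptions.

```typescript
// Delay before each retry, in seconds, assuming the delay doubles every time.
function retryDelays(initialDelaySeconds: number, retries: number): number[] {
    const delays: number[] = [];
    for (let attempt = 0; attempt < retries; attempt++) {
        delays.push(initialDelaySeconds * 2 ** attempt);
    }
    return delays;
}
```

Under that assumption, a compilation step with 2 retries and a 30-second initial delay waits 30 seconds before the first retry and 60 seconds before the second.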
Crash Recovery
If a workflow crashes mid-execution:
- Cloudflare detects the failure
- Workflow resumes from the last completed step
- State is automatically restored
- Processing continues without re-running completed steps
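The resume behaviour can be pictured as memoizing each step's result under its name. The sketch below simulates that with an in-memory map; it is a conceptual model only, since Workflows persists step state itself, and `Checkpoints` and `runStep` are hypothetical names.

```typescript
// In-memory stand-in for the state that Workflows persists between steps.
type Checkpoints = Map<string, unknown>;

// Run `fn` once per step name; on a later run, return the stored result instead.
function runStep<T>(checkpoints: Checkpoints, name: string, fn: () => T): T {
    if (checkpoints.has(name)) {
        return checkpoints.get(name) as T;
    }
    const result = fn();
    checkpoints.set(name, result);
    return result;
}
```

Replaying a workflow against the same checkpoints skips every step that already has a stored result, which is why step names must be stable and step results should be serializable.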
Dead Letter Handling
Workflows that still fail after exhausting their retries are logged with:
- Full error details
- Step that failed
- Workflow parameters
- Timestamp
Alerts can be configured via the health monitoring workflow to notify on persistent failures.
Workflow Diagrams
Compilation Workflow
```mermaid
flowchart TD
    Start[Workflow Start] --> Validate[Step: validate]
    Validate -->|Valid| Compile[Step: compile-sources]
    Validate -->|Invalid| Error[Return Error Result]
    Compile -->|Success| Cache[Step: cache-result]
    Compile -->|Retry| Compile
    Compile -->|Max Retries| Error
    Cache --> Metrics[Step: update-metrics]
    Metrics --> Complete[Return Success Result]
    Error --> Complete
    style Validate fill:#1a237e,color:#fff
    style Compile fill:#b8860b,color:#fff
    style Cache fill:#1b5e20,color:#fff
    style Metrics fill:#1a237e,color:#fff
    style Complete fill:#1b5e20,color:#fff
    style Error fill:#c62828,color:#fff
```
Batch Workflow with Chunking
```mermaid
flowchart TD
    Start[Workflow Start] --> ValidateBatch[Step: validate-batch]
    ValidateBatch --> Chunk1[Step: compile-chunk-1]
    Chunk1 --> Item1A[Compile Item 1]
    Chunk1 --> Item1B[Compile Item 2]
    Chunk1 --> Item1C[Compile Item 3]
    Item1A --> Chunk1Done
    Item1B --> Chunk1Done
    Item1C --> Chunk1Done
    Chunk1Done[Chunk 1 Complete] --> Chunk2[Step: compile-chunk-2]
    Chunk2 --> Item2A[Compile Item 4]
    Chunk2 --> Item2B[Compile Item 5]
    Item2A --> Chunk2Done
    Item2B --> Chunk2Done
    Chunk2Done[Chunk 2 Complete] --> Metrics[Step: update-batch-metrics]
    Metrics --> Complete[Return Batch Result]
    style ValidateBatch fill:#1a237e,color:#fff
    style Chunk1 fill:#b8860b,color:#fff
    style Chunk2 fill:#b8860b,color:#fff
    style Metrics fill:#1a237e,color:#fff
    style Complete fill:#1b5e20,color:#fff
```
Health Monitoring Workflow
```mermaid
flowchart TD
    Start[Cron/Manual Trigger] --> LoadHistory[Step: load-health-history]
    LoadHistory --> CheckSource1[Step: check-source-1]
    CheckSource1 --> Delay1[Sleep 2s]
    Delay1 --> CheckSource2[Step: check-source-2]
    CheckSource2 --> Delay2[Sleep 2s]
    Delay2 --> CheckSourceN[Step: check-source-N]
    CheckSourceN --> Analyze[Step: analyze-results]
    Analyze -->|Alerts Needed| SendAlerts[Step: send-alerts]
    Analyze -->|No Alerts| Store
    SendAlerts --> Store[Step: store-results]
    Store --> Complete[Return Health Result]
    style LoadHistory fill:#1a237e,color:#fff
    style CheckSource1 fill:#b8860b,color:#fff
    style CheckSource2 fill:#b8860b,color:#fff
    style CheckSourceN fill:#b8860b,color:#fff
    style Analyze fill:#b84000,color:#fff
    style SendAlerts fill:#c62828,color:#fff
    style Store fill:#1b5e20,color:#fff
    style Complete fill:#1b5e20,color:#fff
```
Notes
- Workflows are available when deployed to Cloudflare Workers
- Local development may use stubs for workflow bindings
- Metrics are stored in the `METRICS` KV namespace
- Cached results use the `COMPILATION_CACHE` KV namespace
- Health history is retained for 30 days
- Workflow instances can be monitored in the Cloudflare dashboard