obsidian/wiki/payloadcms/jobs-queue-workflows.md
2026-05-15 16:14:29 +01:00

title: Jobs Queue — Workflows
aliases: payload-workflows, jobs-workflows, workflow-retry
tags: payloadcms, jobs-queue, workflows, concurrency, retry
sources: raw/jobs-queue__workflows.md
created: 2026-05-15
updated: 2026-05-15

What is a Workflow?

A Workflow combines multiple Tasks into a sequential pipeline that can resume after a failure. If a task fails mid-workflow, the job is retried and the handler runs again from the top, but tasks that already completed are skipped and return their stored output. Only the failed task and the tasks after it actually execute again.

Use a workflow when you have two or more dependent tasks and want per-task retry logic. For a single operation, use a plain Task instead (see wiki/payloadcms/jobs-queue-tasks).

Defining a Workflow

Add to jobs.workflows[] in your Payload config:

{
  slug: 'onboardUser',
  inputSchema: [
    { name: 'userId', type: 'text', required: true },
  ],
  handler: async ({ job, tasks }) => {
    await tasks.createProfile('step-create-profile', { input: { userId: job.input.userId } })
    await tasks.sendWelcomeEmail('step-send-email', { input: { userId: job.input.userId } })
    await tasks.addToMailingList('step-add-list', { input: { userId: job.input.userId } })
  },
}

Workflow Config Options

| Option | Description |
|---|---|
| slug | Unique name (shared namespace with task slugs) |
| handler | Async function, or a file path string pointing to one |
| inputSchema | Field definitions; also used to generate TypeScript types |
| interfaceName | Overrides the generated TS interface name (default: Workflow + slug) |
| label | Human-friendly display name |
| queue | Queue name; defaults to "default" |
| retries | Workflow-level retry cap. 0 = fail on any task error; undefined = inherit each task's retries |
| concurrency | Prevents parallel execution of jobs with the same key (see Concurrency Controls) |

Task IDs Must Be Stable

Each tasks.someTask(id, ...) call needs a stable, unique ID. On retry, Payload uses this ID to find cached output. If the ID changes between runs, the task re-executes unnecessarily.

// Good — stable descriptive IDs
await tasks.sendEmail('send-welcome-email', { input })

// Works, but avoid: positional numbers are hard to trace when debugging
await tasks.sendEmail('1', { input })
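When a workflow calls the same task once per item (for example, inside a loop), each ID still has to be both stable and unique. A minimal sketch, assuming a hypothetical `Recipient` shape with its own `id` field: derive the task ID from the item's identity rather than from `Date.now()` or a random value, which would change on every re-run. `sendEmailTaskId` and `assertUniqueIds` are illustrative helpers, not Payload APIs.

```typescript
// Hypothetical input shape; in a real workflow this would come from job.input.
interface Recipient {
  id: string
  email: string
}

// Build one task ID per item from data that does not change between re-runs.
// Avoid Date.now() or Math.random() here: a new ID on each run defeats the cached-output lookup.
function sendEmailTaskId(recipient: Recipient): string {
  return `send-email-${recipient.id}`
}

// Two calls of the same task sharing an ID would point at the same stored status entry.
function assertUniqueIds(ids: string[]): void {
  const seen = new Set<string>()
  for (const id of ids) {
    if (seen.has(id)) throw new Error(`Duplicate task ID: ${id}`)
    seen.add(id)
  }
}

const recipients: Recipient[] = [
  { id: 'u1', email: 'a@example.com' },
  { id: 'u2', email: 'b@example.com' },
]

const ids = recipients.map(sendEmailTaskId)
assertUniqueIds(ids)
console.log(ids) // [ 'send-email-u1', 'send-email-u2' ]
```

Inside a handler, each ID would then be passed as the first argument of the task call, e.g. `await tasks.sendEmail(sendEmailTaskId(r), { input: { userId: r.id } })` in a `for...of` loop, assuming a `sendEmail` task with that input is registered.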

Inline Tasks

Run ad-hoc logic without a pre-declared task using inlineTask. Useful for one-off steps.

handler: async ({ job, tasks, inlineTask }) => {
  await tasks.createPost('1', { input: { title: job.input.title } })

  const { newPost } = await inlineTask('2', {
    // retries belongs to the inline task options, not to payload.update()
    retries: 3,
    task: async ({ req }) => {
      const newPost = await req.payload.update({
        collection: 'posts',
        id: '2',
        data: { title: 'updated!' },
        req, // pass req so the update shares the job's request (and transaction)
      })
      return { output: { newPost } }
    },
  })
}

Drawback: inline task data stored on job.taskStatus.inline['2'] is untyped.
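Because inline output is untyped, code that later reads it gets no compile-time checking. One option is a runtime type guard that narrows `unknown` before use. This sketch assumes the output holds a post with string `id` and `title` fields (string IDs are an assumption; SQL adapters often use numeric IDs), and `InlinePostOutput` / `isInlinePostOutput` are hypothetical names.

```typescript
// Assumed shape of the inline task's output; adjust to whatever your task returns.
interface InlinePostOutput {
  newPost: { id: string; title: string }
}

// Runtime check that narrows an untyped value to InlinePostOutput.
function isInlinePostOutput(value: unknown): value is InlinePostOutput {
  if (typeof value !== 'object' || value === null) return false
  const post = (value as { newPost?: unknown }).newPost
  if (typeof post !== 'object' || post === null) return false
  const { id, title } = post as { id?: unknown; title?: unknown }
  return typeof id === 'string' && typeof title === 'string'
}

// Example value, as it might be read from job.taskStatus.inline['2'].output
const stored: unknown = { newPost: { id: '2', title: 'updated!' } }

if (isInlinePostOutput(stored)) {
  // Inside this branch, stored.newPost.title is typed as string.
  console.log(stored.newPost.title) // updated!
}
```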

Failure & Recovery

First run:
  step1 → ✅ profile created
  step2 → ❌ email service down
  step3 → never reached

Retry:
  step1 → skipped (returns cached output)
  step2 → ✅ email service recovered
  step3 → ✅ added to list

The entire handler re-runs, but completed tasks return their stored result immediately.
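The replay behavior can be modeled with a small, self-contained simulation. This is not Payload's implementation: `makeRunner`, `StatusEntry`, and the step names are hypothetical, and tasks are synchronous for brevity. It shows the whole handler running again on each attempt while a status map keyed by task ID short-circuits steps that already completed.

```typescript
// Simplified stand-in for job.taskStatus: one entry per task ID.
interface StatusEntry {
  complete: boolean
  output: unknown
  totalTried: number
}
type TaskStatus = Record<string, StatusEntry>

type RunTask = <T>(id: string, fn: () => T) => T

// Returns a task runner bound to a persisted status map.
function makeRunner(status: TaskStatus): RunTask {
  return <T>(id: string, fn: () => T): T => {
    const entry: StatusEntry | undefined = status[id]
    // Completed on a previous attempt: return the stored output without re-executing.
    if (entry && entry.complete) return entry.output as T
    const tried = (entry ? entry.totalTried : 0) + 1
    try {
      const output = fn()
      status[id] = { complete: true, output, totalTried: tried }
      return output
    } catch (err) {
      status[id] = { complete: false, output: undefined, totalTried: tried }
      throw err
    }
  }
}

// Record real executions so the skip behavior is observable.
const executions: string[] = []
let emailServiceUp = false

function onboardHandler(task: RunTask): void {
  task('step-create-profile', () => { executions.push('profile'); return { profileId: 'p1' } })
  task('step-send-email', () => {
    executions.push('email')
    if (!emailServiceUp) throw new Error('email service down')
    return { sent: true }
  })
  task('step-add-list', () => { executions.push('list'); return { listed: true } })
}

const jobStatus: TaskStatus = {}

// First attempt: step 2 throws, step 3 is never reached.
try { onboardHandler(makeRunner(jobStatus)) } catch { /* job is marked for retry */ }

// Retry: the whole handler runs again; step 1 is served from the stored status.
emailServiceUp = true
onboardHandler(makeRunner(jobStatus))

console.log(executions) // [ 'profile', 'email', 'email', 'list' ]
```

The profile step executes once across both attempts, while the email step runs twice because its first attempt never completed.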

Accessing Task Outputs

handler: async ({ job, tasks }) => {
  // Method 1: capture the return value (the call resolves to the task's output)
  const output = await tasks.createDocument('create-doc', {
    input: { title: 'My Document' },
  })
  const docId = output.documentId

  // Method 2: read the stored output from job.taskStatus
  const docIdFromStatus = job.taskStatus.createDocument['create-doc'].output.documentId

  await tasks.updateDocument('update-doc', {
    input: { documentId: docId, status: 'published' },
  })
}

Task status shape:

job.taskStatus[taskSlug][taskId] = {
  input: { /* provided input */ },
  output: { /* returned output */ },
  complete: true,
  totalTried: 1,
}
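When a job fails, it can help to list which steps in that shape never completed. A minimal sketch, assuming `taskStatus` follows the nested `[taskSlug][taskId]` layout shown above; `findIncompleteTasks` is a hypothetical helper, not part of Payload, and the failed entry in `example` is made-up data.

```typescript
interface TaskStatusEntry {
  input: unknown
  output: unknown
  complete: boolean
  totalTried: number
}

// taskStatus[taskSlug][taskId] -> entry, mirroring the documented shape.
type TaskStatusMap = Record<string, Record<string, TaskStatusEntry>>

interface IncompleteTask {
  slug: string
  id: string
  totalTried: number
}

// Flatten the nested map into the list of tasks that have not completed yet.
function findIncompleteTasks(taskStatus: TaskStatusMap): IncompleteTask[] {
  const result: IncompleteTask[] = []
  for (const [slug, byId] of Object.entries(taskStatus)) {
    for (const [id, entry] of Object.entries(byId)) {
      if (!entry.complete) result.push({ slug, id, totalTried: entry.totalTried })
    }
  }
  return result
}

const example: TaskStatusMap = {
  createDocument: {
    'create-doc': { input: { title: 'My Document' }, output: { documentId: 'd1' }, complete: true, totalTried: 1 },
  },
  updateDocument: {
    'update-doc': { input: { documentId: 'd1', status: 'published' }, output: undefined, complete: false, totalTried: 2 },
  },
}

console.log(findIncompleteTasks(example)) // [ { slug: 'updateDocument', id: 'update-doc', totalTried: 2 } ]
```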

Best Practices

Keep Tasks Small and Focused

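One task should handle one concern, so each step gets its own retry logic. A monolithic task is all-or-nothing: if its last step fails, the whole task runs again, including the steps that had already succeeded.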

Pass IDs, Not Objects

// ✅ Pass ID — task fetches what it needs
await tasks.processUser('process', { input: { userId: '123' } })

// ❌ Avoid passing large objects as input
await tasks.processUser('process', { input: { user: { /* entire object */ } } })
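Besides keeping the stored job input small, passing an ID means a task (or its retry) reads the current record instead of a snapshot taken at queue time. A self-contained sketch, with a hypothetical in-memory `db` map standing in for a Local API lookup such as `req.payload.findByID`:

```typescript
interface User {
  id: string
  email: string
}

// Hypothetical in-memory store standing in for the database.
const db = new Map<string, User>([['123', { id: '123', email: 'old@example.com' }]])

// ID-based task: looks the user up whenever it runs, including on retry.
function emailFromId(input: { userId: string }): string {
  const user = db.get(input.userId)
  if (!user) throw new Error(`User ${input.userId} not found`)
  return user.email
}

// Snapshot-based task: uses whatever object was copied into the job input.
function emailFromSnapshot(input: { user: User }): string {
  return input.user.email
}

// Both inputs are captured when the job is queued.
const idInput = { userId: '123' }
const snapshotInput = { user: { ...db.get('123')! } }

// The user changes their email before the task (or its retry) runs.
db.set('123', { id: '123', email: 'new@example.com' })

console.log(emailFromId(idInput)) // new@example.com
console.log(emailFromSnapshot(snapshotInput)) // old@example.com
```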

Set Appropriate Retry Counts

  • External APIs (email, payment): 3 to 5 retries
  • Database operations: 1 to 2 retries
  • Idempotent operations: higher retry counts are safe
  • Non-idempotent operations (creates, charges, sends): keep retries low to avoid duplicates
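Retries are risky for non-idempotent steps because an attempt can succeed remotely and still look like a failure (for example, a timeout after the email was already sent), so the retry repeats the side effect. One common mitigation, which some providers support, is sending a stable idempotency key so the receiver performs the side effect at most once per key. The sketch below is a simulation: `FakeEmailProvider` and `runWithRetries` are hypothetical, and the key format is an assumption.

```typescript
// Simulated email provider that de-duplicates on the idempotency key it receives.
class FakeEmailProvider {
  private seen = new Set<string>()
  delivered = 0

  send(idempotencyKey: string, loseResponse: boolean): void {
    if (!this.seen.has(idempotencyKey)) {
      this.seen.add(idempotencyKey)
      this.delivered++ // The side effect happens at most once per key.
    }
    // The response can still be lost after delivery, which makes the caller retry.
    if (loseResponse) throw new Error('Timed out waiting for provider response')
  }
}

// Minimal retry loop: one initial attempt plus up to `retries` more.
function runWithRetries(retries: number, attempt: (n: number) => void): void {
  for (let n = 0; n <= retries; n++) {
    try {
      attempt(n)
      return
    } catch {
      // Swallow the error and try again until attempts run out.
    }
  }
  throw new Error('All attempts failed')
}

// Stable key derived from job input: the first two responses are lost, the third arrives.
const provider = new FakeEmailProvider()
runWithRetries(3, (n) => provider.send('welcome-email:user-123', n < 2))
console.log(provider.delivered) // 1

// A fresh key per attempt defeats de-duplication, so every retry sends again.
const naive = new FakeEmailProvider()
runWithRetries(3, (n) => naive.send(`attempt-${n}`, n < 2))
console.log(naive.delivered) // 3
```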

Handle Errors with Context

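handler: async ({ input }) => {
  try {
    const result = await fetch('https://api.example.com/data')
    if (!result.ok) throw new Error(`API ${result.status}: ${result.statusText}`)
    return { output: { success: true } }
  } catch (error) {
    // catch variables are `unknown` in strict TS, so narrow before reading .message
    const message = error instanceof Error ? error.message : String(error)
    // Rethrow with context so the failure is recognizable in the job log
    throw new Error(`Failed to sync data for user ${input.userId}: ${message}`)
  }
}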

Concurrency Controls

Prevent race conditions when multiple jobs operate on the same resource.

Enable first:

export default buildConfig({
  jobs: {
    enableConcurrencyControl: true, // adds indexed concurrencyKey column — may need migration
  },
})

Then add concurrency to the workflow:

{
  slug: 'syncDocument',
  concurrency: ({ input }) => `sync:${input.documentId}`,
  handler: async ({ job, inlineTask }) => { /* ... */ }
}

Full Concurrency Config

concurrency: {
  key: ({ input, queue }) => `sync:${input.documentId}`,
  exclusive: true,   // only one at a time (default: true)
  supersedes: false, // delete older pending jobs (default: false)
}

Common Patterns

| Pattern | Config | Use when |
|---|---|---|
| Sequential, all jobs run | exclusive: true, supersedes: false | Processing distinct document versions |
| Latest-wins | exclusive: true, supersedes: true | Regenerating embeddings/thumbnails after rapid edits |
| Queue-specific | key includes the queue param | The same resource may be processed concurrently on different queues |
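The global and queue-specific patterns differ only in what the key function returns. A sketch with two hypothetical key functions of the same form as the `key` option above (the `{ input, queue }` argument shape is taken from that example; the `nightly` queue name is made up):

```typescript
interface KeyArgs {
  input: { documentId: string }
  queue: string
}

// Global key: at most one running job per document, whichever queue it was queued on.
const globalKey = ({ input }: KeyArgs): string => `sync:${input.documentId}`

// Queue-specific key: the same document may be processed in parallel on different queues.
const perQueueKey = ({ input, queue }: KeyArgs): string => `sync:${queue}:${input.documentId}`

const onDefault: KeyArgs = { input: { documentId: 'doc-1' }, queue: 'default' }
const onNightly: KeyArgs = { input: { documentId: 'doc-1' }, queue: 'nightly' }

console.log(globalKey(onDefault) === globalKey(onNightly)) // true: the jobs block each other
console.log(perQueueKey(onDefault) === perQueueKey(onNightly)) // false: the jobs may run concurrently
```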

How Concurrency Works

  1. Key computed from input, stored on job document at queue time
  2. Job runner excludes jobs whose key is currently running
  3. If two same-key jobs are picked up in the same batch, only the first runs; others go back to processing: false
  4. With supersedes: false, every job eventually runs; same-key jobs wait their turn instead of being dropped
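Steps 2 and 3 can be modeled as a pure selection function over one fetched batch. This is a hypothetical sketch of the described behavior, not Payload's job runner: `PendingJob`, `selectRunnable`, and the batch contents are illustrative. Step 1 (computing the key at queue time) and step 4 (deferred jobs being picked up on a later run) happen outside this function.

```typescript
interface PendingJob {
  id: string
  concurrencyKey?: string // Absent when the workflow has no concurrency config.
}

// Split a batch into jobs to start now and jobs to release back to the queue.
function selectRunnable(
  batch: PendingJob[],
  runningKeys: Set<string>,
): { run: PendingJob[]; deferred: PendingJob[] } {
  const run: PendingJob[] = []
  const deferred: PendingJob[] = []
  const claimed = new Set(runningKeys)
  for (const job of batch) {
    if (job.concurrencyKey === undefined) {
      run.push(job) // No key: runs in parallel as normal.
    } else if (claimed.has(job.concurrencyKey)) {
      deferred.push(job) // Key busy: already running, or claimed earlier in this batch.
    } else {
      claimed.add(job.concurrencyKey)
      run.push(job) // First job with this key in the batch wins.
    }
  }
  return { run, deferred }
}

const batch: PendingJob[] = [
  { id: 'a', concurrencyKey: 'sync:doc-1' },
  { id: 'b', concurrencyKey: 'sync:doc-1' },
  { id: 'c', concurrencyKey: 'sync:doc-2' },
  { id: 'd' },
]

// A job with key sync:doc-2 is already running from an earlier batch.
const { run, deferred } = selectRunnable(batch, new Set(['sync:doc-2']))
console.log(run.map((j) => j.id)) // [ 'a', 'd' ]
console.log(deferred.map((j) => j.id)) // [ 'b', 'c' ]
```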

Important Notes

  • Concurrency is global across queues by default — include queue name in key if you want queue-specific behavior
  • Only pending jobs are deleted by supersedes, not running ones
  • Jobs without concurrency config run in parallel as normal
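The supersedes rule in these notes can be sketched as a function applied when a new job is queued: older same-key jobs that are still pending are removed, while a running one is left alone. `QueuedJob`, `state`, and `enqueueSuperseding` are hypothetical names for illustration, not Payload's data model.

```typescript
type JobState = 'pending' | 'running'

interface QueuedJob {
  id: string
  concurrencyKey: string
  state: JobState
}

// Queue a job with supersedes: true semantics: drop only older *pending* jobs with the same key.
function enqueueSuperseding(jobs: QueuedJob[], next: QueuedJob): QueuedJob[] {
  const kept = jobs.filter(
    (job) => !(job.concurrencyKey === next.concurrencyKey && job.state === 'pending'),
  )
  return [...kept, next]
}

let jobs: QueuedJob[] = [
  { id: 'v1', concurrencyKey: 'embed:doc-1', state: 'running' }, // keeps running to completion
  { id: 'v2', concurrencyKey: 'embed:doc-1', state: 'pending' }, // superseded by v3
  { id: 'x1', concurrencyKey: 'embed:doc-2', state: 'pending' }, // different key, untouched
]

jobs = enqueueSuperseding(jobs, { id: 'v3', concurrencyKey: 'embed:doc-1', state: 'pending' })
console.log(jobs.map((j) => j.id)) // [ 'v1', 'x1', 'v3' ]
```

With exclusive: true as well, v3 would still wait for v1 to finish before it starts, which is the latest-wins pattern from the table above.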

Key Takeaways

  • Workflow = multi-task pipeline with per-task retry; a retry resumes from the failed task, not from the beginning
  • Task IDs must be stable across handler re-runs — use descriptive strings, not positional numbers
  • inlineTask is convenient but its output is untyped
  • Pass IDs not objects to keep job input small and tasks independently fetchable
  • retries: 0 at workflow level disables all task-level retries and fails immediately on any error
  • enableConcurrencyControl: true adds an indexed concurrencyKey column, so it may require a DB migration
  • supersedes: true deletes only pending jobs — a running job always completes