# Designing a Job Scheduler: DAGs, State Machines, and Crash Recovery

I've been in the postmortem for a scheduler that "forgot" a task was running after a scheduler-process restart, and dutifully kicked off a second copy against the same target table. Nothing in the code was obviously wrong — the bug was architectural. State lived in the scheduler's memory instead of durable storage, so a routine restart erased the fact that a task was mid-flight, and the scheduler's only option on restart was to trust what it could see, which was nothing. That single design choice — state in memory vs state in a database — is the fault line that separates a toy scheduler from one you can run in production, and it's the thread this whole article pulls on.

This isn't a guide to operating Airflow or Dagster — [Airflow internals](airflow-internals) and the [Airflow vs Prefect vs Dagster comparison](airflow-prefect-dagster-orchestration) already cover choosing and running those tools. This is the theory underneath all of them: how you'd actually design a DAG-based workflow scheduler from first principles, and where the real difficulty hides.

## Why is a workflow a DAG, and what does that buy you?

A workflow is a **directed acyclic graph (DAG)** — nodes are tasks, edges are dependencies, and "acyclic" is a hard constraint because a cycle would mean a task depends on itself finishing, directly or transitively, which is unrunnable by definition. Representing a workflow this way, rather than as a fixed linear script, buys you two things a script can't give you for free: **parallelism** — any two tasks with no dependency path between them can run at the same time — and a well-defined, computable execution order via **topological sort**, which produces a valid linear ordering of all tasks such that every task appears after everything it depends on. A scheduler doesn't need to compute one fixed topological order up front and follow it rigidly, though — the more useful runtime behavior is dynamic: continuously ask "which tasks have all their upstream dependencies satisfied right now" and run all of them, which naturally yields maximum parallelism without ever computing a full static ordering in advance.
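To make the "ask what's ready right now" idea concrete, here is a minimal sketch using `graphlib.TopologicalSorter` from Python's standard library (3.9+). The `DEPENDENCIES` mapping and the helper names are hypothetical, and marking a whole batch done at once stands in for tasks finishing at different times.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical example DAG: each key maps to the set of tasks it depends on.
DEPENDENCIES = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
    "E": {"D"},
}


def execution_waves(dependencies):
    """Return successive batches of tasks whose dependencies are all done."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        waves.append(ready)
        # A real scheduler would run these concurrently and call done()
        # as each one finishes; here the whole batch is marked done at once.
        sorter.done(*ready)
    return waves


def has_cycle(dependencies):
    """True if the graph cannot be scheduled because it contains a cycle."""
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError:
        return True
    return False
```

Each inner list is a set of tasks with no dependency path between them, which is exactly the parallelism the DAG representation exposes. The cycle check is the "hard constraint" enforced at load time rather than discovered at run time.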

## How does the scheduler loop actually decide what to run next?

At its core, the scheduler loop is simple to state and easy to get wrong in the details: continuously poll (or react to events) and ask, for every task in every active run, "are all of this task's upstream dependencies in a terminal-success state, and is this task itself still waiting to start, neither running nor already finished?" If yes, enqueue it for execution. Two architectural choices sit inside that simple statement. **Polling**, a loop that re-evaluates DAG state on a fixed interval, is simpler to reason about and debug, at the cost of latency bounded by the poll interval and wasted work re-checking DAGs that haven't changed. **Event-driven triggering**, which reacts immediately when a task completes and checks only whether that completion unblocks anything downstream, is lower-latency and less wasteful, at the cost of real complexity: you now need reliable event delivery, and a missed or duplicated event becomes a scheduling bug instead of just a slightly late scheduling decision. Many production schedulers land on a hybrid: event-driven triggering for the common case, with a periodic polling pass as a safety net that catches anything the event path missed. It's belt and suspenders, because a scheduler that silently stops scheduling is a much worse failure than one that's occasionally a few seconds slow.

```mermaid
graph TD
    A["Task A"] --> B["Task B"]
    A --> C["Task C"]
    B --> D["Task D"]
    C --> D
    D --> E["Task E"]
    subgraph LOOP["Scheduler loop, continuously"]
        Q["For each task: all upstream deps in SUCCESS?"]
        Q -->|"yes + not yet started"| ENQ["Enqueue for execution"]
        Q -->|"no"| WAIT["Leave in queued state"]
    end
```

Tasks B and C can run in parallel once A succeeds — the scheduler doesn't need a single fixed order, only a continuous check of "which tasks have every upstream dependency satisfied right now." D can only start once both B and C are done; that fan-in is exactly what a naive linear script can't express without manual coordination logic.
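A single pass of that check can be sketched as a pure function over the current task states. This is an illustration under assumptions: the `DEPENDENCIES` mapping mirrors the diagram, the state names are lowercase strings, and `find_ready` is a hypothetical helper name.

```python
# Hypothetical DAG matching the diagram: task -> set of upstream tasks.
DEPENDENCIES = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
    "E": {"D"},
}


def find_ready(dependencies, states):
    """Return tasks that are still queued and whose upstreams all succeeded."""
    return sorted(
        task
        for task, upstream in dependencies.items()
        if states[task] == "queued"
        and all(states[dep] == "success" for dep in upstream)
    )


# B has finished but C is still running, so the fan-in into D is not satisfied.
mid_run = {"A": "success", "B": "success", "C": "running", "D": "queued", "E": "queued"}
# Once C succeeds, D becomes ready on the next pass.
after_c = {**mid_run, "C": "success"}
```

Checking `states[task] == "queued"` rather than "not running" matters: a task that has already succeeded is also not running, and a looser check would enqueue it again.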

## Why is the task state machine the hardest part of the whole design?

Because it's the part that has to be simultaneously exhaustive (every real-world failure mode maps to a valid state) and unambiguous (the scheduler can always tell, from the state alone, what to do next), and most first attempts get neither right. A workable state machine needs, at minimum: `queued`, `running`, `success`, `failed`, `retrying`, and `upstream_failed`. That last one matters more than it looks: a task whose dependency failed needs a state distinct from "failed on its own merits," because the response is different (don't retry it, its own logic never ran) and the observability story is different (an operator needs to see immediately that this is a cascade, not an independent break).
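The cascade can be sketched as a small propagation step. This is one possible shape, not a reference implementation: `propagate_upstream_failed` is a hypothetical helper, and it assumes states are held in a plain dict keyed by task name.

```python
def propagate_upstream_failed(dependencies, states):
    """Mark queued tasks as upstream_failed when a dependency cannot succeed."""
    blocked = {"failed", "upstream_failed"}
    updated = dict(states)  # leave the caller's dict untouched
    changed = True
    while changed:  # repeat until the cascade stops spreading
        changed = False
        for task, upstream in dependencies.items():
            if updated[task] == "queued" and any(updated[dep] in blocked for dep in upstream):
                updated[task] = "upstream_failed"
                changed = True
    return updated


# Hypothetical run: B failed on its own merits, so D and E can never start.
dependencies = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}, "E": {"D"}}
states = {"A": "success", "B": "failed", "C": "success", "D": "queued", "E": "queued"}
result = propagate_upstream_failed(dependencies, states)
```

Only B ends up `failed`; D and E end up `upstream_failed`, which is the distinction an operator needs to find the one task that actually broke.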

The subtlety that catches teams is **transition validity** — not every state can move to every other state, and the scheduler has to enforce that or it becomes possible to end up with corrupted, self-contradictory state (a task simultaneously `running` and `success`, for instance, because two different code paths wrote to it without coordinating). Every transition should be a single, atomic, explicit operation against the metadata store, not an implicit side effect of some other piece of logic — which is exactly why this is the piece most likely to have a subtle, hard-to-reproduce bug months into production, long after the happy path has been tested extensively.

```python
# A task state transition should be one explicit, atomic write,
# not something inferred from "no exception was raised"
VALID_TRANSITIONS = {
    "queued":          {"running", "upstream_failed"},  # cascade when a dependency fails
    "running":         {"success", "failed", "retrying"},
    "retrying":        {"queued"},
    "failed":          set(),  # terminal
    "success":         set(),  # terminal
    "upstream_failed": set(),  # terminal
}


class InvalidStateTransition(Exception):
    """Raised when a caller requests a transition the state machine forbids."""


def transition(task_instance, new_state):
    if new_state not in VALID_TRANSITIONS[task_instance.state]:
        raise InvalidStateTransition(task_instance.state, new_state)
    # single atomic write to the metadata store, not an in-memory flag
    # (metadata_db is assumed to be a module-level client for that store)
    metadata_db.update_task_state(task_instance.id, new_state)
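One caveat with a check followed by a write: if the check reads state from an in-memory object, two writers can both pass it before either writes. A common way to close that gap is a conditional update, where the database only applies the write if the row is still in the expected state. The sketch below uses `sqlite3` from the standard library; the `task_instances` table and the `compare_and_set_state` name are assumptions for illustration.

```python
import sqlite3


def compare_and_set_state(conn, task_id, expected_state, new_state):
    """Move a task to new_state only if it is still in expected_state.

    Returns True if this caller won the transition, False otherwise.
    """
    with conn:  # commits on success, rolls back if an exception is raised
        cursor = conn.execute(
            "UPDATE task_instances SET state = ? WHERE id = ? AND state = ?",
            (new_state, task_id, expected_state),
        )
    return cursor.rowcount == 1


# Usage against an in-memory database with a hypothetical schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instances (id TEXT PRIMARY KEY, state TEXT NOT NULL)")
conn.execute("INSERT INTO task_instances VALUES ('load_orders', 'queued')")
conn.commit()

first = compare_and_set_state(conn, "load_orders", "queued", "running")
second = compare_and_set_state(conn, "load_orders", "queued", "running")
```

The second caller sees that the row is no longer `queued` and backs off, so the same task is never claimed twice.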

## How should retries and backoff actually be designed?

A naive retry ("just try again immediately") makes transient failures worse under load — a downstream service that's struggling gets hit with another request the instant the first one fails, from every failed task simultaneously if the failure is widespread, which is how a brief blip turns into a cascading outage. **Exponential backoff** — waiting 1s, then 2s, then 4s, then 8s, then 16s between attempts — spaces retries out so a recovering service gets breathing room instead of a second wave of load timed to arrive exactly when it's weakest. Concretely: on failure, the scheduler sets the task's state to `retrying`, increments a `retry_count`, computes the next eligible run time as `now + backoff(retry_count)`, and writes that back to the metadata store so a future scheduler-loop pass picks it up exactly when it's due — not before.
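The arithmetic is small enough to sketch directly. The base delay follows the 1s, 2s, 4s sequence above; the cap on the maximum delay is my own assumption, added so the delay doesn't grow without bound, and the function names are illustrative.

```python
from datetime import datetime, timedelta, timezone

BASE_DELAY_SECONDS = 1   # first retry waits 1s, matching the sequence above
MAX_DELAY_SECONDS = 300  # assumed cap so delays do not grow without bound


def backoff(retry_count):
    """Delay before retry number retry_count (1-based): 1s, 2s, 4s, 8s, ..."""
    delay = BASE_DELAY_SECONDS * 2 ** (retry_count - 1)
    return timedelta(seconds=min(delay, MAX_DELAY_SECONDS))


def next_eligible_time(now, retry_count):
    """Earliest time the scheduler loop may pick the task up again."""
    return now + backoff(retry_count)


failed_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
third_retry_due = next_eligible_time(failed_at, retry_count=3)
```

The value returned by `next_eligible_time` is what gets written to the metadata store alongside the `retrying` state, so the delay survives a scheduler restart.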

The part that's easy to skip and expensive to skip is a **max-retry ceiling** and **poison-pill detection**. Without a hard cap, a task that fails for a genuinely non-transient reason (a bad input row, a permanently missing upstream file) retries forever, consuming scheduler and executor capacity indefinitely while never succeeding — a "poison pill" that can quietly starve other work in a shared execution pool. The fix is boring but necessary: a fixed max-retry count that moves the task to a genuinely terminal `failed` state and pages someone, rather than a queue that silently retries the same broken task for days.
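The ceiling itself is a one-line decision, sketched here with an assumed limit of five retries and a hypothetical function name. In a real system the `failed` branch is also where the alert would be raised.

```python
MAX_RETRIES = 5  # assumed ceiling; in practice this is tuned per task or per DAG


def state_after_failure(retry_count):
    """Decide the next state for a task that just failed.

    retry_count is the number of retries already attempted.
    """
    if retry_count >= MAX_RETRIES:
        return "failed"    # terminal: stop retrying and alert a human
    return "retrying"      # eligible for another attempt after backoff
```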

## Why does the scheduler have to be crash-recoverable from a database, not memory?

Because a scheduler process *will* restart — deploys, crashes, node failures, autoscaling events — and every one of those has to be a non-event from the perspective of correctness. The **metadata store** is the durable source of truth for DAG definitions, run state, and every task's current state; the scheduler process itself should be able to die and come back with zero memory of what it was doing, reconstruct its entire view of the world from a query against that database, and resume exactly where it left off. This is the fix for the incident I opened with: if task state lives only in the scheduler's memory, a restart genuinely loses the fact that a task was in flight, and the scheduler's only honest options on restart are "assume it's still running" (which can be wrong) or "assume it failed and retry" (which duplicates work if it was actually still running) — both wrong answers to a question a durable metadata store would have answered correctly.
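On startup, "reconstruct its entire view of the world" can be as simple as one query for everything that isn't in a terminal state. A minimal sketch, assuming a `task_instances` table with `id` and `state` columns (a hypothetical schema) and using `sqlite3` as a stand-in for the real metadata store:

```python
import sqlite3

NON_TERMINAL_STATES = ("queued", "running", "retrying")


def load_in_flight_tasks(conn):
    """Rebuild the scheduler's view of unfinished work from durable storage."""
    placeholders = ", ".join("?" for _ in NON_TERMINAL_STATES)
    rows = conn.execute(
        f"SELECT id, state FROM task_instances WHERE state IN ({placeholders})",
        NON_TERMINAL_STATES,
    ).fetchall()
    return {task_id: state for task_id, state in rows}


# Simulated restart: the process has no memory, only the database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instances (id TEXT PRIMARY KEY, state TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO task_instances VALUES (?, ?)",
    [("extract", "success"), ("transform", "running"), ("load", "queued")],
)
conn.commit()
recovered = load_in_flight_tasks(conn)
```

The restarted scheduler learns that `transform` was mid-flight without having to guess, which is the piece of information the in-memory design loses.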

The complementary mechanism on the executor side is a **heartbeat and lease timeout**: a running task periodically writes a heartbeat timestamp to the metadata store, and a separate watchdog process looks for tasks marked `running` whose last heartbeat is older than the lease timeout — those are presumed to belong to a worker that crashed without updating state, and get requeued for another worker to pick up. Combined, the metadata store plus the heartbeat mechanism are what make "the scheduler restarted" and "a worker crashed mid-task" both recoverable events instead of data-corruption events.
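The watchdog's core check is a timestamp comparison. The sketch below assumes a 60-second lease and takes heartbeats as a plain dict for illustration; a real watchdog would read them from the metadata store, and `find_expired_leases` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

LEASE_TIMEOUT = timedelta(seconds=60)  # assumed value


def find_expired_leases(running_heartbeats, now, lease_timeout=LEASE_TIMEOUT):
    """Return IDs of running tasks whose last heartbeat is older than the lease.

    running_heartbeats maps task ID -> last heartbeat time, for tasks
    currently marked `running`.
    """
    return sorted(
        task_id
        for task_id, last_beat in running_heartbeats.items()
        if now - last_beat > lease_timeout
    )


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
heartbeats = {
    "healthy_task": now - timedelta(seconds=10),
    "orphaned_task": now - timedelta(seconds=300),
}
expired = find_expired_leases(heartbeats, now)
```

The lease timeout should be comfortably larger than the heartbeat interval, otherwise a briefly slow worker gets its task requeued while it is still running.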

## What does the executor abstraction need to provide, and what does that place on the tasks themselves?

The **executor** is the layer that actually runs a task's code, and separating it cleanly from the scheduler is what lets the same DAG logic run against wildly different execution backends — a local process pool for development, a distributed worker fleet (Celery, Kubernetes pods) for production — without the scheduler needing to know or care which one is underneath. The executor abstraction has to expose, at minimum: submit a task for execution, report back success/failure, and support a concurrency limit so a burst of simultaneously-ready tasks doesn't overwhelm shared downstream resources (a database connection pool, an API rate limit) — that concurrency cap is the scheduler's primary backpressure mechanism, and it needs to be enforceable per-pool or per-resource, not just globally, or one noisy DAG can starve every other DAG sharing the executor.
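As a rough illustration of that interface, here is a local executor built on `concurrent.futures`, with one thread pool per named pool so each pool's `max_workers` acts as its concurrency cap. The `PooledExecutor` class and the pool names are hypothetical; a distributed backend would expose the same `submit` shape over a very different implementation.

```python
from concurrent.futures import ThreadPoolExecutor


class PooledExecutor:
    """Runs callables on threads, with a separate concurrency cap per pool."""

    def __init__(self, pool_limits):
        # One thread pool per named pool: max_workers is that pool's cap,
        # so a burst in one pool cannot take slots from another.
        self._pools = {
            name: ThreadPoolExecutor(max_workers=limit)
            for name, limit in pool_limits.items()
        }

    def submit(self, pool, fn, *args):
        """Submit fn to the named pool; the Future reports success or failure."""
        return self._pools[pool].submit(fn, *args)

    def shutdown(self):
        for pool in self._pools.values():
            pool.shutdown(wait=True)


executor = PooledExecutor({"warehouse": 2, "api": 1})
ok = executor.submit("warehouse", lambda x: x * 2, 21)
bad = executor.submit("api", int, "not a number")
result = ok.result()     # blocks until the task finishes
error = bad.exception()  # the exception raised by the task, if any
executor.shutdown()
```

The scheduler only ever sees `submit` and the returned future, which is what lets the backend be swapped without touching DAG logic.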

All of this — retries, crash recovery, requeuing after a missed heartbeat — puts one non-negotiable requirement on task authors: **tasks must be idempotent**. If a task can be retried, and it will be, running it twice has to produce the same end state as running it once — an `INSERT` without a dedup key, or a non-idempotent external API call, turns every one of the scheduler's crash-recovery and retry guarantees into a data-correctness bug the moment a retry actually fires. This is exactly the same requirement [pipeline design](designing-a-data-pipeline) calls out as the single most important property of a pipeline task — the scheduler's entire reliability story is built on top of an assumption about the tasks it runs, not something the scheduler can enforce or fix on its own.
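For the `INSERT` case, the usual fix is to give every row a dedup key and let the database discard repeats. A minimal sketch with `sqlite3`, assuming a hypothetical `events` table keyed on `(batch_id, row_num)`:

```python
import sqlite3


def load_batch(conn, batch_id, rows):
    """Insert a batch so that rerunning the task leaves the table unchanged."""
    with conn:  # one transaction per batch
        conn.executemany(
            # The primary key (batch_id, row_num) is the dedup key: a retry
            # produces the same keys, and the conflicting inserts are skipped.
            "INSERT OR IGNORE INTO events (batch_id, row_num, payload) VALUES (?, ?, ?)",
            [(batch_id, i, payload) for i, payload in enumerate(rows)],
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    "batch_id TEXT, row_num INTEGER, payload TEXT, "
    "PRIMARY KEY (batch_id, row_num))"
)
load_batch(conn, "2024-01-01", ["a", "b", "c"])
load_batch(conn, "2024-01-01", ["a", "b", "c"])  # simulated retry of the same task
row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The key has to be derived from the input (batch and position here), not generated at run time, or each retry would mint fresh keys and the duplicates would slip through.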

**A scheduler with a beautifully durable metadata store and a broken idempotency assumption is still going to corrupt data — the crash-recovery machinery works exactly as designed, and that's the problem.** I've seen a "reliable" scheduler faithfully requeue a task after a missed heartbeat, exactly as it should, only for that task to append a second copy of the same batch to a downstream table because nobody had audited it for idempotency. The scheduler did its job correctly; the failure was a task author's assumption that retries "probably wouldn't happen." Every task that goes into a DAG needs an explicit idempotency review before it ships — not an assumption based on how rarely retries seem to fire in testing.

## What to carry away

Every workflow scheduler — however different Airflow, Dagster, and Temporal look on the surface — is built from the same five pieces: a DAG model with topological execution order, a scheduler loop deciding what's ready to run, a task state machine that has to be exhaustive and atomic, a durable metadata store that makes crash recovery a non-event instead of a data-loss event, and an executor abstraction that separates "what to run" from "where it runs." The state machine is the part most likely to have a subtle bug months in, because it has to enumerate every real failure mode (including the cascade case, `upstream_failed`, that's easy to forget) and enforce that transitions only happen atomically against durable storage.

None of the reliability machinery — retries, backoff, crash recovery via heartbeats — means anything if the tasks running underneath it aren't idempotent, because every one of those mechanisms works by potentially running a task more than once. Design the state machine and the metadata store first; audit every task for idempotency before it ships; and treat "the scheduler process might restart at any moment" as a normal operating condition to design for, not an edge case to handle later.
