The Capstone Brief
5 min
Build a workflow orchestrator: a tool that runs a pipeline of named steps, where steps can depend on others, fail and retry, be skipped if already done, and report their outcome. Think of it as a minimal Airflow or Make for your own automations.
- Steps with dependencies — run in correct order (topological sort).
- Retries per step with backoff (Lessons 24, 45).
- Checkpointing — resume a partially-completed run (Lesson 45).
- Observability — structured logging + a run report (Lessons 13-14, 22).
- Alerting — Slack/notify on completion and failure (Lessons 31-34).
- Safety — dry-run, idempotent steps, fail loud (Lessons 44-46).
Then prove it by orchestrating a real pipeline — e.g. fetch → transform → report → deliver → backup.
Design the Engine
8 min
A workflow is a graph of steps:
fetch ──► transform ──► report ──► deliver
                          │             ▲
                          └──► backup ──┘  (deliver waits for both)
The engine:
1. takes steps + their dependencies
2. orders them (a step runs only after its deps succeed)
3. runs each: skip if checkpointed, retry on transient failure
4. on any hard failure: stop dependents, log, alert
5. emit a run report (per-step status + timing)
Every mechanism here you've already built: retries (24/45), checkpoints (45), logging (13-14), reports (22), alerts (34), safety (44/46). The capstone is wiring them into one reusable engine and then using it. Aim for clean structure over feature count: a small orchestrator done well beats a sprawling one.
Build It · The Step & the Engine
16 min
A Step
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class Step:
    name: str
    run: Callable[[dict], None]  # the work; gets a shared context dict
    depends_on: list[str] = field(default_factory=list)
    retries: int = 2
A step is a named callable plus its dependencies and retry budget. The run function receives a shared context dict so steps can pass data downstream (fetch puts rows in it; report reads them).
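To make the shared context concrete, here is a minimal sketch of two steps passing data through one dict. The `run_in_order` helper and the sample rows are hypothetical stand-ins for illustration only; it has no ordering, retries, or checkpoints, all of which the engine adds later.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Step:
    name: str
    run: Callable[[dict], None]
    depends_on: list[str] = field(default_factory=list)
    retries: int = 2


def fetch(ctx: dict) -> None:
    # Hypothetical stand-in for a real data source.
    ctx["rows"] = [3, 5, 8]


def report(ctx: dict) -> None:
    # Reads what fetch left behind and adds its own result.
    ctx["total"] = sum(ctx["rows"])


def run_in_order(steps: list[Step]) -> dict:
    """Call each step with the same dict, in the order given."""
    ctx: dict = {}
    for step in steps:
        step.run(ctx)
    return ctx


ctx = run_in_order([
    Step("fetch", fetch),
    Step("report", report, depends_on=["fetch"]),
])
print(ctx)  # {'rows': [3, 5, 8], 'total': 16}
```

Because every step mutates the same dict, a step that runs before its producer would hit a KeyError, which is why dependency ordering matters.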
Ordering steps (topological sort)
def order_steps(steps: dict) -> list[str]:
    """Return step names in dependency order; raise on cycles."""
    ordered, visiting, done = [], set(), set()

    def visit(name):
        if name in done:
            return
        if name not in steps:  # a depends_on entry that was never added
            raise ValueError(f"unknown step: {name}")
        if name in visiting:  # we came back to a step still being visited
            raise ValueError(f"cycle detected at {name}")
        visiting.add(name)
        for dep in steps[name].depends_on:
            visit(dep)  # dependencies are appended before the step itself
        visiting.discard(name)
        done.add(name)
        ordered.append(name)

    for name in steps:
        visit(name)
    return ordered
A depth-first topological sort guarantees a step runs only after its dependencies, and detects impossible cycles. This is the one genuinely new algorithm — and it's small.
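As a cross-check on a hand-written sort, the standard library's `graphlib` module (Python 3.9+) offers the same ordering. The sketch below is an alternative, not the lesson's method; it assumes the graph is given as a plain dict mapping each step name to the names it depends on, and the helper names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def order_with_graphlib(deps: dict[str, list[str]]) -> list[str]:
    """deps maps each step name to the names it depends on."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps: dict[str, list[str]]) -> bool:
    """True when no valid order exists because the graph loops back on itself."""
    try:
        order_with_graphlib(deps)
    except CycleError:
        return True
    return False


def respects_dependencies(order: list[str], deps: dict[str, list[str]]) -> bool:
    """Check that every step appears after all of its dependencies."""
    position = {name: i for i, name in enumerate(order)}
    return all(position[d] < position[name]
               for name, ds in deps.items() for d in ds)


deps = {
    "fetch": [],
    "transform": ["fetch"],
    "report": ["transform"],
    "backup": ["report"],
    "deliver": ["report", "backup"],
}
order = order_with_graphlib(deps)
print(order)  # ['fetch', 'transform', 'report', 'backup', 'deliver']
```

This graph has exactly one valid order, so both approaches must agree on it. A checker like `respects_dependencies` is also handy for testing your own `order_steps` on graphs with several valid orders.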
The engine
import json, time, logging
from pathlib import Path
from datetime import datetime

log = logging.getLogger("orchestrator")

class Workflow:
    def __init__(self, name: str, dry_run: bool = False):
        self.name = name
        self.dry_run = dry_run
        self.steps: dict[str, Step] = {}
        self.state_file = Path(f".wf_{name}.json")

    def add(self, step: Step):
        self.steps[step.name] = step
        return self  # returning self allows chained .add() calls

    def _load_done(self) -> set:
        if self.state_file.exists():
            return set(json.loads(self.state_file.read_text()))
        return set()

    def run(self, context: dict | None = None) -> dict:
        if context is None:  # keep a caller-supplied dict, even an empty one
            context = {}
        done = self._load_done()
        report = {"workflow": self.name,
                  "started": datetime.now().isoformat(), "steps": []}
        order = order_steps(self.steps)
        log.info("run order: %s", " → ".join(order))
        for name in order:
            step = self.steps[name]
            # skip if a dependency failed earlier this run
            if any(d not in done and self._failed(report, d)
                   for d in step.depends_on):
                self._record(report, name, "skipped", 0, "dependency failed")
                continue
            if name in done:
                self._record(report, name, "cached", 0, "already done")
                continue
            if self.dry_run:
                self._record(report, name, "dry-run", 0, "would run")
                continue
            self._run_step(step, context, done, report)
        report["finished"] = datetime.now().isoformat()
        self._save_report(report)
        return report
The engine orders the steps and then, for each one, skips it if a dependency failed, marks it cached if it is already checkpointed, records the intent in dry-run mode, and otherwise runs it with retries. The checkpoint state and the run report are both persisted to disk.
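Since the report is a plain dict with a `steps` list, it is easy to condense for an alert or a log line. The sketch below assumes the report shape recorded by the engine (`name`, `status`, `duration_s`, `note`); `summarise_report` is a hypothetical helper and the sample values are made up for illustration.

```python
from collections import Counter


def summarise_report(report: dict) -> dict:
    """Condense a run report into status counts, total time, and an ok flag."""
    steps = report["steps"]
    counts = Counter(s["status"] for s in steps)
    return {
        "workflow": report["workflow"],
        "counts": dict(counts),
        "total_s": round(sum(s["duration_s"] for s in steps), 2),
        # A run counts as healthy only if every step ran or was already done.
        "ok": all(s["status"] in ("ok", "cached") for s in steps),
    }


# Illustrative report, shaped like the engine's output (values are invented).
sample = {
    "workflow": "daily",
    "steps": [
        {"name": "fetch", "status": "ok", "duration_s": 2.1, "note": ""},
        {"name": "transform", "status": "failed", "duration_s": 0.2, "note": "boom"},
        {"name": "report", "status": "skipped", "duration_s": 0, "note": "dependency failed"},
    ],
}
summary = summarise_report(sample)
```

The `ok` flag uses the same rule as the completion alert in the usage example, so a dry run (status "dry-run") would not count as a clean run either.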
Build It · Step Execution & Using It
16 min
Running a step with retries + checkpoint + alert
    # methods of Workflow, continued
    def _run_step(self, step, context, done, report):
        for attempt in range(1, step.retries + 2):  # 1 try + N retries
            t0 = time.time()
            try:
                log.info("▶ %s (attempt %d)", step.name, attempt)
                step.run(context)  # do the work
                dt = time.time() - t0
                done.add(step.name)
                self.state_file.write_text(json.dumps(list(done)))  # checkpoint
                self._record(report, step.name, "ok", dt, "")
                log.info("✓ %s in %.1fs", step.name, dt)
                return
            except Exception as e:
                dt = time.time() - t0
                if attempt <= step.retries:
                    wait = 2 ** (attempt - 1)  # backoff: 1s, 2s, 4s, ...
                    log.warning("✗ %s failed (%s); retry in %ds",
                                step.name, e, wait)
                    time.sleep(wait)
                else:
                    log.exception("✗ %s FAILED permanently", step.name)
                    self._record(report, step.name, "failed", dt, str(e))
                    notify_alert(f"workflow '{self.name}' step '{step.name}' "
                                 f"FAILED: {e}", "critical")  # Lesson 34
                    return

    def _record(self, report, name, status, dt, note):
        report["steps"].append({"name": name, "status": status,
                                "duration_s": round(dt, 2), "note": note})

    def _failed(self, report, name):
        # a skipped step blocks its dependents too, so a failure
        # propagates all the way down the graph
        return any(s["name"] == name and s["status"] in ("failed", "skipped")
                   for s in report["steps"])

    def _save_report(self, report):
        Path(f"report_{self.name}.json").write_text(json.dumps(report, indent=2))
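The wait of `2 ** (attempt - 1)` seconds is exponential backoff: 1s before the first retry, 2s before the second, and so on. The sketch below isolates that schedule so it can be inspected without sleeping. The `cap` parameter and the jitter helper are additions not present in the engine above, shown only as one common refinement.

```python
import random


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> list[float]:
    """Delay before each retry: base * 2**n, never more than cap (cap is an assumption)."""
    return [min(cap, base * 2 ** n) for n in range(retries)]


def with_jitter(delay: float, rng: random.Random) -> float:
    """Pick a random wait between 0 and delay so many clients do not retry in lockstep."""
    return rng.uniform(0, delay)


print(backoff_delays(2))  # [1.0, 2.0]  (the default retries=2 in Step)
print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

With the default of two retries, a step that keeps failing costs about three seconds of waiting before the engine gives up. A cap matters once the retry budget grows, because uncapped doubling reaches minutes quickly.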
Using the orchestrator on a real pipeline
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s",
                    datefmt="%H:%M:%S")

# each step is a small function that reads/writes the shared context
def fetch(ctx):     ctx["rows"] = get_data()                              # Lesson 24
def transform(ctx): ctx["summary"] = summarise(ctx["rows"])               # Lesson 17
def report(ctx):    ctx["out"] = build_report(ctx["rows"], ctx["summary"])  # L22
def deliver(ctx):   send_report(ctx["out"], ctx["summary"])               # Lessons 29-30
def backup(ctx):    upload_offsite(ctx["out"])                            # Lesson 42

wf = (Workflow("daily", dry_run=False)
      .add(Step("fetch", fetch))
      .add(Step("transform", transform, depends_on=["fetch"]))
      .add(Step("report", report, depends_on=["transform"]))
      .add(Step("backup", backup, depends_on=["report"]))
      .add(Step("deliver", deliver, depends_on=["report", "backup"])))

result = wf.run()
ok = all(s["status"] in ("ok", "cached") for s in result["steps"])
notify_alert(f"daily workflow {'✅ complete' if ok else '⚠️ had failures'}",
             "good" if ok else "warning")
12:00:00 INFO run order: fetch → transform → report → backup → deliver
12:00:01 INFO ▶ fetch (attempt 1)
12:00:03 INFO ✓ fetch in 2.1s
12:00:03 INFO ▶ transform (attempt 1)
12:00:03 INFO ✓ transform in 0.2s
12:00:03 INFO ▶ report (attempt 1)
12:00:04 INFO ✓ report in 1.0s
12:00:04 INFO ▶ backup (attempt 1)
12:00:06 INFO ✓ backup in 1.8s
12:00:06 INFO ▶ deliver (attempt 1)
12:00:07 INFO ✓ deliver in 0.9s
# Slack: ✅ daily workflow complete
# report_daily.json written; .wf_daily.json checkpoints all 5 steps
Read the result
You've built a real engine: it ordered the steps by dependency, ran each with retries, checkpointed every success (so a re-run after a crash skips completed steps as "cached"), persisted a per-step report, and alerted on the outcome. deliver correctly waited for both report and backup. The fluent .add().add() API and the shared context dict make defining a pipeline clean. Every Level 7 skill is in here — and you can now express any automation as a workflow.
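One consequence of the engine as written is worth handling before you schedule it: the checkpoint file is never removed, so after a fully successful run the next scheduled run would find every step "cached" and do nothing. The context dict is not saved either, so a resumed run starts with an empty context and cached steps do not refill it. A minimal sketch of one way to address the first point, using a hypothetical helper `clear_checkpoint_if_complete` that you would call at the end of `run`:

```python
import json
import tempfile
from pathlib import Path


def clear_checkpoint_if_complete(report: dict, state_file: Path) -> bool:
    """Delete the checkpoint when every step ended as ok or cached.

    Returns True if a file was removed. A failed or skipped step keeps
    the checkpoint in place so the next run can resume.
    """
    complete = all(s["status"] in ("ok", "cached") for s in report["steps"])
    if complete and state_file.exists():
        state_file.unlink()
        return True
    return False


def demo() -> tuple[bool, bool]:
    """Try the helper on a throwaway file: first a failed run, then a clean one."""
    with tempfile.TemporaryDirectory() as tmp:
        state = Path(tmp) / ".wf_demo.json"
        state.write_text(json.dumps(["fetch"]))
        failed_run = {"steps": [{"name": "fetch", "status": "ok"},
                                {"name": "transform", "status": "failed"}]}
        kept = clear_checkpoint_if_complete(failed_run, state)
        clean_run = {"steps": [{"name": "fetch", "status": "cached"},
                               {"name": "transform", "status": "ok"}]}
        removed = clear_checkpoint_if_complete(clean_run, state)
        return kept, removed


print(demo())  # (False, True)
```

For the context, one option under the same assumptions is to have each step write its output to a file and have downstream steps read it back, so a resumed run does not depend on in-memory state from the run that crashed.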
Build Your Capstone
20 min
Build the orchestrator and use it for a pipeline you care about: your data, your servers, your chores. Tackle it in stages.
1. Implement Step, order_steps (with cycle detection), and a Workflow.run that executes steps in order with logging. Test with 3-4 toy steps and verify the order respects dependencies.
2. Add per-step retries, checkpointing (skip cached steps on re-run), and dependency-failure skipping. Make a step fail and confirm: it retries, then its dependents are skipped, and a re-run resumes from the failure.
3. Wire real steps (fetch/transform/report/deliver/backup using your earlier modules), add the run report and Slack/email alerts, and a --dry-run. Schedule it (Lesson 36). Run it end to end and capture the report + the alert.
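For the `--dry-run` flag, the standard library's `argparse` is enough. This is a minimal sketch of the argument parsing for a runner script; the script name `run_workflow.py` and the description text are assumptions, and the call into `Workflow` is left as a comment because it depends on your own module layout.

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    """Parse command-line flags; argv=None means read sys.argv."""
    parser = argparse.ArgumentParser(description="Run the daily workflow.")
    parser.add_argument(
        "--dry-run",
        action="store_true",  # flag is False unless given on the command line
        help="show the plan without running any step",
    )
    return parser.parse_args(argv)


# In run_workflow.py you might then write (hypothetical wiring):
#     args = parse_args()
#     wf = Workflow("daily", dry_run=args.dry_run)
#     wf.run()
```

argparse turns `--dry-run` into the attribute `dry_run`, so it maps directly onto the `dry_run` parameter of `Workflow`.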
Stretch · Make It Shine
10 min
Pick one or two to push your capstone further:
- Parallel steps — run independent steps concurrently (threads), respecting dependencies.
- Config-defined workflows — load the step graph from a YAML/JSON file so non-programmers can edit it.
- A status dashboard — write the run report as an HTML page (or post a Block Kit summary to Slack).
- Conditional steps — a step runs only if a predicate on the context is true.
- Per-step timeout — kill a step that runs too long and treat it as a failure.
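For the parallel-steps idea, one simple approach is to group steps into "waves" where every step's dependencies sit in earlier waves, then run each wave in a thread pool. The sketch below is one possible design, not part of the engine above: `waves` and `run_parallel` are hypothetical names, the graph is a plain dict of name to dependencies, and retries and checkpoints are left out to keep it short.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group step names so each wave depends only on earlier waves."""
    remaining = dict(deps)
    done: set[str] = set()
    result: list[list[str]] = []
    while remaining:
        ready = sorted(n for n, ds in remaining.items()
                       if all(d in done for d in ds))
        if not ready:  # nothing can run: a cycle or a dependency that does not exist
            raise ValueError("cycle or unknown dependency")
        result.append(ready)
        done.update(ready)
        for n in ready:
            del remaining[n]
    return result


def run_parallel(funcs: dict[str, Callable[[dict], None]],
                 deps: dict[str, list[str]],
                 ctx: dict, max_workers: int = 4) -> list[str]:
    """Run each wave concurrently and return the step names in wave order."""
    finished: list[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for wave in waves(deps):
            futures = {name: pool.submit(funcs[name], ctx) for name in wave}
            for name, future in futures.items():
                future.result()  # waits, and re-raises the step's exception if any
                finished.append(name)
    return finished


# Toy graph: clean and stats both need fetch; merge needs both.
deps = {"fetch": [], "clean": ["fetch"], "stats": ["fetch"], "merge": ["clean", "stats"]}
funcs = {
    "fetch": lambda ctx: ctx.update(rows=[1, 2, 3]),
    "clean": lambda ctx: ctx.update(clean=[r * 10 for r in ctx["rows"]]),
    "stats": lambda ctx: ctx.update(total=sum(ctx["rows"])),
    "merge": lambda ctx: ctx.update(merged=(ctx["clean"], ctx["total"])),
}
ctx: dict = {}
finished = run_parallel(funcs, deps, ctx)
```

Steps in the same wave share the context dict across threads, so they should write to different keys, as `clean` and `stats` do here. Waves are simpler than a fully dynamic scheduler but make a fast step wait for the slowest step in its wave.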
A config-driven sketch
# workflow.json
# {
#   "name": "daily",
#   "steps": [
#     {"name": "fetch", "fn": "steps.fetch"},
#     {"name": "transform", "fn": "steps.transform", "depends_on": ["fetch"]},
#     ...
#   ]
# }
import json, importlib

def load_workflow(path):
    with open(path) as f:  # close the file once the spec is read
        spec = json.load(f)
    wf = Workflow(spec["name"])
    for s in spec["steps"]:
        # "steps.fetch" -> module "steps", function "fetch"
        module, fn = s["fn"].rsplit(".", 1)
        func = getattr(importlib.import_module(module), fn)
        wf.add(Step(s["name"], func, s.get("depends_on", [])))
    return wf
Config-driven workflows separate the pipeline definition from the engine — the hallmark of a real orchestrator.
Recap
3 min
The orchestrator is Level 7 in one tool: a Step (named callable + dependencies + retries), a topological sort to order them, and a Workflow engine that runs each step with retries, checkpointing (resume on re-run), dependency-failure skipping, dry-run, a persisted report, and outcome alerts. Steps share a context dict to pass data. You wired in fetch/transform/report/deliver/backup from earlier lessons and got a production-grade pipeline. The deeper lesson: with the right small abstractions, complex automation becomes composable. You describe the graph, and the engine handles the hard parts. That's the craft of automation.
Vocabulary Card
- orchestrator: A tool that runs multi-step workflows, handling order, retries, and failures.
- topological sort: Ordering items so each comes after its dependencies. A valid order exists only when the graph has no cycles, so the sort is also where cycles get detected.
- context: A shared object passed between steps to carry data through the pipeline.
- DAG: Directed acyclic graph, the dependency structure of a workflow.
Homework · Ship Your Capstone
5 min
Finish and ship your workflow orchestrator running a real, multi-step pipeline you'll actually use. It must demonstrate: dependency ordering, retries, checkpointed resume, logging, a run report, an alert on completion/failure, dry-run, and scheduling. Write a short README (what it does, the step graph, how to run it, how failures surface) and record one full run's log + report. This is your Level 7 portfolio piece.
Sample · capstone README
Daily Sales Workflow (orchestrator capstone)
Graph: fetch → transform → report → backup → deliver
(deliver depends on report AND backup)
Run: python run_workflow.py # live
python run_workflow.py --dry-run # shows plan, does nothing
Schedule: cron '0 8 * * *' (scheduler-safe paths, file logging)
Resilience: each step retries 2x with backoff; successes are
checkpointed to .wf_daily.json, so a crash mid-run resumes
(completed steps show "cached"). A hard failure skips dependents,
writes report_daily.json, and posts a CRITICAL Slack alert.
Observability: logs/daily.log (rotating), report_daily.json
(per-step status+timing), Slack ping on every run (✅/⚠️).
Proof: attached daily.log + report_daily.json from a real 08:00 run,
plus a screenshot of the kill-and-resume test.
Non-negotiables: real multi-step pipeline, dependency order, retries+resume, report, alert, dry-run, scheduled, README + run evidence.