Learning Goals
3 min
By the end of this lesson you can:
- Checkpoint progress so a restarted job resumes instead of starting over.
- Isolate per-item errors so one bad record doesn't kill the batch.
- Retry transient failures with backoff (a reusable decorator).
- Write idempotent steps and shut down gracefully on Ctrl-C / signals.
Warm-Up · The Hour-3 Crash
5 min
You're processing 100,000 records. At record 73,000 a network blip raises an exception. A naive script dies, and a re-run starts at record 1, redoing 3 hours of work (and maybe double-charging, double-emailing, double-uploading).
Resilient long jobs are built from four habits: checkpoint (remember what's done so you can resume), isolate (one item's failure doesn't stop the rest), retry (transient errors get another go), and idempotency (re-running a step is safe — no double effects). Add graceful shutdown and a long job becomes something you can trust to finish, or to pick up exactly where it left off.
New Concept · The Four Habits
14 min
1. Checkpointing — resume, don't restart
import json
from pathlib import Path

CHECKPOINT = Path("progress.json")

def load_done() -> set:
    if CHECKPOINT.exists():
        return set(json.loads(CHECKPOINT.read_text()))
    return set()

def mark_done(done: set, item_id) -> None:
    done.add(item_id)
    CHECKPOINT.write_text(json.dumps(list(done)))  # persist after each item

done = load_done()
for item in all_items:
    if item["id"] in done:
        continue  # already processed: skip on resume
    process(item)
    mark_done(done, item["id"])  # record progress immediately
Persist progress as you go. On restart, skip what's already marked done. The job resumes at record 73,000 (the one that failed and was never marked done), not record 1.
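One caveat with the loop above: if the process is killed while write_text is partway through, progress.json can be left truncated and the next run cannot load it. A minimal sketch of a safer write, assuming the checkpoint lives on a local filesystem, writes to a temporary file first and then swaps it into place with os.replace. The names save_checkpoint and load_checkpoint and the ".tmp" suffix are illustrative, not part of the lesson's code.

```python
import json
import os
from pathlib import Path


def save_checkpoint(path: Path, done: set) -> None:
    """Write the done-set to a temp file, then swap it into place in one step."""
    tmp = path.with_name(path.name + ".tmp")  # hypothetical temp-file naming
    tmp.write_text(json.dumps(list(done)))
    os.replace(tmp, path)  # readers see the old file or the new one, never half


def load_checkpoint(path: Path) -> set:
    """Return the set of finished ids, or an empty set on a first run."""
    return set(json.loads(path.read_text())) if path.exists() else set()
```

Swapping these in for mark_done's write_text call keeps the rest of the loop unchanged.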
2. Per-item isolation — keep going
failures = []
for item in items:
    try:
        process(item)
        mark_done(done, item["id"])
    except Exception as e:
        log.exception("item %s failed", item["id"])
        failures.append((item["id"], str(e)))  # collect, don't crash

log.info("done: %d ok, %d failed", len(items) - len(failures), len(failures))
Wrap each item so one failure is logged and recorded, not fatal. At the end you have a list of failures to retry later — far better than a half-finished crash.
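To make the shape of that failure list concrete, here is a small self-contained sketch of the same pattern wrapped in a function. run_isolated and demo_process are hypothetical names for illustration; the worker simply rejects one record.

```python
import logging

log = logging.getLogger("batch")


def run_isolated(items, process):
    """Process every item; return (succeeded_ids, failures) instead of raising."""
    ok, failures = [], []
    for item in items:
        try:
            process(item)
            ok.append(item["id"])
        except Exception as e:  # broad on purpose: any per-item error is contained
            log.exception("item %s failed", item["id"])
            failures.append((item["id"], str(e)))
    return ok, failures


def demo_process(item):  # hypothetical worker that rejects one record
    if item["id"] == 2:
        raise ValueError("bad record")


ok, failures = run_isolated([{"id": i} for i in range(4)], demo_process)
print(ok)        # [0, 1, 3]
print(failures)  # [(2, 'bad record')]
```

Note that except Exception does not catch KeyboardInterrupt or SystemExit, so Ctrl-C still interrupts the loop unless a signal handler is installed.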
3. Retry with backoff — a reusable decorator
import time, functools, random, logging

log = logging.getLogger(__name__)

def retry(attempts=3, base=1.0, exceptions=(Exception,)):
    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for n in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as e:
                    if n == attempts:
                        raise  # out of attempts: surface the last error
                    # exponential backoff plus a little random jitter
                    wait = base * 2 ** (n - 1) + random.uniform(0, 0.5)
                    log.warning("attempt %d failed (%s); retry in %.1fs", n, e, wait)
                    time.sleep(wait)
        return wrapper
    return deco

@retry(attempts=4, exceptions=(ConnectionError, TimeoutError))
def fetch(item):
    ...
This generalises Lesson 24's retry into a decorator you can slap on any flaky operation. Retry only the errors that might be transient — don't retry a ValueError that'll fail identically every time.
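To see what the wait schedule looks like, here is a sketch that computes the delays the decorator would sleep between attempts, leaving out the random jitter. backoff_delays is a hypothetical helper for illustration only.

```python
def backoff_delays(attempts: int, base: float = 1.0) -> list[float]:
    """Delays slept after failed attempts 1..attempts-1 (jitter omitted)."""
    # The final attempt re-raises instead of sleeping, so it has no delay.
    return [base * 2 ** (n - 1) for n in range(1, attempts)]


print(backoff_delays(4))       # [1.0, 2.0, 4.0]
print(backoff_delays(3, 0.5))  # [0.5, 1.0]
```

With attempts=4 and base=1.0, a call that never succeeds spends about 7 seconds waiting before the final error surfaces, which is worth keeping in mind when choosing attempts for a 100,000-item batch.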
4. Idempotency — safe to re-run
# NOT idempotent: re-running double-charges
charge_customer(customer, amount)

# idempotent: a key makes a repeat a no-op
def charge_once(customer, amount, idempotency_key):
    if already_charged(idempotency_key):  # check first
        return                            # safe re-run
    charge_customer(customer, amount)
    record_charge(idempotency_key)
Retries and resumes only work if re-doing a step is harmless. Design steps so "process item 73,000 again" produces the same result, not a second email/charge/upload. Use idempotency keys, "upsert" instead of "insert," and check-before-act. Without idempotency, recovery can be worse than the crash.
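As one illustration of "upsert instead of insert", the sketch below uses SQLite's INSERT OR REPLACE on an in-memory database. The results table and the save_result helper are assumptions made up for this example; a real job would use its own schema and a file-backed database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (id INTEGER PRIMARY KEY, status TEXT)")


def save_result(item_id: int, status: str) -> None:
    """Upsert: a repeat for the same id replaces the row instead of adding one."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO results (id, status) VALUES (?, ?)",
            (item_id, status),
        )


save_result(73000, "processed")
save_result(73000, "processed")  # simulated re-run after a crash
print(conn.execute("SELECT COUNT(*) FROM results").fetchone()[0])  # 1
```

Because the id is the primary key, processing item 73,000 twice leaves exactly one row.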
Graceful shutdown
import signal

stop = False

def handle_signal(signum, frame):
    global stop
    stop = True
    log.info("shutdown requested: finishing current item then stopping")

signal.signal(signal.SIGINT, handle_signal)   # Ctrl-C
signal.signal(signal.SIGTERM, handle_signal)  # kill / scheduler stop

for item in items:
    if stop:
        break  # exit cleanly between items, checkpoint intact
    process(item)
Catch the stop signal and finish the current item before exiting — so you never leave a half-written record. The checkpoint means the next run continues seamlessly.
Worked Example · A Resumable Batch Processor
12 min
Goal: process a large list of items with all four habits (checkpoint, isolate, retry, idempotent steps) plus a graceful stop, so it can crash, be killed, or hit bad items and still finish correctly on the next run.
import json, time, signal, logging, functools, random
from pathlib import Path

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s",
                    datefmt="%H:%M:%S")
log = logging.getLogger("batch")

CHECKPOINT = Path("progress.json")
FAILURES = Path("failures.json")
_stop = False

def _handle(signum, frame):
    global _stop
    _stop = True
    log.info("stop requested; will finish current item")

signal.signal(signal.SIGINT, _handle)   # Ctrl-C
signal.signal(signal.SIGTERM, _handle)  # kill / scheduler stop

def retry(attempts=3, base=0.5):
    def deco(fn):
        @functools.wraps(fn)
        def w(*a, **k):
            for n in range(1, attempts + 1):
                try:
                    return fn(*a, **k)
                except (ConnectionError, TimeoutError):
                    if n == attempts:
                        raise
                    time.sleep(base * 2 ** (n - 1) + random.uniform(0, 0.3))
            return None
        return w
    return deco

@retry(attempts=4)
def process(item: dict) -> None:
    # …real work: call an API, write a row, etc. (idempotent!)…
    if item["id"] == 5:
        raise ValueError("bad data in item 5")  # permanent → not retried
    time.sleep(0.05)

def run(items: list[dict]) -> None:
    done = set(json.loads(CHECKPOINT.read_text())) if CHECKPOINT.exists() else set()
    failures = json.loads(FAILURES.read_text()) if FAILURES.exists() else {}
    log.info("resuming: %d already done", len(done))
    for item in items:
        if _stop:
            log.info("graceful stop at item %s", item["id"])
            break
        if item["id"] in done:
            continue
        try:
            process(item)
            done.add(item["id"])
            CHECKPOINT.write_text(json.dumps(list(done)))  # checkpoint each
        except Exception as e:
            log.exception("item %s failed", item["id"])
            failures[str(item["id"])] = str(e)  # JSON keys must be strings
            FAILURES.write_text(json.dumps(failures))
    log.info("finished: %d done, %d failed", len(done), len(failures))

run([{"id": i} for i in range(10)])
12:00:00 INFO resuming: 0 already done
12:00:00 ERROR item 5 failed
ValueError: bad data in item 5
12:00:01 INFO finished: 9 done, 1 failed
# Re-run later: "resuming: 9 already done"; only item 5 is retried.
Read the code
The habits work together: each success is checkpointed immediately; item 5's permanent error is isolated (logged to failures.json while the batch continues); transient errors are retried via the decorator (ValueError isn't, because it would just fail again); and Ctrl-C triggers a graceful stop between items. Idempotency is the one habit the skeleton assumes rather than shows: the real work inside process must be safe to repeat. Kill it mid-run and the re-run picks up where it left off, skipping every item already recorded in progress.json. This skeleton is how you make any long pipeline, including the capstone, survive the real world.
Try It Yourself
13 min
Write a loop over 20 items that checkpoints each. Kill it (Ctrl-C) around item 10, re-run, and confirm it resumes from 11, not from 1.
Write the @retry decorator and apply it to a function that fails the first 2 calls then succeeds (use a counter). Confirm it retries and eventually succeeds; then make it always fail and confirm it gives up after N.
Hint
calls = {"n": 0}

@retry(attempts=4)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("blip")
    return "ok"

print(flaky())  # "ok" after 2 retries
Take a step with a side effect (append a line to a file, or "send" an email by logging) and make it idempotent with a key set, so running it twice for the same id only acts once. Prove a re-run doesn't duplicate.
Hint
processed = set()  # persisted to disk in real code

def send_once(user_id):
    if user_id in processed:
        return  # already sent: no-op
    log.info("sending to %s", user_id)
    processed.add(user_id)
Mini-Challenge · The Retryable Failures Re-Runner
8 min
Add a second mode to the batch processor: --retry-failures reads failures.json and re-attempts only those items (clearing ones that now succeed, keeping ones that still fail). This is the "clean up the stragglers" pass every robust pipeline needs.
Sample solution
import json
from pathlib import Path

# process and log come from the batch processor in the worked example
def retry_failures(items_by_id: dict) -> None:
    # failures.json keys are strings, so items_by_id must be keyed by str(id)
    failures = json.loads(Path("failures.json").read_text())
    still_failing = {}
    for item_id in list(failures):
        try:
            process(items_by_id[item_id])
            log.info("recovered %s", item_id)
        except Exception as e:
            still_failing[item_id] = str(e)
            log.warning("still failing %s: %s", item_id, e)
    Path("failures.json").write_text(json.dumps(still_failing))
    log.info("%d recovered, %d still failing",
             len(failures) - len(still_failing), len(still_failing))
Non-negotiables: re-attempts only failed items, removes recovered ones, keeps still-failing ones with reasons.
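The sample solution leaves out how the --retry-failures flag reaches the script. One way to wire it, sketched here with the standard argparse module: parse_args and main are hypothetical names, and main only returns which mode was chosen so the sketch stays self-contained. In the real script those branches would call retry_failures(...) or run(...).

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Resumable batch processor")
    parser.add_argument(
        "--retry-failures",
        action="store_true",
        help="re-attempt only the items listed in failures.json",
    )
    return parser.parse_args(argv)


def main(argv=None) -> str:
    args = parse_args(argv)
    # argparse exposes --retry-failures as the attribute retry_failures
    if args.retry_failures:
        return "retry"  # placeholder: call retry_failures(items_by_id) here
    return "full"       # placeholder: call run(items) here
```

Passing argv explicitly, as in main(["--retry-failures"]), makes both modes easy to exercise without touching the command line.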
Recap
3 min
Long jobs must survive failure. The four habits: checkpoint progress (persist after each item, skip done ones on resume); isolate per-item errors (log + collect, never crash the batch); retry transient errors with backoff (a reusable decorator, applied only to retryable exceptions); and make steps idempotent (re-running is harmless, which is the safety net under retries and resumes). Add graceful shutdown (catch SIGINT/SIGTERM, stop cleanly between items) and a failures-re-run pass. Then a crash at hour 3 costs you the current item, not the whole job.
Vocabulary Card
- checkpoint: Persisted progress that lets a restarted job resume where it stopped.
- error isolation: Containing one item's failure so the rest of the batch continues.
- idempotent: An operation that has the same effect whether run once or many times.
- graceful shutdown: Stopping cleanly on a signal, leaving state consistent.
Homework
4 min
Take one batch automation you've built (CSV processor, API sync, form bot) and make it fully resilient: checkpointing, per-item isolation, a retry decorator, idempotent steps, graceful shutdown, and a --retry-failures pass. Test the recovery: run it, kill it midway, re-run (confirm resume), introduce a bad item (confirm isolation), and re-run failures (confirm recovery). Write a short note on which steps you made idempotent and how.
Sample · recovery test log
Test: resilient API → CSV sync of 1000 records.
Run 1: killed at ~record 400 (Ctrl-C) → "graceful stop at 401",
progress.json has 400 ids.
Run 2: "resuming: 400 already done" → finishes 401-1000.
Record 612 had bad JSON → isolated to failures.json, batch continued.
Run 3: --retry-failures → record 612 still bad (source unfixed),
kept in failures.json with its reason. 0 recovered.
Idempotent steps: each record is written keyed by its id (upsert),
so re-processing an id overwrites the same row rather than appending
a duplicate. Retries are therefore always safe.
Non-negotiables: all four habits + graceful stop + retry-failures, a real recovery test, an idempotency explanation.