Learning Goals
3 minBy the end of this lesson you can:
- Schedule jobs with the
schedulelibrary's fluent API. - Write the run loop that actually fires due jobs.
- Isolate job errors so one failure doesn't kill the scheduler.
- Know when
schedulefits and when to use the OS scheduler instead.
Warm-Up · From One-Shot to Recurring
5 minYour report generator works — but someone still has to run it each morning. The goal of automation is to remove that someone. You could write a while True: ... time.sleep(...) loop by hand, but it gets messy fast: what time is it? did I already run today? what if a job throws?
pip install schedule
The schedule library lets you declare a timetable in readable English — schedule.every().day.at("08:00").do(job) — then you run a tiny loop that calls schedule.run_pending() to fire whatever's due. It's pure Python, in-process, and perfect for a long-running script. The key skill is making that loop robust: a failing job must not crash the scheduler.
New Concept · Declaring & Running Jobs
14 minThe fluent schedule API
import schedule def job(): print("running…") schedule.every().day.at("08:00").do(job) # daily at 8am schedule.every(10).minutes.do(job) # every 10 minutes schedule.every().hour.do(job) # top of every hour schedule.every().monday.at("09:30").do(job) # Mondays 9:30 schedule.every(5).seconds.do(job) # every 5s (for testing)
It reads like English. .do(fn) registers the function (don't call it — pass the name). Times are in the machine's local timezone (remember Lesson 12 if that matters).
Passing arguments
def backup(folder, dest): ... schedule.every().day.at("02:00").do(backup, "data", "backups")
Extra arguments to .do() are passed through to your function when it fires.
The run loop
import time while True: schedule.run_pending() # fire any jobs whose time has come time.sleep(1) # check once a second
Crucial point: scheduling a job doesn't run it — run_pending() does, when called. The loop checks every second and fires what's due. This script must keep running (it's a long-lived process), unlike a one-shot script.
Error isolation — don't let one job kill the loop
import logging log = logging.getLogger("scheduler") def safe(fn): """Wrap a job so an exception is logged, not fatal.""" def wrapper(*args, **kwargs): try: return fn(*args, **kwargs) except Exception: log.exception("job %s failed", fn.__name__) return wrapper schedule.every().day.at("08:00").do(safe(daily_report))
If a job raises and you don't catch it, the exception propagates out of run_pending() and crashes your whole scheduler — every other job stops too. Always wrap jobs (or use schedule's built-in CancelJob/error handling) so a single bad run is logged and the scheduler keeps going.
Useful extras
schedule.every().day.at("08:00").do(job).tag("reports") schedule.clear("reports") # cancel jobs by tag schedule.every(3).seconds.until("18:00").do(job) # stop after a time print(schedule.idle_seconds()) # seconds until the next job
Worked Example · A Robust Scheduler Skeleton
12 minGoal: a reusable scheduler that runs several jobs, isolates errors, prevents overlapping runs, and reports the next-run time — the skeleton you'd deploy.
import time, logging, functools, threading import schedule logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S") log = logging.getLogger("scheduler") def job_wrapper(fn): """Log start/finish, swallow errors, and prevent overlap.""" running = threading.Lock() @functools.wraps(fn) def wrapper(*args, **kwargs): if not running.acquire(blocking=False): log.warning("%s still running — skipping this tick", fn.__name__) return try: log.info("▶ %s", fn.__name__) fn(*args, **kwargs) log.info("✓ %s done", fn.__name__) except Exception: log.exception("✗ %s failed", fn.__name__) finally: running.release() return wrapper @job_wrapper def daily_report(): time.sleep(0.5) # pretend work # raise RuntimeError("boom") # try this: scheduler survives it @job_wrapper def heartbeat(): log.info("still alive") # the timetable schedule.every(5).seconds.do(daily_report) # fast cadence for the demo schedule.every(2).seconds.do(heartbeat) log.info("scheduler started — Ctrl-C to stop") try: while True: schedule.run_pending() time.sleep(1) except KeyboardInterrupt: log.info("shutting down cleanly")
14:30:00 INFO scheduler started — Ctrl-C to stop 14:30:02 INFO still alive 14:30:04 INFO still alive 14:30:05 INFO ▶ daily_report 14:30:05 INFO ✓ daily_report done 14:30:06 INFO still alive ...
Read the code
The job_wrapper decorator does three production jobs at once: it logs each run, catches exceptions so the scheduler never dies (uncomment the raise to prove it), and uses a non-blocking Lock to skip a tick if the previous run is still going — preventing two backups from stomping on each other. The KeyboardInterrupt handler exits cleanly. This skeleton is the home for any recurring automation you've built — point daily_report at Lesson 22's generator and you have a self-running report.
schedule runs inside a process you keep alive — great for development, a long-running service, or when you want everything in Python. But if that process dies (reboot, crash), the schedule dies with it. For "must run even after a reboot" reliability, hand the job to the OS (cron / Task Scheduler) — next lesson.
Try It Yourself
13 minSchedule a job that prints the time every 3 seconds and run the loop. Confirm it fires on cadence; stop with Ctrl-C.
Run two jobs at different intervals (e.g. one every 2s, one every 5s) and confirm both fire on their own schedules. Add the safe() wrapper to one and make it raise — prove the other keeps running.
Hint
@safe def flaky(): raise ValueError("oops") schedule.every(2).seconds.do(flaky) schedule.every(5).seconds.do(lambda: print("steady"))
Schedule a job for a specific clock time a minute or two from now (at("HH:MM")) and confirm it fires exactly then. Add idle_seconds() logging so you can see the countdown.
Hint
import schedule, time schedule.every().day.at("14:32").do(lambda: print("fired!")) while True: print("next in", round(schedule.idle_seconds()), "s") schedule.run_pending() time.sleep(5)
Mini-Challenge · The Job Registry
8 minBuild a tiny scheduler that loads its jobs from a config (a list of (name, every_seconds, function) or a small JSON/dict), wraps each with error isolation and logging automatically, and prints a startup summary of the registered schedule. This is how real schedulers stay configurable.
Show a sample solution
import time, logging, functools, schedule logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%H:%M:%S") log = logging.getLogger("reg") def isolate(fn): @functools.wraps(fn) def w(*a, **k): try: fn(*a, **k) except Exception: log.exception("%s failed", fn.__name__) return w def report(): log.info("report ran") def cleanup(): log.info("cleanup ran") JOBS = [ ("report", 10, report), ("cleanup", 30, cleanup), ] for name, every, fn in JOBS: schedule.every(every).seconds.do(isolate(fn)).tag(name) log.info("registered '%s' every %ds", name, every) log.info("starting %d jobs", len(JOBS)) while True: schedule.run_pending() time.sleep(1)
Non-negotiables: jobs from config, auto error-isolation, a registration summary, working loop.
Recap
3 minThe schedule library declares timetables in readable English — schedule.every().day.at("08:00").do(job) — and a loop calling schedule.run_pending() every second fires what's due. Scheduling doesn't run a job; the loop does, and the process must stay alive. The non-negotiable habit is robustness: wrap every job so an exception is logged rather than crashing the scheduler, and use a lock to skip overlapping runs. schedule is ideal for in-process, Python-only, development scheduling — but if the job must survive reboots and crashes, hand it to the OS scheduler (next lesson).
Vocabulary Card
- run_pending
- Fires all jobs whose scheduled time has arrived; call it in a loop.
- run loop
- The
while True+ short sleep that keeps the scheduler alive. - error isolation
- Wrapping jobs so one failure is logged, not fatal to the scheduler.
- overlap protection
- Skipping a new run while the previous one is still in progress.
Homework
4 minBuild scheduler.py that schedules at least two real automations from this level (e.g. the report generator every day at a time, and a health check every few minutes), each wrapped with logging + error isolation + overlap protection. Run it for a few minutes to confirm jobs fire on cadence and a deliberately-failing job doesn't take down the others. Note one reason you'd move this to the OS scheduler instead.
Sample · scheduler.py
import time, logging, functools, schedule logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S") log = logging.getLogger("sched") def isolate(fn): @functools.wraps(fn) def w(*a, **k): try: log.info("▶ %s", fn.__name__); fn(*a, **k); log.info("✓ %s", fn.__name__) except Exception: log.exception("✗ %s", fn.__name__) return w @isolate def daily_report(): ... # call Lesson 22 generator here @isolate def health_check(): ... # ping an endpoint / check disk schedule.every().day.at("08:00").do(daily_report) schedule.every(5).minutes.do(health_check) log.info("scheduler up") while True: schedule.run_pending() time.sleep(1)
Why move to the OS scheduler? If the machine reboots or this process crashes overnight, schedule's timetable is gone — the 8am report never runs. cron/Task Scheduler restart it for you.
Non-negotiables: ≥2 real jobs, isolation+logging+overlap protection, survives a failing job, an OS-scheduler reason.