Learning Goals
3 minBy the end of this lesson you can:
- Watch a folder for create/modify/delete/move events with
watchdog. - Write an event handler and run an
Observer. - Filter to the files you care about and debounce rapid duplicate events.
- Wait until a file is fully written before processing it.
Warm-Up · Polling vs. Watching
5 minYou could check a folder every few seconds with schedule and a glob. But polling is wasteful (constant scanning) and laggy (you only notice on the next tick). Watching is instant and idle until something happens.
pip install watchdog
watchdog taps into the OS's native file-system notifications, so your code sleeps until a real event fires — a file created, modified, deleted, or moved — then runs your handler immediately. You write an event handler (what to do) and start an observer (which watches a path). The tricky bits are real-world: filtering noise, debouncing duplicate events, and not grabbing a file that's still being written.
New Concept · Handlers & Observers
14 minA minimal watcher
import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class MyHandler(FileSystemEventHandler): def on_created(self, event): if not event.is_directory: print("created:", event.src_path) def on_modified(self, event): if not event.is_directory: print("modified:", event.src_path) observer = Observer() observer.schedule(MyHandler(), path="inbox", recursive=False) observer.start() try: while True: time.sleep(1) # keep the main thread alive except KeyboardInterrupt: observer.stop() observer.join()
- Subclass
FileSystemEventHandlerand override the events you care about:on_created,on_modified,on_deleted,on_moved. - Each
eventhassrc_pathandis_directory(filter out folder events with the latter). observer.schedule(handler, path, recursive=)watches a folder;recursive=Trueincludes subfolders.- The observer runs in a background thread, so keep the main thread alive with a sleep loop.
Filtering to files you want
from pathlib import Path class CsvHandler(FileSystemEventHandler): def on_created(self, event): p = Path(event.src_path) if event.is_directory or p.suffix.lower() != ".csv": return # ignore non-CSV and folders print("new CSV:", p.name)
Always guard at the top of the handler — file systems emit events for temp files, hidden files, and folders you don't care about.
The "file still being written" problem
When a large file is copied in, you may get on_created the instant it starts arriving — read it now and you get a truncated file. Wait until its size stops changing before processing.
import time from pathlib import Path def wait_until_stable(path: str, checks: int = 3, interval: float = 0.5) -> bool: p = Path(path) last = -1 stable = 0 while stable < checks: if not p.exists(): return False size = p.stat().st_size if size == last and size > 0: stable += 1 else: stable = 0 last = size time.sleep(interval) return True
This polls the size a few times; only when it's unchanged across several checks do we trust the file is complete.
Debouncing duplicate events
import time class DebouncedHandler(FileSystemEventHandler): def __init__(self, cooldown=1.0): self._last = {} self._cooldown = cooldown def _recent(self, path) -> bool: now = time.time() if now - self._last.get(path, 0) < self._cooldown: return True self._last[path] = now return False def on_modified(self, event): if event.is_directory or self._recent(event.src_path): return print("processing", event.src_path)
Editors and OSes often fire several modified events for one save. A per-path cooldown collapses the burst into a single action.
Worked Example · An Auto-Processing Inbox
12 minGoal: watch an inbox/ folder; the instant a CSV lands and finishes copying, process it (here: count rows) and move it to processed/ — the "drop a file, it just happens" pattern.
import time, csv, shutil, logging from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S") log = logging.getLogger("inbox") INBOX = Path("inbox"); DONE = Path("processed"); FAIL = Path("failed") for d in (INBOX, DONE, FAIL): d.mkdir(exist_ok=True) def wait_until_stable(p: Path, checks=3, interval=0.5) -> bool: last, stable = -1, 0 while stable < checks: if not p.exists(): return False size = p.stat().st_size stable = stable + 1 if (size == last and size > 0) else 0 last = size time.sleep(interval) return True def process(path: Path) -> None: if not wait_until_stable(path): log.warning("vanished or never stabilised: %s", path.name) return try: with open(path, newline="", encoding="utf-8") as f: rows = list(csv.DictReader(f)) log.info("processed %s — %d rows", path.name, len(rows)) shutil.move(str(path), DONE / path.name) except Exception: log.exception("failed %s", path.name) shutil.move(str(path), FAIL / path.name) class InboxHandler(FileSystemEventHandler): def on_created(self, event): p = Path(event.src_path) if event.is_directory or p.suffix.lower() != ".csv": return log.info("detected %s", p.name) process(p) observer = Observer() observer.schedule(InboxHandler(), path=str(INBOX), recursive=False) observer.start() log.info("watching %s/ — drop CSVs in. Ctrl-C to stop.", INBOX) try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
14:30:00 INFO watching inbox/ — drop CSVs in. Ctrl-C to stop. 14:30:12 INFO detected sales.csv 14:30:14 INFO processed sales.csv — 412 rows # sales.csv is now in processed/
Read the code
This is a real ingest pipeline: the handler filters to CSVs, wait_until_stable ensures the file finished copying before we read it, and each file is moved to processed/ or failed/ so the inbox always shows only pending work (and you never reprocess). Errors are isolated per file. Drop ten CSVs in and they're each handled the moment they land — no schedule, no polling. Pair this with Lesson 15's real CSV processing and you have a hands-free data intake.
Try It Yourself
13 minWatch a folder and print every create/modify/delete/move event with its path. Create, edit, and delete files in that folder and observe the events fire.
Watch a folder but only react to .txt files: when one appears, print its first line. Ignore folders and other extensions.
Hint
def on_created(self, event): p = Path(event.src_path) if event.is_directory or p.suffix != ".txt": return print(p.name, "→", p.read_text(encoding="utf-8").splitlines()[:1])
Watch your Downloads-like folder and auto-move each new file into a subfolder by extension (combine with Lesson 5's organiser). Use wait_until_stable so big downloads finish first.
Hint
def on_created(self, event): p = Path(event.src_path) if event.is_directory: return if not wait_until_stable(p): return dest = p.parent / (p.suffix.lstrip(".").lower() or "other") dest.mkdir(exist_ok=True) shutil.move(str(p), dest / p.name)
Mini-Challenge · The Live Reloader
8 minBuild a mini "live reload": watch a source file (e.g. a .py or .json config), and whenever it changes, re-run a function (re-validate the JSON, or re-run a script via subprocess). Debounce so one save triggers exactly one reload. This is how dev tools auto-restart on edit.
Show a sample solution
import time, json, logging from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler log = logging.getLogger("reload") logging.basicConfig(level=logging.INFO, format="%(message)s") class Reloader(FileSystemEventHandler): def __init__(self, target: str, action): self.target = str(Path(target).resolve()) self.action = action self._last = 0.0 def on_modified(self, event): if str(Path(event.src_path).resolve()) != self.target: return if time.time() - self._last < 0.5: # debounce return self._last = time.time() log.info("change detected — reloading") self.action() def revalidate(): try: json.loads(Path("config.json").read_text(encoding="utf-8")) log.info("config OK ✅") except json.JSONDecodeError as e: log.error("config invalid: %s", e) obs = Observer() obs.schedule(Reloader("config.json", revalidate), path=".", recursive=False) obs.start() try: while True: time.sleep(1) except KeyboardInterrupt: obs.stop() obs.join()
Non-negotiables: watches one target file, debounces saves, re-runs an action on change.
Recap
3 minwatchdog reacts to file-system events instead of polling: subclass FileSystemEventHandler (override on_created/on_modified/etc.), register it with an Observer on a path, and keep the main thread alive. The real-world skills are the guards: filter to the files you want (check is_directory and suffix), wait until the file is stable before reading (so you don't grab a half-copied file), and debounce the duplicate events editors emit. Move processed files out of the watched folder so you never reprocess. Use watching for event-driven work; use scheduling (Lessons 35-36) for clock-driven work.
Vocabulary Card
- event handler
- A class whose methods run when file-system events occur.
- Observer
- The watchdog component that watches a path and dispatches events.
- debounce
- Collapsing a burst of rapid events into a single action.
- file stability
- Confirming a file has finished writing (size stopped changing) before use.
Homework
4 minBuild watcher.py: a robust inbox watcher that processes new files of a chosen type (validate a CSV/JSON, or convert with an earlier tool), waits for stability, debounces, moves results to processed/ or failed/, and logs everything. Test it by dropping several files (including one large and one invalid) and confirm each is handled correctly and exactly once.
Sample · what a complete watcher proves
Drop test:
big.csv (50MB, copied slowly) → waits for stability, then
processes once, moves to processed/.
bad.csv (malformed) → process() raises, logged,
moved to failed/ (not processed/).
good.csv (small) → processed instantly, moved.
notes.txt (wrong type) → ignored by the suffix filter.
Each file handled exactly once; inbox/ ends empty; logs show
detect → stable → process/move for each. Editors saving rapidly
don't double-trigger thanks to the debounce.Non-negotiables: stability wait, debounce, type filter, processed/failed routing, exactly-once handling, full logging.