The Brief
3 min
Build an alerts package: raise_alert(event, level, message, **context) takes an event, and a routing table decides which channels receive it based on severity. Channels are pluggable adapters (Lesson 32). The system debounces (no repeat alert for the same event until it clears) and logs everything.
- A clean Alert data model (event, level, message, context, timestamp).
- Pluggable channel adapters: Log, Email, Slack, SMS.
- A routing table mapping severity → channels.
- Debouncing state so the same firing event alerts once when it fires, then once more on recovery.
The Architecture
5 min
trigger → raise_alert(event, level, message, **context)
│
▼
build Alert object (message + metadata)
│
debounce check (already alerting on this event?)
│
routing table level → [channels]
│
fan out → Log Email Slack SMS (each an adapter)
Separate the three concerns: what happened (the Alert), who should know (the routing table), and how to tell them (channel adapters). Keep them decoupled and you can change routing without touching channels, add a channel without touching routing, and test each in isolation. That separation is the whole project.
Build It · Model, Channels, Routing
14 min
The Alert model
    from dataclasses import dataclass, field
    from datetime import datetime

    LEVELS = ["info", "warning", "error", "critical"]

    @dataclass
    class Alert:
        event: str      # stable id, e.g. "disk_full"
        level: str      # one of LEVELS
        message: str
        context: dict = field(default_factory=dict)
        at: datetime = field(default_factory=datetime.now)

        def format(self) -> str:
            ctx = " ".join(f"{k}={v}" for k, v in self.context.items())
            return (f"[{self.level.upper()}] {self.message} "
                    f"({self.at:%H:%M:%S}) {ctx}").strip()
A @dataclass (from Level 4/6) gives a tidy value object. format() renders a default text representation each channel can use or override.
Channel adapters
    import logging
    import os

    import requests

    log = logging.getLogger("alerts")

    class LogChannel:
        def send(self, alert: Alert) -> bool:
            log.info(alert.format())
            return True

    class SlackChannel:
        def __init__(self, url):
            self.url = url

        def send(self, alert: Alert) -> bool:
            colour = {"info": "#2563EB", "warning": "#F59E0B",
                      "error": "#DC2626", "critical": "#991B1B"}.get(alert.level)
            try:
                return requests.post(self.url, json={"attachments": [
                    {"color": colour, "text": alert.format()}]}, timeout=10).ok
            except requests.RequestException as e:
                log.error("slack failed: %s", e)
                return False

    class EmailChannel:
        def __init__(self, to):
            self.to = to

        def send(self, alert: Alert) -> bool:
            # reuse your Lesson 29/30 send_email()
            from mailer import send_email
            return send_email([self.to], f"[{alert.level}] {alert.event}",
                              alert.format())

    class SmsChannel:
        def __init__(self, to):
            self.to = to

        def send(self, alert: Alert) -> bool:
            from sms import send_sms  # Lesson 33, with its own rate cap
            return send_sms(self.to, alert.format())
Each channel exposes the same send(alert) method — the common interface from Lesson 32. They wrap the senders you already built. Adding a channel = one new class.
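Because every adapter shares one send(alert) method, that contract can be written down explicitly, and a stand-in channel can replace the real ones while you build. The sketch below is one possible way to do it, not part of the lesson's code: Channel, MemoryChannel, and fan_out are hypothetical names, and the Alert here is a trimmed copy so the snippet runs on its own.

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class Alert:
    # Trimmed stand-in for the lesson's Alert so this snippet runs alone.
    event: str
    level: str
    message: str
    context: dict = field(default_factory=dict)

    def format(self) -> str:
        return f"[{self.level.upper()}] {self.message}"


class Channel(Protocol):
    """Hypothetical name for the shared interface: anything with send(alert)."""

    def send(self, alert: Alert) -> bool: ...


class MemoryChannel:
    """Hypothetical simulated channel: records alerts instead of delivering them."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    def send(self, alert: Alert) -> bool:
        self.sent.append(alert.format())
        return True


def fan_out(alert: Alert, channels: list[Channel]) -> list[bool]:
    # Accepts any object that satisfies the Channel protocol.
    return [ch.send(alert) for ch in channels]


fake_slack = MemoryChannel()
results = fan_out(Alert("disk_full", "critical", "disk at 95%"), [fake_slack])
print(results, fake_slack.sent)
```

A recording channel like this makes it easy to assert which alerts would have gone out without posting to a real webhook.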
The routing table
    def build_channels() -> dict:
        log_ch = LogChannel()
        chans = {"log": log_ch}
        if os.getenv("SLACK_WEBHOOK"):
            chans["slack"] = SlackChannel(os.environ["SLACK_WEBHOOK"])
        if os.getenv("ALERT_EMAIL"):
            chans["email"] = EmailChannel(os.environ["ALERT_EMAIL"])
        if os.getenv("ONCALL_PHONE"):
            chans["sms"] = SmsChannel(os.environ["ONCALL_PHONE"])
        return chans

    # which channels each level reaches (log always; SMS only critical)
    ROUTING = {
        "info":     ["log"],
        "warning":  ["log", "slack"],
        "error":    ["log", "slack", "email"],
        "critical": ["log", "slack", "email", "sms"],
    }
The routing table is plain data — easy to read, change, and test. Severity climbs from log-only to all-channels-plus-SMS, encoding the policy from Lesson 33.
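Since the routing table is plain data, the policy it encodes can be checked with a few lines of code. The following is a minimal sketch under the assumption that the policy is exactly "every level is routed, log always, SMS only for critical"; check_routing is a hypothetical helper, and LEVELS and ROUTING are repeated so the snippet stands alone.

```python
LEVELS = ["info", "warning", "error", "critical"]

ROUTING = {
    "info": ["log"],
    "warning": ["log", "slack"],
    "error": ["log", "slack", "email"],
    "critical": ["log", "slack", "email", "sms"],
}


def check_routing(routing: dict, levels: list) -> list:
    """Return a list of policy violations (an empty list means the table is sound)."""
    problems = []
    for level in levels:
        route = routing.get(level)
        if route is None:
            problems.append(f"no route for level {level!r}")
            continue
        if "log" not in route:
            problems.append(f"{level!r} does not reach the log channel")
        if "sms" in route and level != "critical":
            problems.append(f"{level!r} pages SMS but is not critical")
    for level in routing:
        if level not in levels:
            problems.append(f"route defined for unknown level {level!r}")
    return problems


print(check_routing(ROUTING, LEVELS))  # an empty list means no violations
```

Running a check like this whenever the table changes catches a mistyped level name or an accidental SMS route before it pages anyone.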
Build It · Debounce & Dispatch
12 min
Now the engine: dedupe repeated firings, dispatch to the routed channels, and offer a clear_alert() that sends an "all good" recovery.
    import json
    from pathlib import Path

    STATE = Path(".alert_state.json")

    def _load() -> dict:
        return json.loads(STATE.read_text()) if STATE.exists() else {}

    def _save(state: dict) -> None:
        STATE.write_text(json.dumps(state))

    def _dispatch(alert: Alert, level: str) -> dict:
        # fan out to the channels routed for `level`; a channel that raises
        # is logged and marked failed so the others still get the alert
        channels = build_channels()
        results = {}
        for name in ROUTING.get(level, ["log"]):
            ch = channels.get(name)
            if ch is None:
                continue  # channel not configured in this environment
            try:
                results[name] = ch.send(alert)
            except Exception:
                log.exception("channel %s failed", name)
                results[name] = False
        return results

    def raise_alert(event: str, level: str, message: str, **context) -> dict:
        if level not in LEVELS:
            raise ValueError(f"unknown level: {level}")
        state = _load()
        # debounce: skip if we're already alerting on this event at >= this level
        # (an escalation to a higher level still goes out)
        prev = state.get(event, {})
        if prev.get("active") and LEVELS.index(prev["level"]) >= LEVELS.index(level):
            log.debug("debounced repeat alert for %s", event)
            return {}
        alert = Alert(event=event, level=level, message=message, context=context)
        results = _dispatch(alert, level)
        state[event] = {"active": True, "level": level}
        _save(state)
        return results

    def clear_alert(event: str, message: str = "recovered") -> dict:
        state = _load()
        if not state.get(event, {}).get("active"):
            return {}  # nothing to clear
        level = state[event]["level"]
        # the recovery is an info-level message, but it is routed to the
        # same channels that received the original alert
        alert = Alert(event=event, level="info", message=f"✅ {message}")
        results = _dispatch(alert, level)
        state[event]["active"] = False
        _save(state)
        return results

    # --- usage ---
    # make INFO lines visible on the console
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

    raise_alert("disk_full", "critical", "disk at 95% on /var",
                host="web-01", mount="/var")
    raise_alert("disk_full", "critical", "disk at 96%")  # debounced (silent)
    # …later, when it recovers…
    clear_alert("disk_full", "disk back to 60%")
    INFO [CRITICAL] disk at 95% on /var (14:30:01) host=web-01 mount=/var
    # (second raise is debounced: no duplicate to Slack/email/SMS)
    INFO [INFO] ✅ disk back to 60% (14:45:10)
Read the result
The engine ties the three concerns together: it builds an Alert, checks debounce state so a metric stuck at 95% doesn't page you every minute, looks up the routing table, and fans out — with each channel's failure isolated. clear_alert sends the recovery and resets state so the next occurrence alerts fresh. Because channels and routing are decoupled, you can re-point "error" to add Discord, or swap the SMS provider, without touching this engine at all. That's a production-grade alerting core.
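The debounce and re-arm cycle described above can be tried in isolation before any files or channels are involved. This is a simplified sketch, not the lesson's engine: Debouncer is a hypothetical class that keeps state in memory instead of .alert_state.json and ignores severity entirely.

```python
class Debouncer:
    """Hypothetical in-memory stand-in for the JSON state file."""

    def __init__(self) -> None:
        self.active: set[str] = set()  # events that are currently firing

    def should_fire(self, event: str) -> bool:
        # Fire on the first occurrence; stay silent while the event is active.
        if event in self.active:
            return False
        self.active.add(event)
        return True

    def clear(self, event: str) -> bool:
        # True only if the event was active, so the recovery is sent once.
        if event not in self.active:
            return False
        self.active.remove(event)
        return True


d = Debouncer()
timeline = [
    d.should_fire("api_down"),  # first firing: alert
    d.should_fire("api_down"),  # repeat: debounced
    d.clear("api_down"),        # recovery: send the all-clear
    d.clear("api_down"),        # already clear: nothing to send
    d.should_fire("api_down"),  # re-armed: alert again
]
print(timeline)
```

The same five calls map onto raise, raise, clear, clear, raise against the real engine, which makes this a handy mental model for the state file.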
Build It Yourself
13 min
Use simulated channels (log-only) while building so you don't spend money or spam yourself; switch real webhooks in once it works.
Build the Alert dataclass and a LogChannel, and get raise_alert printing a formatted line for each level. Verify the format includes level, message, time, and context.
Add the routing table and at least one more channel (Slack or a simulated one). Raise alerts at each level and confirm each reaches exactly the right channels.
Implement the debounce state and clear_alert. Prove that raising the same event twice only alerts once, and that clear_alert sends a recovery and re-arms the event.
Hint
    raise_alert("api_down", "error", "API 500s")     # alerts
    raise_alert("api_down", "error", "still down")   # debounced (silent)
    clear_alert("api_down", "API healthy")           # recovery + re-arm
    raise_alert("api_down", "error", "down again")   # alerts again
Stretch · A Real Trigger
8 min
Wire the alert system to a real condition: a monitor loop that checks something (disk %, an API's health endpoint, a file's freshness) and calls raise_alert when it's bad and clear_alert when it recovers. Run it on a short interval to see debouncing and recovery in action end-to-end.
Show a sample solution
    import shutil
    import time

    def monitor_disk(threshold=90, interval=5):
        while True:
            usage = shutil.disk_usage("/")
            pct = usage.used / usage.total * 100
            if pct >= threshold:
                raise_alert("disk_full", "critical", f"disk at {pct:.0f}%",
                            threshold=threshold)
            else:
                clear_alert("disk_full", f"disk healthy at {pct:.0f}%")
            time.sleep(interval)

    # monitor_disk()  # Ctrl-C to stop; alerts once on cross, recovers once on drop
Non-negotiables: real metric check, raise on cross, clear on recovery, debounce visible across loop iterations.
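For the file-freshness option mentioned in this stretch, the check itself might look like the sketch below. It assumes "fresh" means "modified within the last max_age seconds" and that a missing file counts as stale; file_age_seconds, is_stale, and heartbeat.txt are hypothetical names chosen for illustration.

```python
import os
import tempfile
import time
from pathlib import Path


def file_age_seconds(path: Path, now=None) -> float:
    """Seconds since the file was last modified."""
    now = time.time() if now is None else now
    return now - path.stat().st_mtime


def is_stale(path: Path, max_age: float, now=None) -> bool:
    # Assumption: a missing file is treated as stale, because whatever
    # should be writing it is not running.
    if not path.exists():
        return True
    return file_age_seconds(path, now) > max_age


# Demo with a temporary file whose modification time is pushed into the past.
with tempfile.TemporaryDirectory() as tmp:
    heartbeat = Path(tmp) / "heartbeat.txt"
    heartbeat.write_text("ok")
    fresh = is_stale(heartbeat, max_age=60)

    an_hour_ago = time.time() - 3600
    os.utime(heartbeat, (an_hour_ago, an_hour_ago))  # pretend the last write was an hour ago
    stale = is_stale(heartbeat, max_age=60)

    missing = is_stale(Path(tmp) / "nope.txt", max_age=60)

print(fresh, stale, missing)
```

In a monitor loop shaped like the disk example, is_stale(...) would take the place of the percentage test: call raise_alert when it returns True and clear_alert when it returns False.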
Recap
3 min
A real alert system separates three concerns: the Alert (what happened), the routing table (who hears it, by severity), and channel adapters (how, each behind one send(alert)). The engine builds the alert, debounces repeats, looks up the route, and fans out with per-channel error isolation; clear_alert sends recovery and re-arms. Because the parts are decoupled, you change routing or add a channel without touching the others. This composes everything from Lessons 29-33 into one production-grade pattern, and it's the alerting backbone for the capstone in Lesson 47.
Vocabulary Card
- routing table
- Data mapping severity levels to the channels that should receive them.
- channel adapter
- A class with a uniform send(alert) method wrapping one delivery method.
- debounce
- Suppressing duplicate alerts for an event that's already active.
- recovery alert
- An "all clear" message sent when a fired condition resolves.
Homework
4 min
Finish the alert system as a reusable alerts.py module with the Alert model, at least two channel adapters, a routing table, debouncing, and raise_alert/clear_alert. Wire it to one real trigger (disk, API health, or file freshness). Write a short note describing your routing policy and a scenario showing debounce + recovery working.
Sample · a working scenario
Routing: info→log; warning→log,slack; error→+email; critical→+sms.
Scenario (API health monitor, 5s loop):
t=0 API returns 500 → raise_alert("api","error",...)
→ log + slack + email fire once.
t=5 still 500 → debounced, nothing sent.
t=10 still 500 → debounced.
t=15 API returns 200 → clear_alert("api","recovered")
→ recovery posted; event re-armed.
t=40 API 500 again → alerts fresh (not debounced).
No duplicate pages, no missed incidents. Channels and routing
are decoupled: adding Discord = one adapter + one routing edit.
Non-negotiables: reusable module, ≥2 adapters, routing table, debounce + recovery, a real trigger, a written scenario.