The Rules
3 min
Six events, each timed. Use only the standard library (csv, pathlib, collections, datetime, logging), no pandas. The goal is fluency: writing correct file-wrangling code without looking things up.
- 🥉 Bronze — produce correct output.
- 🥈 Silver — handle the edge cases (empty fields, bad rows) without crashing.
- 🥇 Gold — do it in clean, readable code within the target time.
Create a few small test CSVs first (10-20 rows each) so you can verify your output by eye. Each event lists a suggested target time — race yourself.
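One way to get a test file quickly is to generate it. The sketch below writes a tiny messy.csv with a few deliberate edge cases; the sample rows and the write_fixture helper are hypothetical, so swap in data shaped like your own.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical sample rows: deliberately messy so the edge cases show up.
SAMPLE_ROWS = [
    ["name", "email", "region"],
    ["  Ada Lovelace ", "ADA@Example.com ", "North"],  # stray spaces, mixed case
    ["Grace Hopper", "grace@example.com", "South"],
    ["No Email", "", "North"],                         # empty email field
    ["Short Row"],                                     # missing columns
]


def write_fixture(path: Path) -> Path:
    """Write SAMPLE_ROWS to path and return it."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(SAMPLE_ROWS)
    return path


# Written to a temporary directory here; point it at your working folder instead.
fixture = write_fixture(Path(tempfile.mkdtemp()) / "messy.csv")

# Read it back to confirm the file round-trips unchanged.
with open(fixture, newline="", encoding="utf-8") as f:
    loaded = list(csv.reader(f))
print(f"{fixture.name}: {len(loaded) - 1} data rows")
```

Keeping the fixture this small means you can check every output row by eye after each event.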
Event 1 · The Cleaner
7 min
You're given messy.csv with problems: leading/trailing spaces in values, inconsistent email casing, and some blank rows. Write a script that produces clean.csv: trim every field, lowercase the email column, and drop any row where email is empty.
Show the medal solution
import csv

with open("messy.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    fields = reader.fieldnames
    rows = []
    for row in reader:
        # (v or "") turns None from short rows into ""; the None key holds
        # surplus fields from over-long rows, which are dropped here.
        cleaned = {k: (v or "").strip() for k, v in row.items() if k is not None}
        cleaned["email"] = cleaned["email"].lower()
        if cleaned["email"]:  # drop rows with an empty email
            rows.append(cleaned)

with open("clean.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=fields)
    w.writeheader()
    w.writerows(rows)

print(f"cleaned → {len(rows)} rows")
🥈 The (v or "") guards against None values from short rows.
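To see why the guard matters, here is a small sketch of what csv.DictReader hands back for a short row. The inline sample data is made up for illustration.

```python
import csv
import io

# The second data row is short: it has a name but no email or region.
raw = "name,email,region\nAda,ada@example.com,North\nBob\n"

rows = list(csv.DictReader(io.StringIO(raw)))
short = rows[1]

# DictReader fills the missing columns with None (its default restval),
# so calling short["email"].strip() directly would raise AttributeError.
print(short["email"], short["region"])

# With the guard, every value becomes a string before .strip() runs.
cleaned = {k: (v or "").strip() for k, v in short.items()}
print(cleaned)
```

The same `or ""` fallback works anywhere a field might be missing, which is why it reappears in later events.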
Event 2 · The Pivot & Event 3 · The Validator
16 min
Event 2 · The Pivot (8 min)
Given sales.csv with columns region, product, amount, produce a summary CSV of total amount per region, sorted highest-first.
Show the medal solution
import csv
from collections import defaultdict

totals = defaultdict(float)
with open("sales.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        totals[row["region"]] += float(row["amount"] or 0)

ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

with open("by_region.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["region", "total"])
    w.writerows((r, f"{t:.2f}") for r, t in ranked)
Event 3 · The Validator (8 min)
Given signups.csv, write every row that fails validation to errors.csv with an extra reason column. Rules: email must contain @, age must be an integer 13-120, name must be non-empty.
Show the medal solution
import csv

def problem(row) -> str | None:
    """Return the first failed rule as a reason, or None if the row is valid."""
    # DictReader fills missing fields with None, so fall back to "" before testing.
    if not (row.get("name") or "").strip():
        return "missing name"
    if "@" not in (row.get("email") or ""):
        return "bad email"
    try:
        age = int(row["age"])
    except (ValueError, KeyError, TypeError):  # TypeError: age is None
        return "age not a number"
    if not 13 <= age <= 120:
        return "age out of range"
    return None

with open("signups.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    bad = []
    for row in reader:
        why = problem(row)
        if why:
            bad.append({**row, "reason": why})
    fields = reader.fieldnames + ["reason"]

with open("errors.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=fields)
    w.writeheader()
    w.writerows(bad)

print(f"{len(bad)} invalid rows")
🥇 One problem() function returning a reason string keeps the rules readable and testable.
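"Testable" can be made concrete with a table of cases. The sketch below restates a None-safe version of problem() so it runs on its own, then checks one row per rule; the CASES table and its sample values are illustrative assumptions, not part of the event.

```python
from typing import Optional


def problem(row: dict) -> Optional[str]:
    """Return the first failed rule as a reason string, or None if the row is valid."""
    if not (row.get("name") or "").strip():
        return "missing name"
    if "@" not in (row.get("email") or ""):
        return "bad email"
    try:
        age = int(row.get("age") or "")  # "" makes int() raise ValueError
    except ValueError:
        return "age not a number"
    if not 13 <= age <= 120:
        return "age out of range"
    return None


# Hypothetical test table: (row, expected reason).
CASES = [
    ({"name": "Ada", "email": "ada@example.com", "age": "36"}, None),
    ({"name": "  ", "email": "ada@example.com", "age": "36"}, "missing name"),
    ({"name": "Ada", "email": "ada.example.com", "age": "36"}, "bad email"),
    ({"name": "Ada", "email": "ada@example.com", "age": "abc"}, "age not a number"),
    ({"name": "Ada", "email": "ada@example.com", "age": "12"}, "age out of range"),
    ({"name": "Ada", "email": None, "age": None}, "bad email"),  # short row
]

results = [problem(row) for row, _ in CASES]
for (row, expected), got in zip(CASES, results):
    assert got == expected, (row, expected, got)
print("all validation cases pass")
```

Because the function takes a plain dict and returns a string, none of these checks needs a CSV file on disk.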
Event 4 · The Splitter
9 min
Given one big orders.csv, split it into one file per region (orders-north.csv, orders-south.csv, etc.), each with the original header. The inverse of merging.
Show the medal solution
import csv
from collections import defaultdict

groups = defaultdict(list)
with open("orders.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    fields = reader.fieldnames
    for row in reader:
        # Group by the sanitised name so "North" and "north" share one file
        # instead of the second group overwriting the first.
        region = (row["region"] or "").strip()
        safe = region.lower().replace(" ", "-") or "unknown"  # blank region → orders-unknown.csv
        groups[safe].append(row)

for safe, rows in groups.items():
    with open(f"orders-{safe}.csv", "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        w.writerows(rows)
    print(f"orders-{safe}.csv: {len(rows)} rows")
🥈 Sanitise the region into a safe filename (lowercase, no spaces) so "North East" → orders-north-east.csv.
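Real region values can contain more than spaces (slashes, ampersands, stray punctuation). A slightly stricter sanitiser might look like the sketch below; the slug name and the "unknown" fallback are assumptions for illustration.

```python
import re


def slug(region) -> str:
    """Lowercase the region and collapse every run of characters other than
    a-z and 0-9 into a single hyphen. Blank or missing values become "unknown"."""
    cleaned = re.sub(r"[^a-z0-9]+", "-", (region or "").lower()).strip("-")
    return cleaned or "unknown"


for name in ["North East", "  North/East ", "EMEA & APAC", "", None]:
    print(repr(name), "→", f"orders-{slug(name)}.csv")
```

Grouping rows by slug(row["region"]) rather than the raw value also keeps "North East" and "north east" in the same output file.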
Event 5 · The Time-Boxer
10 min
Given events.csv with a timestamp column (ISO format like 2026-05-28T14:30:00), write only the rows that fall within the last 7 days to recent.csv. This combines the csv and datetime skills.
Try it before peeking. Hint: parse with datetime.fromisoformat and compare against datetime.now() - timedelta(days=7).
Show the medal solution
import csv
from datetime import datetime, timedelta

# Assumes naive timestamps (no UTC offset), as in the example format.
now = datetime.now()
cutoff = now - timedelta(days=7)

with open("events.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    fields = reader.fieldnames
    recent = []
    for row in reader:
        try:
            when = datetime.fromisoformat(row["timestamp"])
        except (ValueError, TypeError):  # unparseable or missing timestamp
            continue
        if cutoff <= when <= now:  # also excludes future-dated rows
            recent.append(row)

with open("recent.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=fields)
    w.writeheader()
    w.writerows(recent)

print(f"{len(recent)} rows in the last 7 days")
🥈 A bad timestamp shouldn't crash the run — skip it (or log it) and continue.
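Time filters are easier to verify when "now" is passed in rather than read from the clock. The sketch below wraps the check in a hypothetical is_recent helper and treats anything it cannot compare (garbage, missing values, or offset-aware timestamps mixed with a naive now) as not recent.

```python
from datetime import datetime, timedelta


def is_recent(stamp, now: datetime, days: int = 7) -> bool:
    """True if stamp parses as ISO and falls within the last `days` days before now."""
    try:
        when = datetime.fromisoformat(stamp)
        return now - timedelta(days=days) <= when <= now
    except (TypeError, ValueError):
        # ValueError: not an ISO timestamp.
        # TypeError: stamp is None, or an offset-aware value compared with naive `now`.
        return False


# A fixed "now" makes the results repeatable.
NOW = datetime(2026, 5, 28, 12, 0, 0)
for stamp in ["2026-05-28T11:00:00", "2026-05-21T11:59:59", "2026-06-01T00:00:00", "not a date", None]:
    print(repr(stamp), is_recent(stamp, NOW))
```

In the real script you would call is_recent(row["timestamp"], datetime.now()) and could log the rows that return False instead of silently skipping them.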
Event 6 · The Grand Final — Full Pipeline
12 min
The medal event. Build one script that: (1) merges every raw/*.csv, (2) drops rows failing validation (Event 3's name and email rules, plus a non-empty region), (3) de-dupes by email keeping the latest (the last row read), (4) writes the clean master to master.csv, and (5) writes a summary.csv of row counts per region. Log each stage.
Show the medal solution
import csv
import logging
from collections import defaultdict
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger("olympics")

def valid(row) -> bool:
    # (value or "") guards against None from short rows; bool() matches the annotation.
    return bool("@" in (row.get("email") or "")
                and (row.get("name") or "").strip()
                and (row.get("region") or "").strip())

# 1. merge
rows, fields = [], None
for p in sorted(Path("raw").glob("*.csv")):
    with open(p, newline="", encoding="utf-8") as f:
        r = csv.DictReader(f)
        fields = fields or r.fieldnames
        rows += list(r)
log.info("merged %d rows", len(rows))

# 2. validate
clean = [r for r in rows if valid(r)]
log.info("dropped %d invalid", len(rows) - len(clean))

# 3. de-dupe by email (last wins)
unique = list({r["email"].lower(): r for r in clean}.values())
log.info("%d unique rows", len(unique))

# 4. master.csv
with open("master.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=fields)
    w.writeheader()
    w.writerows(unique)

# 5. summary.csv
by_region = defaultdict(int)
for r in unique:
    by_region[r["region"]] += 1
with open("summary.csv", "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["region", "count"])
    w.writerows(sorted(by_region.items(), key=lambda kv: kv[1], reverse=True))

log.info("done → master.csv + summary.csv")
🥇 Gold = all five stages, logged, correct, and readable. This is a real ETL pipeline in under 40 lines.
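The de-dupe stage is the least obvious of the five, so here is a small sketch of what "last wins" means when rows are keyed by lowercased email. The sample rows are invented for illustration.

```python
rows = [
    {"email": "Ada@example.com", "name": "Ada", "region": "North"},
    {"email": "bob@example.com", "name": "Bob", "region": "South"},
    {"email": "ada@example.com", "name": "Ada L.", "region": "East"},  # same person, later row
]

# A later row with the same key overwrites the earlier value, but the key
# keeps the position where it was first inserted.
unique = list({r["email"].lower(): r for r in rows}.values())

for r in unique:
    print(r["email"], r["name"], r["region"])
```

Note that "latest" here means the last row read, so the result depends on the order in which the files are merged.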
Recap & Scorecard
3 min
Six events, one toolkit: DictReader/DictWriter for I/O, defaultdict/Counter for grouping, dict-keying for de-dupe, and datetime for time filters. If you can clean, pivot, validate, split, time-box, and pipeline CSV data without reaching for docs, you've got the fluency real data automation demands. Tally your medals, and revisit any event where you peeked before solving.
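The medal solutions group with defaultdict; for plain counting, collections.Counter is a possible alternative. A minimal sketch with made-up rows:

```python
from collections import Counter

# Hypothetical rows, shaped like DictReader output.
rows = [
    {"region": "North"}, {"region": "South"}, {"region": "North"},
    {"region": "East"}, {"region": "North"}, {"region": "South"},
]

# Counter tallies each region; most_common() returns (region, count)
# pairs sorted highest-first, which replaces the explicit sorted() call.
by_region = Counter(r["region"] for r in rows)
ranked = by_region.most_common()
print(ranked)
```

Counter only counts occurrences; for sums of a numeric column, as in The Pivot, defaultdict(float) remains the simpler fit.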
Pattern Card
- clean: Strip + normalise fields; drop rows failing a rule.
- pivot: Group rows by a key and aggregate a numeric column.
- validate: One predicate function per row, routing failures to an error file.
- pipeline: Merge → validate → de-dupe → write, the shape of every ETL job.
Homework
4 min
Pick the event you found hardest and redo it from a blank file, without looking at the solution, timing yourself. Then design a seventh event of your own based on a real CSV chore you (or family/friends) actually face, write the solution, and note which patterns from this level it used.
Sample · a self-designed Event 7
Event 7: "The Deduper Plus"
Chore: a club's signup sheet has the same person entered
with different email casing AND occasional typos in name.
Task: merge sheets, de-dupe by lowercased email, and when two
rows share an email, keep the one with the LONGER name
(assume the fuller name is more correct).
Patterns used: DictReader/Writer, dict-keying for de-dupe,
a custom "keep" rule comparing len(name).

best = {}
for r in all_rows:
    k = r["email"].lower()
    if k not in best or len(r["name"]) > len(best[k]["name"]):
        best[k] = r
unique = list(best.values())
Non-negotiables: a real chore, a working solution, and naming the patterns it reuses.