Learning Goals
3 min
By the end of this lesson you can:
- Read CSV rows as dictionaries with csv.DictReader.
- Write them back out with csv.DictWriter (and correct newline handling).
- Merge many files, even with slightly different column orders.
- De-duplicate rows by a chosen key column, keeping the first or last seen.
Warm-Up · The Monthly Export Pile
5 min
A common chore: every month a system spits out sales-jan.csv, sales-feb.csv, … and someone opens each in Excel, copies the rows, and pastes them into a master file, re-typing the header, missing duplicates, and making mistakes along the way.
The csv module reads each row as a dictionary keyed by column name (row["email"]), so you don't care about column order — only names. That makes merging files and removing duplicates a few clean lines, every time, with no copy-paste errors.
New Concept · DictReader & DictWriter
14 min
Reading rows as dictionaries
    import csv

    with open("sales.csv", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            print(row["name"], row["amount"])  # access by column name
- DictReader uses the first line as headers; each row becomes a dict like {"name": "Aisha", "amount": "120"}.
- Always open with newline="": the csv module handles line endings itself, and skipping this when writing causes blank lines between rows on Windows.
- Values are always strings; convert with int()/float() when you need numbers.
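The "values are always strings" point is easy to forget, so here is a minimal sketch of the conversion step. The in-memory SAMPLE text and the read_amounts helper are illustrative assumptions standing in for a real sales.csv; they are not part of the lesson's files.

```python
import csv
import io

# Hypothetical in-memory CSV standing in for sales.csv.
SAMPLE = "name,amount\nAisha,120\nBen,85.5\n"

def read_amounts(text: str) -> list[float]:
    """Return the amount column converted from strings to floats."""
    reader = csv.DictReader(io.StringIO(text, newline=""))
    return [float(row["amount"]) for row in reader]

amounts = read_amounts(SAMPLE)
print(amounts)       # the raw values were the strings "120" and "85.5"
print(sum(amounts))  # summing only works after the float() conversion
```

Without the float() call, sum() would fail with a TypeError because it cannot add strings to its starting value of 0.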
Writing rows back
    rows = [
        {"name": "Aisha", "amount": "120"},
        {"name": "Ben", "amount": "85"},
    ]

    with open("out.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "amount"])
        writer.writeheader()    # write the column titles
        writer.writerows(rows)  # write all the rows at once
DictWriter needs fieldnames (the columns, in the order you want them). writeheader() writes the title row; writerow/writerows write data.
Merging files (order-independent)
    import csv
    from pathlib import Path

    def merge(folder: str, out: str) -> int:
        all_rows = []
        fields = None
        for path in sorted(Path(folder).glob("*.csv")):
            with open(path, newline="", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                fields = fields or reader.fieldnames  # take columns from first file
                all_rows.extend(reader)

        with open(out, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fields)
            writer.writeheader()
            writer.writerows(all_rows)
        return len(all_rows)
Because rows are dicts, DictWriter places each value under the right column even if an input file had a different column order. If files have genuinely different columns, DictWriter raises ValueError for any key missing from fieldnames, so either pass extrasaction="ignore" to drop the extra columns or compute the union of all fieldnames.
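One way to compute the union of fieldnames is sketched below. It works on in-memory text so it runs without any files; JAN, FEB, and merge_texts are hypothetical names invented for this illustration, and a real version would read from the paths used in merge().

```python
import csv
import io

# Two hypothetical exports: different column order, and FEB has an extra column.
JAN = "name,amount\nAisha,120\n"
FEB = "amount,region,name\n85,North,Ben\n"

def merge_texts(texts: list[str]) -> str:
    """Merge CSV texts, using the union of their columns in first-seen order."""
    rows, fields = [], []
    for text in texts:
        reader = csv.DictReader(io.StringIO(text, newline=""))
        for name in reader.fieldnames or []:
            if name not in fields:
                fields.append(name)  # keep first-seen column order
        rows.extend(reader)

    out = io.StringIO(newline="")
    # restval fills columns that a row does not have (here: region for JAN rows)
    writer = csv.DictWriter(out, fieldnames=fields, restval="")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()

merged = merge_texts([JAN, FEB])
print(merged)
```

Rows from files that lack a column get an empty cell there, so no data is dropped and no ValueError is raised.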
De-duplicating by a key column
    def dedupe(rows: list[dict], key: str, keep="first") -> list[dict]:
        seen = {}
        for row in rows:
            k = row[key]
            if keep == "last" or k not in seen:
                seen[k] = row  # last write wins if keep="last"
        return list(seen.values())
A dict keyed by the chosen column collapses duplicates automatically: keep="first" ignores later repeats, keep="last" lets newer rows overwrite older ones. Insertion order is preserved (dicts remember order since Python 3.7), so with keep="last" the newer row appears at the position where its key was first seen.
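A small usage sketch makes the difference between the two modes concrete. The dedupe function is repeated from the lesson so the block runs on its own; the sample rows and email addresses are made up for illustration.

```python
def dedupe(rows, key, keep="first"):
    """Collapse rows that share the same value in the key column."""
    seen = {}
    for row in rows:
        k = row[key]
        if keep == "last" or k not in seen:
            seen[k] = row
    return list(seen.values())

# Hypothetical rows: a@example.com appears twice with different amounts.
rows = [
    {"email": "a@example.com", "amount": "10"},
    {"email": "b@example.com", "amount": "20"},
    {"email": "a@example.com", "amount": "30"},
]

first = dedupe(rows, "email", keep="first")
last = dedupe(rows, "email", keep="last")

print([r["amount"] for r in first])  # amounts kept when the first occurrence wins
print([r["amount"] for r in last])   # amounts kept when the last occurrence wins
```

In both results the a@example.com row comes first, because overwriting a dict value does not move its key.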
The csv module has zero dependencies and comfortably handles files of tens of thousands of rows. For millions of rows, joins, or heavy analysis, pandas (Level 8) is the power tool. For everyday merge/clean automation, plain csv is faster to write and easier to ship.
Worked Example · Consolidate Monthly Sales
12 min
Goal: merge every sales-*.csv, drop duplicate order IDs (keeping the latest), and report totals: a complete monthly close.
    import csv
    import logging
    from pathlib import Path

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    log = logging.getLogger("sales")

    def consolidate(folder: str, out: str, key: str = "order_id") -> None:
        rows, fields = [], None
        files = sorted(Path(folder).glob("sales-*.csv"))
        for path in files:
            with open(path, newline="", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                fields = fields or reader.fieldnames
                count = 0
                for row in reader:
                    rows.append(row)
                    count += 1
            log.info("%s: %d rows", path.name, count)

        # de-dupe by order_id; rows from later files overwrite earlier ones.
        # sorted() is alphabetical, so "latest wins" holds only when the
        # filenames sort chronologically (e.g. sales-2024-01.csv).
        by_id = {row[key]: row for row in rows}
        unique = list(by_id.values())
        removed = len(rows) - len(unique)

        with open(out, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fields)
            writer.writeheader()
            writer.writerows(unique)

        total = sum(float(r["amount"]) for r in unique)
        log.info("merged %d files → %d unique rows (%d dupes removed)",
                 len(files), len(unique), removed)
        log.info("total sales: %.2f", total)

    consolidate("exports", "master_sales.csv")
    INFO sales-feb.csv: 388 rows
    INFO sales-jan.csv: 412 rows
    INFO sales-mar.csv: 401 rows
    INFO merged 3 files → 1188 unique rows (13 dupes removed)
    INFO total sales: 84210.50
Read the code
Everything from this level comes together: glob finds the files, DictReader reads them order-independently, a dict comprehension de-dupes by order_id, DictWriter emits the master, and logging narrates the whole run. The hour-long monthly copy-paste chore is now a five-second script that's also correct — no missed duplicates. Lesson 22 will wrap output like this into a polished report.
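The worked example relies on sorted() to process files oldest-first, but sorted() orders names alphabetically, which puts sales-feb.csv ahead of sales-jan.csv. If the files really are named by month abbreviation, a sort key along these lines could restore calendar order. MONTHS and month_index are hypothetical helpers, and the sketch assumes every filename ends in a three-letter English month.

```python
from pathlib import Path

# Assumed naming scheme: sales-<three-letter month>.csv
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

def month_index(path: Path) -> int:
    """Map 'sales-feb.csv' to 1; raises ValueError for an unknown month."""
    month = path.stem.rsplit("-", 1)[-1].lower()
    return MONTHS.index(month)

names = [Path("sales-mar.csv"), Path("sales-jan.csv"), Path("sales-feb.csv")]

alphabetical = sorted(names)                # what plain sorted() gives
chronological = sorted(names, key=month_index)

print([p.name for p in alphabetical])
print([p.name for p in chronological])
```

Naming exports with a numeric date, such as sales-2024-01.csv, avoids the problem entirely because alphabetical and chronological order then agree.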
Try It Yourself
13 min
Read a CSV with an amount column and print the sum, count, and average. Remember to float() the strings.
Read a CSV of people and write a new CSV containing only rows where age >= 18, preserving the header. Report how many were kept and dropped.
Hint
    import csv

    with open("people.csv", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = [r for r in reader if int(r["age"]) >= 18]
        fields = reader.fieldnames

    with open("adults.csv", "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        w.writerows(rows)
Merge two CSVs and de-duplicate by email, but also print which emails were duplicated and how many times. Use a Counter.
Hint
    from collections import Counter

    counts = Counter(r["email"] for r in all_rows)
    dupes = {email: n for email, n in counts.items() if n > 1}
    for email, n in dupes.items():
        print(f"{email}: seen {n} times")
Mini-Challenge · The CSV Joiner
8 min
Write join(left, right, on, out) that merges two CSVs side-by-side on a shared key column (like a database join): each output row has the columns of both files where the key matches. Report rows that had no match on the other side.
Sample solution
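    import csv

    def load(path):
        """Return (rows, fieldnames) for one CSV file."""
        with open(path, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            return list(reader), list(reader.fieldnames or [])

    def join(left_path, right_path, on, out):
        left, left_fields = load(left_path)
        right, right_fields = load(right_path)
        # index the right file by key; if a key repeats, the last row wins
        right_by_key = {r[on]: r for r in right}
        fields = left_fields + [k for k in right_fields if k != on]

        merged, unmatched_left, matched_keys = [], 0, set()
        for left_row in left:
            match = right_by_key.get(left_row[on])
            if match is not None:
                extra = {k: v for k, v in match.items() if k != on}
                merged.append({**left_row, **extra})
                matched_keys.add(left_row[on])
            else:
                unmatched_left += 1
        # right-side keys that no left row referred to
        unmatched_right = len(set(right_by_key) - matched_keys)

        with open(out, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=fields)
            w.writeheader()
            w.writerows(merged)
        print(f"{len(merged)} joined, {unmatched_left} left rows unmatched, "
              f"{unmatched_right} right keys unmatched")

    join("customers.csv", "orders.csv", on="customer_id", out="joined.csv")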
Non-negotiables: join on a key, combined columns, report of unmatched rows.
Recap
3 min
csv.DictReader reads each row as a dict keyed by column name (open with newline=""; values are strings), and csv.DictWriter writes them back given fieldnames + writeheader(). Because access is by name, merging files with different column orders "just works." De-duplicate by collapsing rows into a dict keyed on your chosen column, with keep="first" or "last". This turns the monthly copy-paste-merge chore into a correct, repeatable script. For huge data or complex joins, reach for pandas in Level 8.
Vocabulary Card
- DictReader
- Reads CSV rows as dictionaries keyed by the header names.
- DictWriter
- Writes dictionaries to CSV given an ordered list of fieldnames.
- newline=""
- The open() argument required for CSV files so line endings are handled correctly.
- de-dupe key
- The column whose value identifies duplicate rows.
Homework
4 min
Build csvtool.py with argparse subcommands (Lesson 4): merge <folder> <out> combines all CSVs in a folder, dedupe <file> <key> removes duplicate rows by a column, and stats <file> <column> prints count/sum/min/max/average of a numeric column. Log progress and handle missing columns with a clear error.
Sample · csvtool.py (core)
    import argparse
    import csv
    from pathlib import Path

    def read(path):
        """Return (rows, fieldnames) from a single pass over the file."""
        with open(path, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            return list(reader), reader.fieldnames

    def write(path, rows, fields):
        with open(path, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=fields)
            w.writeheader()
            w.writerows(rows)

    def cmd_merge(a):
        rows, fields = [], None
        for p in sorted(Path(a.folder).glob("*.csv")):
            r, f = read(p)
            fields = fields or f  # take columns from the first file
            rows += r
        write(a.out, rows, fields or [])
        print(f"merged {len(rows)} rows → {a.out}")

    def cmd_dedupe(a):
        rows, fields = read(a.file)
        uniq = list({r[a.key]: r for r in rows}.values())  # last row per key wins
        write(a.file, uniq, fields)
        print(f"{len(rows) - len(uniq)} dupes removed")

    def cmd_stats(a):
        rows, _ = read(a.file)
        nums = [float(r[a.column]) for r in rows]
        if not nums:  # avoid min()/division errors on an empty file
            print("count 0")
            return
        print(f"count {len(nums)} sum {sum(nums):.2f} "
              f"min {min(nums)} max {max(nums)} avg {sum(nums)/len(nums):.2f}")

    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest="cmd", required=True)

    m = sub.add_parser("merge")
    m.add_argument("folder")
    m.add_argument("out")
    m.set_defaults(func=cmd_merge)

    d = sub.add_parser("dedupe")
    d.add_argument("file")
    d.add_argument("key")
    d.set_defaults(func=cmd_dedupe)

    s = sub.add_parser("stats")
    s.add_argument("file")
    s.add_argument("column")
    s.set_defaults(func=cmd_stats)

    args = p.parse_args()
    args.func(args)
Non-negotiables: three subcommands, DictReader/DictWriter, dedupe by key, numeric stats.
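The homework also asks for a clear error when a column is missing. One possible shape for that check is sketched here; require_column and column_values are hypothetical helper names, and the in-memory SAMPLE text stands in for a real file. A command-line tool might catch the ValueError and print its message before exiting.

```python
import csv
import io

def require_column(fieldnames, column):
    """Raise a readable error if the CSV header lacks the requested column."""
    if not fieldnames or column not in fieldnames:
        available = ", ".join(fieldnames or []) or "(none)"
        raise ValueError(f"column {column!r} not found; available: {available}")

def column_values(text, column):
    """Return one column as floats, checking the header before reading rows."""
    reader = csv.DictReader(io.StringIO(text, newline=""))
    require_column(reader.fieldnames, column)
    return [float(row[column]) for row in reader]

# Hypothetical data standing in for a file passed on the command line.
SAMPLE = "name,amount\nAisha,120\nBen,85\n"

print(column_values(SAMPLE, "amount"))
try:
    column_values(SAMPLE, "price")
except ValueError as err:
    print(f"error: {err}")
```

Checking the header up front gives one message naming the available columns, rather than a bare KeyError from the first row.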