Cache-Aside with Postgres + Redis
Put a Redis cache-aside (lazy-loading) layer in front of Postgres, prove cache hits skip the database, and invalidate correctly on writes.
instructions
Objective
Place a Redis cache in front of a Postgres table using the cache-aside
(lazy-loading) pattern — the application checks and populates the cache itself.
This is strategy 1 in the chapter; it is not read-through (where the cache
layer loads on a miss). Prove two things with tests:
- A cache hit does not touch Postgres (measured via
db.query_count). - A write invalidates the cached copy so reads never go stale.
This is the most common caching pattern in production backends, and the place
most teams get invalidation wrong.
The environment (zero setup)
Postgres and Redis are already running and seeded when you launch the lab — in
Codespaces/Gitpod they come up automatically; locally they start with
docker compose -f .devcontainer/docker-compose.yml up -d.
Available in 8 languages — Python, Go, JavaScript, TypeScript, Ruby, Rust,
C++, and C. Each
<lang>/folder has astubto implement and asolutionfor reference. The single-flight guard is written idiomatically per language
(a per-key mutex in threaded runtimes; an in-flight-promise map in Node).
You never open a connection. The platform layer hands you ready clients:
from labkit import db, cache| Layer | Handle | What you use |
|---|---|---|
| Postgres (persistence) | db | db.queryone(sql, params), db.execute(sql, params), db.query_count |
| Redis (cache) | cache | cache.get_json(key), cache.set_json(key, value, ttl=...), cache.delete(key), cache.exists(key) |
The users table is pre-seeded (5 rows). db.query_count increments on every
query — that is how the tests prove your cache actually saves a database hit.
Your task
Open python/stub.py. The lab has two parts:
Part 1 — cache-aside
get_user_profile(user_id)— read from cache first; on a miss read Postgres
and populate the cache; return None if the user does not exist.
update_user_plan(user_id, plan)— write to Postgres, then invalidate the
cached entry.
Part 2 — stampede protection (single-flight)
get_user_profile_singleflight(user_id)— when a popular key expires and many
requests miss at once, all of them would hit Postgres simultaneously (a *cache
stampede / thundering herd*). Use a per-key lock with a double-check so that
only one thread queries Postgres and the rest reuse its result. The test
fires 50 concurrent cold-cache reads and asserts exactly one DB query.
Validate
python -m unittest test_lab.pyA green OK means cache hits skip Postgres and invalidation works. Compare your
approach with the reference:
python solution.pyRelated module
../../bsps/07-core-backend-engineering/03-caching-strategy.md— theory
your task — Python
#!/usr/bin/env python3
"""
Lab 03: Cache-Aside (lazy loading) — YOUR TURN.
Postgres is the source of truth; Redis sits in front as a cache. The platform
already gives you connected `db` and `cache` handles — you write zero setup.
Implement (two parts):
Part 1 — cache-aside:
get_user_profile(user_id) -> cache first, fall back to Postgres, populate
update_user_plan(user_id, plan) -> write Postgres, then invalidate the cache
Part 2 — stampede protection:
get_user_profile_singleflight(user_id) -> when many requests miss a cold hot
key at once, only ONE queries Postgres; the rest wait and reuse its result.
Prove it works:
python -m unittest test_lab.py # checks cache hits + single-flight DB hits
python solution.py # see the reference behaviour
Useful labkit API:
db.queryone(sql, params) -> dict | None db.query_count (read counter)
db.execute(sql, params) -> rowcount
cache.get_json(key) cache.set_json(key, value, ttl=...)
cache.delete(key) cache.exists(key)
"""
import threading
from labkit import db, cache
CACHE_TTL_SECONDS = 60
def get_user_profile(user_id: int) -> dict | None:
"""
TODO: Implement cache-aside reads.
1. key = f"user:{user_id}" — return cache.get_json(key) if present.
2. On miss, SELECT id, name, email, plan FROM users WHERE id = %s.
3. If the row exists, store it with cache.set_json(key, row, ttl=...).
4. Return None when the user does not exist.
"""
raise NotImplementedError("Implement get_user_profile")
def update_user_plan(user_id: int, plan: str) -> None:
"""
TODO:
1. UPDATE users SET plan = %s, updated_at = now() WHERE id = %s.
2. cache.delete(f"user:{user_id}") so the next read refills the cache.
"""
raise NotImplementedError("Implement update_user_plan")
# ── Part 2 — stampede protection (single-flight) ─────────────────────
_locks: dict[str, threading.Lock] = {}
_locks_guard = threading.Lock()
def _key_lock(key: str) -> threading.Lock:
"""One lock per cache key, created safely under a global guard. (given)"""
with _locks_guard:
lock = _locks.get(key)
if lock is None:
lock = threading.Lock()
_locks[key] = lock
return lock
def get_user_profile_singleflight(user_id: int) -> dict | None:
"""
TODO: stampede-safe cache-aside read.
1. key = f"user:{user_id}" — return cache.get_json(key) if present.
2. On a miss, acquire _key_lock(key) (with the lock as a context manager).
3. Inside the lock, DOUBLE-CHECK the cache — another thread may have filled
it while you waited; if so, return it without querying Postgres.
4. Otherwise query Postgres once, populate the cache, and return the row
(None if the user does not exist).
The double-check inside the lock is what collapses the herd to one DB query.
"""
raise NotImplementedError("Implement get_user_profile_singleflight")
def main():
try:
print(get_user_profile(1))
except NotImplementedError as e:
print(f"Not implemented yet: {e}")
if __name__ == "__main__":
main()