VR
hands-on lab· intermediate· 40 min

Cache-Aside with Postgres + Redis

Put a Redis cache-aside (lazy-loading) layer in front of Postgres, prove cache hits skip the database, and invalidate correctly on writes.

Objective

Place a Redis cache in front of a Postgres table using the cache-aside

(lazy-loading) pattern — the application checks and populates the cache itself.

This is strategy 1 in the chapter; it is not read-through (where the cache

layer loads on a miss). Prove two things with tests:

  1. A cache hit does not touch Postgres (measured via db.query_count).
  2. A write invalidates the cached copy so reads never go stale.

This is the most common caching pattern in production backends, and the place

most teams get invalidation wrong.


The environment (zero setup)

Postgres and Redis are already running and seeded when you launch the lab — in

Codespaces/Gitpod they come up automatically; locally they start with

docker compose -f .devcontainer/docker-compose.yml up -d.

Available in 8 languages — Python, Go, JavaScript, TypeScript, Ruby, Rust,

C++, and C. Each <lang>/ folder has a stub to implement and a solution

for reference. The single-flight guard is written idiomatically per language

(a per-key mutex in threaded runtimes; an in-flight-promise map in Node).

You never open a connection. The platform layer hands you ready clients:

from labkit import db, cache
LayerHandleWhat you use
Postgres (persistence)dbdb.queryone(sql, params), db.execute(sql, params), db.query_count
Redis (cache)cachecache.get_json(key), cache.set_json(key, value, ttl=...), cache.delete(key), cache.exists(key)

The users table is pre-seeded (5 rows). db.query_count increments on every

query — that is how the tests prove your cache actually saves a database hit.


Your task

Open python/stub.py. The lab has two parts:

Part 1 — cache-aside

  • get_user_profile(user_id) — read from cache first; on a miss read Postgres

and populate the cache; return None if the user does not exist.

  • update_user_plan(user_id, plan) — write to Postgres, then invalidate the

cached entry.

Part 2 — stampede protection (single-flight)

  • get_user_profile_singleflight(user_id) — when a popular key expires and many

requests miss at once, all of them would hit Postgres simultaneously (a *cache

stampede / thundering herd*). Use a per-key lock with a double-check so that

only one thread queries Postgres and the rest reuse its result. The test

fires 50 concurrent cold-cache reads and asserts exactly one DB query.

Validate

python -m unittest test_lab.py

A green OK means cache hits skip Postgres and invalidation works. Compare your

approach with the reference:

python solution.py
  • ../../bsps/07-core-backend-engineering/03-caching-strategy.md — theory
stub.py
#!/usr/bin/env python3
"""
Lab 03: Cache-Aside (lazy loading) — YOUR TURN.

Postgres is the source of truth; Redis sits in front as a cache. The platform
already gives you connected `db` and `cache` handles — you write zero setup.

Implement (two parts):
  Part 1 — cache-aside:
    get_user_profile(user_id)       -> cache first, fall back to Postgres, populate
    update_user_plan(user_id, plan) -> write Postgres, then invalidate the cache
  Part 2 — stampede protection:
    get_user_profile_singleflight(user_id) -> when many requests miss a cold hot
    key at once, only ONE queries Postgres; the rest wait and reuse its result.

Prove it works:
  python -m unittest test_lab.py     # checks cache hits + single-flight DB hits
  python solution.py                  # see the reference behaviour

Useful labkit API:
  db.queryone(sql, params) -> dict | None     db.query_count (read counter)
  db.execute(sql, params)  -> rowcount
  cache.get_json(key)      cache.set_json(key, value, ttl=...)
  cache.delete(key)        cache.exists(key)
"""

import threading

from labkit import db, cache

CACHE_TTL_SECONDS = 60


def get_user_profile(user_id: int) -> dict | None:
    """
    TODO: Implement cache-aside reads.
      1. key = f"user:{user_id}" — return cache.get_json(key) if present.
      2. On miss, SELECT id, name, email, plan FROM users WHERE id = %s.
      3. If the row exists, store it with cache.set_json(key, row, ttl=...).
      4. Return None when the user does not exist.
    """
    raise NotImplementedError("Implement get_user_profile")


def update_user_plan(user_id: int, plan: str) -> None:
    """
    TODO:
      1. UPDATE users SET plan = %s, updated_at = now() WHERE id = %s.
      2. cache.delete(f"user:{user_id}") so the next read refills the cache.
    """
    raise NotImplementedError("Implement update_user_plan")


# ── Part 2 — stampede protection (single-flight) ─────────────────────

_locks: dict[str, threading.Lock] = {}
_locks_guard = threading.Lock()


def _key_lock(key: str) -> threading.Lock:
    """One lock per cache key, created safely under a global guard. (given)"""
    with _locks_guard:
        lock = _locks.get(key)
        if lock is None:
            lock = threading.Lock()
            _locks[key] = lock
        return lock


def get_user_profile_singleflight(user_id: int) -> dict | None:
    """
    TODO: stampede-safe cache-aside read.
      1. key = f"user:{user_id}" — return cache.get_json(key) if present.
      2. On a miss, acquire _key_lock(key) (with the lock as a context manager).
      3. Inside the lock, DOUBLE-CHECK the cache — another thread may have filled
         it while you waited; if so, return it without querying Postgres.
      4. Otherwise query Postgres once, populate the cache, and return the row
         (None if the user does not exist).
    The double-check inside the lock is what collapses the herd to one DB query.
    """
    raise NotImplementedError("Implement get_user_profile_singleflight")


def main():
    try:
        print(get_user_profile(1))
    except NotImplementedError as e:
        print(f"Not implemented yet: {e}")


if __name__ == "__main__":
    main()