Distributed Key-Value Store for Cache Nodes

Abstract

This RFC proposes adding an eventually-consistent distributed replication layer to the cache service’s key-value store. Today, KV entries live in per-node SQLite databases, causing inconsistent hit/miss behavior when multiple cache nodes sit behind the same load balancer within a region. The proposal introduces a shared PlanetScale Postgres instance as the global source of truth, with asynchronous background replication keeping each node’s local SQLite synchronized. The design preserves the existing local-first architecture for self-hosted deployments and keeps shared infrastructure entirely off the request hot path.

Motivation

KV entries live in per-node SQLite. As we scale horizontally within a region, an Xcode cache upload that lands on one node is a miss on its sibling nodes behind the same load balancer. That means one request can be a miss and the next a hit inside the same region, which is not acceptable. We also have globally split teams, so cross-region visibility matters too.

Requirements

  • Eventually consistent is fine; cross-node visibility can lag by minutes.
  • Last-write-wins at the key level is acceptable.
  • Request latency is critical; shared infra must stay off the hot path.
  • S3 PUTs are a major cost driver; no per-entry S3 uploads for KV metadata.
  • Self-hosted single-node deployments must keep working with local-only SQLite.
  • KV PUTs can burst to ~1000 req/s; synchronous remote writes on the request path are not viable.
  • Local SQLite on each node must stay within the current storage budget; we cannot mirror the full global dataset onto every node.

Detailed Design

Operating Modes

The system introduces two operating modes, selected via KEY_VALUE_MODE=local|distributed:

| Mode | Default | Behavior |
| --- | --- | --- |
| local | Yes | Current behavior. Cachex + local SQLite. No shared state. |
| distributed | No | Local Cachex + SQLite as edge cache. PlanetScale Postgres in us-east as global truth. |

All existing behavior stays unchanged in local mode.

Production Topology

cache-eu-central ──┐
cache-eu-north ────┤
cache-us-east ─────┤
cache-us-west ─────┼──▶ PlanetScale Postgres (us-east)
cache-ap-southeast ┤
cache-sa-west ─────┤
cache-au-east ─────┘

Data Model

PlanetScale Postgres: kv_entries

| Column | Type | Notes |
| --- | --- | --- |
| key | text | PK. Format: keyvalue:{account}:{project}:{cas_id} |
| account_handle | text | Extracted from key for query efficiency |
| project_handle | text | Extracted from key for query efficiency |
| cas_id | text | Extracted from key |
| json_payload | text | Opaque blob, same format as today |
| source_node | text | Originating cache host |
| source_updated_at | timestamptz | LWW timestamp from originating node |
| last_accessed_at | timestamptz | Latest globally replicated access timestamp; used to decide what stays hot locally |
| updated_at | timestamptz | DB-side timestamp for sync ordering |
| deleted_at | timestamptz | Soft-delete tombstone, NULL when alive |
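
As an illustration of how the three derived columns relate to the key, here is a minimal Python sketch (the real service is Elixir; `parse_kv_key` and `KeyParts` are hypothetical names). It assumes account and project handles never contain a colon, which the RFC does not state.

```python
from typing import NamedTuple


class KeyParts(NamedTuple):
    account_handle: str
    project_handle: str
    cas_id: str


def parse_kv_key(key: str) -> KeyParts:
    """Split 'keyvalue:{account}:{project}:{cas_id}' into its three parts."""
    # maxsplit=3 keeps any further colons inside cas_id (an assumption
    # about the format, not something the RFC specifies).
    parts = key.split(":", 3)
    if len(parts) != 4 or parts[0] != "keyvalue" or not all(parts[1:]):
        raise ValueError(f"unexpected key format: {key!r}")
    return KeyParts(*parts[1:])
```

For example, `parse_kv_key("keyvalue:acme:ios-app:abc123")` yields the account handle `acme`, the project handle `ios-app`, and the CAS id `abc123`.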

Indexes:

  • PK on key
  • (updated_at, key) for poller watermark queries
  • (last_accessed_at, key) for optional recency-based operational queries
  • (account_handle, project_handle) for project-scoped cleanup
  • (deleted_at) partial index for tombstone purging

The json_payload column is stored as text, not jsonb. The payload is treated as an opaque blob everywhere – the request path never queries into it, and Postgres never needs to index or filter by its contents. text avoids the parsing overhead of jsonb on every insert. If debugging queries against the payload become necessary, a one-time ALTER COLUMN to jsonb is non-breaking.

Local SQLite Changes (distributed mode)

key_value_entries stays as it is in local mode. In distributed mode, it becomes a materialized hot cache rather than the source of truth.

A new migration adds:

  • source_updated_at (utc_datetime_usec, nullable) – payload-version conflict resolution. Intentionally separate from last_accessed_at: reads bump last_accessed_at, but must not make an older payload look newer than a later write.
  • replication_enqueued_at (utc_datetime_usec, nullable) – marks rows whose latest write or access bump still needs to be shipped to Postgres.

In local mode these columns are unused and remain NULL. In distributed mode, KeyValueBuffer.write_batch/2 sets source_updated_at to the current timestamp for payload writes, and both writes and access bumps set replication_enqueued_at when they need shipping.

Every node converges on the global hot working set using one background sync loop: KeyValueReplicationPoller follows rows in Postgres ordered by updated_at, and updated_at advances for both payload writes and replicated access bumps. The local database is kept below the existing 25GB limit by the current time- and size-based eviction policy, which already orders by last_accessed_at.

  • Each node converges toward the global hot working set, not the full historical dataset.
  • Different nodes will not hold the exact same set, but they should broadly converge as globally hot keys keep getting touched and therefore keep moving through the sync stream.
  • Cold entries can be dropped locally and later re-materialized when another node writes or accesses them, because both actions advance the shared row’s updated_at.

key_value_entry_hashes is removed entirely (see Removal of key_value_entry_hashes). The table, schema, and all code that reads or writes hash references are deleted as part of this RFC.

Local Replication Queue

We do not add a second SQLite table for replication. The existing key_value_entries table is sufficient:

  • New writes already flow through KeyValueBuffer.
  • In distributed mode, KeyValueBuffer.write_batch/2 writes the row locally and sets replication_enqueued_at.
  • KeyValueReplicationShipper scans key_value_entries WHERE replication_enqueued_at IS NOT NULL ORDER BY replication_enqueued_at, id LIMIT ....
  • On successful shipment it clears replication_enqueued_at.
  • Repeated writes to the same key naturally coalesce because key_value_entries.key is already unique.
  • A partial index on replication_enqueued_at IS NOT NULL ensures the shipper only scans pending work.
  • Pending rows must not be evicted locally before they have been shipped successfully.
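
The queue-in-place idea can be sketched with Python's built-in `sqlite3` module. This is an illustration under stated assumptions, not the Elixir implementation: the table is reduced to the columns that matter here, timestamps are plain floats, and the helper names are hypothetical. The guard in `mark_shipped` (clear the marker only if it has not changed since the batch was read) is also an assumption; the RFC only says the marker is cleared on success.

```python
import sqlite3


def open_queue_store() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE key_value_entries (
            id INTEGER PRIMARY KEY,
            key TEXT NOT NULL UNIQUE,
            json_payload TEXT NOT NULL,
            source_updated_at REAL,
            replication_enqueued_at REAL
        );
        -- Partial index: only pending rows are indexed.
        CREATE INDEX key_value_entries_pending_idx
            ON key_value_entries (replication_enqueued_at, id)
            WHERE replication_enqueued_at IS NOT NULL;
        """
    )
    return conn


def write_local(conn, key, payload, now):
    # Upsert on the unique key: repeated writes coalesce into one pending row.
    conn.execute(
        """
        INSERT INTO key_value_entries
            (key, json_payload, source_updated_at, replication_enqueued_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (key) DO UPDATE SET
            json_payload = excluded.json_payload,
            source_updated_at = excluded.source_updated_at,
            replication_enqueued_at = excluded.replication_enqueued_at
        """,
        (key, payload, now, now),
    )


def pending_batch(conn, limit):
    return conn.execute(
        """
        SELECT id, key, json_payload, replication_enqueued_at
        FROM key_value_entries
        WHERE replication_enqueued_at IS NOT NULL
        ORDER BY replication_enqueued_at, id
        LIMIT ?
        """,
        (limit,),
    ).fetchall()


def mark_shipped(conn, batch):
    # Clear the marker only if the row was not re-enqueued after the batch
    # was read (assumed guard, see lead-in).
    conn.executemany(
        "UPDATE key_value_entries SET replication_enqueued_at = NULL "
        "WHERE id = ? AND replication_enqueued_at = ?",
        [(row[0], row[3]) for row in batch],
    )
```

Two writes to the same key leave a single pending row carrying the latest payload, and a row rewritten while a batch is in flight stays pending for the next cycle.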

Write Path

Client PUT
  │
  ▼
Cache node (any)
  ├─▶ Cachex.put (in-memory, immediate)
  └─▶ KeyValueBuffer.enqueue (ETS buffer → local SQLite, marks row for replication)
  │
  ▼
ACK to client
  │
  ▼ (async, background)
KeyValueReplicationShipper (GenServer, per-node)
  ├─ polls pending local rows every 200ms (configurable via DISTRIBUTED_KV_SHIP_INTERVAL_MS)
  ├─ batches pending rows (500-2000 per batch; default 1000 via DISTRIBUTED_KV_SHIP_BATCH_SIZE)
  ├─ INSERT INTO kv_entries ... ON CONFLICT (key) DO UPDATE
  │    SET json_payload = CASE
  │          WHEN EXCLUDED.source_updated_at > kv_entries.source_updated_at
  │          THEN EXCLUDED.json_payload
  │          ELSE kv_entries.json_payload
  │        END,
  │        source_updated_at = GREATEST(kv_entries.source_updated_at, EXCLUDED.source_updated_at),
  │        last_accessed_at = GREATEST(kv_entries.last_accessed_at, EXCLUDED.last_accessed_at),
  │        ...
  └─ clears `replication_enqueued_at` on success

LWW resolution: payload fields use source_updated_at; access recency uses GREATEST(last_accessed_at) so an access bump can propagate without overwriting a newer payload. If two payload writes arrive with identical source_updated_at, the tie is broken by lexicographically comparing source_node so the rule is explicit and deterministic.
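
The merge rule can be written as a small pure function. The Python sketch below is illustrative only (`Row` and `merge` are hypothetical names). It assumes that on a timestamp tie the lexicographically greater source_node wins; the RFC says the tie is broken on source_node but does not say in which direction.

```python
from dataclasses import dataclass, replace
from datetime import datetime


@dataclass(frozen=True)
class Row:
    json_payload: str
    source_node: str
    source_updated_at: datetime
    last_accessed_at: datetime


def merge(stored: Row, incoming: Row) -> Row:
    """Last-write-wins on the payload, max() on access recency."""
    # Compare (timestamp, node) pairs so equal timestamps are resolved
    # deterministically. Greater node name wins: an assumed direction.
    incoming_wins = (incoming.source_updated_at, incoming.source_node) > (
        stored.source_updated_at,
        stored.source_node,
    )
    winner = incoming if incoming_wins else stored
    # Access recency merges independently of which payload won.
    return replace(
        winner,
        last_accessed_at=max(stored.last_accessed_at, incoming.last_accessed_at),
    )
```

Because the comparison is a total order on (source_updated_at, source_node), the result does not depend on the order in which the two rows arrive, which is what makes the rule safe under concurrent shippers.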

Burst absorption: duplicate-key bursts are rare, so the main protection is batching many different keys into a small number of SQL transactions. When the same key is hit repeatedly, the local row is updated in place and remains a single pending shipment.

Cross-region tuning: shipper database timeouts and batch sizes must be tuned for the farthest regions. The correct answer for high-latency regions is smaller batches and a longer shared-DB timeout, not a single huge transaction that times out from Australia.

Read Path

Client GET
  │
  ▼
Cachex.get(key)
  ├─ hit ──▶ return payload
  └─ miss
      │
      ▼
  KeyValueRepo.get_by(key) [local SQLite]
  ├─ hit ──▶ populate Cachex, enqueue access update, return payload
  └─ miss ──▶ return {:error, :not_found}

The initial design keeps Postgres entirely off the read path. Xcode cache artifacts are small enough that a cross-ocean metadata lookup may not beat a miss and rebuild anyway, so remote miss fallback is intentionally not part of the core feature. If metrics later show a clear benefit, it can be added as an explicit follow-up behind a flag.

Access replication: distributed mode needs a coalesced access-bump path, not just payload-write replication. When a key is read locally, we eventually mark it for shipment so the shipper can propagate an updated last_accessed_at to Postgres. That access bump also advances the shared row’s updated_at, so it naturally flows through the same inbound poller as payload writes.

  • Payload freshness is governed by source_updated_at.
  • Hotness is governed by last_accessed_at.
  • We do not need to ship every individual read; the latest observed access time per key is enough.
  • In practice this should be throttled/coalesced so hot Cachex hits still refresh global hotness without turning every hit into a SQLite + Postgres write.
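
One way to coalesce access bumps is a per-key minimum interval between enqueues. The RFC leaves the throttling mechanism open, so the Python sketch below is only one possible shape; `AccessBumpThrottle` and the interval parameter are hypothetical.

```python
import time


class AccessBumpThrottle:
    """Allow at most one access-bump enqueue per key per interval."""

    def __init__(self, min_interval_s: float, clock=time.monotonic):
        self._min_interval_s = min_interval_s
        self._clock = clock  # injectable so the behavior can be tested
        self._last_enqueued = {}  # key -> time of the last allowed enqueue

    def should_enqueue(self, key: str) -> bool:
        now = self._clock()
        last = self._last_enqueued.get(key)
        if last is not None and now - last < self._min_interval_s:
            return False  # a recent bump for this key is already recorded
        self._last_enqueued[key] = now
        return True
```

With a 60-second interval, a key read thousands of times per minute produces about one local write and one shipped access bump per minute. A production version would also need to bound the size of the per-key map.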

Inbound Replication (Poller)

KeyValueReplicationPoller (GenServer, per-node)
  │
  ├─ stores local watermark in GenServer state (last seen `updated_at` + `key`)
  ├─ polls PlanetScale every 30-60s (configurable via DISTRIBUTED_KV_SYNC_INTERVAL_MS, default 30s)
  ├─ SELECT * FROM kv_entries
  │    WHERE source_node != @current_node
  │      AND updated_at < NOW() - interval '5 seconds'
  │      AND (updated_at, key) > (@last_updated_at, @last_key)
  │    ORDER BY updated_at, key
  │    LIMIT 1000
  │
  ├─ for each row:
  │    ├─ alive (deleted_at IS NULL):
  │    │    upsert into local SQLite key_value_entries
  │    │    payload fields use LWW on `source_updated_at`
  │    │    `last_accessed_at` uses GREATEST(local, remote)
  │    └─ tombstoned (deleted_at IS NOT NULL):
  │         skip if local row has `replication_enqueued_at IS NOT NULL`
  │           (pending shipment must reach Postgres before local delete)
  │         otherwise delete from local SQLite key_value_entries
  │
  └─ advance local watermark

Key details:

  • source_node != @current_node filter: avoids re-importing the node’s own writes.
  • updated_at < NOW() - interval '5 seconds' lag buffer: prevents the classic CDC gap where an in-flight Postgres transaction with updated_at = T hasn’t committed yet, but a later transaction with updated_at = T+1 has. The poller would advance its watermark past T and never see the first transaction. A 5-second buffer gives in-flight transactions time to commit before the poller reads them.
  • Payload freshness and hotness are separate: payload bytes are governed by source_updated_at, while last_accessed_at is merged independently with GREATEST(...) so hotness can propagate without a new payload write.
  • Tombstones must not destroy pending shipments: if the local row has replication_enqueued_at IS NOT NULL, the poller skips the tombstone delete. The shipper will ship the row to Postgres, where the newer source_updated_at will win over the tombstone’s cutoff and revive the row. Once shipped, the next poll cycle will see the row is alive globally and leave it alone.
  • Global replication, local eviction: every node sees global changes, but the local SQLite store is still bounded by the existing eviction worker. We do not try to predict a per-region subset in the poller.
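
The poller query combines three filters (not our own writes, older than the lag buffer, strictly after the watermark) with keyset pagination on (updated_at, key). The in-memory Python sketch below mirrors that logic so the interaction is easy to follow; it is not the Postgres query itself, and `next_page` is a hypothetical name.

```python
from datetime import timedelta


def next_page(rows, watermark, current_node, now,
              lag=timedelta(seconds=5), limit=1000):
    """Return (page, new_watermark) for one poll.

    rows: iterable of dicts with 'key', 'updated_at', 'source_node'.
    watermark: (updated_at, key) of the last row already processed.
    """
    cutoff = now - lag  # rows newer than this may still be committing
    eligible = [
        r for r in rows
        if r["source_node"] != current_node
        and r["updated_at"] < cutoff
        and (r["updated_at"], r["key"]) > watermark  # tuple compare = keyset
    ]
    eligible.sort(key=lambda r: (r["updated_at"], r["key"]))
    page = eligible[:limit]
    if page:
        last = page[-1]
        watermark = (last["updated_at"], last["key"])
    return page, watermark
```

Including key in the watermark matters when many rows share the same updated_at: a page boundary can fall in the middle of such a group without skipping or repeating rows.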

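Convergence guarantee: all nodes eventually see globally touched rows (writes and replicated accesses). For a node that is keeping up, the delay is bounded by the poll interval plus the lag buffer; when a backlog exceeds one query page (1000 rows), the delay also grows with the number of pages needed to drain it.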

Watermark persistence: the watermark lives in GenServer state and is lost on restart. On startup, the poller initializes its watermark to two poll intervals before the current time (NOW() minus 2 × DISTRIBUTED_KV_SYNC_INTERVAL_MS, which is 60 seconds at the default of 30000 ms) so it picks up recent changes without re-scanning the entire history. Since all inbound operations are idempotent upserts, re-processing a few rows after restart is harmless.
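
The startup rule is simple arithmetic. A minimal Python sketch, assuming the watermark is an (updated_at, key) pair and that an empty string is an acceptable starting key because it sorts before every real key (`initial_watermark` is a hypothetical name):

```python
from datetime import datetime, timedelta


def initial_watermark(now: datetime, sync_interval_ms: int):
    """Start two poll intervals in the past, before any key."""
    lookback = timedelta(milliseconds=sync_interval_ms * 2)
    return (now - lookback, "")
```

With the default interval of 30000 ms the poller starts 60 seconds back. Under this rule, changes older than the lookback are not replayed after a restart.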

No Cachex invalidation on inbound replication: let TTL handle staleness. Avoids complexity and Cachex churn.

Eviction and Cleanup

Local Eviction (both modes)

  • Purpose: free local SQLite space.
  • Runs via KeyValueEvictionWorker with time-based and size-based triggers (unchanged).
  • Deletes local SQLite rows only. CASCleanupWorker is removed entirely as part of this RFC (see Removal of key_value_entry_hashes). Orphaned local disk files are handled by OrphanCleanupWorker.
  • In distributed mode: must skip rows where replication_enqueued_at IS NOT NULL so pending writes are not dropped before they reach Postgres. The entry still exists globally; other nodes and the poller can re-materialize it if it becomes active again.
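
The pending-row guard amounts to one extra predicate in the eviction query. The Python `sqlite3` sketch below shows the idea on a reduced table; the helper names and the float timestamps are assumptions, and the real KeyValueEvictionWorker has time-based and size-based triggers that are not modeled here.

```python
import sqlite3


def open_eviction_store() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE key_value_entries (
            id INTEGER PRIMARY KEY,
            key TEXT NOT NULL UNIQUE,
            last_accessed_at REAL NOT NULL,
            replication_enqueued_at REAL
        )
        """
    )
    return conn


def evict_least_recently_used(conn, max_rows):
    """Delete up to max_rows of the coldest rows, never a pending one."""
    cur = conn.execute(
        """
        DELETE FROM key_value_entries
        WHERE id IN (
            SELECT id FROM key_value_entries
            WHERE replication_enqueued_at IS NULL  -- skip unshipped rows
            ORDER BY last_accessed_at, id
            LIMIT ?
        )
        """,
        (max_rows,),
    )
    return cur.rowcount  # number of rows actually deleted
```

A cold row that is still waiting to be shipped survives eviction until the shipper clears its marker, after which it becomes eligible like any other row.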

Shared-store Invalidation (distributed mode only)

Tombstones are normal kv_entries rows with deleted_at set. Only explicit invalidation flows create tombstones (e.g., tuist cache clean --remote via CleanProjectWorker). Local eviction never creates tombstones.

Race-safe semantics for tuist cache clean --remote:

  1. The cleanup request carries a single cleanup_started_at timestamp generated once and shared across all node requests.
  2. Each node deletes its local SQLite rows for the target scope only where source_updated_at <= cleanup_started_at.
  3. Each node writes Postgres tombstones (deleted_at) for the same scope and cutoff. Multiple nodes executing the same tombstone write is safe: the predicate is deterministic.
  4. A write that happens after cleanup_started_at wins automatically: it is not deleted locally, not tombstoned in Postgres, and if it lands after an older tombstone it revives the row.
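
The cutoff predicate is what makes the cleanup safe to run from several nodes. The in-memory Python sketch below illustrates steps 3 and 4 only; `SharedRow` and `tombstone_scope` are hypothetical names, and skipping rows that are already tombstoned is an assumption (overwriting deleted_at would be equally safe).

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional


@dataclass
class SharedRow:
    source_updated_at: datetime
    deleted_at: Optional[datetime] = None


def tombstone_scope(rows: Dict[str, SharedRow],
                    cleanup_started_at: datetime,
                    now: datetime) -> int:
    """Tombstone rows written at or before the cutoff; return how many."""
    count = 0
    for row in rows.values():
        # Deterministic predicate: every node computes the same set.
        if row.deleted_at is None and row.source_updated_at <= cleanup_started_at:
            row.deleted_at = now
            count += 1
    return count
```

Running it a second time with the same cutoff changes nothing, and a row whose source_updated_at is later than cleanup_started_at is never touched.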

Concrete lifecycle:

  1. A user runs tuist cache clean --remote.
  2. The system generates one cleanup_started_at cutoff and sends it to all cache nodes.
  3. Each node removes matching local rows only if source_updated_at <= cleanup_started_at.
  4. The shared-store cleanup sets deleted_at on matching Postgres rows using that same cutoff.
  5. Tombstoned rows appear in the normal replication stream, and pollers remove any remaining local copies.
  6. After a safety window, the tombstone can optionally be hard-deleted from Postgres.

The safety window (default 7 days) ensures slow or restarting nodes don’t miss the delete. Tombstone purging is not required for the initial implementation; it is an operational cleanup once we have real tombstone volume data.


Configuration

| Env var | Default | Description |
| --- | --- | --- |
| KEY_VALUE_MODE | local | local or distributed |
| DISTRIBUTED_KV_DATABASE_URL | | PlanetScale Postgres connection string |
| DISTRIBUTED_KV_POOL_SIZE | 5 | Connection pool size |
| DISTRIBUTED_KV_DATABASE_TIMEOUT_MS | 10000 | Shared-DB query timeout for shipper/poller operations |
| DISTRIBUTED_KV_SYNC_INTERVAL_MS | 30000 | Poller interval |
| DISTRIBUTED_KV_SHIP_INTERVAL_MS | 200 | Shipper interval |
| DISTRIBUTED_KV_SHIP_BATCH_SIZE | 1000 | Max rows per ship batch |
| DISTRIBUTED_KV_TOMBSTONE_RETENTION_DAYS | 7 | How long to keep tombstones |
| DISTRIBUTED_KV_NODE_NAME | PUBLIC_HOST | Identifier for this node, written to source_node. Used by the poller to filter out its own writes. |
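
To make the defaults concrete, here is a hedged Python sketch of loading these settings (the service itself is configured in Elixir; `DistributedKVConfig` and `load_config` are hypothetical). Two behaviors are assumptions rather than statements from the RFC: the node name falls back to the value of the PUBLIC_HOST environment variable, and a missing database URL is rejected in distributed mode.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DistributedKVConfig:
    mode: str
    database_url: Optional[str]
    pool_size: int
    database_timeout_ms: int
    sync_interval_ms: int
    ship_interval_ms: int
    ship_batch_size: int
    tombstone_retention_days: int
    node_name: Optional[str]


def load_config(env: Mapping[str, str] = os.environ) -> DistributedKVConfig:
    mode = env.get("KEY_VALUE_MODE", "local")
    if mode not in ("local", "distributed"):
        raise ValueError(f"KEY_VALUE_MODE must be local or distributed, got {mode!r}")
    database_url = env.get("DISTRIBUTED_KV_DATABASE_URL")
    if mode == "distributed" and not database_url:
        # Assumed validation: distributed mode cannot work without a URL.
        raise ValueError("DISTRIBUTED_KV_DATABASE_URL is required in distributed mode")

    def as_int(name: str, default: int) -> int:
        return int(env.get(name, default))

    return DistributedKVConfig(
        mode=mode,
        database_url=database_url,
        pool_size=as_int("DISTRIBUTED_KV_POOL_SIZE", 5),
        database_timeout_ms=as_int("DISTRIBUTED_KV_DATABASE_TIMEOUT_MS", 10000),
        sync_interval_ms=as_int("DISTRIBUTED_KV_SYNC_INTERVAL_MS", 30000),
        ship_interval_ms=as_int("DISTRIBUTED_KV_SHIP_INTERVAL_MS", 200),
        ship_batch_size=as_int("DISTRIBUTED_KV_SHIP_BATCH_SIZE", 1000),
        tombstone_retention_days=as_int("DISTRIBUTED_KV_TOMBSTONE_RETENTION_DAYS", 7),
        # Assumed fallback: the default comes from the PUBLIC_HOST variable.
        node_name=env.get("DISTRIBUTED_KV_NODE_NAME", env.get("PUBLIC_HOST")),
    )
```

With an empty environment this returns local mode with every default from the table, so a self-hosted single node needs no new settings.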

Telemetry

All new components emit telemetry events consistent with the existing patterns in Cache.PromEx and the codebase’s :telemetry.execute usage.

| Event | Measurements | Metadata |
| --- | --- | --- |
| [:cache, :kv, :replication, :ship, :flush] | duration_ms, batch_size | status (:ok, :error) |
| [:cache, :kv, :replication, :ship, :pending_rows] | count | |
| [:cache, :kv, :replication, :poll, :complete] | duration_ms, rows_materialized, rows_deleted | |
| [:cache, :kv, :replication, :poll, :lag_ms] | lag_ms (time between the newest polled row’s updated_at and now) | |
| [:cache, :kv, :replication, :ship, :timeout] | count | region |
| [:cache, :kv, :replication, :local_store, :size_bytes] | size_bytes | node, region |
| [:cache, :kv, :tombstone_purge, :complete] | entries_purged, duration_ms | |

Alternatives Considered

Why a shared metadata store at all?

Each cache node has its own SQLite database. In a horizontally scaled region, there is no mechanism for sibling nodes behind the same load balancer to converge on the same KV state. The options were:

  1. Shared metadata store (chosen): nodes replicate KV state asynchronously to a central database. Local SQLite becomes an edge cache; the shared store is the global source of truth.
  2. S3-based replication: upload KV entries to S3 and have nodes poll/download them. Rejected because S3 PUTs are already a major cost driver for Xcode cache artifacts; adding per-entry KV uploads would multiply that cost for metadata that is small but high-frequency.
  3. Erlang clustering / distributed ETS / libcluster: rejected because the cache nodes are globally distributed across 7 regions with no shared network. BEAM distribution assumes low-latency, reliable connections. Cross-ocean netsplits would cause constant cluster instability.
  4. Do nothing; accept per-node misses: rejected because same-region miss/hit flip-flopping behind a load balancer is operationally unacceptable.

Why Postgres, not ClickHouse?

The KV metadata workload is OLTP, not analytics:

  • Point lookups by key (read path)
  • Frequent upserts with LWW conflict resolution (write path)
  • Deletes and tombstones (retention/cleanup)

ClickHouse is optimized for append-heavy event streams and columnar scans. Using it here would mean fighting its data model for correctness on mutations, deletes, and point lookups. Postgres is the boring-correct choice for this workload shape.

Why a dedicated Postgres, not the server’s Postgres?

  • The cache KV workload is bursty (up to 1000 writes/s), write-heavy, and operationally separate from the server’s transactional workload.
  • Mixing them risks cache bursts degrading server DB performance.
  • Independent scaling, maintenance windows, and failure domains.

Why not route KV reads/writes through the server application?

  • Adds an extra network hop, serialization layer, and queueing point on the request path.
  • The server is not on the cache request path today and should not be.
  • Cache nodes should talk directly to the shared metadata store.

Why us-east for the primary?

  • cache-us-east is by far the largest traffic region and will remain so for the foreseeable future.
  • The shared DB is still off the request path, so non-local regions pay latency only on background replication.
  • Lowest-risk placement because it is already where most cache traffic originates.

Why single-region Postgres is acceptable

  • Convergence can lag by minutes (stated requirement).
  • Most requests never touch the shared DB (local Cachex + SQLite serve the hot path).
  • Async replication amortizes RTT across batches.
  • Cross-ocean latency is paid by background shippers and pollers, not the request path.

Why poll-based replication, not LISTEN/NOTIFY or push

  • 7 globally distributed hosts with minutes-level convergence tolerance.
  • Polling is simpler, stateless, and tolerant of network interruptions.
  • LISTEN/NOTIFY requires persistent connections and is fragile across regions.
  • Push/fanout adds complexity for marginal latency improvement that is not needed.

Why pending replication coalesces by key

At 1000 req/s burst, if many writes hit the same key, the local SQLite row is updated in place and remains a single pending shipment. If writes hit different keys, the shipper batches them into a small number of SQL transactions. Without coalescing, 1000 req/s would mean 1000 remote upserts/s, which is fragile under cross-region latency.

Why no per-entry count cap on entries[]

  • Xcode controls the payload shape; we do not.
  • Currently one entry per key, but this may change.
  • A count cap would be brittle and require coordinated client/server changes.
  • The authoritative global row stores the opaque json_payload blob. Pathological payloads may slow background processing, not request serving.
  • Byte-size cap already exists at the request level (25MB body limit).

Why local mode must remain the default

Self-hosted Tuist users run a single cache node with block storage. They have no shared DB and should not need one. Forcing a roundtrip to a remote shared database would make their single-node setup strictly worse. All existing behavior (Cachex + SQLite, local eviction behavior) must remain unchanged and be the default.

Removal of key_value_entry_hashes

Today, KV eviction extracts CAS hashes from deleted entries, checks which are unreferenced via key_value_entry_hashes, and enqueues CASCleanupWorker jobs to delete artifacts from disk and S3. This coupling is removed entirely (both modes) for two reasons:

  1. S3 lifecycle policies own artifact expiration. Xcode cache artifacts are moving to S3 with aggressive lifecycle policies. Cache nodes should not be in the business of deleting S3 objects on eviction. The only S3 deletion that cache nodes perform is explicit user-initiated cleanup via tuist cache clean.
  2. Local disk orphans are already handled. When a KV entry is evicted, any CAS files on local disk become orphans. The existing OrphanCleanupWorker periodically walks the filesystem and deletes files that have no metadata entry. This is sufficient for local disk hygiene without the complexity of hash reference tracking.

Self-hosted deployments that use S3 must configure appropriate lifecycle policies on their buckets. This should be documented in the self-hosting guide.

The cleanup model after this change:

  • Eviction (both modes): delete KV metadata from SQLite. Nothing else.
  • OrphanCleanupWorker: finds and deletes orphaned CAS files from local disk.
  • S3 lifecycle policies: handle S3 artifact expiration.
  • tuist cache clean: explicit user action – deletes from local disk, S3, and (in distributed mode) writes Postgres tombstones.

Why local eviction must be decoupled from shared-store invalidation

In distributed mode, a local eviction only means “this node dropped its cached copy to free SQLite space.” The entry still exists globally in Postgres. Other nodes may still have it materialized.

  • Local eviction = free local space only. No tombstones.
  • Shared-store deletes are only for explicit invalidation flows such as project cleanup, and they propagate as tombstones.

Aligned with the proposal. This is definitely a strong need as we horizontally scale and we need a mechanism to share the values as we do for artifacts via S3.

Is there a specific reason to use PlanetScale over Supabase? I’m not opposed to this, but would like to see the reasoning that went into this.

We want to switch the server database to PlanetScale at some point anyway, and their price/performance ratio is generally better (see the “PlanetScale vs Supabase benchmarks” page on PlanetScale).

I also personally like their dashboard better for query insights, which might become helpful for across-the-world stuff.
No hard feelings about it though, the database provider is the one interchangeable bit in this proposal :slight_smile:

Thanks for putting this together :clap:

I was going to ask why we aren’t using our server Postgres DB through the server, but I noticed you addressed it at the end.

A couple of things I’d recommend mentioning somewhere in the proposal, for people who don’t have enough context, are:

  • What prompted this? A bit of context around the per-host bandwidth limit, how we see sharding as a tool to overcome it, and how that connects to this piece of work.
  • Maybe a summary of the current state of what we store in that KV DB, and of the other kinds of artifacts those nodes deal with. Just a summary.