How we use RxDB at ClarityBoss (Part 2)

The Go and Postgres backend that keeps RxDB replication reliable.

Part 2 of RxDB at ClarityBoss, covering our Go implementation of the replication protocol and lessons from running it in production.

Dan McGee, Engineering

Part 1 covered how we model data in the frontend, why we landed on two collections, and what we did to make RxDB ergonomic in Vue. This part is about the server side.

The RxDB replication protocol defines three HTTP endpoints per collection: _pull, _push, and _pullStream. Implementing those basic endpoints is covered in the RxDB documentation. Keeping them reliable in production is where the interesting problems live.

A generic Go helper for all collections

We have multiple collections to replicate, and we do not want to duplicate protocol boilerplate for each one. So we built a generic RxDBHelper[T] that handles the protocol layer, while each collection provides its own database-specific functions:

// PullHelperFunc retrieves documents, given the account ID and pull request.
type PullHelperFunc[T RxDBBaseDocument] func(
    ctx context.Context, queries *db.Queries, accountID int32,
    input *RxDBPullRequest) (*RxDBPullBody[T], error)

// PushHelperFunc processes a document push request, given the account ID,
// the document to be pushed, and the current time. It returns a tuple of
// (conflict doc, error), where the conflict doc is non-nil if there was
// a conflict and should be included in the response.
type PushHelperFunc[T RxDBBaseDocument] func(
    ctx context.Context, queries *db.Queries, accountID int32,
    doc *RxDBPushDocument[T], now time.Time) (*T, error)

type RxDBHelper[T RxDBBaseDocument] struct {
    Pool           databasePool
    CollectionName string

    PullHelper      PullHelperFunc[T]
    PushHelper      PushHelperFunc[T]

    Shutdown <-chan struct{}
    Broker   *Broker[int32]
}

PullHelperFunc[T] and PushHelperFunc[T] carry the SQL logic for a given collection. The helper handles all the protocol plumbing: response shapes, conflict tracking, transaction management, and stream lifecycle. Each collection provides its own pair of helper functions and passes them in at startup.

The push helper function is used by the collection's _push endpoint, while the pull helper serves both _pull and _pullStream. This reduces the number of collection-specific functions needed and keeps the distinction between a one-shot pull and a pull stream inside the generic code, out of collection-specific logic.

The interface that T must satisfy is intentionally small:

type RxDBBaseDocument interface {
    GetID() uuid.UUID
    IsDeleted() bool
}

Any struct that embeds RxDBDocument satisfies it automatically. Adding a new collection means writing two SQL-backed functions and wiring them up — nothing else.
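To make the wiring concrete, here is a deliberately simplified sketch. The type and field names beyond what the article shows (EntryDoc, the closure signatures, the sample data) are assumptions, and the real helper also carries the pool, shutdown channel, and broker:

```go
package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for the real types.
type UUID string

type RxDBBaseDocument interface {
	GetID() UUID
	IsDeleted() bool
}

// EntryDoc satisfies the interface directly; the real type embeds RxDBDocument.
type EntryDoc struct {
	ID      UUID   `json:"id"`
	Deleted bool   `json:"_deleted"`
	Text    string `json:"text"`
}

func (e EntryDoc) GetID() UUID     { return e.ID }
func (e EntryDoc) IsDeleted() bool { return e.Deleted }

// RxDBHelper here keeps only the parts needed to show the wiring.
type RxDBHelper[T RxDBBaseDocument] struct {
	CollectionName string
	PullHelper     func(accountID int32) []T
	PushHelper     func(accountID int32, doc T) (*T, error)
}

func main() {
	helper := RxDBHelper[EntryDoc]{
		CollectionName: "entry",
		// In production these two functions run the collection's SQL.
		PullHelper: func(accountID int32) []EntryDoc {
			return []EntryDoc{{ID: "e1", Text: "hello"}}
		},
		PushHelper: func(accountID int32, doc EntryDoc) (*EntryDoc, error) {
			return nil, nil // nil conflict doc means the write was accepted
		},
	}
	docs := helper.PullHelper(42)
	fmt.Println(helper.CollectionName, docs[0].GetID())
}
```

The generic struct never sees SQL; it only calls through the two function values it was given at startup.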

If you’re interested in seeing the full implementation, please reach out. This is code we could open source if it were properly abstracted out of our codebase.

Checkpoints in Postgres

RxDB replication is built around checkpoints. A checkpoint marks how far the client has already pulled, and the next pull request sends that checkpoint back so the server can return only newer documents.

We use (updated_at, id) as the checkpoint: a microsecond-precision timestamp and a UUID. The pull query uses Postgres tuple comparison:

SELECT *
FROM entry
WHERE account_id = @account_id
  AND (updated_at, id) > (@updated_at::timestamptz, @entry_id::uuid)
ORDER BY updated_at, id
LIMIT $1;

This is stable and deterministic. If two rows share the same updated_at, the UUID tiebreak preserves a consistent order. The LIMIT keeps response sizes bounded and lets RxDB page through large backlogs incrementally on a first sync.

On writes, upserts use ON CONFLICT (id) DO UPDATE SET ... updated_at = now(). Every save bumps updated_at, which is exactly what we want — any change to a document will show up in the next pull.

There is one subtlety here that we haven’t solved, though it hasn’t caused problems yet. In Postgres, now() returns the time the current transaction started, not the time of commit. If a transaction starts early but is slow to commit, its rows can carry an updated_at earlier than rows written by a faster transaction that started later. Once the slow transaction's rows become visible, a client whose checkpoint has already advanced past that timestamp will never pull them with the query above.
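The behavior is easy to see in a psql session. This is an illustration of the pitfall, not a fix (we don't have one yet):

```sql
BEGIN;
SELECT now();             -- frozen at transaction start
SELECT pg_sleep(2);
SELECT now();             -- same value as the first call
SELECT clock_timestamp(); -- actual wall-clock time, ~2s later
COMMIT;
```

Any row stamped with now() in this transaction carries the start time, even though it only becomes visible to readers at commit.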

The pull stream

The _pull endpoint is stateless: request in, documents out. _pullStream is different. It is a long-lived SSE connection that pushes new data to the client without polling.

The stream loop drives three things concurrently:

  1. Catch-up on connect: pull any documents newer than the client’s current checkpoint. This closes the gap between the client’s last disconnect and now.
  2. Live updates: wait for a notification from the broker, then pull and send new documents.
  3. Keepalive: if nothing arrives within 15 seconds, send a ping event to keep the connection alive through proxies and load balancers.

There is also a max duration timer. We run on Google Cloud Run, which we have configured with a maximum request timeout of 900 seconds. We close the stream cleanly at 890 seconds on the server side to avoid noisy errors in our logs about dropped connections. When the server closes the stream, the client reconnects automatically and sends its last checkpoint — no data is lost.

One more note about Cloud Run: you only get client connection-closed notifications if you use HTTP/2 as the transport between your Cloud Load Balancer and your Cloud Run instance. This matters, because otherwise the server never learns that a client has gone away and keeps spending time and resources on SSE connections nobody is reading.

Fanout with PostgreSQL LISTEN/NOTIFY

Cloud Run is horizontally scaled. When one instance handles a push and writes new data, the other instances need to know about it.

For us, we use Postgres LISTEN/NOTIFY as the coordination layer. After a successful push, we fire:

SELECT pg_notify($collection_name, $string_account_id)

The channel is the collection name ("entry", "person"), and the payload is the affected account ID. This runs inside the write transaction, so the notification is only delivered if the write commits.

A singleton PGBroker holds a dedicated Postgres connection per server instance and fans out incoming notifications to all active pull streams on that instance via an in-memory pub/sub broker. Each stream subscribes to its collection’s broker channel and wakes up only when relevant data changes.

We also use account ID 0 as a global broadcast signal for administrative operations that should wake all active streams, regardless of account. The stream loop handles this case explicitly.
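The in-memory side of the fanout can be sketched as a small generic pub/sub type. The exact production API is assumed; the important property is that Publish never blocks on a slow subscriber:

```go
package main

import (
	"fmt"
	"sync"
)

// Broker fans values from one producer (the Postgres LISTEN connection)
// out to every subscribed pull stream on this instance.
type Broker[T any] struct {
	mu   sync.Mutex
	subs map[chan T]struct{}
}

func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{subs: make(map[chan T]struct{})}
}

// Subscribe returns a channel of notifications and an unsubscribe func.
func (b *Broker[T]) Subscribe() (<-chan T, func()) {
	ch := make(chan T, 8)
	b.mu.Lock()
	b.subs[ch] = struct{}{}
	b.mu.Unlock()
	return ch, func() {
		b.mu.Lock()
		delete(b.subs, ch)
		b.mu.Unlock()
	}
}

// Publish delivers v to all subscribers, dropping it for any subscriber
// whose buffer is full rather than blocking the notification listener.
func (b *Broker[T]) Publish(v T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for ch := range b.subs {
		select {
		case ch <- v:
		default:
		}
	}
}

func main() {
	b := NewBroker[int32]()
	ch, cancel := b.Subscribe()
	defer cancel()
	b.Publish(42) // e.g. the account ID parsed from a pg_notify payload
	fmt.Println(<-ch)
}
```

Dropping on a full buffer is safe here because a stream that wakes up re-pulls from its checkpoint anyway; a missed wake-up is recovered by the next notification or keepalive cycle.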

A few things we learned in production

_attachments forward compatibility

After a schema migration, RxDB started including an _attachments field in assumedMasterState on push requests. Because our API is strongly typed, it rejected these with a validation error. The fix was accepting the field without using it: we added it to RxDBDocument as an optional map. The broader lesson: when the client is generated by a library you do not control, design your API to tolerate fields you did not expect.

RESYNC timing matters

This is a frontend concern, but it only became obvious because of our backend implementation. When the SSE connection drops, we need to tell RxDB to pull from the last checkpoint again. Our first implementation emitted RESYNC immediately in the EventSource onerror handler. That caused spurious pulls before the connection was actually re-established. The fix was setting a needsResync flag on error and emitting RESYNC from onopen, only after the connection is confirmed back up.

Wrapping up

The backend side of RxDB replication is not conceptually complex: it is mostly SQL queries, HTTP handlers, and a stream loop. However, it took us a while to get the details just right: deterministic checkpoints that stay stable under concurrent writes, Postgres LISTEN/NOTIFY for low-latency cross-instance fanout, and careful handling of the stream lifecycle (reconnects, max duration, keepalive).
