diff --git a/packages/ws-client/src/streams/OutboundStream.ts b/packages/ws-client/src/streams/OutboundStream.ts index 19bfe91..224e043 100644 --- a/packages/ws-client/src/streams/OutboundStream.ts +++ b/packages/ws-client/src/streams/OutboundStream.ts @@ -10,6 +10,7 @@ export default class OutboundStream { #localOnly: boolean = false; #timeoutHandle: number | null = null; #bufferFullBackoff = 50; + #sentProbe = false; readonly #disposer; constructor(db: DB, transport: Transport) { @@ -22,6 +23,7 @@ export default class OutboundStream { this.#lastSent = msg.since; this.#excludeSites = msg.excludeSites; this.#localOnly = msg.localOnly; + this.#sentProbe = false; // initial kickoff so we don't wait for a db change event this.#dbChanged(); }; @@ -56,6 +58,15 @@ export default class OutboundStream { } if (changes.length == 0) { + if (!this.#sentProbe) { + this.#sentProbe = true; + this.#transport.sendChanges({ + _tag: tags.Changes, + changes: [], + sender: this.#db.siteid, + since: lastSent, + }); + } return; } const lastChange = changes[changes.length - 1]; @@ -74,6 +85,7 @@ export default class OutboundStream { switch (didSend) { case "sent": this.#bufferFullBackoff = 50; + this.#sentProbe = true; break; case "buffer-full": this.#lastSent = lastSent; diff --git a/packages/ws-client/src/transport/Transport.ts b/packages/ws-client/src/transport/Transport.ts index 2f00dd3..3ab1dc1 100644 --- a/packages/ws-client/src/transport/Transport.ts +++ b/packages/ws-client/src/transport/Transport.ts @@ -3,6 +3,7 @@ import { Changes, RejectChanges, StartStreaming, + SyncStatus, } from "@vlcn.io/ws-common"; export type TransporOptions = { @@ -32,6 +33,8 @@ export interface Transport { onResetStream: ((msg: StartStreaming) => Promise) | null; + onSyncStatus?: ((msg: SyncStatus) => void) | null; + close(): void; // Connection lifecycle callbacks diff --git a/packages/ws-client/src/transport/WebSocketTransport.ts b/packages/ws-client/src/transport/WebSocketTransport.ts index 7aeea6d..cc16197 100644 --- a/packages/ws-client/src/transport/WebSocketTransport.ts +++ b/packages/ws-client/src/transport/WebSocketTransport.ts @@ -7,6 +7,7 @@ import { decode, encode, tags, + type SyncStatus, } from "@vlcn.io/ws-common"; export default class WebSocketTransport implements Transport { @@ -118,6 +119,7 @@ export default class WebSocketTransport implements Transport { // Connection event callbacks onConnOpen: (() => void) | null = null; onConnClose: (() => void) | null = null; + onSyncStatus: ((msg: SyncStatus) => void) | null = null; #processEvent = (data: Uint8Array) => { const msg = decode(data); @@ -143,6 +145,9 @@ export default class WebSocketTransport implements Transport { this.onStartStreaming && this.onStartStreaming(msg); } return; + case tags.SyncStatus: + this.onSyncStatus && this.onSyncStatus(msg); + return; case tags.Pong: // Right now pong is just a sign of life - handled above // TODO: we might implement additional data with the pong / heartbeat in the future diff --git a/packages/ws-common/src/decode.ts b/packages/ws-common/src/decode.ts index 9b104a2..ee4c853 100644 --- a/packages/ws-common/src/decode.ts +++ b/packages/ws-common/src/decode.ts @@ -12,6 +12,7 @@ import { Pong, RejectChanges, StartStreaming, + SyncStatus, TagValues, tags, CreateDbOnPrimaryResponse, @@ -103,6 +104,62 @@ export function decode(msg: Uint8Array): Msg { _reqid: decoding.readVarInt(decoder), err: decoding.readVarString(decoder), } satisfies Err; + case tags.SyncStatus: { + const ok = decoding.readUint8(decoder) === 1; + + let siteId: Uint8Array | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + siteId = decoding.readVarUint8Array(decoder); + } + + let schemaName: string | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + schemaName = decoding.readVarString(decoder); + } + + let schemaVersion: bigint | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + schemaVersion = decoding.readBigInt64(decoder); + } + + let schemaHash: string | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + schemaHash = decoding.readVarString(decoder); + } + + let stage: SyncStatus["stage"]; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + stage = decoding.readVarString(decoder) as SyncStatus["stage"]; + } + + let ackDbVersion: bigint | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + ackDbVersion = decoding.readBigInt64(decoder); + } + + let reason: string | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + reason = decoding.readVarString(decoder); + } + + let message: string | undefined; + if (decoding.hasContent(decoder) && decoding.readUint8(decoder) === 1) { + message = decoding.readVarString(decoder); + } + + return { + _tag: tags.SyncStatus, + ok, + siteId, + schemaName, + schemaVersion, + schemaHash, + stage, + ackDbVersion, + reason, + message, + } satisfies SyncStatus; + } default: tag as never; } diff --git a/packages/ws-common/src/encode.ts b/packages/ws-common/src/encode.ts index c8f60c9..847e240 100644 --- a/packages/ws-common/src/encode.ts +++ b/packages/ws-common/src/encode.ts @@ -58,6 +58,50 @@ export function encode(msg: Msg): Uint8Array { case tags.Err: encoding.writeVarInt(encoder, msg._reqid); encoding.writeVarString(encoder, msg.err); + return encoding.toUint8Array(encoder); + case tags.SyncStatus: + encoding.writeUint8(encoder, msg.ok ? 1 : 0); + + encoding.writeUint8(encoder, msg.siteId ? 1 : 0); + if (msg.siteId) { + encoding.writeVarUint8Array(encoder, msg.siteId); + } + + encoding.writeUint8(encoder, msg.schemaName ? 1 : 0); + if (msg.schemaName) { + encoding.writeVarString(encoder, msg.schemaName); + } + + encoding.writeUint8(encoder, msg.schemaVersion != null ? 1 : 0); + if (msg.schemaVersion != null) { + encoding.writeBigInt64(encoder, msg.schemaVersion); + } + + encoding.writeUint8(encoder, msg.schemaHash ? 1 : 0); + if (msg.schemaHash) { + encoding.writeVarString(encoder, msg.schemaHash); + } + + encoding.writeUint8(encoder, msg.stage ? 1 : 0); + if (msg.stage) { + encoding.writeVarString(encoder, msg.stage); + } + + encoding.writeUint8(encoder, msg.ackDbVersion != null ? 1 : 0); + if (msg.ackDbVersion != null) { + encoding.writeBigInt64(encoder, msg.ackDbVersion); + } + + encoding.writeUint8(encoder, msg.reason ? 1 : 0); + if (msg.reason) { + encoding.writeVarString(encoder, msg.reason); + } + + encoding.writeUint8(encoder, msg.message ? 1 : 0); + if (msg.message) { + encoding.writeVarString(encoder, msg.message); + } + return encoding.toUint8Array(encoder); } } diff --git a/packages/ws-common/src/msgTypes.ts b/packages/ws-common/src/msgTypes.ts index 15a046f..0005eb6 100644 --- a/packages/ws-common/src/msgTypes.ts +++ b/packages/ws-common/src/msgTypes.ts @@ -3,6 +3,7 @@ export type Msg = | Changes | RejectChanges | StartStreaming + | SyncStatus | CreateDbOnPrimary | ApplyChangesOnPrimary | Ping @@ -23,6 +24,7 @@ export const tags = { CreateDbOnPrimaryResponse: 9, Err: 10, ApplyChangesOnPrimaryResponse: 11, + SyncStatus: 12, } as const; export type Tags = typeof tags; @@ -76,6 +78,21 @@ export type StartStreaming = Readonly<{ localOnly: boolean; }>; +export type SyncStatusStage = "handshake" | "steady" | "apply_ack"; + +export type SyncStatus = Readonly<{ + _tag: Tags["SyncStatus"]; + ok: boolean; + siteId?: Uint8Array; + schemaName?: string; + schemaVersion?: bigint; + schemaHash?: string; + stage?: SyncStatusStage; + ackDbVersion?: bigint; + reason?: string; + message?: string; +}>; + export type CreateDbOnPrimary = Readonly<{ _tag: Tags["CreateDbOnPrimary"]; _reqid: number; diff --git a/packages/ws-server/src/ConnectionBroker.ts b/packages/ws-server/src/ConnectionBroker.ts index 83b94df..7883a54 100644 --- a/packages/ws-server/src/ConnectionBroker.ts +++ b/packages/ws-server/src/ConnectionBroker.ts @@ -1,6 +1,7 @@ -import { Msg, decode, encode, tags } from "@vlcn.io/ws-common"; +import { Msg, decode, encode, tags, uintArraysEqual, type AnnouncePresence, type SyncStatus } from "@vlcn.io/ws-common"; import SyncConnection, { createSyncConnection } from "./SyncConnection.js"; import DBCache from "./DBCache.js"; +import type { IDB } from "./DB.js"; import { WebSocket } from "ws"; import Transport from "./Trasnport.js"; import { logger } from "@vlcn.io/logger-provider"; @@ -11,6 +12,34 @@ export type Options = { room: string; }; +/** + * Checks whether a connecting client is coherent with the server's peer history. + * + * Returns `{ ok: true }` when the client is compatible, or + * `{ ok: false, reason: "peer_mismatch" }` when the server was rebuilt + * and the client has stale sync history. + */ +export function checkPeerCoherence( + db: IDB, + sender: Uint8Array, + lastSeens: readonly [Uint8Array, [bigint, number]][] +): { ok: true } | { ok: false; reason: "peer_mismatch" } { + const clientHasSyncHistory = lastSeens.length > 0; + if (!clientHasSyncHistory) { + return { ok: true }; + } + + const clientKnowsServer = lastSeens.some( + ([siteId]) => uintArraysEqual(siteId, db.siteId) + ); + + if (!clientKnowsServer) { + return { ok: false, reason: "peer_mismatch" }; + } + + return { ok: true }; +} + /** * A connection broker maps PartyKit connections to Database Sync connections * and dispatches messages from the PartyKitConnection to the appropriate @@ -21,11 +50,14 @@ export default class ConnectionBroker { readonly #dbCache; readonly #ws; readonly #room; + readonly #transport; + #closed = false; constructor({ ws, dbCache, room }: Options) { this.#dbCache = dbCache; this.#ws = ws; this.#room = room; + this.#transport = new Transport(ws); this.#ws.on("message", async (data) => { // TODO: for litefs support we should just read the tag out @@ -69,14 +101,37 @@ export default class ConnectionBroker { ); } - const syncConnection = await createSyncConnection( - this.#dbCache, - new Transport(this.#ws), - this.#room, - msg - ); - this.#syncConnection = syncConnection; - syncConnection.start(); + const status = await this.#buildSyncStatus(msg); + this.#transport.sendSyncStatus(status); + + if (!status.ok) { + logger.warn(`Closing connection for ${this.#room} due to incompatible sync status: ${status.reason || "unknown"}`); + this.#ws.close(1011, "sync_incompatible"); + return; + } + + try { + const syncConnection = await createSyncConnection( + this.#dbCache, + this.#transport, + this.#room, + msg + ); + this.#syncConnection = syncConnection; + syncConnection.start(); + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + logger.error(`Failed to create sync connection for ${this.#room}: ${reason}`); + this.#transport.sendSyncStatus({ + _tag: tags.SyncStatus, + ok: false, + reason: "server_error", + message: reason, + stage: "handshake", + }); + this.close(); + this.#ws.close(1011, "sync_setup_failed"); + } return; } case tags.Changes: { @@ -105,6 +160,69 @@ export default class ConnectionBroker { } close() { + if (this.#closed) { + return; + } + this.#closed = true; this.#syncConnection?.close(); } + + async #buildSyncStatus(msg: AnnouncePresence): Promise { + try { + return await this.#dbCache.use(this.#room, msg.schemaName, async (db) => { + const schemaMismatch = + db.schemaName !== msg.schemaName || db.schemaVersion !== msg.schemaVersion; + + if (schemaMismatch) { + return { + _tag: tags.SyncStatus, + ok: false, + siteId: db.siteId, + schemaName: db.schemaName, + schemaVersion: db.schemaVersion, + schemaHash: db.schemaVersion.toString(), + stage: "handshake", + reason: "schema_mismatch", + message: `Server schema ${db.schemaVersion.toString()} does not match client ${msg.schemaVersion.toString()}`, + }; + } + + const coherence = checkPeerCoherence(db, msg.sender, msg.lastSeens); + if (!coherence.ok) { + return { + _tag: tags.SyncStatus, + ok: false, + siteId: db.siteId, + schemaName: db.schemaName, + schemaVersion: db.schemaVersion, + schemaHash: db.schemaVersion.toString(), + stage: "handshake", + reason: "peer_mismatch", + message: "The server database was rebuilt. Your local data needs to be re-synced.", + }; + } + + const lastSeen = db.getLastSeen(msg.sender); + + return { + _tag: tags.SyncStatus, + ok: true, + siteId: db.siteId, + schemaName: db.schemaName, + schemaVersion: db.schemaVersion, + schemaHash: db.schemaVersion.toString(), + ackDbVersion: lastSeen?.[0], + stage: "handshake", + }; + }); + } catch (err) { + return { + _tag: tags.SyncStatus, + ok: false, + reason: "server_error", + message: err instanceof Error ? err.message : String(err), + stage: "handshake", + }; + } + } } diff --git a/packages/ws-server/src/DB.ts b/packages/ws-server/src/DB.ts index b3e3265..db8ef21 100644 --- a/packages/ws-server/src/DB.ts +++ b/packages/ws-server/src/DB.ts @@ -57,6 +57,7 @@ export default class DB implements IDB { readonly #applyChangesAndSetLastSeenTx; readonly #dbname; readonly #dbpath; + #closed = false; /** * A trivial `notifyOfChange` implementation. @@ -221,6 +222,9 @@ export default class DB implements IDB { } getLastSeen(site: Uint8Array): [bigint, number] { + if (this.#closed) { + return [0n, 0]; + } const result = this.#getLastSeenStmt.raw(true).get(site) as | [bigint, bigint] | null; @@ -236,6 +240,9 @@ export default class DB implements IDB { siteId: Uint8Array, newLastSeen: readonly [bigint, number] ): Promise { + if (this.#closed) { + return Promise.resolve(); + } this.#applyChangesAndSetLastSeenTx(changes, siteId, newLastSeen); if (this.#fsnotify == null) { this.#notifyOfChange(); @@ -249,6 +256,9 @@ export default class DB implements IDB { since: readonly [bigint, number], excludeSite: Uint8Array ): readonly Change[] { + if (this.#closed) { + return []; + } return this.#getChangesStmt.all(since[0], excludeSite) as Change[]; } @@ -272,6 +282,7 @@ export default class DB implements IDB { close() { this.#db.prepare(`SELECT crsql_finalize()`).run(); this.#db.close(); + this.#closed = true; } // No schema exists on the db. Straight apply it. diff --git a/packages/ws-server/src/DBCache.ts b/packages/ws-server/src/DBCache.ts index d762c05..fa62545 100644 --- a/packages/ws-server/src/DBCache.ts +++ b/packages/ws-server/src/DBCache.ts @@ -34,11 +34,15 @@ export default class DBCache { return ret[0]; } - async use(roomId: string, schemaName: string, cb: (db: IDB) => unknown) { + async use( + roomId: string, + schemaName: string, + cb: (db: IDB) => T | Promise + ): Promise { const version = getResidentSchemaVersion(schemaName, this.#config); const db = await this.getAndRef(roomId, schemaName, version); try { - await cb(db); + return await cb(db); } finally { this.unref(roomId); } diff --git a/packages/ws-server/src/SyncConnection.ts b/packages/ws-server/src/SyncConnection.ts index 3a9eaac..372b038 100644 --- a/packages/ws-server/src/SyncConnection.ts +++ b/packages/ws-server/src/SyncConnection.ts @@ -68,7 +68,15 @@ export default class SyncConnection { logger.info(`Sync connection closed`); this.#outboundStream.stop(); // tell the cache we're done. It'll close the db on 0 references. - this.#dbCache.unref(this.#room); + try { + this.#dbCache.unref(this.#room); + } catch (err) { + logger.warn( + `Failed to unref db cache entry for ${this.#room}: ${ + err instanceof Error ? err.message : String(err) + }` + ); + } } } diff --git a/packages/ws-server/src/Trasnport.ts b/packages/ws-server/src/Trasnport.ts index fe38b55..2395220 100644 --- a/packages/ws-server/src/Trasnport.ts +++ b/packages/ws-server/src/Trasnport.ts @@ -2,6 +2,7 @@ import { Changes, RejectChanges, StartStreaming, + SyncStatus, encode, } from "@vlcn.io/ws-common"; import { WebSocket } from "ws"; @@ -34,4 +35,8 @@ export default class Transport { startStreaming(msg: StartStreaming) { this.#ws.send(encode(msg)); } + + sendSyncStatus(msg: SyncStatus) { + this.#ws.send(encode(msg)); + } } diff --git a/packages/ws-server/src/__tests__/ConnectionBroker.test.ts b/packages/ws-server/src/__tests__/ConnectionBroker.test.ts new file mode 100644 index 0000000..4974729 --- /dev/null +++ b/packages/ws-server/src/__tests__/ConnectionBroker.test.ts @@ -0,0 +1,112 @@ +import { test, expect } from "vitest"; +import DB from "../DB.js"; +import { checkPeerCoherence } from "../ConnectionBroker.js"; +import { config, schemaVersion } from "./testConfig.js"; + +function createDb(name: string) { + return new DB(config, null, name, "test.sql", schemaVersion); +} + +test("new client with empty lastSeens is allowed", () => { + const db = createDb("coherence-empty-lastseens"); + try { + const result = checkPeerCoherence( + db, + new Uint8Array([1, 2, 3, 4]), + [] + ); + expect(result).toEqual({ ok: true }); + } finally { + db.close(); + } +}); + +test("returning client that knows server siteId is allowed", () => { + const db = createDb("coherence-knows-server"); + try { + const result = checkPeerCoherence( + db, + new Uint8Array([1, 2, 3, 4]), + [[db.siteId, [5n, 0]]] + ); + expect(result).toEqual({ ok: true }); + } finally { + db.close(); + } +}); + +test("client with stale history that does not know server is rejected", () => { + const db = createDb("coherence-stale-client"); + try { + const unknownSiteId = new Uint8Array([99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84]); + const result = checkPeerCoherence( + db, + new Uint8Array([1, 2, 3, 4]), + [[unknownSiteId, [5n, 0]]] + ); + expect(result).toEqual({ ok: false, reason: "peer_mismatch" }); + } finally { + db.close(); + } +}); + +test("client unknown to server but referencing server siteId is allowed", () => { + const db = createDb("coherence-unknown-but-ref-server"); + try { + const unknownSender = new Uint8Array([10, 20, 30, 40]); + const result = checkPeerCoherence( + db, + unknownSender, + [[db.siteId, [5n, 0]]] + ); + expect(result).toEqual({ ok: true }); + } finally { + db.close(); + } +}); + +test("server knows client from prior session but client has stale lastSeens → rejected", async () => { + // This is the key scenario: after a server rebuild, the client connected once + // (before the coherence check existed) and the server recorded it. On subsequent + // reconnects the server "knows" the client, but the client's lastSeens still + // reference the OLD server siteId. This must still be rejected. + const db = createDb("coherence-server-knows-stale-client"); + try { + const clientSiteId = new Uint8Array([1, 2, 3, 4]); + // Simulate the server having recorded this client from a prior connection + await db.applyChangesetAndSetLastSeen([], clientSiteId, [47n, 0]); + expect(db.getLastSeen(clientSiteId)[0]).toBe(47n); + + const oldServerSiteId = new Uint8Array([99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84]); + const result = checkPeerCoherence( + db, + clientSiteId, + [[oldServerSiteId, [5n, 0]]] + ); + expect(result).toEqual({ ok: false, reason: "peer_mismatch" }); + } finally { + db.close(); + } +}); + +test("schema mismatch takes precedence over peer mismatch in buildSyncStatus", () => { + // This test verifies that checkPeerCoherence itself doesn't check schemas — + // that's handled by #buildSyncStatus before calling checkPeerCoherence. + // Here we just confirm checkPeerCoherence only looks at peer data. + const db = createDb("coherence-schema-precedence"); + try { + // Even with a peer mismatch scenario, checkPeerCoherence returns peer_mismatch. + // The caller (#buildSyncStatus) is responsible for checking schemas first. + const unknownSiteId = new Uint8Array([99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84]); + const result = checkPeerCoherence( + db, + new Uint8Array([1, 2, 3, 4]), + [[unknownSiteId, [5n, 0]]] + ); + expect(result).toEqual({ ok: false, reason: "peer_mismatch" }); + // This confirms schema_mismatch must be checked before calling checkPeerCoherence, + // which is exactly what #buildSyncStatus does. + } finally { + db.close(); + } +}); diff --git a/packages/ws-server/src/__tests__/DB.test.ts b/packages/ws-server/src/__tests__/DB.test.ts index 9672b83..a220bb4 100644 --- a/packages/ws-server/src/__tests__/DB.test.ts +++ b/packages/ws-server/src/__tests__/DB.test.ts @@ -26,3 +26,24 @@ test("write changes", () => {}); test("get last seen", () => {}); // TODO: test schema migration + +test("methods no-op safely after close", async () => { + const config: Config = { + schemaFolder: "./testSchemas", + dbFolder: null, + pathPattern: /\/vlcn-ws/, + }; + + const schemaContent = fs.readFileSync("./testSchemas/test.sql", "utf-8"); + const schemaVersion = cryb64(schemaContent); + const db = new DB(config, null, "closed-db", "test.sql", schemaVersion); + db.close(); + + expect(db.getLastSeen(new Uint8Array([1, 2, 3]))).toEqual([0n, 0]); + expect(() => + db.pullChangeset([0n, 0], new Uint8Array([9, 9, 9])) + ).not.toThrow(); + await expect( + db.applyChangesetAndSetLastSeen([], new Uint8Array([4, 5, 6]), [1n, 0]) + ).resolves.toBeUndefined(); +}); diff --git a/packages/ws-server/src/__tests__/OutboundStream.test.ts b/packages/ws-server/src/__tests__/OutboundStream.test.ts new file mode 100644 index 0000000..98c13c4 --- /dev/null +++ b/packages/ws-server/src/__tests__/OutboundStream.test.ts @@ -0,0 +1,65 @@ +import { test, expect } from "vitest"; +import { tags } from "@vlcn.io/ws-common"; +import type { RejectChanges } from "@vlcn.io/ws-common"; +import OutboundStream from "../streams/OutboundStream.js"; +import type { IDB } from "../DB.js"; +import type Transport from "../Trasnport.js"; + +// A crsql_changes row tuple: [table, pk, cid, val, col_version, db_version, site_id, cl, seq]. +// OutboundStream only cares about index 5 (db_version). +function change(dbVersion: bigint): any { + return ["item", new Uint8Array([1]), "v", null, 1n, dbVersion, null, 1n, 0]; +} + +// Minimal IDB: OutboundStream only uses siteId, onChange, pullChangeset. +function fakeDb(allChanges: any[]): IDB { + return { + siteId: new Uint8Array([0xaa, 0xbb, 0xcc, 0xdd]), + onChange: (_cb: () => void) => () => {}, + pullChangeset: (since: readonly [bigint, number], _excludeSite: Uint8Array) => + allChanges.filter((c) => (c[5] as bigint) > since[0]), + } as unknown as IDB; +} + +function recordingTransport(): { transport: Transport; sent: any[] } { + const sent: any[] = []; + const transport = { + sendChanges: (msg: any) => { + sent.push(msg); + return "sent" as const; + }, + rejectChanges: () => {}, + startStreaming: () => {}, + } as unknown as Transport; + return { transport, sent }; +} + +const clientId = new Uint8Array([0x01, 0x02, 0x03, 0x04]); + +test("reset() rewinds the cursor and re-sends after a peer rejects a gap", () => { + const db = fakeDb([change(1n), change(2n), change(3n)]); + const { transport, sent } = recordingTransport(); + + const stream = new OutboundStream(transport, db, [], clientId); + stream.start(); // initial kickoff sends everything, advancing the cursor to db_version 3 + + expect(sent).toHaveLength(1); + expect(sent[0].since).toEqual([0n, 0]); + expect(sent[0].changes.map((c: any) => c[5])).toEqual([1n, 2n, 3n]); + + // The peer only managed to apply up to db_version 1 (batch 2/3 failed or was lost), + // so it rejects and asks us to resume from [1, 0]. + const rejection: RejectChanges = { + _tag: tags.RejectChanges, + whose: db.siteId, + since: [1n, 0], + }; + stream.reset(rejection); + + // The server must rewind to [1, 0] and re-send the changes the peer is missing (2 and 3). + // Before the fix, reset() was a no-op and this second send never happened — the peer + // silently lost db_version 2 and 3 forever. + expect(sent).toHaveLength(2); + expect(sent[1].since).toEqual([1n, 0]); + expect(sent[1].changes.map((c: any) => c[5])).toEqual([2n, 3n]); +}); diff --git a/packages/ws-server/src/fs/FSNotify.ts b/packages/ws-server/src/fs/FSNotify.ts index 7246dc8..2cba326 100644 --- a/packages/ws-server/src/fs/FSNotify.ts +++ b/packages/ws-server/src/fs/FSNotify.ts @@ -67,9 +67,10 @@ export default class FSNotify { } addListener(dbid: string, cb: () => void) { - const listeners = this.listeners.get(dbid); + const key = util.fileEventNameToDbId(dbid); + const listeners = this.listeners.get(key); if (listeners == null) { - this.listeners.set(dbid, new Set([cb])); + this.listeners.set(key, new Set([cb])); } else { listeners.add(cb); } @@ -85,11 +86,12 @@ export default class FSNotify { } removeListener(dbid: string, cb: () => void) { - const listeners = this.listeners.get(dbid); + const key = util.fileEventNameToDbId(dbid); + const listeners = this.listeners.get(key); if (listeners != null) { listeners.delete(cb); if (listeners.size === 0) { - this.listeners.delete(dbid); + this.listeners.delete(key); } } } diff --git a/packages/ws-server/src/fs/util.ts b/packages/ws-server/src/fs/util.ts index 5960700..0ef3dbd 100644 --- a/packages/ws-server/src/fs/util.ts +++ b/packages/ws-server/src/fs/util.ts @@ -9,7 +9,8 @@ const ex = { }, fileEventNameToDbId(filename: string): string { - return path.parse(filename).name.replace(/-[pos|shm|wal]+$/, ""); + const base = path.basename(filename); + return base.replace(/\.touch$/, "").replace(/-(?:pos|shm|wal)$/, ""); }, needsTouchHack() { diff --git a/packages/ws-server/src/streams/InboundStream.ts b/packages/ws-server/src/streams/InboundStream.ts index 58ac746..5cfc996 100644 --- a/packages/ws-server/src/streams/InboundStream.ts +++ b/packages/ws-server/src/streams/InboundStream.ts @@ -1,4 +1,4 @@ -import { Changes, greaterThanOrEqual, tags } from "@vlcn.io/ws-common"; +import { Changes, greaterThanOrEqual, tags, type SyncStatus } from "@vlcn.io/ws-common"; import DB, { IDB } from "../DB.js"; import Transport from "../Trasnport.js"; @@ -14,6 +14,7 @@ export default class InboundStream { readonly #db; readonly #from; #lastSeen: readonly [bigint, number] | null = null; + #sentSteadyStatus = false; constructor(transport: Transport, db: IDB, from: Uint8Array) { this.#transport = transport; @@ -33,6 +34,10 @@ export default class InboundStream { localOnly: false, since: this.#lastSeen, }); + + // Immediately acknowledge readiness so the client can exit "connecting" even if there + // are no inbound changes to apply yet. + this.#sendApplyStatus(true, this.#lastSeen); } async receiveChanges(msg: Changes) { @@ -52,17 +57,66 @@ export default class InboundStream { }); } - if (msg.changes.length == 0) { + try { + if (msg.changes.length > 0) { + const lastChange = msg.changes[msg.changes.length - 1]; + const newLastSeen = [lastChange[5], 0] as const; + await this.#db.applyChangesetAndSetLastSeen( + msg.changes, + msg.sender, + newLastSeen + ); + + this.#lastSeen = newLastSeen; + } + this.#sendApplyStatus(true, this.#lastSeen); + } catch (err) { + this.#sendApplyStatus(false, null, err); + throw err; + } + } + + #sendApplyStatus( + ok: boolean, + lastSeen: readonly [bigint, number] | null, + err?: unknown + ) { + if (ok && lastSeen == null) { return; } - const lastChange = msg.changes[msg.changes.length - 1]; - const newLastSeen = [lastChange[5], 0] as const; - await this.#db.applyChangesetAndSetLastSeen( - msg.changes, - msg.sender, - newLastSeen - ); + const stage = ok + ? this.#sentSteadyStatus + ? ("apply_ack" as SyncStatus["stage"]) + : ("steady" as SyncStatus["stage"]) + : ("steady" as SyncStatus["stage"]); + + if (ok && !this.#sentSteadyStatus) { + this.#sentSteadyStatus = true; + } + + const status: SyncStatus = ok + ? { + _tag: tags.SyncStatus, + ok, + stage, + siteId: this.#db.siteId, + schemaName: this.#db.schemaName, + schemaVersion: this.#db.schemaVersion, + schemaHash: this.#db.schemaVersion.toString(), + ackDbVersion: lastSeen?.[0], + } + : { + _tag: tags.SyncStatus, + ok, + stage: this.#sentSteadyStatus ? "apply_ack" : "steady", + siteId: this.#db.siteId, + schemaName: this.#db.schemaName, + schemaVersion: this.#db.schemaVersion, + schemaHash: this.#db.schemaVersion.toString(), + reason: "apply_failed", + message: err instanceof Error ? err.message : String(err ?? "unknown"), + }; - this.#lastSeen = newLastSeen; + this.#transport.sendSyncStatus(status); } } diff --git a/packages/ws-server/src/streams/OutboundStream.ts b/packages/ws-server/src/streams/OutboundStream.ts index 2ea98aa..c72f3ef 100644 --- a/packages/ws-server/src/streams/OutboundStream.ts +++ b/packages/ws-server/src/streams/OutboundStream.ts @@ -4,8 +4,8 @@ import { tags, uintArraysEqual, } from "@vlcn.io/ws-common"; -import DB, { IDB } from "../DB.js"; -import Transport from "../Trasnport.js"; +import type { IDB } from "../DB.js"; +import type Transport from "../Trasnport.js"; import { logger } from "@vlcn.io/logger-provider"; /** @@ -57,8 +57,13 @@ export default class OutboundStream { } reset(msg: RejectChanges) { - // the peer rejected our changes. - // re-wind our stream back. + // The peer rejected our changes: they detected a gap because the `since` + // we sent was ahead of what they have actually applied (e.g. a previous + // batch failed to apply or was lost mid-connection). Rewind our cursor to + // the version they report and re-send from there. Without this, the gap is + // never re-delivered and the peer silently diverges forever. + this.#lastSent = msg.since; + this.#dbChanged(); } // db change notifications are already throttled for us in `DB.ts`