From 589e12b8f3c7bce64188671d254e59f46292a5cf Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 15:45:19 -0700 Subject: [PATCH 01/13] feat(destination-postgres): add PGlite support Refactor destination-postgres to support both pg.Pool and PGlite as backends. PGlite enables in-process WASM Postgres for testing without Docker and lightweight embedded deployments. - Add QueryClient/ManagedClient abstractions (src/client.ts) - Add `pglite` config option (true for in-memory, {data_dir} for persistent) - Refactor writeMany/upsertMany/deleteMany to accept QueryClient interface - Add @electric-sql/pglite as optional peer dependency - Add PGlite-specific test suite (no Docker required) Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- packages/destination-postgres/package.json | 7 +- packages/destination-postgres/src/client.ts | 64 +++++ packages/destination-postgres/src/index.ts | 156 +++++------ .../destination-postgres/src/pglite.test.ts | 244 ++++++++++++++++++ packages/destination-postgres/src/spec.ts | 13 + pnpm-lock.yaml | 3 + 6 files changed, 400 insertions(+), 87 deletions(-) create mode 100644 packages/destination-postgres/src/client.ts create mode 100644 packages/destination-postgres/src/pglite.test.ts diff --git a/packages/destination-postgres/package.json b/packages/destination-postgres/package.json index e4484fe2b..13d317bcc 100644 --- a/packages/destination-postgres/package.json +++ b/packages/destination-postgres/package.json @@ -30,7 +30,8 @@ }, "peerDependencies": { "@aws-sdk/client-sts": "^3", - "@aws-sdk/rds-signer": "^3" + "@aws-sdk/rds-signer": "^3", + "@electric-sql/pglite": "^0.2" }, "peerDependenciesMeta": { "@aws-sdk/client-sts": { @@ -38,11 +39,15 @@ }, "@aws-sdk/rds-signer": { "optional": true + }, + "@electric-sql/pglite": { + "optional": true } }, "devDependencies": { "@aws-sdk/client-sts": "^3.1013.0", "@aws-sdk/rds-signer": "^3.1013.0", + "@electric-sql/pglite": "^0.2.17", "@types/pg": "^8.15.5", "vitest": "^3.2.4" } diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts new file mode 100644 index 000000000..26449a9d2 --- /dev/null +++ b/packages/destination-postgres/src/client.ts @@ -0,0 +1,64 @@ +import type pg from 'pg' +import type { Logger } from '@stripe/sync-logger' +import { log } from './logger.js' + +export interface QueryClient { + query(text: string, values?: unknown[]): Promise +} + +export interface ManagedClient extends QueryClient { + close(): Promise + stats?(): { total_count: number; idle_count: number; waiting_count: number } +} + +export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient { + pool.on('error', (err) => { + logger.error({ err }, 'Postgres destination pool error') + }) + + return { + query(text: string, values?: unknown[]) { + return pool.query(text, values) + }, + async close() { + await pool.end() + }, + stats() { + return { + total_count: pool.totalCount, + idle_count: pool.idleCount, + waiting_count: pool.waitingCount, + } + }, + } +} + +export async function pgliteClient( + config: { data_dir?: string } = {} +): Promise { + const { PGlite } = await import('@electric-sql/pglite') + const db = await PGlite.create(config.data_dir) + + return { + async query(text: string, values?: unknown[]) { + const result = await db.query(text, values) + return { + rows: result.rows as Record[], + rowCount: result.affectedRows ?? null, + command: '', + oid: 0, + fields: result.fields?.map((f) => ({ + ...f, + tableID: 0, + columnID: 0, + dataTypeSize: 0, + dataTypeModifier: 0, + format: 'text' as const, + })) ?? [], + } as pg.QueryResult + }, + async close() { + await db.close() + }, + } +} diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 89113c07f..db3bd555c 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -20,10 +20,14 @@ import { import defaultSpec from './spec.js' import { log } from './logger.js' import type { Config } from './spec.js' +import { pgPoolClient, pgliteClient } from './client.js' +import type { QueryClient, ManagedClient } from './client.js' // MARK: - Spec export { configSchema, type Config } from './spec.js' +export { pgPoolClient, pgliteClient } from './client.js' +export type { QueryClient, ManagedClient } from './client.js' export async function buildPoolConfig(config: Config): Promise { if (config.aws) { @@ -80,7 +84,7 @@ export interface WriteManyResult extends UpsertManyResult, DeleteManyResult {} * cleaned up — no production user is on the soft-delete code path. */ export async function writeMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -91,8 +95,8 @@ export async function writeMany( const tombstones = entries.filter((e) => e.recordDeleted === true).map((r) => r.data) const liveRecords = entries.filter((e) => e.recordDeleted !== true).map((r) => r.data) - const u = await upsertMany(pool, schema, table, liveRecords, primaryKeyColumns, newerThanField) - const d = await deleteMany(pool, schema, table, tombstones, primaryKeyColumns) + const u = await upsertMany(client, schema, table, liveRecords, primaryKeyColumns, newerThanField) + const d = await deleteMany(client, schema, table, tombstones, primaryKeyColumns) return { ...u, deleted_count: d.deleted_count } } @@ -102,7 +106,7 @@ export async function writeMany( * `_synced_at` is the destination write time. */ export async function upsertMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -128,7 +132,7 @@ export async function upsertMany( return { _raw_data: e, _synced_at: syncedAt, _updated_at: new Date(ts * 1000).toISOString() } }) - return await upsertWithStats(pool, records, { + return await upsertWithStats(client, records, { schema, table, primaryKeyColumns, @@ -141,7 +145,7 @@ export async function upsertMany( * terminal — once an object is deleted it cannot be undeleted. */ export async function deleteMany( - pool: pg.Pool, + client: QueryClient, schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -165,7 +169,7 @@ export async function deleteMany( USING (VALUES ${valueRows.join(', ')}) AS d(${identList(primaryKeyColumns)}) WHERE ${pkJoin}` - const result = await pool.query(stmt, params) + const result = await client.query(stmt, params) return { deleted_count: result.rowCount ?? 0 } } @@ -187,7 +191,7 @@ export { /** Throw if any stream's catalog enum allow-list disagrees with an existing CHECK constraint. */ async function assertEnumConstraintsConsistent( - pool: pg.Pool, + client: QueryClient, schema: string, streams: ReadonlyArray<{ stream: { name: string; json_schema?: Record } }> ): Promise { @@ -203,7 +207,7 @@ async function assertEnumConstraintsConsistent( if (enumColumns.size === 0) return const existing = await getExistingEnumAllowLists( - pool, + client, schema, streams.map((s) => s.stream.name), [...enumColumns] @@ -236,23 +240,6 @@ function errorMessage(err: unknown): string { return (err as NodeJS.ErrnoException).code ?? err.constructor.name } -function createPool(config: PoolConfig): pg.Pool { - const pool = new pg.Pool(config) - // Destination connectors should surface pool failures without crashing the host process. - pool.on('error', (err) => { - log.error({ err }, 'Postgres destination pool error') - }) - return pool -} - -function poolStats(pool: pg.Pool) { - return { - total_count: pool.totalCount, - idle_count: pool.idleCount, - waiting_count: pool.waitingCount, - } -} - function describePoolConfig(config: PoolConfig) { return { host: config.host, @@ -269,7 +256,19 @@ function describePoolConfig(config: PoolConfig) { } } -async function createInstrumentedPool(config: Config, operation: string): Promise { +async function createManagedClient(config: Config, operation: string): Promise { + if (config.pglite) { + const dataDir = config.pglite === true ? undefined : config.pglite.data_dir + log.debug({ operation, data_dir: dataDir }, 'dest postgres: creating PGlite client') + const startedAt = Date.now() + const client = await pgliteClient({ data_dir: dataDir }) + log.debug( + { operation, duration_ms: Date.now() - startedAt }, + 'dest postgres: PGlite client ready' + ) + return client + } + const configStartedAt = Date.now() log.debug({ operation }, 'dest postgres: building pool config') const poolConfig = await buildPoolConfig(config) @@ -282,35 +281,27 @@ async function createInstrumentedPool(config: Config, operation: string): Promis 'dest postgres: built pool config' ) - const pool = withQueryLogging(createPool(poolConfig), log) - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool created') - return pool + const pool = withQueryLogging(new pg.Pool(poolConfig), log) + const client = pgPoolClient(pool, log) + log.debug({ operation, ...client.stats?.() }, 'dest postgres: pool created') + return client } -async function connectAndRelease(pool: pg.Pool, operation: string): Promise { +async function verifyConnectivity(client: ManagedClient, operation: string): Promise { const startedAt = Date.now() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.connect start') - const client = await pool.connect() - try { - log.debug( - { - operation, - duration_ms: Date.now() - startedAt, - ...poolStats(pool), - }, - 'dest postgres: pool.connect complete' - ) - } finally { - client.release() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.connect released') - } + log.debug({ operation, ...client.stats?.() }, 'dest postgres: connectivity check start') + await client.query('SELECT 1') + log.debug( + { operation, duration_ms: Date.now() - startedAt, ...client.stats?.() }, + 'dest postgres: connectivity check complete' + ) } -async function endPool(pool: pg.Pool, operation: string): Promise { +async function closeClient(client: ManagedClient, operation: string): Promise { const startedAt = Date.now() - log.debug({ operation, ...poolStats(pool) }, 'dest postgres: pool.end start') - await pool.end() - log.debug({ operation, duration_ms: Date.now() - startedAt }, 'dest postgres: pool.end complete') + log.debug({ operation, ...client.stats?.() }, 'dest postgres: closing client') + await client.close() + log.debug({ operation, duration_ms: Date.now() - startedAt }, 'dest postgres: client closed') } const destination = { @@ -319,10 +310,9 @@ const destination = { }, async *check({ config }) { - const pool = await createInstrumentedPool(config, 'check') + const client = await createManagedClient(config, 'check') try { - await connectAndRelease(pool, 'check') - await pool.query('SELECT 1') + await verifyConnectivity(client, 'check') yield { type: 'connection_status' as const, connection_status: { status: 'succeeded' as const }, @@ -336,40 +326,35 @@ const destination = { }, } } finally { - await endPool(pool, 'check') + await closeClient(client, 'check') } }, async *setup({ config, catalog }) { - log.debug({ schema: config.schema }, 'dest setup: connecting to pool') - const pool = await createInstrumentedPool(config, 'setup') + log.debug({ schema: config.schema }, 'dest setup: creating client') + const client = await createManagedClient(config, 'setup') try { - await connectAndRelease(pool, 'setup') + await verifyConnectivity(client, 'setup') log.info(`Creating schema "${config.schema}" (${catalog.streams.length} streams)`) log.debug('dest setup: creating schema') - await pool.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`) - // Backward-compat: drop legacy `set_updated_at()` (CASCADE removes any orphan `handle_updated_at` triggers from older deployments). + await client.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`) log.debug('dest setup: dropping legacy set_updated_at() function') - await pool.query(sql`DROP FUNCTION IF EXISTS "${config.schema}".set_updated_at() CASCADE`) + await client.query(sql`DROP FUNCTION IF EXISTS "${config.schema}".set_updated_at() CASCADE`) - // The DO $check$ block uses ADD CONSTRAINT + EXCEPTION WHEN duplicate_object, - // which silently no-ops on a changed enum list — surface it loudly instead. - await assertEnumConstraintsConsistent(pool, config.schema, catalog.streams) + await assertEnumConstraintsConsistent(client, config.schema, catalog.streams) log.debug({ streamCount: catalog.streams.length }, 'dest setup: creating tables') - await Promise.all( - catalog.streams.map(async (cs) => { - await pool.query( - buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { - system_columns: cs.system_columns, - primary_key: cs.stream.primary_key, - }) - ) - }) - ) + for (const cs of catalog.streams) { + await client.query( + buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { + system_columns: cs.system_columns, + primary_key: cs.stream.primary_key, + }) + ) + } log.debug('dest setup: complete') } finally { - await endPool(pool, 'setup') + await closeClient(client, 'setup') } }, @@ -380,17 +365,17 @@ const destination = { `Refusing to drop protected schema "${config.schema}" — teardown only drops user-created schemas` ) } - const pool = await createInstrumentedPool(config, 'teardown') + const client = await createManagedClient(config, 'teardown') try { - await connectAndRelease(pool, 'teardown') - await pool.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`) + await verifyConnectivity(client, 'teardown') + await client.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`) } finally { - await endPool(pool, 'teardown') + await closeClient(client, 'teardown') } }, async *write({ config, catalog }, $stdin) { - const pool = await createInstrumentedPool(config, 'write') + const client = await createManagedClient(config, 'write') const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() @@ -406,7 +391,6 @@ const destination = { const failedStreams = new Set() - /** Flush and return error message if failed, undefined if ok. */ const flushStream = async (streamName: string): Promise => { if (failedStreams.has(streamName)) return undefined const buffer = streamBuffers.get(streamName) @@ -421,12 +405,12 @@ const destination = { schema: config.schema, primary_key: pk, newer_than_field: newerThan, - ...poolStats(pool), + ...client.stats?.(), }, 'dest write: flush start' ) try { - const stats = await writeMany(pool, config.schema, streamName, buffer, pk, newerThan) + const stats = await writeMany(client, config.schema, streamName, buffer, pk, newerThan) log.debug( { stream: streamName, @@ -438,7 +422,7 @@ const destination = { deleted: stats.deleted_count, skipped: stats.skipped_count, duration_ms: Date.now() - startedAt, - ...poolStats(pool), + ...client.stats?.(), }, `dest write: upsert ${config.schema}.${streamName}` ) @@ -455,7 +439,7 @@ const destination = { duration_ms: Date.now() - startedAt, error: errMsg, err, - ...poolStats(pool), + ...client.stats?.(), }, 'dest write: flush failed' ) @@ -475,7 +459,7 @@ const destination = { } try { - await connectAndRelease(pool, 'write') + await verifyConnectivity(client, 'write') for await (const msg of $stdin) { if (msg.type === 'record') { const { stream } = msg.record @@ -548,7 +532,7 @@ const destination = { log.debug(`Postgres destination: wrote to schema "${config.schema}"`) } } finally { - await endPool(pool, 'write') + await closeClient(client, 'write') } }, } satisfies Destination diff --git a/packages/destination-postgres/src/pglite.test.ts b/packages/destination-postgres/src/pglite.test.ts new file mode 100644 index 000000000..063ee16d1 --- /dev/null +++ b/packages/destination-postgres/src/pglite.test.ts @@ -0,0 +1,244 @@ +import { mkdtempSync, rmSync } from 'fs' +import { tmpdir } from 'os' +import { join } from 'path' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import destination, { deleteMany, upsertMany, writeMany, pgliteClient } from './index.js' +import type { ManagedClient } from './client.js' +import type { ConfiguredCatalog, DestinationInput, DestinationOutput } from '@stripe/sync-protocol' +import { collectFirst, drain } from '@stripe/sync-protocol' +import type { Config } from './spec.js' + +const SCHEMA = 'test_dest' +let dataDir: string + +function makeConfig(): Config { + return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100 } +} + +beforeAll(() => { + dataDir = mkdtempSync(join(tmpdir(), 'pglite-test-')) +}) + +afterAll(() => { + rmSync(dataDir, { recursive: true, force: true }) +}) + +let nextRecordTs = Math.floor(Date.now() / 1000) +function makeRecord(stream: string, data: Record) { + return { + type: 'record' as const, + record: { + stream, + data: { _updated_at: nextRecordTs++, ...data }, + emitted_at: new Date().toISOString(), + }, + } +} + +function makeState(stream: string, data: unknown) { + return { type: 'source_state' as const, source_state: { stream, data } } +} + +async function* toAsyncIter(msgs: DestinationInput[]): AsyncIterable { + for (const msg of msgs) yield msg +} + +async function collectOutputs(iter: AsyncIterable): Promise { + const results: DestinationOutput[] = [] + for await (const msg of iter) results.push(msg) + return results +} + +async function resetSchema() { + const c = await pgliteClient({ data_dir: dataDir }) + await c.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await c.close() +} + +const catalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customer', + primary_key: [['id']], + newer_than_field: '_updated_at', + metadata: {}, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'overwrite', + }, + ], +} + +describe('PGlite destination', () => { + beforeEach(async () => { + await resetSchema() + }) + + describe('check()', () => { + it('succeeds with pglite config', async () => { + const statusMsg = await collectFirst( + destination.check({ config: makeConfig() }), + 'connection_status' + ) + expect(statusMsg.connection_status.status).toBe('succeeded') + }) + }) + + describe('setup()', () => { + it('creates schema and table', async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + + const c = await pgliteClient({ data_dir: dataDir }) + try { + const { rows } = await c.query( + `SELECT table_name FROM information_schema.tables WHERE table_schema = $1`, + [SCHEMA] + ) + expect(rows.map((r) => r.table_name)).toContain('customer') + } finally { + await c.close() + } + }) + }) + + describe('write()', () => { + beforeEach(async () => { + await drain(destination.setup!({ config: makeConfig(), catalog })) + }) + + it('upserts records via PGlite', async () => { + const messages = toAsyncIter([ + makeRecord('customer', { id: 'cus_1', name: 'Alice' }), + makeRecord('customer', { id: 'cus_2', name: 'Bob' }), + ]) + + const outputs = await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages)) + const records = outputs.filter((m) => m.type === 'record') + expect(records).toHaveLength(2) + }) + + it('re-emits SourceStateMessage after flushing', async () => { + const stateData = { cursor: 'abc123' } + const messages = toAsyncIter([ + makeRecord('customer', { id: 'cus_1', name: 'Alice' }), + makeState('customer', stateData), + ]) + + const outputs = await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages)) + const stateOutputs = outputs.filter((m) => m.type === 'source_state') + expect(stateOutputs).toHaveLength(1) + expect(stateOutputs[0]).toEqual({ + type: 'source_state', + source_state: { stream: 'customer', data: stateData }, + }) + }) + + it('handles upsert (ON CONFLICT update)', async () => { + const messages1 = toAsyncIter([makeRecord('customer', { id: 'cus_1', name: 'Alice' })]) + await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages1)) + + const messages2 = toAsyncIter([makeRecord('customer', { id: 'cus_1', name: 'Alice Updated' })]) + await collectOutputs(destination.write({ config: makeConfig(), catalog }, messages2)) + + const c = await pgliteClient({ data_dir: dataDir }) + try { + const { rows } = await c.query( + `SELECT _raw_data->>'name' AS name FROM "${SCHEMA}".customer WHERE id = 'cus_1'` + ) + expect(rows[0].name).toBe('Alice Updated') + } finally { + await c.close() + } + }) + }) +}) + +describe('PGlite upsertMany / deleteMany / writeMany', () => { + let client: ManagedClient + + beforeEach(async () => { + client = await pgliteClient({ data_dir: dataDir }) + await client.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + await client.query(`CREATE SCHEMA IF NOT EXISTS "${SCHEMA}"`) + await client.query(`CREATE TABLE IF NOT EXISTS "${SCHEMA}".customer ( + "_raw_data" jsonb NOT NULL, + "_synced_at" timestamptz NOT NULL DEFAULT now(), + "_updated_at" timestamptz NOT NULL DEFAULT now(), + "id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED, + PRIMARY KEY ("id") + )`) + }) + + afterEach(async () => { + await client?.close() + }) + + it('upsertMany inserts records', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [ + { id: 'cus_10', name: 'Direct', _updated_at: ts }, + { id: 'cus_11', name: 'Insert', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const { rows } = await client.query(`SELECT count(*)::int AS n FROM "${SCHEMA}".customer`) + expect(rows[0].n).toBe(2) + }) + + it('deleteMany removes rows', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [ + { id: 'cus_keep', name: 'Keep', _updated_at: ts }, + { id: 'cus_drop', name: 'Drop', _updated_at: ts }, + ], + ['id'], + '_updated_at' + ) + + const result = await deleteMany(client, SCHEMA, 'customer', [{ id: 'cus_drop' }], ['id']) + expect(result.deleted_count).toBe(1) + + const { rows } = await client.query(`SELECT id FROM "${SCHEMA}".customer ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_keep' }]) + }) + + it('writeMany routes mixed batch to upsert and delete', async () => { + const ts = Math.floor(Date.now() / 1000) + await upsertMany( + client, + SCHEMA, + 'customer', + [{ id: 'cus_old', name: 'Old', _updated_at: ts }], + ['id'], + '_updated_at' + ) + + const result = await writeMany( + client, + SCHEMA, + 'customer', + [ + { data: { id: 'cus_new', name: 'New', _updated_at: ts + 1 } }, + { recordDeleted: true, data: { id: 'cus_old', _updated_at: ts + 1 } }, + ], + ['id'], + '_updated_at' + ) + expect(result.created_count).toBe(1) + expect(result.deleted_count).toBe(1) + + const { rows } = await client.query(`SELECT id FROM "${SCHEMA}".customer ORDER BY id`) + expect(rows).toEqual([{ id: 'cus_new' }]) + }) +}) diff --git a/packages/destination-postgres/src/spec.ts b/packages/destination-postgres/src/spec.ts index f7b67b869..4a20dfb4b 100644 --- a/packages/destination-postgres/src/spec.ts +++ b/packages/destination-postgres/src/spec.ts @@ -19,6 +19,15 @@ export const configSchema = z }) .optional() .describe('AWS RDS IAM authentication config'), + pglite: z + .union([ + z.literal(true), + z.object({ + data_dir: z.string().optional().describe('Directory for persistent storage (omit for in-memory)'), + }), + ]) + .optional() + .describe('Use PGlite (in-process WASM Postgres) instead of connecting to an external server'), ssl_ca_pem: z .string() .optional() @@ -30,6 +39,10 @@ export const configSchema = z message: 'Specify either url/connection_string or aws config, not both', path: ['aws'], }) + .refine((config) => !(config.pglite && (config.url || config.connection_string || config.aws)), { + message: 'Specify pglite OR url/connection_string/aws, not both', + path: ['pglite'], + }) export type Config = z.infer diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d2955c56b..be1bfe14c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -528,6 +528,9 @@ importers: '@aws-sdk/rds-signer': specifier: ^3.1013.0 version: 3.1013.0 + '@electric-sql/pglite': + specifier: ^0.2.17 + version: 0.2.17 '@types/pg': specifier: ^8.15.5 version: 8.15.6 From 9b03622a3e67466237f10f8af248cce89bdf0442 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 15:45:40 -0700 Subject: [PATCH 02/13] chore: regenerate OpenAPI specs for pglite config Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 4 ++++ apps/engine/src/__generated__/openapi.json | 18 ++++++++++++++++++ apps/service/src/__generated__/openapi.d.ts | 4 ++++ apps/service/src/__generated__/openapi.json | 18 ++++++++++++++++++ 4 files changed, 44 insertions(+) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 9f90086ad..27ab0bc0c 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -532,6 +532,10 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + pglite?: true | { + /** @description Directory for persistent storage (omit for in-memory) */ + data_dir?: string; + }; /** @description PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA) */ ssl_ca_pem?: string; }; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index f7d8ecf2b..fdc167f79 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1572,6 +1572,24 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "pglite": { + "anyOf": [ + { + "type": "boolean", + "const": true + }, + { + "type": "object", + "properties": { + "data_dir": { + "type": "string", + "description": "Directory for persistent storage (omit for in-memory)" + } + }, + "additionalProperties": false + } + ] + }, "ssl_ca_pem": { "type": "string", "description": "PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)" diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index d1c5a5b6c..b52029292 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -467,6 +467,10 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + pglite?: true | { + /** @description Directory for persistent storage (omit for in-memory) */ + data_dir?: string; + }; /** @description PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA) */ ssl_ca_pem?: string; }; diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index d24c1a737..dce2c6500 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -1783,6 +1783,24 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "pglite": { + "anyOf": [ + { + "type": "boolean", + "const": true + }, + { + "type": "object", + "properties": { + "data_dir": { + "type": "string", + "description": "Directory for persistent storage (omit for in-memory)" + } + }, + "additionalProperties": false + } + ] + }, "ssl_ca_pem": { "type": "string", "description": "PEM-encoded CA certificate for SSL verification (required for verify-ca / verify-full with a private CA)" From f1404e5fc16058f67f4aeb913f5675e71f1073f6 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 16:08:58 -0700 Subject: [PATCH 03/13] feat: add allow_experimental_pglite gate, file:///memory:// URL support, upgrade to pglite 0.4.5 - Add `allow_experimental_pglite` boolean config gate - Detect file:// and memory:// URL schemes and route to PGlite - Upgrade @electric-sql/pglite from 0.2.17 to 0.4.5 Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- packages/destination-postgres/package.json | 4 ++-- packages/destination-postgres/src/client.ts | 10 ++++++++-- packages/destination-postgres/src/index.ts | 14 ++++++++------ .../destination-postgres/src/pglite.test.ts | 2 +- packages/destination-postgres/src/spec.ts | 17 +++++++++++++++++ pnpm-lock.yaml | 9 +++++++-- 6 files changed, 43 insertions(+), 13 deletions(-) diff --git a/packages/destination-postgres/package.json b/packages/destination-postgres/package.json index 13d317bcc..39b0ca544 100644 --- a/packages/destination-postgres/package.json +++ b/packages/destination-postgres/package.json @@ -31,7 +31,7 @@ "peerDependencies": { "@aws-sdk/client-sts": "^3", "@aws-sdk/rds-signer": "^3", - "@electric-sql/pglite": "^0.2" + "@electric-sql/pglite": "^0.4.5" }, "peerDependenciesMeta": { "@aws-sdk/client-sts": { @@ -47,7 +47,7 @@ "devDependencies": { "@aws-sdk/client-sts": "^3.1013.0", "@aws-sdk/rds-signer": "^3.1013.0", - "@electric-sql/pglite": "^0.2.17", + "@electric-sql/pglite": "^0.4.5", "@types/pg": "^8.15.5", "vitest": "^3.2.4" } diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts index 26449a9d2..50a537200 100644 --- a/packages/destination-postgres/src/client.ts +++ b/packages/destination-postgres/src/client.ts @@ -33,11 +33,17 @@ export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient } } +export function isPGliteUrl(url: string): boolean { + return url.startsWith('file://') || url.startsWith('memory://') +} + export async function pgliteClient( - config: { data_dir?: string } = {} + config: { data_dir?: string; url?: string } = {} ): Promise { const { PGlite } = await import('@electric-sql/pglite') - const db = await PGlite.create(config.data_dir) + + const dataSource = config.url ?? config.data_dir + const db = await PGlite.create(dataSource) return { async query(text: string, values?: unknown[]) { diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index db3bd555c..d048df0bc 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -20,13 +20,13 @@ import { import defaultSpec from './spec.js' import { log } from './logger.js' import type { Config } from './spec.js' -import { pgPoolClient, pgliteClient } from './client.js' +import { pgPoolClient, pgliteClient, isPGliteUrl } from './client.js' import type { QueryClient, ManagedClient } from './client.js' // MARK: - Spec export { configSchema, type Config } from './spec.js' -export { pgPoolClient, pgliteClient } from './client.js' +export { pgPoolClient, pgliteClient, isPGliteUrl } from './client.js' export type { QueryClient, ManagedClient } from './client.js' export async function buildPoolConfig(config: Config): Promise { @@ -257,11 +257,13 @@ function describePoolConfig(config: PoolConfig) { } async function createManagedClient(config: Config, operation: string): Promise { - if (config.pglite) { - const dataDir = config.pglite === true ? undefined : config.pglite.data_dir - log.debug({ operation, data_dir: dataDir }, 'dest postgres: creating PGlite client') + const connectionUrl = config.url ?? config.connection_string + if (config.pglite || (connectionUrl && isPGliteUrl(connectionUrl))) { + const url = connectionUrl && isPGliteUrl(connectionUrl) ? connectionUrl : undefined + const dataDir = config.pglite && config.pglite !== true ? config.pglite.data_dir : undefined + log.debug({ operation, url, data_dir: dataDir }, 'dest postgres: creating PGlite client') const startedAt = Date.now() - const client = await pgliteClient({ data_dir: dataDir }) + const client = await pgliteClient({ url, data_dir: dataDir }) log.debug( { operation, duration_ms: Date.now() - startedAt }, 'dest postgres: PGlite client ready' diff --git a/packages/destination-postgres/src/pglite.test.ts b/packages/destination-postgres/src/pglite.test.ts index 063ee16d1..11cfa2ed5 100644 --- a/packages/destination-postgres/src/pglite.test.ts +++ b/packages/destination-postgres/src/pglite.test.ts @@ -12,7 +12,7 @@ const SCHEMA = 'test_dest' let dataDir: string function makeConfig(): Config { - return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100 } + return { pglite: { data_dir: dataDir }, schema: SCHEMA, batch_size: 100, allow_experimental_pglite: true } } beforeAll(() => { diff --git a/packages/destination-postgres/src/spec.ts b/packages/destination-postgres/src/spec.ts index 4a20dfb4b..c52dd6b83 100644 --- a/packages/destination-postgres/src/spec.ts +++ b/packages/destination-postgres/src/spec.ts @@ -19,6 +19,10 @@ export const configSchema = z }) .optional() .describe('AWS RDS IAM authentication config'), + allow_experimental_pglite: z + .boolean() + .optional() + .describe('Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)'), pglite: z .union([ z.literal(true), @@ -43,6 +47,19 @@ export const configSchema = z message: 'Specify pglite OR url/connection_string/aws, not both', path: ['pglite'], }) + .refine( + (config) => { + if (config.pglite) return config.allow_experimental_pglite === true + const url = config.url ?? config.connection_string + if (url && (url.startsWith('file://') || url.startsWith('memory://'))) + return config.allow_experimental_pglite === true + return true + }, + { + message: 'Set allow_experimental_pglite: true to use PGlite or file:///memory:// URLs', + path: ['allow_experimental_pglite'], + } + ) export type Config = z.infer diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index be1bfe14c..8a458c4ea 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -529,8 +529,8 @@ importers: specifier: ^3.1013.0 version: 3.1013.0 '@electric-sql/pglite': - specifier: ^0.2.17 - version: 0.2.17 + specifier: ^0.4.5 + version: 0.4.5 '@types/pg': specifier: ^8.15.5 version: 8.15.6 @@ -1005,6 +1005,9 @@ packages: '@electric-sql/pglite@0.2.17': resolution: {integrity: sha512-qEpKRT2oUaWDH6tjRxLHjdzMqRUGYDnGZlKrnL4dJ77JVMcP2Hpo3NYnOSPKdZdeec57B6QPprCUFg0picx5Pw==} + '@electric-sql/pglite@0.4.5': + resolution: {integrity: sha512-aGG2zGEyZzGWKy8P+9ZoNUV0jxt1+hgbeTf+bVAYyxVZZLXg3/9aFlfLxb08AYZVAfAkQlQIysmWjhc5hwDG8g==} + '@emnapi/runtime@1.9.1': resolution: {integrity: sha512-VYi5+ZVLhpgK4hQ0TAjiQiZ6ol0oe4mBx7mVv7IflsiEp0OWoVsp/+f9Vc1hOhE0TtkORVrI1GvzyreqpgWtkA==} @@ -5559,6 +5562,8 @@ snapshots: '@electric-sql/pglite@0.2.17': {} + '@electric-sql/pglite@0.4.5': {} + '@emnapi/runtime@1.9.1': dependencies: tslib: 2.8.1 From 5df3d828282b9e7870b33818eb17ce06c8e41226 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 16:10:15 -0700 Subject: [PATCH 04/13] fix: handle multi-statement DDL in PGlite adapter PGlite's query() rejects multiple SQL statements in a single call. Fall back to exec() when the "multiple commands" error is detected. This fixes destination setup which uses buildCreateTableDDL (returns a DO block + standalone DO blocks). Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- packages/destination-postgres/src/client.ts | 45 ++++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts index 50a537200..85c302a8f 100644 --- a/packages/destination-postgres/src/client.ts +++ b/packages/destination-postgres/src/client.ts @@ -45,23 +45,38 @@ export async function pgliteClient( const dataSource = config.url ?? config.data_dir const db = await PGlite.create(dataSource) + function adaptResult(result: { rows: unknown[]; affectedRows?: number; fields?: { name: string; dataTypeID: number }[] }): pg.QueryResult { + return { + rows: result.rows as Record[], + rowCount: result.affectedRows ?? null, + command: '', + oid: 0, + fields: result.fields?.map((f) => ({ + ...f, + tableID: 0, + columnID: 0, + dataTypeSize: 0, + dataTypeModifier: 0, + format: 'text' as const, + })) ?? [], + } as pg.QueryResult + } + return { async query(text: string, values?: unknown[]) { - const result = await db.query(text, values) - return { - rows: result.rows as Record[], - rowCount: result.affectedRows ?? null, - command: '', - oid: 0, - fields: result.fields?.map((f) => ({ - ...f, - tableID: 0, - columnID: 0, - dataTypeSize: 0, - dataTypeModifier: 0, - format: 'text' as const, - })) ?? [], - } as pg.QueryResult + if (values && values.length > 0) { + return adaptResult(await db.query(text, values)) + } + // PGlite's query() rejects multiple statements; use exec() as fallback + try { + return adaptResult(await db.query(text)) + } catch (err) { + if (err instanceof Error && err.message.includes('multiple commands')) { + await db.exec(text) + return adaptResult({ rows: [], affectedRows: 0, fields: [] }) + } + throw err + } }, async close() { await db.close() From 6d820a472065f9c08b6e8b9ebaff2885c7ebfc28 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Wed, 6 May 2026 16:10:35 -0700 Subject: [PATCH 05/13] chore: regenerate OpenAPI specs for allow_experimental_pglite Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 2 ++ apps/engine/src/__generated__/openapi.json | 4 ++++ apps/service/src/__generated__/openapi.d.ts | 2 ++ apps/service/src/__generated__/openapi.json | 4 ++++ 4 files changed, 12 insertions(+) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 27ab0bc0c..386e8c469 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -532,6 +532,8 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + /** @description Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs) */ + allow_experimental_pglite?: boolean; pglite?: true | { /** @description Directory for persistent storage (omit for in-memory) */ data_dir?: string; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index fdc167f79..bf9be2afd 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1572,6 +1572,10 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "allow_experimental_pglite": { + "type": "boolean", + "description": "Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)" + }, "pglite": { "anyOf": [ { diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index b52029292..d7570e156 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -467,6 +467,8 @@ export interface components { /** @description External ID for STS AssumeRole */ external_id?: string; }; + /** @description Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs) */ + allow_experimental_pglite?: boolean; pglite?: true | { /** @description Directory for persistent storage (omit for in-memory) */ data_dir?: string; diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index dce2c6500..4b26b8f1d 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -1783,6 +1783,10 @@ "additionalProperties": false, "description": "AWS RDS IAM authentication config" }, + "allow_experimental_pglite": { + "type": "boolean", + "description": "Enable experimental PGlite support (required to use pglite config or file:///memory:// URLs)" + }, "pglite": { "anyOf": [ { From 3e1ec26304804d7fb8e3ed2ccde6ce113d0acb12 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 7 May 2026 07:45:42 -0700 Subject: [PATCH 06/13] refactor(destination-postgres): split client into separate pg and pglite files Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- .../destination-postgres/src/client-pg.ts | 26 +++++++ .../destination-postgres/src/client-pglite.ts | 53 +++++++++++++ packages/destination-postgres/src/client.ts | 76 +------------------ 3 files changed, 81 insertions(+), 74 deletions(-) create mode 100644 packages/destination-postgres/src/client-pg.ts create mode 100644 packages/destination-postgres/src/client-pglite.ts diff --git a/packages/destination-postgres/src/client-pg.ts b/packages/destination-postgres/src/client-pg.ts new file mode 100644 index 000000000..0b379f35b --- /dev/null +++ b/packages/destination-postgres/src/client-pg.ts @@ -0,0 +1,26 @@ +import type pg from 'pg' +import type { Logger } from '@stripe/sync-logger' +import { log } from './logger.js' +import type { ManagedClient } from './client.js' + +export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient { + pool.on('error', (err) => { + logger.error({ err }, 'Postgres destination pool error') + }) + + return { + query(text: string, values?: unknown[]) { + return pool.query(text, values) + }, + async close() { + await pool.end() + }, + stats() { + return { + total_count: pool.totalCount, + idle_count: pool.idleCount, + waiting_count: pool.waitingCount, + } + }, + } +} diff --git a/packages/destination-postgres/src/client-pglite.ts b/packages/destination-postgres/src/client-pglite.ts new file mode 100644 index 000000000..40dc01013 --- /dev/null +++ b/packages/destination-postgres/src/client-pglite.ts @@ -0,0 +1,53 @@ +import type pg from 'pg' +import type { ManagedClient } from './client.js' + +export function isPGliteUrl(url: string): boolean { + return url.startsWith('file://') || url.startsWith('memory://') +} + +export async function pgliteClient( + config: { data_dir?: string; url?: string } = {} +): Promise { + const { PGlite } = await import('@electric-sql/pglite') + + const dataSource = config.url ?? config.data_dir + const db = await PGlite.create(dataSource) + + function adaptResult(result: { rows: unknown[]; affectedRows?: number; fields?: { name: string; dataTypeID: number }[] }): pg.QueryResult { + return { + rows: result.rows as Record[], + rowCount: result.affectedRows ?? null, + command: '', + oid: 0, + fields: result.fields?.map((f) => ({ + ...f, + tableID: 0, + columnID: 0, + dataTypeSize: 0, + dataTypeModifier: 0, + format: 'text' as const, + })) ?? [], + } as pg.QueryResult + } + + return { + async query(text: string, values?: unknown[]) { + if (values && values.length > 0) { + return adaptResult(await db.query(text, values)) + } + // PGlite's query() rejects multiple statements; use exec() as fallback + try { + return adaptResult(await db.query(text)) + } catch (err) { + if (err instanceof Error && err.message.includes('multiple commands')) { + await db.exec(text) + return adaptResult({ rows: [], affectedRows: 0, fields: [] }) + } + throw err + } + }, + async close() { + await db.close() + }, + } +} diff --git a/packages/destination-postgres/src/client.ts b/packages/destination-postgres/src/client.ts index 85c302a8f..749b1b871 100644 --- a/packages/destination-postgres/src/client.ts +++ b/packages/destination-postgres/src/client.ts @@ -1,6 +1,4 @@ import type pg from 'pg' -import type { Logger } from '@stripe/sync-logger' -import { log } from './logger.js' export interface QueryClient { query(text: string, values?: unknown[]): Promise @@ -11,75 +9,5 @@ export interface ManagedClient extends QueryClient { stats?(): { total_count: number; idle_count: number; waiting_count: number } } -export function pgPoolClient(pool: pg.Pool, logger: Logger = log): ManagedClient { - pool.on('error', (err) => { - logger.error({ err }, 'Postgres destination pool error') - }) - - return { - query(text: string, values?: unknown[]) { - return pool.query(text, values) - }, - async close() { - await pool.end() - }, - stats() { - return { - total_count: pool.totalCount, - idle_count: pool.idleCount, - waiting_count: pool.waitingCount, - } - }, - } -} - -export function isPGliteUrl(url: string): boolean { - return url.startsWith('file://') || url.startsWith('memory://') -} - -export async function pgliteClient( - config: { data_dir?: string; url?: string } = {} -): Promise { - const { PGlite } = await import('@electric-sql/pglite') - - const dataSource = config.url ?? config.data_dir - const db = await PGlite.create(dataSource) - - function adaptResult(result: { rows: unknown[]; affectedRows?: number; fields?: { name: string; dataTypeID: number }[] }): pg.QueryResult { - return { - rows: result.rows as Record[], - rowCount: result.affectedRows ?? null, - command: '', - oid: 0, - fields: result.fields?.map((f) => ({ - ...f, - tableID: 0, - columnID: 0, - dataTypeSize: 0, - dataTypeModifier: 0, - format: 'text' as const, - })) ?? [], - } as pg.QueryResult - } - - return { - async query(text: string, values?: unknown[]) { - if (values && values.length > 0) { - return adaptResult(await db.query(text, values)) - } - // PGlite's query() rejects multiple statements; use exec() as fallback - try { - return adaptResult(await db.query(text)) - } catch (err) { - if (err instanceof Error && err.message.includes('multiple commands')) { - await db.exec(text) - return adaptResult({ rows: [], affectedRows: 0, fields: [] }) - } - throw err - } - }, - async close() { - await db.close() - }, - } -} +export { pgPoolClient } from './client-pg.js' +export { pgliteClient, isPGliteUrl } from './client-pglite.js' From b98bbe911f8d318c95644c1b9e937f476a181f7c Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 7 May 2026 09:24:53 -0700 Subject: [PATCH 07/13] chore: upgrade pglite to 0.4.5 in visualizer to match destination-postgres Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- apps/visualizer/package.json | 2 +- pnpm-lock.yaml | 195 +++++++++++++++++++++++++++++++++-- 2 files changed, 188 insertions(+), 9 deletions(-) diff --git a/apps/visualizer/package.json b/apps/visualizer/package.json index da9d6fe7d..2673c4721 100644 --- a/apps/visualizer/package.json +++ b/apps/visualizer/package.json @@ -12,7 +12,7 @@ "@codemirror/lang-sql": "^6.7.0", "@codemirror/state": "^6.4.0", "@codemirror/view": "^6.26.0", - "@electric-sql/pglite": "^0.2.0", + "@electric-sql/pglite": "^0.4.5", "@stripe/sync-source-stripe": "workspace:*", "codemirror": "^6.0.1", "next": "^15", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8a458c4ea..83792a266 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -164,9 +164,15 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../../packages/destination-postgres + '@stripe/sync-destination-redis': + specifier: workspace:* + version: link:../../packages/destination-redis '@stripe/sync-destination-sqlite': specifier: workspace:* version: link:../../packages/destination-sqlite + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../../packages/destination-stripe '@stripe/sync-hono-zod-openapi': specifier: workspace:* version: link:../../packages/hono-zod-openapi @@ -176,6 +182,12 @@ importers: '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol + '@stripe/sync-source-metronome': + specifier: workspace:* + version: link:../../packages/source-metronome + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../../packages/source-stripe @@ -255,6 +267,9 @@ importers: '@stripe/sync-destination-sqlite': specifier: workspace:* version: link:../../packages/destination-sqlite + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../../packages/destination-stripe '@stripe/sync-engine': specifier: workspace:* version: link:../engine @@ -267,6 +282,9 @@ importers: '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../../packages/source-stripe @@ -378,8 +396,8 @@ importers: specifier: ^6.26.0 version: 6.40.0 '@electric-sql/pglite': - specifier: ^0.2.0 - version: 0.2.17 + specifier: ^0.4.5 + version: 0.4.5 '@stripe/sync-source-stripe': specifier: workspace:* version: link:../../packages/source-stripe @@ -436,6 +454,9 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../packages/destination-postgres + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../packages/destination-stripe '@stripe/sync-engine': specifier: workspace:* version: link:../apps/engine @@ -448,6 +469,9 @@ importers: '@stripe/sync-service': specifier: workspace:* version: link:../apps/service + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../packages/source-stripe @@ -538,6 +562,28 @@ importers: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/destination-redis: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + ioredis: + specifier: ^5.6.1 + version: 5.10.1 + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.5.0 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/destination-sqlite: dependencies: '@stripe/sync-logger': @@ -550,6 +596,28 @@ importers: specifier: ^4.3.6 version: 4.3.6 + packages/destination-stripe: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-openapi': + specifier: workspace:* + version: link:../openapi + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.10.1 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/hono-zod-openapi: dependencies: '@hono/zod-validator': @@ -629,6 +697,50 @@ importers: specifier: ^3.2.1 version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/source-metronome: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.5.0 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + + packages/source-postgres: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + '@stripe/sync-util-postgres': + specifier: workspace:* + version: link:../util-postgres + pg: + specifier: ^8.16.3 + version: 8.16.3 + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/pg': + specifier: ^8.15.5 + version: 8.20.0 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/source-stripe: dependencies: '@stripe/sync-logger': @@ -1002,9 +1114,6 @@ packages: '@codemirror/view@6.40.0': resolution: {integrity: sha512-WA0zdU7xfF10+5I3HhUUq3kqOx3KjqmtQ9lqZjfK7jtYk4G72YW9rezcSywpaUMCWOMlq+6E0pO1IWg1TNIhtg==} - '@electric-sql/pglite@0.2.17': - resolution: {integrity: sha512-qEpKRT2oUaWDH6tjRxLHjdzMqRUGYDnGZlKrnL4dJ77JVMcP2Hpo3NYnOSPKdZdeec57B6QPprCUFg0picx5Pw==} - '@electric-sql/pglite@0.4.5': resolution: {integrity: sha512-aGG2zGEyZzGWKy8P+9ZoNUV0jxt1+hgbeTf+bVAYyxVZZLXg3/9aFlfLxb08AYZVAfAkQlQIysmWjhc5hwDG8g==} @@ -1415,6 +1524,9 @@ packages: cpu: [x64] os: [win32] + '@ioredis/commands@1.5.1': + resolution: {integrity: sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==} + '@isaacs/balanced-match@4.0.1': resolution: {integrity: sha512-yzMTt9lEb8Gv7zRioUilSglI0c0smZ9k5D65677DLWLtWJaXIS3CqcGyUFByYKlnUj6TkjLVs54fBl6+TiGQDQ==} engines: {node: 20 || >=22} @@ -3029,6 +3141,10 @@ packages: resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==} engines: {node: '>=6'} + cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + code-excerpt@4.0.0: resolution: {integrity: sha512-xxodCmBen3iy2i0WtAK8FlFNrRzjUqjRsMfho58xT/wvZU1YTM3fCnRjcy1gJPMepaRlgm/0e6w8SpWHpn3/cA==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -3140,6 +3256,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + detect-libc@2.1.2: resolution: {integrity: sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==} engines: {node: '>=8'} @@ -3606,6 +3726,10 @@ packages: react-devtools-core: optional: true + ioredis@5.10.1: + resolution: {integrity: sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==} + engines: {node: '>=12.22.0'} + is-extglob@2.1.1: resolution: {integrity: sha512-SbKbANkN603Vi4jEZv49LeVJMn4yGwsbzZworEoyEiutsN3nJYdbO36zfhGJ6QEDpOZIFkDtnq5JRxmvl3jsoQ==} engines: {node: '>=0.10.0'} @@ -3807,6 +3931,12 @@ packages: lodash.camelcase@4.3.0: resolution: {integrity: sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==} + lodash.defaults@4.2.0: + resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} + + lodash.isarguments@3.1.0: + resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + lodash.merge@4.6.2: resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==} @@ -4249,6 +4379,14 @@ packages: resolution: {integrity: sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==} engines: {node: '>= 12.13.0'} + redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + + redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + require-directory@2.1.1: resolution: {integrity: sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==} engines: {node: '>=0.10.0'} @@ -4404,6 +4542,9 @@ packages: stackback@0.0.2: resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==} + standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + std-env@3.9.0: resolution: {integrity: sha512-UGvjygr6F6tpH7o2qyqR6QYpwraIjKSdtzyBdyytFOHmPZY917kwdwLG0RbOjWOnKmnm3PeHjaoLLMie7kPLQw==} @@ -5560,8 +5701,6 @@ snapshots: style-mod: 4.1.3 w3c-keyname: 2.2.8 - '@electric-sql/pglite@0.2.17': {} - '@electric-sql/pglite@0.4.5': {} '@emnapi/runtime@1.9.1': @@ -5869,6 +6008,8 @@ snapshots: '@img/sharp-win32-x64@0.34.5': optional: true + '@ioredis/commands@1.5.1': {} + '@isaacs/balanced-match@4.0.1': {} '@isaacs/brace-expansion@5.0.0': @@ -7287,6 +7428,14 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 + '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': + dependencies: + '@vitest/spy': 3.2.4 + estree-walker: 3.0.3 + magic-string: 0.30.21 + optionalDependencies: + vite: 7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': dependencies: '@vitest/spy': 3.2.4 @@ -7619,6 +7768,8 @@ snapshots: clsx@2.1.1: {} + cluster-key-slot@1.1.2: {} + code-excerpt@4.0.0: dependencies: convert-to-spaces: 2.0.1 @@ -7706,6 +7857,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + detect-libc@2.1.2: {} detect-node-es@1.1.0: {} @@ -8239,6 +8392,20 @@ snapshots: - bufferutil - utf-8-validate + ioredis@5.10.1: + dependencies: + '@ioredis/commands': 1.5.1 + cluster-key-slot: 1.1.2 + debug: 4.4.3(supports-color@10.2.2) + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + is-extglob@2.1.1: {} is-fullwidth-code-point@3.0.0: {} @@ -8393,6 +8560,10 @@ snapshots: lodash.camelcase@4.3.0: {} + lodash.defaults@4.2.0: {} + + lodash.isarguments@3.1.0: {} + lodash.merge@4.6.2: {} long@5.3.2: {} @@ -8813,6 +8984,12 @@ snapshots: real-require@0.2.0: {} + redis-errors@1.2.0: {} + + redis-parser@3.0.0: + dependencies: + redis-errors: 1.2.0 + require-directory@2.1.1: {} require-from-string@2.0.2: {} @@ -9026,6 +9203,8 @@ snapshots: stackback@0.0.2: {} + standard-as-callback@2.1.0: {} + std-env@3.9.0: {} string-width@4.2.3: @@ -9349,7 +9528,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) + '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4 From 2d00defddc6e95fe9430d33cb36afb75943ae651 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 7 May 2026 09:34:32 -0700 Subject: [PATCH 08/13] fix(destination-postgres): register signal handlers to close PGlite on process kill Prevents data directory corruption when the process receives SIGTERM/SIGINT mid-write. PGlite lacks WAL recovery, so unclean shutdown corrupts the data dir. Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- .../destination-postgres/src/client-pglite.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/destination-postgres/src/client-pglite.ts b/packages/destination-postgres/src/client-pglite.ts index 40dc01013..eb578323b 100644 --- a/packages/destination-postgres/src/client-pglite.ts +++ b/packages/destination-postgres/src/client-pglite.ts @@ -13,6 +13,20 @@ export async function pgliteClient( const dataSource = config.url ?? config.data_dir const db = await PGlite.create(dataSource) + let closed = false + const shutdown = () => { + if (closed) return + closed = true + db.close().catch(() => {}) + cleanup() + } + const cleanup = () => { + process.removeListener('SIGTERM', shutdown) + process.removeListener('SIGINT', shutdown) + } + process.on('SIGTERM', shutdown) + process.on('SIGINT', shutdown) + function adaptResult(result: { rows: unknown[]; affectedRows?: number; fields?: { name: string; dataTypeID: number }[] }): pg.QueryResult { return { rows: result.rows as Record[], @@ -47,6 +61,9 @@ export async function pgliteClient( } }, async close() { + if (closed) return + closed = true + cleanup() await db.close() }, } From dcb495d0c6ff83c2e78afa8d00c30646481773a3 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 7 May 2026 09:40:40 -0700 Subject: [PATCH 09/13] feat: add browser-sync example app (React + Vite + PGlite) Runs the sync engine entirely in-browser with PGlite at memory://, including live WebSocket updates from Stripe. Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- examples/browser-sync/index.html | 12 + examples/browser-sync/package.json | 26 ++ examples/browser-sync/src/App.tsx | 106 +++++ examples/browser-sync/src/lib/sync.ts | 49 ++ examples/browser-sync/src/main.tsx | 4 + .../browser-sync/src/shims/logger-progress.ts | 4 + examples/browser-sync/src/shims/logger.ts | 43 ++ examples/browser-sync/src/shims/noop.ts | 2 + examples/browser-sync/src/shims/pg.ts | 6 + examples/browser-sync/src/shims/ws.ts | 38 ++ examples/browser-sync/tsconfig.json | 17 + examples/browser-sync/vite.config.ts | 38 ++ pnpm-lock.yaml | 426 +++++++++++++++++- pnpm-workspace.yaml | 1 + 14 files changed, 763 insertions(+), 9 deletions(-) create mode 100644 examples/browser-sync/index.html create mode 100644 examples/browser-sync/package.json create mode 100644 examples/browser-sync/src/App.tsx create mode 100644 examples/browser-sync/src/lib/sync.ts create mode 100644 examples/browser-sync/src/main.tsx create mode 100644 examples/browser-sync/src/shims/logger-progress.ts create mode 100644 examples/browser-sync/src/shims/logger.ts create mode 100644 examples/browser-sync/src/shims/noop.ts create mode 100644 examples/browser-sync/src/shims/pg.ts create mode 100644 examples/browser-sync/src/shims/ws.ts create mode 100644 examples/browser-sync/tsconfig.json create mode 100644 examples/browser-sync/vite.config.ts diff --git a/examples/browser-sync/index.html b/examples/browser-sync/index.html new file mode 100644 index 000000000..e5c0ffe67 --- /dev/null +++ b/examples/browser-sync/index.html @@ -0,0 +1,12 @@ + + + + + + Stripe Sync Engine — Browser + + +
+ + + diff --git a/examples/browser-sync/package.json b/examples/browser-sync/package.json new file mode 100644 index 000000000..4ee7fda9a --- /dev/null +++ b/examples/browser-sync/package.json @@ -0,0 +1,26 @@ +{ + "name": "@stripe/sync-example-browser", + "private": true, + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "dependencies": { + "@electric-sql/pglite": "^0.4.5", + "@stripe/sync-engine": "workspace:*", + "@stripe/sync-source-stripe": "workspace:*", + "@stripe/sync-destination-postgres": "workspace:*", + "@stripe/sync-protocol": "workspace:*", + "react": "^19.0.0", + "react-dom": "^19.0.0" + }, + "devDependencies": { + "@types/react": "^19", + "@types/react-dom": "^19", + "@vitejs/plugin-react": "^4", + "typescript": "^5", + "vite": "^6" + } +} diff --git a/examples/browser-sync/src/App.tsx b/examples/browser-sync/src/App.tsx new file mode 100644 index 000000000..c6cb9b39c --- /dev/null +++ b/examples/browser-sync/src/App.tsx @@ -0,0 +1,106 @@ +import { useState, useRef, useCallback } from 'react' +import { startSync } from './lib/sync' + +export default function App() { + const [apiKey, setApiKey] = useState('') + const [status, setStatus] = useState<'idle' | 'running' | 'error'>('idle') + const [messages, setMessages] = useState([]) + const [query, setQuery] = useState('') + const [queryResult, setQueryResult] = useState('') + const abortRef = useRef(null) + + const addMessage = useCallback((msg: string) => { + setMessages((prev) => [...prev.slice(-200), msg]) + }, []) + + const handleStart = async () => { + if (!apiKey) return + setStatus('running') + setMessages([]) + abortRef.current = new AbortController() + + try { + await startSync({ + apiKey, + websocket: true, + signal: abortRef.current.signal, + onMessage: (msg: unknown) => { + const m = msg as { type?: string; record?: { stream?: string } } + if (m.type === 'record') { + addMessage(`record: ${m.record?.stream}`) + } else { + addMessage(JSON.stringify(m).slice(0, 120)) + } + }, + }) + } catch (err) { + if ((err as Error).name !== 'AbortError') { + setStatus('error') + addMessage(`Error: ${(err as Error).message}`) + } + } + } + + const handleStop = () => { + abortRef.current?.abort() + setStatus('idle') + } + + return ( +
+

Stripe Sync Engine — Browser

+ +
+ setApiKey(e.target.value)} + style={{ width: '400px', padding: '0.5rem', fontFamily: 'monospace' }} + /> + {status === 'idle' ? ( + + ) : ( + + )} + {status} +
+ +
+ {messages.map((m, i) => ( +
{m}
+ ))} +
+ +
+