diff --git a/plugins/replicator/README.md b/plugins/replicator/README.md new file mode 100644 index 0000000..746ae8a --- /dev/null +++ b/plugins/replicator/README.md @@ -0,0 +1,68 @@ +# Replicator Plugin + +Pulls data from a configured **external data source** (e.g. a Postgres database +on Supabase) into the **internal Durable Object SQLite** so a StarbaseDB +instance can serve as a close-to-edge replica that can be queried instead of +hitting the upstream database directly. + +Replication is a **pull** mechanism and is **append-only**: for each table you +register a monotonically increasing _tracking column_ (e.g. `id` or +`created_at`). On every sync the plugin pulls only the rows whose tracking +column is greater than the last value it has already seen, and upserts them +into the matching internal table with `INSERT OR REPLACE`. + +## Requirements + +An external data source must be configured (see the `EXTERNAL_DB_*` / +`HYPERDRIVE` settings in `wrangler.toml`). Without one the management endpoints +still work but `POST /replicator/sync` will report an error per table. + +## Endpoints + +All endpoints require an **admin** authorization token. + +### `GET /replicator/tables` + +Lists every table configured for replication, including the last synced +watermark (`last_value`) and `last_synced_at`. + +### `POST /replicator/tables` + +Registers (or updates) a table for replication. + +```json +{ + "table": "orders", + "schema": "public", + "trackingColumn": "id", + "intervalSeconds": 300, + "batchSize": 1000, + "isActive": true +} +``` + +- `table` (required) – name of the table in both the external and internal DB. +- `trackingColumn` (required) – append-only column used as the watermark. +- `schema` – optional schema name for the external table. +- `intervalSeconds` – how often the table should be polled (default `300`). +- `batchSize` – max rows pulled per sync (default `1000`). +- `isActive` – set to `false` to pause replication for the table. + +### `DELETE /replicator/tables/:table` + +Removes a table from replication. Existing internal data is left untouched. + +### `POST /replicator/sync` + +Triggers a sync immediately. Pass `?table=` to sync a single table, +otherwise every active table is synced. Returns the number of rows replicated +per table. This endpoint can be driven by a Cloudflare Cron Trigger, the cron +plugin, or any external scheduler. + +## Automatic polling + +By default the plugin also syncs opportunistically: on incoming requests it +checks whether any table's `intervalSeconds` has elapsed since its last sync +and, if so, replicates it in the background (via `ctx.waitUntil`). Pass +`new ReplicatorPlugin({ autoSyncOnRequest: false })` in `src/index.ts` to rely +solely on the `/replicator/sync` endpoint instead. diff --git a/plugins/replicator/index.test.ts b/plugins/replicator/index.test.ts new file mode 100644 index 0000000..b43b783 --- /dev/null +++ b/plugins/replicator/index.test.ts @@ -0,0 +1,247 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { ReplicatorPlugin, ReplicatedTable } from './index' +import { executeQuery } from '../../src/operation' +import { DataSource } from '../../src/types' + +vi.mock('../../src/operation', () => ({ + executeQuery: vi.fn(), +})) + +const mockedExecuteQuery = vi.mocked(executeQuery) + +let plugin: ReplicatorPlugin +let internalQuery: ReturnType +let dataSource: DataSource + +function makeTable(overrides: Partial = {}): ReplicatedTable { + return { + table_name: 'orders', + source_schema: null, + tracking_column: 'id', + last_value: null, + interval_seconds: 300, + batch_size: 1000, + is_active: 1, + last_synced_at: null, + ...overrides, + } +} + +beforeEach(() => { + vi.clearAllMocks() + internalQuery = vi.fn().mockResolvedValue([]) + dataSource = { + rpc: { executeQuery: internalQuery }, + source: 'internal', + external: { dialect: 'postgresql' }, + } as unknown as DataSource + + plugin = new ReplicatorPlugin() + // Inject the data source the way the register() middleware would. + ;(plugin as any).dataSource = dataSource + ;(plugin as any).config = { role: 'admin' } +}) + +describe('ReplicatorPlugin - initialization', () => { + it('registers with the expected name and route prefix', () => { + expect(plugin.name).toBe('starbasedb:replicator') + expect(plugin.pathPrefix).toBe('/replicator') + expect(plugin.opts.requiresAuth).toBe(true) + }) +}) + +describe('ReplicatorPlugin - quoteIdentifier', () => { + it('uses double quotes by default', () => { + expect(plugin.quoteIdentifier('orders')).toBe('"orders"') + }) + + it('uses backticks for mysql', () => { + expect(plugin.quoteIdentifier('orders', 'mysql')).toBe('`orders`') + }) + + it('escapes embedded quote characters', () => { + expect(plugin.quoteIdentifier('we"ird')).toBe('"we""ird"') + expect(plugin.quoteIdentifier('we`ird', 'mysql')).toBe('`we``ird`') + }) +}) + +describe('ReplicatorPlugin - quoteLiteral', () => { + it('emits numbers bare', () => { + expect(plugin.quoteLiteral(42)).toBe('42') + }) + + it('single-quotes strings and escapes quotes', () => { + expect(plugin.quoteLiteral("O'Brien")).toBe("'O''Brien'") + }) + + it('renders null/undefined as NULL', () => { + expect(plugin.quoteLiteral(null)).toBe('NULL') + expect(plugin.quoteLiteral(undefined)).toBe('NULL') + }) +}) + +describe('ReplicatorPlugin - buildSelectQuery', () => { + it('selects all rows when there is no watermark yet', () => { + const sql = plugin.buildSelectQuery(makeTable(), 'postgresql') + expect(sql).toBe('SELECT * FROM "orders" ORDER BY "id" ASC LIMIT 1000') + }) + + it('filters by the tracking column once a watermark exists', () => { + const sql = plugin.buildSelectQuery( + makeTable({ last_value: '100', batch_size: 50 }), + 'postgresql' + ) + expect(sql).toBe( + 'SELECT * FROM "orders" WHERE "id" > \'100\' ORDER BY "id" ASC LIMIT 50' + ) + }) + + it('qualifies the table with its schema when provided', () => { + const sql = plugin.buildSelectQuery( + makeTable({ source_schema: 'public' }), + 'postgresql' + ) + expect(sql).toBe( + 'SELECT * FROM "public"."orders" ORDER BY "id" ASC LIMIT 1000' + ) + }) +}) + +describe('ReplicatorPlugin - buildUpsertQuery', () => { + it('builds a parameterized INSERT OR REPLACE statement', () => { + const { sql, params } = plugin.buildUpsertQuery('orders', { + id: 1, + name: 'Alice', + }) + expect(sql).toBe( + 'INSERT OR REPLACE INTO "orders" ("id", "name") VALUES (?, ?)' + ) + expect(params).toEqual([1, 'Alice']) + }) +}) + +describe('ReplicatorPlugin - isDue', () => { + it('is due when a table has never synced', () => { + expect(plugin.isDue(makeTable())).toBe(true) + }) + + it('is not due when the interval has not elapsed', () => { + const now = Date.now() + const table = makeTable({ + interval_seconds: 300, + last_synced_at: new Date(now - 60_000) + .toISOString() + .replace('T', ' ') + .replace(/\.\d+Z$/, ''), + }) + expect(plugin.isDue(table, now)).toBe(false) + }) + + it('is due once the interval has elapsed', () => { + const now = Date.now() + const table = makeTable({ + interval_seconds: 60, + last_synced_at: new Date(now - 120_000) + .toISOString() + .replace('T', ' ') + .replace(/\.\d+Z$/, ''), + }) + expect(plugin.isDue(table, now)).toBe(true) + }) + + it('is never due when the table is inactive', () => { + expect(plugin.isDue(makeTable({ is_active: 0 }))).toBe(false) + }) +}) + +describe('ReplicatorPlugin - registerTable', () => { + it('upserts the table configuration with defaults', async () => { + await plugin.registerTable({ + table: 'orders', + trackingColumn: 'id', + }) + + expect(internalQuery).toHaveBeenCalledTimes(1) + const call = internalQuery.mock.calls[0][0] + expect(call.params).toEqual(['orders', null, 'id', 300, 1000, 1]) + }) + + it('honors provided options', async () => { + await plugin.registerTable({ + table: 'orders', + schema: 'public', + trackingColumn: 'created_at', + intervalSeconds: 30, + batchSize: 10, + isActive: false, + }) + + const call = internalQuery.mock.calls[0][0] + expect(call.params).toEqual([ + 'orders', + 'public', + 'created_at', + 30, + 10, + 0, + ]) + }) +}) + +describe('ReplicatorPlugin - syncTable', () => { + it('pulls external rows, upserts them and advances the watermark', async () => { + mockedExecuteQuery.mockResolvedValue([ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ] as any) + + const result = await plugin.syncTable(makeTable()) + + expect(result.rowsReplicated).toBe(2) + expect(result.lastValue).toBe('2') + + // 2 upserts + 1 progress update on the internal database. + expect(internalQuery).toHaveBeenCalledTimes(3) + const upsert = internalQuery.mock.calls[0][0] + expect(upsert.sql).toContain('INSERT OR REPLACE INTO "orders"') + expect(upsert.params).toEqual([1, 'Alice']) + + const progress = internalQuery.mock.calls[2][0] + expect(progress.params).toEqual(['2', 'orders']) + }) + + it('keeps the previous watermark when no new rows are returned', async () => { + mockedExecuteQuery.mockResolvedValue([] as any) + + const result = await plugin.syncTable(makeTable({ last_value: '99' })) + + expect(result.rowsReplicated).toBe(0) + expect(result.lastValue).toBe('99') + // Only the progress update runs. + expect(internalQuery).toHaveBeenCalledTimes(1) + expect(internalQuery.mock.calls[0][0].params).toEqual(['99', 'orders']) + }) +}) + +describe('ReplicatorPlugin - sync', () => { + it('captures per-table errors instead of failing the whole run', async () => { + internalQuery.mockResolvedValueOnce([makeTable()]) + mockedExecuteQuery.mockRejectedValue(new Error('connection refused')) + + const results = await plugin.sync() + + expect(results).toHaveLength(1) + expect(results[0].error).toBe('connection refused') + expect(results[0].rowsReplicated).toBe(0) + }) + + it('skips inactive tables when syncing everything', async () => { + internalQuery.mockResolvedValueOnce([ + makeTable({ table_name: 'orders', is_active: 0 }), + ]) + + const results = await plugin.sync() + expect(results).toHaveLength(0) + expect(mockedExecuteQuery).not.toHaveBeenCalled() + }) +}) diff --git a/plugins/replicator/index.ts b/plugins/replicator/index.ts new file mode 100644 index 0000000..382d646 --- /dev/null +++ b/plugins/replicator/index.ts @@ -0,0 +1,429 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { executeQuery } from '../../src/operation' +import { DataSource, QueryResult } from '../../src/types' +import { createResponse } from '../../src/utils' + +const SQL_QUERIES = { + CREATE_TABLE: ` + CREATE TABLE IF NOT EXISTS tmp_replicator_tables ( + table_name TEXT NOT NULL UNIQUE PRIMARY KEY, + source_schema TEXT, + tracking_column TEXT NOT NULL, + last_value TEXT, + interval_seconds INTEGER NOT NULL DEFAULT 300, + batch_size INTEGER NOT NULL DEFAULT 1000, + is_active INTEGER NOT NULL DEFAULT 1, + last_synced_at TEXT + ) + `, + UPSERT_TABLE: ` + INSERT INTO tmp_replicator_tables + (table_name, source_schema, tracking_column, interval_seconds, batch_size, is_active) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(table_name) DO UPDATE SET + source_schema = excluded.source_schema, + tracking_column = excluded.tracking_column, + interval_seconds = excluded.interval_seconds, + batch_size = excluded.batch_size, + is_active = excluded.is_active + `, + GET_TABLES: `SELECT * FROM tmp_replicator_tables`, + GET_TABLE: `SELECT * FROM tmp_replicator_tables WHERE table_name = ?`, + DELETE_TABLE: `DELETE FROM tmp_replicator_tables WHERE table_name = ?`, + UPDATE_PROGRESS: ` + UPDATE tmp_replicator_tables + SET last_value = ?, last_synced_at = datetime('now') + WHERE table_name = ? + `, +} + +export interface ReplicatedTable { + table_name: string + source_schema: string | null + tracking_column: string + last_value: string | null + interval_seconds: number + batch_size: number + is_active: number + last_synced_at: string | null +} + +export interface ReplicationResult { + table: string + rowsReplicated: number + lastValue: string | null + error?: string +} + +/** + * ReplicatorPlugin pulls data from a configured external data source (e.g. a + * Postgres database on Supabase) into the internal Durable Object SQLite so + * the instance can serve as a close-to-edge replica. + * + * Replication is append-only: for each registered table the user defines a + * monotonically increasing `tracking_column` (e.g. `id` or `created_at`). On + * each sync the plugin pulls rows where `tracking_column` is greater than the + * last value it observed and upserts them into the internal table. + */ +export class ReplicatorPlugin extends StarbasePlugin { + public pathPrefix: string = '/replicator' + private dataSource?: DataSource + private config?: StarbaseDBConfiguration + // When true the plugin opportunistically syncs any table whose poll + // interval has elapsed on incoming requests (edge-friendly pull cadence). + private autoSyncOnRequest: boolean + + constructor(opts?: { autoSyncOnRequest?: boolean }) { + super('starbasedb:replicator', { + requiresAuth: true, + }) + this.autoSyncOnRequest = opts?.autoSyncOnRequest ?? true + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c?.get('dataSource') + this.config = c?.get('config') + await this.init() + + if (this.autoSyncOnRequest) { + const due = this.syncDueTables() + if (this.dataSource?.executionContext) { + this.dataSource.executionContext.waitUntil(due) + } else { + await due + } + } + + await next() + }) + + // List the tables configured for replication. + app.get(this.pathPrefix + '/tables', async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + const tables = await this.listTables() + return createResponse({ tables }, undefined, 200) + }) + + // Register (or update) a table for replication. + app.post(this.pathPrefix + '/tables', async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + try { + const body = (await c.req.json()) as { + table?: string + schema?: string + trackingColumn?: string + intervalSeconds?: number + batchSize?: number + isActive?: boolean + } + + if (!body.table || !body.trackingColumn) { + return createResponse( + undefined, + '`table` and `trackingColumn` are required', + 400 + ) + } + + await this.registerTable({ + table: body.table, + schema: body.schema, + trackingColumn: body.trackingColumn, + intervalSeconds: body.intervalSeconds, + batchSize: body.batchSize, + isActive: body.isActive, + }) + + return createResponse({ success: true }, undefined, 200) + } catch (error: any) { + return createResponse( + undefined, + error?.message ?? 'Failed to register table', + 400 + ) + } + }) + + // Remove a table from replication. + app.delete(this.pathPrefix + '/tables/:table', async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + await this.dataSource?.rpc.executeQuery({ + sql: SQL_QUERIES.DELETE_TABLE, + params: [c.req.param('table')], + }) + + return createResponse({ success: true }, undefined, 200) + }) + + // Trigger a sync immediately. Optionally scope it to a single table + // via `?table=` so external schedulers (Cron Triggers, the cron + // plugin, etc.) can drive replication cadence. + app.post(this.pathPrefix + '/sync', async (c) => { + if (this.config?.role !== 'admin') { + return createResponse(undefined, 'Unauthorized request', 401) + } + + const table = c.req.query('table') + const results = await this.sync(table) + return createResponse({ results }, undefined, 200) + }) + } + + private async init() { + if (!this.dataSource) return + + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.CREATE_TABLE, + params: [], + }) + } + + public async listTables(): Promise { + if (!this.dataSource) return [] + + const result = (await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.GET_TABLES, + params: [], + })) as QueryResult[] + + return (result ?? []) as unknown as ReplicatedTable[] + } + + public async registerTable(opts: { + table: string + schema?: string + trackingColumn: string + intervalSeconds?: number + batchSize?: number + isActive?: boolean + }) { + if (!this.dataSource) + throw new Error('ReplicatorPlugin not properly initialized') + + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.UPSERT_TABLE, + params: [ + opts.table, + opts.schema ?? null, + opts.trackingColumn, + opts.intervalSeconds ?? 300, + opts.batchSize ?? 1000, + opts.isActive === false ? 0 : 1, + ], + }) + } + + /** + * Quote a SQL identifier for the given dialect. MySQL uses backticks, + * every other supported dialect uses double quotes. + */ + quoteIdentifier(name: string, dialect?: string): string { + if (dialect === 'mysql') { + return '`' + name.replace(/`/g, '``') + '`' + } + return '"' + name.replace(/"/g, '""') + '"' + } + + /** + * Render a value as a SQL literal. Numbers are emitted bare, everything + * else is single-quoted with embedded quotes escaped. Used only for the + * tracking-column watermark which originates from previously synced data. + */ + quoteLiteral(value: unknown): string { + if (value === null || value === undefined) return 'NULL' + if (typeof value === 'number' && Number.isFinite(value)) { + return String(value) + } + return `'${String(value).replace(/'/g, "''")}'` + } + + /** + * Build the SELECT statement issued against the external data source for + * a given table configuration. + */ + buildSelectQuery(table: ReplicatedTable, dialect?: string): string { + const trackingColumn = this.quoteIdentifier( + table.tracking_column, + dialect + ) + const qualifiedName = table.source_schema + ? `${this.quoteIdentifier(table.source_schema, dialect)}.${this.quoteIdentifier(table.table_name, dialect)}` + : this.quoteIdentifier(table.table_name, dialect) + + const where = + table.last_value !== null && table.last_value !== undefined + ? ` WHERE ${trackingColumn} > ${this.quoteLiteral(table.last_value)}` + : '' + + return `SELECT * FROM ${qualifiedName}${where} ORDER BY ${trackingColumn} ASC LIMIT ${Number(table.batch_size) || 1000}` + } + + /** + * Build an `INSERT OR REPLACE` statement that upserts a single external + * row into the internal SQLite table. + */ + buildUpsertQuery( + tableName: string, + row: Record + ): { sql: string; params: unknown[] } { + const columns = Object.keys(row) + const quotedColumns = columns.map((c) => this.quoteIdentifier(c)) + const placeholders = columns.map(() => '?') + + return { + sql: `INSERT OR REPLACE INTO ${this.quoteIdentifier(tableName)} (${quotedColumns.join(', ')}) VALUES (${placeholders.join(', ')})`, + params: columns.map((c) => row[c]), + } + } + + private async queryExternalSource(sql: string): Promise { + if (!this.dataSource?.external) { + throw new Error('No external data source is configured') + } + + // Reuse the shared query pipeline but force the external/hyperdrive + // path regardless of which source the incoming request targeted. + const externalDataSource: DataSource = { + ...this.dataSource, + source: + 'connectionString' in this.dataSource.external + ? 'hyperdrive' + : 'external', + } + + const result = await executeQuery({ + sql, + params: undefined, + isRaw: false, + dataSource: externalDataSource, + config: this.config ?? ({ role: 'admin' } as any), + }) + + return Array.isArray(result) ? result : [] + } + + /** + * Replicate a single configured table and return how many rows were + * pulled into the internal database. + */ + async syncTable(table: ReplicatedTable): Promise { + if (!this.dataSource) { + throw new Error('ReplicatorPlugin not properly initialized') + } + + const dialect = this.dataSource.external?.dialect + const selectQuery = this.buildSelectQuery(table, dialect) + const rows = (await this.queryExternalSource(selectQuery)) as Record< + string, + unknown + >[] + + let lastValue = table.last_value + + for (const row of rows) { + const { sql, params } = this.buildUpsertQuery(table.table_name, row) + await this.dataSource.rpc.executeQuery({ sql, params }) + + const trackingValue = row[table.tracking_column] + if (trackingValue !== undefined && trackingValue !== null) { + lastValue = String(trackingValue) + } + } + + // Persist the new watermark so the next sync is append-only. + await this.dataSource.rpc.executeQuery({ + sql: SQL_QUERIES.UPDATE_PROGRESS, + params: [lastValue ?? null, table.table_name], + }) + + return { + table: table.table_name, + rowsReplicated: rows.length, + lastValue: lastValue ?? null, + } + } + + /** + * Sync every active table, or just the named table when provided. + */ + async sync(tableName?: string | null): Promise { + if (!this.dataSource) return [] + + const tables = await this.listTables() + const results: ReplicationResult[] = [] + + for (const table of tables) { + if (tableName && table.table_name !== tableName) continue + if (!tableName && !table.is_active) continue + + try { + results.push(await this.syncTable(table)) + } catch (error: any) { + console.error( + `Replicator failed to sync table ${table.table_name}:`, + error + ) + results.push({ + table: table.table_name, + rowsReplicated: 0, + lastValue: table.last_value, + error: error?.message ?? String(error), + }) + } + } + + return results + } + + /** + * Returns true when a table's poll interval has elapsed since its last + * successful sync (or it has never synced). + */ + isDue(table: ReplicatedTable, now: number = Date.now()): boolean { + if (!table.is_active) return false + if (!table.last_synced_at) return true + + // `last_synced_at` is stored as a UTC datetime string. + const lastSynced = Date.parse(table.last_synced_at + 'Z') + if (Number.isNaN(lastSynced)) return true + + return now - lastSynced >= (Number(table.interval_seconds) || 0) * 1000 + } + + /** + * Sync any active table whose poll interval has elapsed. + */ + async syncDueTables(): Promise { + if (!this.dataSource?.external) return [] + + const now = Date.now() + const tables = await this.listTables() + const results: ReplicationResult[] = [] + + for (const table of tables) { + if (!this.isDue(table, now)) continue + + try { + results.push(await this.syncTable(table)) + } catch (error: any) { + console.error( + `Replicator failed to sync table ${table.table_name}:`, + error + ) + } + } + + return results + } +} diff --git a/plugins/replicator/meta.json b/plugins/replicator/meta.json new file mode 100644 index 0000000..3127f42 --- /dev/null +++ b/plugins/replicator/meta.json @@ -0,0 +1,15 @@ +{ + "version": "1.0.0", + "resources": { + "tables": { + "tmp_replicator_tables": "Tracks which external tables are replicated, their tracking column and the last synced watermark." + }, + "secrets": {}, + "variables": {} + }, + "dependencies": { + "tables": {}, + "secrets": {}, + "variables": {} + } +} diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml new file mode 100644 index 0000000..0df06b7 --- /dev/null +++ b/pnpm-workspace.yaml @@ -0,0 +1,3 @@ +allowBuilds: + esbuild: set this to true or false + workerd: set this to true or false diff --git a/src/index.ts b/src/index.ts index 4d08932..16c550b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ import { QueryLogPlugin } from '../plugins/query-log' import { StatsPlugin } from '../plugins/stats' import { CronPlugin } from '../plugins/cron' import { InterfacePlugin } from '../plugins/interface' +import { ReplicatorPlugin } from '../plugins/replicator' export { StarbaseDBDurableObject } from './do' @@ -225,6 +226,7 @@ export default { cdcPlugin, cronPlugin, new StatsPlugin(), + new ReplicatorPlugin(), interfacePlugin, ] satisfies StarbasePlugin[]