From af8cccf54630b29d170a57e3cc4a6e6ed8ed2261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20Gr=C3=BCneberg?= Date: Mon, 2 Jun 2025 12:02:18 +0800 Subject: [PATCH 1/2] feat: sync early fraud warnings --- README.md | 2 + .../fastify-app/src/test/invoices.test.ts | 3 +- .../stripe/early_fraud_warning_created.json | 24 ++++++ .../stripe/early_fraud_warning_updated.json | 24 ++++++ .../fastify-app/src/test/webhooks.test.ts | 4 +- .../migrations/0028_early_fraud_warning.sql | 22 +++++ .../src/schemas/early_fraud_warning.ts | 14 +++ packages/sync-engine/src/stripeSync.ts | 86 ++++++++++++++++--- packages/sync-engine/src/types.ts | 2 + 9 files changed, 168 insertions(+), 13 deletions(-) create mode 100644 packages/fastify-app/src/test/stripe/early_fraud_warning_created.json create mode 100644 packages/fastify-app/src/test/stripe/early_fraud_warning_updated.json create mode 100644 packages/sync-engine/db/migrations/0028_early_fraud_warning.sql create mode 100644 packages/sync-engine/src/schemas/early_fraud_warning.ts diff --git a/README.md b/README.md index f9f825fe7..c197ba169 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,8 @@ This server synchronizes your Stripe account to a Postgres database. It can be a - [x] `product.created` 🟢 - [x] `product.deleted` 🟢 - [x] `product.updated` 🟢 +- [x] `radar.early_fraud_warning.created` 🟢 +- [x] `radar.early_fraud_warning.updated` 🟢 - [x] `setup_intent.canceled` 🟢 - [x] `setup_intent.created` 🟢 - [x] `setup_intent.requires_action` 🟢 diff --git a/packages/fastify-app/src/test/invoices.test.ts b/packages/fastify-app/src/test/invoices.test.ts index 2716698c4..057bb4be9 100644 --- a/packages/fastify-app/src/test/invoices.test.ts +++ b/packages/fastify-app/src/test/invoices.test.ts @@ -4,6 +4,7 @@ import { vitest, beforeAll, describe, test, expect } from 'vitest' import { runMigrations } from '@supabase/stripe-sync-engine' import { getConfig } from '../utils/config' import { mockStripe } from './helpers/mockStripe' +import { logger } from '../logger' let stripeSync: StripeSync @@ -15,7 +16,7 @@ beforeAll(async () => { await runMigrations({ databaseUrl: config.databaseUrl, schema: config.schema, - logger: config.logger, + logger, }) stripeSync = new StripeSync(config) diff --git a/packages/fastify-app/src/test/stripe/early_fraud_warning_created.json b/packages/fastify-app/src/test/stripe/early_fraud_warning_created.json new file mode 100644 index 000000000..3dd6c96ad --- /dev/null +++ b/packages/fastify-app/src/test/stripe/early_fraud_warning_created.json @@ -0,0 +1,24 @@ +{ + "id": "evt_1KJrGtJDPojXS6LN15fcthM3", + "object": "event", + "api_version": "2020-03-02", + "created": 1642649111, + "data": { + "object": { + "id": "issfr_1NnrwHBw2dPENLoi9lnhV3RQ", + "object": "radar.early_fraud_warning", + "actionable": true, + "charge": "ch_1234", + "created": 123456789, + "fraud_type": "misc", + "livemode": false + } + }, + "livemode": false, + "pending_webhooks": 3, + "request": { + "id": null, + "idempotency_key": null + }, + "type": "radar.early_fraud_warning.created" +} diff --git a/packages/fastify-app/src/test/stripe/early_fraud_warning_updated.json b/packages/fastify-app/src/test/stripe/early_fraud_warning_updated.json new file mode 100644 index 000000000..b1b73d800 --- /dev/null +++ b/packages/fastify-app/src/test/stripe/early_fraud_warning_updated.json @@ -0,0 +1,24 @@ +{ + "id": "evt_1KJrGtJDPojXS6LN15fcthM3", + "object": "event", + "api_version": "2020-03-02", + "created": 1642649111, + "data": { + "object": { + "id": "issfr_1NnrwHBw2dPENLoi9lnhV3RQ", + "object": "radar.early_fraud_warning", + "actionable": true, + "charge": "ch_1234", + "created": 123456789, + "fraud_type": "misc", + "livemode": false + } + }, + "livemode": false, + "pending_webhooks": 3, + "request": { + "id": null, + "idempotency_key": null + }, + "type": "radar.early_fraud_warning.updated" +} diff --git a/packages/fastify-app/src/test/webhooks.test.ts b/packages/fastify-app/src/test/webhooks.test.ts index e1f71cb5e..015806ceb 100644 --- a/packages/fastify-app/src/test/webhooks.test.ts +++ b/packages/fastify-app/src/test/webhooks.test.ts @@ -20,7 +20,7 @@ describe('POST /webhooks', () => { await runMigrations({ databaseUrl: config.databaseUrl, schema: config.schema, - logger: config.logger, + logger, }) process.env.AUTO_EXPAND_LISTS = 'false' @@ -96,6 +96,8 @@ describe('POST /webhooks', () => { 'credit_note_created', 'credit_note_updated', 'credit_note_voided', + 'early_fraud_warning_created', + 'early_fraud_warning_updated', ])('process event %s', async (jsonFile) => { const eventBody = await import(`./stripe/${jsonFile}`).then(({ default: myData }) => myData) const signature = createHmac('sha256', stripeWebhookSecret) diff --git a/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql b/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql new file mode 100644 index 000000000..e1872ffd0 --- /dev/null +++ b/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql @@ -0,0 +1,22 @@ +create table + if not exists "stripe"."early_fraud_warnings" ( + "id" text primary key, + object text, + actionable boolean, + charge text, + created integer, + fraud_type text, + livemode boolean, + payment_intent text, + updated_at timestamptz default timezone('utc'::text, now()) not null + ); + +create index stripe_early_fraud_warnings_customer_idx on "stripe"."early_fraud_warnings" using btree (charge); + +create index stripe_early_fraud_warnings_invoice_idx on "stripe"."early_fraud_warnings" using btree (payment_intent); + +create trigger handle_updated_at + before update + on stripe.early_fraud_warnings + for each row + execute procedure set_updated_at(); diff --git a/packages/sync-engine/src/schemas/early_fraud_warning.ts b/packages/sync-engine/src/schemas/early_fraud_warning.ts new file mode 100644 index 000000000..2ce2d26f2 --- /dev/null +++ b/packages/sync-engine/src/schemas/early_fraud_warning.ts @@ -0,0 +1,14 @@ +import type { EntitySchema } from './types' + +export const earlyFraudWarningSchema: EntitySchema = { + properties: [ + 'id', + 'object', + 'actionable', + 'charge', + 'created', + 'fraud_type', + 'livemode', + 'payment_intent', + ], +} as const diff --git a/packages/sync-engine/src/stripeSync.ts b/packages/sync-engine/src/stripeSync.ts index 2cb5129e6..e81e29b71 100644 --- a/packages/sync-engine/src/stripeSync.ts +++ b/packages/sync-engine/src/stripeSync.ts @@ -17,12 +17,12 @@ import { subscriptionItemSchema } from './schemas/subscription_item' import { subscriptionScheduleSchema } from './schemas/subscription_schedules' import { subscriptionSchema } from './schemas/subscription' import { StripeSyncConfig, Sync, SyncBackfill, SyncBackfillParams } from './types' +import { earlyFraudWarningSchema } from './schemas/early_fraud_warning' -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function getUniqueIds(entries: any[], key: string): string[] { +function getUniqueIds(entries: T[], key: string): string[] { const set = new Set( entries - .map((subscription) => subscription?.[key]?.toString()) + .map((subscription) => subscription?.[key as keyof T]?.toString()) .filter((it): it is string => Boolean(it)) ) @@ -367,6 +367,22 @@ export class StripeSync { break } + case 'radar.early_fraud_warning.created': + case 'radar.early_fraud_warning.updated': { + const earlyFraudWarning = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Radar.EarlyFraudWarning, + (id) => this.stripe.radar.earlyFraudWarnings.retrieve(id) + ) + + this.config.logger?.info( + `Received webhook ${event.id}: ${event.type} for earlyFraudWarning ${earlyFraudWarning.id}` + ) + + await this.upsertEarlyFraudWarning([earlyFraudWarning]) + + break + } + default: throw new Error('Unhandled webhook event') } @@ -420,6 +436,10 @@ export class StripeSync { return this.stripe.taxIds.retrieve(stripeId).then((it) => this.upsertTaxIds([it])) } else if (stripeId.startsWith('cn_')) { return this.stripe.creditNotes.retrieve(stripeId).then((it) => this.upsertCreditNotes([it])) + } else if (stripeId.startsWith('issfr_')) { + return this.stripe.radar.earlyFraudWarnings + .retrieve(stripeId) + .then((it) => this.upsertEarlyFraudWarning([it])) } } @@ -438,7 +458,8 @@ export class StripeSync { paymentIntents, plans, taxIds, - creditNotes + creditNotes, + earlyFraudWarnings switch (object) { case 'all': @@ -456,6 +477,7 @@ export class StripeSync { taxIds = await this.syncTaxIds(params) creditNotes = await this.syncCreditNotes(params) disputes = await this.syncDisputes(params) + earlyFraudWarnings = await this.syncEarlyFraudWarnings(params) break case 'customer': customers = await this.syncCustomers(params) @@ -498,6 +520,9 @@ export class StripeSync { case 'credit_note': creditNotes = await this.syncCreditNotes(params) break + case 'early_fraud_warning': + earlyFraudWarnings = await this.syncEarlyFraudWarnings(params) + break default: break } @@ -517,6 +542,7 @@ export class StripeSync { plans, taxIds, creditNotes, + earlyFraudWarnings, } } @@ -701,6 +727,18 @@ export class StripeSync { ) } + async syncEarlyFraudWarnings(syncParams?: SyncBackfillParams): Promise { + this.config.logger?.info('Syncing early fraud warnings') + + const params: Stripe.Radar.EarlyFraudWarningListParams = { limit: 100 } + if (syncParams?.created) params.created = syncParams.created + + return this.fetchAndUpsert( + () => this.stripe.radar.earlyFraudWarnings.list(params), + (items) => this.upsertEarlyFraudWarning(items, syncParams?.backfillRelatedEntities) + ) + } + async syncCreditNotes(syncParams?: SyncBackfillParams): Promise { this.config.logger?.info('Syncing credit notes') @@ -749,8 +787,6 @@ export class StripeSync { ]) } - // Stripe only sends the first 10 refunds by default, the option will actively fetch all refunds - await this.expandEntity(charges, 'refunds', (id) => this.stripe.refunds.list({ charge: id, limit: 100 }) ) @@ -766,6 +802,17 @@ export class StripeSync { ).then((charges) => this.upsertCharges(charges)) } + private async backfillPaymentIntents(paymentIntentIds: string[]) { + const missingIds = await this.postgresClient.findMissingEntries( + 'payment_intents', + paymentIntentIds + ) + + await this.fetchMissingEntities(missingIds, (id) => + this.stripe.paymentIntents.retrieve(id) + ).then((paymentIntents) => this.upsertPaymentIntents(paymentIntents)) + } + private async upsertCreditNotes( creditNotes: Stripe.CreditNote[], backfillRelatedEntities?: boolean @@ -777,7 +824,6 @@ export class StripeSync { ]) } - // Stripe only sends the first 10 line items by default, the option will actively fetch all line items await this.expandEntity(creditNotes, 'lines', (id) => this.stripe.creditNotes.listLineItems(id, { limit: 100 }) ) @@ -785,6 +831,24 @@ export class StripeSync { return this.postgresClient.upsertMany(creditNotes, 'credit_notes', creditNoteSchema) } + private async upsertEarlyFraudWarning( + earlyFraudWarnings: Stripe.Radar.EarlyFraudWarning[], + backfillRelatedEntities?: boolean + ): Promise { + if (backfillRelatedEntities ?? this.config.backfillRelatedEntities) { + await Promise.all([ + this.backfillPaymentIntents(getUniqueIds(earlyFraudWarnings, 'payment_intent')), + this.backfillCharges(getUniqueIds(earlyFraudWarnings, 'charge')), + ]) + } + + return this.postgresClient.upsertMany( + earlyFraudWarnings, + 'early_fraud_warnings', + earlyFraudWarningSchema + ) + } + async upsertCustomers( customers: (Stripe.Customer | Stripe.DeletedCustomer)[] ): Promise<(Stripe.Customer | Stripe.DeletedCustomer)[]> { @@ -830,8 +894,6 @@ export class StripeSync { ]) } - // Stripe only sends the first 10 line items by default, the option will actively fetch all line items - await this.expandEntity(invoices, 'lines', (id) => this.stripe.invoices.listLineItems(id, { limit: 100 }) ) @@ -1022,7 +1084,6 @@ export class StripeSync { await this.backfillCustomers(customerIds) } - // Stripe only sends the first 10 items by default, the option will actively fetch all items await this.expandEntity(subscriptions, 'items', (id) => this.stripe.subscriptionItems.list({ subscription: id, limit: 100 }) ) @@ -1076,9 +1137,12 @@ export class StripeSync { ).then((subscriptionSchedules) => this.upsertSubscriptionSchedules(subscriptionSchedules)) } + /** + * Stripe only sends the first 10 entries by default, the option will actively fetch all entries. + */ private async expandEntity< K, - P extends string, + P extends keyof T, T extends { id?: string } & { [key in P]?: Stripe.ApiList | null }, >(entities: T[], property: P, listFn: (id: string) => Stripe.ApiListPromise) { if (!this.config.autoExpandLists) return diff --git a/packages/sync-engine/src/types.ts b/packages/sync-engine/src/types.ts index dc5cc7f76..36896b28f 100644 --- a/packages/sync-engine/src/types.ts +++ b/packages/sync-engine/src/types.ts @@ -56,6 +56,7 @@ export type SyncObject = | 'plan' | 'tax_id' | 'credit_note' + | 'early_fraud_warning' export interface Sync { synced: number @@ -76,6 +77,7 @@ export interface SyncBackfill { charges?: Sync taxIds?: Sync creditNotes?: Sync + earlyFraudWarnings?: Sync } export interface SyncBackfillParams { From 3583589a852989276242b743fce78c27afa588e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20Gr=C3=BCneberg?= Date: Mon, 2 Jun 2025 12:37:22 +0800 Subject: [PATCH 2/2] Update 0028_early_fraud_warning.sql --- .../sync-engine/db/migrations/0028_early_fraud_warning.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql b/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql index e1872ffd0..603b595a3 100644 --- a/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql +++ b/packages/sync-engine/db/migrations/0028_early_fraud_warning.sql @@ -11,9 +11,9 @@ create table updated_at timestamptz default timezone('utc'::text, now()) not null ); -create index stripe_early_fraud_warnings_customer_idx on "stripe"."early_fraud_warnings" using btree (charge); +create index stripe_early_fraud_warnings_charge_idx on "stripe"."early_fraud_warnings" using btree (charge); -create index stripe_early_fraud_warnings_invoice_idx on "stripe"."early_fraud_warnings" using btree (payment_intent); +create index stripe_early_fraud_warnings_payment_intent_idx on "stripe"."early_fraud_warnings" using btree (payment_intent); create trigger handle_updated_at before update