Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ This server synchronizes your Stripe account to a Postgres database. It can be a
- [x] `product.created` 🟢
- [x] `product.deleted` 🟢
- [x] `product.updated` 🟢
- [x] `radar.early_fraud_warning.created` 🟢
- [x] `radar.early_fraud_warning.updated` 🟢
- [x] `setup_intent.canceled` 🟢
- [x] `setup_intent.created` 🟢
- [x] `setup_intent.requires_action` 🟢
Expand Down
3 changes: 2 additions & 1 deletion packages/fastify-app/src/test/invoices.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { vitest, beforeAll, describe, test, expect } from 'vitest'
import { runMigrations } from '@supabase/stripe-sync-engine'
import { getConfig } from '../utils/config'
import { mockStripe } from './helpers/mockStripe'
import { logger } from '../logger'

let stripeSync: StripeSync

Expand All @@ -15,7 +16,7 @@ beforeAll(async () => {
await runMigrations({
databaseUrl: config.databaseUrl,
schema: config.schema,
logger: config.logger,
logger,
})

stripeSync = new StripeSync(config)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"id": "evt_1KJrGtJDPojXS6LN15fcthM3",
"object": "event",
"api_version": "2020-03-02",
"created": 1642649111,
"data": {
"object": {
"id": "issfr_1NnrwHBw2dPENLoi9lnhV3RQ",
"object": "radar.early_fraud_warning",
"actionable": true,
"charge": "ch_1234",
"created": 123456789,
"fraud_type": "misc",
"livemode": false
}
},
"livemode": false,
"pending_webhooks": 3,
"request": {
"id": null,
"idempotency_key": null
},
"type": "radar.early_fraud_warning.created"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"id": "evt_1KJrGtJDPojXS6LN15fcthM3",
"object": "event",
"api_version": "2020-03-02",
"created": 1642649111,
"data": {
"object": {
"id": "issfr_1NnrwHBw2dPENLoi9lnhV3RQ",
"object": "radar.early_fraud_warning",
"actionable": true,
"charge": "ch_1234",
"created": 123456789,
"fraud_type": "misc",
"livemode": false
}
},
"livemode": false,
"pending_webhooks": 3,
"request": {
"id": null,
"idempotency_key": null
},
"type": "radar.early_fraud_warning.updated"
}
4 changes: 3 additions & 1 deletion packages/fastify-app/src/test/webhooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe('POST /webhooks', () => {
await runMigrations({
databaseUrl: config.databaseUrl,
schema: config.schema,
logger: config.logger,
logger,
})

process.env.AUTO_EXPAND_LISTS = 'false'
Expand Down Expand Up @@ -96,6 +96,8 @@ describe('POST /webhooks', () => {
'credit_note_created',
'credit_note_updated',
'credit_note_voided',
'early_fraud_warning_created',
'early_fraud_warning_updated',
])('process event %s', async (jsonFile) => {
const eventBody = await import(`./stripe/${jsonFile}`).then(({ default: myData }) => myData)
const signature = createHmac('sha256', stripeWebhookSecret)
Expand Down
22 changes: 22 additions & 0 deletions packages/sync-engine/db/migrations/0028_early_fraud_warning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
create table
if not exists "stripe"."early_fraud_warnings" (
"id" text primary key,
object text,
actionable boolean,
charge text,
created integer,
fraud_type text,
livemode boolean,
payment_intent text,
updated_at timestamptz default timezone('utc'::text, now()) not null
);

create index stripe_early_fraud_warnings_charge_idx on "stripe"."early_fraud_warnings" using btree (charge);

create index stripe_early_fraud_warnings_payment_intent_idx on "stripe"."early_fraud_warnings" using btree (payment_intent);

create trigger handle_updated_at
before update
on stripe.early_fraud_warnings
for each row
execute procedure set_updated_at();
14 changes: 14 additions & 0 deletions packages/sync-engine/src/schemas/early_fraud_warning.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { EntitySchema } from './types'

export const earlyFraudWarningSchema: EntitySchema = {
properties: [
'id',
'object',
'actionable',
'charge',
'created',
'fraud_type',
'livemode',
'payment_intent',
],
} as const
86 changes: 75 additions & 11 deletions packages/sync-engine/src/stripeSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import { subscriptionItemSchema } from './schemas/subscription_item'
import { subscriptionScheduleSchema } from './schemas/subscription_schedules'
import { subscriptionSchema } from './schemas/subscription'
import { StripeSyncConfig, Sync, SyncBackfill, SyncBackfillParams } from './types'
import { earlyFraudWarningSchema } from './schemas/early_fraud_warning'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function getUniqueIds(entries: any[], key: string): string[] {
function getUniqueIds<T>(entries: T[], key: string): string[] {
const set = new Set(
entries
.map((subscription) => subscription?.[key]?.toString())
.map((subscription) => subscription?.[key as keyof T]?.toString())
.filter((it): it is string => Boolean(it))
)

Expand Down Expand Up @@ -367,6 +367,22 @@ export class StripeSync {
break
}

case 'radar.early_fraud_warning.created':
case 'radar.early_fraud_warning.updated': {
const earlyFraudWarning = await this.fetchOrUseWebhookData(
event.data.object as Stripe.Radar.EarlyFraudWarning,
(id) => this.stripe.radar.earlyFraudWarnings.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for earlyFraudWarning ${earlyFraudWarning.id}`
)

await this.upsertEarlyFraudWarning([earlyFraudWarning])

break
}

default:
throw new Error('Unhandled webhook event')
}
Expand Down Expand Up @@ -420,6 +436,10 @@ export class StripeSync {
return this.stripe.taxIds.retrieve(stripeId).then((it) => this.upsertTaxIds([it]))
} else if (stripeId.startsWith('cn_')) {
return this.stripe.creditNotes.retrieve(stripeId).then((it) => this.upsertCreditNotes([it]))
} else if (stripeId.startsWith('issfr_')) {
return this.stripe.radar.earlyFraudWarnings
.retrieve(stripeId)
.then((it) => this.upsertEarlyFraudWarning([it]))
}
}

Expand All @@ -438,7 +458,8 @@ export class StripeSync {
paymentIntents,
plans,
taxIds,
creditNotes
creditNotes,
earlyFraudWarnings

switch (object) {
case 'all':
Expand All @@ -456,6 +477,7 @@ export class StripeSync {
taxIds = await this.syncTaxIds(params)
creditNotes = await this.syncCreditNotes(params)
disputes = await this.syncDisputes(params)
earlyFraudWarnings = await this.syncEarlyFraudWarnings(params)
break
case 'customer':
customers = await this.syncCustomers(params)
Expand Down Expand Up @@ -498,6 +520,9 @@ export class StripeSync {
case 'credit_note':
creditNotes = await this.syncCreditNotes(params)
break
case 'early_fraud_warning':
earlyFraudWarnings = await this.syncEarlyFraudWarnings(params)
break
default:
break
}
Expand All @@ -517,6 +542,7 @@ export class StripeSync {
plans,
taxIds,
creditNotes,
earlyFraudWarnings,
}
}

Expand Down Expand Up @@ -701,6 +727,18 @@ export class StripeSync {
)
}

async syncEarlyFraudWarnings(syncParams?: SyncBackfillParams): Promise<Sync> {
this.config.logger?.info('Syncing early fraud warnings')

const params: Stripe.Radar.EarlyFraudWarningListParams = { limit: 100 }
if (syncParams?.created) params.created = syncParams.created

return this.fetchAndUpsert(
() => this.stripe.radar.earlyFraudWarnings.list(params),
(items) => this.upsertEarlyFraudWarning(items, syncParams?.backfillRelatedEntities)
)
}

async syncCreditNotes(syncParams?: SyncBackfillParams): Promise<Sync> {
this.config.logger?.info('Syncing credit notes')

Expand Down Expand Up @@ -749,8 +787,6 @@ export class StripeSync {
])
}

// Stripe only sends the first 10 refunds by default, the option will actively fetch all refunds

await this.expandEntity(charges, 'refunds', (id) =>
this.stripe.refunds.list({ charge: id, limit: 100 })
)
Expand All @@ -766,6 +802,17 @@ export class StripeSync {
).then((charges) => this.upsertCharges(charges))
}

private async backfillPaymentIntents(paymentIntentIds: string[]) {
const missingIds = await this.postgresClient.findMissingEntries(
'payment_intents',
paymentIntentIds
)

await this.fetchMissingEntities(missingIds, (id) =>
this.stripe.paymentIntents.retrieve(id)
).then((paymentIntents) => this.upsertPaymentIntents(paymentIntents))
Comment thread
kevcodez marked this conversation as resolved.
}

private async upsertCreditNotes(
creditNotes: Stripe.CreditNote[],
backfillRelatedEntities?: boolean
Expand All @@ -777,14 +824,31 @@ export class StripeSync {
])
}

// Stripe only sends the first 10 line items by default, the option will actively fetch all line items
await this.expandEntity(creditNotes, 'lines', (id) =>
this.stripe.creditNotes.listLineItems(id, { limit: 100 })
)

return this.postgresClient.upsertMany(creditNotes, 'credit_notes', creditNoteSchema)
}

private async upsertEarlyFraudWarning(
earlyFraudWarnings: Stripe.Radar.EarlyFraudWarning[],
backfillRelatedEntities?: boolean
): Promise<Stripe.Radar.EarlyFraudWarning[]> {
if (backfillRelatedEntities ?? this.config.backfillRelatedEntities) {
await Promise.all([
this.backfillPaymentIntents(getUniqueIds(earlyFraudWarnings, 'payment_intent')),
this.backfillCharges(getUniqueIds(earlyFraudWarnings, 'charge')),
])
}

return this.postgresClient.upsertMany(
earlyFraudWarnings,
'early_fraud_warnings',
earlyFraudWarningSchema
)
}

async upsertCustomers(
customers: (Stripe.Customer | Stripe.DeletedCustomer)[]
): Promise<(Stripe.Customer | Stripe.DeletedCustomer)[]> {
Expand Down Expand Up @@ -830,8 +894,6 @@ export class StripeSync {
])
}

// Stripe only sends the first 10 line items by default, the option will actively fetch all line items

await this.expandEntity(invoices, 'lines', (id) =>
this.stripe.invoices.listLineItems(id, { limit: 100 })
)
Expand Down Expand Up @@ -1022,7 +1084,6 @@ export class StripeSync {
await this.backfillCustomers(customerIds)
}

// Stripe only sends the first 10 items by default, the option will actively fetch all items
await this.expandEntity(subscriptions, 'items', (id) =>
this.stripe.subscriptionItems.list({ subscription: id, limit: 100 })
)
Expand Down Expand Up @@ -1076,9 +1137,12 @@ export class StripeSync {
).then((subscriptionSchedules) => this.upsertSubscriptionSchedules(subscriptionSchedules))
}

/**
* Stripe only sends the first 10 entries by default, the option will actively fetch all entries.
*/
private async expandEntity<
K,
P extends string,
P extends keyof T,
T extends { id?: string } & { [key in P]?: Stripe.ApiList<K> | null },
>(entities: T[], property: P, listFn: (id: string) => Stripe.ApiListPromise<K>) {
if (!this.config.autoExpandLists) return
Expand Down
2 changes: 2 additions & 0 deletions packages/sync-engine/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export type SyncObject =
| 'plan'
| 'tax_id'
| 'credit_note'
| 'early_fraud_warning'

export interface Sync {
synced: number
Expand All @@ -76,6 +77,7 @@ export interface SyncBackfill {
charges?: Sync
taxIds?: Sync
creditNotes?: Sync
earlyFraudWarnings?: Sync
}

export interface SyncBackfillParams {
Expand Down