Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/fastify-app/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ BACKFILL_RELATED_ENTITIES=true
# optional, default 10
# Max number of connections for the Postgres connection pool, higher value lead to more concurrent queries, but also more load on the database (connections are expensive)
MAX_POSTGRES_CONNECTIONS=20

# If true, the webhook data is not used and instead the webhook is just a trigger to fetch the entity from Stripe again. This ensures that a race condition with failed webhooks can never accidentally overwrite the data with an older state.
# Default: false
REVALIDATE_ENTITY_VIA_STRIPE_API=false
4 changes: 4 additions & 0 deletions packages/fastify-app/src/utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export type StripeSyncServerConfig = {

maxPostgresConnections?: number

revalidateEntityViaStripeApi: boolean

port: number
}

Expand All @@ -58,5 +60,7 @@ export function getConfig(): StripeSyncServerConfig {
autoExpandLists: getConfigFromEnv('AUTO_EXPAND_LISTS', 'false') === 'true',
backfillRelatedEntities: getConfigFromEnv('BACKFILL_RELATED_ENTITIES', 'true') === 'true',
maxPostgresConnections: Number(getConfigFromEnv('MAX_POSTGRES_CONNECTIONS', '10')),
revalidateEntityViaStripeApi:
getConfigFromEnv('REVALIDATE_ENTITY_VIA_STRIPE_API', 'false') === 'true',
}
}
128 changes: 104 additions & 24 deletions packages/sync-engine/src/stripeSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ export class StripeSync {
case 'charge.refunded':
case 'charge.succeeded':
case 'charge.updated': {
const charge = event.data.object as Stripe.Charge
const charge = await this.fetchOrUseWebhookData(event.data.object as Stripe.Charge, (id) =>
this.stripe.charges.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for charge ${charge.id}`
Expand All @@ -84,7 +86,10 @@ export class StripeSync {
case 'customer.created':
case 'customer.deleted':
case 'customer.updated': {
const customer = event.data.object as Stripe.Customer
const customer = await this.fetchOrUseWebhookData(
event.data.object as Stripe.Customer | Stripe.DeletedCustomer,
(id) => this.stripe.customers.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for customer ${customer.id}`
Expand All @@ -101,7 +106,10 @@ export class StripeSync {
case 'customer.subscription.trial_will_end':
case 'customer.subscription.resumed':
case 'customer.subscription.updated': {
const subscription = event.data.object as Stripe.Subscription
const subscription = await this.fetchOrUseWebhookData(
event.data.object as Stripe.Subscription,
(id) => this.stripe.subscriptions.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for subscription ${subscription.id}`
Expand All @@ -112,7 +120,9 @@ export class StripeSync {
}
case 'customer.tax_id.updated':
case 'customer.tax_id.created': {
const taxId = event.data.object as Stripe.TaxId
const taxId = await this.fetchOrUseWebhookData(event.data.object as Stripe.TaxId, (id) =>
this.stripe.taxIds.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for taxId ${taxId.id}`
Expand Down Expand Up @@ -144,7 +154,10 @@ export class StripeSync {
case 'invoice.voided':
case 'invoice.marked_uncollectible':
case 'invoice.updated': {
const invoice = event.data.object as Stripe.Invoice
const invoice = await this.fetchOrUseWebhookData(
event.data.object as Stripe.Invoice,
(id) => this.stripe.invoices.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for invoice ${invoice.id}`
Expand All @@ -155,13 +168,25 @@ export class StripeSync {
}
case 'product.created':
case 'product.updated': {
const product = event.data.object as Stripe.Product
try {
const product = await this.fetchOrUseWebhookData(
event.data.object as Stripe.Product,
(id) => this.stripe.products.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for product ${product.id}`
)
this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for product ${product.id}`
)

await this.upsertProducts([product])
} catch (err) {
if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') {
await this.deleteProduct(event.data.object.id)
} else {
throw err
}
}

await this.upsertProducts([product])
break
}
case 'product.deleted': {
Expand All @@ -176,13 +201,24 @@ export class StripeSync {
}
case 'price.created':
case 'price.updated': {
const price = event.data.object as Stripe.Price
try {
const price = await this.fetchOrUseWebhookData(event.data.object as Stripe.Price, (id) =>
this.stripe.prices.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for price ${price.id}`
)
this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for price ${price.id}`
)

await this.upsertPrices([price])
} catch (err) {
if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') {
await this.deletePrice(event.data.object.id)
} else {
throw err
}
}

await this.upsertPrices([price])
break
}
case 'price.deleted': {
Expand All @@ -197,11 +233,24 @@ export class StripeSync {
}
case 'plan.created':
case 'plan.updated': {
const plan = event.data.object as Stripe.Plan
try {
const plan = await this.fetchOrUseWebhookData(event.data.object as Stripe.Plan, (id) =>
this.stripe.plans.retrieve(id)
)

this.config.logger?.info(`Received webhook ${event.id}: ${event.type} for plan ${plan.id}`)
this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for plan ${plan.id}`
)

await this.upsertPlans([plan])
} catch (err) {
if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') {
await this.deletePlan(event.data.object.id)
} else {
throw err
}
}

await this.upsertPlans([plan])
break
}
case 'plan.deleted': {
Expand All @@ -217,7 +266,10 @@ export class StripeSync {
case 'setup_intent.requires_action':
case 'setup_intent.setup_failed':
case 'setup_intent.succeeded': {
const setupIntent = event.data.object as Stripe.SetupIntent
const setupIntent = await this.fetchOrUseWebhookData(
event.data.object as Stripe.SetupIntent,
(id) => this.stripe.setupIntents.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for setupIntent ${setupIntent.id}`
Expand All @@ -233,7 +285,10 @@ export class StripeSync {
case 'subscription_schedule.expiring':
case 'subscription_schedule.released':
case 'subscription_schedule.updated': {
const subscriptionSchedule = event.data.object as Stripe.SubscriptionSchedule
const subscriptionSchedule = await this.fetchOrUseWebhookData(
event.data.object as Stripe.SubscriptionSchedule,
(id) => this.stripe.subscriptionSchedules.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for subscriptionSchedule ${subscriptionSchedule.id}`
Expand All @@ -246,7 +301,10 @@ export class StripeSync {
case 'payment_method.automatically_updated':
case 'payment_method.detached':
case 'payment_method.updated': {
const paymentMethod = event.data.object as Stripe.PaymentMethod
const paymentMethod = await this.fetchOrUseWebhookData(
event.data.object as Stripe.PaymentMethod,
(id) => this.stripe.paymentMethods.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for paymentMethod ${paymentMethod.id}`
Expand All @@ -260,7 +318,10 @@ export class StripeSync {
case 'charge.dispute.funds_withdrawn':
case 'charge.dispute.updated':
case 'charge.dispute.closed': {
const dispute = event.data.object as Stripe.Dispute
const dispute = await this.fetchOrUseWebhookData(
event.data.object as Stripe.Dispute,
(id) => this.stripe.disputes.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for dispute ${dispute.id}`
Expand All @@ -277,7 +338,10 @@ export class StripeSync {
case 'payment_intent.processing':
case 'payment_intent.requires_action':
case 'payment_intent.succeeded': {
const paymentIntent = event.data.object as Stripe.PaymentIntent
const paymentIntent = await this.fetchOrUseWebhookData(
event.data.object as Stripe.PaymentIntent,
(id) => this.stripe.paymentIntents.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for paymentIntent ${paymentIntent.id}`
Expand All @@ -290,7 +354,10 @@ export class StripeSync {
case 'credit_note.created':
case 'credit_note.updated':
case 'credit_note.voided': {
const creditNote = event.data.object as Stripe.CreditNote
const creditNote = await this.fetchOrUseWebhookData(
event.data.object as Stripe.CreditNote,
(id) => this.stripe.creditNotes.retrieve(id)
)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for creditNote ${creditNote.id}`
Expand All @@ -305,6 +372,19 @@ export class StripeSync {
}
}

private async fetchOrUseWebhookData<T extends { id?: string }>(
entity: T,
fetchFn: (id: string) => Promise<T>
): Promise<T> {
if (!entity.id) return entity

if (this.config.revalidateEntityViaStripeApi) {
return fetchFn(entity.id)
}

return entity
}

async syncSingleEntity(stripeId: string) {
if (stripeId.startsWith('cus_')) {
return this.stripe.customers.retrieve(stripeId).then((it) => {
Expand Down
7 changes: 7 additions & 0 deletions packages/sync-engine/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ export type StripeSyncConfig = {
*/
backfillRelatedEntities?: boolean

/**
* If true, the webhook data is not used and instead the webhook is just a trigger to fetch the entity from Stripe again. This ensures that a race condition with failed webhooks can never accidentally overwrite the data with an older state.
*
* Default: false
*/
revalidateEntityViaStripeApi?: boolean

maxPostgresConnections?: number

logger?: pino.Logger
Expand Down