diff --git a/packages/fastify-app/.env.sample b/packages/fastify-app/.env.sample index f9cf8766e..056c79735 100644 --- a/packages/fastify-app/.env.sample +++ b/packages/fastify-app/.env.sample @@ -32,3 +32,7 @@ BACKFILL_RELATED_ENTITIES=true # optional, default 10 # Max number of connections for the Postgres connection pool, higher value lead to more concurrent queries, but also more load on the database (connections are expensive) MAX_POSTGRES_CONNECTIONS=20 + +# If true, the webhook data is not used and instead the webhook is just a trigger to fetch the entity from Stripe again. This ensures that a race condition with failed webhooks can never accidentally overwrite the data with an older state. +# Default: false +REVALIDATE_ENTITY_VIA_STRIPE_API=false \ No newline at end of file diff --git a/packages/fastify-app/src/utils/config.ts b/packages/fastify-app/src/utils/config.ts index df42ddf09..01af6b66d 100644 --- a/packages/fastify-app/src/utils/config.ts +++ b/packages/fastify-app/src/utils/config.ts @@ -41,6 +41,8 @@ export type StripeSyncServerConfig = { maxPostgresConnections?: number + revalidateEntityViaStripeApi: boolean + port: number } @@ -58,5 +60,7 @@ export function getConfig(): StripeSyncServerConfig { autoExpandLists: getConfigFromEnv('AUTO_EXPAND_LISTS', 'false') === 'true', backfillRelatedEntities: getConfigFromEnv('BACKFILL_RELATED_ENTITIES', 'true') === 'true', maxPostgresConnections: Number(getConfigFromEnv('MAX_POSTGRES_CONNECTIONS', '10')), + revalidateEntityViaStripeApi: + getConfigFromEnv('REVALIDATE_ENTITY_VIA_STRIPE_API', 'false') === 'true', } } diff --git a/packages/sync-engine/src/stripeSync.ts b/packages/sync-engine/src/stripeSync.ts index 71d0c4803..2cb5129e6 100644 --- a/packages/sync-engine/src/stripeSync.ts +++ b/packages/sync-engine/src/stripeSync.ts @@ -72,7 +72,9 @@ export class StripeSync { case 'charge.refunded': case 'charge.succeeded': case 'charge.updated': { - const charge = event.data.object as Stripe.Charge + const charge = await this.fetchOrUseWebhookData(event.data.object as Stripe.Charge, (id) => + this.stripe.charges.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for charge ${charge.id}` @@ -84,7 +86,10 @@ export class StripeSync { case 'customer.created': case 'customer.deleted': case 'customer.updated': { - const customer = event.data.object as Stripe.Customer + const customer = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Customer | Stripe.DeletedCustomer, + (id) => this.stripe.customers.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for customer ${customer.id}` @@ -101,7 +106,10 @@ export class StripeSync { case 'customer.subscription.trial_will_end': case 'customer.subscription.resumed': case 'customer.subscription.updated': { - const subscription = event.data.object as Stripe.Subscription + const subscription = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Subscription, + (id) => this.stripe.subscriptions.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for subscription ${subscription.id}` @@ -112,7 +120,9 @@ export class StripeSync { } case 'customer.tax_id.updated': case 'customer.tax_id.created': { - const taxId = event.data.object as Stripe.TaxId + const taxId = await this.fetchOrUseWebhookData(event.data.object as Stripe.TaxId, (id) => + this.stripe.taxIds.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for taxId ${taxId.id}` @@ -144,7 +154,10 @@ export class StripeSync { case 'invoice.voided': case 'invoice.marked_uncollectible': case 'invoice.updated': { - const invoice = event.data.object as Stripe.Invoice + const invoice = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Invoice, + (id) => this.stripe.invoices.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for invoice ${invoice.id}` @@ -155,13 +168,25 @@ export class StripeSync { } case 'product.created': case 'product.updated': { - const product = event.data.object as Stripe.Product + try { + const product = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Product, + (id) => this.stripe.products.retrieve(id) + ) - this.config.logger?.info( - `Received webhook ${event.id}: ${event.type} for product ${product.id}` - ) + this.config.logger?.info( + `Received webhook ${event.id}: ${event.type} for product ${product.id}` + ) + + await this.upsertProducts([product]) + } catch (err) { + if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') { + await this.deleteProduct(event.data.object.id) + } else { + throw err + } + } - await this.upsertProducts([product]) break } case 'product.deleted': { @@ -176,13 +201,24 @@ export class StripeSync { } case 'price.created': case 'price.updated': { - const price = event.data.object as Stripe.Price + try { + const price = await this.fetchOrUseWebhookData(event.data.object as Stripe.Price, (id) => + this.stripe.prices.retrieve(id) + ) - this.config.logger?.info( - `Received webhook ${event.id}: ${event.type} for price ${price.id}` - ) + this.config.logger?.info( + `Received webhook ${event.id}: ${event.type} for price ${price.id}` + ) + + await this.upsertPrices([price]) + } catch (err) { + if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') { + await this.deletePrice(event.data.object.id) + } else { + throw err + } + } - await this.upsertPrices([price]) break } case 'price.deleted': { @@ -197,11 +233,24 @@ export class StripeSync { } case 'plan.created': case 'plan.updated': { - const plan = event.data.object as Stripe.Plan + try { + const plan = await this.fetchOrUseWebhookData(event.data.object as Stripe.Plan, (id) => + this.stripe.plans.retrieve(id) + ) - this.config.logger?.info(`Received webhook ${event.id}: ${event.type} for plan ${plan.id}`) + this.config.logger?.info( + `Received webhook ${event.id}: ${event.type} for plan ${plan.id}` + ) + + await this.upsertPlans([plan]) + } catch (err) { + if (err instanceof Stripe.errors.StripeAPIError && err.code === 'resource_missing') { + await this.deletePlan(event.data.object.id) + } else { + throw err + } + } - await this.upsertPlans([plan]) break } case 'plan.deleted': { @@ -217,7 +266,10 @@ export class StripeSync { case 'setup_intent.requires_action': case 'setup_intent.setup_failed': case 'setup_intent.succeeded': { - const setupIntent = event.data.object as Stripe.SetupIntent + const setupIntent = await this.fetchOrUseWebhookData( + event.data.object as Stripe.SetupIntent, + (id) => this.stripe.setupIntents.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for setupIntent ${setupIntent.id}` @@ -233,7 +285,10 @@ export class StripeSync { case 'subscription_schedule.expiring': case 'subscription_schedule.released': case 'subscription_schedule.updated': { - const subscriptionSchedule = event.data.object as Stripe.SubscriptionSchedule + const subscriptionSchedule = await this.fetchOrUseWebhookData( + event.data.object as Stripe.SubscriptionSchedule, + (id) => this.stripe.subscriptionSchedules.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for subscriptionSchedule ${subscriptionSchedule.id}` @@ -246,7 +301,10 @@ export class StripeSync { case 'payment_method.automatically_updated': case 'payment_method.detached': case 'payment_method.updated': { - const paymentMethod = event.data.object as Stripe.PaymentMethod + const paymentMethod = await this.fetchOrUseWebhookData( + event.data.object as Stripe.PaymentMethod, + (id) => this.stripe.paymentMethods.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for paymentMethod ${paymentMethod.id}` @@ -260,7 +318,10 @@ export class StripeSync { case 'charge.dispute.funds_withdrawn': case 'charge.dispute.updated': case 'charge.dispute.closed': { - const dispute = event.data.object as Stripe.Dispute + const dispute = await this.fetchOrUseWebhookData( + event.data.object as Stripe.Dispute, + (id) => this.stripe.disputes.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for dispute ${dispute.id}` @@ -277,7 +338,10 @@ export class StripeSync { case 'payment_intent.processing': case 'payment_intent.requires_action': case 'payment_intent.succeeded': { - const paymentIntent = event.data.object as Stripe.PaymentIntent + const paymentIntent = await this.fetchOrUseWebhookData( + event.data.object as Stripe.PaymentIntent, + (id) => this.stripe.paymentIntents.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for paymentIntent ${paymentIntent.id}` @@ -290,7 +354,10 @@ export class StripeSync { case 'credit_note.created': case 'credit_note.updated': case 'credit_note.voided': { - const creditNote = event.data.object as Stripe.CreditNote + const creditNote = await this.fetchOrUseWebhookData( + event.data.object as Stripe.CreditNote, + (id) => this.stripe.creditNotes.retrieve(id) + ) this.config.logger?.info( `Received webhook ${event.id}: ${event.type} for creditNote ${creditNote.id}` @@ -305,6 +372,19 @@ export class StripeSync { } } + private async fetchOrUseWebhookData( + entity: T, + fetchFn: (id: string) => Promise + ): Promise { + if (!entity.id) return entity + + if (this.config.revalidateEntityViaStripeApi) { + return fetchFn(entity.id) + } + + return entity + } + async syncSingleEntity(stripeId: string) { if (stripeId.startsWith('cus_')) { return this.stripe.customers.retrieve(stripeId).then((it) => { diff --git a/packages/sync-engine/src/types.ts b/packages/sync-engine/src/types.ts index c8d32a5b4..dc5cc7f76 100644 --- a/packages/sync-engine/src/types.ts +++ b/packages/sync-engine/src/types.ts @@ -28,6 +28,13 @@ export type StripeSyncConfig = { */ backfillRelatedEntities?: boolean + /** + * If true, the webhook data is not used and instead the webhook is just a trigger to fetch the entity from Stripe again. This ensures that a race condition with failed webhooks can never accidentally overwrite the data with an older state. + * + * Default: false + */ + revalidateEntityViaStripeApi?: boolean + maxPostgresConnections?: number logger?: pino.Logger