From cd73a184965876ec5f79f16e630bebeb114b6cf5 Mon Sep 17 00:00:00 2001 From: "seer-by-sentry[bot]" <157164994+seer-by-sentry[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 21:23:04 +0000 Subject: [PATCH] fix: Implement RSS feed circuit breaker --- .../0012_add_source_failure_tracking.sql | 5 ++ packages/api/src/db/schema.ts | 4 + packages/api/src/services/rss-fetcher.ts | 73 +++++++++++++++++-- 3 files changed, 76 insertions(+), 6 deletions(-) create mode 100644 packages/api/drizzle/0012_add_source_failure_tracking.sql diff --git a/packages/api/drizzle/0012_add_source_failure_tracking.sql b/packages/api/drizzle/0012_add_source_failure_tracking.sql new file mode 100644 index 0000000..e4e84e5 --- /dev/null +++ b/packages/api/drizzle/0012_add_source_failure_tracking.sql @@ -0,0 +1,5 @@ +-- Add failure tracking columns to sources table for circuit breaker pattern +ALTER TABLE `sources` ADD `consecutive_failures` integer NOT NULL DEFAULT 0; +ALTER TABLE `sources` ADD `last_error_at` integer; +ALTER TABLE `sources` ADD `fetch_disabled_at` integer; +CREATE INDEX `idx_sources_fetch_disabled_at` ON `sources` (`fetch_disabled_at`); \ No newline at end of file diff --git a/packages/api/src/db/schema.ts b/packages/api/src/db/schema.ts index 817be76..3579e3d 100644 --- a/packages/api/src/db/schema.ts +++ b/packages/api/src/db/schema.ts @@ -135,6 +135,9 @@ export const sources = sqliteTable( }).default("auto"), iconUpdatedAt: integer("icon_updated_at", { mode: "timestamp" }), lastFetched: integer("last_fetched", { mode: "timestamp" }), + consecutiveFailures: integer("consecutive_failures").notNull().default(0), + lastErrorAt: integer("last_error_at", { mode: "timestamp" }), + fetchDisabledAt: integer("fetch_disabled_at", { mode: "timestamp" }), createdAt: integer("created_at", { mode: "timestamp" }) .notNull() .$defaultFn(() => new Date()), @@ -146,6 +149,7 @@ export const sources = sqliteTable( index("idx_sources_url").on(table.url), index("idx_sources_icon_url").on(table.iconUrl), index("idx_sources_last_fetched").on(table.lastFetched), + index("idx_sources_fetch_disabled_at").on(table.fetchDisabledAt), ] ); diff --git a/packages/api/src/services/rss-fetcher.ts b/packages/api/src/services/rss-fetcher.ts index 95aedd8..b5358f9 100644 --- a/packages/api/src/services/rss-fetcher.ts +++ b/packages/api/src/services/rss-fetcher.ts @@ -10,7 +10,7 @@ import { parseFeed } from "feedsmith"; import type { Rss, Atom, Rdf, Json } from "@/types/feed"; import type { Database } from "@/db/client"; import * as schema from "@/db/schema"; -import { and, eq, inArray, or, isNull, lt } from "drizzle-orm"; +import { and, eq, inArray, or, isNull, lt, sql } from "drizzle-orm"; import { extractOgImage } from "@/utils/og-image-fetcher"; import { sanitizeHtml, @@ -59,6 +59,12 @@ const STALENESS_DEFAULTS = { batchSize: 20, // Feeds per batch (optimized for D1 query limits) } as const; +/** Circuit breaker configuration for persistently-failing feeds */ +const CIRCUIT_BREAKER = { + failureThreshold: 10, // Disable feed after this many consecutive failures + cooldownHours: 24, // Re-enable disabled feeds after this many hours +} as const; + // ============================================================================= // Types // ============================================================================= @@ -116,17 +122,30 @@ function buildStalenessWhereClause(staleThreshold: Date) { } /** - * Get stale sources that need fetching + * Get stale sources that need fetching, skipping circuit-broken feeds + * unless their cooldown period has elapsed. */ async function getStaleSources( db: Database, staleThreshold: Date, limit: number ) { + const cooldownCutoff = new Date( + Date.now() - CIRCUIT_BREAKER.cooldownHours * 60 * 60 * 1000 + ); + + // Include sources that are: + // (a) not disabled (fetchDisabledAt IS NULL), OR + // (b) disabled but cooldown has elapsed (fetchDisabledAt < cooldownCutoff) + const notDisabledOrCooledDown = or( + isNull(schema.sources.fetchDisabledAt), + lt(schema.sources.fetchDisabledAt, cooldownCutoff) + ); + return await db .select() .from(schema.sources) - .where(buildStalenessWhereClause(staleThreshold)) + .where(and(buildStalenessWhereClause(staleThreshold), notDisabledOrCooledDown)) .orderBy(schema.sources.lastFetched) .limit(limit); } @@ -469,6 +488,41 @@ export async function fetchSingleFeed( domain: extractDomain(feedUrl) || "unknown", }); + // Circuit breaker: increment consecutive failure count and disable + // the source if it exceeds the threshold. + try { + const now = new Date(); + // Increment consecutive_failures atomically and fetch new value + const updated = await db + .update(schema.sources) + .set({ + consecutiveFailures: sql`${schema.sources.consecutiveFailures} + 1`, + lastErrorAt: now, + lastFetched: now, // Advance lastFetched so the source isn't immediately re-queued + }) + .where(eq(schema.sources.id, sourceId)) + .returning({ consecutiveFailures: schema.sources.consecutiveFailures }); + + const newCount = updated[0]?.consecutiveFailures ?? 0; + + if (newCount >= CIRCUIT_BREAKER.failureThreshold) { + await db + .update(schema.sources) + .set({ fetchDisabledAt: now }) + .where(eq(schema.sources.id, sourceId)); + + console.warn( + `⚡ Circuit breaker opened for source ${sourceId} (${feedUrl}) after ${newCount} consecutive failures` + ); + emitCounter("rss.feed_circuit_broken", 1, { + domain: extractDomain(feedUrl) || "unknown", + }); + } + } catch (dbError) { + // Don't let failure tracking errors mask the original fetch error + console.error("Failed to update failure tracking for source:", sourceId, dbError); + } + // Error already captured in specific places, re-throw throw error; } @@ -552,19 +606,26 @@ async function updateSourceMetadata( updates.iconUpdatedAt = new Date(); } + // Always reset circuit breaker on a successful fetch + const circuitBreakerReset: Partial = { + consecutiveFailures: 0, + lastErrorAt: null, + fetchDisabledAt: null, + }; + // Only update if we have something to update (beyond lastFetched) if (Object.keys(updates).length > 1) { await db .update(schema.sources) - .set(updates) + .set({ ...updates, ...circuitBreakerReset }) .where(eq(schema.sources.id, sourceId)); return true; } - // Just update lastFetched + // Just update lastFetched and reset circuit breaker await db .update(schema.sources) - .set({ lastFetched: new Date() }) + .set({ lastFetched: new Date(), ...circuitBreakerReset }) .where(eq(schema.sources.id, sourceId)); return false; }