TypeScript libraries for backend-agnostic batch processing: process any collection at scale with validation, concurrency control, pause/resume, and distributed workers.
Use it for data imports, mass notifications, batch API calls, health checks, migrations — anything that processes records one by one at scale.
- Process any data source: files (CSV/JSON/XML), arrays, async iterables, database cursors
- Built-in concurrency control, retries with exponential backoff, and error boundaries per record
- Pause, resume, abort, and restore flows out of the box
- Distributed mode with atomic batch claiming for multi-worker execution
- Rich lifecycle events and hooks for observability and data enrichment
- Schema validation + preview for import-specific workflows
- Pluggable state stores and parsers so you can adapt to existing infrastructure
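The retry behavior above follows the usual exponential-backoff pattern. A minimal self-contained sketch of the mechanic (the `withRetries` helper is hypothetical, not part of the library; `maxRetries` and `retryDelayMs` mirror the engine options shown later):

```typescript
// Retry an async operation up to `maxRetries` times, doubling the delay
// after each failure: retryDelayMs, 2x, 4x, ...
async function withRetries<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  retryDelayMs: number,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxRetries) throw err; // retries exhausted
      const delay = retryDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}

// Example: fails twice, succeeds on the third attempt.
let calls = 0;
withRetries(async () => {
  calls++;
  if (calls < 3) throw new Error('transient');
  return 'ok';
}, 3, 10).then((result) => console.log(result, calls)); // → ok 3
```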
| Package | Purpose | npm |
|---|---|---|
| `@batchactions/core` | Core batch engine, state model, events, sources, and state stores | npm |
| `@batchactions/import` | High-level import facade with schema validation + CSV/JSON/XML parsers | npm |
| `@batchactions/distributed` | Multi-worker orchestration for distributed processing | npm |
| `@batchactions/state-sequelize` | Sequelize adapter for `StateStore` and `DistributedStateStore` | npm |
| `@batchactions/state-prisma` | Prisma v6/v7 adapter for `StateStore` and `DistributedStateStore` | npm |
```bash
npm install @batchactions/core @batchactions/import
```

Add these when needed:

```bash
npm install @batchactions/distributed

# Sequelize state store
npm install @batchactions/state-sequelize sequelize

# Prisma state store
npm install @batchactions/state-prisma
```

- Start with `@batchactions/core` for any batch processing workflow (in-memory data, custom sources, full control)
- Add `@batchactions/import` for CSV/JSON/XML import workflows with schema validation and preview
- Add `@batchactions/distributed` when one process is not enough
- Add `@batchactions/state-sequelize` for SQL-backed state with Sequelize
- Add `@batchactions/state-prisma` for SQL-backed state with Prisma (v6 or v7)
Process any collection — database results, API responses, queued items — with concurrency, retries, and full observability:
```typescript
import { BatchEngine } from '@batchactions/core';

const accounts = await db.accounts.findAll({ where: { status: 'active' } });

const engine = new BatchEngine({
  batchSize: 50,
  maxConcurrentBatches: 4,
  continueOnError: true,
  maxRetries: 2,
  retryDelayMs: 1000,
});

engine.fromRecords(accounts);

engine.on('job:progress', (e) => console.log(`${e.progress.percentage}% done`));
engine.on('record:failed', (e) => console.error(`Record ${e.index} failed: ${e.error}`));

await engine.start(async (record) => {
  await messagingGateway.send({
    channel: record.preferredChannel,
    to: record.contactInfo,
    template: 'monthly-report',
  });
});

const status = engine.getStatus();
console.log(`Sent: ${status.progress.processedRecords}, Failed: ${status.progress.failedRecords}`);
```

For file-based imports with schema validation and preview:
```typescript
import { BulkImport, CsvParser, BufferSource } from '@batchactions/import';

const importer = new BulkImport({
  schema: {
    fields: [
      { name: 'email', type: 'email', required: true },
      { name: 'name', type: 'string', required: true },
    ],
  },
  batchSize: 500,
  continueOnError: true,
});

importer.from(new BufferSource('email,name\nuser@example.com,Ada'), new CsvParser());

const preview = await importer.preview(10);
console.log(preview.validRecords.length, preview.invalidRecords.length);

await importer.start(async (record) => {
  await db.users.insert(record);
});
```

- Use queue-first tools (BullMQ, Agenda, Bree) when your main need is generic background jobs with scheduling and priorities.
- Use `@batchactions` when you need structured batch processing with per-record error tracking, pause/resume, retries, concurrency control, lifecycle events, and optional schema validation — whether for imports, notifications, migrations, or any record-by-record workflow.
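The interplay of `batchSize` and `maxConcurrentBatches` can be pictured with a small self-contained sketch (a hypothetical helper, not the engine's internals, which also add retries, events, and state tracking):

```typescript
// Group records into batches of `batchSize`, then run at most
// `maxConcurrentBatches` batches at a time.
async function processInBatches<T>(
  records: T[],
  batchSize: number,
  maxConcurrentBatches: number,
  handler: (record: T) => Promise<void>,
): Promise<void> {
  const batches: T[][] = [];
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize));
  }
  // Run batches in waves of `maxConcurrentBatches`.
  for (let i = 0; i < batches.length; i += maxConcurrentBatches) {
    const wave = batches.slice(i, i + maxConcurrentBatches);
    await Promise.all(wave.map((batch) => Promise.all(batch.map(handler))));
  }
}

// Example: 5 records, batches of 2, at most 2 batches in flight.
const seen: number[] = [];
processInBatches([1, 2, 3, 4, 5], 2, 2, async (n) => {
  seen.push(n);
}).then(() => console.log(seen.length)); // → 5
```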
- Process any data: in-memory arrays, async iterables, CSV/JSON/XML files, streams, URLs
- Batch processing with configurable size and concurrency (`maxConcurrentBatches`)
- Per-record retries with exponential backoff
- Pause, resume, abort, and restore flows
- Rich lifecycle events (`job:*`, `batch:*`, `record:*`) and hooks
- Schema validation, transforms, and preview (via `@batchactions/import`)
- Serverless-friendly chunk processing (`processChunk`)
- Distributed worker mode with atomic batch claiming
- Pluggable architecture (sources, parsers, state stores)
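Atomic batch claiming, the core idea behind the distributed mode, can be sketched with an in-memory claim check; all names here are illustrative assumptions, not the package's API. A real `DistributedStateStore` would make the claim conditional at the database level (e.g. an `UPDATE ... WHERE claimed_by IS NULL` or a transaction) so that concurrent workers never grab the same batch:

```typescript
// In-memory sketch: a worker claims a batch only if it is still unclaimed.
type BatchState = { id: number; claimedBy: string | null };

class InMemoryClaimStore {
  constructor(private batches: BatchState[]) {}

  // Claim the first unclaimed batch for `workerId`, or null if none remain.
  claimNext(workerId: string): BatchState | null {
    const batch = this.batches.find((b) => b.claimedBy === null);
    if (!batch) return null; // nothing left to claim
    batch.claimedBy = workerId; // only reachable while the batch is unclaimed
    return batch;
  }
}

const store = new InMemoryClaimStore([
  { id: 0, claimedBy: null },
  { id: 1, claimedBy: null },
]);
console.log(store.claimNext('worker-a')?.id); // → 0
console.log(store.claimNext('worker-b')?.id); // → 1
console.log(store.claimNext('worker-c'));     // → null
```

Single-threaded JavaScript makes the find-then-assign pair atomic within one process; across processes, the conditional update in the backing store is what provides the same guarantee.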
- Root contributing guide: `CONTRIBUTING.md`
- GitHub positioning checklist: `.github/GITHUB_POSITIONING_CHECKLIST.md`
- Release template: `.github/RELEASE_TEMPLATE.md`
- End-to-end example: `examples/basic-csv-import`
- `@batchactions/core`: `packages/core/README.md`
- `@batchactions/import`: `packages/import/README.md`
- `@batchactions/distributed`: `packages/distributed/README.md`
- `@batchactions/state-sequelize`: `packages/state-sequelize/README.md`
- `@batchactions/state-prisma`: `packages/state-prisma/README.md`
- Node.js >= 20.0.0
- TypeScript >= 5.0 (if using TypeScript)
MIT