Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions packages/core/src/composite/bulk-enrich-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import type { LeadbayClient } from "../client.js";
import type { Tool, ToolContext } from "../types.js";
import { getContacts } from "../tools/get-contacts.js";
import { isValidBulkId, type BulkRecord } from "../jobs/bulk-store.js";

/** Input parameters accepted by the leadbay_bulk_enrich_status tool. */
interface BulkEnrichStatusParams {
/** Bulk handle to look up; it is checked with isValidBulkId before the tracker is read. */
bulk_id: string;
/** When true, each lead in the response carries its contact list. Treated as false when omitted. */
include_contacts?: boolean;
}

// Maximum number of per-lead contact fetches the status tool keeps in flight at once.
// Keep concurrency in step with LeadbayClient.MAX_CONCURRENT (client.ts:17).
// Client semaphore is the real rate limit; composite concurrency above the cap
// is cosmetic and starves other tools.
const STATUS_FETCH_CONCURRENCY = 5;

/**
 * Maps `items` through the async `fn` with a bounded number of calls in
 * flight, returning the results in input order.
 *
 * A fixed pool of workers pulls the next index from a shared cursor, so a slow
 * item only occupies one worker while the others keep draining the list.
 *
 * @param items - Values to process; an empty array resolves to `[]` without calling `fn`.
 * @param fn - Async mapper, called with the item and its index.
 * @param concurrency - Upper bound on simultaneous `fn` calls. Fractions are
 *   floored; values below 1 and NaN are treated as 1 so every item is still
 *   processed (sequentially) instead of being silently skipped.
 * @returns Results positioned at the same index as their input item.
 * @throws Rejects with the first error thrown by `fn`; workers that are
 *   already running are not cancelled.
 */
async function pMap<T, R>(
  items: T[],
  fn: (item: T, idx: number) => Promise<R>,
  concurrency: number
): Promise<R[]> {
  const out = new Array<R>(items.length);

  // Without this clamp a cap of 0 / negative / NaN would start zero workers
  // and resolve to an array of holes, never invoking `fn`.
  const cap = Number.isNaN(concurrency) ? 1 : Math.max(1, Math.floor(concurrency));
  const workerCount = Math.min(cap, items.length);

  let next = 0;
  async function worker(): Promise<void> {
    while (true) {
      // Claiming the index is synchronous, so two workers never take the same slot.
      const i = next++;
      if (i >= items.length) return;
      out[i] = await fn(items[i], i);
    }
  }

  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return out;
}

/**
 * Builds the error payload shape returned by the status tool:
 * `error`, `code`, `message`, `hint`, followed by any extra fields.
 */
function bulkStatusError(
  code: string,
  message: string,
  hint: string,
  extra: Record<string, unknown> = {}
) {
  return { error: true, code, message, hint, ...extra };
}

/**
 * Tool that reports progress for a previously launched bulk enrichment.
 *
 * It validates the bulk_id, reads the record from the BulkTracker in the tool
 * context, returns an error payload for missing/pending/failed records, and
 * otherwise fetches contacts for every lead in the record and aggregates
 * per-lead and overall enrichment progress.
 */
export const bulkEnrichStatus: Tool<BulkEnrichStatusParams> = {
  name: "leadbay_bulk_enrich_status",
  description:
    "Check status + per-lead contacts for a bulk enrichment you previously launched via leadbay_enrich_titles. " +
    "Returns the bulk_id, progress per lead (done/total enrichable contacts), and overall progress. " +
    "When include_contacts=true (opt-in), includes each contact's email/phone/job_title/enrichment.done. " +
    "When to use: poll this after leadbay_enrich_titles returns a bulk_id. Default include_contacts=false for cheap " +
    "status polls; set include_contacts=true once all_done flips for the final read. " +
    "When NOT to use: as a substitute for leadbay_research_lead — that already includes enriched contacts for a single lead.",
  inputSchema: {
    type: "object",
    properties: {
      bulk_id: {
        type: "string",
        description:
          "UUIDv4 returned by leadbay_enrich_titles at launch time. Required.",
      },
      include_contacts: {
        type: "boolean",
        description:
          "If true, return the full contact list per lead (email, phone, enrichment.done). Default false — cheap status polls.",
      },
    },
    required: ["bulk_id"],
  },
  execute: async (
    client: LeadbayClient,
    params: BulkEnrichStatusParams,
    ctx?: ToolContext
  ) => {
    const bulkId = params.bulk_id;

    // Reject malformed ids up front, before the tracker (and its backing
    // store) is touched — path-traversal / LFI defense.
    if (!isValidBulkId(bulkId)) {
      return bulkStatusError(
        "BULK_INVALID_ID",
        "bulk_id is not a valid UUIDv4",
        "Pass the bulk_id returned by leadbay_enrich_titles verbatim."
      );
    }

    const tracker = ctx?.bulkTracker;
    if (!tracker) {
      return bulkStatusError(
        "BULK_TRACKER_UNAVAILABLE",
        "No BulkTracker configured on this MCP instance",
        "This composite requires a BulkTracker in ToolContext. Upgrade to @leadbay/mcp ≥0.3.0 or run with LEADBAY_BULK_STORE_ALLOW_MEMORY=1."
      );
    }

    const wantContacts = params.include_contacts ?? false;
    // Wall-clock start for the wall_ms figure in the log line at the end.
    const startedAt = Date.now();

    let bulk: BulkRecord | undefined;
    try {
      bulk = await tracker.get(bulkId);
    } catch (err: any) {
      return bulkStatusError(
        "BULK_STORE_UNAVAILABLE",
        `Bulk store read failed: ${err?.message ?? err}`,
        "Check the file at $LEADBAY_BULK_STORE_PATH (default ~/.leadbay/bulks.json). " +
          "Set LEADBAY_BULK_STORE_ALLOW_MEMORY=1 to fall back to in-memory storage on startup (handles won't survive restart)."
      );
    }

    if (!bulk) {
      return bulkStatusError(
        "BULK_NOT_FOUND",
        "No bulk record for that bulk_id",
        "The record may have aged out (30-day TTL) or the MCP process was restarted without persistence. " +
          "Launch a new enrichment via leadbay_enrich_titles."
      );
    }

    // Records that never reached a successful launch short-circuit here.
    switch (bulk.status) {
      case "pending":
        return bulkStatusError(
          "BULK_PENDING",
          "Bulk is in 'pending' state — the launch is in flight or the MCP crashed between launch and ack.",
          "Retry leadbay_bulk_enrich_status in a few seconds. If it persists >60s, relaunch via leadbay_enrich_titles.",
          { bulk_id: bulk.bulk_id, launched_at: bulk.launched_at }
        );
      case "failed":
        return bulkStatusError(
          "BULK_LAUNCH_FAILED",
          "The original /enrichment/launch POST failed; no backend enrichment was ordered.",
          "Call leadbay_enrich_titles again — the failed record won't block a fresh launch.",
          { bulk_id: bulk.bulk_id, launched_at: bulk.launched_at }
        );
    }

    // Any other status (presumably just "launched") — fetch per-lead contacts.
    type Ok = {
      kind: "ok";
      lead_id: string;
      done: number;
      total: number;
      contacts?: any[];
    };
    type Fail = {
      kind: "fail";
      lead_id: string;
      code: string;
      retry_after?: number;
    };

    // Fetches one lead's contacts and counts enrichment progress. Failures are
    // captured as data so a single lead cannot reject the whole batch.
    const fetchLead = async (leadId: string): Promise<Ok | Fail> => {
      try {
        const out: any = await getContacts.execute(client, { leadId });
        const contacts: any[] = Array.isArray(out?.contacts) ? out.contacts : [];

        // Only contacts carrying an `enrichment` object count toward the total.
        let done = 0;
        let total = 0;
        for (const contact of contacts) {
          if (!contact || !contact.enrichment) continue;
          total += 1;
          if (contact.enrichment.done === true) done += 1;
        }

        return {
          kind: "ok",
          lead_id: leadId,
          done,
          total,
          contacts: wantContacts ? contacts : undefined,
        };
      } catch (err: any) {
        return {
          kind: "fail",
          lead_id: leadId,
          code: err?.code ?? "UNKNOWN",
          retry_after: err?._meta?.retry_after,
        };
      }
    };

    const outcomes = await pMap<string, Ok | Fail>(
      bulk.lead_ids,
      fetchLead,
      STATUS_FETCH_CONCURRENCY
    );

    const leadSummaries: any[] = [];
    const failures: Array<{
      lead_id: string;
      code: string;
      retry_after?: number;
    }> = [];
    let doneSum = 0;
    let totalSum = 0;

    for (const outcome of outcomes) {
      if (outcome.kind === "ok") {
        leadSummaries.push({
          lead_id: outcome.lead_id,
          ...(outcome.contacts ? { contacts: outcome.contacts } : {}),
          enrichment_progress: { done: outcome.done, total: outcome.total },
        });
        doneSum += outcome.done;
        totalSum += outcome.total;
      } else {
        // Failed leads are reported separately and excluded from the totals.
        failures.push({
          lead_id: outcome.lead_id,
          code: outcome.code,
          ...(outcome.retry_after !== undefined
            ? { retry_after: outcome.retry_after }
            : {}),
        });
      }
    }

    const overallProgress = {
      done: doneSum,
      total: totalSum,
      done_ratio: totalSum !== 0 ? doneSum / totalSum : 0,
    };
    // all_done requires at least one enrichable contact, every one finished,
    // and no lead whose fetch failed.
    const allDone = failures.length === 0 && totalSum > 0 && doneSum === totalSum;

    ctx?.logger?.info?.(
      `bulk.status_checked bulk_id=${bulk.bulk_id} done=${doneSum} total=${totalSum} wall_ms=${
        Date.now() - startedAt
      }`
    );

    return {
      bulk_id: bulk.bulk_id,
      launched_at: bulk.launched_at,
      status: bulk.status,
      durability: bulk.durability,
      titles: bulk.titles,
      email: bulk.email,
      phone: bulk.phone,
      lens_id: bulk.lens_id,
      leads: leadSummaries,
      overall_progress: overallProgress,
      all_done: allDone,
      ...(failures.length > 0 ? { partial_failures: failures } : {}),
    };
  },
};
114 changes: 108 additions & 6 deletions packages/core/src/composite/enrich-titles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,16 @@ export const enrichTitles: Tool<EnrichTitlesParams> = {
};
}

const explicitLeadIds = params.leadIds && params.leadIds.length > 0;
const selectionSource: "explicit" | "wishlist" = explicitLeadIds
? "explicit"
: "wishlist";
// Resolve lens_id once so bulkTracker gets it regardless of which branch
// populates leadIds.
const lensId = params.lensId ?? (await client.resolveDefaultLens());

let leadIds = params.leadIds;
if (!leadIds || leadIds.length === 0) {
const lensId = params.lensId ?? (await client.resolveDefaultLens());
const cnt = params.candidateCount ?? DEFAULT_CANDIDATE_COUNT;
const wish = await client.request<WishlistResponse>(
"GET",
Expand Down Expand Up @@ -186,13 +193,73 @@ export const enrichTitles: Tool<EnrichTitlesParams> = {
};
}

// Two-phase launch: reserve a bulk slot via tracker BEFORE POSTing to
// /launch. findOrCreatePending is atomic; if an identical bulk was
// launched within the idempotency window, short-circuit without
// spending quota. If the tracker is absent (e.g. legacy OpenClaw
// deployment), fall through to the raw launch without tracking.
const tracker = ctx?.bulkTracker;
let bulkRecord:
| { bulk_id: string; launched_at: string; durability: "file" | "memory" }
| undefined;
let bulkReused = false;
let bulkSecondsSinceOriginal: number | undefined;
if (tracker) {
const res = await tracker.findOrCreatePending({
lead_ids: leadIds,
titles: params.titles,
email,
phone,
lens_id: lensId,
selection_source: selectionSource,
});
bulkRecord = {
bulk_id: res.record.bulk_id,
launched_at: res.record.launched_at,
durability: res.record.durability,
};
bulkReused = res.reused;
bulkSecondsSinceOriginal = res.seconds_since_original;

if (bulkReused && res.record.status !== "failed") {
// Skip /launch — quota preserved. The original launch's record is
// reused verbatim so the agent polls the same bulk_id.
return {
mode: "already_launched",
re_used: true,
bulk_id: res.record.bulk_id,
launched_at: res.record.launched_at,
durability: res.record.durability,
seconds_since_original_launch: bulkSecondsSinceOriginal ?? 0,
titles: params.titles,
email,
phone,
preview,
message:
"No new enrichment was ordered; quota not spent. An identical bulk was launched " +
`${bulkSecondsSinceOriginal ?? 0}s ago. Poll leadbay_bulk_enrich_status with this bulk_id for results.`,
next_action:
"Call leadbay_bulk_enrich_status({bulk_id}) to check progress; include_contacts=true for the final read.",
};
}
}

try {
await client.requestVoid(
"POST",
"/leads/selection/enrichment/launch",
{ titles: params.titles, email, phone }
);
} catch (err: any) {
if (bulkRecord && tracker) {
try {
await tracker.markFailed(bulkRecord.bulk_id);
} catch (e: any) {
ctx?.logger?.warn?.(
`enrich_titles: tracker.markFailed failed: ${e?.message ?? e}`
);
}
}
if (err?.code === "QUOTA_EXCEEDED") {
return {
status: "quota_exceeded",
Expand All @@ -204,18 +271,53 @@ export const enrichTitles: Tool<EnrichTitlesParams> = {
throw err;
}

if (bulkRecord && tracker) {
try {
await tracker.markLaunched(bulkRecord.bulk_id);
} catch (e: any) {
// Launch already succeeded on the backend; flipping the tracker
// status failed. Return BULK_PENDING signal in the payload so the
// agent knows the handle is in flight.
ctx?.logger?.warn?.(
`enrich_titles: tracker.markLaunched failed: ${e?.message ?? e}`
);
return {
mode: "launched_tracker_pending",
launched: true,
preview,
bulk_id: bulkRecord.bulk_id,
launched_at: bulkRecord.launched_at,
durability: bulkRecord.durability,
titles: params.titles,
email,
phone,
message:
"Enrichment job launched on the backend, but the local tracker record could not be flipped to 'launched'. " +
"The bulk_id is still valid — leadbay_bulk_enrich_status will return status:'pending' until the tracker heals.",
next_action:
"Wait ~60s, then call leadbay_bulk_enrich_status({bulk_id}). If it persists, restart the MCP.",
};
}
}

return {
mode: "launched",
preview,
launched: true,
titles: params.titles,
email,
phone,
message:
"Enrichment job launched. The Leadbay backend does not return a bulk_id (probed 2026-04-20) — " +
"track results by polling individual leads via leadbay_get_contacts after ~60s; contact.enrichment.done flips to true.",
next_action:
"Wait ~60s, then call leadbay_research_lead or leadbay_get_contacts on the leads you care about.",
bulk_id: bulkRecord?.bulk_id,
launched_at: bulkRecord?.launched_at,
durability: bulkRecord?.durability,
message: bulkRecord
? "Enrichment job launched. Backend has no server-side bulk_id yet; MCP minted a client-side bulk_id " +
"(persisted to disk by default) so you can poll via leadbay_bulk_enrich_status."
: "Enrichment job launched. No bulk_id tracker configured — poll leadbay_get_contacts per lead " +
"after ~60s; contact.enrichment.done flips to true.",
next_action: bulkRecord
? "Call leadbay_bulk_enrich_status({bulk_id}) after ~60s; pass include_contacts=true for the final read."
: "Wait ~60s, then call leadbay_research_lead or leadbay_get_contacts on the leads you care about.",
};
} finally {
// Always clear, but never re-throw from finally (would mask the
Expand Down
Loading
Loading