Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"prepare": "husky"
},
"devDependencies": {
"@apify/eslint-config": "^1.0.0",
"@apify/eslint-config": "^2.0.0",
"@apify/log": "^2.4.0",
"@apify/tsconfig": "^0.1.0",
"@biomejs/biome": "^2.3.11",
Expand Down
22 changes: 10 additions & 12 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,6 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
const requestLimit = this.calculateEnqueuedRequestLimit();

const skippedBecauseOfRobots = new Set<string>();
const skippedBecauseOfLimit = new Set<string>();
const skippedBecauseOfMaxCrawlDepth = new Set<string>();

const isAllowedBasedOnRobotsTxtFile = this.isAllowedBasedOnRobotsTxtFile.bind(this);
Expand All @@ -1216,31 +1215,29 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
);

/**
 * Lazily filters the incoming requests before they are handed to the request manager.
 *
 * A request is dropped (and its URL remembered for later reporting) when:
 *  - its `crawlDepth` exceeds `maxCrawlDepth`, or
 *  - the robots.txt check rejects its URL.
 *
 * The request-count limit is intentionally NOT enforced here. It is passed to
 * `addRequestsBatched` as `maxNewRequests`, and the requests that did not fit are read back
 * from `result.requestsOverLimit`. Counting yielded requests in this generator would let
 * duplicates of already-enqueued URLs consume the budget.
 */
async function* filteredRequests() {
    for await (const request of requests) {
        // Sources may be plain URL strings or request-like objects carrying a `url`.
        const url = typeof request === 'string' ? request : request.url!;

        // For plain strings `crawlDepth` is undefined, so the comparison is false and they pass through.
        if (maxCrawlDepth !== undefined && (request as any).crawlDepth > maxCrawlDepth) {
            skippedBecauseOfMaxCrawlDepth.add(url);
            continue;
        }

        if (await isAllowedBasedOnRobotsTxtFile(url)) {
            yield request;
        } else {
            skippedBecauseOfRobots.add(url);
        }
    }
}

const result = await this.requestManager!.addRequestsBatched(filteredRequests(), options);
const result = await this.requestManager!.addRequestsBatched(filteredRequests(), {
...options,
maxNewRequests: requestLimit,
});

// Report requests skipped due to the maxNewRequests budget (i.e. maxRequestsPerCrawl limit)
const skippedBecauseOfLimit = result.requestsOverLimit ?? [];

if (skippedBecauseOfRobots.size > 0) {
this.log.warning(`Some requests were skipped because they were disallowed based on the robots.txt file`, {
Expand All @@ -1250,7 +1247,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

if (
skippedBecauseOfRobots.size > 0 ||
skippedBecauseOfLimit.size > 0 ||
skippedBecauseOfLimit.length > 0 ||
skippedBecauseOfMaxCrawlDepth.size > 0
) {
await Promise.all(
Expand All @@ -1259,7 +1256,8 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
return this.handleSkippedRequest({ url, reason: 'robotsTxt' });
})
.concat(
[...skippedBecauseOfLimit].map((url) => {
skippedBecauseOfLimit.map((request) => {
const url = typeof request === 'string' ? request : request.url!;
return this.handleSkippedRequest({ url, reason: 'limit' });
}),
[...skippedBecauseOfMaxCrawlDepth].map((url) => {
Expand Down
16 changes: 9 additions & 7 deletions packages/core/src/enqueue_links/enqueue_links.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,17 +488,19 @@ export async function enqueueLinks(
return filtered;
}

let requests = await createFilteredRequests();
if (typeof limit === 'number' && limit < requests.length) {
await reportSkippedRequests(requests.slice(limit), 'enqueueLimit');
requests = requests.slice(0, limit);
}

const { addedRequests } = await requestQueue.addRequestsBatched(requests, {
const { addedRequests, requestsOverLimit } = await requestQueue.addRequestsBatched(await createFilteredRequests(), {
forefront,
waitForAllRequestsToBeAdded,
maxNewRequests: limit,
});

if (requestsOverLimit?.length !== undefined && requestsOverLimit.length > 0) {
await reportSkippedRequests(
requestsOverLimit.map((r) => ({ url: typeof r === 'string' ? r : r.url! })),
'enqueueLimit',
);
}

return { processedRequests: addedRequests, unprocessedRequests: [] };
}

Expand Down
98 changes: 80 additions & 18 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
waitForAllRequestsToBeAdded: ow.optional.boolean,
batchSize: ow.optional.number,
waitBetweenBatchesMillis: ow.optional.number,
maxNewRequests: ow.optional.number,
}),
);

Expand Down Expand Up @@ -454,9 +455,21 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
}
}

const { batchSize = 1000, waitBetweenBatchesMillis = 1000 } = options;
const { batchSize = 1000, waitBetweenBatchesMillis = 1000, maxNewRequests } = options;

const chunks = peekableAsyncIterable(chunkedAsyncIterable(generateRequests(), batchSize));
let remainingBudget = maxNewRequests ?? Infinity;
const requestsOverLimit: Source[] = [];

// If there's a limit on the number of added requests, do not send batches bigger than the limit
const effectiveChunkSize =
maxNewRequests !== undefined ? () => Math.min(batchSize, remainingBudget) : batchSize;

// Hold onto the underlying iterator so we can drain leftovers from it in buildResult
const requestIterator = generateRequests();

const chunks = peekableAsyncIterable(
chunkedAsyncIterable(requestIterator, effectiveChunkSize) as AsyncIterable<Source[]>,
);
const chunksIterator = chunks[Symbol.asyncIterator]();

const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Source[], cache = true) => {
Expand All @@ -480,30 +493,63 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
return resultsToReturn;
};

// Add initial batch of `batchSize` to process them right away
/**
 * Sends one chunk to the queue and, when a `maxNewRequests` budget is in effect,
 * deducts the number of requests that were not already present from the remaining budget.
 */
const processChunk = async (chunk: Source[], cache = true) => {
    const results = await attemptToAddToQueueAndAddAnyUnprocessed(chunk, cache);

    // Without a budget there is nothing to account for.
    if (maxNewRequests === undefined) {
        return results;
    }

    // Only requests that were not already in the queue consume budget.
    let newlyAddedCount = 0;
    for (const processed of results) {
        if (!processed.wasAlreadyPresent) {
            newlyAddedCount += 1;
        }
    }
    remainingBudget -= newlyAddedCount;

    return results;
};

/**
 * Assembles the value returned to the caller.
 *
 * When `maxNewRequests` is set, everything still left in `unconsumedIterator` is moved into
 * `requestsOverLimit` first. The iterator is passed in explicitly (instead of being captured)
 * to stress that it is the very same iterator `chunkedAsyncIterable` has been reading from,
 * so only items that were never consumed end up here. The underlying iterator is drained
 * rather than `chunks`, because `chunkedAsyncIterable` stops producing chunks once the
 * budget-derived chunk size reaches 0 and leaves the rest of the source untouched.
 */
const buildResult = async (
    addedRequests: ProcessedRequest[],
    waitForAllRequestsToBeAdded: Promise<ProcessedRequest[]>,
    unconsumedIterator: AsyncGenerator<RequestOptions>,
): Promise<AddRequestsBatchedResult> => {
    if (maxNewRequests !== undefined) {
        // Pull the leftovers one by one until the source reports completion.
        let step = await unconsumedIterator.next();
        while (!step.done) {
            requestsOverLimit.push(step.value);
            step = await unconsumedIterator.next();
        }
    }

    const result: AddRequestsBatchedResult = {
        addedRequests,
        waitForAllRequestsToBeAdded,
        requestsOverLimit,
    };
    return result;
};

// Add initial batch to process right away
const initialChunk = await chunksIterator.peek();
if (initialChunk === undefined) {
return { addedRequests: [], waitForAllRequestsToBeAdded: Promise.resolve([]) };
return buildResult([], Promise.resolve([]), requestIterator);
}

const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk);
const addedRequests = await processChunk(initialChunk);
await chunksIterator.next();

// If we have no more requests to add, return immediately
// If we have no more requests to add (either exhausted or budget hit), return immediately
if ((await chunksIterator.peek()) === undefined) {
return {
addedRequests,
waitForAllRequestsToBeAdded: Promise.resolve([]),
};
return buildResult(addedRequests, Promise.resolve([]), requestIterator);
}

// eslint-disable-next-line no-async-promise-executor
const promise = new Promise<ProcessedRequest[]>(async (resolve) => {
const finalAddedRequests: ProcessedRequest[] = [];

for await (const requestChunk of chunks) {
finalAddedRequests.push(...(await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk, false)));

finalAddedRequests.push(...(await processChunk(requestChunk, false)));
await sleep(waitBetweenBatchesMillis);
}

Expand All @@ -515,15 +561,12 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
this.inProgressRequestBatchCount -= 1;
});

// If the user wants to wait for all the requests to be added, we wait for the promise to resolve for them
if (options.waitForAllRequestsToBeAdded) {
// When maxNewRequests is set, we must wait for all batches so we can accurately report skipped requests.
if (options.waitForAllRequestsToBeAdded || maxNewRequests !== undefined) {
addedRequests.push(...(await promise));
}

return {
addedRequests,
waitForAllRequestsToBeAdded: promise,
};
return buildResult(addedRequests, promise, requestIterator);
}

/**
Expand Down Expand Up @@ -980,6 +1023,18 @@ export interface AddRequestsBatchedOptions extends RequestQueueOperationOptions
* @default 1000
*/
waitBetweenBatchesMillis?: number;

/**
* If set, only this many *actually new* requests (i.e. not already present in the queue) will be added.
* Once the budget is reached, remaining requests from the iterable will be collected in
* {@apilink AddRequestsBatchedResult.requestsOverLimit|`requestsOverLimit`} instead.
*
* This is useful in combination with `maxRequestsPerCrawl` to avoid duplicate URLs consuming the budget.
*
* **Note:** Setting this option implicitly enables {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`},
* since all batches must complete before leftover requests can be accurately reported.
*/
maxNewRequests?: number;
}

export interface AddRequestsBatchedResult {
Expand All @@ -1001,4 +1056,11 @@ export interface AddRequestsBatchedResult {
* ```
*/
waitForAllRequestsToBeAdded: Promise<ProcessedRequest[]>;

/**
* Requests from the input that were not added to the queue because the
* {@apilink AddRequestsBatchedOptions.maxNewRequests|`maxNewRequests`} budget was reached.
* Empty when `maxNewRequests` is not set.
*/
requestsOverLimit?: Source[];
}
1 change: 0 additions & 1 deletion packages/core/src/storages/request_queue_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ export class RequestQueue extends RequestProvider {
this.queuePausedForMigration = true;
let requestId: string | null;

// eslint-disable-next-line no-cond-assign
while ((requestId = this.queueHeadIds.removeFirst()) !== null) {
try {
await this.client.deleteRequestLock(requestId);
Expand Down
3 changes: 0 additions & 3 deletions packages/memory-storage/test/async-iteration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ describe('Async iteration support', () => {
});

test('yields strings directly, not objects', async () => {
// eslint-disable-next-line no-unreachable-loop
for await (const key of kvStore.keys()) {
expect(typeof key).toBe('string');
break; // Only need to check the first one
Expand Down Expand Up @@ -291,7 +290,6 @@ describe('Async iteration support', () => {
});

test('yields values directly, not KeyValueStoreRecord objects', async () => {
// eslint-disable-next-line no-unreachable-loop
for await (const value of kvStore.values()) {
// Should be the actual value, not a record wrapper
expect(value).toStrictEqual({ data: 'key-00' });
Expand Down Expand Up @@ -380,7 +378,6 @@ describe('Async iteration support', () => {
});

test('yields [key, value] tuples', async () => {
// eslint-disable-next-line no-unreachable-loop
for await (const [key, value] of kvStore.entries()) {
expect(typeof key).toBe('string');
expect(key).toBe('key-00');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ export class AdaptivePlaywrightCrawler extends PlaywrightCrawler {
wasAlreadyHandled: false,
})),
waitForAllRequestsToBeAdded: Promise.resolve([]),
requestsOverLimit: [],
};
};
// We need to use a mock request queue implementation, in order to add the requests into our result object
Expand Down
30 changes: 20 additions & 10 deletions packages/utils/src/internals/iterables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,34 @@ export async function* asyncifyIterable<T>(iterable: Iterable<T> | AsyncIterable
*/
export async function* chunkedAsyncIterable<T>(
    iterable: AsyncIterable<T> | Iterable<T>,
    chunkSize: number | (() => number),
): AsyncIterable<T[]> {
    // A static size must be a number >= 1. The negated comparison also rejects NaN, and anything
    // that is neither a number nor a function is rejected so untyped callers fail loudly.
    // Infinity stays valid: it means "everything in a single chunk".
    if (typeof chunkSize !== 'function' && (typeof chunkSize !== 'number' || !(chunkSize >= 1))) {
        throw new Error(`Chunk size must be a positive number (${inspect(chunkSize)}) received`);
    }

    // A callback lets the caller change the size between chunks (e.g. a shrinking budget).
    const getChunkSize = typeof chunkSize === 'function' ? chunkSize : () => chunkSize;

    // Drive the iterator manually instead of using `for await`. Returning from this generator
    // then does NOT call `return()` on the source, so the caller can keep reading whatever is
    // left in the same iterator afterwards.
    const iterator =
        Symbol.asyncIterator in iterable
            ? (iterable as AsyncIterable<T>)[Symbol.asyncIterator]()
            : (iterable as Iterable<T>)[Symbol.iterator]();

    let exhausted = false;

    while (!exhausted) {
        const currentSize = getChunkSize();
        // A dynamic size below 1 (or NaN) means "stop here"; the remaining items stay unconsumed.
        if (!(currentSize >= 1)) break;

        const chunk: T[] = [];

        for (let i = 0; i < currentSize; i++) {
            const step = await iterator.next();
            if (step.done) {
                // Remember that the source is finished so it is not polled again after the last chunk.
                exhausted = true;
                break;
            }
            chunk.push(step.value);
        }

        if (chunk.length === 0) break;
        yield chunk;
    }
}
Expand Down
13 changes: 13 additions & 0 deletions packages/utils/src/internals/sitemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ export interface ParseSitemapOptions {
* @default true
*/
reportNetworkErrors?: boolean;
/**
* Optional filter for nested sitemap URLs discovered in sitemap index files.
* Called with the URL of each child sitemap before it is fetched.
* Return `true` to include the sitemap, `false` to skip it.
* If not provided, all nested sitemaps are followed.
*/
nestedSitemapFilter?: (sitemapUrl: string) => boolean;
}

export async function* parseSitemap<T extends ParseSitemapOptions>(
Expand All @@ -209,6 +216,7 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
sitemapRetries = 3,
networkTimeouts,
reportNetworkErrors = true,
nestedSitemapFilter,
} = options ?? {};

const sources = [...initialSources];
Expand Down Expand Up @@ -340,6 +348,11 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(

for await (const item of items) {
if (item.type === 'sitemapUrl' && !visitedSitemapUrls.has(item.url)) {
if (nestedSitemapFilter && !nestedSitemapFilter(item.url)) {
log.debug(`Skipping sitemap ${item.url} due to nestedSitemapFilter.`);
continue;
}

sources.push({ type: 'url', url: item.url, depth: (source.depth ?? 0) + 1 });
if (emitNestedSitemaps) {
yield { loc: item.url, originSitemapUrl: null } as any;
Expand Down
Loading
Loading