feat(v1/map): stop mapping if timed out via AbortController (#1205)

This commit is contained in:
Gergő Móricz 2025-02-20 00:42:13 +01:00 committed by GitHub
parent 2200f084f3
commit 46b187bc64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 137 additions and 105 deletions

View File

@ -5,6 +5,7 @@ import {
mapRequestSchema,
RequestWithAuth,
scrapeOptions,
TimeoutSignal,
} from "./types";
import { crawlToCrawler, StoredCrawl } from "../../lib/crawl-redis";
import { MapResponse, MapRequest } from "./types";
@ -53,6 +54,7 @@ export async function getMapResults({
origin,
includeMetadata = false,
allowExternalLinks,
abort = new AbortController().signal, // noop
}: {
url: string;
search?: string;
@ -65,6 +67,7 @@ export async function getMapResults({
origin?: string;
includeMetadata?: boolean;
allowExternalLinks?: boolean;
abort?: AbortSignal;
}): Promise<MapResult> {
const id = uuidv4();
let links: string[] = [url];
@ -87,8 +90,8 @@ export async function getMapResults({
const crawler = crawlToCrawler(id, sc);
try {
sc.robots = await crawler.getRobotsTxt();
await crawler.importRobotsTxt(sc.robots);
sc.robots = await crawler.getRobotsTxt(false, abort);
crawler.importRobotsTxt(sc.robots);
} catch (_) {}
// If sitemapOnly is true, only get links from sitemap
@ -102,6 +105,7 @@ export async function getMapResults({
true,
true,
30000,
abort,
);
if (sitemap > 0) {
links = links
@ -144,7 +148,7 @@ export async function getMapResults({
return fireEngineMap(mapUrl, {
numResults: resultsPerPage,
page: page,
});
}, abort);
};
pagePromises = Array.from({ length: maxPages }, (_, i) =>
@ -157,7 +161,7 @@ export async function getMapResults({
// Parallelize sitemap index query with search results
const [sitemapIndexResult, ...searchResults] = await Promise.all([
querySitemapIndex(url),
querySitemapIndex(url, abort),
...(cachedResult ? [] : pagePromises),
]);
@ -178,6 +182,7 @@ export async function getMapResults({
true,
false,
30000,
abort,
);
} catch (e) {
logger.warn("tryGetSitemap threw an error", { error: e });
@ -277,6 +282,7 @@ export async function mapController(
req.body = mapRequestSchema.parse(req.body);
let result: Awaited<ReturnType<typeof getMapResults>>;
const abort = new AbortController();
try {
result = await Promise.race([
getMapResults({
@ -289,13 +295,17 @@ export async function mapController(
origin: req.body.origin,
teamId: req.auth.team_id,
plan: req.auth.plan,
abort: abort.signal,
}),
...(req.body.timeout !== undefined ? [
new Promise((resolve, reject) => setTimeout(() => reject("timeout"), req.body.timeout))
new Promise((resolve, reject) => setTimeout(() => {
abort.abort(new TimeoutSignal());
reject(new TimeoutSignal());
}, req.body.timeout))
] : []),
]) as any;
} catch (error) {
if (error === "timeout") {
if (error instanceof TimeoutSignal || error === "timeout") {
return res.status(408).json({
success: false,
error: "Request timed out",

View File

@ -1004,3 +1004,9 @@ export const generateLLMsTextRequestSchema = z.object({
export type GenerateLLMsTextRequest = z.infer<
typeof generateLLMsTextRequestSchema
>;
/**
 * Error raised when an operation exceeds its allotted time budget.
 *
 * Used both as the abort reason handed to `AbortController.abort()` and as
 * the rejection value surfaced to callers (e.g. `mapController`), which
 * distinguish timeouts from other failures via `instanceof TimeoutSignal`
 * and translate them into an HTTP 408 response.
 */
export class TimeoutSignal extends Error {
  constructor() {
    super("Operation timed out");
    // Name the subclass explicitly: without this, logs and error.toString()
    // report the generic "Error", hiding that a timeout occurred.
    this.name = "TimeoutSignal";
  }
}

View File

@ -9,6 +9,7 @@ import { logger as _logger } from "../../lib/logger";
import https from "https";
import { redisConnection } from "../../services/queue-service";
import { extractLinks } from "../../lib/html-transformer";
import { TimeoutSignal } from "../../controllers/v1/types";
export class WebCrawler {
private jobId: string;
private initialUrl: string;
@ -182,7 +183,7 @@ export class WebCrawler {
.slice(0, limit);
}
public async getRobotsTxt(skipTlsVerification = false): Promise<string> {
public async getRobotsTxt(skipTlsVerification = false, abort?: AbortSignal): Promise<string> {
let extraArgs = {};
if (skipTlsVerification) {
extraArgs["httpsAgent"] = new https.Agent({
@ -191,6 +192,7 @@ export class WebCrawler {
}
const response = await axios.get(this.robotsTxtUrl, {
timeout: axiosTimeout,
signal: abort,
...extraArgs,
});
return response.data;
@ -205,6 +207,7 @@ export class WebCrawler {
fromMap: boolean = false,
onlySitemap: boolean = false,
timeout: number = 120000,
abort?: AbortSignal
): Promise<number> {
this.logger.debug(`Fetching sitemap links from ${this.initialUrl}`, {
method: "tryGetSitemap",
@ -260,10 +263,10 @@ export class WebCrawler {
try {
let count = (await Promise.race([
Promise.all([
this.tryFetchSitemapLinks(this.initialUrl, _urlsHandler),
this.tryFetchSitemapLinks(this.initialUrl, _urlsHandler, abort),
...this.robots
.getSitemaps()
.map((x) => this.tryFetchSitemapLinks(x, _urlsHandler)),
.map((x) => this.tryFetchSitemapLinks(x, _urlsHandler, abort)),
]).then((results) => results.reduce((a, x) => a + x, 0)),
timeoutPromise,
])) as number;
@ -555,6 +558,7 @@ export class WebCrawler {
private async tryFetchSitemapLinks(
url: string,
urlsHandler: (urls: string[]) => unknown,
abort?: AbortSignal,
): Promise<number> {
const sitemapUrl = url.endsWith(".xml")
? url
@ -569,13 +573,18 @@ export class WebCrawler {
this.logger,
this.jobId,
this.sitemapsHit,
abort,
);
} catch (error) {
this.logger.debug(`Failed to fetch sitemap from ${sitemapUrl}`, {
method: "tryFetchSitemapLinks",
sitemapUrl,
error,
});
if (error instanceof TimeoutSignal) {
throw error;
} else {
this.logger.debug(`Failed to fetch sitemap from ${sitemapUrl}`, {
method: "tryFetchSitemapLinks",
sitemapUrl,
error,
});
}
}
// If this is a subdomain, also try to get sitemap from the main domain
@ -611,20 +620,29 @@ export class WebCrawler {
this.logger,
this.jobId,
this.sitemapsHit,
abort,
);
} catch (error) {
this.logger.debug(
`Failed to fetch main domain sitemap from ${mainDomainSitemapUrl}`,
{ method: "tryFetchSitemapLinks", mainDomainSitemapUrl, error },
);
if (error instanceof TimeoutSignal) {
throw error;
} else {
this.logger.debug(
`Failed to fetch main domain sitemap from ${mainDomainSitemapUrl}`,
{ method: "tryFetchSitemapLinks", mainDomainSitemapUrl, error },
);
}
}
}
} catch (error) {
this.logger.debug(`Error processing main domain sitemap`, {
method: "tryFetchSitemapLinks",
url,
error,
});
if (error instanceof TimeoutSignal) {
throw error;
} else {
this.logger.debug(`Error processing main domain sitemap`, {
method: "tryFetchSitemapLinks",
url,
error,
});
}
}
// If no sitemap found yet, try the baseUrl as a last resort
@ -636,22 +654,28 @@ export class WebCrawler {
this.logger,
this.jobId,
this.sitemapsHit,
abort,
);
} catch (error) {
this.logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}`, {
method: "tryFetchSitemapLinks",
sitemapUrl: baseUrlSitemap,
error,
});
if (error instanceof AxiosError && error.response?.status === 404) {
// ignore 404
if (error instanceof TimeoutSignal) {
throw error;
} else {
sitemapCount += await getLinksFromSitemap(
{ sitemapUrl: baseUrlSitemap, urlsHandler, mode: "fire-engine" },
this.logger,
this.jobId,
this.sitemapsHit,
);
this.logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}`, {
method: "tryFetchSitemapLinks",
sitemapUrl: baseUrlSitemap,
error,
});
if (error instanceof AxiosError && error.response?.status === 404) {
// ignore 404
} else {
sitemapCount += await getLinksFromSitemap(
{ sitemapUrl: baseUrlSitemap, urlsHandler, mode: "fire-engine" },
this.logger,
this.jobId,
this.sitemapsHit,
abort,
);
}
}
}
}

View File

@ -12,10 +12,11 @@ import { supabase_service } from "../../services/supabase";
*/
import { withAuth } from "../../lib/withAuth";
async function querySitemapIndexFunction(url: string) {
async function querySitemapIndexFunction(url: string, abort?: AbortSignal) {
const originUrl = normalizeUrlOnlyHostname(url);
for (let attempt = 1; attempt <= 3; attempt++) {
abort?.throwIfAborted();
try {
const { data, error } = await supabase_service
.from("crawl_maps")

View File

@ -1,8 +1,7 @@
import { axiosTimeout } from "../../lib/timeout";
import { parseStringPromise } from "xml2js";
import { WebCrawler } from "./crawler";
import { scrapeURL } from "../scrapeURL";
import { scrapeOptions } from "../../controllers/v1/types";
import { scrapeOptions, TimeoutSignal } from "../../controllers/v1/types";
import type { Logger } from "winston";
const useFireEngine =
process.env.FIRE_ENGINE_BETA_URL !== "" &&
@ -20,6 +19,7 @@ export async function getLinksFromSitemap(
logger: Logger,
crawlId: string,
sitemapsHit: Set<string>,
abort?: AbortSignal,
): Promise<number> {
if (sitemapsHit.size >= 20) {
return 0;
@ -44,7 +44,8 @@ export async function getLinksFromSitemap(
"fetch",
...((mode === "fire-engine" && useFireEngine) ? ["fire-engine;tlsclient" as const] : []),
],
v0DisableJsDom: true
v0DisableJsDom: true,
abort,
},
);
@ -69,14 +70,18 @@ export async function getLinksFromSitemap(
return 0;
}
} catch (error) {
logger.error(`Request failed for sitemap fetch`, {
method: "getLinksFromSitemap",
mode,
sitemapUrl,
error,
});
return 0;
if (error instanceof TimeoutSignal) {
throw error;
} else {
logger.error(`Request failed for sitemap fetch`, {
method: "getLinksFromSitemap",
mode,
sitemapUrl,
error,
});
return 0;
}
}
const parsed = await parseStringPromise(content);
@ -90,7 +95,7 @@ export async function getLinksFromSitemap(
.map((sitemap) => sitemap.loc[0].trim());
const sitemapPromises: Promise<number>[] = sitemapUrls.map((sitemapUrl) =>
getLinksFromSitemap({ sitemapUrl, urlsHandler, mode }, logger, crawlId, sitemapsHit),
getLinksFromSitemap({ sitemapUrl, urlsHandler, mode }, logger, crawlId, sitemapsHit, abort),
);
const results = await Promise.all(sitemapPromises);
@ -114,6 +119,7 @@ export async function getLinksFromSitemap(
logger,
crawlId,
sitemapsHit,
abort,
),
);
count += (await Promise.all(sitemapPromises)).reduce(
@ -151,56 +157,3 @@ export async function getLinksFromSitemap(
return 0;
}
/**
 * Fetches a site's sitemap.xml and parses its <urlset> into structured entries.
 *
 * @param url     Site or sitemap URL; "/sitemap.xml" is appended unless the
 *                URL already ends with it.
 * @param timeout Optional fetch timeout in ms; falls back to `axiosTimeout`
 *                when omitted (or when 0, since `||` is used).
 * @returns Parsed entries on a 2xx response (empty array if the document has
 *          no <urlset>/<url> nodes), `null` on a non-2xx response, and `[]`
 *          if the fetch/parse throws.
 */
export const fetchSitemapData = async (
  url: string,
  timeout?: number,
): Promise<SitemapEntry[] | null> => {
  // Normalize: point at /sitemap.xml unless the caller already did.
  const sitemapUrl = url.endsWith("/sitemap.xml") ? url : `${url}/sitemap.xml`;
  try {
    // Plain HTTP fetch engine is forced — no headless browser needed for XML.
    const fetchResponse = await scrapeURL(
      "sitemap",
      sitemapUrl,
      scrapeOptions.parse({
        formats: ["rawHtml"],
        timeout: timeout || axiosTimeout,
      }),
      { forceEngine: "fetch" },
    );
    if (
      fetchResponse.success &&
      fetchResponse.document.metadata.statusCode >= 200 &&
      fetchResponse.document.metadata.statusCode < 300
    ) {
      // rawHtml was requested above, so it is expected to be present on success.
      const xml = fetchResponse.document.rawHtml!;
      const parsedXml = await parseStringPromise(xml);

      const sitemapData: SitemapEntry[] = [];
      if (parsedXml.urlset && parsedXml.urlset.url) {
        // xml2js wraps each child element in an array; take the first value.
        for (const urlElement of parsedXml.urlset.url) {
          const sitemapEntry: SitemapEntry = { loc: urlElement.loc[0] };
          if (urlElement.lastmod) sitemapEntry.lastmod = urlElement.lastmod[0];
          if (urlElement.changefreq)
            sitemapEntry.changefreq = urlElement.changefreq[0];
          if (urlElement.priority)
            sitemapEntry.priority = Number(urlElement.priority[0]);
          sitemapData.push(sitemapEntry);
        }
      }
      return sitemapData;
    }
    return null;
  } catch (error) {
    // Error handling for failed sitemap fetch
    // NOTE(review): error is swallowed silently; a non-2xx response returns
    // null while a thrown error falls through to return [] — inconsistent
    // sentinels. Confirm callers rely on this distinction before changing.
  }
  return [];
};
/** A single <url> entry parsed out of a sitemap.xml <urlset>. */
export interface SitemapEntry {
  /** Page URL from the required <loc> element. */
  loc: string;
  /** Last-modified value, kept as the raw string from <lastmod>. */
  lastmod?: string;
  /** Expected change frequency string from <changefreq> (e.g. "daily"). */
  changefreq?: string;
  /** Crawl priority from <priority>, converted with Number(). */
  priority?: number;
}

View File

@ -21,6 +21,7 @@ export async function scrapeURLWithFetch(
dispatcher: await makeSecureDispatcher(meta.url),
redirect: "follow",
headers: meta.options.headers,
signal: meta.internalOptions.abort,
}),
(async () => {
await new Promise((resolve) =>

View File

@ -85,6 +85,7 @@ export async function fireEngineCheckStatus(
logger: Logger,
jobId: string,
mock: MockState | null,
abort?: AbortSignal,
): Promise<FireEngineCheckStatusSuccess> {
const status = await Sentry.startSpan(
{

View File

@ -24,8 +24,9 @@ import * as Sentry from "@sentry/node";
import { Action } from "../../../../lib/entities";
import { specialtyScrapeCheck } from "../utils/specialtyHandler";
import { fireEngineDelete } from "./delete";
import { MockState, saveMock } from "../../lib/mock";
import { MockState } from "../../lib/mock";
import { getInnerJSON } from "../../../../lib/html-transformer";
import { TimeoutSignal } from "../../../../controllers/v1/types";
// This function does not take `Meta` on purpose. It may not access any
// meta values to construct the request -- that must be done by the
@ -40,6 +41,7 @@ async function performFireEngineScrape<
request: FireEngineScrapeRequestCommon & Engine,
timeout: number,
mock: MockState | null,
abort?: AbortSignal,
): Promise<FireEngineCheckStatusSuccess> {
const scrape = await fireEngineScrape(
logger.child({ method: "fireEngineScrape" }),
@ -84,6 +86,7 @@ async function performFireEngineScrape<
logger.child({ method: "fireEngineCheckStatus" }),
scrape.jobId,
mock,
abort,
);
} catch (error) {
if (error instanceof StillProcessingError) {
@ -107,6 +110,16 @@ async function performFireEngineScrape<
jobId: scrape.jobId,
});
throw error;
} else if (error instanceof TimeoutSignal) {
fireEngineDelete(
logger.child({
method: "performFireEngineScrape/fireEngineDelete",
afterError: error,
}),
scrape.jobId,
mock,
);
throw error;
} else {
Sentry.captureException(error);
errors.push(error);
@ -219,6 +232,7 @@ export async function scrapeURLWithFireEngineChromeCDP(
request,
timeout,
meta.mock,
meta.internalOptions.abort,
);
if (
@ -298,6 +312,7 @@ export async function scrapeURLWithFireEnginePlaywright(
request,
timeout,
meta.mock,
meta.internalOptions.abort,
);
if (!response.url) {
@ -353,6 +368,7 @@ export async function scrapeURLWithFireEngineTLSClient(
request,
timeout,
meta.mock,
meta.internalOptions.abort,
);
if (!response.url) {

View File

@ -76,6 +76,7 @@ export async function fireEngineScrape<
logger: Logger,
request: FireEngineScrapeRequestCommon & Engine,
mock: MockState | null,
abort?: AbortSignal,
): Promise<z.infer<typeof schema>> {
const scrapeRequest = await Sentry.startSpan(
{
@ -101,6 +102,7 @@ export async function fireEngineScrape<
schema,
tryCount: 3,
mock,
abort,
});
},
);

View File

@ -1,7 +1,7 @@
import { Logger } from "winston";
import * as Sentry from "@sentry/node";
import { Document, ScrapeOptions } from "../../controllers/v1/types";
import { Document, ScrapeOptions, TimeoutSignal } from "../../controllers/v1/types";
import { logger as _logger } from "../../lib/logger";
import {
buildFallbackList,
@ -165,6 +165,7 @@ export type InternalOptions = {
disableSmartWaitCache?: boolean; // Passed along to fire-engine
isBackgroundIndex?: boolean;
fromCache?: boolean; // Indicates if the document was retrieved from cache
abort?: AbortSignal;
};
export type EngineResultsTracker = {
@ -222,6 +223,7 @@ async function scrapeURLLoop(meta: Meta): Promise<ScrapeUrlResponse> {
: undefined;
for (const { engine, unsupportedFeatures } of fallbackList) {
meta.internalOptions.abort?.throwIfAborted();
const startedAt = Date.now();
try {
meta.logger.info("Scraping via " + engine + "...");
@ -307,6 +309,8 @@ async function scrapeURLLoop(meta: Meta): Promise<ScrapeUrlResponse> {
throw error;
} else if (error instanceof UnsupportedFileError) {
throw error;
} else if (error instanceof TimeoutSignal) {
throw error;
} else {
Sentry.captureException(error);
meta.logger.warn(
@ -433,6 +437,8 @@ export async function scrapeURL(
meta.logger.warn("scrapeURL: Tried to scrape unsupported file", {
error,
});
} else if (error instanceof TimeoutSignal) {
throw error;
} else {
Sentry.captureException(error);
meta.logger.error("scrapeURL: Unexpected error happened", { error });

View File

@ -2,6 +2,7 @@ import { Logger } from "winston";
import { z, ZodError } from "zod";
import * as Sentry from "@sentry/node";
import { MockState, saveMock } from "./mock";
import { TimeoutSignal } from "../../../controllers/v1/types";
import { fireEngineURL } from "../engines/fire-engine/scrape";
export type RobustFetchParams<Schema extends z.Schema<any>> = {
@ -18,6 +19,7 @@ export type RobustFetchParams<Schema extends z.Schema<any>> = {
tryCount?: number;
tryCooldown?: number;
mock: MockState | null;
abort?: AbortSignal;
};
export async function robustFetch<
@ -36,7 +38,10 @@ export async function robustFetch<
tryCount = 1,
tryCooldown,
mock,
abort,
}: RobustFetchParams<Schema>): Promise<Output> {
abort?.throwIfAborted();
const params = {
url,
logger,
@ -48,6 +53,7 @@ export async function robustFetch<
ignoreFailure,
tryCount,
tryCooldown,
abort,
};
let response: {
@ -71,6 +77,7 @@ export async function robustFetch<
: {}),
...(headers !== undefined ? headers : {}),
},
signal: abort,
...(body instanceof FormData
? {
body,
@ -82,7 +89,9 @@ export async function robustFetch<
: {}),
});
} catch (error) {
if (!ignoreFailure) {
if (error instanceof TimeoutSignal) {
throw error;
} else if (!ignoreFailure) {
Sentry.captureException(error);
if (tryCount > 1) {
logger.debug(

View File

@ -305,6 +305,7 @@ export async function performLLMExtract(
document: Document,
): Promise<Document> {
if (meta.options.formats.includes("extract")) {
meta.internalOptions.abort?.throwIfAborted();
const { extract, warning } = await generateOpenAICompletions(
meta.logger.child({
method: "performLLMExtract/generateOpenAICompletions",

View File

@ -16,6 +16,7 @@ export async function fireEngineMap(
numResults: number;
page?: number;
},
abort?: AbortSignal,
): Promise<SearchResult[]> {
try {
let data = JSON.stringify({
@ -40,6 +41,7 @@ export async function fireEngineMap(
"X-Disable-Cache": "true",
},
body: data,
signal: abort,
});
if (response.ok) {