fix(crawl): finish crawl even if last one fails

Gergő Móricz 2024-11-26 16:28:45 +01:00
parent f395c5b008
commit e217952434


@@ -56,6 +56,99 @@ const connectionMonitorInterval =
  Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
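// Finalizes a crawl once finishCrawl reports that all of its jobs are accounted
// for: logs the crawl-level job and fires the v0 (full data) or v1 (event-only)
// completion webhook. Pulled out into a helper so both the success path and the
// failure path of processJob can call it.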
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
  if (await finishCrawl(job.data.crawl_id)) {
    if (!job.data.v1) {
      const jobIDs = await getCrawlJobs(job.data.crawl_id);
      const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
      // const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
      const jobStatus =
        sc.cancelled // || jobStatuses.some((x) => x === "failed")
          ? "failed"
          : "completed";
      const fullDocs = jobs.map((x) =>
        Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue
      );
      await logJob({
        job_id: job.data.crawl_id,
        success: jobStatus === "completed",
        message: sc.cancelled ? "Cancelled" : undefined,
        num_docs: fullDocs.length,
        docs: [],
        time_taken: (Date.now() - sc.createdAt) / 1000,
        team_id: job.data.team_id,
        mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
        url: sc.originUrl!,
        scrapeOptions: sc.scrapeOptions,
        crawlerOptions: sc.crawlerOptions,
        origin: job.data.origin,
      });
      const data = {
        success: jobStatus !== "failed",
        result: {
          links: fullDocs.map((doc) => {
            return {
              content: doc,
              source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
            };
          }),
        },
        project_id: job.data.project_id,
        docs: fullDocs,
      };
      // v0 web hooks, call when done with all the data
      if (!job.data.v1) {
        callWebhook(
          job.data.team_id,
          job.data.crawl_id,
          data,
          job.data.webhook,
          job.data.v1,
          job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed"
        );
      }
    } else {
      const jobIDs = await getCrawlJobs(job.data.crawl_id);
      const jobStatus = sc.cancelled ? "failed" : "completed";
      // v1 web hooks, call when done with no data, but with event completed
      if (job.data.v1 && job.data.webhook) {
        callWebhook(
          job.data.team_id,
          job.data.crawl_id,
          [],
          job.data.webhook,
          job.data.v1,
          job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed"
        );
      }
      await logJob({
        job_id: job.data.crawl_id,
        success: jobStatus === "completed",
        message: sc.cancelled ? "Cancelled" : undefined,
        num_docs: jobIDs.length,
        docs: [],
        time_taken: (Date.now() - sc.createdAt) / 1000,
        team_id: job.data.team_id,
        scrapeOptions: sc.scrapeOptions,
        mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
        url: sc?.originUrl ?? (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"),
        crawlerOptions: sc.crawlerOptions,
        origin: job.data.origin,
      }, true);
    }
  }
}
const processJobInternal = async (token: string, job: Job & { id: string }) => {
  const extendLockInterval = setInterval(async () => {
    logger.info(`🐂 Worker extending lock on job ${job.id}`);
@@ -399,96 +492,7 @@ async function processJob(job: Job & { id: string }, token: string) {
}
}
if (await finishCrawl(job.data.crawl_id)) {
  if (!job.data.v1) {
    const jobIDs = await getCrawlJobs(job.data.crawl_id);
    const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
    // const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
    const jobStatus =
      sc.cancelled // || jobStatuses.some((x) => x === "failed")
        ? "failed"
        : "completed";
    const fullDocs = jobs.map((x) =>
      Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue
    );
    await logJob({
      job_id: job.data.crawl_id,
      success: jobStatus === "completed",
      message: sc.cancelled ? "Cancelled" : undefined,
      num_docs: fullDocs.length,
      docs: [],
      time_taken: (Date.now() - sc.createdAt) / 1000,
      team_id: job.data.team_id,
      mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
      url: sc.originUrl!,
      scrapeOptions: sc.scrapeOptions,
      crawlerOptions: sc.crawlerOptions,
      origin: job.data.origin,
    });
    const data = {
      success: jobStatus !== "failed",
      result: {
        links: fullDocs.map((doc) => {
          return {
            content: doc,
            source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
          };
        }),
      },
      project_id: job.data.project_id,
      docs: fullDocs,
    };
    // v0 web hooks, call when done with all the data
    if (!job.data.v1) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        data,
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed"
      );
    }
  } else {
    const jobIDs = await getCrawlJobs(job.data.crawl_id);
    const jobStatus = sc.cancelled ? "failed" : "completed";
    // v1 web hooks, call when done with no data, but with event completed
    if (job.data.v1 && job.data.webhook) {
      callWebhook(
        job.data.team_id,
        job.data.crawl_id,
        [],
        job.data.webhook,
        job.data.v1,
        job.data.crawlerOptions !== null ? "crawl.completed" : "batch_scrape.completed"
      );
    }
    await logJob({
      job_id: job.data.crawl_id,
      success: jobStatus === "completed",
      message: sc.cancelled ? "Cancelled" : undefined,
      num_docs: jobIDs.length,
      docs: [],
      time_taken: (Date.now() - sc.createdAt) / 1000,
      team_id: job.data.team_id,
      scrapeOptions: sc.scrapeOptions,
      mode: job.data.crawlerOptions !== null ? "crawl" : "batch_scrape",
      url: sc?.originUrl ?? (job.data.crawlerOptions === null ? "Batch Scrape" : "Unknown"),
      crawlerOptions: sc.crawlerOptions,
      origin: job.data.origin,
    }, true);
  }
}
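// crawl completion (logging and completion webhooks) is handled by finishCrawlIfNeeded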
await finishCrawlIfNeeded(job, sc);
}
logger.info(`🐂 Job done ${job.id}`);
@@ -547,6 +551,8 @@ async function processJob(job: Job & { id: string }, token: string) {
if (job.data.crawl_id) {
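// fetch the stored crawl so this failure path can mark the job done and still finalize the crawl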
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
await addCrawlJobDone(job.data.crawl_id, job.id);
await logJob({
job_id: job.id as string,
@@ -567,6 +573,8 @@ async function processJob(job: Job & { id: string }, token: string) {
origin: job.data.origin,
crawl_id: job.data.crawl_id,
}, true);
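// even though this job failed, check whether the crawl as a whole can now be finished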
await finishCrawlIfNeeded(job, sc);
// await logJob({
// job_id: job.data.crawl_id,