Merge pull request #731 from mendableai/nsc/crawl-fixes

Fixes crawls being incorrectly reported as failed and webhooks not firing properly
Nicolas 2024-10-03 17:37:03 -03:00 committed by GitHub
commit 99ca852e5d
8 changed files with 99 additions and 74 deletions

View File

@@ -278,23 +278,24 @@ describe("E2E Tests for v1 API Routes", () => {
     expect(response.body.data.metadata.statusCode).toBe(401);
   }, 60000);
 
-  it.concurrent('should return a successful response for a scrape with 403 page', async () => {
-    const response: ScrapeResponseRequestTest = await request(TEST_URL)
-      .post('/v1/scrape')
-      .set('Authorization', `Bearer ${process.env.TEST_API_KEY}`)
-      .set('Content-Type', 'application/json')
-      .send({ url: 'https://httpstat.us/403' });
-    await new Promise((r) => setTimeout(r, 5000));
+  // Removed it as we want to retry fallback to the next scraper
+  // it.concurrent('should return a successful response for a scrape with 403 page', async () => {
+  //   const response: ScrapeResponseRequestTest = await request(TEST_URL)
+  //     .post('/v1/scrape')
+  //     .set('Authorization', `Bearer ${process.env.TEST_API_KEY}`)
+  //     .set('Content-Type', 'application/json')
+  //     .send({ url: 'https://httpstat.us/403' });
+  //   await new Promise((r) => setTimeout(r, 5000));
 
-    expect(response.statusCode).toBe(200);
-    expect(response.body).toHaveProperty('data');
-    if (!("data" in response.body)) {
-      throw new Error("Expected response body to have 'data' property");
-    }
-    expect(response.body.data).toHaveProperty('markdown');
-    expect(response.body.data).toHaveProperty('metadata');
-    expect(response.body.data.metadata.statusCode).toBe(403);
-  }, 60000);
+  //   expect(response.statusCode).toBe(200);
+  //   expect(response.body).toHaveProperty('data');
+  //   if (!("data" in response.body)) {
+  //     throw new Error("Expected response body to have 'data' property");
+  //   }
+  //   expect(response.body.data).toHaveProperty('markdown');
+  //   expect(response.body.data).toHaveProperty('metadata');
+  //   expect(response.body.data.metadata.statusCode).toBe(403);
+  // }, 60000);
 
   it.concurrent('should return a successful response for a scrape with 404 page', async () => {
     const response: ScrapeResponseRequestTest = await request(TEST_URL)
@@ -314,41 +315,41 @@ describe("E2E Tests for v1 API Routes", () => {
     expect(response.body.data.metadata.statusCode).toBe(404);
   }, 60000);
 
-  it.concurrent('should return a successful response for a scrape with 405 page', async () => {
-    const response: ScrapeResponseRequestTest = await request(TEST_URL)
-      .post('/v1/scrape')
-      .set('Authorization', `Bearer ${process.env.TEST_API_KEY}`)
-      .set('Content-Type', 'application/json')
-      .send({ url: 'https://httpstat.us/405' });
-    await new Promise((r) => setTimeout(r, 5000));
+  // it.concurrent('should return a successful response for a scrape with 405 page', async () => {
+  //   const response: ScrapeResponseRequestTest = await request(TEST_URL)
+  //     .post('/v1/scrape')
+  //     .set('Authorization', `Bearer ${process.env.TEST_API_KEY}`)
+  //     .set('Content-Type', 'application/json')
+  //     .send({ url: 'https://httpstat.us/405' });
+  //   await new Promise((r) => setTimeout(r, 5000));
 
-    expect(response.statusCode).toBe(200);
-    expect(response.body).toHaveProperty('data');
-    if (!("data" in response.body)) {
-      throw new Error("Expected response body to have 'data' property");
-    }
-    expect(response.body.data).toHaveProperty('markdown');
-    expect(response.body.data).toHaveProperty('metadata');
-    expect(response.body.data.metadata.statusCode).toBe(405);
-  }, 60000);
+  //   expect(response.statusCode).toBe(200);
+  //   expect(response.body).toHaveProperty('data');
+  //   if (!("data" in response.body)) {
+  //     throw new Error("Expected response body to have 'data' property");
+  //   }
+  //   expect(response.body.data).toHaveProperty('markdown');
+  //   expect(response.body.data).toHaveProperty('metadata');
+  //   expect(response.body.data.metadata.statusCode).toBe(405);
+  // }, 60000);
 
-  it.concurrent('should return a successful response for a scrape with 500 page', async () => {
-    const response: ScrapeResponseRequestTest = await request(TEST_URL)
-      .post('/v1/scrape')
-      .set('Authorization', `Bearer ${process.env.TEST_API_KEY}`)
-      .set('Content-Type', 'application/json')
-      .send({ url: 'https://httpstat.us/500' });
-    await new Promise((r) => setTimeout(r, 5000));
+  // it.concurrent('should return a successful response for a scrape with 500 page', async () => {
+  //   const response: ScrapeResponseRequestTest = await request(TEST_URL)
+  //     .post('/v1/scrape')
+  //     .set('Authorization', `Bearer ${process.env.TEST_API_KEY}`)
+  //     .set('Content-Type', 'application/json')
+  //     .send({ url: 'https://httpstat.us/500' });
+  //   await new Promise((r) => setTimeout(r, 5000));
 
-    expect(response.statusCode).toBe(200);
-    expect(response.body).toHaveProperty('data');
-    if (!("data" in response.body)) {
-      throw new Error("Expected response body to have 'data' property");
-    }
-    expect(response.body.data).toHaveProperty('markdown');
-    expect(response.body.data).toHaveProperty('metadata');
-    expect(response.body.data.metadata.statusCode).toBe(500);
-  }, 60000);
+  //   expect(response.statusCode).toBe(200);
+  //   expect(response.body).toHaveProperty('data');
+  //   if (!("data" in response.body)) {
+  //     throw new Error("Expected response body to have 'data' property");
+  //   }
+  //   expect(response.body.data).toHaveProperty('markdown');
+  //   expect(response.body.data).toHaveProperty('metadata');
+  //   expect(response.body.data.metadata.statusCode).toBe(500);
+  // }, 60000);
 
   it.concurrent("should return a timeout error when scraping takes longer than the specified timeout", async () => {
     const response: ScrapeResponseRequestTest = await request(TEST_URL)
@@ -680,7 +681,7 @@ describe("POST /v1/crawl", () => {
       .set("Content-Type", "application/json")
       .send({
         url: "https://firecrawl.dev",
-        limit: 10,
+        limit: 40,
         includePaths: ["blog/*"],
       });
@@ -736,7 +737,7 @@ describe("POST /v1/crawl", () => {
       .set("Content-Type", "application/json")
      .send({
         url: "https://firecrawl.dev",
-        limit: 10,
+        limit: 40,
         excludePaths: ["blog/*"],
       });
@@ -928,7 +929,7 @@ describe("GET /v1/crawl/:jobId", () => {
       .post("/v1/crawl")
       .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`)
       .set("Content-Type", "application/json")
-      .send({ url: "https://docs.tatum.io", limit: 200 });
+      .send({ url: "https://docs.firecrawl.dev", limit: 10 });
     expect(crawlResponse.statusCode).toBe(200);

View File

@@ -49,12 +49,27 @@ export async function crawlStatusController(req: Request, res: Response) {
     if (sc.team_id !== team_id) {
       return res.status(403).json({ error: "Forbidden" });
     }
 
-    const jobIDs = await getCrawlJobs(req.params.jobId);
-    const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
-    const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
-    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobs.some((x, i) => jobStatuses[i] === "failed" && x.failedReason !== "Concurrency limit hit") ? "failed" : "active";
+    let jobIDs = await getCrawlJobs(req.params.jobId);
+    let jobs = await getJobs(req.params.jobId, jobIDs);
+    let jobStatuses = await Promise.all(jobs.map(x => x.getState()));
+
+    // Combine jobs and jobStatuses into a single array of objects
+    let jobsWithStatuses = jobs.map((job, index) => ({
+      job,
+      status: jobStatuses[index]
+    }));
+
+    // Filter out failed jobs
+    jobsWithStatuses = jobsWithStatuses.filter(x => x.status !== "failed");
+
+    // Sort jobs by timestamp
+    jobsWithStatuses.sort((a, b) => a.job.timestamp - b.job.timestamp);
+
+    // Extract sorted jobs and statuses
+    jobs = jobsWithStatuses.map(x => x.job);
+    jobStatuses = jobsWithStatuses.map(x => x.status);
+
+    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : "active";
 
     const data = jobs.filter(x => x.failedReason !== "Concurreny limit hit").map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
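
The net effect: a scrape that ends in the "failed" state no longer drags the whole crawl down to "failed"; it is dropped from the aggregation before the crawl status is derived. A minimal sketch of the resulting logic in TypeScript, using hypothetical job data rather than the real queue job objects:

type JobState = "completed" | "failed" | "active";

// Hypothetical sample; the real jobs come from the scrape queue.
const sampleJobs: { id: string; timestamp: number; state: JobState }[] = [
  { id: "a", timestamp: 3, state: "completed" },
  { id: "b", timestamp: 1, state: "failed" },    // dropped before the status check
  { id: "c", timestamp: 2, state: "completed" },
];

const visible = sampleJobs
  .filter((j) => j.state !== "failed")           // failed scrapes no longer fail the crawl
  .sort((a, b) => a.timestamp - b.timestamp);    // results stay in crawl order

const cancelled = false;
const crawlStatus = cancelled
  ? "failed"
  : visible.every((j) => j.state === "completed")
    ? "completed"
    : "active";
// crawlStatus === "completed" even though job "b" failed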

View File

@@ -94,11 +94,15 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara
   doneJobIDs = await getDoneJobsOrdered(req.params.jobId);
 
-  const jobIDs = await getCrawlJobs(req.params.jobId);
+  let jobIDs = await getCrawlJobs(req.params.jobId);
   let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
   const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id));
   jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
-  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";
+  // filter out failed jobs
+  jobIDs = jobIDs.filter(id => !jobStatuses.some(status => status[0] === id && status[1] === "failed"));
+  // filter the job statues
+  jobStatuses = jobStatuses.filter(x => x[1] !== "failed");
+  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : "scraping";
 
   const doneJobs = await getJobs(doneJobIDs);
   const data = doneJobs.map(x => x.returnvalue);

View File

@@ -57,11 +57,15 @@ export async function crawlStatusController(req: RequestWithAuth<CrawlStatusPara
   const start = typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : 0;
   const end = typeof req.query.limit === "string" ? (start + parseInt(req.query.limit, 10) - 1) : undefined;
 
-  const jobIDs = await getCrawlJobs(req.params.jobId);
+  let jobIDs = await getCrawlJobs(req.params.jobId);
   let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
   const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id));
   jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
-  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";
+  // filter out failed jobs
+  jobIDs = jobIDs.filter(id => !jobStatuses.some(status => status[0] === id && status[1] === "failed"));
+  // filter the job statues
+  jobStatuses = jobStatuses.filter(x => x[1] !== "failed");
+  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : "scraping";
 
   const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
   const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);
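
The WebSocket handler and this HTTP controller apply the same change to the v1 shape, where each status entry is a [jobID, state] tuple: failed entries are removed from both lists, and the "failed" branch disappears from the status expression. A short sketch with made-up IDs and states:

let ids = ["j1", "j2", "j3"];
let states: [string, string][] = [
  ["j1", "completed"],
  ["j2", "failed"],   // a genuinely failed scrape (throttled ones were already filtered out)
  ["j3", "active"],
];

// drop failed entries from both lists before deriving the crawl status
ids = ids.filter((id) => !states.some(([jid, s]) => jid === id && s === "failed"));
states = states.filter(([, s]) => s !== "failed");

const status = states.every(([, s]) => s === "completed") ? "completed" : "scraping";
// "failed" is no longer derived from individual jobs; a cancelled crawl reports "cancelled"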

View File

@@ -461,8 +461,8 @@ export function legacyDocumentConverter(doc: any): Document {
       ...doc.metadata,
       pageError: undefined,
       pageStatusCode: undefined,
-      error: doc.metadata.pageError,
-      statusCode: doc.metadata.pageStatusCode,
+      error: doc.metadata?.pageError,
+      statusCode: doc.metadata?.pageStatusCode,
     },
   };
 }
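
The optional chaining guards the v1-to-legacy conversion against documents whose metadata is missing, as can happen when a scrape fails before any metadata is extracted. The sample document below is hypothetical:

const failedDoc: any = { markdown: "", metadata: undefined };

// Before: failedDoc.metadata.pageError  -> TypeError: Cannot read properties of undefined
// After:
const error = failedDoc.metadata?.pageError;           // undefined, no crash
const statusCode = failedDoc.metadata?.pageStatusCode; // undefined, no crash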

View File

@@ -59,6 +59,7 @@ export async function startWebScraperPipeline({
     is_scrape: job.data.is_scrape ?? false,
   })) as { success: boolean; message: string; docs: Document[] };
 }
+
 export async function runWebScraper({
   url,
   mode,

View File

@@ -425,7 +425,7 @@ export async function scrapSingleUrl(
       Logger.debug(`⛏️ ${scraper}: Successfully scraped ${urlToScrap} with text length >= 100 or screenshot, breaking`);
       break;
     }
-    if (pageStatusCode && (pageStatusCode == 404 || pageStatusCode == 400)) {
+    if (pageStatusCode && (pageStatusCode == 404 || pageStatusCode == 400 || pageStatusCode == 401)) {
       Logger.debug(`⛏️ ${scraper}: Successfully scraped ${urlToScrap} with status code ${pageStatusCode}, breaking`);
       break;
     }
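
With 401 added, the scraper fallback loop treats Unauthorized like 400 and 404: the response is taken as definitive and no further scrapers are tried, while a 403 still falls through to the next scraper, which is why the 403 E2E test was commented out above. A sketch of the decision (the helper name is made up; the real check is inline):

function stopsFallback(pageStatusCode?: number): boolean {
  return !!pageStatusCode && (pageStatusCode == 404 || pageStatusCode == 400 || pageStatusCode == 401);
}

stopsFallback(401); // true  -> break and return the 401 to the caller
stopsFallback(403); // false -> keep falling back to the next scraper
stopsFallback(500); // false -> keep falling back as well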

View File

@@ -395,6 +395,7 @@ async function processJob(job: Job, token: string) {
           pageOptions: sc.pageOptions,
           origin: job.data.origin,
           crawl_id: job.data.crawl_id,
+          webhook: job.data.webhook,
           v1: job.data.v1,
         },
         {},
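
Copying webhook into the data of each scrape job queued during a crawl means the webhook configuration reaches the child jobs instead of being dropped, so per-page webhook events can actually be delivered. A sketch of the resulting job payload; field names not visible in this hunk and all values are placeholders:

const childJobData = {
  // ...fields not shown in this hunk (url, mode, team_id, ...) are assumptions
  pageOptions: {},                                // sc.pageOptions
  origin: "api",                                  // job.data.origin (placeholder value)
  crawl_id: "crawl-abc",                          // job.data.crawl_id (placeholder value)
  webhook: "https://example.com/hooks/firecrawl", // job.data.webhook, previously missing here
  v1: true,                                       // job.data.v1
};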
@@ -468,9 +469,8 @@ async function processJob(job: Job, token: string) {
         }
       } else {
         const jobIDs = await getCrawlJobs(job.data.crawl_id);
-        const jobStatuses = await Promise.all(jobIDs.map((x) => getScrapeQueue().getJobState(x)));
         const jobStatus =
-          sc.cancelled || jobStatuses.some((x) => x === "failed")
+          sc.cancelled
             ? "failed"
             : "completed";
@@ -554,16 +554,16 @@ async function processJob(job: Job, token: string) {
         job.data.v1
       );
     }
-    if (job.data.v1) {
-      callWebhook(
-        job.data.team_id,
-        job.id as string,
-        [],
-        job.data.webhook,
-        job.data.v1,
-        "crawl.failed"
-      );
-    }
+    // if (job.data.v1) {
+    //   callWebhook(
+    //     job.data.team_id,
+    //     job.id as string,
+    //     [],
+    //     job.data.webhook,
+    //     job.data.v1,
+    //     "crawl.failed"
+    //   );
+    // }
 
     if (job.data.crawl_id) {
       await logJob({
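
With crawl.failed no longer emitted from this path and the crawl-level status in the worker derived only from cancellation, a v1 client mostly waits for a terminal status and inspects per-page metadata.statusCode for individual failures. A hedged polling sketch against the GET /v1/crawl/:jobId endpoint exercised by the tests above; the base URL and API key are placeholders:

async function waitForCrawl(jobId: string, apiKey: string) {
  // Base URL is an assumption; the E2E tests above use their own TEST_URL.
  const base = "https://api.firecrawl.dev";
  while (true) {
    const res = await fetch(`${base}/v1/crawl/${jobId}`, {
      headers: { Authorization: `Bearer ${apiKey}` },
    });
    const body = await res.json();
    if (body.status !== "scraping") return body; // "completed", "cancelled", or "failed"
    await new Promise((r) => setTimeout(r, 2000)); // poll every 2 seconds
  }
}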