fix(crawl-status): avoid race conditions where crawl may be deemed failed

Gergő Móricz 2024-09-26 21:00:27 +02:00
parent dec4171937
commit b696bfc854
5 changed files with 29 additions and 10 deletions
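Context for the change: when a team is over its concurrency limit, the worker marks the scrape job as failed with the reason "Concurrency limit hit" and re-queues it. The crawl-status endpoints previously treated any failed child job as a crawl failure, so a crawl whose jobs were merely throttled could be reported as failed. The fix tracks throttled jobs in a per-team Redis sorted set and excludes them when deriving the crawl status. A minimal sketch of the new derivation, using the identifiers from the diff (crawlId and teamId stand in for req.params.jobId and req.auth.team_id; sc is the stored crawl):

// Sketch: derive crawl status while ignoring jobs that only look "failed" because they were throttled.
const jobIDs = await getCrawlJobs(crawlId);
let jobStatuses = await Promise.all(
  jobIDs.map(async (id) => [id, await getScrapeQueue().getJobState(id)] as const),
);
const throttledJobs = new Set(await getThrottledJobs(teamId));
jobStatuses = jobStatuses.filter(([id]) => !throttledJobs.has(id));
const status =
  sc.cancelled ? "cancelled"
  : jobStatuses.every(([, s]) => s === "completed") ? "completed"
  : jobStatuses.some(([, s]) => s === "failed") ? "failed"
  : "scraping";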


@@ -54,9 +54,9 @@ export async function crawlStatusController(req: Request, res: Response) {
const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobs.some((x, i) => jobStatuses[i] === "failed" && x.failedReason !== "Concurrency limit hit") ? "failed" : "active";
const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
const data = jobs.filter(x => x.failedReason !== "Concurrency limit hit").map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
if (
jobs.length > 0 &&


@@ -5,7 +5,7 @@ import { CrawlStatusParams, CrawlStatusResponse, Document, ErrorResponse, legacy
import { WebSocket } from "ws";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../lib/logger";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";
@@ -95,8 +95,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara
doneJobIDs = await getDoneJobsOrdered(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
const throttledJobs = new Set(await getThrottledJobs(req.auth.team_id));
jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";
const doneJobs = await getJobs(doneJobIDs);
const data = doneJobs.map(x => x.returnvalue);


@@ -1,6 +1,6 @@
import { Response } from "express";
import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs";
import { configDotenv } from "dotenv";
@@ -58,8 +58,10 @@ export async function crawlStatusController(req: RequestWithAuth<CrawlStatusPara
const end = typeof req.query.limit === "string" ? (start + parseInt(req.query.limit, 10) - 1) : undefined;
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
const throttledJobs = new Set(await getThrottledJobs(req.auth.team_id));
jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";
const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);


@@ -82,6 +82,10 @@ export async function getCrawlJobs(id: string): Promise<string[]> {
return await redisConnection.smembers("crawl:" + id + ":jobs");
}
export async function getThrottledJobs(teamId: string): Promise<string[]> {
return await redisConnection.zrangebyscore("concurrency-limiter:" + teamId + ":throttled", Date.now(), Infinity);
}
export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise<boolean> {
if (typeof sc.crawlerOptions?.limit === "number") {
if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) {
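The throttled set added here is a per-team sorted set ("concurrency-limiter:" + teamId + ":throttled") whose member scores are expiry timestamps, so reading members scored from Date.now() upward yields the jobs currently parked by the limiter. A rough sketch of the lifecycle, assuming ioredis and the key layout used in this commit (throttledKey and jobId are stand-ins):

// When the worker throttles a job, it records it with an expiry ten minutes out (see the queue-worker hunk below).
await redisConnection.zadd(throttledKey, Date.now() + 10 * 60 * 1000, jobId);
// Expired entries are pruned before each concurrency check.
await redisConnection.zremrangebyscore(throttledKey, -Infinity, Date.now());
// getThrottledJobs returns the members whose expiry is still in the future.
const throttled = await redisConnection.zrangebyscore(throttledKey, Date.now(), Infinity);
// Once the job is actually picked up, it is removed from the throttled set again.
await redisConnection.zrem(throttledKey, jobId);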


@@ -132,18 +132,25 @@ const workerFun = async (
const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;
if (job.data && job.data.team_id) {
const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled";
const concurrencyLimit = 10; // TODO: determine based on price id
const now = Date.now();
const stalledJobTimeoutMs = 2 * 60 * 1000;
const throttledJobTimeoutMs = 10 * 60 * 1000;
redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now);
redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);
const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);
if (activeJobsOfTeam.length >= concurrencyLimit) {
Logger.info("Moving job " + job.id + " back to the queue -- concurrency limit hit");
// Concurrency limit hit
await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id);
await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
await job.remove();
await queue.add(job.name, job.data, {
await queue.add(job.name, {
...job.data,
concurrencyLimitHit: true,
}, {
...job.opts,
jobId: job.id,
priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs
@@ -153,6 +160,7 @@ const workerFun = async (
continue;
} else {
await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id);
}
}
@@ -294,7 +302,10 @@ async function processJob(job: Job, token: string) {
},
project_id: job.data.project_id,
error: message /* etc... */,
docs,
docs: job.data.concurrencyLimitHit ? docs.map(x => ({
...x,
warning: "This scrape was throttled because you hit you concurrency limit." + (x.warning ? " " + x.warning : ""),
})) : docs,
};
// No idea what this does and when it is called.
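Two side effects of the requeue path are worth noting. The job is re-added under the same jobId with its priority multiplied by 1.25 and rounded; since BullMQ treats a larger priority number as lower priority, repeated throttling gradually backs the job off (for the default of 10: 10, then 13, 16, 20, ...). The re-added job also carries concurrencyLimitHit: true in its data, which processJob later uses to prepend the throttling warning to each returned document. A rough illustration of that composition (the document shape is abbreviated and the example warning text is made up):

// A document that already carried a warning keeps it, with the throttling notice prepended.
const doc = { url: "https://example.com", warning: "Page returned a 429." };
const withNotice = {
  ...doc,
  warning: "This scrape was throttled because you hit your concurrency limit." + (doc.warning ? " " + doc.warning : ""),
};
// withNotice.warning === "This scrape was throttled because you hit your concurrency limit. Page returned a 429."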