From b696bfc8545ce0b6f01b21a189a69dfd8fad489b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 26 Sep 2024 21:00:27 +0200 Subject: [PATCH] fix(crawl-status): avoid race conditions where crawl may be deemed failed --- apps/api/src/controllers/v0/crawl-status.ts | 4 ++-- apps/api/src/controllers/v1/crawl-status-ws.ts | 8 +++++--- apps/api/src/controllers/v1/crawl-status.ts | 8 +++++--- apps/api/src/lib/crawl-redis.ts | 4 ++++ apps/api/src/services/queue-worker.ts | 15 +++++++++++++-- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/apps/api/src/controllers/v0/crawl-status.ts b/apps/api/src/controllers/v0/crawl-status.ts index 41491f86..1b1ffdc5 100644 --- a/apps/api/src/controllers/v0/crawl-status.ts +++ b/apps/api/src/controllers/v0/crawl-status.ts @@ -54,9 +54,9 @@ export async function crawlStatusController(req: Request, res: Response) { const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp); const jobStatuses = await Promise.all(jobs.map(x => x.getState())); - const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active"; + const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobs.some((x, i) => jobStatuses[i] === "failed" && x.failedReason !== "Concurrency limit hit") ? "failed" : "active"; - const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); + const data = jobs.filter(x => x.failedReason !== "Concurrency limit hit").map(x => Array.isArray(x.returnvalue) ? 
x.returnvalue[0] : x.returnvalue); if ( jobs.length > 0 && diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts index 16a67682..9832a948 100644 --- a/apps/api/src/controllers/v1/crawl-status-ws.ts +++ b/apps/api/src/controllers/v1/crawl-status-ws.ts @@ -5,7 +5,7 @@ import { CrawlStatusParams, CrawlStatusResponse, Document, ErrorResponse, legacy import { WebSocket } from "ws"; import { v4 as uuidv4 } from "uuid"; import { Logger } from "../../lib/logger"; -import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis"; +import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis"; import { getScrapeQueue } from "../../services/queue-service"; import { getJob, getJobs } from "./crawl-status"; import * as Sentry from "@sentry/node"; @@ -95,8 +95,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth getScrapeQueue().getJobState(x))); - const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping"; + let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const)); + const throttledJobs = new Set(await getThrottledJobs(req.auth.team_id)); + jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed + const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? 
"failed" : "scraping"; const doneJobs = await getJobs(doneJobIDs); const data = doneJobs.map(x => x.returnvalue); diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts index 2ee0638c..df36f894 100644 --- a/apps/api/src/controllers/v1/crawl-status.ts +++ b/apps/api/src/controllers/v1/crawl-status.ts @@ -1,6 +1,6 @@ import { Response } from "express"; import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types"; -import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis"; +import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs } from "../../lib/crawl-redis"; import { getScrapeQueue } from "../../services/queue-service"; import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs"; import { configDotenv } from "dotenv"; @@ -58,8 +58,10 @@ export async function crawlStatusController(req: RequestWithAuth getScrapeQueue().getJobState(x))); - const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping"; + let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const)); + const throttledJobs = new Set(await getThrottledJobs(req.auth.team_id)); + jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed + const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping"; const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId); const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? 
-1); diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index 9240018e..5b1ee77d 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -82,6 +82,10 @@ export async function getCrawlJobs(id: string): Promise<string[]> { return await redisConnection.smembers("crawl:" + id + ":jobs"); } +export async function getThrottledJobs(teamId: string): Promise<string[]> { + return await redisConnection.zrangebyscore("concurrency-limiter:" + teamId + ":throttled", Date.now(), Infinity); +} + export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise { if (typeof sc.crawlerOptions?.limit === "number") { if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) { diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index ed13cab7..7468a050 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -132,18 +132,25 @@ const workerFun = async ( const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; if (job.data && job.data.team_id) { + const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled"; const concurrencyLimit = 10; // TODO: determine based on price id const now = Date.now(); const stalledJobTimeoutMs = 2 * 60 * 1000; + const throttledJobTimeoutMs = 10 * 60 * 1000; + redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now); redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); if (activeJobsOfTeam.length >= concurrencyLimit) { Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); // Concurrency limit hit + await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id); await job.moveToFailed(new Error("Concurrency limit hit"), token, false); await 
job.remove(); - await queue.add(job.name, job.data, { + await queue.add(job.name, { + ...job.data, + concurrencyLimitHit: true, + }, { ...job.opts, jobId: job.id, priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs @@ -153,6 +160,7 @@ const workerFun = async ( continue; } else { await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); + await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id); } } @@ -294,7 +302,10 @@ async function processJob(job: Job, token: string) { }, project_id: job.data.project_id, error: message /* etc... */, - docs, + docs: job.data.concurrencyLimitHit ? docs.map(x => ({ + ...x, + warning: "This scrape was throttled because you hit your concurrency limit." + (x.warning ? " " + x.warning : ""), + })) : docs, }; // No idea what this does and when it is called.