From fd8b38902a62aa6d631a2a0956b59fbeec1ab292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Sun, 16 Feb 2025 15:52:17 +0100 Subject: [PATCH] fix(crawl-status): consider concurrency limited jobs as prioritized (#1184) --- apps/api/src/controllers/v1/crawl-errors.ts | 14 +------------- apps/api/src/controllers/v1/crawl-status-ws.ts | 14 +++++++------- apps/api/src/controllers/v1/crawl-status.ts | 11 ++++++----- apps/api/src/lib/concurrency-limit.ts | 5 +++++ apps/api/src/lib/crawl-redis.ts | 8 -------- 5 files changed, 19 insertions(+), 33 deletions(-) diff --git a/apps/api/src/controllers/v1/crawl-errors.ts b/apps/api/src/controllers/v1/crawl-errors.ts index defdda01..979a6d7a 100644 --- a/apps/api/src/controllers/v1/crawl-errors.ts +++ b/apps/api/src/controllers/v1/crawl-errors.ts @@ -2,27 +2,15 @@ import { Response } from "express"; import { CrawlErrorsResponse, CrawlStatusParams, - CrawlStatusResponse, - ErrorResponse, RequestWithAuth, } from "./types"; import { getCrawl, - getCrawlExpiry, getCrawlJobs, - getDoneJobsOrdered, - getDoneJobsOrderedLength, - getThrottledJobs, - isCrawlFinished, } from "../../lib/crawl-redis"; import { getScrapeQueue, redisConnection } from "../../services/queue-service"; -import { - supabaseGetJobById, - supabaseGetJobsById, -} from "../../lib/supabase-jobs"; import { configDotenv } from "dotenv"; -import { Job, JobState } from "bullmq"; -import { logger } from "../../lib/logger"; +import { Job } from "bullmq"; configDotenv(); export async function getJob(id: string) { diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts index 9ed1d1c6..41f99094 100644 --- a/apps/api/src/controllers/v1/crawl-status-ws.ts +++ b/apps/api/src/controllers/v1/crawl-status-ws.ts @@ -17,7 +17,6 @@ import { getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, - getThrottledJobs, isCrawlFinished, isCrawlFinishedLocked, } from "../../lib/crawl-redis"; @@ -25,6 +24,7 @@ import { getScrapeQueue } from "../../services/queue-service"; import { getJob, getJobs } from "./crawl-status"; import * as Sentry from "@sentry/node"; import { Job, JobState } from "bullmq"; +import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit"; type ErrorMessage = { type: "error"; @@ -127,16 +127,16 @@ async function crawlStatusWS( async (x) => [x, await getScrapeQueue().getJobState(x)] as const, ), ); - const throttledJobs = new Set(...(await getThrottledJobs(req.auth.team_id))); - - const throttledJobsSet = new Set(throttledJobs); - + const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id); + const validJobStatuses: [string, JobState | "unknown"][] = []; const validJobIDs: string[] = []; for (const [id, status] of jobStatuses) { - if ( - !throttledJobsSet.has(id) && + if (throttledJobsSet.has(id)) { + validJobStatuses.push([id, "prioritized"]); + validJobIDs.push(id); + } else if ( status !== "failed" && status !== "unknown" ) { diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts index d43562b6..8d0ea1b7 100644 --- a/apps/api/src/controllers/v1/crawl-status.ts +++ b/apps/api/src/controllers/v1/crawl-status.ts @@ -11,7 +11,6 @@ import { getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, - getThrottledJobs, isCrawlKickoffFinished, } from "../../lib/crawl-redis"; import { getScrapeQueue } from "../../services/queue-service"; @@ -23,6 +22,7 @@ import { configDotenv } from "dotenv"; import type { Job, JobState } from "bullmq"; import { logger } from "../../lib/logger"; import { supabase_service } from "../../services/supabase"; +import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit"; configDotenv(); export type PseudoJob = { @@ -137,16 +137,17 @@ export async function crawlStatusController( async (x) => [x, await getScrapeQueue().getJobState(x)] as const, ), ); - const throttledJobs = new Set(...(await getThrottledJobs(req.auth.team_id))); - const throttledJobsSet = new Set(throttledJobs); + const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id); const validJobStatuses: [string, JobState | "unknown"][] = []; const validJobIDs: string[] = []; for (const [id, status] of jobStatuses) { - if ( - !throttledJobsSet.has(id) && + if (throttledJobsSet.has(id)) { + validJobStatuses.push([id, "prioritized"]); + validJobIDs.push(id); + } else if ( status !== "failed" && status !== "unknown" ) { diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts index 8fa87cb2..5a1578ed 100644 --- a/apps/api/src/lib/concurrency-limit.ts +++ b/apps/api/src/lib/concurrency-limit.ts @@ -100,6 +100,11 @@ export async function pushConcurrencyLimitedJob( ); } +export async function getConcurrencyLimitedJobs( + team_id: string, +) { + return new Set((await redisConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id)); +} export async function getConcurrencyQueueJobsCount(team_id: string): Promise { const count = await redisConnection.zcard(constructQueueKey(team_id)); diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index c0870586..eaee3491 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -184,14 +184,6 @@ export async function getCrawlJobCount(id: string): Promise { return await redisConnection.scard("crawl:" + id + ":jobs"); } -export async function getThrottledJobs(teamId: string): Promise { - return await redisConnection.zrangebyscore( - "concurrency-limiter:" + teamId + ":throttled", - Date.now(), - Infinity, - ); -} - export function normalizeURL(url: string, sc: StoredCrawl): string { const urlO = new URL(url); if (!sc.crawlerOptions || sc.crawlerOptions.ignoreQueryParameters) {