fix(crawl-status): consider concurrency limited jobs as prioritized (#1184)

Gergő Móricz 2025-02-16 15:52:17 +01:00 committed by GitHub
parent 7ac2b99210
commit fd8b38902a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 19 additions and 33 deletions


@@ -2,27 +2,15 @@ import { Response } from "express";
 import {
   CrawlErrorsResponse,
   CrawlStatusParams,
-  CrawlStatusResponse,
-  ErrorResponse,
   RequestWithAuth,
 } from "./types";
 import {
   getCrawl,
-  getCrawlExpiry,
   getCrawlJobs,
-  getDoneJobsOrdered,
-  getDoneJobsOrderedLength,
-  getThrottledJobs,
-  isCrawlFinished,
 } from "../../lib/crawl-redis";
 import { getScrapeQueue, redisConnection } from "../../services/queue-service";
-import {
-  supabaseGetJobById,
-  supabaseGetJobsById,
-} from "../../lib/supabase-jobs";
 import { configDotenv } from "dotenv";
-import { Job, JobState } from "bullmq";
-import { logger } from "../../lib/logger";
+import { Job } from "bullmq";
 configDotenv();
 
 export async function getJob(id: string) {


@@ -17,7 +17,6 @@ import {
   getCrawlJobs,
   getDoneJobsOrdered,
   getDoneJobsOrderedLength,
-  getThrottledJobs,
   isCrawlFinished,
   isCrawlFinishedLocked,
 } from "../../lib/crawl-redis";
@@ -25,6 +24,7 @@ import { getScrapeQueue } from "../../services/queue-service";
 import { getJob, getJobs } from "./crawl-status";
 import * as Sentry from "@sentry/node";
 import { Job, JobState } from "bullmq";
+import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
 
 type ErrorMessage = {
   type: "error";
@@ -127,16 +127,16 @@ async function crawlStatusWS(
       async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
     ),
   );
-  const throttledJobs = new Set(...(await getThrottledJobs(req.auth.team_id)));
-
-  const throttledJobsSet = new Set(throttledJobs);
+  const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
 
   const validJobStatuses: [string, JobState | "unknown"][] = [];
   const validJobIDs: string[] = [];
 
   for (const [id, status] of jobStatuses) {
-    if (
-      !throttledJobsSet.has(id) &&
+    if (throttledJobsSet.has(id)) {
+      validJobStatuses.push([id, "prioritized"]);
+      validJobIDs.push(id);
+    } else if (
       status !== "failed" &&
       status !== "unknown"
     ) {

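The classification logic is now the same in the WebSocket controller above and in the HTTP controller below: a job ID that is still parked in the team's concurrency-limit queue is reported as "prioritized" instead of being silently dropped, while jobs whose BullMQ state is "failed" or "unknown" are filtered out as before. A minimal standalone sketch of that loop follows; the helper name classifyJobs and the free-function framing are illustrative, the loop body mirrors the diff.

    import type { JobState } from "bullmq";

    // Sketch of the new classification step, assuming jobStatuses is the
    // [id, state] list the controller builds from getScrapeQueue().getJobState().
    function classifyJobs(
      jobStatuses: [string, JobState | "unknown"][],
      throttledJobsSet: Set<string>,
    ) {
      const validJobStatuses: [string, JobState | "unknown"][] = [];
      const validJobIDs: string[] = [];

      for (const [id, status] of jobStatuses) {
        if (throttledJobsSet.has(id)) {
          // Concurrency-limited jobs are still queued for this team, so
          // surface them as "prioritized" rather than dropping them.
          validJobStatuses.push([id, "prioritized"]);
          validJobIDs.push(id);
        } else if (status !== "failed" && status !== "unknown") {
          validJobStatuses.push([id, status]);
          validJobIDs.push(id);
        }
      }

      return { validJobStatuses, validJobIDs };
    }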

@@ -11,7 +11,6 @@ import {
   getCrawlJobs,
   getDoneJobsOrdered,
   getDoneJobsOrderedLength,
-  getThrottledJobs,
   isCrawlKickoffFinished,
 } from "../../lib/crawl-redis";
 import { getScrapeQueue } from "../../services/queue-service";
@@ -23,6 +22,7 @@ import { configDotenv } from "dotenv";
 import type { Job, JobState } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_service } from "../../services/supabase";
+import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
 configDotenv();
 
 export type PseudoJob<T> = {
@@ -137,16 +137,17 @@ export async function crawlStatusController(
       async (x) => [x, await getScrapeQueue().getJobState(x)] as const,
     ),
   );
 
-  const throttledJobs = new Set(...(await getThrottledJobs(req.auth.team_id)));
-  const throttledJobsSet = new Set(throttledJobs);
+  const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
 
   const validJobStatuses: [string, JobState | "unknown"][] = [];
   const validJobIDs: string[] = [];
 
   for (const [id, status] of jobStatuses) {
-    if (
-      !throttledJobsSet.has(id) &&
+    if (throttledJobsSet.has(id)) {
+      validJobStatuses.push([id, "prioritized"]);
+      validJobIDs.push(id);
+    } else if (
       status !== "failed" &&
       status !== "unknown"
     ) {

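Note that the array keeps its [string, JobState | "unknown"] element type even though "prioritized" is pushed explicitly: recent BullMQ versions include "prioritized" in the JobState union (the state of a waiting job that has a priority), so no widening of the tuple type is needed. A one-line check, assuming BullMQ 5.x:

    import type { JobState } from "bullmq";

    // "prioritized" is a member of BullMQ's JobState union in recent versions,
    // so it can be assigned to a JobState-typed value without a cast.
    const state: JobState = "prioritized";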

@@ -100,6 +100,11 @@ export async function pushConcurrencyLimitedJob(
   );
 }
 
+export async function getConcurrencyLimitedJobs(
+  team_id: string,
+) {
+  return new Set((await redisConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id));
+}
+
 export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
   const count = await redisConnection.zcard(constructQueueKey(team_id));

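The new helper reads every member of the team's concurrency queue (a Redis sorted set keyed by constructQueueKey(team_id)) and extracts the id field from each JSON-serialized entry; the controllers then probe the resulting Set with has(id) for every job in the crawl. A standalone sketch of the same idea with ioredis is below; the key format and the entry shape are illustrative assumptions, only the id field is implied by the diff.

    import Redis from "ioredis";

    const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");

    // Hypothetical key format for illustration; the real one comes from
    // constructQueueKey(team_id) in concurrency-limit.ts.
    const queueKey = (teamId: string) => `concurrency-limit-queue:${teamId}`;

    // Return the IDs of every job currently parked in the team's
    // concurrency-limit queue. zrange(key, 0, -1) walks the whole sorted set,
    // and each member is assumed to be a JSON blob carrying at least an `id`.
    async function getConcurrencyLimitedJobIds(teamId: string): Promise<Set<string>> {
      const members = await redis.zrange(queueKey(teamId), 0, -1);
      return new Set(members.map((m) => JSON.parse(m).id as string));
    }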

@@ -184,14 +184,6 @@ export async function getCrawlJobCount(id: string): Promise<number> {
   return await redisConnection.scard("crawl:" + id + ":jobs");
 }
 
-export async function getThrottledJobs(teamId: string): Promise<string[]> {
-  return await redisConnection.zrangebyscore(
-    "concurrency-limiter:" + teamId + ":throttled",
-    Date.now(),
-    Infinity,
-  );
-}
-
 export function normalizeURL(url: string, sc: StoredCrawl): string {
   const urlO = new URL(url);
   if (!sc.crawlerOptions || sc.crawlerOptions.ignoreQueryParameters) {