From 8a8d7d645f6f149776ac5cb2f167a3cf06ef271e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=B3ricz=20Gerg=C5=91?= Date: Fri, 31 Jan 2025 11:22:10 +0100 Subject: [PATCH] fix(concurrency): proper job timeouting --- .../src/__tests__/concurrency-limit.test.ts | 4 +-- .../queue-concurrency-integration.test.ts | 4 +-- apps/api/src/lib/concurrency-limit.ts | 31 +++++++++++++++++-- apps/api/src/services/queue-jobs.ts | 11 ++++++- apps/api/src/services/queue-worker.ts | 3 +- 5 files changed, 44 insertions(+), 9 deletions(-) diff --git a/apps/api/src/__tests__/concurrency-limit.test.ts b/apps/api/src/__tests__/concurrency-limit.test.ts index 932b3724..2cd9a63f 100644 --- a/apps/api/src/__tests__/concurrency-limit.test.ts +++ b/apps/api/src/__tests__/concurrency-limit.test.ts @@ -71,7 +71,7 @@ describe("Concurrency Limit", () => { describe("pushConcurrencyLimitActiveJob", () => { it("should add job with expiration timestamp", async () => { - await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, mockNow); + await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, 2 * 60 * 1000, mockNow); expect(redisConnection.zadd).toHaveBeenCalledWith( "concurrency-limiter:test-team-id", @@ -223,7 +223,7 @@ describe("Concurrency Limit", () => { expect(takenJob).toEqual(mockJob); // Add to active jobs - await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, mockNow); + await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000, mockNow); expect(redisConnection.zadd).toHaveBeenCalled(); // Verify active jobs diff --git a/apps/api/src/__tests__/queue-concurrency-integration.test.ts b/apps/api/src/__tests__/queue-concurrency-integration.test.ts index 3a2b2a30..d93b200a 100644 --- a/apps/api/src/__tests__/queue-concurrency-integration.test.ts +++ b/apps/api/src/__tests__/queue-concurrency-integration.test.ts @@ -208,7 +208,7 @@ describe("Queue Concurrency Integration", () => { expect(nextJob).toEqual(queuedJob); // Should have added new job to active jobs - await pushConcurrencyLimitActiveJob(mockTeamId, nextJob!.id); + await pushConcurrencyLimitActiveJob(mockTeamId, nextJob!.id, 2 * 60 * 1000); expect(redisConnection.zadd).toHaveBeenCalledWith( expect.stringContaining("concurrency-limiter"), expect.any(Number), @@ -226,7 +226,7 @@ describe("Queue Concurrency Integration", () => { }; // Add job to active jobs - await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id); + await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000); // Simulate job failure and cleanup await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id); diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts index afbbb334..7bab02f9 100644 --- a/apps/api/src/lib/concurrency-limit.ts +++ b/apps/api/src/lib/concurrency-limit.ts @@ -1,13 +1,37 @@ import { CONCURRENCY_LIMIT } from "../services/rate-limiter"; import { redisConnection } from "../services/queue-service"; import { PlanType } from "../types"; -import { JobsOptions } from "bullmq"; +import type { Job, JobsOptions } from "bullmq"; const constructKey = (team_id: string) => "concurrency-limiter:" + team_id; const constructQueueKey = (team_id: string) => "concurrency-limit-queue:" + team_id; -const stalledJobTimeoutMs = 2 * 60 * 1000; +export function calculateJobTimeToRun( + job: ConcurrencyLimitedJob +): number { + let jobTimeToRun = 2 * 60 * 1000; + + if (job.data.scrapeOptions) { + if (job.data.scrapeOptions.timeout) { + jobTimeToRun = job.data.scrapeOptions.timeout; + } + + if (job.data.scrapeOptions.waitFor) { + jobTimeToRun += job.data.scrapeOptions.waitFor; + } + + (job.data.scrapeOptions.actions ?? []).forEach(x => { + if (x.type === "wait" && x.milliseconds) { + jobTimeToRun += x.milliseconds; + } else { + jobTimeToRun += 1000; + } + }) + } + + return jobTimeToRun; +} export async function cleanOldConcurrencyLimitEntries( team_id: string, @@ -30,11 +54,12 @@ export async function getConcurrencyLimitActiveJobs( export async function pushConcurrencyLimitActiveJob( team_id: string, id: string, + timeout: number, now: number = Date.now(), ) { await redisConnection.zadd( constructKey(team_id), - now + stalledJobTimeoutMs, + now + timeout, id, ); } diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 40fb11e0..7433e6f9 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -4,6 +4,7 @@ import { v4 as uuidv4 } from "uuid"; import { NotificationType, PlanType, WebScraperOptions } from "../types"; import * as Sentry from "@sentry/node"; import { + calculateJobTimeToRun, cleanOldConcurrencyLimitEntries, getConcurrencyLimitActiveJobs, getConcurrencyQueueJobsCount, @@ -43,7 +44,15 @@ export async function _addScrapeJobToBullMQ( webScraperOptions.team_id && webScraperOptions.plan ) { - await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId); + await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, calculateJobTimeToRun({ + id: jobId, + opts: { + ...options, + priority: jobPriority, + jobId, + }, + data: webScraperOptions, + })); } await getScrapeQueue().add(jobId, webScraperOptions, { diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 76e5f336..c4ebe939 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -48,6 +48,7 @@ import { configDotenv } from "dotenv"; import { scrapeOptions } from "../controllers/v1/types"; import { getRateLimiterPoints } from "./rate-limiter"; import { + calculateJobTimeToRun, cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, @@ -446,7 +447,7 @@ const workerFun = async ( // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); if (nextJob !== null) { - await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id); + await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, calculateJobTimeToRun(nextJob)); await queue.add( nextJob.id,