fix(concurrency): proper job timeouting

This commit is contained in:
Móricz Gergő 2025-01-31 11:22:10 +01:00
parent e026917698
commit 8a8d7d645f
5 changed files with 44 additions and 9 deletions

View File

@ -71,7 +71,7 @@ describe("Concurrency Limit", () => {
describe("pushConcurrencyLimitActiveJob", () => {
it("should add job with expiration timestamp", async () => {
await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, mockNow);
await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, 2 * 60 * 1000, mockNow);
expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
@ -223,7 +223,7 @@ describe("Concurrency Limit", () => {
expect(takenJob).toEqual(mockJob);
// Add to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, mockNow);
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000, mockNow);
expect(redisConnection.zadd).toHaveBeenCalled();
// Verify active jobs

View File

@ -208,7 +208,7 @@ describe("Queue Concurrency Integration", () => {
expect(nextJob).toEqual(queuedJob);
// Should have added new job to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, nextJob!.id);
await pushConcurrencyLimitActiveJob(mockTeamId, nextJob!.id, 2 * 60 * 1000);
expect(redisConnection.zadd).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limiter"),
expect.any(Number),
@ -226,7 +226,7 @@ describe("Queue Concurrency Integration", () => {
};
// Add job to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, 2 * 60 * 1000);
// Simulate job failure and cleanup
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);

View File

@ -1,13 +1,37 @@
import { CONCURRENCY_LIMIT } from "../services/rate-limiter";
import { redisConnection } from "../services/queue-service";
import { PlanType } from "../types";
import { JobsOptions } from "bullmq";
import type { Job, JobsOptions } from "bullmq";
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id;
const stalledJobTimeoutMs = 2 * 60 * 1000;
export function calculateJobTimeToRun(
job: ConcurrencyLimitedJob
): number {
let jobTimeToRun = 2 * 60 * 1000;
if (job.data.scrapeOptions) {
if (job.data.scrapeOptions.timeout) {
jobTimeToRun = job.data.scrapeOptions.timeout;
}
if (job.data.scrapeOptions.waitFor) {
jobTimeToRun += job.data.scrapeOptions.waitFor;
}
(job.data.scrapeOptions.actions ?? []).forEach(x => {
if (x.type === "wait" && x.milliseconds) {
jobTimeToRun += x.milliseconds;
} else {
jobTimeToRun += 1000;
}
})
}
return jobTimeToRun;
}
export async function cleanOldConcurrencyLimitEntries(
team_id: string,
@ -30,11 +54,12 @@ export async function getConcurrencyLimitActiveJobs(
export async function pushConcurrencyLimitActiveJob(
team_id: string,
id: string,
timeout: number,
now: number = Date.now(),
) {
await redisConnection.zadd(
constructKey(team_id),
now + stalledJobTimeoutMs,
now + timeout,
id,
);
}

View File

@ -4,6 +4,7 @@ import { v4 as uuidv4 } from "uuid";
import { NotificationType, PlanType, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import {
calculateJobTimeToRun,
cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
getConcurrencyQueueJobsCount,
@ -43,7 +44,15 @@ export async function _addScrapeJobToBullMQ(
webScraperOptions.team_id &&
webScraperOptions.plan
) {
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, calculateJobTimeToRun({
id: jobId,
opts: {
...options,
priority: jobPriority,
jobId,
},
data: webScraperOptions,
}));
}
await getScrapeQueue().add(jobId, webScraperOptions, {

View File

@ -48,6 +48,7 @@ import { configDotenv } from "dotenv";
import { scrapeOptions } from "../controllers/v1/types";
import { getRateLimiterPoints } from "./rate-limiter";
import {
calculateJobTimeToRun,
cleanOldConcurrencyLimitEntries,
pushConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
@ -446,7 +447,7 @@ const workerFun = async (
// we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);
await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, calculateJobTimeToRun(nextJob));
await queue.add(
nextJob.id,