diff --git a/apps/api/src/__tests__/concurrency-limit.test.ts b/apps/api/src/__tests__/concurrency-limit.test.ts index 742bffd6..8a2b3ddc 100644 --- a/apps/api/src/__tests__/concurrency-limit.test.ts +++ b/apps/api/src/__tests__/concurrency-limit.test.ts @@ -126,7 +126,7 @@ describe("Concurrency Limit", () => { describe("pushConcurrencyLimitedJob", () => { it("should add job to queue with priority", async () => { - await pushConcurrencyLimitedJob(mockTeamId, mockJob); + await pushConcurrencyLimitedJob(mockTeamId, mockJob, 30000); expect(redisConnection.zadd).toHaveBeenCalledWith( "concurrency-limit-queue:test-team-id", @@ -139,7 +139,7 @@ describe("Concurrency Limit", () => { const jobWithoutPriority = { ...mockJob }; delete jobWithoutPriority.priority; - await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority); + await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority, 30000); expect(redisConnection.zadd).toHaveBeenCalledWith( "concurrency-limit-queue:test-team-id", @@ -181,7 +181,7 @@ describe("Concurrency Limit", () => { }; // Push job to queue - await pushConcurrencyLimitedJob(mockTeamId, mockJob); + await pushConcurrencyLimitedJob(mockTeamId, mockJob, 30000); expect(redisConnection.zadd).toHaveBeenCalled(); // Take job from queue diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts index 3be85ec2..8901413f 100644 --- a/apps/api/src/lib/concurrency-limit.ts +++ b/apps/api/src/lib/concurrency-limit.ts @@ -56,6 +56,7 @@ export type ConcurrencyLimitedJob = { export async function takeConcurrencyLimitedJob( team_id: string, ): Promise { + await redisConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now()); const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN"); if (res === null || res === undefined) { return null; @@ -67,10 +68,11 @@ export async function takeConcurrencyLimitedJob( export async function pushConcurrencyLimitedJob( team_id: string, job: ConcurrencyLimitedJob, + timeout: number, ) { await redisConnection.zadd( constructQueueKey(team_id), - job.priority ?? 1, + Date.now() + timeout, JSON.stringify(job), ); } diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 8dd57ffc..f2ecb28e 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -46,7 +46,7 @@ async function _addScrapeJobToConcurrencyQueue( jobId: jobId, }, priority: jobPriority, - }); + }, webScraperOptions.scrapeOptions?.timeout ?? (60 * 1000)); } async function _addCrawlScrapeJobToConcurrencyQueue(