From f18a6b20ffbc908da00596c083b3eab1b79aff4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Fri, 11 Apr 2025 20:38:51 +0200 Subject: [PATCH] extract concurrency hotfix --- apps/api/src/lib/extract/document-scraper.ts | 2 ++ apps/api/src/lib/job-priority.ts | 5 ++++- apps/api/src/services/queue-jobs.ts | 6 +++--- apps/api/src/types.ts | 1 + 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/apps/api/src/lib/extract/document-scraper.ts b/apps/api/src/lib/extract/document-scraper.ts index 313f2f31..f5501230 100644 --- a/apps/api/src/lib/extract/document-scraper.ts +++ b/apps/api/src/lib/extract/document-scraper.ts @@ -31,6 +31,7 @@ export async function scrapeDocument( const jobPriority = await getJobPriority({ team_id: options.teamId, basePriority: 10, + from_extract: true, }); await addScrapeJob( @@ -45,6 +46,7 @@ export async function scrapeDocument( }, origin: options.origin, is_scrape: true, + from_extract: true, }, {}, jobId, diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index 5e89ad9d..02356c21 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -1,3 +1,4 @@ +import { RateLimiterMode } from "../types"; import { getACUC, getACUCTeam } from "../controllers/auth"; import { redisConnection } from "../services/queue-service"; import { logger } from "./logger"; @@ -31,16 +32,18 @@ export async function deleteJobPriority(team_id, job_id) { export async function getJobPriority({ team_id, basePriority = 10, + from_extract = false, }: { team_id: string; basePriority?: number; + from_extract?: boolean; }): Promise { if (team_id === "d97c4ceb-290b-4957-8432-2b2a02727d95") { return 50; } try { - const acuc = await getACUCTeam(team_id); + const acuc = await getACUCTeam(team_id, false, true, from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl); const setKey = SET_KEY_PREFIX + team_id; diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 3e8985fe..24924d7d 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -1,6 +1,6 @@ import { getScrapeQueue } from "./queue-service"; import { v4 as uuidv4 } from "uuid"; -import { NotificationType, WebScraperOptions } from "../types"; +import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types"; import * as Sentry from "@sentry/node"; import { cleanOldConcurrencyLimitEntries, @@ -78,7 +78,7 @@ async function addScrapeJobRaw( webScraperOptions.team_id ) { const now = Date.now(); - maxConcurrency = (await getACUCTeam(webScraperOptions.team_id))?.concurrency ?? 2; + maxConcurrency = (await getACUCTeam(webScraperOptions.team_id, false, true, webScraperOptions.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2; cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now); currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length; concurrencyLimited = currentActiveConcurrency >= maxConcurrency; @@ -171,7 +171,7 @@ export async function addScrapeJobs( if (jobs[0].data && jobs[0].data.team_id) { const now = Date.now(); - maxConcurrency = (await getACUCTeam(jobs[0].data.team_id))?.concurrency ?? 2; + maxConcurrency = (await getACUCTeam(jobs[0].data.team_id, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2; cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now); currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length; diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index a42d4cb1..6a6ae6d9 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -44,6 +44,7 @@ export interface WebScraperOptions { v1?: boolean; is_scrape?: boolean; isCrawlSourceScrape?: boolean; + from_extract?: boolean; } export interface RunWebScraperParams {