extract concurrency hotfix

This commit is contained in:
Gergő Móricz 2025-04-11 20:38:51 +02:00
parent 6e9396dc57
commit f18a6b20ff
4 changed files with 10 additions and 4 deletions

View File

@ -31,6 +31,7 @@ export async function scrapeDocument(
const jobPriority = await getJobPriority({ const jobPriority = await getJobPriority({
team_id: options.teamId, team_id: options.teamId,
basePriority: 10, basePriority: 10,
from_extract: true,
}); });
await addScrapeJob( await addScrapeJob(
@ -45,6 +46,7 @@ export async function scrapeDocument(
}, },
origin: options.origin, origin: options.origin,
is_scrape: true, is_scrape: true,
from_extract: true,
}, },
{}, {},
jobId, jobId,

View File

@ -1,3 +1,4 @@
import { RateLimiterMode } from "../types";
import { getACUC, getACUCTeam } from "../controllers/auth"; import { getACUC, getACUCTeam } from "../controllers/auth";
import { redisConnection } from "../services/queue-service"; import { redisConnection } from "../services/queue-service";
import { logger } from "./logger"; import { logger } from "./logger";
@ -31,16 +32,18 @@ export async function deleteJobPriority(team_id, job_id) {
export async function getJobPriority({ export async function getJobPriority({
team_id, team_id,
basePriority = 10, basePriority = 10,
from_extract = false,
}: { }: {
team_id: string; team_id: string;
basePriority?: number; basePriority?: number;
from_extract?: boolean;
}): Promise<number> { }): Promise<number> {
if (team_id === "d97c4ceb-290b-4957-8432-2b2a02727d95") { if (team_id === "d97c4ceb-290b-4957-8432-2b2a02727d95") {
return 50; return 50;
} }
try { try {
const acuc = await getACUCTeam(team_id); const acuc = await getACUCTeam(team_id, false, true, from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl);
const setKey = SET_KEY_PREFIX + team_id; const setKey = SET_KEY_PREFIX + team_id;

View File

@ -1,6 +1,6 @@
import { getScrapeQueue } from "./queue-service"; import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { NotificationType, WebScraperOptions } from "../types"; import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node"; import * as Sentry from "@sentry/node";
import { import {
cleanOldConcurrencyLimitEntries, cleanOldConcurrencyLimitEntries,
@ -78,7 +78,7 @@ async function addScrapeJobRaw(
webScraperOptions.team_id webScraperOptions.team_id
) { ) {
const now = Date.now(); const now = Date.now();
maxConcurrency = (await getACUCTeam(webScraperOptions.team_id))?.concurrency ?? 2; maxConcurrency = (await getACUCTeam(webScraperOptions.team_id, false, true, webScraperOptions.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now); cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length; currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length;
concurrencyLimited = currentActiveConcurrency >= maxConcurrency; concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
@ -171,7 +171,7 @@ export async function addScrapeJobs(
if (jobs[0].data && jobs[0].data.team_id) { if (jobs[0].data && jobs[0].data.team_id) {
const now = Date.now(); const now = Date.now();
maxConcurrency = (await getACUCTeam(jobs[0].data.team_id))?.concurrency ?? 2; maxConcurrency = (await getACUCTeam(jobs[0].data.team_id, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now); cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length; currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length;

View File

@ -44,6 +44,7 @@ export interface WebScraperOptions {
v1?: boolean; v1?: boolean;
is_scrape?: boolean; is_scrape?: boolean;
isCrawlSourceScrape?: boolean; isCrawlSourceScrape?: boolean;
from_extract?: boolean;
} }
export interface RunWebScraperParams { export interface RunWebScraperParams {