Mirror of https://git.mirrors.martin98.com/https://github.com/mendableai/firecrawl (synced 2025-08-12 06:18:59 +08:00)
feat(queue-worker/afterJobDone): improved ccq insert logic
parent f41af8241e
commit 97c635676d
@@ -54,6 +54,7 @@ import { scrapeOptions } from "../controllers/v1/types";
 import {
   cleanOldConcurrencyLimitEntries,
   cleanOldCrawlConcurrencyLimitEntries,
+  getConcurrencyLimitActiveJobs,
   pushConcurrencyLimitActiveJob,
   pushCrawlConcurrencyLimitActiveJob,
   removeConcurrencyLimitActiveJob,
@@ -86,6 +87,7 @@ import http from "http";
 import https from "https";
 import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
 import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
+import { RateLimiterMode } from "../types";
 
 configDotenv();
 
@@ -805,31 +807,37 @@ const workerFun = async (
     }
 
     if (job.id && job.data && job.data.team_id) {
+      const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
+
       await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
-      cleanOldConcurrencyLimitEntries(job.data.team_id);
+      await cleanOldConcurrencyLimitEntries(job.data.team_id);
 
-      // No need to check if we're under the limit here -- if the current job is finished,
-      // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
-      const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
-      if (nextJob !== null) {
-        await pushConcurrencyLimitActiveJob(
-          job.data.team_id,
-          nextJob.id,
-          60 * 1000,
-        ); // 60s initial timeout
+      // Check if we're under the concurrency limit before adding a new job
+      const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
+      const concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
 
-        await queue.add(
-          nextJob.id,
-          {
-            ...nextJob.data,
-            concurrencyLimitHit: true,
-          },
-          {
-            ...nextJob.opts,
-            jobId: nextJob.id,
-            priority: nextJob.priority,
-          },
-        );
+      if (!concurrencyLimited) {
+        const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
+        if (nextJob !== null) {
+          await pushConcurrencyLimitActiveJob(
+            job.data.team_id,
+            nextJob.id,
+            60 * 1000,
+          ); // 60s initial timeout
+
+          await queue.add(
+            nextJob.id,
+            {
+              ...nextJob.data,
+              concurrencyLimitHit: true,
+            },
+            {
+              ...nextJob.opts,
+              jobId: nextJob.id,
+              priority: nextJob.priority,
+            },
+          );
+        }
       }
     }
   }
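
In the old code, the worker always promoted the next concurrency-limited job after a job finished, relying on the removed comment's assumption that a finished job leaves the team exactly one slot under its limit. The new code first looks up the team's concurrency limit via getACUCTeam and counts the currently active jobs, and only promotes a queued job while the team is still under that limit. Below is a minimal sketch of the new flow. The standalone afterJobDone wrapper is hypothetical (named after the commit scope), and the helpers (getACUCTeam, getConcurrencyLimitActiveJobs, takeConcurrencyLimitedJob, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, cleanOldConcurrencyLimitEntries, queue, RateLimiterMode) are assumed to be in scope as they are in the real file; their call shapes are taken from the diff above.

// Sketch only, not the verbatim source. Helpers are assumed to be imported
// as shown in the first two hunks of the diff.
async function afterJobDone(job: { id?: string; data?: any }) {
  if (!(job.id && job.data && job.data.team_id)) return;

  // Team concurrency limit from the ACUC lookup, defaulting to 2.
  const maxConcurrency =
    (
      await getACUCTeam(
        job.data.team_id,
        false,
        true,
        job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl,
      )
    )?.concurrency ?? 2;

  // Free the slot held by the finished job and expire stale entries.
  await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
  await cleanOldConcurrencyLimitEntries(job.data.team_id);

  // Only promote a queued job while the team is under its limit.
  const currentActiveConcurrency = (
    await getConcurrencyLimitActiveJobs(job.data.team_id)
  ).length;
  if (currentActiveConcurrency >= maxConcurrency) return;

  const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
  if (nextJob === null) return;

  // Mark the promoted job active with a 60s initial timeout, then enqueue it.
  await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id, 60 * 1000);
  await queue.add(
    nextJob.id,
    { ...nextJob.data, concurrencyLimitHit: true },
    { ...nextJob.opts, jobId: nextJob.id, priority: nextJob.priority },
  );
}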