feat(crawl-status): allow for jobs to expire out of the redis

Gergő Móricz 2025-01-23 19:33:43 +01:00
parent 6f696d32ae
commit 95ce3c3b71
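
In short: getJob and getJobs now read from both BullMQ (Redis) and Supabase and merge the results into a PseudoJob, so crawl status still resolves after the underlying Redis entries expire, and the response's total/creditsUsed counts fall back to a firecrawl_jobs count query when the Redis job ID list is gone. A minimal caller-side sketch of the new getJob contract follows; the describeJob helper and the import path are illustrative only, not part of this commit.

// Hypothetical usage sketch of the PseudoJob shape introduced below.
// getState() may return a plain value (DB fallback) or a Promise (live BullMQ job),
// so callers should always await it, as crawlStatusController does.
import { getJob } from "./crawl-status"; // assumed module path, for illustration only

async function describeJob(id: string): Promise<string> {
  const job = await getJob(id);
  if (!job) return `${id}: not found in Redis or in the database`;

  const state = await job.getState();     // handles both the sync and async variants
  const added = new Date(job.timestamp).toISOString();
  const hasDoc = job.returnvalue != null;  // returnvalue is already unwrapped to a single doc
  return `${id}: ${state}, added ${added}, document ${hasDoc ? "available" : "missing"}`;
}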


@@ -20,50 +20,89 @@ import {
   supabaseGetJobsById,
 } from "../../lib/supabase-jobs";
 import { configDotenv } from "dotenv";
-import { Job, JobState } from "bullmq";
+import type { Job, JobState } from "bullmq";
 import { logger } from "../../lib/logger";
+import { supabase_service } from "../../services/supabase";
 configDotenv();
 
-export async function getJob(id: string) {
-  const job = await getScrapeQueue().getJob(id);
-  if (!job) return job;
-
-  if (process.env.USE_DB_AUTHENTICATION === "true") {
-    const supabaseData = await supabaseGetJobById(id);
-
-    if (supabaseData) {
-      job.returnvalue = supabaseData.docs;
-    }
-  }
-
-  job.returnvalue = Array.isArray(job.returnvalue)
-    ? job.returnvalue[0]
-    : job.returnvalue;
+type PseudoJob<T> = {
+  id: string,
+  getState(): Promise<JobState | "unknown"> | JobState | "unknown",
+  returnvalue: T | null,
+  timestamp: number,
+  data: {
+    scrapeOptions: any,
+  },
+}
+
+type DBJob = { docs: any, success: boolean, page_options: any, date_added: any }
+
+export async function getJob(id: string): Promise<PseudoJob<any> | null> {
+  const [bullJob, dbJob] = await Promise.all([
+    getScrapeQueue().getJob(id),
+    (process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
+  ]);
+
+  if (!bullJob && !dbJob) return null;
+
+  const data = dbJob?.docs ?? bullJob?.returnvalue;
+
+  const job: PseudoJob<any> = {
+    id,
+    getState: bullJob ? bullJob.getState : (() => dbJob!.success ? "completed" : "failed"),
+    returnvalue: Array.isArray(data)
+      ? data[0]
+      : data,
+    data: {
+      scrapeOptions: bullJob ? bullJob.data.scrapeOptions : dbJob!.page_options,
+    },
+    timestamp: bullJob ? bullJob.timestamp : new Date(dbJob!.date_added).valueOf(),
+  }
 
   return job;
 }
 
-export async function getJobs(ids: string[]) {
-  const jobs: (Job & { id: string })[] = (
-    await Promise.all(ids.map((x) => getScrapeQueue().getJob(x)))
-  ).filter((x) => x) as (Job & { id: string })[];
-
-  if (process.env.USE_DB_AUTHENTICATION === "true") {
-    const supabaseData = await supabaseGetJobsById(ids);
-
-    supabaseData.forEach((x) => {
-      const job = jobs.find((y) => y.id === x.job_id);
-      if (job) {
-        job.returnvalue = x.docs;
-      }
-    });
-  }
-
-  jobs.forEach((job) => {
-    job.returnvalue = Array.isArray(job.returnvalue)
-      ? job.returnvalue[0]
-      : job.returnvalue;
-  });
+export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
+  const [bullJobs, dbJobs] = await Promise.all([
+    Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
+    process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
+  ]);
+
+  const bullJobMap = new Map<string, PseudoJob<any>>();
+  const dbJobMap = new Map<string, DBJob>();
+
+  for (const job of bullJobs) {
+    bullJobMap.set(job.id, job);
+  }
+
+  for (const job of dbJobs) {
+    dbJobMap.set(job.job_id, job);
+  }
+
+  const jobs: PseudoJob<any>[] = [];
+
+  for (const id of ids) {
+    const bullJob = bullJobMap.get(id);
+    const dbJob = dbJobMap.get(id);
+
+    if (!bullJob && !dbJob) continue;
+
+    const data = dbJob?.docs ?? bullJob?.returnvalue;
+
+    const job: PseudoJob<any> = {
+      id,
+      getState: bullJob ? (() => bullJob.getState()) : (() => dbJob!.success ? "completed" : "failed"),
+      returnvalue: Array.isArray(data)
+        ? data[0]
+        : data,
+      data: {
+        scrapeOptions: bullJob ? bullJob.data.scrapeOptions : dbJob!.page_options,
+      },
+      timestamp: bullJob ? bullJob.timestamp : new Date(dbJob!.date_added).valueOf(),
+    }
+
+    jobs.push(job);
+  }
 
   return jobs;
 }
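
For readers skimming the hunk above: when the BullMQ job has already expired out of Redis, the DB fallback arm maps a Supabase firecrawl_jobs row onto the same fields a live job would provide. A standalone sketch of that mapping follows; the fromDbRow helper and the sample row are illustrative, not part of the commit.

// Standalone sketch (not from the commit) of the DB-row -> PseudoJob fallback mapping.
type DBJob = { docs: any; success: boolean; page_options: any; date_added: any };

function fromDbRow(id: string, row: DBJob) {
  return {
    id,
    getState: (): "completed" | "failed" => (row.success ? "completed" : "failed"),
    returnvalue: Array.isArray(row.docs) ? row.docs[0] : row.docs,
    data: { scrapeOptions: row.page_options },
    timestamp: new Date(row.date_added).valueOf(), // epoch millis, same unit as BullMQ's job.timestamp
  };
}

// Example with a made-up row whose Redis counterpart has expired:
const row: DBJob = {
  docs: [{ url: "https://example.com" }],
  success: true,
  page_options: {},
  date_added: "2025-01-23T18:33:43Z",
};
console.log(fromDbRow("job-123", row).getState()); // "completed"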
@@ -133,7 +172,7 @@ export async function crawlStatusController(
     end ?? -1,
   );
 
-  let doneJobs: Job[] = [];
+  let doneJobs: PseudoJob<any>[] = [];
 
   if (end === undefined) {
     // determine 10 megabyte limit
@@ -184,7 +223,7 @@ export async function crawlStatusController(
           (await x.getState()) === "failed" ? null : x,
         ),
       )
-    ).filter((x) => x !== null) as Job[];
+    ).filter((x) => x !== null) as PseudoJob<any>[];
   }
 
   const data = doneJobs.map((x) => x.returnvalue);
@@ -200,24 +239,24 @@ export async function crawlStatusController(
     nextURL.searchParams.set("limit", req.query.limit);
   }
 
-  // deprecated: this is done on queue-worker side now. if you see this after january 8, 2025, remove this
-  if (data.length > 0) {
-    if (!doneJobs[0].data.scrapeOptions.formats.includes("rawHtml")) {
-      for (let ii = 0; ii < doneJobs.length; ii++) {
-        if (data[ii]) {
-          delete data[ii].rawHtml;
-        }
-      }
-    }
-  }
-  // remove until here
+  let totalCount = jobIDs.length;
+
+  if (totalCount === 0) {
+    const x = await supabase_service
+      .from('firecrawl_jobs')
+      .select('*', { count: 'exact', head: true })
+      .eq("crawl_id", req.params.jobId)
+      .eq("success", true)
+
+    totalCount = x.count ?? 0;
+  }
 
   res.status(200).json({
     success: true,
     status,
     completed: doneJobsLength,
-    total: jobIDs.length,
-    creditsUsed: jobIDs.length,
+    total: totalCount,
+    creditsUsed: totalCount,
     expiresAt: (await getCrawlExpiry(req.params.jobId)).toISOString(),
     next:
       status !== "scraping" && start + data.length === doneJobsLength // if there's not gonna be any documents after this