From 95ce3c3b712fd4f349190a6e2226fdb42f829024 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Thu, 23 Jan 2025 19:33:43 +0100
Subject: [PATCH] feat(crawl-status): allow for jobs to expire out of the redis

---
 apps/api/src/controllers/v1/crawl-status.ts | 127 +++++++++++++-------
 1 file changed, 83 insertions(+), 44 deletions(-)

diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts
index bac70a34..a27e791a 100644
--- a/apps/api/src/controllers/v1/crawl-status.ts
+++ b/apps/api/src/controllers/v1/crawl-status.ts
@@ -20,50 +20,89 @@ import {
   supabaseGetJobsById,
 } from "../../lib/supabase-jobs";
 import { configDotenv } from "dotenv";
-import { Job, JobState } from "bullmq";
+import type { Job, JobState } from "bullmq";
 import { logger } from "../../lib/logger";
+import { supabase_service } from "../../services/supabase";
 configDotenv();

-export async function getJob(id: string) {
-  const job = await getScrapeQueue().getJob(id);
-  if (!job) return job;
+type PseudoJob<T> = {
+  id: string,
+  getState(): Promise<JobState | "unknown"> | JobState | "unknown",
+  returnvalue: T | null,
+  timestamp: number,
+  data: {
+    scrapeOptions: any,
+  },
+}

-  if (process.env.USE_DB_AUTHENTICATION === "true") {
-    const supabaseData = await supabaseGetJobById(id);
+type DBJob = { docs: any, success: boolean, page_options: any, date_added: any }

-    if (supabaseData) {
-      job.returnvalue = supabaseData.docs;
-    }
+export async function getJob(id: string): Promise<PseudoJob<any> | null> {
+  const [bullJob, dbJob] = await Promise.all([
+    getScrapeQueue().getJob(id),
+    (process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
+  ]);
+
+  if (!bullJob && !dbJob) return null;
+
+  const data = dbJob?.docs ?? bullJob?.returnvalue;
+
+  const job: PseudoJob<any> = {
+    id,
+    getState: bullJob ? (() => bullJob.getState()) : (() => dbJob!.success ? "completed" : "failed"),
+    returnvalue: Array.isArray(data)
+      ? data[0]
+      : data,
+    data: {
+      scrapeOptions: bullJob ? bullJob.data.scrapeOptions : dbJob!.page_options,
+    },
+    timestamp: bullJob ? bullJob.timestamp : new Date(dbJob!.date_added).valueOf(),
   }

-  job.returnvalue = Array.isArray(job.returnvalue)
-    ? job.returnvalue[0]
-    : job.returnvalue;
-
   return job;
 }

-export async function getJobs(ids: string[]) {
-  const jobs: (Job & { id: string })[] = (
-    await Promise.all(ids.map((x) => getScrapeQueue().getJob(x)))
-  ).filter((x) => x) as (Job & { id: string })[];
+export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
+  const [bullJobs, dbJobs] = await Promise.all([
+    Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job & { id: string })[]>,
+    process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
+  ]);

-  if (process.env.USE_DB_AUTHENTICATION === "true") {
-    const supabaseData = await supabaseGetJobsById(ids);
+  const bullJobMap = new Map<string, Job<any>>();
+  const dbJobMap = new Map<string, DBJob>();

-    supabaseData.forEach((x) => {
-      const job = jobs.find((y) => y.id === x.job_id);
-      if (job) {
-        job.returnvalue = x.docs;
-      }
-    });
+  for (const job of bullJobs) {
+    bullJobMap.set(job.id, job);
   }

-  jobs.forEach((job) => {
-    job.returnvalue = Array.isArray(job.returnvalue)
-      ? job.returnvalue[0]
-      : job.returnvalue;
-  });
+  for (const job of dbJobs) {
+    dbJobMap.set(job.job_id, job);
+  }
+
+  const jobs: PseudoJob<any>[] = [];
+
+  for (const id of ids) {
+    const bullJob = bullJobMap.get(id);
+    const dbJob = dbJobMap.get(id);
+
+    if (!bullJob && !dbJob) continue;
+
+    const data = dbJob?.docs ?? bullJob?.returnvalue;
+
+    const job: PseudoJob<any> = {
+      id,
+      getState: bullJob ? (() => bullJob.getState()) : (() => dbJob!.success ? "completed" : "failed"),
+      returnvalue: Array.isArray(data)
+        ? data[0]
+        : data,
+      data: {
+        scrapeOptions: bullJob ? bullJob.data.scrapeOptions : dbJob!.page_options,
+      },
+      timestamp: bullJob ? bullJob.timestamp : new Date(dbJob!.date_added).valueOf(),
+    }
+
+    jobs.push(job);
+  }

   return jobs;
 }
@@ -133,7 +172,7 @@ export async function crawlStatusController(
     end ?? -1,
   );

-  let doneJobs: Job[] = [];
+  let doneJobs: PseudoJob<any>[] = [];

   if (end === undefined) {
     // determine 10 megabyte limit
@@ -184,7 +223,7 @@ export async function crawlStatusController(
           (await x.getState()) === "failed" ? null : x,
         ),
       )
-    ).filter((x) => x !== null) as Job[];
+    ).filter((x) => x !== null) as PseudoJob<any>[];
   }

   const data = doneJobs.map((x) => x.returnvalue);
@@ -200,24 +239,24 @@ export async function crawlStatusController(
       nextURL.searchParams.set("limit", req.query.limit);
     }

-  // deprecated: this is done on queue-worker side now. if you see this after january 8, 2025, remove this
-  if (data.length > 0) {
-    if (!doneJobs[0].data.scrapeOptions.formats.includes("rawHtml")) {
-      for (let ii = 0; ii < doneJobs.length; ii++) {
-        if (data[ii]) {
-          delete data[ii].rawHtml;
-        }
-      }
-    }
+  let totalCount = jobIDs.length;
+
+  if (totalCount === 0) {
+    const x = await supabase_service
+      .from('firecrawl_jobs')
+      .select('*', { count: 'exact', head: true })
+      .eq("crawl_id", req.params.jobId)
+      .eq("success", true)
+
+    totalCount = x.count ?? 0;
   }
-  // remove until here

   res.status(200).json({
     success: true,
     status,
     completed: doneJobsLength,
-    total: jobIDs.length,
-    creditsUsed: jobIDs.length,
+    total: totalCount,
+    creditsUsed: totalCount,
     expiresAt: (await getCrawlExpiry(req.params.jobId)).toISOString(),
     next:
       status !== "scraping" && start + data.length === doneJobsLength // if there's not gonna be any documents after this
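
Reviewer note (not part of the patch): the core change is a Redis-first, database-fallback merge, so a crawl job whose BullMQ entry has expired can still be reported from its persisted row instead of disappearing from crawl status. The sketch below illustrates that merge in isolation. fetchFromRedis, fetchFromDatabase, getJobWithFallback, and the simplified record shapes are hypothetical stand-ins, not the real getScrapeQueue().getJob(), supabaseGetJobById(), or Job/DBJob types; it only assumes the fields the patch itself touches (docs/returnvalue, success, date_added, timestamp).

// Illustrative TypeScript sketch of the fallback merge, under the
// assumptions stated above. Not the actual Firecrawl implementation.
type RedisJob = { id: string; returnvalue: unknown; timestamp: number };
type DbRow = { docs: unknown; success: boolean; date_added: string };

// In-memory stubs standing in for the queue and the database.
const redisStore = new Map<string, RedisJob>();
const dbStore = new Map<string, DbRow>();

async function fetchFromRedis(id: string): Promise<RedisJob | null> {
  return redisStore.get(id) ?? null;
}

async function fetchFromDatabase(id: string): Promise<DbRow | null> {
  return dbStore.get(id) ?? null;
}

type MergedJob = {
  id: string;
  state: "completed" | "failed" | "unknown";
  returnvalue: unknown;
  timestamp: number;
};

async function getJobWithFallback(id: string): Promise<MergedJob | null> {
  // Query both stores concurrently; either one may have forgotten the job.
  const [redisJob, dbRow] = await Promise.all([
    fetchFromRedis(id),
    fetchFromDatabase(id),
  ]);

  // Only when both stores miss is the job really gone.
  if (!redisJob && !dbRow) return null;

  // Prefer the persisted docs, fall back to the in-memory return value.
  const data = dbRow?.docs ?? redisJob?.returnvalue;

  return {
    id,
    // A live Redis job would report its own state; an expired one is
    // reconstructed from the success flag stored in the database row.
    state: redisJob ? "unknown" : dbRow!.success ? "completed" : "failed",
    returnvalue: Array.isArray(data) ? data[0] : data,
    timestamp: redisJob ? redisJob.timestamp : new Date(dbRow!.date_added).valueOf(),
  };
}

// Example: the Redis entry has expired, but the database still has the row.
dbStore.set("job-1", {
  docs: [{ url: "https://example.com" }],
  success: true,
  date_added: new Date().toISOString(),
});
getJobWithFallback("job-1").then((job) => console.log(job?.state)); // "completed"

The same idea drives the totalCount fallback at the end of the patch: when the per-crawl job ID list has already expired out of Redis, the controller counts successful rows for the crawl in the database so total and creditsUsed stay accurate.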