feat(v0): fix jobs

This commit is contained in:
Gergő Móricz 2025-04-14 11:39:13 -07:00
parent ebdf182b00
commit 0b50349fed

View File

@ -10,16 +10,19 @@ import { configDotenv } from "dotenv";
import { Job } from "bullmq";
import { toLegacyDocument } from "../v1/types";
import type { DBJob, PseudoJob } from "../v1/crawl-status";
import { getJobFromGCS } from "../../lib/gcs-jobs";
configDotenv();
export async function getJobs(crawlId: string, ids: string[]): Promise<PseudoJob<any>[]> {
const [bullJobs, dbJobs] = await Promise.all([
const [bullJobs, dbJobs, gcsJobs] = await Promise.all([
Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
process.env.USE_DB_AUTHENTICATION === "true" ? await supabaseGetJobsByCrawlId(crawlId) : [],
process.env.GCS_BUCKET_NAME ? Promise.all(ids.map(async (x) => ({ id: x, job: await getJobFromGCS(x) }))).then(x => x.filter(x => x.job)) as Promise<({ id: string, job: any | null })[]> : [],
]);
const bullJobMap = new Map<string, PseudoJob<any>>();
const dbJobMap = new Map<string, DBJob>();
const gcsJobMap = new Map<string, any>();
for (const job of bullJobs) {
bullJobMap.set(job.id, job);
@ -28,16 +31,26 @@ export async function getJobs(crawlId: string, ids: string[]): Promise<PseudoJob
for (const job of dbJobs) {
dbJobMap.set(job.job_id, job);
}
for (const job of gcsJobs) {
gcsJobMap.set(job.id, job.job);
}
const jobs: PseudoJob<any>[] = [];
for (const id of ids) {
const bullJob = bullJobMap.get(id);
const dbJob = dbJobMap.get(id);
const gcsJob = gcsJobMap.get(id);
if (!bullJob && !dbJob) continue;
const data = dbJob?.docs ?? bullJob?.returnvalue;
const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
if (gcsJob === null && data) {
logger.warn("GCS Job not found", {
jobId: id,
});
}
const job: PseudoJob<any> = {
id,