fix(queue-worker): new getJobs, log on v0

commit 980293652d
Author: Gergő Móricz
Date:   2024-09-01 19:29:35 +02:00
Parent: 44fe741c35

@@ -35,6 +35,7 @@ import {
   getJobPriority,
 } from "../../src/lib/job-priority";
 import { PlanType } from "../types";
+import { getJobs } from "../../src/controllers/v1/crawl-status";
 
 if (process.env.ENV === "production") {
   initSDK({
@@ -358,33 +359,7 @@ async function processJob(job: Job, token: string) {
     if (!job.data.v1) {
       const jobIDs = await getCrawlJobs(job.data.crawl_id);
-      const jobs = (
-        await Promise.all(
-          jobIDs.map(async (x) => {
-            if (x === job.id) {
-              return {
-                async getState() {
-                  return "completed";
-                },
-                timestamp: Date.now(),
-                returnvalue: docs,
-              };
-            }
-
-            const j = await getScrapeQueue().getJob(x);
-
-            if (process.env.USE_DB_AUTHENTICATION === "true") {
-              const supabaseData = await supabaseGetJobById(j.id);
-
-              if (supabaseData) {
-                j.returnvalue = supabaseData.docs;
-              }
-            }
-
-            return j;
-          })
-        )
-      ).sort((a, b) => a.timestamp - b.timestamp);
+      const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
 
       const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
       const jobStatus =
         sc.cancelled || jobStatuses.some((x) => x === "failed")
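
The inline fetch block removed above now lives behind the getJobs helper imported from the v1 crawl-status controller. Below is a minimal sketch of what that helper presumably contains, reconstructed from the deleted lines; the import paths are assumptions, and the old special case for the in-flight job (x === job.id) has no equivalent in this sketch, so the current job would be fetched from the queue like any other.

// Sketch only: assumed shape of getJobs in src/controllers/v1/crawl-status.ts,
// based on the inline code removed above. supabaseGetJobById and
// getScrapeQueue are the same helpers the old inline version used.
import { Job } from "bullmq";
import { getScrapeQueue } from "../../services/queue-service"; // assumed path
import { supabaseGetJobById } from "../../lib/supabase-jobs"; // assumed path

export async function getJobs(jobIDs: string[]): Promise<Job[]> {
  // Fetch every job from the scrape queue in parallel; drop ids that are gone.
  const jobs = (
    await Promise.all(jobIDs.map((x) => getScrapeQueue().getJob(x)))
  ).filter((x): x is Job => Boolean(x));

  // When DB authentication is on, the authoritative documents live in
  // Supabase, so overwrite the in-memory return value with the stored docs.
  if (process.env.USE_DB_AUTHENTICATION === "true") {
    await Promise.all(
      jobs.map(async (job) => {
        const supabaseData = await supabaseGetJobById(job.id);
        if (supabaseData) {
          job.returnvalue = supabaseData.docs;
        }
      })
    );
  }

  return jobs;
}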
@@ -437,6 +412,28 @@ async function processJob(job: Job, token: string) {
           "crawl.completed"
         );
       }
+    } else {
+      const jobIDs = await getCrawlJobs(job.data.crawl_id);
+      const jobStatuses = await Promise.all(jobIDs.map((x) => getScrapeQueue().getJobState(x)));
+      const jobStatus =
+        sc.cancelled || jobStatuses.some((x) => x === "failed")
+          ? "failed"
+          : "completed";
+
+      await logJob({
+        job_id: job.data.crawl_id,
+        success: jobStatus === "completed",
+        message: sc.cancelled ? "Cancelled" : message,
+        num_docs: jobIDs.length,
+        docs: [],
+        time_taken: (Date.now() - sc.createdAt) / 1000,
+        team_id: job.data.team_id,
+        mode: "crawl",
+        url: sc.originUrl,
+        crawlerOptions: sc.crawlerOptions,
+        pageOptions: sc.pageOptions,
+        origin: job.data.origin,
+      });
     }
   }
 }
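
The new branch above only needs per-job states to compute the success flag, so it uses BullMQ's Queue#getJobState rather than materializing whole jobs, and it logs docs: [] since v1 crawl results are served through the crawl-status endpoint rather than embedded in the log row. A hypothetical standalone snippet of the BullMQ call it leans on (queue name, Redis connection, and job id are placeholders):

import { Queue } from "bullmq";

// Queue#getJobState(id) resolves a state string ("completed", "failed",
// "active", "waiting", ..., or "unknown" when the job no longer exists)
// without loading the job's data or return value.
const queue = new Queue("scrape-queue", {
  connection: { host: "localhost", port: 6379 }, // placeholder connection
});

async function main() {
  const state = await queue.getJobState("some-job-id"); // placeholder id
  console.log(state); // e.g. "completed"
  await queue.close();
}

main();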