Merge pull request #653 from mendableai/mog/fix-status-job-get

fix(v0/crawl-status): don't crash on big crawls when requesting jobs from supa
Nicolas 2024-09-12 11:39:42 -04:00 committed by GitHub
commit ee38273ff9
3 changed files with 44 additions and 9 deletions
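The gist of the change: the v0 crawl-status path used to fetch firecrawl_jobs rows from Supabase by handing over the full list of job IDs, a payload that grows with the crawl and is presumably what breaks on very large crawls; every job of a crawl shares a crawl_id, so the same rows can be fetched with one constant-size equality filter. A minimal sketch of the two filter shapes with the supabase-js client, using the table and column names from the diff below (the inline client and env var names are placeholders; the repo goes through its own supabase_service wrapper):

import { createClient } from "@supabase/supabase-js";

// Placeholder client; the real code uses the shared supabase_service wrapper.
const supabase = createClient(process.env.SUPABASE_URL ?? "", process.env.SUPABASE_SERVICE_KEY ?? "");

// By job IDs: the filter grows with the crawl (thousands of IDs for a big crawl).
const jobsByIds = (jobIds: string[]) =>
  supabase.from("firecrawl_jobs").select().in("job_id", jobIds);

// By crawl ID: one constant-size filter returns every job of the crawl.
const jobsByCrawlId = (crawlId: string) =>
  supabase.from("firecrawl_jobs").select().eq("crawl_id", crawlId);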

@@ -4,16 +4,16 @@ import { RateLimiterMode } from "../../../src/types";
 import { getScrapeQueue } from "../../../src/services/queue-service";
 import { Logger } from "../../../src/lib/logger";
 import { getCrawl, getCrawlJobs } from "../../../src/lib/crawl-redis";
-import { supabaseGetJobsById } from "../../../src/lib/supabase-jobs";
+import { supabaseGetJobsByCrawlId } from "../../../src/lib/supabase-jobs";
 import * as Sentry from "@sentry/node";
 import { configDotenv } from "dotenv";
 configDotenv();

-export async function getJobs(ids: string[]) {
+export async function getJobs(crawlId: string, ids: string[]) {
   const jobs = (await Promise.all(ids.map(x => getScrapeQueue().getJob(x)))).filter(x => x);

   if (process.env.USE_DB_AUTHENTICATION === "true") {
-    const supabaseData = await supabaseGetJobsById(ids);
+    const supabaseData = await supabaseGetJobsByCrawlId(crawlId);

     supabaseData.forEach(x => {
       const job = jobs.find(y => y.id === x.job_id);
@@ -52,7 +52,7 @@ export async function crawlStatusController(req: Request, res: Response) {
   const jobIDs = await getCrawlJobs(req.params.jobId);

-  const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
+  const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
   const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
   const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
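Read together, the changes in this file turn getJobs into the flow sketched below: BullMQ jobs are still loaded one ID at a time, but the Supabase lookup collapses into a single query keyed on the crawl. This is a condensed, hedged restatement of the hunks above, not a quote of the repo's code; the forEach body and the final return sit outside the hunk and are only implied by the call sites.

import { getScrapeQueue } from "../../../src/services/queue-service";
import { supabaseGetJobsByCrawlId } from "../../../src/lib/supabase-jobs";

export async function getJobs(crawlId: string, ids: string[]) {
  // One queue lookup per job ID, dropping jobs that no longer exist in BullMQ.
  const jobs = (await Promise.all(ids.map((x) => getScrapeQueue().getJob(x)))).filter((x) => x);

  if (process.env.USE_DB_AUTHENTICATION === "true") {
    // A single Supabase query for the whole crawl instead of a job-ID list that grows with it.
    const supabaseData = await supabaseGetJobsByCrawlId(crawlId);

    supabaseData.forEach((x) => {
      // Match each stored row back to its in-memory queue job by job_id;
      // what the loop does with the match is outside the hunk shown above.
      const job = jobs.find((y) => y.id === x.job_id);
      // ...
    });
  }

  // The controllers sort the result by timestamp, so the queue jobs are returned here.
  return jobs;
}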

@@ -22,7 +22,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Response) {
   // }
   // }

-  const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
+  const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
   const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
   const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
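The crawl-status roll-up on the last line of both controller hunks is untouched by this PR but dense; below is an equivalent, unrolled restatement for readability, not code from the repo:

// Same logic as: sc.cancelled ? "failed" : all completed ? "completed" : any failed ? "failed" : "active"
function deriveCrawlStatus(
  cancelled: boolean,
  jobStatuses: string[],
): "completed" | "failed" | "active" {
  if (cancelled) return "failed"; // a cancelled crawl reports as failed
  if (jobStatuses.every((s) => s === "completed")) return "completed";
  if (jobStatuses.some((s) => s === "failed")) return "failed"; // any hard failure fails the crawl
  return "active"; // otherwise something is still queued or running
}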

@@ -2,6 +2,11 @@ import { supabase_service } from "../services/supabase";
 import { Logger } from "./logger";
 import * as Sentry from "@sentry/node";

+/**
+ * Get a single firecrawl_job by ID
+ * @param jobId ID of Job
+ * @returns {any | null} Job
+ */
 export const supabaseGetJobById = async (jobId: string) => {
   const { data, error } = await supabase_service
     .from("firecrawl_jobs")
@@ -20,13 +25,43 @@ export const supabaseGetJobById = async (jobId: string) => {
   return data;
 };

+/**
+ * Get multiple firecrawl_jobs by ID. Use this if you're not requesting a lot (50+) of jobs at once.
+ * @param jobIds IDs of Jobs
+ * @returns {any[]} Jobs
+ */
 export const supabaseGetJobsById = async (jobIds: string[]) => {
-  const { data, error } = await supabase_service.rpc("get_jobs_by_ids", {
-    job_ids: jobIds,
-  });
+  const { data, error } = await supabase_service
+    .from("firecrawl_jobs")
+    .select()
+    .in("job_id", jobIds);

   if (error) {
-    Logger.error(`Error in get_jobs_by_ids: ${error}`);
+    Logger.error(`Error in supabaseGetJobsById: ${error}`);
+    Sentry.captureException(error);
+    return [];
+  }
+
+  if (!data) {
+    return [];
+  }
+
+  return data;
+};
+
+/**
+ * Get multiple firecrawl_jobs by crawl ID. Use this if you need a lot of jobs at once.
+ * @param crawlId ID of crawl
+ * @returns {any[]} Jobs
+ */
+export const supabaseGetJobsByCrawlId = async (crawlId: string) => {
+  const { data, error } = await supabase_service
+    .from("firecrawl_jobs")
+    .select()
+    .eq("crawl_id", crawlId)
+
+  if (error) {
+    Logger.error(`Error in supabaseGetJobsByCrawlId: ${error}`);
     Sentry.captureException(error);
     return [];
   }
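A hedged usage sketch of the two batch helpers, following the guidance in the new doc comments; the wrapper function and its inputs are placeholders, not part of the repo:

import { supabaseGetJobsById, supabaseGetJobsByCrawlId } from "./supabase-jobs";

async function loadJobs(crawlId: string, someJobIds: string[]) {
  // A small, explicitly known batch (per the doc comment, roughly under 50 IDs):
  // filtering by job_id is fine.
  const fewJobs = await supabaseGetJobsById(someJobIds);

  // Everything belonging to one crawl, possibly thousands of rows: filter by crawl_id
  // so the query stays the same size however large the crawl grew.
  const allCrawlJobs = await supabaseGetJobsByCrawlId(crawlId);

  return { fewJobs, allCrawlJobs };
}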