diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index f18975eb..3e72040d 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -10,6 +10,7 @@ import cluster from "cluster"; import os from "os"; import { Job } from "bull"; import { supabase_service } from "./services/supabase"; +import { logJob } from "./services/logging/log_job"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -44,6 +45,13 @@ if (cluster.isMaster) { jobId: x.job_id, } }))) + + if (data.length > 0) { + await supabase_service + .from("firecrawl_jobs") + .delete() + .in("id", data.map(x => x.id)); + } } })(); @@ -79,17 +87,31 @@ if (cluster.isMaster) { }))).flat(1); for (const job of activeJobs) { + console.log(job.id); try { - const { error } = await supabase_service - .from("firecrawl_jobs") - .update({ docs: job.data.docs, partial_docs: job.data.partialDocs, retry: true }) - .eq("job_id", job.id); - - if (error) throw new Error(error.message); + await logJob({ + job_id: job.id as string, + success: false, + message: "Interrupted, retrying", + num_docs: 0, + docs: [], + time_taken: 0, + team_id: job.data.team_id, + mode: "crawl", + url: job.data.url, + crawlerOptions: job.data.crawlerOptions, + pageOptions: job.data.pageOptions, + origin: job.data.origin, + retry: true, + }); + + await wsq.client.del(await job.lockKey()); + await job.takeLock(); + await job.moveToFailed({ message: "interrupted" }); + await job.remove(); } catch (error) { console.error("Failed to update job status:", error); } - await wsq.removeJobs(job.id.toString()); } } diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts index 448168a4..7c8c78f9 100644 --- a/apps/api/src/services/logging/log_job.ts +++ b/apps/api/src/services/logging/log_job.ts @@ -38,6 +38,7 @@ export async function logJob(job: FirecrawlJob) { origin: job.origin, extractor_options: job.extractor_options, num_tokens: job.num_tokens, + retry: !!job.retry, }, ]); @@ -61,6 +62,7 @@ export async function logJob(job: FirecrawlJob) { origin: job.origin, extractor_options: job.extractor_options, num_tokens: job.num_tokens, + retry: job.retry, }, }; posthog.capture(phLog); diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index 755896e1..cef49f2f 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -62,6 +62,7 @@ export interface FirecrawlJob { origin: string; extractor_options?: ExtractorOptions, num_tokens?: number, + retry?: boolean, } export interface FirecrawlScrapeResponse {