From 1a07e9d23b3895682469ba9ee5279507f5876f0a Mon Sep 17 00:00:00 2001
From: Gergo Moricz
Date: Tue, 9 Jul 2024 14:56:47 +0200
Subject: [PATCH] feat: pick up and commit interrupted jobs from/to DB

---
 apps/api/src/index.ts                    | 75 +++++++++++++++++++++++-
 apps/api/src/services/logging/log_job.ts |  2 +
 apps/api/src/types.ts                    |  1 +
 3 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 747f8a7b..3e72040d 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -9,6 +9,8 @@ import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
 import os from "os";
 import { Job } from "bull";
+import { supabase_service } from "./services/supabase";
+import { logJob } from "./services/logging/log_job";
 
 const { createBullBoard } = require("@bull-board/api");
 const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -20,6 +22,39 @@ console.log(`Number of CPUs: ${numCPUs} available`);
 if (cluster.isMaster) {
   console.log(`Master ${process.pid} is running`);
 
+  (async () => {
+    if (process.env.USE_DB_AUTHENTICATION === "true") {
+      const wsq = getWebScraperQueue();
+      const { error, data } = await supabase_service
+        .from("firecrawl_jobs")
+        .select()
+        .eq("retry", true);
+
+      if (error) throw new Error(error.message);
+
+      await wsq.addBulk(data.map(x => ({
+        data: {
+          url: x.url,
+          mode: x.mode,
+          crawlerOptions: x.crawler_options,
+          team_id: x.team_id,
+          pageOptions: x.page_options,
+          origin: x.origin,
+        },
+        opts: {
+          jobId: x.job_id,
+        }
+      })))
+
+      if (data.length > 0) {
+        await supabase_service
+          .from("firecrawl_jobs")
+          .delete()
+          .in("id", data.map(x => x.id));
+      }
+    }
+  })();
+
   // Fork workers.
   for (let i = 0; i < numCPUs; i++) {
     cluster.fork();
@@ -33,7 +68,7 @@ if (cluster.isMaster) {
     }
   });
 
-  const onExit = () => {
+  const onExit = async () => {
     console.log("Shutting down gracefully...");
 
     if (cluster.workers) {
@@ -42,6 +77,44 @@ for (const worker of Object.values(cluster.workers)) {
       }
     }
 
+    if (process.env.USE_DB_AUTHENTICATION === "true") {
+      const wsq = getWebScraperQueue();
+      const activeJobCount = await wsq.getActiveCount();
+      console.log("Updating", activeJobCount, "in-progress jobs");
+
+      const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
+        return wsq.getActive(i * 10, i * 10 + 9)
+      }))).flat(1);
+
+      for (const job of activeJobs) {
+        console.log(job.id);
+        try {
+          await logJob({
+            job_id: job.id as string,
+            success: false,
+            message: "Interrupted, retrying",
+            num_docs: 0,
+            docs: [],
+            time_taken: 0,
+            team_id: job.data.team_id,
+            mode: "crawl",
+            url: job.data.url,
+            crawlerOptions: job.data.crawlerOptions,
+            pageOptions: job.data.pageOptions,
+            origin: job.data.origin,
+            retry: true,
+          });
+
+          await wsq.client.del(await job.lockKey());
+          await job.takeLock();
+          await job.moveToFailed({ message: "interrupted" });
+          await job.remove();
+        } catch (error) {
+          console.error("Failed to update job status:", error);
+        }
+      }
+    }
+
     process.exit();
   };
 
diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts
index 448168a4..7c8c78f9 100644
--- a/apps/api/src/services/logging/log_job.ts
+++ b/apps/api/src/services/logging/log_job.ts
@@ -38,6 +38,7 @@
         origin: job.origin,
         extractor_options: job.extractor_options,
         num_tokens: job.num_tokens,
+        retry: !!job.retry,
       },
     ]);
 
@@ -61,6 +62,7 @@
         origin: job.origin,
         extractor_options: job.extractor_options,
         num_tokens: job.num_tokens,
+        retry: job.retry,
       },
     };
     posthog.capture(phLog);
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index 755896e1..cef49f2f 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -62,6 +62,7 @@ export interface FirecrawlJob {
   origin: string;
   extractor_options?: ExtractorOptions,
   num_tokens?: number,
+  retry?: boolean,
 }
 
 export interface FirecrawlScrapeResponse {
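
Note on the shutdown hunk above: Bull's getActive(start, end) returns the
jobs in an inclusive index range, so draining the active list in pages of
ten means requesting indices i*10 through i*10+9 for page i. A minimal
standalone sketch of that pattern (not part of the commit), assuming Bull
v4; the queue name and Redis URL are illustrative stand-ins for the queue
that getWebScraperQueue() resolves:

    import Queue, { Job } from "bull";

    // Illustrative queue handle; the commit obtains its queue via
    // getWebScraperQueue() instead.
    const wsq = new Queue("web-scraper", "redis://localhost:6379");

    // Fetch every currently-active job in pages of 10, mirroring the
    // pagination in the onExit handler.
    async function getAllActiveJobs(): Promise<Job[]> {
      const activeJobCount = await wsq.getActiveCount();
      const pages = await Promise.all(
        new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) =>
          // Page i covers the inclusive index range [i*10, i*10+9].
          wsq.getActive(i * 10, i * 10 + 9)
        )
      );
      return pages.flat(1);
    }

Fetching the pages up front and only then mutating each job keeps the index
ranges stable; removing jobs while paging would shift the active list under
the iteration.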