diff --git a/apps/api/fly.staging.toml b/apps/api/fly.staging.toml
index 09fa135e..7a5e0848 100644
--- a/apps/api/fly.staging.toml
+++ b/apps/api/fly.staging.toml
@@ -8,9 +8,6 @@ primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '30s'
 
-[deploy]
-  release_command = 'node dist/src/trigger-shutdown.js https://staging-firecrawl-scraper-js.fly.dev'
-
 [build]
 
 [processes]
diff --git a/apps/api/fly.toml b/apps/api/fly.toml
index 481290f0..f7ef786f 100644
--- a/apps/api/fly.toml
+++ b/apps/api/fly.toml
@@ -8,9 +8,6 @@ primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '30s'
 
-[deploy]
-  release_command = 'node dist/src/trigger-shutdown.js https://api.firecrawl.dev'
-
 [build]
 
 [processes]
diff --git a/apps/api/package.json b/apps/api/package.json
index 183ddaa3..da1b2b33 100644
--- a/apps/api/package.json
+++ b/apps/api/package.json
@@ -19,8 +19,8 @@
     "mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest",
     "mongo-docker-console": "docker exec -it mongodb mongosh",
     "run-example": "npx ts-node src/example.ts",
-    "deploy:fly": "flyctl deploy && node postdeploy.js https://api.firecrawl.dev",
-    "deploy:fly:staging": "fly deploy -c fly.staging.toml && node postdeploy.js https://staging-firecrawl-scraper-js.fly.dev"
+    "deploy:fly": "flyctl deploy",
+    "deploy:fly:staging": "fly deploy -c fly.staging.toml"
   },
   "author": "",
   "license": "ISC",
diff --git a/apps/api/postdeploy.js b/apps/api/postdeploy.js
deleted file mode 100644
index c1b94d70..00000000
--- a/apps/api/postdeploy.js
+++ /dev/null
@@ -1,11 +0,0 @@
-require("dotenv").config();
-
-fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/unpause", {
-  method: "POST"
-}).then(async x => {
-  console.log(await x.text());
-  process.exit(0);
-}).catch(e => {
-  console.error(e);
-  process.exit(1);
-});
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 3942e937..88ec4418 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -119,63 +119,6 @@ if (cluster.isMaster) {
     }
   });
 
-  app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
-    // return res.status(200).json({ ok: true });
-    try {
-      console.log("Gracefully shutting down...");
-      await getWebScraperQueue().pause(false, true);
-      res.json({ ok: true });
-    } catch (error) {
-      console.error(error);
-      return res.status(500).json({ error: error.message });
-    }
-  });
-
-  app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
-    try {
-      const wsq = getWebScraperQueue();
-
-      const jobs = await wsq.getActive();
-
-      console.log("Requeueing", jobs.length, "jobs...");
-
-      if (jobs.length > 0) {
-        console.log("  Removing", jobs.length, "jobs...");
-
-        await Promise.all(
-          jobs.map(async (x) => {
-            try {
-              await wsq.client.del(await x.lockKey());
-              await x.takeLock();
-              await x.moveToFailed({ message: "interrupted" });
-              await x.remove();
-            } catch (e) {
-              console.warn("Failed to remove job", x.id, e);
-            }
-          })
-        );
-
-        console.log("  Re-adding", jobs.length, "jobs...");
-        await wsq.addBulk(
-          jobs.map((x) => ({
-            data: x.data,
-            opts: {
-              jobId: x.id,
-            },
-          }))
-        );
-
-        console.log("  Done!");
-      }
-
-      await getWebScraperQueue().resume(false);
-      res.json({ ok: true });
-    } catch (error) {
-      console.error(error);
-      return res.status(500).json({ error: error.message });
-    }
-  });
-
   app.get(`/serverHealthCheck`, async (req, res) => {
     try {
       const webScraperQueue = getWebScraperQueue();
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index a32b78f0..f93c3504 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -7,8 +7,10 @@ export function getWebScraperQueue() {
   if (!webScraperQueue) {
     webScraperQueue = new Queue("web-scraper", process.env.REDIS_URL, {
       settings: {
-        lockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds,
-        lockRenewTime: 30 * 60 * 1000, // 30 minutes in milliseconds
+        lockDuration: 2 * 60 * 1000, // 2 minutes in milliseconds
+        lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
+        stalledInterval: 30 * 1000,
+        maxStalledCount: 10,
       },
     });
     console.log("Web scraper queue created");
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 24343487..be2a4c70 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -6,6 +6,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
 import { initSDK } from '@hyperdx/node-opentelemetry';
+import { Job } from "bull";
 
 if(process.env.ENV === 'production') {
   initSDK({
@@ -16,93 +17,99 @@ if(process.env.ENV === 'production') {
 
 const wsq = getWebScraperQueue();
 
-wsq.process(
-  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
-  async function (job, done) {
-    try {
-      job.progress({
-        current: 1,
-        total: 100,
-        current_step: "SCRAPING",
-        current_url: "",
-      });
-      const start = Date.now();
-      const { success, message, docs } = await startWebScraperPipeline({ job });
-      const end = Date.now();
-      const timeTakenInSeconds = (end - start) / 1000;
+async function processJob(job: Job, done) {
+  console.log("taking job", job.id);
+  try {
+    job.progress({
+      current: 1,
+      total: 100,
+      current_step: "SCRAPING",
+      current_url: "",
+    });
+    const start = Date.now();
+    const { success, message, docs } = await startWebScraperPipeline({ job });
+    const end = Date.now();
+    const timeTakenInSeconds = (end - start) / 1000;
 
-      const data = {
-        success: success,
-        result: {
-          links: docs.map((doc) => {
-            return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "" };
-          }),
-        },
-        project_id: job.data.project_id,
-        error: message /* etc... */,
-      };
+    const data = {
+      success: success,
+      result: {
+        links: docs.map((doc) => {
+          return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "" };
+        }),
+      },
+      project_id: job.data.project_id,
+      error: message /* etc... */,
+    };
 
-      await callWebhook(job.data.team_id, job.id as string, data);
+    await callWebhook(job.data.team_id, job.id as string, data);
 
-      await logJob({
-        job_id: job.id as string,
-        success: success,
-        message: message,
-        num_docs: docs.length,
-        docs: docs,
-        time_taken: timeTakenInSeconds,
-        team_id: job.data.team_id,
-        mode: "crawl",
-        url: job.data.url,
-        crawlerOptions: job.data.crawlerOptions,
-        pageOptions: job.data.pageOptions,
-        origin: job.data.origin,
-      });
-      done(null, data);
-    } catch (error) {
-      if (await getWebScraperQueue().isPaused(false)) {
-        return;
-      }
+    await logJob({
+      job_id: job.id as string,
+      success: success,
+      message: message,
+      num_docs: docs.length,
+      docs: docs,
+      time_taken: timeTakenInSeconds,
+      team_id: job.data.team_id,
+      mode: "crawl",
+      url: job.data.url,
+      crawlerOptions: job.data.crawlerOptions,
+      pageOptions: job.data.pageOptions,
+      origin: job.data.origin,
+    });
+    console.log("job done", job.id);
+    done(null, data);
+  } catch (error) {
+    console.log("job errored", job.id, error);
+    if (await getWebScraperQueue().isPaused(false)) {
+      console.log("queue is paused, ignoring");
+      return;
+    }
 
-      if (error instanceof CustomError) {
-        // Here we handle the error, then save the failed job
-        console.error(error.message); // or any other error handling
+    if (error instanceof CustomError) {
+      // Here we handle the error, then save the failed job
+      console.error(error.message); // or any other error handling
 
-        logtail.error("Custom error while ingesting", {
-          job_id: job.id,
-          error: error.message,
-          dataIngestionJob: error.dataIngestionJob,
-        });
-      }
-      console.log(error);
-
-      logtail.error("Overall error ingesting", {
+      logtail.error("Custom error while ingesting", {
         job_id: job.id,
         error: error.message,
+        dataIngestionJob: error.dataIngestionJob,
       });
-
-      const data = {
-        success: false,
-        project_id: job.data.project_id,
-        error:
-          "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
-      };
-      await callWebhook(job.data.team_id, job.id as string, data);
-      await logJob({
-        job_id: job.id as string,
-        success: false,
-        message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
-        num_docs: 0,
-        docs: [],
-        time_taken: 0,
-        team_id: job.data.team_id,
-        mode: "crawl",
-        url: job.data.url,
-        crawlerOptions: job.data.crawlerOptions,
-        pageOptions: job.data.pageOptions,
-        origin: job.data.origin,
-      });
-      done(null, data);
     }
+    console.log(error);
+
+    logtail.error("Overall error ingesting", {
+      job_id: job.id,
+      error: error.message,
+    });
+
+    const data = {
+      success: false,
+      project_id: job.data.project_id,
+      error:
+        "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
+    };
+    await callWebhook(job.data.team_id, job.id as string, data);
+    await logJob({
+      job_id: job.id as string,
+      success: false,
+      message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
+      num_docs: 0,
+      docs: [],
+      time_taken: 0,
+      team_id: job.data.team_id,
+      mode: "crawl",
+      url: job.data.url,
+      crawlerOptions: job.data.crawlerOptions,
+      pageOptions: job.data.pageOptions,
+      origin: job.data.origin,
+    });
+    done(null, data);
   }
+}
+
+wsq.process(
+  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
+  processJob
 );
diff --git a/apps/api/src/trigger-shutdown.ts b/apps/api/src/trigger-shutdown.ts
deleted file mode 100644
index 5b36f81f..00000000
--- a/apps/api/src/trigger-shutdown.ts
+++ /dev/null
@@ -1,9 +0,0 @@
-fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/shutdown", {
-  method: "POST"
-}).then(async x => {
-  console.log(await x.text());
-  process.exit(0);
-}).catch(e => {
-  console.error(e);
-  process.exit(1);
-});
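
Note on the queue-service settings above: the old deploy flow paused the queue via a release command, then an admin endpoint manually failed, removed, and re-added every active job after deploy. This diff deletes that machinery and instead leans on Bull's built-in stalled-job handling: short locks that workers must keep renewing, a periodic stalled check, and a bounded retry count. Below is a minimal, self-contained sketch of that behavior; the queue name, Redis URL, and sleep-based job body are illustrative assumptions, not code from this repository.

// stall-recovery-sketch.ts — illustrative only, not part of this diff.
import Queue, { Job } from "bull";

const queue = new Queue("stall-demo", "redis://localhost:6379", {
  settings: {
    lockDuration: 2 * 60 * 1000, // a worker owns an active job for 2 minutes per lock
    lockRenewTime: 15 * 1000,    // ...and re-extends that lock every 15 seconds
    stalledInterval: 30 * 1000,  // scan for jobs with expired locks every 30 seconds
    maxStalledCount: 10,         // requeue a stalled job at most 10 times
  },
});

// If a worker dies mid-job (e.g. during a deploy), it stops renewing the lock.
// Once the lock expires, the next stalled scan moves the job back to "waiting"
// and a surviving worker picks it up — no admin endpoint required.
queue.on("stalled", (job: Job) => {
  console.warn("job stalled and will be retried:", job.id);
});

queue.process(async (job: Job) => {
  console.log("processing", job.id);
  // Simulate long work; kill this process to watch the job stall and recover.
  await new Promise((resolve) => setTimeout(resolve, 5 * 60 * 1000));
});

With these numbers, a job held by a crashed worker becomes eligible for pickup within roughly lockDuration + stalledInterval (about 2.5 minutes), versus up to 2 hours under the old 2-hour lockDuration — which is presumably why the manual /unpause requeue logic was needed in the first place.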
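
To exercise the sketch by hand (again an illustrative assumption, not part of the PR): start two worker processes, enqueue a job, and SIGKILL whichever worker takes it; the other worker should pick the job back up after the stall window.

// enqueue-demo.ts — companion to the sketch above; names are illustrative.
import Queue from "bull";

async function main() {
  const queue = new Queue("stall-demo", "redis://localhost:6379");
  const job = await queue.add({ url: "https://example.com" });
  console.log("enqueued", job.id);
  // Kill the active worker now; within roughly 2.5 minutes under the new
  // settings, the job should be requeued and finished by a surviving worker.
  await queue.close();
}

main().catch(console.error);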