From 86d0e88a91e1738e0a774f665321fe9d9505ec80 Mon Sep 17 00:00:00 2001
From: rafaelsideguide <150964962+rafaelsideguide@users.noreply.github.com>
Date: Wed, 10 Jul 2024 18:29:55 -0300
Subject: [PATCH] removed HyperDX (it also registers graceful-shutdown
 handlers) and tried to change the process setup for running on the server.
 It didn't work.

---
 .../{fly.staging.toml => _ORIGIN_fly.toml}    |  20 +-
 apps/api/fly.toml                             |  20 +-
 apps/api/src/controllers/auth.ts              |  24 +--
 apps/api/src/index.ts                         | 173 +++++++++---------
 apps/api/src/services/queue-worker.ts         |   8 +-
 5 files changed, 123 insertions(+), 122 deletions(-)
 rename apps/api/{fly.staging.toml => _ORIGIN_fly.toml} (80%)

diff --git a/apps/api/fly.staging.toml b/apps/api/_ORIGIN_fly.toml
similarity index 80%
rename from apps/api/fly.staging.toml
rename to apps/api/_ORIGIN_fly.toml
index 565e693d..468695d8 100644
--- a/apps/api/fly.staging.toml
+++ b/apps/api/_ORIGIN_fly.toml
@@ -3,7 +3,7 @@
 # See https://fly.io/docs/reference/configuration/ for information about how to use this file.
 #
 
-app = 'staging-firecrawl-scraper-js'
+app = 'firecrawl-scraper-js'
 primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '5s'
@@ -17,7 +17,7 @@ kill_timeout = '5s'
 [http_service]
   internal_port = 8080
   force_https = true
-  auto_stop_machines = true
+  auto_stop_machines = false
   auto_start_machines = true
   min_machines_running = 2
   processes = ['app']
@@ -28,17 +28,16 @@ kill_timeout = '5s'
     soft_limit = 50
 
   [[http_service.checks]]
-    grace_period = "10s"
+    grace_period = "20s"
     interval = "30s"
     method = "GET"
-    timeout = "5s"
+    timeout = "15s"
     path = "/"
 
-
 [[services]]
   protocol = 'tcp'
   internal_port = 8080
-  processes = ['worker']
+  processes = ['app']
 
   [[services.ports]]
     port = 80
@@ -51,13 +50,12 @@ kill_timeout = '5s'
 
   [services.concurrency]
     type = 'connections'
-    hard_limit = 25
-    soft_limit = 20
+    hard_limit = 30
+    soft_limit = 12
 
 [[vm]]
-  size = 'performance-1x'
-  processes = ['app','worker']
-
+  size = 'performance-4x'
+  processes = ['app']
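
Note: this rewrites the old staging config into production values. With
auto_stop_machines = false plus min_machines_running = 2, Fly keeps two
machines up instead of scaling to zero; the health check gets a longer
grace_period (20s) and timeout (15s); and everything is consolidated onto a
single 'app' process on a performance-4x VM. The check only asserts that
GET / answers within 15s, so the route it hits should not block on Redis or
the queue. A minimal sketch of such a liveness route in TypeScript/Express;
the handler body here is an assumption for illustration, not the repo's
actual response:

    import express from "express";

    const app = express();

    // Target of [[http_service.checks]]: must answer quickly and without
    // external dependencies, or a slow Redis/queue call could blow the
    // 15s timeout and get the machine restarted.
    app.get("/", (_req, res) => {
      res.status(200).send("ok");
    });

    app.listen(8080); // matches internal_port in fly.toml
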
diff --git a/apps/api/fly.toml b/apps/api/fly.toml
index 468695d8..565e693d 100644
--- a/apps/api/fly.toml
+++ b/apps/api/fly.toml
@@ -3,7 +3,7 @@
 # See https://fly.io/docs/reference/configuration/ for information about how to use this file.
 #
 
-app = 'firecrawl-scraper-js'
+app = 'staging-firecrawl-scraper-js'
 primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '5s'
@@ -17,7 +17,7 @@ kill_timeout = '5s'
 [http_service]
   internal_port = 8080
   force_https = true
-  auto_stop_machines = false
+  auto_stop_machines = true
   auto_start_machines = true
   min_machines_running = 2
   processes = ['app']
@@ -28,16 +28,17 @@ kill_timeout = '5s'
     soft_limit = 50
 
   [[http_service.checks]]
-    grace_period = "20s"
+    grace_period = "10s"
     interval = "30s"
     method = "GET"
-    timeout = "15s"
+    timeout = "5s"
     path = "/"
 
+
 [[services]]
   protocol = 'tcp'
   internal_port = 8080
-  processes = ['app']
+  processes = ['worker']
 
   [[services.ports]]
     port = 80
@@ -50,12 +51,13 @@ kill_timeout = '5s'
 
   [services.concurrency]
     type = 'connections'
-    hard_limit = 30
-    soft_limit = 12
+    hard_limit = 25
+    soft_limit = 20
 
 [[vm]]
-  size = 'performance-4x'
-  processes = ['app']
+  size = 'performance-1x'
+  processes = ['app','worker']
+
diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts
index 56a5ec61..fe3a5e1b 100644
--- a/apps/api/src/controllers/auth.ts
+++ b/apps/api/src/controllers/auth.ts
@@ -4,23 +4,23 @@ import { AuthResponse, NotificationType, RateLimiterMode } from "../../src/types";
 import { supabase_service } from "../../src/services/supabase";
 import { withAuth } from "../../src/lib/withAuth";
 import { RateLimiterRedis } from "rate-limiter-flexible";
-import { setTraceAttributes } from '@hyperdx/node-opentelemetry';
+// import { setTraceAttributes } from '@hyperdx/node-opentelemetry';
 import { sendNotification } from "../services/notification/email_notification";
 
 export async function authenticateUser(req, res, mode?: RateLimiterMode): Promise<AuthResponse> {
   return withAuth(supaAuthenticateUser)(req, res, mode);
 }
-function setTrace(team_id: string, api_key: string) {
-  try {
-    setTraceAttributes({
-      team_id,
-      api_key
-    });
-  } catch (error) {
-    console.error('Error setting trace attributes:', error);
-  }
+// function setTrace(team_id: string, api_key: string) {
+//   try {
+//     setTraceAttributes({
+//       team_id,
+//       api_key
+//     });
+//   } catch (error) {
+//     console.error('Error setting trace attributes:', error);
+//   }
 
-}
+// }
 export async function supaAuthenticateUser(
   req,
   res,
@@ -99,7 +99,7 @@ export async function supaAuthenticateUser(
     const plan = getPlanByPriceId(data[0].price_id);
 
     // HyperDX Logging
-    setTrace(team_id, normalizedApi);
+    // setTrace(team_id, normalizedApi);
     subscriptionData = {
       team_id: team_id,
       plan: plan
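
Note: auth.ts comments the HyperDX tracing out rather than deleting it, so the
setTrace call site inside supaAuthenticateUser has to be commented out too. If
@hyperdx/node-opentelemetry is gone for good, a no-op shim would keep the call
site compiling untouched. A sketch of that alternative (hypothetical, not what
this patch does):

    // Hypothetical replacement for the commented-out setTrace: with HyperDX
    // removed there is nothing to attach attributes to, so it degrades to a
    // silent no-op that another OpenTelemetry SDK could be wired into later.
    export function setTrace(team_id: string, api_key: string): void {
      void team_id;
      void api_key;
    }
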
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 3e72040d..47c97ea4 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -5,7 +5,7 @@ import "dotenv/config";
 import { getWebScraperQueue } from "./services/queue-service";
 import { redisClient } from "./services/rate-limiter";
 import { v0Router } from "./routes/v0";
-import { initSDK } from "@hyperdx/node-opentelemetry";
+// import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
 import os from "os";
 import { Job } from "bull";
@@ -22,38 +22,38 @@ console.log(`Number of CPUs: ${numCPUs} available`);
 if (cluster.isMaster) {
   console.log(`Master ${process.pid} is running`);
 
-  (async () => {
-    if (process.env.USE_DB_AUTHENTICATION) {
-      const wsq = getWebScraperQueue();
-      const { error, data } = await supabase_service
-        .from("firecrawl_jobs")
-        .select()
-        .eq("retry", true);
+  // (async () => {
+  //   if (process.env.USE_DB_AUTHENTICATION) {
+  //     const wsq = getWebScraperQueue();
+  //     const { error, data } = await supabase_service
+  //       .from("firecrawl_jobs")
+  //       .select()
+  //       .eq("retry", true);
 
-      if (error) throw new Error(error.message);
+  //     if (error) throw new Error(error.message);
 
-      await wsq.addBulk(data.map(x => ({
-        data: {
-          url: x.url,
-          mode: x.mode,
-          crawlerOptions: x.crawler_options,
-          team_id: x.team_id,
-          pageOptions: x.page_options,
-          origin: x.origin,
-        },
-        opts: {
-          jobId: x.job_id,
-        }
-      })))
+  //     await wsq.addBulk(data.map(x => ({
+  //       data: {
+  //         url: x.url,
+  //         mode: x.mode,
+  //         crawlerOptions: x.crawler_options,
+  //         team_id: x.team_id,
+  //         pageOptions: x.page_options,
+  //         origin: x.origin,
+  //       },
+  //       opts: {
+  //         jobId: x.job_id,
+  //       }
+  //     })))
 
-      if (data.length > 0) {
-        await supabase_service
-          .from("firecrawl_jobs")
-          .delete()
-          .in("id", data.map(x => x.id));
-      }
-    }
-  })();
+  //     if (data.length > 0) {
+  //       await supabase_service
+  //         .from("firecrawl_jobs")
+  //         .delete()
+  //         .in("id", data.map(x => x.id));
+  //     }
+  //   }
+  // })();
 
   // Fork workers.
   for (let i = 0; i < numCPUs; i++) {
     cluster.fork();
   }
@@ -67,59 +67,6 @@ if (cluster.isMaster) {
       cluster.fork();
     }
   });
-
-  const onExit = async () => {
-    console.log("Shutting down gracefully...");
-
-    if (cluster.workers) {
-      for (const worker of Object.keys(cluster.workers || {})) {
-        cluster.workers[worker].process.kill();
-      }
-    }
-
-    if (process.env.USE_DB_AUTHENTICATION) {
-      const wsq = getWebScraperQueue();
-      const activeJobCount = await wsq.getActiveCount();
-      console.log("Updating", activeJobCount, "in-progress jobs");
-
-      const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
-        return wsq.getActive(i, i+10)
-      }))).flat(1);
-
-      for (const job of activeJobs) {
-        console.log(job.id);
-        try {
-          await logJob({
-            job_id: job.id as string,
-            success: false,
-            message: "Interrupted, retrying",
-            num_docs: 0,
-            docs: [],
-            time_taken: 0,
-            team_id: job.data.team_id,
-            mode: "crawl",
-            url: job.data.url,
-            crawlerOptions: job.data.crawlerOptions,
-            pageOptions: job.data.pageOptions,
-            origin: job.data.origin,
-            retry: true,
-          });
-
-          await wsq.client.del(await job.lockKey());
-          await job.takeLock();
-          await job.moveToFailed({ message: "interrupted" });
-          await job.remove();
-        } catch (error) {
-          console.error("Failed to update job status:", error);
-        }
-      }
-    }
-
-    process.exit();
-  };
-
-  process.on("SIGINT", onExit);
-  process.on("SIGTERM", onExit);
 } else {
   const app = express();
@@ -160,9 +107,9 @@ if (cluster.isMaster) {
   redisClient.connect();
 
   // HyperDX OpenTelemetry
-  if (process.env.ENV === "production") {
-    initSDK({ consoleCapture: true, additionalInstrumentations: [] });
-  }
+  // if (process.env.ENV === "production") {
+  //   initSDK({ consoleCapture: true, additionalInstrumentations: [] });
+  // }
 
   function startServer(port = DEFAULT_PORT) {
     const server = app.listen(Number(port), HOST, () => {
@@ -324,3 +271,57 @@ if (cluster.isMaster) {
 
   console.log(`Worker ${process.pid} started`);
 }
+
+const onExit = async () => {
+  console.log("Shutting down gracefully...");
+
+  if (cluster.workers) {
+    for (const worker of Object.keys(cluster.workers || {})) {
+      cluster.workers[worker].process.kill();
+    }
+  }
+
+  if (process.env.USE_DB_AUTHENTICATION) {
+    const wsq = getWebScraperQueue();
+    const activeJobCount = await wsq.getActiveCount();
+    console.log("Updating", activeJobCount, "in-progress jobs");
+
+    const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
+      return wsq.getActive(i, i+10)
+    }))).flat(1);
+
+    for (const job of activeJobs) {
+      console.log(job.id);
+      try {
+        await logJob({
+          job_id: job.id as string,
+          success: false,
+          message: "Interrupted, retrying",
+          num_docs: 0,
+          docs: [],
+          time_taken: 0,
+          team_id: job.data.team_id,
+          mode: "crawl",
+          url: job.data.url,
+          crawlerOptions: job.data.crawlerOptions,
+          pageOptions: job.data.pageOptions,
+          origin: job.data.origin,
+          retry: true,
+        });
+
+        await wsq.client.del(await job.lockKey());
+        await job.takeLock();
+        await job.moveToFailed({ message: "interrupted" });
+        await job.remove();
+      } catch (error) {
+        console.error("Failed to update job status:", error);
+      }
+    }
+  }
+
+  console.log("Bye!");
+  process.exit();
+};
+
+process.on("SIGINT", onExit);
+process.on("SIGTERM", onExit);
\ No newline at end of file
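
Note: onExit used to be registered inside the cluster.isMaster branch; moved
to the bottom of the file, it now registers SIGINT/SIGTERM handlers in the
master and in every forked worker, so the job-requeue pass can run once per
process. If only the master should rewind in-progress jobs, the registration
can be guarded; a sketch, assuming the rest of the file as in this patch:

    // Only the master rewinds in-flight jobs on shutdown; workers exit
    // quietly when the master kills them.
    if (cluster.isMaster) {
      process.on("SIGINT", onExit);
      process.on("SIGTERM", onExit);
    } else {
      process.on("SIGTERM", () => process.exit(0));
    }
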
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 228d02f5..8721691c 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -5,11 +5,11 @@ import { logtail } from "./logtail";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
-import { initSDK } from '@hyperdx/node-opentelemetry';
+// import { initSDK } from '@hyperdx/node-opentelemetry';
 
-if(process.env.ENV === 'production') {
-  initSDK({ consoleCapture: true, additionalInstrumentations: []});
-}
+// if(process.env.ENV === 'production') {
+//   initSDK({ consoleCapture: true, additionalInstrumentations: []});
+// }
 
 getWebScraperQueue().process(
   Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
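
Note: the commit message points at HyperDX's own graceful shutdown, i.e.
initSDK also installs shutdown handling that can race a hand-rolled onExit.
With the SDK commented out, this worker process has no shutdown hook of its
own left; a minimal sketch of one, assuming Bull's Queue#close() on the queue
returned by getWebScraperQueue():

    const shutdown = async () => {
      // close() stops the queue from taking new jobs and waits for
      // in-flight processors to settle before the process exits.
      await getWebScraperQueue().close();
      process.exit(0);
    };

    process.on("SIGTERM", shutdown);
    process.on("SIGINT", shutdown);
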