Removed HyperDX (they also handle graceful shutdown) and tried to change the process configuration for running on the server. It didn't work.

rafaelsideguide 2024-07-10 18:29:55 -03:00
parent 7c3cc89a80
commit 86d0e88a91
5 changed files with 123 additions and 122 deletions
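For reference, the HyperDX wiring being removed amounts to a conditional SDK initialization at startup (plus trace-attribute tagging in the auth path). A minimal sketch of the init part, reconstructed from the lines commented out in the diffs below; the ENV check and option values are taken verbatim from there, the rest is illustrative only:

// Sketch of the HyperDX OpenTelemetry setup this commit disables.
import { initSDK } from "@hyperdx/node-opentelemetry";

if (process.env.ENV === "production") {
  // consoleCapture forwards console output; no extra instrumentations are registered.
  initSDK({ consoleCapture: true, additionalInstrumentations: [] });
}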

View File

@@ -3,7 +3,7 @@
 # See https://fly.io/docs/reference/configuration/ for information about how to use this file.
 #
-app = 'staging-firecrawl-scraper-js'
+app = 'firecrawl-scraper-js'
 primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '5s'
@@ -17,7 +17,7 @@ kill_timeout = '5s'
 [http_service]
 internal_port = 8080
 force_https = true
-auto_stop_machines = true
+auto_stop_machines = false
 auto_start_machines = true
 min_machines_running = 2
 processes = ['app']
@@ -28,17 +28,16 @@ kill_timeout = '5s'
 soft_limit = 50
 [[http_service.checks]]
-grace_period = "10s"
+grace_period = "20s"
 interval = "30s"
 method = "GET"
-timeout = "5s"
+timeout = "15s"
 path = "/"
 [[services]]
 protocol = 'tcp'
 internal_port = 8080
-processes = ['worker']
+processes = ['app']
 [[services.ports]]
 port = 80
@@ -51,13 +50,12 @@ kill_timeout = '5s'
 [services.concurrency]
 type = 'connections'
-hard_limit = 25
-soft_limit = 20
+hard_limit = 30
+soft_limit = 12
 [[vm]]
-size = 'performance-1x'
-processes = ['app','worker']
+size = 'performance-4x'
+processes = ['app']

View File

@@ -3,7 +3,7 @@
 # See https://fly.io/docs/reference/configuration/ for information about how to use this file.
 #
-app = 'firecrawl-scraper-js'
+app = 'staging-firecrawl-scraper-js'
 primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '5s'
@@ -17,7 +17,7 @@ kill_timeout = '5s'
 [http_service]
 internal_port = 8080
 force_https = true
-auto_stop_machines = false
+auto_stop_machines = true
 auto_start_machines = true
 min_machines_running = 2
 processes = ['app']
@@ -28,16 +28,17 @@ kill_timeout = '5s'
 soft_limit = 50
 [[http_service.checks]]
-grace_period = "20s"
+grace_period = "10s"
 interval = "30s"
 method = "GET"
-timeout = "15s"
+timeout = "5s"
 path = "/"
 [[services]]
 protocol = 'tcp'
 internal_port = 8080
-processes = ['app']
+processes = ['worker']
 [[services.ports]]
 port = 80
@@ -50,12 +51,13 @@ kill_timeout = '5s'
 [services.concurrency]
 type = 'connections'
-hard_limit = 30
-soft_limit = 12
+hard_limit = 25
+soft_limit = 20
 [[vm]]
-size = 'performance-4x'
-processes = ['app']
+size = 'performance-1x'
+processes = ['app','worker']

View File

@@ -4,23 +4,23 @@ import { AuthResponse, NotificationType, RateLimiterMode } from "../../src/types
 import { supabase_service } from "../../src/services/supabase";
 import { withAuth } from "../../src/lib/withAuth";
 import { RateLimiterRedis } from "rate-limiter-flexible";
-import { setTraceAttributes } from '@hyperdx/node-opentelemetry';
+// import { setTraceAttributes } from '@hyperdx/node-opentelemetry';
 import { sendNotification } from "../services/notification/email_notification";
 export async function authenticateUser(req, res, mode?: RateLimiterMode): Promise<AuthResponse> {
   return withAuth(supaAuthenticateUser)(req, res, mode);
 }
-function setTrace(team_id: string, api_key: string) {
-  try {
-    setTraceAttributes({
-      team_id,
-      api_key
-    });
-  } catch (error) {
-    console.error('Error setting trace attributes:', error);
-  }
-}
+// function setTrace(team_id: string, api_key: string) {
+//   try {
+//     setTraceAttributes({
+//       team_id,
+//       api_key
+//     });
+//   } catch (error) {
+//     console.error('Error setting trace attributes:', error);
+//   }
+// }
 export async function supaAuthenticateUser(
   req,
   res,
@@ -99,7 +99,7 @@ export async function supaAuthenticateUser(
     const plan = getPlanByPriceId(data[0].price_id);
     // HyperDX Logging
-    setTrace(team_id, normalizedApi);
+    // setTrace(team_id, normalizedApi);
     subscriptionData = {
       team_id: team_id,
       plan: plan

View File

@@ -5,7 +5,7 @@ import "dotenv/config";
 import { getWebScraperQueue } from "./services/queue-service";
 import { redisClient } from "./services/rate-limiter";
 import { v0Router } from "./routes/v0";
-import { initSDK } from "@hyperdx/node-opentelemetry";
+// import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
 import os from "os";
 import { Job } from "bull";
@@ -22,38 +22,38 @@ console.log(`Number of CPUs: ${numCPUs} available`);
 if (cluster.isMaster) {
   console.log(`Master ${process.pid} is running`);
-  (async () => {
-    if (process.env.USE_DB_AUTHENTICATION) {
-      const wsq = getWebScraperQueue();
-      const { error, data } = await supabase_service
-        .from("firecrawl_jobs")
-        .select()
-        .eq("retry", true);
-      if (error) throw new Error(error.message);
-      await wsq.addBulk(data.map(x => ({
-        data: {
-          url: x.url,
-          mode: x.mode,
-          crawlerOptions: x.crawler_options,
-          team_id: x.team_id,
-          pageOptions: x.page_options,
-          origin: x.origin,
-        },
-        opts: {
-          jobId: x.job_id,
-        }
-      })))
-      if (data.length > 0) {
-        await supabase_service
-          .from("firecrawl_jobs")
-          .delete()
-          .in("id", data.map(x => x.id));
-      }
-    }
-  })();
+  // (async () => {
+  //   if (process.env.USE_DB_AUTHENTICATION) {
+  //     const wsq = getWebScraperQueue();
+  //     const { error, data } = await supabase_service
+  //       .from("firecrawl_jobs")
+  //       .select()
+  //       .eq("retry", true);
+  //     if (error) throw new Error(error.message);
+  //     await wsq.addBulk(data.map(x => ({
+  //       data: {
+  //         url: x.url,
+  //         mode: x.mode,
+  //         crawlerOptions: x.crawler_options,
+  //         team_id: x.team_id,
+  //         pageOptions: x.page_options,
+  //         origin: x.origin,
+  //       },
+  //       opts: {
+  //         jobId: x.job_id,
+  //       }
+  //     })))
+  //     if (data.length > 0) {
+  //       await supabase_service
+  //         .from("firecrawl_jobs")
+  //         .delete()
+  //         .in("id", data.map(x => x.id));
+  //     }
+  //   }
+  // })();
   // Fork workers.
   for (let i = 0; i < numCPUs; i++) {
@@ -67,59 +67,6 @@ if (cluster.isMaster) {
       cluster.fork();
     }
   });
-  const onExit = async () => {
-    console.log("Shutting down gracefully...");
-    if (cluster.workers) {
-      for (const worker of Object.keys(cluster.workers || {})) {
-        cluster.workers[worker].process.kill();
-      }
-    }
-    if (process.env.USE_DB_AUTHENTICATION) {
-      const wsq = getWebScraperQueue();
-      const activeJobCount = await wsq.getActiveCount();
-      console.log("Updating", activeJobCount, "in-progress jobs");
-      const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
-        return wsq.getActive(i, i+10)
-      }))).flat(1);
-      for (const job of activeJobs) {
-        console.log(job.id);
-        try {
-          await logJob({
-            job_id: job.id as string,
-            success: false,
-            message: "Interrupted, retrying",
-            num_docs: 0,
-            docs: [],
-            time_taken: 0,
-            team_id: job.data.team_id,
-            mode: "crawl",
-            url: job.data.url,
-            crawlerOptions: job.data.crawlerOptions,
-            pageOptions: job.data.pageOptions,
-            origin: job.data.origin,
-            retry: true,
-          });
-          await wsq.client.del(await job.lockKey());
-          await job.takeLock();
-          await job.moveToFailed({ message: "interrupted" });
-          await job.remove();
-        } catch (error) {
-          console.error("Failed to update job status:", error);
-        }
-      }
-    }
-    process.exit();
-  };
-  process.on("SIGINT", onExit);
-  process.on("SIGTERM", onExit);
 } else {
   const app = express();
@@ -160,9 +107,9 @@ if (cluster.isMaster) {
   redisClient.connect();
   // HyperDX OpenTelemetry
-  if (process.env.ENV === "production") {
-    initSDK({ consoleCapture: true, additionalInstrumentations: [] });
-  }
+  // if (process.env.ENV === "production") {
+  //   initSDK({ consoleCapture: true, additionalInstrumentations: [] });
+  // }
   function startServer(port = DEFAULT_PORT) {
     const server = app.listen(Number(port), HOST, () => {
@@ -324,3 +271,57 @@ if (cluster.isMaster) {
   console.log(`Worker ${process.pid} started`);
 }
+const onExit = async () => {
+  console.log("Shutting down gracefully...");
+  if (cluster.workers) {
+    for (const worker of Object.keys(cluster.workers || {})) {
+      cluster.workers[worker].process.kill();
+    }
+  }
+  if (process.env.USE_DB_AUTHENTICATION) {
+    const wsq = getWebScraperQueue();
+    const activeJobCount = await wsq.getActiveCount();
+    console.log("Updating", activeJobCount, "in-progress jobs");
+    const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
+      return wsq.getActive(i, i+10)
+    }))).flat(1);
+    for (const job of activeJobs) {
+      console.log(job.id);
+      try {
+        await logJob({
+          job_id: job.id as string,
+          success: false,
+          message: "Interrupted, retrying",
+          num_docs: 0,
+          docs: [],
+          time_taken: 0,
+          team_id: job.data.team_id,
+          mode: "crawl",
+          url: job.data.url,
+          crawlerOptions: job.data.crawlerOptions,
+          pageOptions: job.data.pageOptions,
+          origin: job.data.origin,
+          retry: true,
+        });
+        await wsq.client.del(await job.lockKey());
+        await job.takeLock();
+        await job.moveToFailed({ message: "interrupted" });
+        await job.remove();
+      } catch (error) {
+        console.error("Failed to update job status:", error);
+      }
+    }
+  }
+  console.log("Bye!");
+  process.exit();
+};
+process.on("SIGINT", onExit);
+process.on("SIGTERM", onExit);

View File

@@ -5,11 +5,11 @@ import { logtail } from "./logtail";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
-import { initSDK } from '@hyperdx/node-opentelemetry';
+// import { initSDK } from '@hyperdx/node-opentelemetry';
-if(process.env.ENV === 'production') {
-  initSDK({ consoleCapture: true, additionalInstrumentations: []});
-}
+// if(process.env.ENV === 'production') {
+//   initSDK({ consoleCapture: true, additionalInstrumentations: []});
+// }
 getWebScraperQueue().process(
   Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),