From e28c415cf4ac3b228fe4f1266029a4f1490af940 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Fri, 9 Aug 2024 14:07:46 -0400
Subject: [PATCH] Nick:

---
 apps/api/fly.staging.toml             |   3 +-
 apps/api/src/main/runWebScraper.ts    |   2 +-
 apps/api/src/run-req.ts               | 175 ++++++++++++++++++++++++++
 apps/api/src/services/queue-worker.ts |   4 +-
 4 files changed, 180 insertions(+), 4 deletions(-)
 create mode 100644 apps/api/src/run-req.ts

diff --git a/apps/api/fly.staging.toml b/apps/api/fly.staging.toml
index 7a5e0848..6a28a692 100644
--- a/apps/api/fly.staging.toml
+++ b/apps/api/fly.staging.toml
@@ -55,8 +55,9 @@ kill_timeout = '30s'
     soft_limit = 20
 
 [[vm]]
-  size = 'performance-1x'
+  size = 'performance-2x'
   processes = ['app','worker']
+  memory = 8192
 
 
 
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index d204bcd4..0c053b77 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -35,7 +35,7 @@ export async function startWebScraperPipeline({
       if (partialDocs.length > 50) {
         partialDocs = partialDocs.slice(-50);
       }
-      job.updateProgress({ ...progress, partialDocs: partialDocs });
+      // job.updateProgress({ ...progress, partialDocs: partialDocs });
     }
   },
   onSuccess: (result, mode) => {
diff --git a/apps/api/src/run-req.ts b/apps/api/src/run-req.ts
new file mode 100644
index 00000000..6d29916d
--- /dev/null
+++ b/apps/api/src/run-req.ts
@@ -0,0 +1,175 @@
+import axios from "axios";
+import { promises as fs } from "fs";
+import { v4 as uuidV4 } from "uuid";
+
+interface Result {
+  start_url: string;
+  job_id?: string;
+  idempotency_key?: string;
+  result_data_jsonb?: any;
+}
+
+async function sendCrawl(result: Result): Promise<string | undefined> {
+  const idempotencyKey = uuidV4();
+  const url = result.start_url;
+  try {
+    const response = await axios.post(
+      "https://staging-firecrawl-scraper-js.fly.dev/v0/crawl",
+      {
+        url: url,
+        crawlerOptions: {
+          limit: 75,
+        },
+        pageOptions: {
+          includeHtml: true,
+          replaceAllPathsWithAbsolutePaths: true,
+          waitFor: 1000,
+        },
+      },
+      {
+        headers: {
+          "Content-Type": "application/json",
+          Authorization: `Bearer `,
+        },
+      }
+    );
+    result.idempotency_key = idempotencyKey;
+    return response.data.jobId;
+  } catch (error) {
+    console.error("Error sending crawl:", error);
+    return undefined;
+  }
+}
+
+async function getContent(result: Result): Promise<boolean> {
+  let attempts = 0;
+  while (attempts < 120) {
+    // Poll up to 120 times before giving up
+    try {
+      const response = await axios.get(
+        `https://staging-firecrawl-scraper-js.fly.dev/v0/crawl/status/${result.job_id}`,
+        {
+          headers: {
+            "Content-Type": "application/json",
+            Authorization: `Bearer `,
+          },
+        }
+      );
+      if (response.data.status === "completed") {
+        result.result_data_jsonb = response.data.data;
+        // Job actually completed
+        return true;
+      }
+    } catch (error) {
+      console.error("Error getting content:", error);
+    }
+    const randomSleep = Math.floor(Math.random() * 15000) + 5000;
+    await new Promise((resolve) => setTimeout(resolve, randomSleep)); // Random 5-20 second backoff between polls
+    attempts++;
+  }
+  // Set result as null if timed out
+  result.result_data_jsonb = null;
+  return false;
+}
+
+async function processResults(results: Result[]): Promise<void> {
+  let processedCount = 0;
+  let starterCount = 0;
+  const queue: Result[] = [];
+  const processedUrls = new Set<string>();
+
+  // Initialize the queue with the first 100 results
+  for (let i = 0; i < Math.min(100, results.length); i++) {
+    queue.push(results[i]);
+    processedUrls.add(results[i].start_url);
+  }
+
+  // Function to process a single result
+  const processSingleResult = async (result: Result) => {
+    const jobId = await sendCrawl(result);
+    if (jobId) {
+      console.log(`Job requested count: ${starterCount}`);
+      starterCount++;
+      result.job_id = jobId;
+      processedCount++;
+      // Save the result to the file
+      try {
+        // Save job id along with the start_url
+        const resultWithJobId = results.map(r => ({
+          start_url: r.start_url,
+          job_id: r.job_id,
+        }));
+        await fs.writeFile(
+          "results_with_job_id_4000_6000.json",
+          JSON.stringify(resultWithJobId, null, 4)
+        );
+      } catch (error) {
+        console.error("Error writing to results_with_job_id_4000_6000.json:", error);
+      }
+
+      // Add a new result to the queue if there are more results to process
+      // if (processedCount < results.length) {
+      //   for (let i = queue.length; i < results.length; i++) {
+      //     if (!processedUrls.has(results[i].start_url)) {
+      //       const nextResult = results[i];
+      //       console.log("Next result:", nextResult.start_url);
+      //       queue.push(nextResult);
+      //       processedUrls.add(nextResult.start_url);
+      //       console.log(`Queue length: ${queue.length}`);
+      //       processSingleResult(nextResult);
+      //       break;
+      //     }
+      //   }
+      // }
+    }
+  };
+
+  // Start processing the initial queue concurrently
+  // for (let i = 0; i < queue.length; i++) {
+  //   processSingleResult(queue[i]);
+  //   if ((i + 1) % 500 === 0) {
+  //     console.log(`Processed ${i + 1} results, waiting for 1 minute before adding the next batch...`);
+  //     await new Promise(resolve => setTimeout(resolve, 60 * 1000)); // Wait for 1 minute
+  //   }
+  // }
+  // Start processing the initial queue concurrently
+  // await Promise.all(queue.map(result => processSingleResult(result)));
+  for (let i = 0; i < results.length; i += 100) {
+    const batch = results.slice(i, i + 100);
+    Promise.all(batch.map((result) => processSingleResult(result)))
+      .then(() => {
+        console.log(`Processed ${i + 100} results.`);
+      })
+      .catch((error) => {
+        console.error(`Error processing batch starting at index ${i}:`, error);
+      });
+    await new Promise((resolve) => setTimeout(resolve, 60 * 1000)); // Wait for 1 minute
+  }
+}
+
+// Example call
+
+async function getStartUrls(): Promise<Result[]> {
+  try {
+    const data = await fs.readFile("starturls.json", "utf-8");
+    return JSON.parse(data);
+  } catch (error) {
+    console.error("Error reading starturls.json:", error);
+    return [];
+  }
+}
+
+async function main() {
+  const results: Result[] = (await getStartUrls()).slice(3999, 6000);
+  // console.log(results.map((r) => r.start_url).slice(0, 3));
+
+  processResults(results)
+    .then(() => {
+      console.log("All results processed.");
+    })
+    .catch((error) => {
+      console.error("Error processing results:", error);
+    });
+}
+
+main();
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index c5af5fab..f7e82dc2 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -51,9 +51,9 @@ const processJobInternal = async (token: string, job: Job) => {
 
   try {
     const result = await processJob(job, token);
-    const jobState = await job.getState();
-    if (jobState !== "completed" && jobState !== "failed") {
+    try{
       await job.moveToCompleted(result.docs, token, false);
+    }catch(e){
     }
   } catch (error) {
     console.log("Job failed, error:", error);