chore: formatting

Nicolas 2024-12-17 16:58:57 -03:00
parent b9f621bed5
commit 3b6edef9fa
12 changed files with 55 additions and 30 deletions

@@ -60,7 +60,11 @@ export async function scrapeController(
   try {
     doc = await waitForJob<Document>(jobId, timeout + totalWait); // TODO: better types for this
   } catch (e) {
-    logger.error(`Error in scrapeController: ${e}`, { jobId, scrapeId: jobId, startTime });
+    logger.error(`Error in scrapeController: ${e}`, {
+      jobId,
+      scrapeId: jobId,
+      startTime,
+    });
     if (
       e instanceof Error &&
       (e.message.startsWith("Job wait") || e.message === "timeout")
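For readers skimming the diff: `waitForJob` races the queued scrape against a deadline of `timeout + totalWait`, and the catch branch classifies timeout-style failures by their message. A minimal sketch of that wait-with-deadline pattern in TypeScript (the helper below is hypothetical, not the repo's implementation):

// Resolve with the job's result, or reject with a "timeout" error once the
// deadline passes, matching the e.message === "timeout" check above.
async function waitWithDeadline<T>(job: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("timeout")), ms);
  });
  try {
    return await Promise.race([job, deadline]);
  } finally {
    if (timer) clearTimeout(timer);
  }
}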

@@ -94,9 +94,13 @@ export async function addCrawlJobDone(
     await redisConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id);
   } else {
     // in case it's already been pushed, make sure it's removed
-    await redisConnection.lrem("crawl:" + id + ":jobs_done_ordered", -1, job_id);
+    await redisConnection.lrem(
+      "crawl:" + id + ":jobs_done_ordered",
+      -1,
+      job_id,
+    );
   }
   await redisConnection.expire(
     "crawl:" + id + ":jobs_done_ordered",
     24 * 60 * 60,
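This hunk maintains a per-crawl `jobs_done_ordered` list in Redis: finished job IDs are appended in completion order, the other branch removes an ID that may already have been pushed, and the key expires after 24 hours. A self-contained sketch of the same pattern with ioredis (the branch condition is assumed, since it sits above the hunk):

import Redis from "ioredis";

const redis = new Redis();

// Append finished job IDs in completion order; on the other branch, drop a
// previously pushed ID (lrem with count -1 removes the last occurrence).
async function recordJobDone(crawlId: string, jobId: string, keep: boolean) {
  const key = "crawl:" + crawlId + ":jobs_done_ordered";
  if (keep) {
    await redis.rpush(key, jobId);
  } else {
    await redis.lrem(key, -1, jobId);
  }
  await redis.expire(key, 24 * 60 * 60); // keep crawl bookkeeping for one day
}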

@@ -122,4 +122,3 @@
 // },
 // };
 // }
-

@@ -5,7 +5,7 @@ import { specialtyScrapeCheck } from "../utils/specialtyHandler";
 export async function scrapeURLWithFetch(
   meta: Meta,
-  timeToRun: number | undefined
+  timeToRun: number | undefined,
 ): Promise<EngineScrapeResult> {
   const timeout = timeToRun ?? 300000;

@@ -128,7 +128,7 @@ export async function scrapeURLWithFireEngineChromeCDP(
     (a, x) => (x.type === "wait" ? (x.milliseconds ?? 1000) + a : a),
     0,
   );
   const timeout = (timeToRun ?? 300000) + totalWait;
   const request: FireEngineScrapeRequestCommon &

@@ -105,7 +105,10 @@ export type EngineScrapeResult = {
 };
 const engineHandlers: {
-  [E in Engine]: (meta: Meta, timeToRun: number | undefined) => Promise<EngineScrapeResult>;
+  [E in Engine]: (
+    meta: Meta,
+    timeToRun: number | undefined,
+  ) => Promise<EngineScrapeResult>;
 } = {
   cache: scrapeCache,
   "fire-engine;chrome-cdp": scrapeURLWithFireEngineChromeCDP,
@@ -372,7 +375,7 @@ export function buildFallbackList(meta: Meta): {
 export async function scrapeURLWithEngine(
   meta: Meta,
   engine: Engine,
-  timeToRun: number | undefined
+  timeToRun: number | undefined,
 ): Promise<EngineScrapeResult> {
   const fn = engineHandlers[engine];
   const logger = meta.logger.child({

@@ -124,7 +124,10 @@ async function scrapePDFWithParsePDF(
   };
 }
-export async function scrapePDF(meta: Meta, timeToRun: number | undefined): Promise<EngineScrapeResult> {
+export async function scrapePDF(
+  meta: Meta,
+  timeToRun: number | undefined,
+): Promise<EngineScrapeResult> {
   if (!meta.options.parsePDF) {
     const file = await fetchFileToBuffer(meta.url);
     const content = file.buffer.toString("base64");
@@ -152,9 +155,12 @@ export async function scrapePDF(meta: Meta, timeToRun: number | undefined): Promise<EngineScrapeResult> {
       tempFilePath,
     );
     // If the parsed text is under 500 characters and LLAMAPARSE_API_KEY exists, try LlamaParse
-    if (result.markdown && result.markdown.length < 500 && process.env.LLAMAPARSE_API_KEY) {
+    if (
+      result.markdown &&
+      result.markdown.length < 500 &&
+      process.env.LLAMAPARSE_API_KEY
+    ) {
       try {
         const llamaResult = await scrapePDFWithLlamaParse(
           {
@@ -193,4 +199,4 @@ export async function scrapePDF(meta: Meta, timeToRun: number | undefined): Promise<EngineScrapeResult> {
     html: result.html,
     markdown: result.markdown,
   };
 }
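Taken together, the `scrapePDF` hunks encode a cost-aware fallback: parse the PDF locally first, and only call LlamaParse when the local result is suspiciously short (under 500 characters) and `LLAMAPARSE_API_KEY` is configured. A stripped-down sketch of that strategy (both parser callbacks are hypothetical stand-ins):

// Run the cheap local parser first; escalate to the richer parser only when
// the output looks too thin to be a real extraction.
async function parsePdfWithFallback(
  parseLocally: () => Promise<string>,
  parseWithLlama: () => Promise<string>,
): Promise<string> {
  const markdown = await parseLocally();
  if (markdown.length < 500 && process.env.LLAMAPARSE_API_KEY) {
    try {
      return await parseWithLlama();
    } catch {
      // keep the local result if the remote parser fails
    }
  }
  return markdown;
}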

@@ -10,7 +10,10 @@ const client = new ScrapingBeeClient(process.env.SCRAPING_BEE_API_KEY!);
 export function scrapeURLWithScrapingBee(
   wait_browser: "domcontentloaded" | "networkidle2",
 ): (meta: Meta, timeToRun: number | undefined) => Promise<EngineScrapeResult> {
-  return async (meta: Meta, timeToRun: number | undefined): Promise<EngineScrapeResult> => {
+  return async (
+    meta: Meta,
+    timeToRun: number | undefined,
+  ): Promise<EngineScrapeResult> => {
     let response: AxiosResponse<any>;
     const timeout = (timeToRun ?? 300000) + meta.options.waitFor;
     try {

@@ -60,9 +60,7 @@ export class SiteError extends Error {
 export class ActionError extends Error {
   public code: string;
   constructor(code: string) {
-    super(
-      "Action(s) failed to complete. Error code: " + code,
-    );
+    super("Action(s) failed to complete. Error code: " + code);
     this.code = code;
   }
 }

@@ -203,9 +203,10 @@ async function scrapeURLLoop(meta: Meta): Promise<ScrapeUrlResponse> {
   const results: EngineResultsTracker = {};
   let result: EngineScrapeResultWithContext | null = null;
-  const timeToRun = meta.options.timeout !== undefined
-    ? Math.round(meta.options.timeout / Math.min(fallbackList.length, 2))
-    : undefined
+  const timeToRun =
+    meta.options.timeout !== undefined
+      ? Math.round(meta.options.timeout / Math.min(fallbackList.length, 2))
+      : undefined;
   for (const { engine, unsupportedFeatures } of fallbackList) {
     const startedAt = Date.now();

@@ -72,7 +72,12 @@ async function addScrapeJobRaw(
   }
   if (concurrencyLimited) {
-    await _addScrapeJobToConcurrencyQueue(webScraperOptions, options, jobId, jobPriority);
+    await _addScrapeJobToConcurrencyQueue(
+      webScraperOptions,
+      options,
+      jobId,
+      jobPriority,
+    );
   } else {
     await _addScrapeJobToBullMQ(webScraperOptions, options, jobId, jobPriority);
   }
@@ -130,17 +135,17 @@ export async function addScrapeJobs(
   let countCanBeDirectlyAdded = Infinity;
-  if (
-    jobs[0].data &&
-    jobs[0].data.team_id &&
-    jobs[0].data.plan
-  ) {
+  if (jobs[0].data && jobs[0].data.team_id && jobs[0].data.plan) {
     const now = Date.now();
     const limit = await getConcurrencyLimitMax(jobs[0].data.plan);
     console.log("CC limit", limit);
     cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
-    countCanBeDirectlyAdded = Math.max(limit - (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length, 0);
+    countCanBeDirectlyAdded = Math.max(
+      limit -
+        (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length,
+      0,
+    );
   }
   const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
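The second hunk here computes how many incoming jobs can skip the concurrency queue: the team's limit minus its currently active jobs, floored at zero; `jobs.slice(0, countCanBeDirectlyAdded)` then goes straight to BullMQ and the rest are queued. Reduced to its core (inputs assumed to come from the helpers named in the diff):

// How many new jobs fit under the team's concurrency cap right now.
function directlyAddable(limit: number, activeJobCount: number): number {
  return Math.max(limit - activeJobCount, 0);
}

directlyAddable(10, 7); // => 3 jobs bypass the queue, the rest wait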

@@ -496,15 +496,14 @@ async function processJob(job: Job & { id: string }, token: string) {
         // See lockURL
         const x = await redisConnection.sadd(
           "crawl:" + job.data.crawl_id + ":visited",
-          ...p1.map(x => x.href),
+          ...p1.map((x) => x.href),
         );
         const lockRes = x === p1.length;
         if (job.data.crawlerOptions !== null && !lockRes) {
           throw new RacedRedirectError();
         }
       }
     }
     logger.debug("Logging job to DB...");
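The `sadd` call in this hunk doubles as a race detector: SADD returns the number of members that were actually new, so if fewer than `p1.length` URLs were added, another worker already claimed one of them and the job throws `RacedRedirectError`. A minimal ioredis sketch of the idiom (names hypothetical):

import Redis from "ioredis";

// Atomically claim a batch of URLs for a crawl. SADD counts only newly added
// members, so a shortfall means another worker got there first.
async function claimUrls(redis: Redis, crawlId: string, urls: string[]) {
  const added = await redis.sadd("crawl:" + crawlId + ":visited", ...urls);
  return added === urls.length;
}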
@@ -675,7 +674,10 @@ async function processJob(job: Job & { id: string }, token: string) {
     logger.debug("Declaring job as done...");
     await addCrawlJobDone(job.data.crawl_id, job.id, false);
-    await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(job.data.url, sc));
+    await redisConnection.srem(
+      "crawl:" + job.data.crawl_id + ":visited_unique",
+      normalizeURL(job.data.url, sc),
+    );
     logger.debug("Logging job to DB...");
     await logJob(