Merge branch 'main' into v1-webscraper

Nicolas 2024-08-16 13:43:28 -04:00
commit b0d211ecc1
4 changed files with 21 additions and 6 deletions

View File

@@ -336,6 +336,8 @@ function getPlanByPriceId(price_id: string) {
     case process.env.STRIPE_PRICE_ID_GROWTH:
     case process.env.STRIPE_PRICE_ID_GROWTH_YEARLY:
       return "growth";
+    case process.env.STRIPE_PRICE_ID_GROWTH_DOUBLE_MONTHLY:
+      return "growthdouble";
     default:
       return "free";
   }
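
A quick standalone sketch (not part of this commit) of what the new case is meant to do, assuming the STRIPE_PRICE_ID_GROWTH_DOUBLE_MONTHLY environment variable is set and getPlanByPriceId is importable; the import paths here are hypothetical:

import "dotenv/config"; // hypothetical: loads STRIPE_PRICE_ID_* from .env
import { getPlanByPriceId } from "./auth"; // hypothetical import path

// A subscription on the growth-double monthly price now resolves to its own
// plan name instead of falling through to the default "free" branch.
console.log(getPlanByPriceId(process.env.STRIPE_PRICE_ID_GROWTH_DOUBLE_MONTHLY!)); // "growthdouble"

// Unknown price IDs still hit the default branch.
console.log(getPlanByPriceId("price_unrecognized")); // "free"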

View File

@@ -45,6 +45,16 @@ export async function isCrawlFinished(id: string) {
     return (await redisConnection.scard("crawl:" + id + ":jobs_done")) === (await redisConnection.scard("crawl:" + id + ":jobs"));
 }
 
+export async function finishCrawl(id: string) {
+    if (await isCrawlFinished(id)) {
+        const set = await redisConnection.setnx("crawl:" + id + ":finish", "yes");
+        if (set === 1) {
+            await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
+        }
+        return set === 1
+    }
+}
+
 export async function getCrawlJobs(id: string): Promise<string[]> {
     return await redisConnection.smembers("crawl:" + id + ":jobs");
 }
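
A minimal standalone sketch (not part of this commit) of why the SETNX guard in the new finishCrawl matters: several workers can observe isCrawlFinished() as true at the same moment, but only the first SETNX wins, so crawl-completion side effects run exactly once. finishOnce below is a hypothetical stand-in that mirrors the new function, assuming ioredis and a reachable Redis:

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");

// Mirrors the new finishCrawl(): SETNX returns 1 only for the first caller,
// so exactly one worker "wins" and runs the completion path.
async function finishOnce(id: string): Promise<boolean> {
  const set = await redis.setnx("crawl:" + id + ":finish", "yes");
  if (set === 1) {
    await redis.expire("crawl:" + id + ":finish", 24 * 60 * 60); // auto-clean after a day
  }
  return set === 1;
}

async function main() {
  // Two workers racing on the same crawl ID: only one gets true.
  const results = await Promise.all([finishOnce("demo"), finishOnce("demo")]);
  console.log(results); // [ true, false ]
  await redis.quit();
}

main();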

View File

@@ -15,7 +15,7 @@ import { Logger } from "../lib/logger";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
-import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
+import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis";
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
@@ -174,7 +174,7 @@ async function processJob(job: Job, token: string) {
       if (!sc.cancelled) {
         const crawler = crawlToCrawler(job.data.crawl_id, sc);
-        const links = crawler.filterLinks((data.docs[0].linksOnPage as string[])
+        const links = crawler.filterLinks((data.docs[0].linksOnPage ?? [])
           .map(href => crawler.filterURL(href.trim(), sc.originUrl))
           .filter(x => x !== null),
           Infinity,
@@ -199,7 +199,7 @@ async function processJob(job: Job, token: string) {
         }
       }
-      if (await isCrawlFinished(job.data.crawl_id)) {
+      if (await finishCrawl(job.data.crawl_id)) {
        const jobIDs = await getCrawlJobs(job.data.crawl_id);
         const jobs = (await Promise.all(jobIDs.map(async x => {
@@ -226,14 +226,14 @@ async function processJob(job: Job, token: string) {
           return j;
         }))).sort((a, b) => a.timestamp - b.timestamp);
         const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
-        const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
+        const jobStatus = sc.cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed";
         const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
         await logJob({
           job_id: job.data.crawl_id,
           success: jobStatus === "completed",
-          message: message,
+          message: sc.cancelled ? "Cancelled" : message,
           num_docs: fullDocs.length,
           docs: [],
           time_taken: (Date.now() - sc.createdAt) / 1000,
@@ -260,7 +260,7 @@ async function processJob(job: Job, token: string) {
           docs: fullDocs,
         };
-        await callWebhook(job.data.team_id, job.id as string, data);
+        await callWebhook(job.data.team_id, job.data.crawl_id, data);
       }
     }
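
A small standalone sketch (not part of this commit) of the simplified status derivation above: since this branch now runs only after finishCrawl() has confirmed every job is done, the old intermediate "active" outcome is no longer reachable, and the crawl is "failed" if it was cancelled or any job failed, otherwise "completed". deriveCrawlStatus and the sample states are hypothetical:

type JobState = "completed" | "failed" | "active" | "waiting";

function deriveCrawlStatus(cancelled: boolean, jobStatuses: JobState[]): "completed" | "failed" {
  // Same shape as the new expression in processJob().
  return cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed";
}

console.log(deriveCrawlStatus(false, ["completed", "completed"])); // "completed"
console.log(deriveCrawlStatus(false, ["completed", "failed"]));    // "failed"
console.log(deriveCrawlStatus(true,  ["completed", "completed"])); // "failed" (cancelled)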

View File

@@ -14,6 +14,7 @@ const RATE_LIMITS = {
     standardNew: 10,
     standardnew: 10,
     growth: 50,
+    growthdouble: 50,
   },
   scrape: {
     default: 20,
@@ -26,6 +27,7 @@ const RATE_LIMITS = {
     standardNew: 50,
     standardnew: 50,
     growth: 500,
+    growthdouble: 500,
   },
   search: {
     default: 20,
@@ -38,6 +40,7 @@ const RATE_LIMITS = {
     standardNew: 50,
     standardnew: 50,
     growth: 500,
+    growthdouble: 500,
   },
   preview: {
     free: 5,
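
A minimal sketch (not part of this commit) of how the new "growthdouble" entries are expected to be consumed, assuming RATE_LIMITS keeps the shape shown above; the trimmed table and the getRateLimit helper below are hypothetical stand-ins for the module's real lookup:

// Hypothetical, trimmed copy of the table above (requests per minute).
const RATE_LIMITS: Record<string, Record<string, number>> = {
  scrape: { default: 20, standardNew: 50, standardnew: 50, growth: 500, growthdouble: 500 },
  search: { default: 20, standardNew: 50, standardnew: 50, growth: 500, growthdouble: 500 },
};

// Hypothetical lookup: unknown plans fall back to the mode's default limit.
function getRateLimit(mode: keyof typeof RATE_LIMITS, plan: string): number {
  return RATE_LIMITS[mode][plan] ?? RATE_LIMITS[mode].default;
}

console.log(getRateLimit("scrape", "growthdouble")); // 500
console.log(getRateLimit("scrape", "unknown-plan")); // 20 (default)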