Nick: init

This commit is contained in:
Nicolas 2025-01-03 20:44:27 -03:00
parent 81cf05885b
commit 27457ed5db
7 changed files with 188 additions and 32 deletions

View File

@ -0,0 +1,53 @@
import { Response } from "express";
import {
supabaseGetJobByIdOnlyData,
supabaseGetJobsById,
} from "../../lib/supabase-jobs";
import { scrapeStatusRateLimiter } from "../../services/rate-limiter";
import { RequestWithAuth } from "./types";
export async function extractStatusController(
req: RequestWithAuth<{ jobId: string }, any, any>,
res: Response,
) {
try {
const rateLimiter = scrapeStatusRateLimiter;
const incomingIP = (req.headers["x-forwarded-for"] ||
req.socket.remoteAddress) as string;
const iptoken = incomingIP;
await rateLimiter.consume(iptoken);
const job = await supabaseGetJobByIdOnlyData(req.params.jobId);
if (!job || job.team_id !== req.auth.team_id) {
return res.status(403).json({
success: false,
error: "You are not allowed to access this resource.",
});
}
const jobData = await supabaseGetJobsById([req.params.jobId]);
if (!jobData || jobData.length === 0) {
return res.status(404).json({
success: false,
error: "Job not found",
});
}
return res.status(200).json({
success: true,
data: jobData[0].docs,
});
} catch (error) {
if (error instanceof Error && error.message == "Too Many Requests") {
return res.status(429).json({
success: false,
error: "Rate limit exceeded. Please try again later.",
});
} else {
return res.status(500).json({
success: false,
error: "An unexpected error occurred.",
});
}
}
}

View File

@ -5,7 +5,9 @@ import {
extractRequestSchema,
ExtractResponse,
} from "./types";
import { performExtraction } from "../../lib/extract/extraction-service";
import { getExtractQueue } from "../../services/queue-service";
import * as Sentry from "@sentry/node";
import { v4 as uuidv4 } from "uuid";
/**
* Extracts data from the provided URLs based on the request parameters.
@ -29,12 +31,47 @@ export async function extractController(
});
}
const result = await performExtraction({
const extractId = crypto.randomUUID();
const jobData = {
request: req.body,
teamId: req.auth.team_id,
plan: req.auth.plan,
subId: req.acuc?.sub_id || undefined,
});
subId: req.acuc?.sub_id,
extractId,
};
return res.status(result.success ? 200 : 400).json(result);
if (Sentry.isInitialized()) {
const size = JSON.stringify(jobData).length;
await Sentry.startSpan(
{
name: "Add extract job",
op: "queue.publish",
attributes: {
"messaging.message.id": extractId,
"messaging.destination.name": getExtractQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await getExtractQueue().add(extractId, {
...jobData,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
});
},
);
} else {
await getExtractQueue().add(extractId, jobData, {
jobId: extractId,
});
}
return res.status(202).json({
success: true,
id: extractId,
urlTrace: [],
});
}

View File

@ -478,10 +478,11 @@ export interface URLTrace {
export interface ExtractResponse {
success: boolean;
error?: string;
data?: any;
scrape_id?: string;
id?: string;
warning?: string;
error?: string;
urlTrace?: URLTrace[];
}

View File

@ -20,7 +20,7 @@ interface ExtractServiceOptions {
interface ExtractResult {
success: boolean;
data?: any;
scrapeId: string;
extractId: string;
warning?: string;
urlTrace?: URLTrace[];
error?: string;
@ -38,9 +38,8 @@ function getRootDomain(url: string): string {
}
}
export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
export async function performExtraction(extractId, options: ExtractServiceOptions): Promise<ExtractResult> {
const { request, teamId, plan, subId } = options;
const scrapeId = crypto.randomUUID();
const urlTraces: URLTrace[] = [];
let docs: Document[] = [];
@ -65,7 +64,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: false,
error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.",
scrapeId,
extractId,
urlTrace: urlTraces,
};
}
@ -89,7 +88,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: false,
error: error.message,
scrapeId,
extractId,
urlTrace: urlTraces,
};
}
@ -191,7 +190,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
// Log job
logJob({
job_id: scrapeId,
job_id: extractId,
success: true,
message: "Extract completed",
num_docs: 1,
@ -208,7 +207,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: true,
data: completions.extract ?? {},
scrapeId,
extractId,
warning: completions.warning,
urlTrace: request.urlTrace ? urlTraces : undefined,
};

View File

@ -24,13 +24,7 @@ import { scrapeStatusController } from "../controllers/v1/scrape-status";
import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
import { batchScrapeController } from "../controllers/v1/batch-scrape";
import { extractController } from "../controllers/v1/extract";
// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
// import { searchController } from "../../src/controllers/v1/search";
// import { crawlCancelController } from "../../src/controllers/v1/crawl-cancel";
// import { keyAuthController } from "../../src/controllers/v1/keyAuth";
// import { livenessController } from "../controllers/v1/liveness";
// import { readinessController } from "../controllers/v1/readiness";
import { extractStatusController } from "../controllers/v1/extract-status";
import { creditUsageController } from "../controllers/v1/credit-usage";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { searchController } from "../controllers/v1/search";
@ -215,6 +209,12 @@ v1Router.post(
wrap(extractController),
);
v1Router.get(
"/extract/:jobId",
authMiddleware(RateLimiterMode.CrawlStatus),
wrap(extractStatusController),
);
// v1Router.post("/crawlWebsitePreview", crawlPreviewController);
v1Router.delete(

View File

@ -3,12 +3,16 @@ import { logger } from "../lib/logger";
import IORedis from "ioredis";
let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
});
export const scrapeQueueName = "{scrapeQueue}";
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
@ -24,24 +28,35 @@ export function getScrapeQueue() {
age: 90000, // 25 hours
},
},
},
// {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
}
);
logger.info("Web scraper queue created");
}
return scrapeQueue;
}
export function getExtractQueue() {
if (!extractQueue) {
extractQueue = new Queue(
extractQueueName,
{
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
}
);
logger.info("Extraction queue created");
}
return extractQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });

View File

@ -4,8 +4,10 @@ import * as Sentry from "@sentry/node";
import { CustomError } from "../lib/custom-error";
import {
getScrapeQueue,
getExtractQueue,
redisConnection,
scrapeQueueName,
extractQueueName,
} from "./queue-service";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
@ -50,6 +52,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { indexPage } from "../lib/extract/index/pinecone";
import { Document } from "../controllers/v1/types";
import { performExtraction } from "../lib/extract/extraction-service";
configDotenv();
@ -243,6 +246,52 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
return err;
};
const processExtractJobInternal = async (token: string, job: Job & { id: string }) => {
const logger = _logger.child({
module: "extract-worker",
method: "processJobInternal",
jobId: job.id,
extractId: job.data.extractId,
teamId: job.data?.teamId ?? undefined,
});
const extendLockInterval = setInterval(async () => {
logger.info(`🔄 Worker extending lock on job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
try {
const result = await performExtraction(job.data.extractId, {
request: job.data.request,
teamId: job.data.teamId,
plan: job.data.plan,
subId: job.data.subId,
});
if (result.success) {
// Move job to completed state in Redis
await job.moveToCompleted(result, token, false);
return result;
} else {
throw new Error(result.error || "Unknown error during extraction");
}
} catch (error) {
logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });
Sentry.captureException(error, {
data: {
job: job.id,
},
});
// Move job to failed state in Redis
await job.moveToFailed(error, token, false);
throw error;
} finally {
clearInterval(extendLockInterval);
}
};
let isShuttingDown = false;
process.on("SIGINT", () => {
@ -399,7 +448,9 @@ const workerFun = async (
}
};
// Start both workers
workerFun(getScrapeQueue(), processJobInternal);
workerFun(getExtractQueue(), processExtractJobInternal);
async function processKickoffJob(job: Job & { id: string }, token: string) {
const logger = _logger.child({