Nick: init

This commit is contained in:
Nicolas 2024-12-26 12:21:46 -03:00
parent c911aad228
commit f467a3ae6c
2 changed files with 241 additions and 120 deletions

View File

@ -7,6 +7,7 @@ import {
ExtractResponse, ExtractResponse,
MapDocument, MapDocument,
scrapeOptions, scrapeOptions,
URLTrace,
} from "./types"; } from "./types";
// import { Document } from "../../lib/entities"; // import { Document } from "../../lib/entities";
import Redis from "ioredis"; import Redis from "ioredis";
@ -56,14 +57,22 @@ export async function extractController(
let links: string[] = []; let links: string[] = [];
let docs: Document[] = []; let docs: Document[] = [];
const earlyReturn = false; const earlyReturn = false;
const urlTraces: URLTrace[] = [];
// Process all URLs in parallel // Process all URLs in parallel
const urlPromises = req.body.urls.map(async (url) => { const urlPromises = req.body.urls.map(async (url) => {
const trace: URLTrace = {
url,
status: 'mapped',
timing: {
discoveredAt: new Date().toISOString(),
},
};
urlTraces.push(trace);
if (url.includes("/*") || req.body.allowExternalLinks) { if (url.includes("/*") || req.body.allowExternalLinks) {
// Handle glob pattern URLs // Handle glob pattern URLs
const baseUrl = url.replace("/*", ""); const baseUrl = url.replace("/*", "");
// const pathPrefix = baseUrl.split('/').slice(3).join('/'); // Get path after domain if any
const allowExternalLinks = req.body.allowExternalLinks; const allowExternalLinks = req.body.allowExternalLinks;
let urlWithoutWww = baseUrl.replace("www.", ""); let urlWithoutWww = baseUrl.replace("www.", "");
@ -75,113 +84,167 @@ export async function extractController(
)) ?? req.body.prompt; )) ?? req.body.prompt;
} }
const mapResults = await getMapResults({ try {
url: baseUrl, const mapResults = await getMapResults({
search: rephrasedPrompt, url: baseUrl,
teamId: req.auth.team_id, search: rephrasedPrompt,
plan: req.auth.plan, teamId: req.auth.team_id,
allowExternalLinks, plan: req.auth.plan,
origin: req.body.origin, allowExternalLinks,
limit: req.body.limit, origin: req.body.origin,
// If we're self-hosted, we don't want to ignore the sitemap, due to our fire-engine mapping limit: req.body.limit,
ignoreSitemap: false, ignoreSitemap: false,
includeMetadata: true, includeMetadata: true,
includeSubdomains: req.body.includeSubdomains, includeSubdomains: req.body.includeSubdomains,
}); });
let mappedLinks = mapResults.mapResults as MapDocument[]; let mappedLinks = mapResults.mapResults as MapDocument[];
// Remove duplicates between mapResults.links and mappedLinks // Remove duplicates between mapResults.links and mappedLinks
const allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links]; const allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links];
const uniqueUrls = removeDuplicateUrls(allUrls); const uniqueUrls = removeDuplicateUrls(allUrls);
// Only add URLs from mapResults.links that aren't already in mappedLinks // Track all discovered URLs
const existingUrls = new Set(mappedLinks.map((m) => m.url)); uniqueUrls.forEach(discoveredUrl => {
const newUrls = uniqueUrls.filter((url) => !existingUrls.has(url)); if (!urlTraces.some(t => t.url === discoveredUrl)) {
urlTraces.push({
mappedLinks = [ url: discoveredUrl,
...mappedLinks, status: 'mapped',
...newUrls.map((url) => ({ url, title: "", description: "" })), timing: {
]; discoveredAt: new Date().toISOString(),
},
if (mappedLinks.length === 0) { usedInCompletion: false, // Default to false, will update if used
mappedLinks = [{ url: baseUrl, title: "", description: "" }]; });
}
// Limit number of links to MAX_EXTRACT_LIMIT
mappedLinks = mappedLinks.slice(0, MAX_EXTRACT_LIMIT);
let mappedLinksRerank = mappedLinks.map(
(x) =>
`url: ${x.url}, title: ${x.title}, description: ${x.description}`,
);
if (req.body.prompt) {
let searchQuery =
req.body.prompt && allowExternalLinks
? `${req.body.prompt} ${urlWithoutWww}`
: req.body.prompt
? `${req.body.prompt} site:${urlWithoutWww}`
: `site:${urlWithoutWww}`;
// Get similarity scores between the search query and each link's context
const linksAndScores = await performRanking(
mappedLinksRerank,
mappedLinks.map((l) => l.url),
searchQuery,
);
// First try with high threshold
let filteredLinks = filterAndProcessLinks(
mappedLinks,
linksAndScores,
INITIAL_SCORE_THRESHOLD,
);
// If we don't have enough high-quality links, try with lower threshold
if (filteredLinks.length < MIN_REQUIRED_LINKS) {
logger.info(
`Only found ${filteredLinks.length} links with score > ${INITIAL_SCORE_THRESHOLD}. Trying lower threshold...`,
);
filteredLinks = filterAndProcessLinks(
mappedLinks,
linksAndScores,
FALLBACK_SCORE_THRESHOLD,
);
if (filteredLinks.length === 0) {
// If still no results, take top N results regardless of score
logger.warn(
`No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`,
);
filteredLinks = linksAndScores
.sort((a, b) => b.score - a.score)
.slice(0, MIN_REQUIRED_LINKS)
.map((x) => mappedLinks.find((link) => link.url === x.link))
.filter(
(x): x is MapDocument =>
x !== undefined &&
x.url !== undefined &&
!isUrlBlocked(x.url),
);
} }
});
// Only add URLs from mapResults.links that aren't already in mappedLinks
const existingUrls = new Set(mappedLinks.map((m) => m.url));
const newUrls = uniqueUrls.filter((url) => !existingUrls.has(url));
mappedLinks = [
...mappedLinks,
...newUrls.map((url) => ({ url, title: "", description: "" })),
];
if (mappedLinks.length === 0) {
mappedLinks = [{ url: baseUrl, title: "", description: "" }];
} }
mappedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT); // Limit number of links to MAX_EXTRACT_LIMIT
} mappedLinks = mappedLinks.slice(0, MAX_EXTRACT_LIMIT);
return mappedLinks.map((x) => x.url) as string[]; let mappedLinksRerank = mappedLinks.map(
(x) =>
`url: ${x.url}, title: ${x.title}, description: ${x.description}`,
);
if (req.body.prompt) {
let searchQuery =
req.body.prompt && allowExternalLinks
? `${req.body.prompt} ${urlWithoutWww}`
: req.body.prompt
? `${req.body.prompt} site:${urlWithoutWww}`
: `site:${urlWithoutWww}`;
// Get similarity scores between the search query and each link's context
const linksAndScores = await performRanking(
mappedLinksRerank,
mappedLinks.map((l) => l.url),
searchQuery,
);
// First try with high threshold
let filteredLinks = filterAndProcessLinks(
mappedLinks,
linksAndScores,
INITIAL_SCORE_THRESHOLD,
);
// If we don't have enough high-quality links, try with lower threshold
if (filteredLinks.length < MIN_REQUIRED_LINKS) {
logger.info(
`Only found ${filteredLinks.length} links with score > ${INITIAL_SCORE_THRESHOLD}. Trying lower threshold...`,
);
filteredLinks = filterAndProcessLinks(
mappedLinks,
linksAndScores,
FALLBACK_SCORE_THRESHOLD,
);
if (filteredLinks.length === 0) {
// If still no results, take top N results regardless of score
logger.warn(
`No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`,
);
filteredLinks = linksAndScores
.sort((a, b) => b.score - a.score)
.slice(0, MIN_REQUIRED_LINKS)
.map((x) => mappedLinks.find((link) => link.url === x.link))
.filter(
(x): x is MapDocument =>
x !== undefined &&
x.url !== undefined &&
!isUrlBlocked(x.url),
);
}
}
// Update URL traces with relevance scores and mark filtered out URLs
linksAndScores.forEach((score) => {
const trace = urlTraces.find((t) => t.url === score.link);
if (trace) {
trace.relevanceScore = score.score;
// If URL didn't make it through filtering, mark it as filtered out
if (!filteredLinks.some(link => link.url === score.link)) {
trace.warning = `Relevance score ${score.score} below threshold`;
trace.usedInCompletion = false;
}
}
});
mappedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT);
// Mark URLs that will be used in completion
mappedLinks.forEach(link => {
const trace = urlTraces.find(t => t.url === link.url);
if (trace) {
trace.usedInCompletion = true;
}
});
// Mark URLs that were dropped due to ranking limit
filteredLinks.slice(MAX_RANKING_LIMIT).forEach(link => {
const trace = urlTraces.find(t => t.url === link.url);
if (trace) {
trace.warning = 'Excluded due to ranking limit';
trace.usedInCompletion = false;
}
});
}
return mappedLinks.map((x) => x.url);
} catch (error) {
trace.status = 'error';
trace.error = error.message;
trace.usedInCompletion = false;
return [];
}
} else { } else {
// Handle direct URLs without glob pattern // Handle direct URLs without glob pattern
if (!isUrlBlocked(url)) { if (!isUrlBlocked(url)) {
trace.usedInCompletion = true;
return [url]; return [url];
} }
trace.status = 'error';
trace.error = 'URL is blocked';
trace.usedInCompletion = false;
return []; return [];
} }
}); });
// Wait for all URL processing to complete and flatten results // Wait for all URL processing to complete and flatten results
const processedUrls = await Promise.all(urlPromises); const processedUrls = await Promise.all(urlPromises);
const flattenedUrls = processedUrls.flat().filter((url) => url); // Filter out any null/undefined values const flattenedUrls = processedUrls.flat().filter((url) => url);
links.push(...flattenedUrls); links.push(...flattenedUrls);
if (links.length === 0) { if (links.length === 0) {
@ -189,13 +252,20 @@ export async function extractController(
success: false, success: false,
error: error:
"No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.", "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.",
urlTrace: urlTraces,
}); });
} }
// Scrape all links in parallel with retries // Scrape all links in parallel with retries
const scrapePromises = links.map(async (url) => { const scrapePromises = links.map(async (url) => {
const trace = urlTraces.find((t) => t.url === url);
if (trace) {
trace.status = 'scraped';
trace.timing.scrapedAt = new Date().toISOString();
}
const origin = req.body.origin || "api"; const origin = req.body.origin || "api";
const timeout = Math.floor((req.body.timeout || 40000) * 0.7) || 30000; // Use 70% of total timeout for individual scrapes const timeout = Math.floor((req.body.timeout || 40000) * 0.7) || 30000;
const jobId = crypto.randomUUID(); const jobId = crypto.randomUUID();
const jobPriority = await getJobPriority({ const jobPriority = await getJobPriority({
@ -204,31 +274,45 @@ export async function extractController(
basePriority: 10, basePriority: 10,
}); });
await addScrapeJob(
{
url,
mode: "single_urls",
team_id: req.auth.team_id,
scrapeOptions: scrapeOptions.parse({}),
internalOptions: {},
plan: req.auth.plan!,
origin,
is_scrape: true,
},
{},
jobId,
jobPriority,
);
try { try {
await addScrapeJob(
{
url,
mode: "single_urls",
team_id: req.auth.team_id,
scrapeOptions: scrapeOptions.parse({}),
internalOptions: {},
plan: req.auth.plan!,
origin,
is_scrape: true,
},
{},
jobId,
jobPriority,
);
const doc = await waitForJob<Document>(jobId, timeout); const doc = await waitForJob<Document>(jobId, timeout);
await getScrapeQueue().remove(jobId); await getScrapeQueue().remove(jobId);
if (trace) {
trace.timing.completedAt = new Date().toISOString();
trace.contentStats = {
rawContentLength: doc.markdown?.length || 0,
processedContentLength: doc.markdown?.length || 0,
tokensUsed: 0, // Will be updated after LLM processing
};
}
if (earlyReturn) { if (earlyReturn) {
return null; return null;
} }
return doc; return doc;
} catch (e) { } catch (e) {
logger.error(`Error in extractController: ${e}`); logger.error(`Error in extractController: ${e}`);
if (trace) {
trace.status = 'error';
trace.error = e.message;
}
return null; return null;
} }
}); });
@ -240,6 +324,7 @@ export async function extractController(
return res.status(e.status).json({ return res.status(e.status).json({
success: false, success: false,
error: e.error, error: e.error,
urlTrace: urlTraces,
}); });
} }
@ -256,9 +341,25 @@ export async function extractController(
}, },
docs.map((x) => buildDocument(x)).join("\n"), docs.map((x) => buildDocument(x)).join("\n"),
undefined, undefined,
true, // isExtractEndpoint true,
); );
// Update token usage in URL traces
if (completions.numTokens) {
// Distribute tokens proportionally based on content length
const totalLength = docs.reduce((sum, doc) => sum + (doc.markdown?.length || 0), 0);
docs.forEach((doc) => {
if (doc.metadata?.sourceURL) {
const trace = urlTraces.find((t) => t.url === doc.metadata.sourceURL);
if (trace && trace.contentStats) {
trace.contentStats.tokensUsed = Math.floor(
((doc.markdown?.length || 0) / totalLength) * completions.numTokens
);
}
}
});
}
// TODO: change this later // TODO: change this later
// While on beta, we're billing 5 credits per link discovered/scraped. // While on beta, we're billing 5 credits per link discovered/scraped.
billTeam(req.auth.team_id, req.acuc?.sub_id, links.length * 5).catch( billTeam(req.auth.team_id, req.acuc?.sub_id, links.length * 5).catch(
@ -292,6 +393,7 @@ export async function extractController(
data: data, data: data,
scrape_id: id, scrape_id: id,
warning: warning, warning: warning,
urlTrace: urlTraces,
}); });
} }

View File

@ -379,16 +379,16 @@ export type MapRequest = z.infer<typeof mapRequestSchema>;
export type Document = { export type Document = {
markdown?: string; markdown?: string;
extract?: any;
html?: string; html?: string;
rawHtml?: string; rawHtml?: string;
links?: string[]; links?: string[];
screenshot?: string; screenshot?: string;
extract?: any;
warning?: string;
actions?: { actions?: {
screenshots?: string[]; screenshots?: string[];
scrapes?: ScrapeActionContent[]; scrapes?: ScrapeActionContent[];
}; };
warning?: string;
metadata: { metadata: {
title?: string; title?: string;
description?: string; description?: string;
@ -425,7 +425,7 @@ export type Document = {
error?: string; error?: string;
[key: string]: string | string[] | number | undefined; [key: string]: string | string[] | number | undefined;
}; };
}; }
export type ErrorResponse = { export type ErrorResponse = {
success: false; success: false;
@ -448,14 +448,33 @@ export interface ScrapeResponseRequestTest {
error?: string; error?: string;
} }
export type ExtractResponse = export interface URLTrace {
| ErrorResponse url: string;
| { status: 'mapped' | 'scraped' | 'error';
success: true; timing: {
warning?: string; discoveredAt: string;
data: z.infer<typeof extractRequestSchema>; scrapedAt?: string;
scrape_id?: string; completedAt?: string;
}; };
error?: string;
warning?: string;
contentStats?: {
rawContentLength: number;
processedContentLength: number;
tokensUsed: number;
};
relevanceScore?: number;
usedInCompletion?: boolean;
}
/**
 * Response body for the extract endpoint.
 *
 * This is a single flat shape used for both successful and failed requests:
 * every field except `success` is optional, so checking `success` does not
 * narrow which of the other fields are present.
 */
export interface ExtractResponse {
// Whether the request succeeded.
success: boolean;
// Result payload; untyped (`any`), so consumers must validate its shape themselves.
data?: any;
// Identifier of the scrape associated with this response, when one is provided.
scrape_id?: string;
// Optional non-fatal warning message.
warning?: string;
// Optional error message.
error?: string;
// Optional list of per-URL trace entries (see URLTrace).
urlTrace?: URLTrace[];
}
export interface ExtractResponseRequestTest { export interface ExtractResponseRequestTest {
statusCode: number; statusCode: number;