From f467a3ae6c62759addb88136a3f63cd11a96a18c Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 26 Dec 2024 12:21:46 -0300 Subject: [PATCH 1/4] Nick: init --- apps/api/src/controllers/v1/extract.ts | 320 ++++++++++++++++--------- apps/api/src/controllers/v1/types.ts | 41 +++- 2 files changed, 241 insertions(+), 120 deletions(-) diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index 58e75751..107c1fad 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -7,6 +7,7 @@ import { ExtractResponse, MapDocument, scrapeOptions, + URLTrace, } from "./types"; // import { Document } from "../../lib/entities"; import Redis from "ioredis"; @@ -56,14 +57,22 @@ export async function extractController( let links: string[] = []; let docs: Document[] = []; const earlyReturn = false; + const urlTraces: URLTrace[] = []; // Process all URLs in parallel const urlPromises = req.body.urls.map(async (url) => { + const trace: URLTrace = { + url, + status: 'mapped', + timing: { + discoveredAt: new Date().toISOString(), + }, + }; + urlTraces.push(trace); + if (url.includes("/*") || req.body.allowExternalLinks) { // Handle glob pattern URLs const baseUrl = url.replace("/*", ""); - // const pathPrefix = baseUrl.split('/').slice(3).join('/'); // Get path after domain if any - const allowExternalLinks = req.body.allowExternalLinks; let urlWithoutWww = baseUrl.replace("www.", ""); @@ -75,113 +84,167 @@ export async function extractController( )) ?? req.body.prompt; } - const mapResults = await getMapResults({ - url: baseUrl, - search: rephrasedPrompt, - teamId: req.auth.team_id, - plan: req.auth.plan, - allowExternalLinks, - origin: req.body.origin, - limit: req.body.limit, - // If we're self-hosted, we don't want to ignore the sitemap, due to our fire-engine mapping - ignoreSitemap: false, - includeMetadata: true, - includeSubdomains: req.body.includeSubdomains, - }); + try { + const mapResults = await getMapResults({ + url: baseUrl, + search: rephrasedPrompt, + teamId: req.auth.team_id, + plan: req.auth.plan, + allowExternalLinks, + origin: req.body.origin, + limit: req.body.limit, + ignoreSitemap: false, + includeMetadata: true, + includeSubdomains: req.body.includeSubdomains, + }); - let mappedLinks = mapResults.mapResults as MapDocument[]; + let mappedLinks = mapResults.mapResults as MapDocument[]; - // Remove duplicates between mapResults.links and mappedLinks - const allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links]; - const uniqueUrls = removeDuplicateUrls(allUrls); + // Remove duplicates between mapResults.links and mappedLinks + const allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links]; + const uniqueUrls = removeDuplicateUrls(allUrls); - // Only add URLs from mapResults.links that aren't already in mappedLinks - const existingUrls = new Set(mappedLinks.map((m) => m.url)); - const newUrls = uniqueUrls.filter((url) => !existingUrls.has(url)); - - mappedLinks = [ - ...mappedLinks, - ...newUrls.map((url) => ({ url, title: "", description: "" })), - ]; - - if (mappedLinks.length === 0) { - mappedLinks = [{ url: baseUrl, title: "", description: "" }]; - } - - // Limit number of links to MAX_EXTRACT_LIMIT - mappedLinks = mappedLinks.slice(0, MAX_EXTRACT_LIMIT); - - let mappedLinksRerank = mappedLinks.map( - (x) => - `url: ${x.url}, title: ${x.title}, description: ${x.description}`, - ); - - if (req.body.prompt) { - let searchQuery = - req.body.prompt && allowExternalLinks - ? 
`${req.body.prompt} ${urlWithoutWww}` - : req.body.prompt - ? `${req.body.prompt} site:${urlWithoutWww}` - : `site:${urlWithoutWww}`; - // Get similarity scores between the search query and each link's context - const linksAndScores = await performRanking( - mappedLinksRerank, - mappedLinks.map((l) => l.url), - searchQuery, - ); - - // First try with high threshold - let filteredLinks = filterAndProcessLinks( - mappedLinks, - linksAndScores, - INITIAL_SCORE_THRESHOLD, - ); - - // If we don't have enough high-quality links, try with lower threshold - if (filteredLinks.length < MIN_REQUIRED_LINKS) { - logger.info( - `Only found ${filteredLinks.length} links with score > ${INITIAL_SCORE_THRESHOLD}. Trying lower threshold...`, - ); - filteredLinks = filterAndProcessLinks( - mappedLinks, - linksAndScores, - FALLBACK_SCORE_THRESHOLD, - ); - - if (filteredLinks.length === 0) { - // If still no results, take top N results regardless of score - logger.warn( - `No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`, - ); - filteredLinks = linksAndScores - .sort((a, b) => b.score - a.score) - .slice(0, MIN_REQUIRED_LINKS) - .map((x) => mappedLinks.find((link) => link.url === x.link)) - .filter( - (x): x is MapDocument => - x !== undefined && - x.url !== undefined && - !isUrlBlocked(x.url), - ); + // Track all discovered URLs + uniqueUrls.forEach(discoveredUrl => { + if (!urlTraces.some(t => t.url === discoveredUrl)) { + urlTraces.push({ + url: discoveredUrl, + status: 'mapped', + timing: { + discoveredAt: new Date().toISOString(), + }, + usedInCompletion: false, // Default to false, will update if used + }); } + }); + + // Only add URLs from mapResults.links that aren't already in mappedLinks + const existingUrls = new Set(mappedLinks.map((m) => m.url)); + const newUrls = uniqueUrls.filter((url) => !existingUrls.has(url)); + + mappedLinks = [ + ...mappedLinks, + ...newUrls.map((url) => ({ url, title: "", description: "" })), + ]; + + if (mappedLinks.length === 0) { + mappedLinks = [{ url: baseUrl, title: "", description: "" }]; } - mappedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT); - } + // Limit number of links to MAX_EXTRACT_LIMIT + mappedLinks = mappedLinks.slice(0, MAX_EXTRACT_LIMIT); - return mappedLinks.map((x) => x.url) as string[]; + let mappedLinksRerank = mappedLinks.map( + (x) => + `url: ${x.url}, title: ${x.title}, description: ${x.description}`, + ); + + if (req.body.prompt) { + let searchQuery = + req.body.prompt && allowExternalLinks + ? `${req.body.prompt} ${urlWithoutWww}` + : req.body.prompt + ? `${req.body.prompt} site:${urlWithoutWww}` + : `site:${urlWithoutWww}`; + // Get similarity scores between the search query and each link's context + const linksAndScores = await performRanking( + mappedLinksRerank, + mappedLinks.map((l) => l.url), + searchQuery, + ); + + // First try with high threshold + let filteredLinks = filterAndProcessLinks( + mappedLinks, + linksAndScores, + INITIAL_SCORE_THRESHOLD, + ); + + // If we don't have enough high-quality links, try with lower threshold + if (filteredLinks.length < MIN_REQUIRED_LINKS) { + logger.info( + `Only found ${filteredLinks.length} links with score > ${INITIAL_SCORE_THRESHOLD}. 
Trying lower threshold...`, + ); + filteredLinks = filterAndProcessLinks( + mappedLinks, + linksAndScores, + FALLBACK_SCORE_THRESHOLD, + ); + + if (filteredLinks.length === 0) { + // If still no results, take top N results regardless of score + logger.warn( + `No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`, + ); + filteredLinks = linksAndScores + .sort((a, b) => b.score - a.score) + .slice(0, MIN_REQUIRED_LINKS) + .map((x) => mappedLinks.find((link) => link.url === x.link)) + .filter( + (x): x is MapDocument => + x !== undefined && + x.url !== undefined && + !isUrlBlocked(x.url), + ); + } + } + + // Update URL traces with relevance scores and mark filtered out URLs + linksAndScores.forEach((score) => { + const trace = urlTraces.find((t) => t.url === score.link); + if (trace) { + trace.relevanceScore = score.score; + // If URL didn't make it through filtering, mark it as filtered out + if (!filteredLinks.some(link => link.url === score.link)) { + trace.warning = `Relevance score ${score.score} below threshold`; + trace.usedInCompletion = false; + } + } + }); + + mappedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT); + + // Mark URLs that will be used in completion + mappedLinks.forEach(link => { + const trace = urlTraces.find(t => t.url === link.url); + if (trace) { + trace.usedInCompletion = true; + } + }); + + // Mark URLs that were dropped due to ranking limit + filteredLinks.slice(MAX_RANKING_LIMIT).forEach(link => { + const trace = urlTraces.find(t => t.url === link.url); + if (trace) { + trace.warning = 'Excluded due to ranking limit'; + trace.usedInCompletion = false; + } + }); + } + + return mappedLinks.map((x) => x.url); + } catch (error) { + trace.status = 'error'; + trace.error = error.message; + trace.usedInCompletion = false; + return []; + } } else { // Handle direct URLs without glob pattern if (!isUrlBlocked(url)) { + trace.usedInCompletion = true; return [url]; } + trace.status = 'error'; + trace.error = 'URL is blocked'; + trace.usedInCompletion = false; return []; } }); // Wait for all URL processing to complete and flatten results const processedUrls = await Promise.all(urlPromises); - const flattenedUrls = processedUrls.flat().filter((url) => url); // Filter out any null/undefined values + const flattenedUrls = processedUrls.flat().filter((url) => url); links.push(...flattenedUrls); if (links.length === 0) { @@ -189,13 +252,20 @@ export async function extractController( success: false, error: "No valid URLs found to scrape. 
Try adjusting your search criteria or including more URLs.", + urlTrace: urlTraces, }); } // Scrape all links in parallel with retries const scrapePromises = links.map(async (url) => { + const trace = urlTraces.find((t) => t.url === url); + if (trace) { + trace.status = 'scraped'; + trace.timing.scrapedAt = new Date().toISOString(); + } + const origin = req.body.origin || "api"; - const timeout = Math.floor((req.body.timeout || 40000) * 0.7) || 30000; // Use 70% of total timeout for individual scrapes + const timeout = Math.floor((req.body.timeout || 40000) * 0.7) || 30000; const jobId = crypto.randomUUID(); const jobPriority = await getJobPriority({ @@ -204,31 +274,45 @@ export async function extractController( basePriority: 10, }); - await addScrapeJob( - { - url, - mode: "single_urls", - team_id: req.auth.team_id, - scrapeOptions: scrapeOptions.parse({}), - internalOptions: {}, - plan: req.auth.plan!, - origin, - is_scrape: true, - }, - {}, - jobId, - jobPriority, - ); - try { + await addScrapeJob( + { + url, + mode: "single_urls", + team_id: req.auth.team_id, + scrapeOptions: scrapeOptions.parse({}), + internalOptions: {}, + plan: req.auth.plan!, + origin, + is_scrape: true, + }, + {}, + jobId, + jobPriority, + ); + const doc = await waitForJob(jobId, timeout); await getScrapeQueue().remove(jobId); + + if (trace) { + trace.timing.completedAt = new Date().toISOString(); + trace.contentStats = { + rawContentLength: doc.markdown?.length || 0, + processedContentLength: doc.markdown?.length || 0, + tokensUsed: 0, // Will be updated after LLM processing + }; + } + if (earlyReturn) { return null; } return doc; } catch (e) { logger.error(`Error in extractController: ${e}`); + if (trace) { + trace.status = 'error'; + trace.error = e.message; + } return null; } }); @@ -240,6 +324,7 @@ export async function extractController( return res.status(e.status).json({ success: false, error: e.error, + urlTrace: urlTraces, }); } @@ -256,9 +341,25 @@ export async function extractController( }, docs.map((x) => buildDocument(x)).join("\n"), undefined, - true, // isExtractEndpoint + true, ); + // Update token usage in URL traces + if (completions.numTokens) { + // Distribute tokens proportionally based on content length + const totalLength = docs.reduce((sum, doc) => sum + (doc.markdown?.length || 0), 0); + docs.forEach((doc) => { + if (doc.metadata?.sourceURL) { + const trace = urlTraces.find((t) => t.url === doc.metadata.sourceURL); + if (trace && trace.contentStats) { + trace.contentStats.tokensUsed = Math.floor( + ((doc.markdown?.length || 0) / totalLength) * completions.numTokens + ); + } + } + }); + } + // TODO: change this later // While on beta, we're billing 5 credits per link discovered/scraped. 
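+      // Include the traces gathered so far so callers can see why each URL was rejected.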
billTeam(req.auth.team_id, req.acuc?.sub_id, links.length * 5).catch( @@ -292,6 +393,7 @@ export async function extractController( data: data, scrape_id: id, warning: warning, + urlTrace: urlTraces, }); } diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 114c115e..5f388920 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -379,16 +379,16 @@ export type MapRequest = z.infer; export type Document = { markdown?: string; - extract?: any; html?: string; rawHtml?: string; links?: string[]; screenshot?: string; + extract?: any; + warning?: string; actions?: { screenshots?: string[]; scrapes?: ScrapeActionContent[]; }; - warning?: string; metadata: { title?: string; description?: string; @@ -425,7 +425,7 @@ export type Document = { error?: string; [key: string]: string | string[] | number | undefined; }; -}; +} export type ErrorResponse = { success: false; @@ -448,14 +448,33 @@ export interface ScrapeResponseRequestTest { error?: string; } -export type ExtractResponse = - | ErrorResponse - | { - success: true; - warning?: string; - data: z.infer; - scrape_id?: string; - }; +export interface URLTrace { + url: string; + status: 'mapped' | 'scraped' | 'error'; + timing: { + discoveredAt: string; + scrapedAt?: string; + completedAt?: string; + }; + error?: string; + warning?: string; + contentStats?: { + rawContentLength: number; + processedContentLength: number; + tokensUsed: number; + }; + relevanceScore?: number; + usedInCompletion?: boolean; +} + +export interface ExtractResponse { + success: boolean; + data?: any; + scrape_id?: string; + warning?: string; + error?: string; + urlTrace?: URLTrace[]; +} export interface ExtractResponseRequestTest { statusCode: number; From 233f347f5e73c26ac34a4020c4d2964e69538300 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 26 Dec 2024 12:41:37 -0300 Subject: [PATCH 2/4] Nick: refactor --- apps/api/src/controllers/v1/extract.ts | 404 +----------------- apps/api/src/lib/extract/document-scraper.ts | 69 +++ .../api/src/lib/extract/extraction-service.ts | 145 +++++++ apps/api/src/lib/extract/reranker.ts | 124 +++++- apps/api/src/lib/extract/url-processor.ts | 118 +++++ 5 files changed, 465 insertions(+), 395 deletions(-) create mode 100644 apps/api/src/lib/extract/document-scraper.ts create mode 100644 apps/api/src/lib/extract/extraction-service.ts create mode 100644 apps/api/src/lib/extract/url-processor.ts diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index 107c1fad..af63f446 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -1,42 +1,11 @@ import { Request, Response } from "express"; import { - Document, RequestWithAuth, ExtractRequest, extractRequestSchema, ExtractResponse, - MapDocument, - scrapeOptions, - URLTrace, } from "./types"; -// import { Document } from "../../lib/entities"; -import Redis from "ioredis"; -import { configDotenv } from "dotenv"; -import { performRanking } from "../../lib/ranker"; -import { billTeam } from "../../services/billing/credit_billing"; -import { logJob } from "../../services/logging/log_job"; -import { logger } from "../../lib/logger"; -import { getScrapeQueue } from "../../services/queue-service"; -import { waitForJob } from "../../services/queue-jobs"; -import { addScrapeJob } from "../../services/queue-jobs"; -import { PlanType } from "../../types"; -import { getJobPriority } from "../../lib/job-priority"; -import { 
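    // Fire-and-forget: a billing failure is logged below but never fails the extract request.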
generateOpenAICompletions } from "../../scraper/scrapeURL/transformers/llmExtract"; -import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; -import { getMapResults } from "./map"; -import { buildDocument } from "../../lib/extract/build-document"; -import { generateBasicCompletion } from "../../lib/LLM-extraction"; -import { buildRefrasedPrompt } from "../../lib/extract/build-prompts"; -import { removeDuplicateUrls } from "../../lib/validateUrl"; - -configDotenv(); -const redis = new Redis(process.env.REDIS_URL!); - -const MAX_EXTRACT_LIMIT = 100; -const MAX_RANKING_LIMIT = 10; -const INITIAL_SCORE_THRESHOLD = 0.75; -const FALLBACK_SCORE_THRESHOLD = 0.5; -const MIN_REQUIRED_LINKS = 1; +import { performExtraction } from "../../lib/extract/extraction-service"; /** * Extracts data from the provided URLs based on the request parameters. @@ -50,375 +19,22 @@ export async function extractController( res: Response, ) { const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true"; - req.body = extractRequestSchema.parse(req.body); - const id = crypto.randomUUID(); - let links: string[] = []; - let docs: Document[] = []; - const earlyReturn = false; - const urlTraces: URLTrace[] = []; - - // Process all URLs in parallel - const urlPromises = req.body.urls.map(async (url) => { - const trace: URLTrace = { - url, - status: 'mapped', - timing: { - discoveredAt: new Date().toISOString(), - }, - }; - urlTraces.push(trace); - - if (url.includes("/*") || req.body.allowExternalLinks) { - // Handle glob pattern URLs - const baseUrl = url.replace("/*", ""); - const allowExternalLinks = req.body.allowExternalLinks; - let urlWithoutWww = baseUrl.replace("www.", ""); - - let rephrasedPrompt = req.body.prompt; - if (req.body.prompt) { - rephrasedPrompt = - (await generateBasicCompletion( - buildRefrasedPrompt(req.body.prompt, baseUrl), - )) ?? req.body.prompt; - } - - try { - const mapResults = await getMapResults({ - url: baseUrl, - search: rephrasedPrompt, - teamId: req.auth.team_id, - plan: req.auth.plan, - allowExternalLinks, - origin: req.body.origin, - limit: req.body.limit, - ignoreSitemap: false, - includeMetadata: true, - includeSubdomains: req.body.includeSubdomains, - }); - - let mappedLinks = mapResults.mapResults as MapDocument[]; - - // Remove duplicates between mapResults.links and mappedLinks - const allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links]; - const uniqueUrls = removeDuplicateUrls(allUrls); - - // Track all discovered URLs - uniqueUrls.forEach(discoveredUrl => { - if (!urlTraces.some(t => t.url === discoveredUrl)) { - urlTraces.push({ - url: discoveredUrl, - status: 'mapped', - timing: { - discoveredAt: new Date().toISOString(), - }, - usedInCompletion: false, // Default to false, will update if used - }); - } - }); - - // Only add URLs from mapResults.links that aren't already in mappedLinks - const existingUrls = new Set(mappedLinks.map((m) => m.url)); - const newUrls = uniqueUrls.filter((url) => !existingUrls.has(url)); - - mappedLinks = [ - ...mappedLinks, - ...newUrls.map((url) => ({ url, title: "", description: "" })), - ]; - - if (mappedLinks.length === 0) { - mappedLinks = [{ url: baseUrl, title: "", description: "" }]; - } - - // Limit number of links to MAX_EXTRACT_LIMIT - mappedLinks = mappedLinks.slice(0, MAX_EXTRACT_LIMIT); - - let mappedLinksRerank = mappedLinks.map( - (x) => - `url: ${x.url}, title: ${x.title}, description: ${x.description}`, - ); - - if (req.body.prompt) { - let searchQuery = - req.body.prompt && allowExternalLinks - ? 
`${req.body.prompt} ${urlWithoutWww}` - : req.body.prompt - ? `${req.body.prompt} site:${urlWithoutWww}` - : `site:${urlWithoutWww}`; - // Get similarity scores between the search query and each link's context - const linksAndScores = await performRanking( - mappedLinksRerank, - mappedLinks.map((l) => l.url), - searchQuery, - ); - - // First try with high threshold - let filteredLinks = filterAndProcessLinks( - mappedLinks, - linksAndScores, - INITIAL_SCORE_THRESHOLD, - ); - - // If we don't have enough high-quality links, try with lower threshold - if (filteredLinks.length < MIN_REQUIRED_LINKS) { - logger.info( - `Only found ${filteredLinks.length} links with score > ${INITIAL_SCORE_THRESHOLD}. Trying lower threshold...`, - ); - filteredLinks = filterAndProcessLinks( - mappedLinks, - linksAndScores, - FALLBACK_SCORE_THRESHOLD, - ); - - if (filteredLinks.length === 0) { - // If still no results, take top N results regardless of score - logger.warn( - `No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`, - ); - filteredLinks = linksAndScores - .sort((a, b) => b.score - a.score) - .slice(0, MIN_REQUIRED_LINKS) - .map((x) => mappedLinks.find((link) => link.url === x.link)) - .filter( - (x): x is MapDocument => - x !== undefined && - x.url !== undefined && - !isUrlBlocked(x.url), - ); - } - } - - // Update URL traces with relevance scores and mark filtered out URLs - linksAndScores.forEach((score) => { - const trace = urlTraces.find((t) => t.url === score.link); - if (trace) { - trace.relevanceScore = score.score; - // If URL didn't make it through filtering, mark it as filtered out - if (!filteredLinks.some(link => link.url === score.link)) { - trace.warning = `Relevance score ${score.score} below threshold`; - trace.usedInCompletion = false; - } - } - }); - - mappedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT); - - // Mark URLs that will be used in completion - mappedLinks.forEach(link => { - const trace = urlTraces.find(t => t.url === link.url); - if (trace) { - trace.usedInCompletion = true; - } - }); - - // Mark URLs that were dropped due to ranking limit - filteredLinks.slice(MAX_RANKING_LIMIT).forEach(link => { - const trace = urlTraces.find(t => t.url === link.url); - if (trace) { - trace.warning = 'Excluded due to ranking limit'; - trace.usedInCompletion = false; - } - }); - } - - return mappedLinks.map((x) => x.url); - } catch (error) { - trace.status = 'error'; - trace.error = error.message; - trace.usedInCompletion = false; - return []; - } - } else { - // Handle direct URLs without glob pattern - if (!isUrlBlocked(url)) { - trace.usedInCompletion = true; - return [url]; - } - trace.status = 'error'; - trace.error = 'URL is blocked'; - trace.usedInCompletion = false; - return []; - } - }); - - // Wait for all URL processing to complete and flatten results - const processedUrls = await Promise.all(urlPromises); - const flattenedUrls = processedUrls.flat().filter((url) => url); - links.push(...flattenedUrls); - - if (links.length === 0) { + if (!req.auth.plan) { return res.status(400).json({ success: false, - error: - "No valid URLs found to scrape. 
Try adjusting your search criteria or including more URLs.", - urlTrace: urlTraces, + error: "No plan specified", + urlTrace: [], }); } - // Scrape all links in parallel with retries - const scrapePromises = links.map(async (url) => { - const trace = urlTraces.find((t) => t.url === url); - if (trace) { - trace.status = 'scraped'; - trace.timing.scrapedAt = new Date().toISOString(); - } - - const origin = req.body.origin || "api"; - const timeout = Math.floor((req.body.timeout || 40000) * 0.7) || 30000; - const jobId = crypto.randomUUID(); - - const jobPriority = await getJobPriority({ - plan: req.auth.plan as PlanType, - team_id: req.auth.team_id, - basePriority: 10, - }); - - try { - await addScrapeJob( - { - url, - mode: "single_urls", - team_id: req.auth.team_id, - scrapeOptions: scrapeOptions.parse({}), - internalOptions: {}, - plan: req.auth.plan!, - origin, - is_scrape: true, - }, - {}, - jobId, - jobPriority, - ); - - const doc = await waitForJob(jobId, timeout); - await getScrapeQueue().remove(jobId); - - if (trace) { - trace.timing.completedAt = new Date().toISOString(); - trace.contentStats = { - rawContentLength: doc.markdown?.length || 0, - processedContentLength: doc.markdown?.length || 0, - tokensUsed: 0, // Will be updated after LLM processing - }; - } - - if (earlyReturn) { - return null; - } - return doc; - } catch (e) { - logger.error(`Error in extractController: ${e}`); - if (trace) { - trace.status = 'error'; - trace.error = e.message; - } - return null; - } + const result = await performExtraction({ + request: req.body, + teamId: req.auth.team_id, + plan: req.auth.plan, + subId: req.acuc?.sub_id || undefined, }); - try { - const results = await Promise.all(scrapePromises); - docs.push(...results.filter((doc) => doc !== null).map((x) => x!)); - } catch (e) { - return res.status(e.status).json({ - success: false, - error: e.error, - urlTrace: urlTraces, - }); - } - - const completions = await generateOpenAICompletions( - logger.child({ method: "extractController/generateOpenAICompletions" }), - { - mode: "llm", - systemPrompt: - (req.body.systemPrompt ? `${req.body.systemPrompt}\n` : "") + - "Always prioritize using the provided content to answer the question. Do not make up an answer. Be concise and follow the schema always if provided. Here are the urls the user provided of which he wants to extract information from: " + - links.join(", "), - prompt: req.body.prompt, - schema: req.body.schema, - }, - docs.map((x) => buildDocument(x)).join("\n"), - undefined, - true, - ); - - // Update token usage in URL traces - if (completions.numTokens) { - // Distribute tokens proportionally based on content length - const totalLength = docs.reduce((sum, doc) => sum + (doc.markdown?.length || 0), 0); - docs.forEach((doc) => { - if (doc.metadata?.sourceURL) { - const trace = urlTraces.find((t) => t.url === doc.metadata.sourceURL); - if (trace && trace.contentStats) { - trace.contentStats.tokensUsed = Math.floor( - ((doc.markdown?.length || 0) / totalLength) * completions.numTokens - ); - } - } - }); - } - - // TODO: change this later - // While on beta, we're billing 5 credits per link discovered/scraped. - billTeam(req.auth.team_id, req.acuc?.sub_id, links.length * 5).catch( - (error) => { - logger.error( - `Failed to bill team ${req.auth.team_id} for ${links.length * 5} credits: ${error}`, - ); - }, - ); - - let data = completions.extract ?? 
{}; - let warning = completions.warning; - - logJob({ - job_id: id, - success: true, - message: "Extract completed", - num_docs: 1, - docs: data, - time_taken: (new Date().getTime() - Date.now()) / 1000, - team_id: req.auth.team_id, - mode: "extract", - url: req.body.urls.join(", "), - scrapeOptions: req.body, - origin: req.body.origin ?? "api", - num_tokens: completions.numTokens ?? 0, - }); - - return res.status(200).json({ - success: true, - data: data, - scrape_id: id, - warning: warning, - urlTrace: urlTraces, - }); -} - -/** - * Filters links based on their similarity score to the search query. - * @param mappedLinks - The list of mapped links to filter. - * @param linksAndScores - The list of links and their similarity scores. - * @param threshold - The score threshold to filter by. - * @returns The filtered list of links. - */ -function filterAndProcessLinks( - mappedLinks: MapDocument[], - linksAndScores: { - link: string; - linkWithContext: string; - score: number; - originalIndex: number; - }[], - threshold: number, -): MapDocument[] { - return linksAndScores - .filter((x) => x.score > threshold) - .map((x) => mappedLinks.find((link) => link.url === x.link)) - .filter( - (x): x is MapDocument => - x !== undefined && x.url !== undefined && !isUrlBlocked(x.url), - ); + return res.status(result.success ? 200 : 400).json(result); } diff --git a/apps/api/src/lib/extract/document-scraper.ts b/apps/api/src/lib/extract/document-scraper.ts new file mode 100644 index 00000000..04194b0b --- /dev/null +++ b/apps/api/src/lib/extract/document-scraper.ts @@ -0,0 +1,69 @@ +import { Document, URLTrace, scrapeOptions } from "../../controllers/v1/types"; +import { PlanType } from "../../types"; +import { logger } from "../logger"; +import { getScrapeQueue } from "../../services/queue-service"; +import { waitForJob } from "../../services/queue-jobs"; +import { addScrapeJob } from "../../services/queue-jobs"; +import { getJobPriority } from "../job-priority"; + +interface ScrapeDocumentOptions { + url: string; + teamId: string; + plan: PlanType; + origin: string; + timeout: number; +} + +export async function scrapeDocument(options: ScrapeDocumentOptions, urlTraces: URLTrace[]): Promise { + const trace = urlTraces.find((t) => t.url === options.url); + if (trace) { + trace.status = 'scraped'; + trace.timing.scrapedAt = new Date().toISOString(); + } + + const jobId = crypto.randomUUID(); + const jobPriority = await getJobPriority({ + plan: options.plan, + team_id: options.teamId, + basePriority: 10, + }); + + try { + await addScrapeJob( + { + url: options.url, + mode: "single_urls", + team_id: options.teamId, + scrapeOptions: scrapeOptions.parse({}), + internalOptions: {}, + plan: options.plan, + origin: options.origin, + is_scrape: true, + }, + {}, + jobId, + jobPriority, + ); + + const doc = await waitForJob(jobId, options.timeout); + await getScrapeQueue().remove(jobId); + + if (trace) { + trace.timing.completedAt = new Date().toISOString(); + trace.contentStats = { + rawContentLength: doc.markdown?.length || 0, + processedContentLength: doc.markdown?.length || 0, + tokensUsed: 0, + }; + } + + return doc; + } catch (error) { + logger.error(`Error in scrapeDocument: ${error}`); + if (trace) { + trace.status = 'error'; + trace.error = error.message; + } + return null; + } +} \ No newline at end of file diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts new file mode 100644 index 00000000..1cf47f86 --- /dev/null +++ 
b/apps/api/src/lib/extract/extraction-service.ts @@ -0,0 +1,145 @@ +import { Document, ExtractRequest, URLTrace } from "../../controllers/v1/types"; +import { PlanType } from "../../types"; +import { logger } from "../logger"; +import { processUrl } from "./url-processor"; +import { scrapeDocument } from "./document-scraper"; +import { generateOpenAICompletions } from "../../scraper/scrapeURL/transformers/llmExtract"; +import { buildDocument } from "./build-document"; +import { billTeam } from "../../services/billing/credit_billing"; +import { logJob } from "../../services/logging/log_job"; + +interface ExtractServiceOptions { + request: ExtractRequest; + teamId: string; + plan: PlanType; + subId?: string; +} + +interface ExtractResult { + success: boolean; + data?: any; + scrapeId: string; + warning?: string; + urlTrace: URLTrace[]; + error?: string; +} + +export async function performExtraction(options: ExtractServiceOptions): Promise { + const { request, teamId, plan, subId } = options; + const scrapeId = crypto.randomUUID(); + const urlTraces: URLTrace[] = []; + let docs: Document[] = []; + + // Process URLs + const urlPromises = request.urls.map(url => + processUrl({ + url, + prompt: request.prompt, + teamId, + plan, + allowExternalLinks: request.allowExternalLinks, + origin: request.origin, + limit: request.limit, + includeSubdomains: request.includeSubdomains, + }, urlTraces) + ); + + const processedUrls = await Promise.all(urlPromises); + const links = processedUrls.flat().filter(url => url); + + if (links.length === 0) { + return { + success: false, + error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.", + scrapeId, + urlTrace: urlTraces, + }; + } + + // Scrape documents + const timeout = Math.floor((request.timeout || 40000) * 0.7) || 30000; + const scrapePromises = links.map(url => + scrapeDocument({ + url, + teamId, + plan, + origin: request.origin || "api", + timeout, + }, urlTraces) + ); + + try { + const results = await Promise.all(scrapePromises); + docs.push(...results.filter((doc): doc is Document => doc !== null)); + } catch (error) { + return { + success: false, + error: error.message, + scrapeId, + urlTrace: urlTraces, + }; + } + + // Generate completions + const completions = await generateOpenAICompletions( + logger.child({ method: "extractService/generateOpenAICompletions" }), + { + mode: "llm", + systemPrompt: + (request.systemPrompt ? `${request.systemPrompt}\n` : "") + + "Always prioritize using the provided content to answer the question. Do not make up an answer. Be concise and follow the schema always if provided. 
Here are the URLs the user provided, from which they want to extract information: " +
Trying lower threshold...`, + ); + filteredLinks = filterAndProcessLinks( + mappedLinks, + linksAndScores, + FALLBACK_SCORE_THRESHOLD, + ); + + if (filteredLinks.length === 0) { + // If still no results, take top N results regardless of score + logger.warn( + `No links found with score > ${FALLBACK_SCORE_THRESHOLD}. Taking top ${MIN_REQUIRED_LINKS} results.`, + ); + filteredLinks = linksAndScores + .sort((a, b) => b.score - a.score) + .slice(0, MIN_REQUIRED_LINKS) + .map((x) => mappedLinks.find((link) => link.url === x.link)) + .filter( + (x): x is MapDocument => + x !== undefined && x.url !== undefined && !isUrlBlocked(x.url), + ); + } + } + + // Update URL traces with relevance scores and mark filtered out URLs + linksAndScores.forEach((score) => { + const trace = urlTraces.find((t) => t.url === score.link); + if (trace) { + trace.relevanceScore = score.score; + // If URL didn't make it through filtering, mark it as filtered out + if (!filteredLinks.some(link => link.url === score.link)) { + trace.warning = `Relevance score ${score.score} below threshold`; + trace.usedInCompletion = false; + } + } + }); + + const rankedLinks = filteredLinks.slice(0, MAX_RANKING_LIMIT); + + // Mark URLs that will be used in completion + rankedLinks.forEach(link => { + const trace = urlTraces.find(t => t.url === link.url); + if (trace) { + trace.usedInCompletion = true; + } + }); + + // Mark URLs that were dropped due to ranking limit + filteredLinks.slice(MAX_RANKING_LIMIT).forEach(link => { + const trace = urlTraces.find(t => t.url === link.url); + if (trace) { + trace.warning = 'Excluded due to ranking limit'; + trace.usedInCompletion = false; + } + }); + + return rankedLinks; +} + +function filterAndProcessLinks( + mappedLinks: MapDocument[], + linksAndScores: { + link: string; + linkWithContext: string; + score: number; + originalIndex: number; + }[], + threshold: number, +): MapDocument[] { + return linksAndScores + .filter((x) => x.score > threshold) + .map((x) => mappedLinks.find((link) => link.url === x.link)) + .filter( + (x): x is MapDocument => + x !== undefined && x.url !== undefined && !isUrlBlocked(x.url), + ); +} diff --git a/apps/api/src/lib/extract/url-processor.ts b/apps/api/src/lib/extract/url-processor.ts new file mode 100644 index 00000000..4d61a8d3 --- /dev/null +++ b/apps/api/src/lib/extract/url-processor.ts @@ -0,0 +1,118 @@ +import { MapDocument, URLTrace } from "../../controllers/v1/types"; +import { getMapResults } from "../../controllers/v1/map"; +import { PlanType } from "../../types"; +import { removeDuplicateUrls } from "../validateUrl"; +import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; +import { generateBasicCompletion } from "../LLM-extraction"; +import { buildRefrasedPrompt } from "./build-prompts"; +import { logger } from "../logger"; +import { rerankLinks } from "./reranker"; + +const MAX_EXTRACT_LIMIT = 100; + +interface ProcessUrlOptions { + url: string; + prompt?: string; + teamId: string; + plan: PlanType; + allowExternalLinks?: boolean; + origin?: string; + limit?: number; + includeSubdomains?: boolean; +} + +export async function processUrl(options: ProcessUrlOptions, urlTraces: URLTrace[]): Promise { + const trace: URLTrace = { + url: options.url, + status: 'mapped', + timing: { + discoveredAt: new Date().toISOString(), + }, + }; + urlTraces.push(trace); + + if (!options.url.includes("/*") && !options.allowExternalLinks) { + if (!isUrlBlocked(options.url)) { + trace.usedInCompletion = true; + return [options.url]; + } + trace.status = 
'error'; + trace.error = 'URL is blocked'; + trace.usedInCompletion = false; + return []; + } + + const baseUrl = options.url.replace("/*", ""); + let urlWithoutWww = baseUrl.replace("www.", ""); + + let rephrasedPrompt = options.prompt; + if (options.prompt) { + rephrasedPrompt = await generateBasicCompletion( + buildRefrasedPrompt(options.prompt, baseUrl) + ) ?? options.prompt; + } + + try { + const mapResults = await getMapResults({ + url: baseUrl, + search: rephrasedPrompt, + teamId: options.teamId, + plan: options.plan, + allowExternalLinks: options.allowExternalLinks, + origin: options.origin, + limit: options.limit, + ignoreSitemap: false, + includeMetadata: true, + includeSubdomains: options.includeSubdomains, + }); + + let mappedLinks = mapResults.mapResults as MapDocument[]; + const allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links]; + const uniqueUrls = removeDuplicateUrls(allUrls); + + // Track all discovered URLs + uniqueUrls.forEach(discoveredUrl => { + if (!urlTraces.some(t => t.url === discoveredUrl)) { + urlTraces.push({ + url: discoveredUrl, + status: 'mapped', + timing: { + discoveredAt: new Date().toISOString(), + }, + usedInCompletion: false, + }); + } + }); + + const existingUrls = new Set(mappedLinks.map((m) => m.url)); + const newUrls = uniqueUrls.filter((url) => !existingUrls.has(url)); + + mappedLinks = [ + ...mappedLinks, + ...newUrls.map((url) => ({ url, title: "", description: "" })), + ]; + + if (mappedLinks.length === 0) { + mappedLinks = [{ url: baseUrl, title: "", description: "" }]; + } + + // Limit initial set of links + mappedLinks = mappedLinks.slice(0, MAX_EXTRACT_LIMIT); + + // Perform reranking if prompt is provided + if (options.prompt) { + const searchQuery = options.allowExternalLinks + ? `${options.prompt} ${urlWithoutWww}` + : `${options.prompt} site:${urlWithoutWww}`; + + mappedLinks = await rerankLinks(mappedLinks, searchQuery, urlTraces); + } + + return mappedLinks.map(x => x.url); + } catch (error) { + trace.status = 'error'; + trace.error = error.message; + trace.usedInCompletion = false; + return []; + } +} \ No newline at end of file From 4332f18a8fc7d5a3e867cd5d4ef66466d39629fb Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 26 Dec 2024 12:43:58 -0300 Subject: [PATCH 3/4] Nick: making it optional for the user --- apps/api/src/controllers/v1/types.ts | 1 + apps/api/src/lib/extract/extraction-service.ts | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 5f388920..99c3aa6f 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -199,6 +199,7 @@ export const extractV1Options = z includeSubdomains: z.boolean().default(true), allowExternalLinks: z.boolean().default(false), origin: z.string().optional().default("api"), + urlTrace: z.boolean().default(false), timeout: z.number().int().positive().finite().safe().default(60000), }) .strict(strictMessage); diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts index 1cf47f86..f84a1f34 100644 --- a/apps/api/src/lib/extract/extraction-service.ts +++ b/apps/api/src/lib/extract/extraction-service.ts @@ -20,7 +20,7 @@ interface ExtractResult { data?: any; scrapeId: string; warning?: string; - urlTrace: URLTrace[]; + urlTrace?: URLTrace[]; error?: string; } @@ -140,6 +140,6 @@ export async function performExtraction(options: ExtractServiceOptions): Promise data: completions.extract ?? 
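    // Record the block on the trace and skip the URL, so one blocked URL doesn't fail the whole batch.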
{}, scrapeId, warning: completions.warning, - urlTrace: urlTraces, + urlTrace: request.urlTrace ? urlTraces : undefined, }; } \ No newline at end of file From 1abb544e3e9997b5df3524075c0e2de65d982591 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 27 Dec 2024 13:59:09 -0300 Subject: [PATCH 4/4] Update index.test.ts --- apps/api/src/__tests__/e2e_extract/index.test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/api/src/__tests__/e2e_extract/index.test.ts b/apps/api/src/__tests__/e2e_extract/index.test.ts index e1e4d1ce..dcb6bb4f 100644 --- a/apps/api/src/__tests__/e2e_extract/index.test.ts +++ b/apps/api/src/__tests__/e2e_extract/index.test.ts @@ -111,9 +111,11 @@ describe("E2E Tests for Extract API Routes", () => { let gotItRight = 0; for (const hiring of response.body.data?.items) { - if (hiring.includes("Developer Support Engineer")) gotItRight++; - if (hiring.includes("Dev Ops Engineer")) gotItRight++; + if (hiring.includes("Firecrawl Example Creator")) gotItRight++; + if (hiring.includes("Senior Frontend Engineer")) gotItRight++; + if (hiring.includes("Technical Chief of Staff")) gotItRight++; if (hiring.includes("Founding Web Automation Engineer")) gotItRight++; + if (hiring.includes("Founding Fullstack Engineer")) gotItRight++; } expect(gotItRight).toBeGreaterThan(2);
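    // urlTrace defaults to false in extractV1Options, so traces are omitted unless the caller opts in.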