Nick: streaming callback experimental

Nicolas 2025-01-14 02:13:42 -03:00
parent 23d3257a57
commit 61e6af2b16
3 changed files with 45 additions and 9 deletions

View File

@@ -3,12 +3,15 @@ import { logger as _logger } from "../logger";
export enum ExtractStep {
  INITIAL = "initial",
  MAP = "map",
  MAP_RERANK = "map-rerank",
  MULTI_ENTITY = "multi-entity",
  MULTI_ENTITY_SCRAPE = "multi-entity-scrape",
  MULTI_ENTITY_EXTRACT = "multi-entity-extract",
  SCRAPE = "scrape",
  MAP = "map",
  EXTRACT = "extract",
  COMPLETE = "complete",
}

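The enum above is the vocabulary for progress reporting: each phase of an extract job is written back as a step entry, and this commit introduces MAP_RERANK so reranking shows up as its own phase. A minimal sketch of what one entry looks like, inferred from the updateExtract calls in the next file; the StepEntry name and the import path are assumptions, only the four fields come from the diff.

import { ExtractStep } from "./extract-redis"; // assumed path for the enum above

// Hypothetical shape of one progress entry, mirroring the object literals
// passed to updateExtract(extractId, { steps: [...] }) in the next file.
interface StepEntry {
  step: ExtractStep;
  startedAt: number; // epoch ms when the phase began
  finishedAt: number; // epoch ms of the most recent update for the phase
  discoveredLinks: string[]; // URLs known so far for the phase
}

// Example: an in-progress MAP step that gets rewritten as more links arrive.
const mapStep: StepEntry = {
  step: ExtractStep.MAP,
  startedAt: Date.now(),
  finishedAt: Date.now(),
  discoveredLinks: ["https://example.com/blog"],
};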
View File

@@ -170,6 +170,8 @@ export async function performExtraction(
      ],
    });
  let startMap = Date.now();
  let aggMapLinks: string[] = [];
  // Process URLs
  const urlPromises = request.urls.map((url) =>
    processUrl(
@@ -184,9 +186,20 @@
        includeSubdomains: request.includeSubdomains,
        schema: request.schema,
      },
      urlTraces,
    ),
  );
      urlTraces,
      (links: string[]) => {
        aggMapLinks.push(...links);
        updateExtract(extractId, {
          steps: [
            {
              step: ExtractStep.MAP,
              startedAt: startMap,
              finishedAt: Date.now(),
              discoveredLinks: aggMapLinks,
            },
          ],
        });
      }));
  const processedUrls = await Promise.all(urlPromises);
  const links = processedUrls.flat().filter((url) => url);
@@ -205,8 +218,8 @@ export async function performExtraction(
    status: "processing",
    steps: [
      {
        step: ExtractStep.MAP,
        startedAt: Date.now(),
        step: ExtractStep.MAP_RERANK,
        startedAt: startMap,
        finishedAt: Date.now(),
        discoveredLinks: links,
      },
@@ -221,6 +234,7 @@
  // if so, it splits the results into 2 types of completions:
  // 1. the first one is a completion that will extract the array of items
  // 2. the second one is multiple completions that will extract the items from the array
  let startAnalyze = Date.now();
  const { isMultiEntity, multiEntityKeys, reasoning, keyIndicators } =
    await analyzeSchemaAndPrompt(links, request.schema, request.prompt ?? "");
@@ -239,7 +253,7 @@
      steps: [
        {
          step: ExtractStep.MULTI_ENTITY,
          startedAt: Date.now(),
          startedAt: startAnalyze,
          finishedAt: Date.now(),
          discoveredLinks: [],
        },
@@ -254,12 +268,14 @@
      steps: [
        {
          step: ExtractStep.MULTI_ENTITY_SCRAPE,
          startedAt: Date.now(),
          startedAt: startAnalyze,
          finishedAt: Date.now(),
          discoveredLinks: links,
        },
      ],
    });
    let startScrape = Date.now();
    const scrapePromises = links.map((url) => {
      if (!docsMap.has(url)) {
        return scrapeDocument(
@@ -280,6 +296,20 @@ export async function performExtraction(
      (doc): doc is Document => doc !== null,
    );
    let endScrape = Date.now();
    await updateExtract(extractId, {
      status: "processing",
      steps: [
        {
          step: ExtractStep.MULTI_ENTITY_SCRAPE,
          startedAt: startScrape,
          finishedAt: endScrape,
          discoveredLinks: links,
        },
      ],
    });
    for (const doc of multyEntityDocs) {
      if (doc?.metadata?.url) {
        docsMap.set(doc.metadata.url, doc);
@@ -352,7 +382,7 @@ export async function performExtraction(
        steps: [
          {
            step: ExtractStep.MULTI_ENTITY_EXTRACT,
            startedAt: Date.now(),
            startedAt: startScrape,
            finishedAt: Date.now(),
            discoveredLinks: [doc.metadata.url || doc.metadata.sourceURL || ""],
          },

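Pulled out of the diff above as a standalone sketch: performExtraction keeps an accumulator and a start timestamp, hands processUrl a callback, and every batch of mapped links triggers an immediate rewrite of the MAP step with everything discovered so far, so clients polling the extract see links stream in before mapping finishes. updateExtract and ExtractStep appear in the diff, but the import path, the onMapLinks name, and the extractId stub are assumptions made to keep the example self-contained.

import { updateExtract, ExtractStep } from "./extract-redis"; // assumed path

declare const extractId: string; // provided by performExtraction's scope

// Accumulator shared by every processUrl call of this extract job.
const aggMapLinks: string[] = [];
const startMap = Date.now();

// Streaming callback: push the new batch, then persist an updated MAP step.
// finishedAt is "now" on every call, so it effectively tracks the last update;
// as in the diff, the write is fire-and-forget rather than awaited.
const onMapLinks = (links: string[]) => {
  aggMapLinks.push(...links);
  updateExtract(extractId, {
    steps: [
      {
        step: ExtractStep.MAP,
        startedAt: startMap,
        finishedAt: Date.now(),
        discoveredLinks: aggMapLinks,
      },
    ],
  });
};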
View File

@@ -25,6 +25,7 @@ interface ProcessUrlOptions {
export async function processUrl(
  options: ProcessUrlOptions,
  urlTraces: URLTrace[],
  updateExtractCallback: (links: string[]) => void,
): Promise<string[]> {
  const trace: URLTrace = {
    url: options.url,
@@ -160,6 +161,8 @@ export async function processUrl(
  );
  updateExtractCallback(mappedLinks.map((x) => x.url));
  // Perform reranking using either prompt or schema
  let searchQuery = "";
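
On the caller side, the only visible change is processUrl's new third parameter. A hedged usage sketch, assuming it sits in the same module as the code above so ProcessUrlOptions and URLTrace are in scope; the declared stubs stand in for values a real caller already has.

declare const options: ProcessUrlOptions; // fully populated by the caller
declare const urlTraces: URLTrace[];

// Collect partial results as each URL's mapping completes (see the
// updateExtractCallback(...) invocation above).
const discoveredSoFar: string[] = [];
const links = await processUrl(options, urlTraces, (batch) => {
  discoveredSoFar.push(...batch);
});

// A call site that does not care about streaming progress can pass a no-op.
const linksWithoutProgress = await processUrl(options, urlTraces, () => {});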