feat(extract): add logging

Móricz Gergő 2025-01-23 12:05:15 +01:00
parent 434a435a4b
commit d3518e85a8
4 changed files with 87 additions and 8 deletions
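The diff below threads a winston child logger through the extract pipeline: performExtraction builds a child scoped with module/method/extractId metadata and passes further-scoped children into processUrl, scrapeDocument, generateOpenAICompletions, and mixSchemaObjects. A minimal sketch of the pattern, separate from the diff (the transport setup, the extractId value, and the simplified helper body are illustrative assumptions, not part of this commit):

import winston, { type Logger } from "winston";

const _logger = winston.createLogger({
  level: "debug",
  transports: [new winston.transports.Console()],
});

// One child per extraction; every log line from here down carries this metadata.
const logger = _logger.child({
  module: "extract",
  method: "performExtraction",
  extractId: "example-extract-id", // illustrative value
});

// Helpers accept a Logger instead of creating their own, so call-site context
// (url, isMultiEntity, ...) travels with each message.
async function scrapeDocument(url: string, logger: Logger): Promise<void> {
  logger.debug("Attempting scrape...");
  // ...scrape work elided...
  logger.debug("Scrape finished!");
}

await scrapeDocument(
  "https://example.com",
  logger.child({ module: "extract", method: "scrapeDocument", url: "https://example.com" }),
);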

View File

@@ -5,6 +5,7 @@ import { getScrapeQueue } from "../../services/queue-service";
import { waitForJob } from "../../services/queue-jobs";
import { addScrapeJob } from "../../services/queue-jobs";
import { getJobPriority } from "../job-priority";
import type { Logger } from "winston";
interface ScrapeDocumentOptions {
url: string;
@@ -18,6 +19,7 @@ interface ScrapeDocumentOptions {
export async function scrapeDocument(
options: ScrapeDocumentOptions,
urlTraces: URLTrace[],
logger: Logger,
): Promise<Document | null> {
const trace = urlTraces.find((t) => t.url === options.url);
if (trace) {
@@ -68,16 +70,25 @@ export async function scrapeDocument(
try {
try {
return await attemptScrape(options.timeout);
logger.debug("Attempting scrape...");
const x = await attemptScrape(options.timeout);
logger.debug("Scrape finished!");
return x;
} catch (timeoutError) {
logger.warn("Scrape failed.", { error: timeoutError });
if (options.isSingleUrl) {
// For single URLs, try again with double timeout
return await attemptScrape(options.timeout * 2);
logger.debug("Attempting scrape...");
const x = await attemptScrape(options.timeout * 2);
logger.debug("Scrape finished!");
return x;
}
throw timeoutError;
}
} catch (error) {
logger.error(`Error in scrapeDocument: ${error}`);
logger.error(`error in scrapeDocument`, { error });
if (trace) {
trace.status = "error";
trace.error = error.message;

View File

@@ -193,7 +193,7 @@ export async function performExtraction(
let totalUrlsScraped = 0;
const logger = _logger.child({
module: "extraction-service",
module: "extract",
method: "performExtraction",
extractId,
});
@@ -215,6 +215,9 @@ export async function performExtraction(
let startMap = Date.now();
let aggMapLinks: string[] = [];
logger.debug("Processing URLs...", {
urlCount: request.urls.length,
});
// Process URLs
const urlPromises = request.urls.map((url) =>
processUrl(
@@ -243,11 +246,15 @@ export async function performExtraction(
],
});
},
logger.child({ module: "extract", method: "processUrl", url }),
),
);
const processedUrls = await Promise.all(urlPromises);
const links = processedUrls.flat().filter((url) => url);
logger.debug("Processed URLs.", {
linkCount: links.length,
});
if (links.length === 0) {
return {
@@ -281,6 +288,8 @@ export async function performExtraction(
reqSchema = await dereferenceSchema(reqSchema);
}
logger.debug("Transformed schema.", { schema: reqSchema });
// agent evaluates if the schema or the prompt has an array with a large number of items
// it also checks if the schema has any other properties that are not arrays
// if so, it splits the results into 2 types of completions:
@@ -295,6 +304,8 @@ export async function performExtraction(
tokenUsage: schemaAnalysisTokenUsage,
} = await analyzeSchemaAndPrompt(links, reqSchema, request.prompt ?? "");
logger.debug("Analyzed schema.", { isMultiEntity, multiEntityKeys, reasoning, keyIndicators });
// Track schema analysis tokens
tokenUsage.push(schemaAnalysisTokenUsage);
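For illustration of the multi-entity split described in the comments above (the schema below is hypothetical, not from this commit): analyzeSchemaAndPrompt flags array-heavy keys as multi-entity, and spreadSchemas then separates those keys from the remaining scalar properties so the two halves can be completed independently and merged back at the end.

// Hypothetical request schema: one large array plus a scalar property.
const reqSchema = {
  type: "object",
  properties: {
    companyName: { type: "string" },
    products: {
      type: "array",
      items: {
        type: "object",
        properties: { name: { type: "string" }, price: { type: "number" } },
      },
    },
  },
};

// With multiEntityKeys = ["products"], spreadSchemas would be expected to yield:
const singleAnswerSchema = {
  type: "object",
  properties: { companyName: { type: "string" } },
};
const multiEntitySchema = {
  type: "object",
  properties: { products: reqSchema.properties.products },
};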
@@ -305,11 +316,14 @@ export async function performExtraction(
let rSchema = reqSchema;
if (isMultiEntity && reqSchema) {
logger.debug("=== MULTI-ENTITY ===");
const { singleAnswerSchema, multiEntitySchema } = await spreadSchemas(
reqSchema,
multiEntityKeys,
);
rSchema = singleAnswerSchema;
logger.debug("Spread schemas.", { singleAnswerSchema, multiEntitySchema });
await updateExtract(extractId, {
status: "processing",
@@ -337,6 +351,7 @@ export async function performExtraction(
],
});
logger.debug("Starting multi-entity scrape...");
let startScrape = Date.now();
const scrapePromises = links.map((url) => {
if (!docsMap.has(url)) {
@@ -349,6 +364,7 @@ export async function performExtraction(
timeout,
},
urlTraces,
logger.child({ module: "extract", method: "scrapeDocument", url, isMultiEntity: true }),
);
}
return docsMap.get(url);
@@ -358,6 +374,10 @@ export async function performExtraction(
(doc): doc is Document => doc !== null,
);
logger.debug("Multi-entity scrape finished.", {
docCount: multyEntityDocs.length,
});
totalUrlsScraped += multyEntityDocs.length;
let endScrape = Date.now();
@@ -380,6 +400,8 @@ export async function performExtraction(
}
}
logger.debug("Updated docsMap.", { docsMapSize: docsMap.size }); // useful for error probing
// Process docs in chunks with queue style processing
const chunkSize = 50;
const timeoutCompletion = 45000; // 45 second timeout
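A minimal sketch of the chunked, queue-style processing this hunk sets up; only the chunk size of 50 and the 45-second per-document timeout come from the diff, while the helper name, the Doc type, and the extractFromDoc callback are illustrative:

type Doc = { url: string; markdown: string };

// Process documents in chunks, racing each extraction against a timeout so a
// single slow completion cannot stall the whole chunk.
async function processDocsInChunks(
  docs: Doc[],
  extractFromDoc: (doc: Doc) => Promise<Record<string, unknown> | null>,
  chunkSize = 50,
  timeoutMs = 45_000,
): Promise<Record<string, unknown>[]> {
  const completions: Record<string, unknown>[] = [];
  for (let i = 0; i < docs.length; i += chunkSize) {
    const chunk = docs.slice(i, i + chunkSize);
    const chunkResults = await Promise.all(
      chunk.map((doc) =>
        Promise.race([
          extractFromDoc(doc),
          new Promise<null>((resolve) => setTimeout(() => resolve(null), timeoutMs)),
        ]),
      ),
    );
    // Drop documents that timed out or failed, mirroring the null filtering in the diff.
    completions.push(
      ...chunkResults.filter((r): r is Record<string, unknown> => r !== null),
    );
  }
  return completions;
}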
@@ -527,7 +549,7 @@ export async function performExtraction(
return multiEntityCompletion.extract;
} catch (error) {
logger.error(`Failed to process document: ${error}`);
logger.error(`Failed to process document.`, { error, url: doc.metadata.url ?? doc.metadata.sourceURL! });
return null;
}
});
@@ -537,6 +559,7 @@ export async function performExtraction(
multiEntityCompletions.push(
...chunkResults.filter((result) => result !== null),
);
logger.debug("All multi-entity completion chunks finished.", { completionCount: multiEntityCompletions.length });
}
try {
@@ -548,7 +571,7 @@ export async function performExtraction(
multiEntityResult = mergeNullValObjs(multiEntityResult);
// @nick: maybe we can add an LLM here that checks if the array probably has a primary key?
} catch (error) {
logger.error(`Failed to transform array to object: ${error}`);
logger.error(`Failed to transform array to object`, { error });
return {
success: false,
error:
@@ -565,6 +588,11 @@ export async function performExtraction(
rSchema.properties &&
Object.keys(rSchema.properties).length > 0
) {
logger.debug("=== SINGLE PAGES ===", {
linkCount: links.length,
schema: rSchema,
});
// Scrape documents
const timeout = 60000;
let singleAnswerDocs: Document[] = [];
@@ -593,6 +621,7 @@ export async function performExtraction(
timeout,
},
urlTraces,
logger.child({ module: "extract", method: "scrapeDocument", url, isMultiEntity: false })
);
}
return docsMap.get(url);
@@ -606,12 +635,15 @@ export async function performExtraction(
docsMap.set(doc.metadata.url, doc);
}
}
logger.debug("Updated docsMap.", { docsMapSize: docsMap.size }); // useful for error probing
const validResults = results.filter(
(doc): doc is Document => doc !== null,
);
singleAnswerDocs.push(...validResults);
totalUrlsScraped += validResults.length;
logger.debug("Scrapes finished.", { docCount: validResults.length });
} catch (error) {
return {
success: false,
@@ -624,6 +656,7 @@ export async function performExtraction(
if (docsMap.size == 0) {
// All urls are invalid
logger.error("All provided URLs are invalid!");
return {
success: false,
error:
@@ -647,8 +680,9 @@ export async function performExtraction(
});
// Generate completions
logger.debug("Generating singleAnswer completions...");
singleAnswerCompletions = await generateOpenAICompletions(
logger.child({ method: "extractService/generateOpenAICompletions" }),
logger.child({ module: "extract", method: "generateOpenAICompletions" }),
{
mode: "llm",
systemPrompt:
@ -662,6 +696,7 @@ export async function performExtraction(
undefined,
true,
);
logger.debug("Done generating singleAnswer completions.");
// Track single answer extraction tokens
if (singleAnswerCompletions) {
@@ -691,7 +726,7 @@ export async function performExtraction(
}
let finalResult = reqSchema
? await mixSchemaObjects(reqSchema, singleAnswerResult, multiEntityResult)
? await mixSchemaObjects(reqSchema, singleAnswerResult, multiEntityResult, logger.child({ method: "mixSchemaObjects" }))
: singleAnswerResult || multiEntityResult;
// Tokenize final result to get token count
@@ -785,6 +820,8 @@ export async function performExtraction(
});
});
logger.debug("Done!");
return {
success: true,
data: finalResult ?? {},

View File

@@ -1,9 +1,13 @@
import type { Logger } from "winston";
export async function mixSchemaObjects(
finalSchema: any,
singleAnswerResult: any,
multiEntityResult: any,
logger: Logger
) {
const finalResult: any = {};
logger.debug("Mixing schema objects.");
// Recursive helper function to merge results based on schema
function mergeResults(schema: any, singleResult: any, multiResult: any) {

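Continuing the hypothetical schema example from earlier, mixSchemaObjects recombines the two partial results into a single object shaped like the original schema. The values below are illustrative, and the object spread only stands in for the recursive merge performed by mergeResults:

const singleAnswerResult = { companyName: "Acme Inc." };
const multiEntityResult = {
  products: [
    { name: "Widget", price: 9.99 },
    { name: "Gadget", price: 19.99 },
  ],
};

// mixSchemaObjects(reqSchema, singleAnswerResult, multiEntityResult, logger.child({ method: "mixSchemaObjects" }))
// walks the schema and takes each property from whichever partial result provides it,
// which for this flat example is equivalent to:
const finalResult = { ...singleAnswerResult, ...multiEntityResult };
// => { companyName: "Acme Inc.", products: [...] }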
View File

@@ -9,6 +9,7 @@ import { rerankLinksWithLLM } from "./reranker";
import { extractConfig } from "./config";
import { updateExtract } from "./extract-redis";
import { ExtractStep } from "./extract-redis";
import type { Logger } from "winston";
interface ProcessUrlOptions {
url: string;
@@ -26,6 +27,7 @@ export async function processUrl(
options: ProcessUrlOptions,
urlTraces: URLTrace[],
updateExtractCallback: (links: string[]) => void,
logger: Logger,
): Promise<string[]> {
const trace: URLTrace = {
url: options.url,
@@ -41,6 +43,7 @@ export async function processUrl(
trace.usedInCompletion = true;
return [options.url];
}
logger.warn("URL is blocked");
trace.status = "error";
trace.error = "URL is blocked";
trace.usedInCompletion = false;
@@ -63,6 +66,9 @@ export async function processUrl(
}
try {
logger.debug("Running map...", {
search: searchQuery,
});
const mapResults = await getMapResults({
url: baseUrl,
search: searchQuery,
@@ -79,6 +85,10 @@ export async function processUrl(
let mappedLinks = mapResults.mapResults as MapDocument[];
let allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links];
let uniqueUrls = removeDuplicateUrls(allUrls);
logger.debug("Map finished.", {
linkCount: allUrls.length,
uniqueLinkCount: uniqueUrls.length,
});
// Track all discovered URLs
uniqueUrls.forEach((discoveredUrl) => {
@@ -96,6 +106,7 @@ export async function processUrl(
// retry if only one url is returned
if (uniqueUrls.length <= 1) {
logger.debug("Running map... (pass 2)");
const retryMapResults = await getMapResults({
url: baseUrl,
teamId: options.teamId,
@@ -111,6 +122,10 @@ export async function processUrl(
mappedLinks = retryMapResults.mapResults as MapDocument[];
allUrls = [...mappedLinks.map((m) => m.url), ...mapResults.links];
uniqueUrls = removeDuplicateUrls(allUrls);
logger.debug("Map finished. (pass 2)", {
linkCount: allUrls.length,
uniqueLinkCount: uniqueUrls.length,
});
// Track all discovered URLs
uniqueUrls.forEach((discoveredUrl) => {
@@ -184,6 +199,11 @@ export async function processUrl(
// (link, index) => `${index + 1}. URL: ${link.url}, Title: ${link.title}, Description: ${link.description}`
// );
logger.info("Generated rephrased prompt.", {
rephrasedPrompt
});
logger.info("Reranking (pass 1)...");
const rerankerResult = await rerankLinksWithLLM(
mappedLinks,
rephrasedPrompt,
@@ -191,9 +211,13 @@ export async function processUrl(
);
mappedLinks = rerankerResult.mapDocument;
let tokensUsed = rerankerResult.tokensUsed;
logger.info("Reranked! (pass 1)", {
linkCount: mappedLinks.length,
});
// 2nd Pass, useful for when the first pass returns too many links
if (mappedLinks.length > 100) {
logger.info("Reranking (pass 2)...");
const rerankerResult = await rerankLinksWithLLM(
mappedLinks,
rephrasedPrompt,
@@ -201,6 +225,9 @@ export async function processUrl(
);
mappedLinks = rerankerResult.mapDocument;
tokensUsed += rerankerResult.tokensUsed;
logger.info("Reranked! (pass 2)", {
linkCount: mappedLinks.length,
});
}
// dumpToFile(