diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts index 20b87f68..81477e7c 100644 --- a/apps/api/src/controllers/v1/extract-status.ts +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -39,6 +39,6 @@ export async function extractStatusController( expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(), steps: extract.showSteps ? extract.steps : undefined, llmUsage: extract.showLLMUsage ? extract.llmUsage : undefined, - // sources: extract.sources, + sources: extract.showSources ? extract.sources : undefined, }); } diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index d005e9be..ec29b35c 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -73,6 +73,7 @@ export async function extractController( status: "processing", showSteps: req.body.__experimental_streamSteps, showLLMUsage: req.body.__experimental_llmUsage, + showSources: req.body.__experimental_showSources, }); if (Sentry.isInitialized()) { diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 0f8d00e2..7277821e 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -228,6 +228,7 @@ export const extractV1Options = z urlTrace: z.boolean().default(false), __experimental_streamSteps: z.boolean().default(false), __experimental_llmUsage: z.boolean().default(false), + __experimental_showSources: z.boolean().default(false), timeout: z.number().int().positive().finite().safe().default(60000), }) .strict(strictMessage) diff --git a/apps/api/src/lib/extract/document-scraper.ts b/apps/api/src/lib/extract/document-scraper.ts index cc084ee4..8cbc75fd 100644 --- a/apps/api/src/lib/extract/document-scraper.ts +++ b/apps/api/src/lib/extract/document-scraper.ts @@ -1,4 +1,4 @@ -import { Document, URLTrace, scrapeOptions } from "../../controllers/v1/types"; +import { Document, 
ScrapeOptions, URLTrace, scrapeOptions } from "../../controllers/v1/types"; import { PlanType } from "../../types"; import { logger } from "../logger"; import { getScrapeQueue } from "../../services/queue-service"; @@ -20,6 +20,7 @@ export async function scrapeDocument( options: ScrapeDocumentOptions, urlTraces: URLTrace[], logger: Logger, + internalScrapeOptions: Partial<ScrapeOptions> = { onlyMainContent: false }, ): Promise<Document | null> { const trace = urlTraces.find((t) => t.url === options.url); if (trace) { @@ -40,7 +41,7 @@ export async function scrapeDocument( url: options.url, mode: "single_urls", team_id: options.teamId, - scrapeOptions: scrapeOptions.parse({ onlyMainContent: false }), + scrapeOptions: scrapeOptions.parse({ ...internalScrapeOptions }), internalOptions: { useCache: true, }, diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts index 39cf2364..c642f8b3 100644 --- a/apps/api/src/lib/extract/extract-redis.ts +++ b/apps/api/src/lib/extract/extract-redis.ts @@ -31,6 +31,7 @@ export type StoredExtract = { showSteps?: boolean; steps?: ExtractedStep[]; showLLMUsage?: boolean; + showSources?: boolean; llmUsage?: number; sources?: { [key: string]: string[]; diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts index 9d1dd453..3725ff8b 100644 --- a/apps/api/src/lib/extract/extraction-service.ts +++ b/apps/api/src/lib/extract/extraction-service.ts @@ -25,6 +25,7 @@ const ajv = new Ajv(); import { ExtractStep, updateExtract } from "./extract-redis"; import { deduplicateObjectsArray } from "./helpers/deduplicate-objs-array"; import { mergeNullValObjs } from "./helpers/merge-null-val-objs"; +import { areMergeable } from "./helpers/merge-null-val-objs"; import { CUSTOM_U_TEAMS } from "./config"; import { calculateFinalResultCost, @@ -34,6 +35,7 @@ import { analyzeSchemaAndPrompt } from "./completions/analyzeSchemaAndPrompt"; import { checkShouldExtract } from
"./completions/checkShouldExtract"; import { batchExtractPromise } from "./completions/batchExtract"; import { singleAnswerCompletion } from "./completions/singleAnswer"; +import { SourceTracker } from "./helpers/source-tracker"; interface ExtractServiceOptions { request: ExtractRequest; @@ -272,6 +274,10 @@ export async function performExtraction( url, isMultiEntity: true, }), + { + // Needs to be true for multi-entity to work properly + onlyMainContent: true, + } ); } return docsMap.get(url); @@ -313,6 +319,7 @@ export async function performExtraction( const chunkSize = 50; const timeoutCompletion = 45000; // 45 second timeout const chunks: Document[][] = []; + const extractionResults: {extract: any, url: string}[] = []; // Split into chunks for (let i = 0; i < multyEntityDocs.length; i += chunkSize) { @@ -361,7 +368,6 @@ export async function performExtraction( "is_content_relevant", ], }; - // console.log("schemaWithConfidence", schemaWithConfidence); await updateExtract(extractId, { status: "processing", @@ -377,7 +383,7 @@ export async function performExtraction( ], }); - const completionPromise = batchExtractPromise(multiEntitySchema, links, request.prompt ?? "", request.systemPrompt ?? "", doc); + const completionPromise = batchExtractPromise(multiEntitySchema, links, request.prompt ?? "", request.systemPrompt ?? 
"", doc); // Race between timeout and completion const multiEntityCompletion = (await Promise.race([ @@ -389,21 +395,11 @@ export async function performExtraction( if (multiEntityCompletion) { tokenUsage.push(multiEntityCompletion.totalUsage); - // Track sources for multi-entity items if (multiEntityCompletion.extract) { - // For each multi-entity key, track the source URL - multiEntityKeys.forEach(key => { - const items = multiEntityCompletion.extract[key]; - if (Array.isArray(items)) { - items.forEach((item, index) => { - const sourcePath = `${key}[${index}]`; - if (!sources[sourcePath]) { - sources[sourcePath] = []; - } - sources[sourcePath].push(doc.metadata.url || doc.metadata.sourceURL || ""); - }); - } - }); + return { + extract: multiEntityCompletion.extract, + url: doc.metadata.url || doc.metadata.sourceURL || "" + }; } } @@ -439,7 +435,7 @@ export async function performExtraction( // return null; // } - return multiEntityCompletion.extract; + return null; } catch (error) { logger.error(`Failed to process document.`, { error, @@ -451,22 +447,37 @@ export async function performExtraction( // Wait for current chunk to complete before processing next chunk const chunkResults = await Promise.all(chunkPromises); - multiEntityCompletions.push( - ...chunkResults.filter((result) => result !== null), - ); + const validResults = chunkResults.filter((result): result is {extract: any, url: string} => result !== null); + extractionResults.push(...validResults); + multiEntityCompletions.push(...validResults.map(r => r.extract)); logger.debug("All multi-entity completion chunks finished.", { completionCount: multiEntityCompletions.length, }); } try { + // Use SourceTracker to handle source tracking + const sourceTracker = new SourceTracker(); + + // Transform and merge results while preserving sources + sourceTracker.transformResults(extractionResults, multiEntitySchema, false); + multiEntityResult = transformArrayToObject( multiEntitySchema, multiEntityCompletions, ); + 
+ // Track sources before deduplication + sourceTracker.trackPreDeduplicationSources(multiEntityResult); + + // Apply deduplication and merge multiEntityResult = deduplicateObjectsArray(multiEntityResult); multiEntityResult = mergeNullValObjs(multiEntityResult); - // @nick: maybe we can add here a llm that checks if the array probably has a primary key? + + // Map sources to final deduplicated/merged items + const multiEntitySources = sourceTracker.mapSourcesToFinalItems(multiEntityResult, multiEntityKeys); + Object.assign(sources, multiEntitySources); + } catch (error) { logger.error(`Failed to transform array to object`, { error }); return { @@ -741,6 +752,6 @@ export async function performExtraction( urlTrace: request.urlTrace ? urlTraces : undefined, llmUsage, totalUrlsScraped, - // sources, + sources, }; } diff --git a/apps/api/src/lib/extract/helpers/__tests__/source-tracker.test.ts b/apps/api/src/lib/extract/helpers/__tests__/source-tracker.test.ts new file mode 100644 index 00000000..d58806b2 --- /dev/null +++ b/apps/api/src/lib/extract/helpers/__tests__/source-tracker.test.ts @@ -0,0 +1,262 @@ +import { SourceTracker } from "../source-tracker"; +import { transformArrayToObject } from "../transform-array-to-obj"; + +describe("SourceTracker", () => { + let sourceTracker: SourceTracker; + + beforeEach(() => { + sourceTracker = new SourceTracker(); + }); + + describe("transformResults", () => { + it("should transform and merge results while preserving sources", () => { + const extractionResults = [ + { + extract: { products: [{ name: "Product 1", price: 10 }] }, + url: "http://example1.com" + }, + { + extract: { products: [{ name: "Product 2", price: 20 }] }, + url: "http://example2.com" + } + ]; + + const schema = { + type: "object", + properties: { + products: { + type: "array", + items: { + type: "object", + properties: { + name: { type: "string" }, + price: { type: "number" } + } + } + } + } + }; + + const result = 
sourceTracker.transformResults(extractionResults, schema); + expect(result).toEqual({ + products: [ + { name: "Product 1", price: 10 }, + { name: "Product 2", price: 20 } + ] + }); + }); + + it("should match original transformArrayToObject behavior", () => { + // Test case 1: Simple array transformation + const schema1 = { + type: "object", + properties: { + items: { + type: "array", + items: { + type: "object", + properties: { + id: { type: "number" } + } + } + } + } + }; + + const extractionResults1 = [ + { extract: { items: [{ id: 1 }] }, url: "url1" }, + { extract: { items: [{ id: 2 }] }, url: "url2" } + ]; + + const originalResult1 = transformArrayToObject(schema1, extractionResults1.map(r => r.extract)); + const newResult1 = sourceTracker.transformResults(extractionResults1, schema1); + expect(newResult1).toEqual(originalResult1); + + // Test case 2: Nested objects with arrays + const schema2 = { + type: "object", + properties: { + data: { + type: "object", + properties: { + products: { + type: "array", + items: { + type: "object", + properties: { + id: { type: "number" }, + variants: { + type: "array", + items: { type: "string" } + } + } + } + } + } + } + } + }; + + const extractionResults2 = [ + { + extract: { + data: { + products: [ + { id: 1, variants: ["a", "b"] } + ] + } + }, + url: "url1" + }, + { + extract: { + data: { + products: [ + { id: 2, variants: ["c", "d"] } + ] + } + }, + url: "url2" + } + ]; + + const originalResult2 = transformArrayToObject(schema2, extractionResults2.map(r => r.extract)); + const newResult2 = sourceTracker.transformResults(extractionResults2, schema2); + expect(newResult2).toEqual(originalResult2); + + // Test case 3: Empty arrays + const emptyResults = []; + const originalResult3 = transformArrayToObject(schema1, emptyResults); + const newResult3 = sourceTracker.transformResults([], schema1); + expect(newResult3).toEqual(originalResult3); + + // Test case 4: Non-array properties + const schema4 = { + type: "object", + 
properties: { + name: { type: "string" }, + count: { type: "number" } + } + }; + + const extractionResults4 = [ + { extract: { name: "test1", count: 1 }, url: "url1" }, + { extract: { name: "test2", count: 2 }, url: "url2" } + ]; + + const originalResult4 = transformArrayToObject(schema4, extractionResults4.map(r => r.extract)); + const newResult4 = sourceTracker.transformResults(extractionResults4, schema4); + expect(newResult4).toEqual(originalResult4); + }); + }); + + describe("mapSourcesToFinalItems", () => { + it("should correctly map sources after deduplication and merging", () => { + // Setup initial data with mergeable items (same name, complementary fields) + const extractionResults = [ + { + extract: { products: [{ name: "Product 1", price: 10, description: null }] }, + url: "http://example1.com" + }, + { + extract: { products: [{ name: "Product 1", price: null, description: "Great product" }] }, + url: "http://example2.com" + } + ]; + + const schema = { + type: "object", + properties: { + products: { + type: "array", + items: { + type: "object", + properties: { + name: { type: "string" }, + price: { type: "number" }, + description: { type: "string" } + } + } + } + } + }; + + // Transform results first + const multiEntityResult = sourceTracker.transformResults(extractionResults, schema); + sourceTracker.trackPreDeduplicationSources(multiEntityResult); + + // Test source mapping with a merged item that matches both sources + const sources = sourceTracker.mapSourcesToFinalItems( + { + products: [ + { name: "Product 1", price: 10, description: "Great product" } + ] + }, + ["products"] + ); + + expect(sources).toEqual({ + "products[0]": ["http://example1.com", "http://example2.com"] + }); + }); + + it("should handle empty results", () => { + const sources = sourceTracker.mapSourcesToFinalItems({}, []); + expect(sources).toEqual({}); + }); + + it("should handle non-array properties", () => { + const sources = sourceTracker.mapSourcesToFinalItems( + { nonArray: 
"value" } as any, + ["nonArray"] + ); + expect(sources).toEqual({}); + }); + }); + + describe("trackPreDeduplicationSources", () => { + it("should track sources before deduplication", () => { + const extractionResults = [ + { + extract: { products: [{ id: 1, name: "Product 1" }] }, + url: "http://example1.com" + }, + { + extract: { products: [{ id: 1, name: "Product 1" }] }, + url: "http://example2.com" + } + ]; + + const schema = { + type: "object", + properties: { + products: { + type: "array", + items: { + type: "object", + properties: { + id: { type: "number" }, + name: { type: "string" } + } + } + } + } + }; + + const multiEntityResult = sourceTracker.transformResults(extractionResults, schema); + sourceTracker.trackPreDeduplicationSources(multiEntityResult); + + // Test source mapping after deduplication + const sources = sourceTracker.mapSourcesToFinalItems( + { + products: [{ id: 1, name: "Product 1" }] + }, + ["products"] + ); + + expect(sources).toEqual({ + "products[0]": ["http://example1.com", "http://example2.com"] + }); + }); + }); +}); \ No newline at end of file diff --git a/apps/api/src/lib/extract/helpers/merge-null-val-objs.ts b/apps/api/src/lib/extract/helpers/merge-null-val-objs.ts index 2044097b..4f67f989 100644 --- a/apps/api/src/lib/extract/helpers/merge-null-val-objs.ts +++ b/apps/api/src/lib/extract/helpers/merge-null-val-objs.ts @@ -21,7 +21,7 @@ function unifyItemValues(item: T): T { /** * Check if two objects are mergeable by comparing their non-null values */ -function areMergeable(obj1: any, obj2: any): boolean { +export function areMergeable(obj1: any, obj2: any): boolean { const allKeys = new Set([...Object.keys(obj1), ...Object.keys(obj2)]); let matchingNonNullValues = 0; let nonNullComparisons = 0; diff --git a/apps/api/src/lib/extract/helpers/source-tracker.ts b/apps/api/src/lib/extract/helpers/source-tracker.ts new file mode 100644 index 00000000..797ea3ef --- /dev/null +++ b/apps/api/src/lib/extract/helpers/source-tracker.ts @@ 
-0,0 +1,116 @@ +import { logger } from "../../../lib/logger"; +import { areMergeable } from "./merge-null-val-objs"; +import { transformArrayToObject } from "./transform-array-to-obj"; + +interface TransformedResult { + transformed: { [key: string]: any[] }; + url: string; +} + +/** + * Tracks sources through the transformation, deduplication, and merging process + */ +export class SourceTracker { + private transformedResults: TransformedResult[]; + private preDedupeSourceMap: Map<string, string[]>; + + constructor() { + this.transformedResults = []; + this.preDedupeSourceMap = new Map(); + } + + /** + * Transform raw extraction results into a format that preserves source information + */ + transformResults(extractionResults: { extract: any; url: string }[], schema: any, withTransform: boolean = true) { + // First transform each result individually + this.transformedResults = extractionResults.map(result => ({ + transformed: transformArrayToObject(schema, [result.extract]), + url: result.url + })); + if (withTransform) { + // Then combine all extracts and transform them together to match original behavior + const combinedExtracts = extractionResults.map(r => r.extract); + return transformArrayToObject(schema, combinedExtracts); + } + return this.transformedResults; + } + + /** + * Merge all transformed results into one object - this is now only used internally + */ + private mergeTransformedResults() { + return this.transformedResults.reduce((acc, curr) => { + Object.keys(curr.transformed).forEach(key => { + const value = curr.transformed[key]; + if (!acc[key]) { + acc[key] = Array.isArray(value) ?
[...value] : value; + } else if (Array.isArray(acc[key]) && Array.isArray(value)) { + acc[key].push(...value); + } else if (typeof acc[key] === 'object' && typeof value === 'object') { + acc[key] = { ...acc[key], ...value }; + } + }); + return acc; + }, {} as { [key: string]: any[] }); + } + + /** + * Track sources for each item before deduplication + */ + trackPreDeduplicationSources(multiEntityResult: { [key: string]: any[] }) { + try { + Object.keys(multiEntityResult).forEach(key => { + multiEntityResult[key].forEach((item: any) => { + const itemKey = JSON.stringify(item); + const matchingSources = this.transformedResults + .filter(result => + result.transformed[key]?.some((resultItem: any) => + JSON.stringify(resultItem) === itemKey + ) + ) + .map(result => result.url); + this.preDedupeSourceMap.set(itemKey, matchingSources); + }); + }); + } catch (error) { + logger.error(`Failed to track pre-deduplication sources`, { error }); + } + } + + /** + * Map sources to final deduplicated/merged items + */ + mapSourcesToFinalItems( + multiEntityResult: { [key: string]: any[] }, + multiEntityKeys: string[] + ): Record<string, string[]> { + try { + const sources: Record<string, string[]> = {}; + + multiEntityKeys.forEach(key => { + if (multiEntityResult[key] && Array.isArray(multiEntityResult[key])) { + multiEntityResult[key].forEach((item: any, finalIndex: number) => { + const sourceKey = `${key}[${finalIndex}]`; + const itemSources = new Set<string>(); + + this.transformedResults.forEach(result => { + result.transformed[key]?.forEach((originalItem: any) => { + if (areMergeable(item, originalItem)) { + itemSources.add(result.url); + } + }); + }); + + sources[sourceKey] = Array.from(itemSources); + }); + } + }); + + return sources; + } catch (error) { + logger.error(`Failed to map sources to final items`, { error }); + return {}; + } + } +} \ No newline at end of file diff --git a/apps/api/src/lib/extract/helpers/transform-array-to-obj.ts b/apps/api/src/lib/extract/helpers/transform-array-to-obj.ts index
6c65e234..c164951f 100644 --- a/apps/api/src/lib/extract/helpers/transform-array-to-obj.ts +++ b/apps/api/src/lib/extract/helpers/transform-array-to-obj.ts @@ -1,5 +1,11 @@ import isEqual from "lodash/isEqual"; +/** + * Transforms an array of objects into a single object, merging properties with the same name. + * @param originalSchema - The schema of the original data. + * @param arrayData - The array of objects to transform. + * @returns A single object with merged properties. + */ export function transformArrayToObject( originalSchema: any, arrayData: any[],