Nick: __experimental_streamSteps

Nicolas 2025-01-14 01:45:50 -03:00
parent 558a7f4c08
commit 033e9bbf29
8 changed files with 122 additions and 4 deletions

View File

@@ -36,5 +36,6 @@ export async function extractStatusController(
     status: extract.status,
     error: extract?.error ?? undefined,
     expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
+    steps: extract.showSteps ? extract.steps : undefined,
   });
 }
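
With this change, the status endpoint returns the recorded pipeline steps alongside the job status, but only when the job was created with step streaming enabled. A minimal polling sketch, assuming the status route is GET /v1/extract/:jobId and the response shape shown above:

// Hypothetical polling client; the base URL and route are assumptions.
async function pollExtractSteps(jobId: string, apiKey: string) {
  const res = await fetch(`https://api.firecrawl.dev/v1/extract/${jobId}`, {
    headers: { Authorization: `Bearer ${apiKey}` },
  });
  const body = await res.json();
  // `steps` is undefined unless the job was started with
  // __experimental_streamSteps: true.
  for (const step of body.steps ?? []) {
    console.log(step.step, `${step.finishedAt - step.startedAt}ms`);
  }
  return body.status as string;
}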

View File

@@ -70,6 +70,7 @@ export async function extractController(
     plan: req.auth.plan,
     createdAt: Date.now(),
     status: "processing",
+    showSteps: req.body.__experimental_streamSteps,
   });
 
   if (Sentry.isInitialized()) {

View File

@@ -221,6 +221,7 @@ export const extractV1Options = z
     allowExternalLinks: z.boolean().default(false),
     origin: z.string().optional().default("api"),
     urlTrace: z.boolean().default(false),
+    __experimental_streamSteps: z.boolean().default(false),
     timeout: z.number().int().positive().finite().safe().default(60000),
   })
   .strict(strictMessage);
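
The flag rides along as one more boolean in the strict v1 extract options, defaulting to false. A request that opts in might look like this (the route, URLs, and schema are illustrative, assuming the extract endpoint is POST /v1/extract):

// Illustrative request body; the values are made up.
const requestBody = {
  urls: ["https://example.com/*"],
  schema: {
    type: "object",
    properties: { title: { type: "string" } },
  },
  __experimental_streamSteps: true,
};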

View File

@@ -1,6 +1,25 @@
 import { redisConnection } from "../../services/queue-service";
 import { logger as _logger } from "../logger";
 
+export enum ExtractStep {
+  INITIAL = "initial",
+  MULTI_ENTITY = "multi-entity",
+  MULTI_ENTITY_SCRAPE = "multi-entity-scrape",
+  MULTI_ENTITY_EXTRACT = "multi-entity-extract",
+  SCRAPE = "scrape",
+  MAP = "map",
+  EXTRACT = "extract",
+  COMPLETE = "complete",
+}
+
+export type ExtractedStep = {
+  step: ExtractStep;
+  startedAt: number;
+  finishedAt: number;
+  error?: any;
+  discoveredLinks?: string[];
+};
+
 export type StoredExtract = {
   id: string;
   team_id: string;
@@ -8,6 +27,8 @@ export type StoredExtract = {
   createdAt: number;
   status: "processing" | "completed" | "failed" | "cancelled";
   error?: any;
+  showSteps?: boolean;
+  steps?: ExtractedStep[];
 };
 
 export async function saveExtract(id: string, extract: StoredExtract) {
@@ -27,6 +48,12 @@ export async function updateExtract(
 ) {
   const current = await getExtract(id);
   if (!current) return;
+
+  // Handle steps aggregation
+  if (extract.steps && current.steps) {
+    extract.steps = [...current.steps, ...extract.steps];
+  }
+
   await redisConnection.set(
     "extract:" + id,
     JSON.stringify({ ...current, ...extract }),
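
Note the aggregation semantics: updateExtract concatenates incoming steps onto the stored history before the shallow merge, so each call site passes only the single step it just recorded. When the stored extract has no steps yet (the first recorded step, or a job without showSteps), the incoming array simply lands via the spread. A usage sketch with an illustrative job id:

// Each call appends exactly one step; updateExtract aggregates them in Redis.
await updateExtract("job-123", {
  status: "processing",
  steps: [
    {
      step: ExtractStep.MAP,
      startedAt: Date.now(),
      finishedAt: Date.now(),
      discoveredLinks: ["https://example.com/pricing"],
    },
  ],
});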

View File

@@ -24,7 +24,7 @@ import Ajv from "ajv";
 const ajv = new Ajv();
 
 const openai = new OpenAI();
-import { updateExtract } from "./extract-redis";
+import { ExtractStep, updateExtract } from "./extract-redis";
 import { deduplicateObjectsArray } from "./helpers/deduplicate-objs-array";
 import { mergeNullValObjs } from "./helpers/merge-null-val-objs";
 import { CUSTOM_U_TEAMS } from "./config";
@@ -157,6 +157,19 @@ export async function performExtraction(
   let multiEntityCompletions: completions[] = [];
   let multiEntityResult: any = {};
   let singleAnswerResult: any = {};
+
+  await updateExtract(extractId, {
+    status: "processing",
+    steps: [
+      {
+        step: ExtractStep.INITIAL,
+        startedAt: Date.now(),
+        finishedAt: Date.now(),
+        discoveredLinks: request.urls,
+      },
+    ],
+  });
+
   // Process URLs
   const urlPromises = request.urls.map((url) =>
     processUrl(
@@ -188,6 +201,18 @@ export async function performExtraction(
     };
   }
 
+  await updateExtract(extractId, {
+    status: "processing",
+    steps: [
+      {
+        step: ExtractStep.MAP,
+        startedAt: Date.now(),
+        finishedAt: Date.now(),
+        discoveredLinks: links,
+      },
+    ],
+  });
+
   let reqSchema = request.schema;
   reqSchema = await dereferenceSchema(reqSchema);
 
@@ -209,8 +234,32 @@ export async function performExtraction(
     const { singleAnswerSchema, multiEntitySchema } = await spreadSchemas(reqSchema, multiEntityKeys)
     rSchema = singleAnswerSchema;
 
+    await updateExtract(extractId, {
+      status: "processing",
+      steps: [
+        {
+          step: ExtractStep.MULTI_ENTITY,
+          startedAt: Date.now(),
+          finishedAt: Date.now(),
+          discoveredLinks: [],
+        },
+      ],
+    });
+
     const timeout = Math.floor((request.timeout || 40000) * 0.7) || 30000;
 
+    await updateExtract(extractId, {
+      status: "processing",
+      steps: [
+        {
+          step: ExtractStep.MULTI_ENTITY_SCRAPE,
+          startedAt: Date.now(),
+          finishedAt: Date.now(),
+          discoveredLinks: [],
+        },
+      ],
+    });
+
     const scrapePromises = links.map((url) => {
       if (!docsMap.has(url)) {
         return scrapeDocument(
@@ -298,6 +347,18 @@ export async function performExtraction(
         };
 
         // console.log("schemaWithConfidence", schemaWithConfidence);
+
+        await updateExtract(extractId, {
+          status: "processing",
+          steps: [
+            {
+              step: ExtractStep.MULTI_ENTITY_EXTRACT,
+              startedAt: Date.now(),
+              finishedAt: Date.now(),
+              discoveredLinks: [doc.metadata.url || doc.metadata.sourceURL || ""],
+            },
+          ],
+        });
+
         const completionPromise = generateOpenAICompletions(
           logger.child({ method: "extractService/generateOpenAICompletions" }),
           {
@@ -386,6 +447,17 @@ export async function performExtraction(
     // let rerank = await rerankLinks(links.map((url) => ({ url })), request.prompt ?? JSON.stringify(request.schema), urlTraces);
 
+    await updateExtract(extractId, {
+      status: "processing",
+      steps: [
+        {
+          step: ExtractStep.SCRAPE,
+          startedAt: Date.now(),
+          finishedAt: Date.now(),
+          discoveredLinks: links,
+        },
+      ],
+    });
+
     const scrapePromises = links.map((url) => {
       if (!docsMap.has(url)) {
         return scrapeDocument(
@@ -431,6 +503,18 @@ export async function performExtraction(
       };
     }
 
+    await updateExtract(extractId, {
+      status: "processing",
+      steps: [
+        {
+          step: ExtractStep.EXTRACT,
+          startedAt: Date.now(),
+          finishedAt: Date.now(),
+          discoveredLinks: [],
+        },
+      ],
+    });
+
     // Generate completions
     singleAnswerCompletions = await generateOpenAICompletions(
       logger.child({ method: "extractService/generateOpenAICompletions" }),
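
Each stage repeats the same eleven-line updateExtract call, with only the step name and discovered links changing. A hypothetical helper (not part of this commit) could collapse the repetition:

// Hypothetical refactor sketch, not in this commit: record one pipeline step.
async function recordStep(
  extractId: string,
  step: ExtractStep,
  discoveredLinks: string[] = [],
): Promise<void> {
  await updateExtract(extractId, {
    status: "processing",
    steps: [
      {
        step,
        startedAt: Date.now(),
        finishedAt: Date.now(),
        discoveredLinks,
      },
    ],
  });
}

// e.g. await recordStep(extractId, ExtractStep.SCRAPE, links);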

View File

@@ -7,6 +7,8 @@ import { generateBasicCompletion } from "../LLM-extraction";
 import { buildRefrasedPrompt } from "./build-prompts";
 import { rerankLinksWithLLM } from "./reranker";
 import { extractConfig } from "./config";
+import { updateExtract } from "./extract-redis";
+import { ExtractStep } from "./extract-redis";
 
 interface ProcessUrlOptions {
   url: string;
@@ -157,6 +159,8 @@ export async function processUrl(
     extractConfig.RERANKING.MAX_INITIAL_RANKING_LIMIT,
   );
 
+  // Perform reranking using either prompt or schema
+
   let searchQuery = "";
   if (options.prompt) {

View File

@@ -6,7 +6,7 @@ const saveMocksDirPath = path.join(__dirname, "../mocks/").replace("dist/", "");
 const loadMocksDirPath = path.join(__dirname, "../../../__tests__/snips/mocks");
 
 export async function saveMock(options: unknown, result: unknown) {
-  if (!process.env.FIRECRAWL_SAVE_MOCKS) return;
+  if (process.env.FIRECRAWL_SAVE_MOCKS !== "true") return;
 
   await fs.mkdir(saveMocksDirPath, { recursive: true });
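
The stricter comparison matters because environment variables are strings and any non-empty string is truthy, so the old guard enabled mock saving even for FIRECRAWL_SAVE_MOCKS=false or =0:

// Why the old check was too loose:
Boolean("false"); // true — every non-empty string is truthy
process.env.FIRECRAWL_SAVE_MOCKS === "true"; // opts in only on the literal "true"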

View File

@@ -930,12 +930,12 @@ export default class FirecrawlApp {
    * @returns The response from the extract operation.
    */
   async asyncExtract(
-    url: string,
+    urls: string[],
     params?: ExtractParams,
     idempotencyKey?: string
   ): Promise<ExtractResponse | ErrorResponse> {
     const headers = this.prepareHeaders(idempotencyKey);
-    let jsonData: any = { url, ...params };
+    let jsonData: any = { urls, ...params };
     let jsonSchema: any;
     try {
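
This brings the JS SDK in line with the API, which takes a list of URLs rather than a single one. A call under the corrected signature (the API key, URLs, and prompt are illustrative):

// Illustrative usage of the corrected signature.
const app = new FirecrawlApp({ apiKey: "fc-..." });
const job = await app.asyncExtract(
  ["https://example.com/pricing", "https://example.com/about"],
  { prompt: "Extract the product name and its price." },
);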