(feat/deep-research) Alpha implementation of deep research (#1202)

* Nick:

* Revert "fix(v1/types): fix extract -> json rename (FIR-1072) (#1195)"

This reverts commit 586a10f40d354a038afc2b67809f20a7a829f8cb.

* Update deep-research-service.ts

* Nick:

* Nick:

* Nick:

* Nick:

* Nick:

* Nick:

* Update deep-research-service.ts

* Nick:

* Update deep-research-service.ts

* Apply suggestions from code review

---------

Co-authored-by: Gergő Móricz <mo.geryy@gmail.com>
Nicolas committed on 2025-02-19 12:44:21 -03:00 (committed by GitHub)
parent fc64f436ed
commit 5c47e97db2
15 changed files with 1343 additions and 3 deletions

View File

@@ -0,0 +1,47 @@
import { Response } from "express";
import { RequestWithAuth } from "./types";
import {
getDeepResearch,
getDeepResearchExpiry,
} from "../../lib/deep-research/deep-research-redis";
import { supabaseGetJobsById } from "../../lib/supabase-jobs";
export async function deepResearchStatusController(
req: RequestWithAuth<{ jobId: string }, any, any>,
res: Response,
) {
const research = await getDeepResearch(req.params.jobId);
if (!research) {
return res.status(404).json({
success: false,
error: "Deep research job not found",
});
}
let data: any = null;
if (research.status === "completed") {
const jobData = await supabaseGetJobsById([req.params.jobId]);
if (jobData && jobData.length > 0) {
data = jobData[0].docs[0];
}
}
return res.status(200).json({
success: research.status === "failed" ? false : true,
data: {
finalAnalysis: research.finalAnalysis,
// completedSteps: research.completedSteps,
// totalSteps: research.totalExpectedSteps,
},
error: research?.error ?? undefined,
expiresAt: (await getDeepResearchExpiry(req.params.jobId)).toISOString(),
currentDepth: research.currentDepth,
maxDepth: research.maxDepth,
status: research.status,
activities: research.activities,
sources: research.sources,
// summaries: research.summaries,
});
}
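For reference, a minimal client-side polling sketch against this status endpoint; the base URL, bearer-token auth header, and 2-second interval are illustrative assumptions, not part of this diff.

// Hypothetical helper that polls GET /v1/deep-research/:jobId until a terminal status
async function pollDeepResearchStatus(jobId: string, apiKey: string) {
  while (true) {
    const res = await fetch(`https://api.firecrawl.dev/v1/deep-research/${jobId}`, {
      headers: { Authorization: `Bearer ${apiKey}` }, // assumed auth scheme
    });
    if (res.status === 404) throw new Error("Deep research job not found");
    const body = await res.json();
    // The controller above reports status: "processing" | "completed" | "failed" | "cancelled"
    if (body.status !== "processing") return body;
    await new Promise((resolve) => setTimeout(resolve, 2000));
  }
}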

View File

@@ -0,0 +1,92 @@
import { Request, Response } from "express";
import { RequestWithAuth } from "./types";
import { getDeepResearchQueue } from "../../services/queue-service";
import * as Sentry from "@sentry/node";
import { saveDeepResearch } from "../../lib/deep-research/deep-research-redis";
import { z } from "zod";
export const deepResearchRequestSchema = z.object({
topic: z.string().describe('The topic or question to research'),
maxDepth: z.number().min(1).max(10).default(7).describe('Maximum depth of research iterations'),
timeLimit: z.number().min(30).max(600).default(300).describe('Time limit in seconds'),
__experimental_streamSteps: z.boolean().optional(),
});
export type DeepResearchRequest = z.infer<typeof deepResearchRequestSchema>;
export type DeepResearchResponse = {
success: boolean;
id: string;
};
/**
* Initiates a deep research job based on the provided topic.
* @param req - The request object containing authentication and research parameters.
* @param res - The response object to send the research job ID.
* @returns A promise that resolves when the research job is queued.
*/
export async function deepResearchController(
req: RequestWithAuth<{}, DeepResearchResponse, DeepResearchRequest>,
res: Response<DeepResearchResponse>,
) {
req.body = deepResearchRequestSchema.parse(req.body);
const researchId = crypto.randomUUID();
const jobData = {
request: req.body,
teamId: req.auth.team_id,
plan: req.auth.plan,
subId: req.acuc?.sub_id,
researchId,
};
await saveDeepResearch(researchId, {
id: researchId,
team_id: req.auth.team_id,
plan: req.auth.plan,
createdAt: Date.now(),
status: "processing",
currentDepth: 0,
maxDepth: req.body.maxDepth,
completedSteps: 0,
totalExpectedSteps: req.body.maxDepth * 5, // 5 steps per depth level
findings: [],
sources: [],
activities: [],
summaries: [],
});
if (Sentry.isInitialized()) {
const size = JSON.stringify(jobData).length;
await Sentry.startSpan(
{
name: "Add deep research job",
op: "queue.publish",
attributes: {
"messaging.message.id": researchId,
"messaging.destination.name": getDeepResearchQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await getDeepResearchQueue().add(researchId, {
...jobData,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
}, { jobId: researchId });
},
);
} else {
await getDeepResearchQueue().add(researchId, jobData, {
jobId: researchId,
});
}
return res.status(200).json({
success: true,
id: researchId,
});
}
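As a usage sketch, here is a request body that satisfies deepResearchRequestSchema above; the endpoint URL and auth header are assumptions based on the route registered later in this diff.

// Hypothetical call that starts a deep research job via POST /v1/deep-research
const startRes = await fetch("https://api.firecrawl.dev/v1/deep-research", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${process.env.FIRECRAWL_API_KEY}`, // assumed auth scheme
  },
  body: JSON.stringify({
    topic: "State of WebGPU adoption in browsers", // required
    maxDepth: 5,    // optional, 1-10, defaults to 7
    timeLimit: 180, // optional, 30-600 seconds, defaults to 300
  }),
});
const { success, id } = await startRes.json(); // e.g. { success: true, id: "<researchId>" }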

View File

@@ -20,6 +20,42 @@ import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist";
import * as Sentry from "@sentry/node";
import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings";
// Used for deep research
export async function searchAndScrapeSearchResult(
query: string,
options: {
teamId: string;
plan: PlanType | undefined;
origin: string;
timeout: number;
scrapeOptions: ScrapeOptions;
}
): Promise<Document[]> {
try {
const searchResults = await search({
query,
num_results: 5
});
const documents = await Promise.all(
searchResults.map(result =>
scrapeSearchResult(
{
url: result.url,
title: result.title,
description: result.description
},
options
)
)
);
return documents;
} catch (error) {
return [];
}
}
async function scrapeSearchResult(
searchResult: { url: string; title: string; description: string },
options: {
@@ -74,7 +110,7 @@ async function scrapeSearchResult(
});
let statusCode = 0;
- if (error.message.includes("Could not scrape url")) {
+ if (error?.message?.includes("Could not scrape url")) {
statusCode = 403;
}
// Return a minimal document with SERP results at top level

View File

@@ -8,6 +8,7 @@ import {
getExtractQueue,
getScrapeQueue,
getIndexQueue,
getDeepResearchQueue,
} from "./services/queue-service";
import { v0Router } from "./routes/v0";
import os from "os";
@@ -54,6 +55,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
new BullAdapter(getScrapeQueue()),
new BullAdapter(getExtractQueue()),
new BullAdapter(getIndexQueue()),
new BullAdapter(getDeepResearchQueue()),
],
serverAdapter: serverAdapter,
});

View File

@@ -0,0 +1,102 @@
import { redisConnection } from "../../services/queue-service";
import { logger as _logger } from "../logger";
export enum DeepResearchStep {
INITIAL = "initial",
SEARCH = "search",
EXTRACT = "extract",
ANALYZE = "analyze",
SYNTHESIS = "synthesis",
COMPLETE = "complete"
}
export type DeepResearchActivity = {
type: 'search' | 'extract' | 'analyze' | 'reasoning' | 'synthesis' | 'thought';
status: 'processing' | 'complete' | 'error';
message: string;
timestamp: string;
depth: number;
};
export type DeepResearchSource = {
url: string;
title: string;
description: string;
};
export type DeepResearchFinding = {
text: string;
source: string;
};
export type StoredDeepResearch = {
id: string;
team_id: string;
plan?: string;
createdAt: number;
status: "processing" | "completed" | "failed" | "cancelled";
error?: any;
currentDepth: number;
maxDepth: number;
completedSteps: number;
totalExpectedSteps: number;
findings: DeepResearchFinding[];
sources: DeepResearchSource[];
activities: DeepResearchActivity[];
summaries: string[];
finalAnalysis?: string;
};
// TTL of 6 hours
const DEEP_RESEARCH_TTL = 6 * 60 * 60;
export async function saveDeepResearch(id: string, research: StoredDeepResearch) {
_logger.debug("Saving deep research " + id + " to Redis...");
await redisConnection.set("deep-research:" + id, JSON.stringify(research));
await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
}
export async function getDeepResearch(id: string): Promise<StoredDeepResearch | null> {
const x = await redisConnection.get("deep-research:" + id);
return x ? JSON.parse(x) : null;
}
export async function updateDeepResearch(
id: string,
research: Partial<StoredDeepResearch>,
) {
const current = await getDeepResearch(id);
if (!current) return;
const updatedResearch = {
...current,
...research,
// Append new activities if provided
activities: research.activities
? [...(current.activities || []), ...research.activities]
: current.activities,
// Append new findings if provided
// findings: research.findings
// ? [...(current.findings || []), ...research.findings]
// : current.findings,
// Append new sources if provided
sources: research.sources
? [...(current.sources || []), ...research.sources]
: current.sources,
// Append new summaries if provided
summaries: research.summaries
? [...(current.summaries || []), ...research.summaries]
: current.summaries
};
await redisConnection.set("deep-research:" + id, JSON.stringify(updatedResearch));
await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
}
export async function getDeepResearchExpiry(id: string): Promise<Date> {
const d = new Date();
const ttl = await redisConnection.pttl("deep-research:" + id);
d.setMilliseconds(d.getMilliseconds() + ttl);
d.setMilliseconds(0);
return d;
}
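To make the merge semantics above concrete: updateDeepResearch appends activities, sources, and summaries to the stored arrays while overwriting scalar fields. A small hypothetical sequence (the id and payloads are placeholders):

await updateDeepResearch("research-123", {
  activities: [{ type: "search", status: "processing", message: "Searching sources", timestamp: new Date().toISOString(), depth: 1 }],
  currentDepth: 1,
});
await updateDeepResearch("research-123", {
  activities: [{ type: "search", status: "complete", message: "Search finished", timestamp: new Date().toISOString(), depth: 1 }],
});
// getDeepResearch("research-123") now returns both activities; currentDepth remains 1.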

View File

@@ -0,0 +1,312 @@
import { logger as _logger } from "../logger";
import { updateDeepResearch } from "./deep-research-redis";
import { PlanType } from "../../types";
import { searchAndScrapeSearchResult } from "../../controllers/v1/search";
import { ResearchLLMService, ResearchStateManager } from "./research-manager";
import { logJob } from "../../services/logging/log_job";
import { updateExtract } from "../extract/extract-redis";
import { billTeam } from "../../services/billing/credit_billing";
interface DeepResearchServiceOptions {
researchId: string;
teamId: string;
plan: string;
topic: string;
maxDepth: number;
timeLimit: number;
subId?: string;
}
export async function performDeepResearch(options: DeepResearchServiceOptions) {
const { researchId, teamId, plan, timeLimit, subId } = options;
const startTime = Date.now();
let currentTopic = options.topic;
const logger = _logger.child({
module: "deep-research",
method: "performDeepResearch",
researchId,
});
logger.debug("[Deep Research] Starting research with options:", { options });
const state = new ResearchStateManager(
researchId,
teamId,
plan,
options.maxDepth,
logger,
options.topic,
);
const llmService = new ResearchLLMService(logger);
try {
while (!state.hasReachedMaxDepth()) {
logger.debug("[Deep Research] Current depth:", state.getCurrentDepth());
const timeElapsed = Date.now() - startTime;
if (timeElapsed >= timeLimit * 1000) {
logger.debug("[Deep Research] Time limit reached, stopping research");
break;
}
await state.incrementDepth();
// Search phase
await state.addActivity({
type: "search",
status: "processing",
message: `Generating search queries for "${currentTopic}"`,
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
const nextSearchTopic = state.getNextSearchTopic();
logger.debug("[Deep Research] Next search topic:", { nextSearchTopic });
const searchQueries = (
await llmService.generateSearchQueries(
nextSearchTopic,
state.getFindings(),
)
).slice(0, 3);
logger.debug("[Deep Research] Generated search queries:", { searchQueries });
await state.addActivity({
type: "search",
status: "processing",
message: `Starting ${searchQueries.length} parallel searches for "${currentTopic}"`,
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
// Run all searches in parallel
const searchPromises = searchQueries.map(async (searchQuery) => {
await state.addActivity({
type: "search",
status: "processing",
message: `Searching for "${searchQuery.query}" - Goal: ${searchQuery.researchGoal}`,
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
const response = await searchAndScrapeSearchResult(searchQuery.query, {
teamId: options.teamId,
plan: options.plan as PlanType,
origin: "deep-research",
timeout: 15000,
scrapeOptions: {
formats: ["markdown"],
onlyMainContent: true,
waitFor: 0,
mobile: false,
parsePDF: false,
useMock: "none",
skipTlsVerification: false,
removeBase64Images: false,
fastMode: false,
blockAds: false,
},
});
return response.length > 0 ? response : [];
});
const searchResultsArrays = await Promise.all(searchPromises);
const searchResults = searchResultsArrays.flat();
logger.debug(
"[Deep Research] Search results count:",
{ count: searchResults.length },
);
if (!searchResults || searchResults.length === 0) {
logger.debug(
"[Deep Research] No results found for topic:",
{ currentTopic },
);
await state.addActivity({
type: "search",
status: "error",
message: `No results found for any queries about "${currentTopic}"`,
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
continue;
}
// Filter out already seen URLs and track new ones
const newSearchResults = searchResults.filter((result) => {
if (!result.url || state.hasSeenUrl(result.url)) {
return false;
}
state.addSeenUrl(result.url);
return true;
});
logger.debug(
"[Deep Research] New unique results count:",
{ length: newSearchResults.length },
);
if (newSearchResults.length === 0) {
logger.debug(
"[Deep Research] No new unique results found for topic:",
{ currentTopic },
);
await state.addActivity({
type: "search",
status: "error",
message: `Found ${searchResults.length} results but all URLs were already processed for "${currentTopic}"`,
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
continue;
}
await state.addActivity({
type: "search",
status: "complete",
message: `Found ${newSearchResults.length} new relevant results across ${searchQueries.length} parallel queries`,
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
await state.addFindings(
newSearchResults.map((result) => ({
text: result.markdown ?? "",
source: result.url ?? "",
})),
);
// Analysis phase
await state.addActivity({
type: "analyze",
status: "processing",
message: "Analyzing findings",
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
const timeRemaining = timeLimit * 1000 - (Date.now() - startTime);
logger.debug("[Deep Research] Time remaining (ms):", { timeRemaining });
const analysis = await llmService.analyzeAndPlan(
state.getFindings(),
currentTopic,
timeRemaining,
);
if (!analysis) {
logger.debug("[Deep Research] Analysis failed");
await state.addActivity({
type: "analyze",
status: "error",
message: "Failed to analyze findings",
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
state.incrementFailedAttempts();
if (state.hasReachedMaxFailedAttempts()) {
logger.debug("[Deep Research] Max failed attempts reached");
break;
}
continue;
}
logger.debug("[Deep Research] Analysis result:", {
nextTopic: analysis.nextSearchTopic,
shouldContinue: analysis.shouldContinue,
gapsCount: analysis.gaps.length,
});
state.setNextSearchTopic(analysis.nextSearchTopic || "");
await state.addActivity({
type: "analyze",
status: "complete",
message: "Analyzed findings",
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
if (!analysis.shouldContinue || analysis.gaps.length === 0) {
logger.debug("[Deep Research] No more gaps to research, ending search");
break;
}
currentTopic = analysis.gaps[0] || currentTopic;
logger.debug("[Deep Research] Next topic to research:", { currentTopic });
}
// Final synthesis
logger.debug("[Deep Research] Starting final synthesis");
await state.addActivity({
type: "synthesis",
status: "processing",
message: "Preparing final analysis",
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
const finalAnalysis = await llmService.generateFinalAnalysis(
options.topic,
state.getFindings(),
state.getSummaries(),
);
await state.addActivity({
type: "synthesis",
status: "complete",
message: "Research completed",
timestamp: new Date().toISOString(),
depth: state.getCurrentDepth(),
});
const progress = state.getProgress();
logger.debug("[Deep Research] Research completed successfully");
// Log job with token usage and sources
await logJob({
job_id: researchId,
success: true,
message: "Research completed",
num_docs: 1,
docs: [{ finalAnalysis: finalAnalysis }],
time_taken: (Date.now() - startTime) / 1000,
team_id: teamId,
mode: "deep-research",
url: options.topic,
scrapeOptions: options,
origin: "api",
num_tokens: 0,
tokens_billed: 0,
sources: {},
});
await updateDeepResearch(researchId, {
status: "completed",
finalAnalysis: finalAnalysis,
});
// Bill team for usage
billTeam(teamId, subId, state.getFindings().length, logger).catch(
(error) => {
logger.error(
`Failed to bill team ${teamId} for ${state.getFindings().length} findings`, { teamId, count: state.getFindings().length, error },
);
},
);
return {
success: true,
data: {
finalAnalysis: finalAnalysis,
},
};
} catch (error: any) {
logger.error("Deep research error", { error });
await updateDeepResearch(researchId, {
status: "failed",
error: error.message,
});
throw error;
}
}

View File

@@ -0,0 +1,298 @@
import { Logger } from "winston";
import {
DeepResearchActivity,
DeepResearchFinding,
DeepResearchSource,
updateDeepResearch,
} from "./deep-research-redis";
import { generateOpenAICompletions } from "../../scraper/scrapeURL/transformers/llmExtract";
import { truncateText } from "../../scraper/scrapeURL/transformers/llmExtract";
interface AnalysisResult {
gaps: string[];
nextSteps: string[];
shouldContinue: boolean;
nextSearchTopic?: string;
}
export class ResearchStateManager {
private findings: DeepResearchFinding[] = [];
private summaries: string[] = [];
private nextSearchTopic: string = "";
private urlToSearch: string = "";
private currentDepth: number = 0;
private failedAttempts: number = 0;
private readonly maxFailedAttempts: number = 3;
private completedSteps: number = 0;
private readonly totalExpectedSteps: number;
private seenUrls: Set<string> = new Set();
constructor(
private readonly researchId: string,
private readonly teamId: string,
private readonly plan: string,
private readonly maxDepth: number,
private readonly logger: Logger,
private readonly topic: string,
) {
this.totalExpectedSteps = maxDepth * 5; // 5 steps per depth level
this.nextSearchTopic = topic;
}
hasSeenUrl(url: string): boolean {
return this.seenUrls.has(url);
}
addSeenUrl(url: string): void {
this.seenUrls.add(url);
}
getSeenUrls(): Set<string> {
return this.seenUrls;
}
async addActivity(activity: DeepResearchActivity): Promise<void> {
if (activity.status === "complete") {
this.completedSteps++;
}
await updateDeepResearch(this.researchId, {
activities: [activity],
completedSteps: this.completedSteps,
});
}
async addSource(source: DeepResearchSource): Promise<void> {
await updateDeepResearch(this.researchId, {
sources: [source],
});
}
async addFindings(findings: DeepResearchFinding[]): Promise<void> {
// Only keep the most recent 50 findings
// To avoid memory issues for now
this.findings = [...this.findings, ...findings].slice(-50);
await updateDeepResearch(this.researchId, {
findings: findings,
});
}
async addSummary(summary: string): Promise<void> {
this.summaries.push(summary);
await updateDeepResearch(this.researchId, {
summaries: [summary],
});
}
async incrementDepth(): Promise<void> {
this.currentDepth++;
await updateDeepResearch(this.researchId, {
currentDepth: this.currentDepth,
});
}
incrementFailedAttempts(): void {
this.failedAttempts++;
}
getFindings(): DeepResearchFinding[] {
return this.findings;
}
getSummaries(): string[] {
return this.summaries;
}
getCurrentDepth(): number {
return this.currentDepth;
}
hasReachedMaxDepth(): boolean {
return this.currentDepth >= this.maxDepth;
}
hasReachedMaxFailedAttempts(): boolean {
return this.failedAttempts >= this.maxFailedAttempts;
}
getProgress(): { completedSteps: number; totalSteps: number } {
return {
completedSteps: this.completedSteps,
totalSteps: this.totalExpectedSteps,
};
}
setNextSearchTopic(topic: string): void {
this.nextSearchTopic = topic;
}
getNextSearchTopic(): string {
return this.nextSearchTopic;
}
setUrlToSearch(url: string): void {
this.urlToSearch = url;
}
getUrlToSearch(): string {
return this.urlToSearch;
}
}
export class ResearchLLMService {
constructor(private readonly logger: Logger) {}
async generateSearchQueries(
topic: string,
findings: DeepResearchFinding[] = [],
): Promise<{ query: string; researchGoal: string }[]> {
const { extract } = await generateOpenAICompletions(
this.logger.child({
method: "generateSearchQueries",
}),
{
mode: "llm",
systemPrompt:
"You are an expert research agent that generates search queries (SERP) to explore topics deeply and thoroughly. Do not generate repeated queries. Today's date is " +
new Date().toISOString().split("T")[0],
schema: {
type: "object",
properties: {
queries: {
type: "array",
items: {
type: "object",
properties: {
query: {
type: "string",
description: "The search query to use",
},
researchGoal: {
type: "string",
description:
"The specific goal this query aims to achieve and how it advances the research",
},
},
},
},
},
},
prompt: `Generate a list of 3-5 search queries to deeply research this topic: "${topic}"
${findings.length > 0 ? `\nBased on these previous findings, generate more specific queries:\n${truncateText(findings.map((f) => `- ${f.text}`).join("\n"), 10000)}` : ""}
Each query should be specific and focused on a particular aspect.
Build upon previous findings when available.
Be specific and go deep, not wide - always following the original topic.
Every search query is a new SERP query so make sure the whole context is added without overwhelming the search engine.
The first SERP query you generate should be a very concise, simple version of the topic. `,
},
"",
undefined,
true,
);
return extract.queries;
}
async analyzeAndPlan(
findings: DeepResearchFinding[],
currentTopic: string,
timeRemaining: number,
): Promise<AnalysisResult | null> {
try {
const timeRemainingMinutes =
Math.round((timeRemaining / 1000 / 60) * 10) / 10;
const { extract } = await generateOpenAICompletions(
this.logger.child({
method: "analyzeAndPlan",
}),
{
mode: "llm",
systemPrompt:
"You are an expert research agent that is analyzing findings. Your goal is to synthesize information and identify gaps for further research. Today's date is " +
new Date().toISOString().split("T")[0],
schema: {
type: "object",
properties: {
analysis: {
type: "object",
properties: {
gaps: { type: "array", items: { type: "string" } },
nextSteps: { type: "array", items: { type: "string" } },
shouldContinue: { type: "boolean" },
nextSearchTopic: { type: "string" },
},
required: ["gaps", "nextSteps", "shouldContinue"],
},
},
},
prompt: truncateText(
`You are researching: ${currentTopic}
You have ${timeRemainingMinutes} minutes remaining to complete the research but you don't need to use all of it.
Current findings: ${findings.map((f) => `[From ${f.source}]: ${f.text}`).join("\n")}
What has been learned? What gaps remain, if any? What specific aspects should be investigated next if any?
If you need to search for more information inside the same topic pick a sub-topic by including a nextSearchTopic -which should be highly related to the original topic/users'query.
Important: If less than 1 minute remains, set shouldContinue to false to allow time for final synthesis.
If I have enough information, set shouldContinue to false.`,
120000,
),
},
"",
undefined,
true,
);
return extract.analysis;
} catch (error) {
this.logger.error("Analysis error", { error });
return null;
}
}
async generateFinalAnalysis(
topic: string,
findings: DeepResearchFinding[],
summaries: string[],
): Promise<string> {
const { extract } = await generateOpenAICompletions(
this.logger.child({
method: "generateFinalAnalysis",
}),
{
mode: "llm",
systemPrompt:
"You are an expert research analyst who creates comprehensive, well-structured reports. Your reports are detailed, properly formatted in Markdown, and include clear sections with citations. Today's date is " +
new Date().toISOString().split("T")[0],
schema: {
type: "object",
properties: {
report: { type: "string" },
},
},
prompt: truncateText(
`Create a comprehensive research report on "${topic}" based on the collected findings and analysis.
Research data:
${findings.map((f) => `[From ${f.source}]: ${f.text}`).join("\n")}
Requirements:
- Format the report in Markdown with proper headers and sections
- Include specific citations to sources where appropriate
- Provide detailed analysis in each section
- Make it comprehensive and thorough (aim for 4+ pages worth of content)
- Include all relevant findings and insights from the research
- Cite sources
- Use bullet points and lists where appropriate for readability`,
100000,
),
},
"",
undefined,
true,
"gpt-4o"
);
return extract.report;
}
}
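The deep-research service earlier in this diff drives these two classes together; roughly as follows, with activity logging, scraping, and error handling omitted (a simplified sketch in which researchId, teamId, plan, maxDepth, logger, topic, and timeRemainingMs are assumed to be in scope):

const state = new ResearchStateManager(researchId, teamId, plan, maxDepth, logger, topic);
const llm = new ResearchLLMService(logger);
while (!state.hasReachedMaxDepth()) {
  await state.incrementDepth();
  const queries = await llm.generateSearchQueries(state.getNextSearchTopic(), state.getFindings());
  // ...search and scrape each query, then state.addFindings(...) with the results...
  const analysis = await llm.analyzeAndPlan(state.getFindings(), topic, timeRemainingMs);
  if (!analysis || !analysis.shouldContinue || analysis.gaps.length === 0) break;
  state.setNextSearchTopic(analysis.nextSearchTopic || "");
}
const finalReport = await llm.generateFinalAnalysis(topic, state.getFindings(), state.getSummaries());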

View File

@@ -0,0 +1,33 @@
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
interface Message {
role: "system" | "user" | "assistant";
content: string;
}
interface GenerateTextOptions {
model: string;
messages: Message[];
temperature?: number;
maxTokens?: number;
}
export async function generateText(options: GenerateTextOptions) {
const { model, messages, temperature = 0.7, maxTokens } = options;
const completion = await openai.chat.completions.create({
model,
messages,
temperature,
max_tokens: maxTokens,
});
return {
text: completion.choices[0].message.content || "",
usage: completion.usage,
};
}
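A hypothetical usage sketch for this wrapper (the model id and prompts are placeholders, not values used elsewhere in this diff):

const { text, usage } = await generateText({
  model: "gpt-4o-mini", // placeholder model id
  messages: [
    { role: "system", content: "You are a concise research assistant." },
    { role: "user", content: "Summarize the key findings in two sentences." },
  ],
  temperature: 0.3,
  maxTokens: 200,
});
console.log(text, usage?.total_tokens);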

View File

@@ -29,6 +29,8 @@ import { creditUsageController } from "../controllers/v1/credit-usage";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { searchController } from "../controllers/v1/search";
import { crawlErrorsController } from "../controllers/v1/crawl-errors";
import { deepResearchController } from "../controllers/v1/deep-research";
import { deepResearchStatusController } from "../controllers/v1/deep-research-status";
function checkCreditsMiddleware(
minimum?: number,
@@ -240,6 +242,19 @@ v1Router.get(
wrap(extractStatusController),
);
v1Router.post(
"/deep-research",
authMiddleware(RateLimiterMode.Extract),
checkCreditsMiddleware(1),
wrap(deepResearchController),
);
v1Router.get(
"/deep-research/:jobId",
authMiddleware(RateLimiterMode.ExtractStatus),
wrap(deepResearchStatusController),
);
// v1Router.post("/crawlWebsitePreview", crawlPreviewController);
v1Router.delete(

View File

@@ -1,4 +1,10 @@
import { removeDefaultProperty } from "./llmExtract";
import { truncateText } from "./llmExtract";
import { encoding_for_model } from "@dqbd/tiktoken";
jest.mock("@dqbd/tiktoken", () => ({
encoding_for_model: jest.fn(),
}));
describe("removeDefaultProperty", () => { describe("removeDefaultProperty", () => {
it("should remove the default property from a simple object", () => { it("should remove the default property from a simple object", () => {
@ -39,3 +45,96 @@ describe("removeDefaultProperty", () => {
expect(removeDefaultProperty(123)).toBe(123); expect(removeDefaultProperty(123)).toBe(123);
}); });
}); });
describe("truncateText", () => {
const mockEncode = jest.fn();
const mockEncoder = {
encode: mockEncode,
};
beforeEach(() => {
jest.clearAllMocks();
(encoding_for_model as jest.Mock).mockReturnValue(mockEncoder);
});
it("should return the original text if it's within token limit", () => {
const text = "This is a short text";
mockEncode.mockReturnValue(new Array(5)); // Simulate 5 tokens
const result = truncateText(text, 10);
expect(result).toBe(text);
expect(mockEncode).toHaveBeenCalledWith(text);
});
it("should truncate text that exceeds token limit", () => {
const text = "This is a longer text that needs truncation";
mockEncode.mockReturnValue(new Array(20)); // Simulate 20 tokens
const result = truncateText(text, 10);
expect(result.length).toBeLessThan(text.length);
expect(mockEncode).toHaveBeenCalled();
});
it("should handle empty string", () => {
const text = "";
mockEncode.mockReturnValue([]);
const result = truncateText(text, 10);
expect(result).toBe("");
expect(mockEncode).toHaveBeenCalledWith("");
});
it("should use character-based fallback when encoder throws error", () => {
const text = "This is some text";
mockEncode.mockImplementation(() => {
throw new Error("Encoder error");
});
const result = truncateText(text, 5);
// With modifier of 3, should truncate to approximately 15 characters
expect(result.length).toBeLessThanOrEqual(15);
});
it("should handle very short max token limits", () => {
const text = "Short text";
mockEncode.mockReturnValue(new Array(10));
const result = truncateText(text, 1);
expect(result.length).toBeLessThan(text.length);
});
it("should handle zero max tokens", () => {
const text = "Some text";
mockEncode.mockReturnValue(new Array(2));
const result = truncateText(text, 0);
expect(result).toBe("");
});
it("should handle extremely large text exceeding model context", () => {
// Create a very large text (e.g., 100,000 characters)
const text = "a".repeat(100000);
// First call: simulate 25000 tokens
mockEncode.mockReturnValueOnce(new Array(25000));
// Subsequent calls: simulate gradually decreasing token counts
// This simulates the iterative truncation process
mockEncode
.mockReturnValueOnce(new Array(20000))
.mockReturnValueOnce(new Array(15000))
.mockReturnValueOnce(new Array(12000))
.mockReturnValueOnce(new Array(9000));
const result = truncateText(text, 10000); // Common model context limit
// The result should be significantly shorter but not empty
expect(result.length).toBeLessThan(text.length);
expect(result.length).toBeGreaterThan(0);
// Given our new conservative approach, we should have a substantial amount of text
expect(result.length).toBeGreaterThan(30000); // At least 30% of original
expect(mockEncode).toHaveBeenCalled();
// Log the actual length for verification
console.log("Result length:", result.length, "characters");
});
});

View File

@@ -86,6 +86,38 @@ function normalizeSchema(x: any): any {
}
}
export function truncateText(text: string, maxTokens: number): string {
const modifier = 3; // Estimate: 1 token ≈ 3-4 characters for safety
try {
const encoder = encoding_for_model("gpt-4o");
// Continuously trim the text until its token count is within the limit.
while (true) {
const tokens = encoder.encode(text);
if (tokens.length <= maxTokens) {
return text;
}
// Calculate a new length using a more conservative approach
// Instead of scaling the entire text, we'll remove a smaller portion
const ratio = maxTokens / tokens.length;
const newLength = Math.max(
Math.ceil(text.length * ratio),
Math.floor(text.length * 0.8) // Never remove more than 20% at once
);
if (newLength <= 0) {
return "";
}
text = text.slice(0, newLength);
}
} catch (error) {
// Fallback using character-based estimation.
if (text.length <= maxTokens * modifier) {
return text;
}
return text.slice(0, maxTokens * modifier);
}
}
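A brief usage sketch of the behaviour above (the input string is a placeholder; real token counts come from the gpt-4o tokenizer, with a roughly 3-characters-per-token fallback if the encoder throws):

const findingsBlock = findings.map((f) => `[From ${f.source}]: ${f.text}`).join("\n"); // assumed findings array
const bounded = truncateText(findingsBlock, 100000);
// Returned unchanged when already under the limit; otherwise trimmed iteratively,
// removing at most 20% of the remaining text per pass.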
export async function generateOpenAICompletions(
logger: Logger,
options: ExtractOptions,

View File

@@ -6,6 +6,7 @@ let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
let indexQueue: Queue;
let deepResearchQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
@@ -15,6 +16,7 @@ export const scrapeQueueName = "{scrapeQueue}";
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
export const indexQueueName = "{indexQueue}";
export const deepResearchQueueName = "{deepResearchQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
@@ -70,6 +72,24 @@ export function getIndexQueue() {
return indexQueue;
}
export function getDeepResearchQueue() {
if (!deepResearchQueue) {
deepResearchQueue = new Queue(deepResearchQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Deep research queue created");
}
return deepResearchQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });

View File

@@ -5,9 +5,11 @@ import { CustomError } from "../lib/custom-error";
import {
getScrapeQueue,
getExtractQueue,
getDeepResearchQueue,
redisConnection,
scrapeQueueName,
extractQueueName,
deepResearchQueueName,
getIndexQueue,
} from "./queue-service";
import { startWebScraperPipeline } from "../main/runWebScraper";
@@ -65,6 +67,8 @@ import { normalizeUrl, normalizeUrlOnlyHostname } from "../lib/canonical-url";
import { saveExtract, updateExtract } from "../lib/extract/extract-redis";
import { billTeam } from "./billing/credit_billing";
import { saveCrawlMap } from "./indexing/crawl-maps-index";
import { updateDeepResearch } from "../lib/deep-research/deep-research-redis";
import { performDeepResearch } from "../lib/deep-research/deep-research-service";
configDotenv();
@@ -372,6 +376,76 @@ const processExtractJobInternal = async (
}
};
const processDeepResearchJobInternal = async (
token: string,
job: Job & { id: string },
) => {
const logger = _logger.child({
module: "deep-research-worker",
method: "processJobInternal",
jobId: job.id,
researchId: job.data.researchId,
teamId: job.data?.teamId ?? undefined,
});
const extendLockInterval = setInterval(async () => {
logger.info(`🔄 Worker extending lock on job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
try {
console.log("[Deep Research] Starting deep research: ", job.data.researchId);
const result = await performDeepResearch({
researchId: job.data.researchId,
teamId: job.data.teamId,
plan: job.data.plan,
topic: job.data.request.topic,
maxDepth: job.data.request.maxDepth,
timeLimit: job.data.request.timeLimit,
});
if(result.success) {
// Move job to completed state in Redis and update research status
await job.moveToCompleted(result, token, false);
return result;
} else {
// If the deep research failed but didn't throw an error
const error = new Error("Deep research failed without specific error");
await updateDeepResearch(job.data.researchId, {
status: "failed",
error: error.message,
});
await job.moveToFailed(error, token, false);
return { success: false, error: error.message };
}
} catch (error) {
logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });
Sentry.captureException(error, {
data: {
job: job.id,
},
});
try {
// Move job to failed state in Redis
await job.moveToFailed(error, token, false);
} catch (e) {
logger.error("Failed to move job to failed state in Redis", { error });
}
await updateDeepResearch(job.data.researchId, {
status: "failed",
error: error.message || "Unknown error occurred",
});
return { success: false, error: error.message || "Unknown error occurred" };
} finally {
clearInterval(extendLockInterval);
}
};
let isShuttingDown = false;
process.on("SIGINT", () => {
@@ -1090,11 +1164,12 @@ async function processJob(job: Job & { id: string }, token: string) {
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
- // Start both workers
+ // Start all workers
(async () => {
await Promise.all([
workerFun(getScrapeQueue(), processJobInternal),
workerFun(getExtractQueue(), processExtractJobInternal),
workerFun(getDeepResearchQueue(), processDeepResearchJobInternal),
]);
console.log("All workers exited. Waiting for all jobs to finish...");

View File

@ -1,6 +1,6 @@
{ {
"name": "@mendable/firecrawl-js", "name": "@mendable/firecrawl-js",
"version": "1.17.0", "version": "1.18.0-beta.8",
"description": "JavaScript SDK for Firecrawl API", "description": "JavaScript SDK for Firecrawl API",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",

View File

@@ -348,6 +348,70 @@ export interface CrawlErrorsResponse {
robotsBlocked: string[];
};
/**
* Parameters for deep research operations.
* Defines options for conducting deep research on a topic.
*/
export interface DeepResearchParams {
/**
* Maximum depth of research iterations (1-10)
* @default 7
*/
maxDepth?: number;
/**
* Time limit in seconds (30-300)
* @default 270
*/
timeLimit?: number;
/**
* Experimental flag for streaming steps
*/
__experimental_streamSteps?: boolean;
}
/**
* Response interface for deep research operations.
*/
export interface DeepResearchResponse {
success: boolean;
id: string;
}
/**
* Status response interface for deep research operations.
*/
export interface DeepResearchStatusResponse {
success: boolean;
data: {
findings: Array<{
text: string;
source: string;
}>;
finalAnalysis: string;
analysis: string;
completedSteps: number;
totalSteps: number;
};
status: "processing" | "completed" | "failed";
error?: string;
expiresAt: string;
currentDepth: number;
maxDepth: number;
activities: Array<{
type: string;
status: string;
message: string;
timestamp: string;
depth: number;
}>;
sources: Array<{
url: string;
title: string;
description: string;
}>;
summaries: string[];
}
/**
 * Main class for interacting with the Firecrawl API.
 * Provides methods for scraping, searching, crawling, and mapping web content.
@@ -1281,6 +1345,119 @@ export default class FirecrawlApp {
);
}
}
/**
* Initiates a deep research operation on a given topic and polls until completion.
* @param params - Parameters for the deep research operation.
* @returns The final research results.
*/
async __deepResearch(topic: string, params: DeepResearchParams): Promise<DeepResearchStatusResponse | ErrorResponse> {
try {
const response = await this.__asyncDeepResearch(topic, params);
if (!response.success || 'error' in response) {
return { success: false, error: 'error' in response ? response.error : 'Unknown error' };
}
if (!response.id) {
throw new FirecrawlError(`Failed to start research. No job ID returned.`, 500);
}
const jobId = response.id;
let researchStatus;
while (true) {
// console.log("Checking research status...");
researchStatus = await this.__checkDeepResearchStatus(jobId);
// console.log("Research status:", researchStatus);
if ('error' in researchStatus && !researchStatus.success) {
return researchStatus;
}
if (researchStatus.status === "completed") {
return researchStatus;
}
if (researchStatus.status === "failed") {
throw new FirecrawlError(
`Research job ${researchStatus.status}. Error: ${researchStatus.error}`,
500
);
}
if (researchStatus.status !== "processing") {
break;
}
await new Promise(resolve => setTimeout(resolve, 2000));
}
// console.log("Research status finished:", researchStatus);
return { success: false, error: "Research job terminated unexpectedly" };
} catch (error: any) {
throw new FirecrawlError(error.message, 500, error.response?.data?.details);
}
}
/**
* Initiates a deep research operation on a given topic without polling.
* @param params - Parameters for the deep research operation.
* @returns The response containing the research job ID.
*/
async __asyncDeepResearch(topic: string, params: DeepResearchParams): Promise<DeepResearchResponse | ErrorResponse> {
const headers = this.prepareHeaders();
try {
const response: AxiosResponse = await this.postRequest(
`${this.apiUrl}/v1/deep-research`,
{ topic, ...params },
headers
);
if (response.status === 200) {
return response.data;
} else {
this.handleError(response, "start deep research");
}
} catch (error: any) {
if (error.response?.data?.error) {
throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status);
} else {
throw new FirecrawlError(error.message, 500);
}
}
return { success: false, error: "Internal server error." };
}
/**
* Checks the status of a deep research operation.
* @param id - The ID of the deep research operation.
* @returns The current status and results of the research operation.
*/
async __checkDeepResearchStatus(id: string): Promise<DeepResearchStatusResponse | ErrorResponse> {
const headers = this.prepareHeaders();
try {
const response: AxiosResponse = await this.getRequest(
`${this.apiUrl}/v1/deep-research/${id}`,
headers
);
if (response.status === 200) {
return response.data;
} else if (response.status === 404) {
throw new FirecrawlError("Deep research job not found", 404);
} else {
this.handleError(response, "check deep research status");
}
} catch (error: any) {
if (error.response?.data?.error) {
throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status);
} else {
throw new FirecrawlError(error.message, 500);
}
}
return { success: false, error: "Internal server error." };
}
}
interface CrawlWatcherEvents {
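Closing out the section, a hedged usage sketch for the new SDK methods above (the constructor call follows the existing FirecrawlApp pattern; the topic and parameters are placeholders):

import FirecrawlApp from "@mendable/firecrawl-js";

const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY });

// __deepResearch starts the job and polls every 2 seconds until it completes or fails.
const result = await app.__deepResearch("History of the transformer architecture", {
  maxDepth: 5,
  timeLimit: 180,
});
if (result.success && "data" in result) {
  console.log(result.data.finalAnalysis);
}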