diff --git a/apps/api/src/controllers/v1/deep-research-status.ts b/apps/api/src/controllers/v1/deep-research-status.ts
new file mode 100644
index 00000000..ea6e9165
--- /dev/null
+++ b/apps/api/src/controllers/v1/deep-research-status.ts
@@ -0,0 +1,47 @@
+import { Response } from "express";
+import { RequestWithAuth } from "./types";
+import {
+  getDeepResearch,
+  getDeepResearchExpiry,
+} from "../../lib/deep-research/deep-research-redis";
+import { supabaseGetJobsById } from "../../lib/supabase-jobs";
+
+export async function deepResearchStatusController(
+  req: RequestWithAuth<{ jobId: string }, any, any>,
+  res: Response,
+) {
+  const research = await getDeepResearch(req.params.jobId);
+
+  if (!research) {
+    return res.status(404).json({
+      success: false,
+      error: "Deep research job not found",
+    });
+  }
+
+  let data: any = null;
+
+  if (research.status === "completed") {
+    const jobData = await supabaseGetJobsById([req.params.jobId]);
+    if (jobData && jobData.length > 0) {
+      data = jobData[0].docs[0];
+    }
+  }
+
+  return res.status(200).json({
+    success: research.status === "failed" ? false : true,
+    data: {
+      finalAnalysis: research.finalAnalysis,
+      // completedSteps: research.completedSteps,
+      // totalSteps: research.totalExpectedSteps,
+    },
+    error: research?.error ?? undefined,
+    expiresAt: (await getDeepResearchExpiry(req.params.jobId)).toISOString(),
+    currentDepth: research.currentDepth,
+    maxDepth: research.maxDepth,
+    status: research.status,
+    activities: research.activities,
+    sources: research.sources,
+    // summaries: research.summaries,
+  });
+}
diff --git a/apps/api/src/controllers/v1/deep-research.ts b/apps/api/src/controllers/v1/deep-research.ts
new file mode 100644
index 00000000..f5d7b072
--- /dev/null
+++ b/apps/api/src/controllers/v1/deep-research.ts
@@ -0,0 +1,92 @@
+import { Request, Response } from "express";
+import { RequestWithAuth } from "./types";
+import { getDeepResearchQueue } from "../../services/queue-service";
+import * as Sentry from "@sentry/node";
+import { saveDeepResearch } from "../../lib/deep-research/deep-research-redis";
+import { z } from "zod";
+
+export const deepResearchRequestSchema = z.object({
+  topic: z.string().describe('The topic or question to research'),
+  maxDepth: z.number().min(1).max(10).default(7).describe('Maximum depth of research iterations'),
+  timeLimit: z.number().min(30).max(600).default(300).describe('Time limit in seconds'),
+  __experimental_streamSteps: z.boolean().optional(),
+});
+
+export type DeepResearchRequest = z.infer<typeof deepResearchRequestSchema>;
+
+export type DeepResearchResponse = {
+  success: boolean;
+  id: string;
+};
+
+/**
+ * Initiates a deep research job based on the provided topic.
+ * @param req - The request object containing authentication and research parameters.
+ * @param res - The response object to send the research job ID.
+ * @returns A promise that resolves when the research job is queued.
+ */ +export async function deepResearchController( + req: RequestWithAuth<{}, DeepResearchResponse, DeepResearchRequest>, + res: Response, +) { + req.body = deepResearchRequestSchema.parse(req.body); + + const researchId = crypto.randomUUID(); + const jobData = { + request: req.body, + teamId: req.auth.team_id, + plan: req.auth.plan, + subId: req.acuc?.sub_id, + researchId, + }; + + await saveDeepResearch(researchId, { + id: researchId, + team_id: req.auth.team_id, + plan: req.auth.plan, + createdAt: Date.now(), + status: "processing", + currentDepth: 0, + maxDepth: req.body.maxDepth, + completedSteps: 0, + totalExpectedSteps: req.body.maxDepth * 5, // 5 steps per depth level + findings: [], + sources: [], + activities: [], + summaries: [], + }); + + if (Sentry.isInitialized()) { + const size = JSON.stringify(jobData).length; + await Sentry.startSpan( + { + name: "Add deep research job", + op: "queue.publish", + attributes: { + "messaging.message.id": researchId, + "messaging.destination.name": getDeepResearchQueue().name, + "messaging.message.body.size": size, + }, + }, + async (span) => { + await getDeepResearchQueue().add(researchId, { + ...jobData, + sentry: { + trace: Sentry.spanToTraceHeader(span), + baggage: Sentry.spanToBaggageHeader(span), + size, + }, + }, { jobId: researchId }); + }, + ); + } else { + await getDeepResearchQueue().add(researchId, jobData, { + jobId: researchId, + }); + } + + return res.status(200).json({ + success: true, + id: researchId, + }); +} diff --git a/apps/api/src/controllers/v1/search.ts b/apps/api/src/controllers/v1/search.ts index 1d8c59eb..7975e75a 100644 --- a/apps/api/src/controllers/v1/search.ts +++ b/apps/api/src/controllers/v1/search.ts @@ -20,6 +20,42 @@ import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; import * as Sentry from "@sentry/node"; import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings"; +// Used for deep research +export async function searchAndScrapeSearchResult( + query: string, + options: { + teamId: string; + plan: PlanType | undefined; + origin: string; + timeout: number; + scrapeOptions: ScrapeOptions; + } +): Promise { + try { + const searchResults = await search({ + query, + num_results: 5 + }); + + const documents = await Promise.all( + searchResults.map(result => + scrapeSearchResult( + { + url: result.url, + title: result.title, + description: result.description + }, + options + ) + ) + ); + + return documents; + } catch (error) { + return []; + } +} + async function scrapeSearchResult( searchResult: { url: string; title: string; description: string }, options: { @@ -74,7 +110,7 @@ async function scrapeSearchResult( }); let statusCode = 0; - if (error.message.includes("Could not scrape url")) { + if (error?.message?.includes("Could not scrape url")) { statusCode = 403; } // Return a minimal document with SERP results at top level diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index d212ae4d..a278be9f 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -8,6 +8,7 @@ import { getExtractQueue, getScrapeQueue, getIndexQueue, + getDeepResearchQueue, } from "./services/queue-service"; import { v0Router } from "./routes/v0"; import os from "os"; @@ -54,6 +55,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({ new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue()), new BullAdapter(getIndexQueue()), + new BullAdapter(getDeepResearchQueue()), ], serverAdapter: serverAdapter, }); diff --git 
a/apps/api/src/lib/deep-research/deep-research-redis.ts b/apps/api/src/lib/deep-research/deep-research-redis.ts new file mode 100644 index 00000000..efe757f6 --- /dev/null +++ b/apps/api/src/lib/deep-research/deep-research-redis.ts @@ -0,0 +1,102 @@ +import { redisConnection } from "../../services/queue-service"; +import { logger as _logger } from "../logger"; + +export enum DeepResearchStep { + INITIAL = "initial", + SEARCH = "search", + EXTRACT = "extract", + ANALYZE = "analyze", + SYNTHESIS = "synthesis", + COMPLETE = "complete" +} + +export type DeepResearchActivity = { + type: 'search' | 'extract' | 'analyze' | 'reasoning' | 'synthesis' | 'thought'; + status: 'processing' | 'complete' | 'error'; + message: string; + timestamp: string; + depth: number; +}; + +export type DeepResearchSource = { + url: string; + title: string; + description: string; +}; + +export type DeepResearchFinding = { + text: string; + source: string; +}; + +export type StoredDeepResearch = { + id: string; + team_id: string; + plan?: string; + createdAt: number; + status: "processing" | "completed" | "failed" | "cancelled"; + error?: any; + currentDepth: number; + maxDepth: number; + completedSteps: number; + totalExpectedSteps: number; + findings: DeepResearchFinding[]; + sources: DeepResearchSource[]; + activities: DeepResearchActivity[]; + summaries: string[]; + finalAnalysis?: string; +}; + +// TTL of 6 hours +const DEEP_RESEARCH_TTL = 6 * 60 * 60; + +export async function saveDeepResearch(id: string, research: StoredDeepResearch) { + _logger.debug("Saving deep research " + id + " to Redis..."); + await redisConnection.set("deep-research:" + id, JSON.stringify(research)); + await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL); +} + +export async function getDeepResearch(id: string): Promise { + const x = await redisConnection.get("deep-research:" + id); + return x ? JSON.parse(x) : null; +} + +export async function updateDeepResearch( + id: string, + research: Partial, +) { + const current = await getDeepResearch(id); + if (!current) return; + + const updatedResearch = { + ...current, + ...research, + // Append new activities if provided + activities: research.activities + ? [...(current.activities || []), ...research.activities] + : current.activities, + // Append new findings if provided + // findings: research.findings + // ? [...(current.findings || []), ...research.findings] + // : current.findings, + // Append new sources if provided + sources: research.sources + ? [...(current.sources || []), ...research.sources] + : current.sources, + // Append new summaries if provided + summaries: research.summaries + ? 
[...(current.summaries || []), ...research.summaries] + : current.summaries + }; + + await redisConnection.set("deep-research:" + id, JSON.stringify(updatedResearch)); + await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL); +} + +export async function getDeepResearchExpiry(id: string): Promise { + const d = new Date(); + const ttl = await redisConnection.pttl("deep-research:" + id); + d.setMilliseconds(d.getMilliseconds() + ttl); + d.setMilliseconds(0); + return d; +} \ No newline at end of file diff --git a/apps/api/src/lib/deep-research/deep-research-service.ts b/apps/api/src/lib/deep-research/deep-research-service.ts new file mode 100644 index 00000000..1fe6f073 --- /dev/null +++ b/apps/api/src/lib/deep-research/deep-research-service.ts @@ -0,0 +1,312 @@ +import { logger as _logger } from "../logger"; +import { updateDeepResearch } from "./deep-research-redis"; +import { PlanType } from "../../types"; +import { searchAndScrapeSearchResult } from "../../controllers/v1/search"; +import { ResearchLLMService, ResearchStateManager } from "./research-manager"; +import { logJob } from "../../services/logging/log_job"; +import { updateExtract } from "../extract/extract-redis"; +import { billTeam } from "../../services/billing/credit_billing"; + +interface DeepResearchServiceOptions { + researchId: string; + teamId: string; + plan: string; + topic: string; + maxDepth: number; + timeLimit: number; + subId?: string; +} + +export async function performDeepResearch(options: DeepResearchServiceOptions) { + const { researchId, teamId, plan, timeLimit, subId } = options; + const startTime = Date.now(); + let currentTopic = options.topic; + + const logger = _logger.child({ + module: "deep-research", + method: "performDeepResearch", + researchId, + }); + + logger.debug("[Deep Research] Starting research with options:", { options }); + + const state = new ResearchStateManager( + researchId, + teamId, + plan, + options.maxDepth, + logger, + options.topic, + ); + const llmService = new ResearchLLMService(logger); + + try { + while (!state.hasReachedMaxDepth()) { + logger.debug("[Deep Research] Current depth:", state.getCurrentDepth()); + const timeElapsed = Date.now() - startTime; + if (timeElapsed >= timeLimit * 1000) { + logger.debug("[Deep Research] Time limit reached, stopping research"); + break; + } + + await state.incrementDepth(); + + // Search phase + await state.addActivity({ + type: "search", + status: "processing", + message: `Generating search queries for "${currentTopic}"`, + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + const nextSearchTopic = state.getNextSearchTopic(); + logger.debug("[Deep Research] Next search topic:", { nextSearchTopic }); + + const searchQueries = ( + await llmService.generateSearchQueries( + nextSearchTopic, + state.getFindings(), + ) + ).slice(0, 3); + + logger.debug("[Deep Research] Generated search queries:", { searchQueries }); + + await state.addActivity({ + type: "search", + status: "processing", + message: `Starting ${searchQueries.length} parallel searches for "${currentTopic}"`, + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + // Run all searches in parallel + const searchPromises = searchQueries.map(async (searchQuery) => { + await state.addActivity({ + type: "search", + status: "processing", + message: `Searching for "${searchQuery.query}" - Goal: ${searchQuery.researchGoal}`, + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + const response 
= await searchAndScrapeSearchResult(searchQuery.query, { + teamId: options.teamId, + plan: options.plan as PlanType, + origin: "deep-research", + timeout: 15000, + scrapeOptions: { + formats: ["markdown"], + onlyMainContent: true, + waitFor: 0, + mobile: false, + parsePDF: false, + useMock: "none", + skipTlsVerification: false, + removeBase64Images: false, + fastMode: false, + blockAds: false, + }, + }); + return response.length > 0 ? response : []; + }); + + const searchResultsArrays = await Promise.all(searchPromises); + const searchResults = searchResultsArrays.flat(); + + logger.debug( + "[Deep Research] Search results count:", + { count: searchResults.length }, + ); + + if (!searchResults || searchResults.length === 0) { + logger.debug( + "[Deep Research] No results found for topic:", + { currentTopic }, + ); + await state.addActivity({ + type: "search", + status: "error", + message: `No results found for any queries about "${currentTopic}"`, + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + continue; + } + + // Filter out already seen URLs and track new ones + const newSearchResults = searchResults.filter((result) => { + if (!result.url || state.hasSeenUrl(result.url)) { + return false; + } + state.addSeenUrl(result.url); + return true; + }); + + logger.debug( + "[Deep Research] New unique results count:", + { length: newSearchResults.length }, + ); + + if (newSearchResults.length === 0) { + logger.debug( + "[Deep Research] No new unique results found for topic:", + { currentTopic }, + ); + await state.addActivity({ + type: "search", + status: "error", + message: `Found ${searchResults.length} results but all URLs were already processed for "${currentTopic}"`, + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + continue; + } + + await state.addActivity({ + type: "search", + status: "complete", + message: `Found ${newSearchResults.length} new relevant results across ${searchQueries.length} parallel queries`, + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + await state.addFindings( + newSearchResults.map((result) => ({ + text: result.markdown ?? "", + source: result.url ?? 
"", + })), + ); + + // Analysis phase + await state.addActivity({ + type: "analyze", + status: "processing", + message: "Analyzing findings", + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + const timeRemaining = timeLimit * 1000 - (Date.now() - startTime); + logger.debug("[Deep Research] Time remaining (ms):", { timeRemaining }); + + const analysis = await llmService.analyzeAndPlan( + state.getFindings(), + currentTopic, + timeRemaining, + ); + + if (!analysis) { + logger.debug("[Deep Research] Analysis failed"); + await state.addActivity({ + type: "analyze", + status: "error", + message: "Failed to analyze findings", + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + state.incrementFailedAttempts(); + if (state.hasReachedMaxFailedAttempts()) { + logger.debug("[Deep Research] Max failed attempts reached"); + break; + } + continue; + } + + logger.debug("[Deep Research] Analysis result:", { + nextTopic: analysis.nextSearchTopic, + shouldContinue: analysis.shouldContinue, + gapsCount: analysis.gaps.length, + }); + + state.setNextSearchTopic(analysis.nextSearchTopic || ""); + + await state.addActivity({ + type: "analyze", + status: "complete", + message: "Analyzed findings", + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + if (!analysis.shouldContinue || analysis.gaps.length === 0) { + logger.debug("[Deep Research] No more gaps to research, ending search"); + break; + } + + currentTopic = analysis.gaps[0] || currentTopic; + logger.debug("[Deep Research] Next topic to research:", { currentTopic }); + } + + // Final synthesis + logger.debug("[Deep Research] Starting final synthesis"); + await state.addActivity({ + type: "synthesis", + status: "processing", + message: "Preparing final analysis", + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + const finalAnalysis = await llmService.generateFinalAnalysis( + options.topic, + state.getFindings(), + state.getSummaries(), + ); + + await state.addActivity({ + type: "synthesis", + status: "complete", + message: "Research completed", + timestamp: new Date().toISOString(), + depth: state.getCurrentDepth(), + }); + + const progress = state.getProgress(); + logger.debug("[Deep Research] Research completed successfully"); + + // Log job with token usage and sources + await logJob({ + job_id: researchId, + success: true, + message: "Research completed", + num_docs: 1, + docs: [{ finalAnalysis: finalAnalysis }], + time_taken: (Date.now() - startTime) / 1000, + team_id: teamId, + mode: "deep-research", + url: options.topic, + scrapeOptions: options, + origin: "api", + num_tokens: 0, + tokens_billed: 0, + sources: {}, + }); + await updateDeepResearch(researchId, { + status: "completed", + finalAnalysis: finalAnalysis, + }); + // Bill team for usage + billTeam(teamId, subId, state.getFindings().length, logger).catch( + (error) => { + logger.error( + `Failed to bill team ${teamId} for ${state.getFindings().length} findings`, { teamId, count: state.getFindings().length, error }, + ); + }, + ); + return { + success: true, + data: { + finalAnalysis: finalAnalysis, + }, + }; + } catch (error: any) { + logger.error("Deep research error", { error }); + await updateDeepResearch(researchId, { + status: "failed", + error: error.message, + }); + throw error; + } +} diff --git a/apps/api/src/lib/deep-research/research-manager.ts b/apps/api/src/lib/deep-research/research-manager.ts new file mode 100644 index 00000000..556df078 --- /dev/null 
+++ b/apps/api/src/lib/deep-research/research-manager.ts
@@ -0,0 +1,298 @@
+import { Logger } from "winston";
+import {
+  DeepResearchActivity,
+  DeepResearchFinding,
+  DeepResearchSource,
+  updateDeepResearch,
+} from "./deep-research-redis";
+import { generateOpenAICompletions } from "../../scraper/scrapeURL/transformers/llmExtract";
+import { truncateText } from "../../scraper/scrapeURL/transformers/llmExtract";
+
+interface AnalysisResult {
+  gaps: string[];
+  nextSteps: string[];
+  shouldContinue: boolean;
+  nextSearchTopic?: string;
+}
+
+export class ResearchStateManager {
+  private findings: DeepResearchFinding[] = [];
+  private summaries: string[] = [];
+  private nextSearchTopic: string = "";
+  private urlToSearch: string = "";
+  private currentDepth: number = 0;
+  private failedAttempts: number = 0;
+  private readonly maxFailedAttempts: number = 3;
+  private completedSteps: number = 0;
+  private readonly totalExpectedSteps: number;
+  private seenUrls: Set<string> = new Set();
+
+  constructor(
+    private readonly researchId: string,
+    private readonly teamId: string,
+    private readonly plan: string,
+    private readonly maxDepth: number,
+    private readonly logger: Logger,
+    private readonly topic: string,
+  ) {
+    this.totalExpectedSteps = maxDepth * 5; // 5 steps per depth level
+    this.nextSearchTopic = topic;
+  }
+
+  hasSeenUrl(url: string): boolean {
+    return this.seenUrls.has(url);
+  }
+
+  addSeenUrl(url: string): void {
+    this.seenUrls.add(url);
+  }
+
+  getSeenUrls(): Set<string> {
+    return this.seenUrls;
+  }
+
+  async addActivity(activity: DeepResearchActivity): Promise<void> {
+    if (activity.status === "complete") {
+      this.completedSteps++;
+    }
+
+    await updateDeepResearch(this.researchId, {
+      activities: [activity],
+      completedSteps: this.completedSteps,
+    });
+  }
+
+  async addSource(source: DeepResearchSource): Promise<void> {
+    await updateDeepResearch(this.researchId, {
+      sources: [source],
+    });
+  }
+
+  async addFindings(findings: DeepResearchFinding[]): Promise<void> {
+    // Only keep the most recent 50 findings
+    // To avoid memory issues for now
+    this.findings = [...this.findings, ...findings].slice(-50);
+    await updateDeepResearch(this.researchId, {
+      findings: findings,
+    });
+  }
+
+  async addSummary(summary: string): Promise<void> {
+    this.summaries.push(summary);
+    await updateDeepResearch(this.researchId, {
+      summaries: [summary],
+    });
+  }
+
+  async incrementDepth(): Promise<void> {
+    this.currentDepth++;
+    await updateDeepResearch(this.researchId, {
+      currentDepth: this.currentDepth,
+    });
+  }
+
+  incrementFailedAttempts(): void {
+    this.failedAttempts++;
+  }
+
+  getFindings(): DeepResearchFinding[] {
+    return this.findings;
+  }
+
+  getSummaries(): string[] {
+    return this.summaries;
+  }
+
+  getCurrentDepth(): number {
+    return this.currentDepth;
+  }
+
+  hasReachedMaxDepth(): boolean {
+    return this.currentDepth >= this.maxDepth;
+  }
+
+  hasReachedMaxFailedAttempts(): boolean {
+    return this.failedAttempts >= this.maxFailedAttempts;
+  }
+
+  getProgress(): { completedSteps: number; totalSteps: number } {
+    return {
+      completedSteps: this.completedSteps,
+      totalSteps: this.totalExpectedSteps,
+    };
+  }
+
+  setNextSearchTopic(topic: string): void {
+    this.nextSearchTopic = topic;
+  }
+
+  getNextSearchTopic(): string {
+    return this.nextSearchTopic;
+  }
+
+  setUrlToSearch(url: string): void {
+    this.urlToSearch = url;
+  }
+
+  getUrlToSearch(): string {
+    return this.urlToSearch;
+  }
+}
+
+export class ResearchLLMService {
+  constructor(private readonly logger: Logger) {}
+
+  async generateSearchQueries(
+    topic:
string, + findings: DeepResearchFinding[] = [], + ): Promise<{ query: string; researchGoal: string }[]> { + const { extract } = await generateOpenAICompletions( + this.logger.child({ + method: "generateSearchQueries", + }), + { + mode: "llm", + systemPrompt: + "You are an expert research agent that generates search queries (SERP) to explore topics deeply and thoroughly. Do not generate repeated queries. Today's date is " + + new Date().toISOString().split("T")[0], + schema: { + type: "object", + properties: { + queries: { + type: "array", + items: { + type: "object", + properties: { + query: { + type: "string", + description: "The search query to use", + }, + researchGoal: { + type: "string", + description: + "The specific goal this query aims to achieve and how it advances the research", + }, + }, + }, + }, + }, + }, + prompt: `Generate a list of 3-5 search queries to deeply research this topic: "${topic}" + ${findings.length > 0 ? `\nBased on these previous findings, generate more specific queries:\n${truncateText(findings.map((f) => `- ${f.text}`).join("\n"), 10000)}` : ""} + + Each query should be specific and focused on a particular aspect. + Build upon previous findings when available. + Be specific and go deep, not wide - always following the original topic. + Every search query is a new SERP query so make sure the whole context is added without overwhelming the search engine. + The first SERP query you generate should be a very concise, simple version of the topic. `, + }, + "", + undefined, + true, + ); + + return extract.queries; + } + + async analyzeAndPlan( + findings: DeepResearchFinding[], + currentTopic: string, + timeRemaining: number, + ): Promise { + try { + const timeRemainingMinutes = + Math.round((timeRemaining / 1000 / 60) * 10) / 10; + + const { extract } = await generateOpenAICompletions( + this.logger.child({ + method: "analyzeAndPlan", + }), + { + mode: "llm", + systemPrompt: + "You are an expert research agent that is analyzing findings. Your goal is to synthesize information and identify gaps for further research. Today's date is " + + new Date().toISOString().split("T")[0], + schema: { + type: "object", + properties: { + analysis: { + type: "object", + properties: { + gaps: { type: "array", items: { type: "string" } }, + nextSteps: { type: "array", items: { type: "string" } }, + shouldContinue: { type: "boolean" }, + nextSearchTopic: { type: "string" }, + }, + required: ["gaps", "nextSteps", "shouldContinue"], + }, + }, + }, + prompt: truncateText( + `You are researching: ${currentTopic} + You have ${timeRemainingMinutes} minutes remaining to complete the research but you don't need to use all of it. + Current findings: ${findings.map((f) => `[From ${f.source}]: ${f.text}`).join("\n")} + What has been learned? What gaps remain, if any? What specific aspects should be investigated next if any? + If you need to search for more information inside the same topic pick a sub-topic by including a nextSearchTopic -which should be highly related to the original topic/users'query. + Important: If less than 1 minute remains, set shouldContinue to false to allow time for final synthesis. 
+ If I have enough information, set shouldContinue to false.`, + 120000, + ), + }, + "", + undefined, + true, + ); + + return extract.analysis; + } catch (error) { + this.logger.error("Analysis error", { error }); + return null; + } + } + + async generateFinalAnalysis( + topic: string, + findings: DeepResearchFinding[], + summaries: string[], + ): Promise { + const { extract } = await generateOpenAICompletions( + this.logger.child({ + method: "generateFinalAnalysis", + }), + { + mode: "llm", + systemPrompt: + "You are an expert research analyst who creates comprehensive, well-structured reports. Your reports are detailed, properly formatted in Markdown, and include clear sections with citations. Today's date is " + + new Date().toISOString().split("T")[0], + schema: { + type: "object", + properties: { + report: { type: "string" }, + }, + }, + prompt: truncateText( + `Create a comprehensive research report on "${topic}" based on the collected findings and analysis. + + Research data: + ${findings.map((f) => `[From ${f.source}]: ${f.text}`).join("\n")} + + Requirements: + - Format the report in Markdown with proper headers and sections + - Include specific citations to sources where appropriate + - Provide detailed analysis in each section + - Make it comprehensive and thorough (aim for 4+ pages worth of content) + - Include all relevant findings and insights from the research + - Cite sources + - Use bullet points and lists where appropriate for readability`, + 100000, + ), + }, + "", + undefined, + true, + "gpt-4o" + ); + + return extract.report; + } +} diff --git a/apps/api/src/lib/llm/generate.ts b/apps/api/src/lib/llm/generate.ts new file mode 100644 index 00000000..5249dadd --- /dev/null +++ b/apps/api/src/lib/llm/generate.ts @@ -0,0 +1,33 @@ +import OpenAI from "openai"; + +const openai = new OpenAI({ + apiKey: process.env.OPENAI_API_KEY, +}); + +interface Message { + role: "system" | "user" | "assistant"; + content: string; +} + +interface GenerateTextOptions { + model: string; + messages: Message[]; + temperature?: number; + maxTokens?: number; +} + +export async function generateText(options: GenerateTextOptions) { + const { model, messages, temperature = 0.7, maxTokens } = options; + + const completion = await openai.chat.completions.create({ + model, + messages, + temperature, + max_tokens: maxTokens, + }); + + return { + text: completion.choices[0].message.content || "", + usage: completion.usage, + }; +} \ No newline at end of file diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts index 7901cce2..2a99f8aa 100644 --- a/apps/api/src/routes/v1.ts +++ b/apps/api/src/routes/v1.ts @@ -29,6 +29,8 @@ import { creditUsageController } from "../controllers/v1/credit-usage"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { searchController } from "../controllers/v1/search"; import { crawlErrorsController } from "../controllers/v1/crawl-errors"; +import { deepResearchController } from "../controllers/v1/deep-research"; +import { deepResearchStatusController } from "../controllers/v1/deep-research-status"; function checkCreditsMiddleware( minimum?: number, @@ -240,6 +242,19 @@ v1Router.get( wrap(extractStatusController), ); +v1Router.post( + "/deep-research", + authMiddleware(RateLimiterMode.Extract), + checkCreditsMiddleware(1), + wrap(deepResearchController), +); + +v1Router.get( + "/deep-research/:jobId", + authMiddleware(RateLimiterMode.ExtractStatus), + wrap(deepResearchStatusController), +); + // v1Router.post("/crawlWebsitePreview", 
crawlPreviewController); v1Router.delete( diff --git a/apps/api/src/scraper/scrapeURL/transformers/llmExtract.test.ts b/apps/api/src/scraper/scrapeURL/transformers/llmExtract.test.ts index 844a7fbc..a3f3e04c 100644 --- a/apps/api/src/scraper/scrapeURL/transformers/llmExtract.test.ts +++ b/apps/api/src/scraper/scrapeURL/transformers/llmExtract.test.ts @@ -1,4 +1,10 @@ import { removeDefaultProperty } from "./llmExtract"; +import { truncateText } from "./llmExtract"; +import { encoding_for_model } from "@dqbd/tiktoken"; + +jest.mock("@dqbd/tiktoken", () => ({ + encoding_for_model: jest.fn(), +})); describe("removeDefaultProperty", () => { it("should remove the default property from a simple object", () => { @@ -39,3 +45,96 @@ describe("removeDefaultProperty", () => { expect(removeDefaultProperty(123)).toBe(123); }); }); + +describe("truncateText", () => { + const mockEncode = jest.fn(); + const mockEncoder = { + encode: mockEncode, + }; + + beforeEach(() => { + jest.clearAllMocks(); + (encoding_for_model as jest.Mock).mockReturnValue(mockEncoder); + }); + + it("should return the original text if it's within token limit", () => { + const text = "This is a short text"; + mockEncode.mockReturnValue(new Array(5)); // Simulate 5 tokens + + const result = truncateText(text, 10); + expect(result).toBe(text); + expect(mockEncode).toHaveBeenCalledWith(text); + }); + + it("should truncate text that exceeds token limit", () => { + const text = "This is a longer text that needs truncation"; + mockEncode.mockReturnValue(new Array(20)); // Simulate 20 tokens + + const result = truncateText(text, 10); + expect(result.length).toBeLessThan(text.length); + expect(mockEncode).toHaveBeenCalled(); + }); + + it("should handle empty string", () => { + const text = ""; + mockEncode.mockReturnValue([]); + + const result = truncateText(text, 10); + expect(result).toBe(""); + expect(mockEncode).toHaveBeenCalledWith(""); + }); + + it("should use character-based fallback when encoder throws error", () => { + const text = "This is some text"; + mockEncode.mockImplementation(() => { + throw new Error("Encoder error"); + }); + + const result = truncateText(text, 5); + // With modifier of 3, should truncate to approximately 15 characters + expect(result.length).toBeLessThanOrEqual(15); + }); + + it("should handle very short max token limits", () => { + const text = "Short text"; + mockEncode.mockReturnValue(new Array(10)); + + const result = truncateText(text, 1); + expect(result.length).toBeLessThan(text.length); + }); + + it("should handle zero max tokens", () => { + const text = "Some text"; + mockEncode.mockReturnValue(new Array(2)); + + const result = truncateText(text, 0); + expect(result).toBe(""); + }); + + it("should handle extremely large text exceeding model context", () => { + // Create a very large text (e.g., 100,000 characters) + const text = "a".repeat(100000); + + // First call: simulate 25000 tokens + mockEncode.mockReturnValueOnce(new Array(25000)); + // Subsequent calls: simulate gradually decreasing token counts + // This simulates the iterative truncation process + mockEncode + .mockReturnValueOnce(new Array(20000)) + .mockReturnValueOnce(new Array(15000)) + .mockReturnValueOnce(new Array(12000)) + .mockReturnValueOnce(new Array(9000)); + + const result = truncateText(text, 10000); // Common model context limit + + // The result should be significantly shorter but not empty + expect(result.length).toBeLessThan(text.length); + expect(result.length).toBeGreaterThan(0); + // Given our new conservative 
approach, we should have a substantial amount of text + expect(result.length).toBeGreaterThan(30000); // At least 30% of original + expect(mockEncode).toHaveBeenCalled(); + + // Log the actual length for verification + console.log("Result length:", result.length, "characters"); + }); +}); diff --git a/apps/api/src/scraper/scrapeURL/transformers/llmExtract.ts b/apps/api/src/scraper/scrapeURL/transformers/llmExtract.ts index 0ee7f733..f8084b76 100644 --- a/apps/api/src/scraper/scrapeURL/transformers/llmExtract.ts +++ b/apps/api/src/scraper/scrapeURL/transformers/llmExtract.ts @@ -86,6 +86,38 @@ function normalizeSchema(x: any): any { } } +export function truncateText(text: string, maxTokens: number): string { + const modifier = 3; // Estimate: 1 token ≈ 3-4 characters for safety + try { + const encoder = encoding_for_model("gpt-4o"); + // Continuously trim the text until its token count is within the limit. + while (true) { + const tokens = encoder.encode(text); + if (tokens.length <= maxTokens) { + return text; + } + // Calculate a new length using a more conservative approach + // Instead of scaling the entire text, we'll remove a smaller portion + const ratio = maxTokens / tokens.length; + const newLength = Math.max( + Math.ceil(text.length * ratio), + Math.floor(text.length * 0.8) // Never remove more than 20% at once + ); + if (newLength <= 0) { + return ""; + } + text = text.slice(0, newLength); + } + } catch (error) { + // Fallback using character-based estimation. + if (text.length <= maxTokens * modifier) { + return text; + } + return text.slice(0, maxTokens * modifier); + } +} + + export async function generateOpenAICompletions( logger: Logger, options: ExtractOptions, diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts index e0653090..1526ec25 100644 --- a/apps/api/src/services/queue-service.ts +++ b/apps/api/src/services/queue-service.ts @@ -6,6 +6,7 @@ let scrapeQueue: Queue; let extractQueue: Queue; let loggingQueue: Queue; let indexQueue: Queue; +let deepResearchQueue: Queue; export const redisConnection = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, @@ -15,6 +16,7 @@ export const scrapeQueueName = "{scrapeQueue}"; export const extractQueueName = "{extractQueue}"; export const loggingQueueName = "{loggingQueue}"; export const indexQueueName = "{indexQueue}"; +export const deepResearchQueueName = "{deepResearchQueue}"; export function getScrapeQueue() { if (!scrapeQueue) { @@ -70,6 +72,24 @@ export function getIndexQueue() { return indexQueue; } +export function getDeepResearchQueue() { + if (!deepResearchQueue) { + deepResearchQueue = new Queue(deepResearchQueueName, { + connection: redisConnection, + defaultJobOptions: { + removeOnComplete: { + age: 90000, // 25 hours + }, + removeOnFail: { + age: 90000, // 25 hours + }, + }, + }); + logger.info("Deep research queue created"); + } + return deepResearchQueue; +} + // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE // import { QueueEvents } from 'bullmq'; // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index b6d101f7..124194bf 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -5,9 +5,11 @@ import { CustomError } from "../lib/custom-error"; import { getScrapeQueue, getExtractQueue, + getDeepResearchQueue, redisConnection, scrapeQueueName, extractQueueName, + 
deepResearchQueueName, getIndexQueue, } from "./queue-service"; import { startWebScraperPipeline } from "../main/runWebScraper"; @@ -65,6 +67,8 @@ import { normalizeUrl, normalizeUrlOnlyHostname } from "../lib/canonical-url"; import { saveExtract, updateExtract } from "../lib/extract/extract-redis"; import { billTeam } from "./billing/credit_billing"; import { saveCrawlMap } from "./indexing/crawl-maps-index"; +import { updateDeepResearch } from "../lib/deep-research/deep-research-redis"; +import { performDeepResearch } from "../lib/deep-research/deep-research-service"; configDotenv(); @@ -372,6 +376,76 @@ const processExtractJobInternal = async ( } }; +const processDeepResearchJobInternal = async ( + token: string, + job: Job & { id: string }, +) => { + const logger = _logger.child({ + module: "deep-research-worker", + method: "processJobInternal", + jobId: job.id, + researchId: job.data.researchId, + teamId: job.data?.teamId ?? undefined, + }); + + const extendLockInterval = setInterval(async () => { + logger.info(`🔄 Worker extending lock on job ${job.id}`); + await job.extendLock(token, jobLockExtensionTime); + }, jobLockExtendInterval); + + try { + console.log("[Deep Research] Starting deep research: ", job.data.researchId); + const result = await performDeepResearch({ + researchId: job.data.researchId, + teamId: job.data.teamId, + plan: job.data.plan, + topic: job.data.request.topic, + maxDepth: job.data.request.maxDepth, + timeLimit: job.data.request.timeLimit, + }); + + if(result.success) { + // Move job to completed state in Redis and update research status + await job.moveToCompleted(result, token, false); + return result; + } else { + // If the deep research failed but didn't throw an error + const error = new Error("Deep research failed without specific error"); + await updateDeepResearch(job.data.researchId, { + status: "failed", + error: error.message, + }); + await job.moveToFailed(error, token, false); + + return { success: false, error: error.message }; + } + } catch (error) { + logger.error(`🚫 Job errored ${job.id} - ${error}`, { error }); + + Sentry.captureException(error, { + data: { + job: job.id, + }, + }); + + try { + // Move job to failed state in Redis + await job.moveToFailed(error, token, false); + } catch (e) { + logger.error("Failed to move job to failed state in Redis", { error }); + } + + await updateDeepResearch(job.data.researchId, { + status: "failed", + error: error.message || "Unknown error occurred", + }); + + return { success: false, error: error.message || "Unknown error occurred" }; + } finally { + clearInterval(extendLockInterval); + } +}; + let isShuttingDown = false; process.on("SIGINT", () => { @@ -1090,11 +1164,12 @@ async function processJob(job: Job & { id: string }, token: string) { // wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); // wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); -// Start both workers +// Start all workers (async () => { await Promise.all([ workerFun(getScrapeQueue(), processJobInternal), workerFun(getExtractQueue(), processExtractJobInternal), + workerFun(getDeepResearchQueue(), processDeepResearchJobInternal), ]); console.log("All workers exited. 
Waiting for all jobs to finish..."); diff --git a/apps/js-sdk/firecrawl/package.json b/apps/js-sdk/firecrawl/package.json index c0ff4543..a3f3d015 100644 --- a/apps/js-sdk/firecrawl/package.json +++ b/apps/js-sdk/firecrawl/package.json @@ -1,6 +1,6 @@ { "name": "@mendable/firecrawl-js", - "version": "1.17.0", + "version": "1.18.0-beta.8", "description": "JavaScript SDK for Firecrawl API", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index ad22c2a3..f5b64948 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -348,6 +348,70 @@ export interface CrawlErrorsResponse { robotsBlocked: string[]; }; +/** + * Parameters for deep research operations. + * Defines options for conducting deep research on a topic. + */ +export interface DeepResearchParams { + /** + * Maximum depth of research iterations (1-10) + * @default 7 + */ + maxDepth?: number; + /** + * Time limit in seconds (30-300) + * @default 270 + */ + timeLimit?: number; + /** + * Experimental flag for streaming steps + */ + __experimental_streamSteps?: boolean; +} + +/** + * Response interface for deep research operations. + */ +export interface DeepResearchResponse { + success: boolean; + id: string; +} + +/** + * Status response interface for deep research operations. + */ +export interface DeepResearchStatusResponse { + success: boolean; + data: { + findings: Array<{ + text: string; + source: string; + }>; + finalAnalysis: string; + analysis: string; + completedSteps: number; + totalSteps: number; + }; + status: "processing" | "completed" | "failed"; + error?: string; + expiresAt: string; + currentDepth: number; + maxDepth: number; + activities: Array<{ + type: string; + status: string; + message: string; + timestamp: string; + depth: number; + }>; + sources: Array<{ + url: string; + title: string; + description: string; + }>; + summaries: string[]; +} + /** * Main class for interacting with the Firecrawl API. * Provides methods for scraping, searching, crawling, and mapping web content. @@ -1281,6 +1345,119 @@ export default class FirecrawlApp { ); } } + + /** + * Initiates a deep research operation on a given topic and polls until completion. + * @param params - Parameters for the deep research operation. + * @returns The final research results. + */ + async __deepResearch(topic: string, params: DeepResearchParams): Promise { + try { + const response = await this.__asyncDeepResearch(topic, params); + + if (!response.success || 'error' in response) { + return { success: false, error: 'error' in response ? response.error : 'Unknown error' }; + } + + if (!response.id) { + throw new FirecrawlError(`Failed to start research. No job ID returned.`, 500); + } + + const jobId = response.id; + let researchStatus; + + while (true) { + // console.log("Checking research status..."); + researchStatus = await this.__checkDeepResearchStatus(jobId); + // console.log("Research status:", researchStatus); + + if ('error' in researchStatus && !researchStatus.success) { + return researchStatus; + } + + if (researchStatus.status === "completed") { + return researchStatus; + } + + if (researchStatus.status === "failed") { + throw new FirecrawlError( + `Research job ${researchStatus.status}. 
Error: ${researchStatus.error}`, + 500 + ); + } + + if (researchStatus.status !== "processing") { + break; + } + + await new Promise(resolve => setTimeout(resolve, 2000)); + } + // console.log("Research status finished:", researchStatus); + + return { success: false, error: "Research job terminated unexpectedly" }; + } catch (error: any) { + throw new FirecrawlError(error.message, 500, error.response?.data?.details); + } + } + + /** + * Initiates a deep research operation on a given topic without polling. + * @param params - Parameters for the deep research operation. + * @returns The response containing the research job ID. + */ + async __asyncDeepResearch(topic: string, params: DeepResearchParams): Promise { + const headers = this.prepareHeaders(); + try { + const response: AxiosResponse = await this.postRequest( + `${this.apiUrl}/v1/deep-research`, + { topic, ...params }, + headers + ); + + if (response.status === 200) { + return response.data; + } else { + this.handleError(response, "start deep research"); + } + } catch (error: any) { + if (error.response?.data?.error) { + throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status); + } else { + throw new FirecrawlError(error.message, 500); + } + } + return { success: false, error: "Internal server error." }; + } + + /** + * Checks the status of a deep research operation. + * @param id - The ID of the deep research operation. + * @returns The current status and results of the research operation. + */ + async __checkDeepResearchStatus(id: string): Promise { + const headers = this.prepareHeaders(); + try { + const response: AxiosResponse = await this.getRequest( + `${this.apiUrl}/v1/deep-research/${id}`, + headers + ); + + if (response.status === 200) { + return response.data; + } else if (response.status === 404) { + throw new FirecrawlError("Deep research job not found", 404); + } else { + this.handleError(response, "check deep research status"); + } + } catch (error: any) { + if (error.response?.data?.error) { + throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status); + } else { + throw new FirecrawlError(error.message, 500); + } + } + return { success: false, error: "Internal server error." }; + } } interface CrawlWatcherEvents {
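For reviewers who want to exercise the new endpoints end to end, here is a minimal client-side sketch against the routes added in this PR (`POST /v1/deep-research` and `GET /v1/deep-research/:jobId`). The base URL, API-key handling, and 2-second polling interval are illustrative assumptions, not part of the diff.

```typescript
// Hypothetical standalone client for the new deep-research endpoints (Node 18+, built-in fetch).
const API_URL = process.env.FIRECRAWL_API_URL ?? "https://api.firecrawl.dev"; // assumed base URL
const API_KEY = process.env.FIRECRAWL_API_KEY ?? "";

async function runDeepResearch(topic: string): Promise<string> {
  const headers = {
    "Content-Type": "application/json",
    Authorization: `Bearer ${API_KEY}`,
  };

  // Queue the job; the controller responds with { success, id }.
  const startRes = await fetch(`${API_URL}/v1/deep-research`, {
    method: "POST",
    headers,
    body: JSON.stringify({ topic, maxDepth: 5, timeLimit: 180 }),
  });
  const { id } = await startRes.json();

  // Poll the status endpoint until the Redis-backed job leaves "processing".
  while (true) {
    const statusRes = await fetch(`${API_URL}/v1/deep-research/${id}`, { headers });
    const status = await statusRes.json();

    if (status.status === "completed") return status.data.finalAnalysis;
    if (status.status === "failed") throw new Error(status.error ?? "Deep research failed");

    await new Promise((resolve) => setTimeout(resolve, 2000)); // illustrative poll interval
  }
}

runDeepResearch("How do solid-state batteries compare to lithium-ion?")
  .then(console.log)
  .catch(console.error);
```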
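The same flow through the JS SDK methods added in this diff: `__deepResearch` starts the job and polls internally until it completes or fails. The topic, options, and result handling below are illustrative, and the double-underscore prefix signals these methods are experimental.

```typescript
import FirecrawlApp from "@mendable/firecrawl-js";

const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY ?? "" });

async function main() {
  // __deepResearch() POSTs to /v1/deep-research, then polls
  // /v1/deep-research/:jobId every 2 seconds until completion or failure.
  const result = await app.__deepResearch(
    "What are the main approaches to long-context LLM inference?", // illustrative topic
    { maxDepth: 5, timeLimit: 180 },
  );

  if (result.success && "data" in result) {
    console.log(result.data.finalAnalysis);
    console.log(`Sources consulted: ${result.sources.length}`);
  } else {
    console.error("Deep research failed:", result.error);
  }
}

main().catch(console.error);
```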