From 22d4f0de38b7a32397e645a0e4c6a917c48c5134 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 2 Mar 2025 17:33:21 -0300 Subject: [PATCH] (feat/deep-research) Alpha prep + Improvements (#1284) * Update index.ts * Nick: * Nick: topic -> query * Update deep-research.ts * Nick: bump * Nick: add onSource callback --- .../unit/deep-research-redis.test.ts | 136 +++++++++++++++ .../controllers/v1/deep-research-status.ts | 8 +- apps/api/src/controllers/v1/deep-research.ts | 14 +- .../deep-research/deep-research-service.ts | 12 +- apps/api/src/services/queue-worker.ts | 2 +- apps/js-sdk/firecrawl/package.json | 2 +- apps/js-sdk/firecrawl/src/index.ts | 154 ++++++++++++++++- apps/python-sdk/firecrawl/__init__.py | 2 +- apps/python-sdk/firecrawl/firecrawl.py | 155 +++++++++++++++++- 9 files changed, 469 insertions(+), 16 deletions(-) create mode 100644 apps/api/src/__tests__/deep-research/unit/deep-research-redis.test.ts diff --git a/apps/api/src/__tests__/deep-research/unit/deep-research-redis.test.ts b/apps/api/src/__tests__/deep-research/unit/deep-research-redis.test.ts new file mode 100644 index 00000000..13b4d994 --- /dev/null +++ b/apps/api/src/__tests__/deep-research/unit/deep-research-redis.test.ts @@ -0,0 +1,136 @@ +import { redisConnection } from "../../../services/queue-service"; +import { + saveDeepResearch, + getDeepResearch, + updateDeepResearch, + getDeepResearchExpiry, + StoredDeepResearch, +} from "../../../lib/deep-research/deep-research-redis"; + +jest.mock("../../../services/queue-service", () => ({ + redisConnection: { + set: jest.fn(), + get: jest.fn(), + expire: jest.fn(), + pttl: jest.fn(), + }, +})); + +describe("Deep Research Redis Operations", () => { + const mockResearch: StoredDeepResearch = { + id: "test-id", + team_id: "team-1", + plan: "pro", + createdAt: Date.now(), + status: "processing", + currentDepth: 0, + maxDepth: 5, + completedSteps: 0, + totalExpectedSteps: 25, + findings: [], + sources: [], + activities: [], + summaries: [], + }; + + 
beforeEach(() => { + jest.clearAllMocks(); + }); + + describe("saveDeepResearch", () => { + it("should save research data to Redis with TTL", async () => { + await saveDeepResearch("test-id", mockResearch); + + expect(redisConnection.set).toHaveBeenCalledWith( + "deep-research:test-id", + JSON.stringify(mockResearch) + ); + expect(redisConnection.expire).toHaveBeenCalledWith( + "deep-research:test-id", + 6 * 60 * 60 + ); + }); + }); + + describe("getDeepResearch", () => { + it("should retrieve research data from Redis", async () => { + (redisConnection.get as jest.Mock).mockResolvedValue( + JSON.stringify(mockResearch) + ); + + const result = await getDeepResearch("test-id"); + expect(result).toEqual(mockResearch); + expect(redisConnection.get).toHaveBeenCalledWith("deep-research:test-id"); + }); + + it("should return null when research not found", async () => { + (redisConnection.get as jest.Mock).mockResolvedValue(null); + + const result = await getDeepResearch("non-existent-id"); + expect(result).toBeNull(); + }); + }); + + describe("updateDeepResearch", () => { + it("should update existing research with new data", async () => { + (redisConnection.get as jest.Mock).mockResolvedValue( + JSON.stringify(mockResearch) + ); + + const update = { + status: "completed" as const, + finalAnalysis: "Test analysis", + activities: [ + { + type: "search" as const, + status: "complete" as const, + message: "New activity", + timestamp: new Date().toISOString(), + depth: 1, + }, + ], + }; + + await updateDeepResearch("test-id", update); + + const expectedUpdate = { + ...mockResearch, + ...update, + activities: [...mockResearch.activities, ...update.activities], + }; + + expect(redisConnection.set).toHaveBeenCalledWith( + "deep-research:test-id", + JSON.stringify(expectedUpdate) + ); + expect(redisConnection.expire).toHaveBeenCalledWith( + "deep-research:test-id", + 6 * 60 * 60 + ); + }); + + it("should do nothing if research not found", async () => { + (redisConnection.get as 
jest.Mock).mockResolvedValue(null); + + await updateDeepResearch("test-id", { status: "completed" }); + + expect(redisConnection.set).not.toHaveBeenCalled(); + expect(redisConnection.expire).not.toHaveBeenCalled(); + }); + }); + + describe("getDeepResearchExpiry", () => { + it("should return correct expiry date", async () => { + const mockTTL = 3600000; // 1 hour in milliseconds + (redisConnection.pttl as jest.Mock).mockResolvedValue(mockTTL); + + const result = await getDeepResearchExpiry("test-id"); + + expect(result).toBeInstanceOf(Date); + expect(result.getTime()).toBeCloseTo( + new Date().getTime() + mockTTL, + -2 // Allow 100ms precision + ); + }); + }); +}); \ No newline at end of file diff --git a/apps/api/src/controllers/v1/deep-research-status.ts b/apps/api/src/controllers/v1/deep-research-status.ts index 06fff76c..bae49d0a 100644 --- a/apps/api/src/controllers/v1/deep-research-status.ts +++ b/apps/api/src/controllers/v1/deep-research-status.ts @@ -21,7 +21,10 @@ export async function deepResearchStatusController( let data: any = null; - if (research.status === "completed" && process.env.USE_DB_AUTHENTICATION === "true") { + if ( + research.status === "completed" && + process.env.USE_DB_AUTHENTICATION === "true" + ) { const jobData = await supabaseGetJobsById([req.params.jobId]); if (jobData && jobData.length > 0) { data = jobData[0].docs[0]; @@ -42,8 +45,11 @@ export async function deepResearchStatusController( currentDepth: research.currentDepth, maxDepth: research.maxDepth, status: research.status, + totalUrls: research.sources.length, // DO NOT remove - backwards compatibility + //@deprecated activities: research.activities, + //@deprecated sources: research.sources, // summaries: research.summaries, }); diff --git a/apps/api/src/controllers/v1/deep-research.ts b/apps/api/src/controllers/v1/deep-research.ts index e26715de..df3f49c8 100644 --- a/apps/api/src/controllers/v1/deep-research.ts +++ b/apps/api/src/controllers/v1/deep-research.ts @@ -6,12 
+6,18 @@ import { saveDeepResearch } from "../../lib/deep-research/deep-research-redis"; import { z } from "zod"; export const deepResearchRequestSchema = z.object({ - topic: z.string().describe('The topic or question to research'), - maxDepth: z.number().min(1).max(10).default(7).describe('Maximum depth of research iterations'), + query: z.string().describe('The query or topic to search for').optional(), + maxDepth: z.number().min(1).max(12).default(7).describe('Maximum depth of research iterations'), maxUrls: z.number().min(1).max(1000).default(20).describe('Maximum number of URLs to analyze'), timeLimit: z.number().min(30).max(600).default(300).describe('Time limit in seconds'), - __experimental_streamSteps: z.boolean().optional(), -}); + // @deprecated Use query instead + topic: z.string().describe('The topic or question to research').optional(), +}).refine(data => data.query || data.topic, { + message: "Either query or topic must be provided" +}).transform(data => ({ + ...data, + query: data.topic || data.query // Use topic as query if provided +})); export type DeepResearchRequest = z.infer<typeof deepResearchRequestSchema>; diff --git a/apps/api/src/lib/deep-research/deep-research-service.ts b/apps/api/src/lib/deep-research/deep-research-service.ts index 7de4609c..eb9150d3 100644 --- a/apps/api/src/lib/deep-research/deep-research-service.ts +++ b/apps/api/src/lib/deep-research/deep-research-service.ts @@ -4,14 +4,13 @@ import { PlanType } from "../../types"; import { searchAndScrapeSearchResult } from "../../controllers/v1/search"; import { ResearchLLMService, ResearchStateManager } from "./research-manager"; import { logJob } from "../../services/logging/log_job"; -import { updateExtract } from "../extract/extract-redis"; import { billTeam } from "../../services/billing/credit_billing"; interface DeepResearchServiceOptions { researchId: string; teamId: string; plan: string; - topic: string; + query: string; maxDepth: number; maxUrls: number; timeLimit: number; @@ -21,7 +20,7 @@ interface 
DeepResearchServiceOptions { export async function performDeepResearch(options: DeepResearchServiceOptions) { const { researchId, teamId, plan, timeLimit, subId, maxUrls } = options; const startTime = Date.now(); - let currentTopic = options.topic; + let currentTopic = options.query; let urlsAnalyzed = 0; const logger = _logger.child({ @@ -38,7 +37,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) { plan, options.maxDepth, logger, - options.topic, + options.query, ); const llmService = new ResearchLLMService(logger); @@ -260,7 +259,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) { }); const finalAnalysis = await llmService.generateFinalAnalysis( - options.topic, + options.query, state.getFindings(), state.getSummaries(), ); @@ -286,7 +285,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) { time_taken: (Date.now() - startTime) / 1000, team_id: teamId, mode: "deep-research", - url: options.topic, + url: options.query, scrapeOptions: options, origin: "api", num_tokens: 0, @@ -308,6 +307,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) { success: true, data: { finalAnalysis: finalAnalysis, + sources: state.getSources(), }, }; } catch (error: any) { diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 8f6c5d15..86375ec2 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -407,7 +407,7 @@ const processDeepResearchJobInternal = async ( researchId: job.data.researchId, teamId: job.data.teamId, plan: job.data.plan, - topic: job.data.request.topic, + query: job.data.request.query, maxDepth: job.data.request.maxDepth, timeLimit: job.data.request.timeLimit, subId: job.data.subId, diff --git a/apps/js-sdk/firecrawl/package.json b/apps/js-sdk/firecrawl/package.json index 74770c4d..3fc252a0 100644 --- a/apps/js-sdk/firecrawl/package.json +++ 
b/apps/js-sdk/firecrawl/package.json @@ -1,6 +1,6 @@ { "name": "@mendable/firecrawl-js", - "version": "1.18.6", + "version": "1.19.0", "description": "JavaScript SDK for Firecrawl API", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index 4fae6010..6d3093b8 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -351,7 +351,7 @@ export interface CrawlErrorsResponse { /** * Parameters for deep research operations. - * Defines options for conducting deep research on a topic. + * Defines options for conducting deep research on a query. */ export interface DeepResearchParams { /** @@ -1400,6 +1400,156 @@ export default class FirecrawlApp { } /** + * Initiates a deep research operation on a given query and polls until completion. + * @param query - The query to research. + * @param params - Parameters for the deep research operation. + * @param onActivity - Optional callback to receive activity updates in real-time. + * @param onSource - Optional callback to receive source updates in real-time. + * @returns The final research results. + */ + async deepResearch( + query: string, + params: DeepResearchParams, + onActivity?: (activity: { + type: string; + status: string; + message: string; + timestamp: string; + depth: number; + }) => void, + onSource?: (source: { + url: string; + title?: string; + description?: string; + icon?: string; + }) => void + ): Promise { + try { + const response = await this.asyncDeepResearch(query, params); + + if (!response.success || 'error' in response) { + return { success: false, error: 'error' in response ? response.error : 'Unknown error' }; + } + + if (!response.id) { + throw new FirecrawlError(`Failed to start research. 
No job ID returned.`, 500); + } + + const jobId = response.id; + let researchStatus; + let lastActivityCount = 0; + let lastSourceCount = 0; + + while (true) { + researchStatus = await this.checkDeepResearchStatus(jobId); + + if ('error' in researchStatus && !researchStatus.success) { + return researchStatus; + } + + // Stream new activities through the callback if provided + if (onActivity && researchStatus.activities) { + const newActivities = researchStatus.activities.slice(lastActivityCount); + for (const activity of newActivities) { + onActivity(activity); + } + lastActivityCount = researchStatus.activities.length; + } + + // Stream new sources through the callback if provided + if (onSource && researchStatus.sources) { + const newSources = researchStatus.sources.slice(lastSourceCount); + for (const source of newSources) { + onSource(source); + } + lastSourceCount = researchStatus.sources.length; + } + + if (researchStatus.status === "completed") { + return researchStatus; + } + + if (researchStatus.status === "failed") { + throw new FirecrawlError( + `Research job ${researchStatus.status}. Error: ${researchStatus.error}`, + 500 + ); + } + + if (researchStatus.status !== "processing") { + break; + } + + await new Promise(resolve => setTimeout(resolve, 2000)); + } + + return { success: false, error: "Research job terminated unexpectedly" }; + } catch (error: any) { + throw new FirecrawlError(error.message, 500, error.response?.data?.details); + } + } + + /** + * Initiates a deep research operation on a given query without polling. + * @param params - Parameters for the deep research operation. + * @returns The response containing the research job ID. 
+ */ + async asyncDeepResearch(query: string, params: DeepResearchParams): Promise { + const headers = this.prepareHeaders(); + try { + const response: AxiosResponse = await this.postRequest( + `${this.apiUrl}/v1/deep-research`, + { query, ...params }, + headers + ); + + if (response.status === 200) { + return response.data; + } else { + this.handleError(response, "start deep research"); + } + } catch (error: any) { + if (error.response?.data?.error) { + throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status); + } else { + throw new FirecrawlError(error.message, 500); + } + } + return { success: false, error: "Internal server error." }; + } + + /** + * Checks the status of a deep research operation. + * @param id - The ID of the deep research operation. + * @returns The current status and results of the research operation. + */ + async checkDeepResearchStatus(id: string): Promise { + const headers = this.prepareHeaders(); + try { + const response: AxiosResponse = await this.getRequest( + `${this.apiUrl}/v1/deep-research/${id}`, + headers + ); + + if (response.status === 200) { + return response.data; + } else if (response.status === 404) { + throw new FirecrawlError("Deep research job not found", 404); + } else { + this.handleError(response, "check deep research status"); + } + } catch (error: any) { + if (error.response?.data?.error) { + throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status); + } else { + throw new FirecrawlError(error.message, 500); + } + } + return { success: false, error: "Internal server error." 
}; + } + + /** + * @deprecated Use deepResearch() instead * Initiates a deep research operation on a given topic and polls until completion. * @param topic - The topic to research. * @param params - Parameters for the deep research operation. @@ -1473,6 +1623,7 @@ export default class FirecrawlApp { } /** + * @deprecated Use asyncDeepResearch() instead * Initiates a deep research operation on a given topic without polling. * @param params - Parameters for the deep research operation. * @returns The response containing the research job ID. @@ -1502,6 +1653,7 @@ export default class FirecrawlApp { } /** + * @deprecated Use checkDeepResearchStatus() instead * Checks the status of a deep research operation. * @param id - The ID of the deep research operation. * @returns The current status and results of the research operation. diff --git a/apps/python-sdk/firecrawl/__init__.py b/apps/python-sdk/firecrawl/__init__.py index 9dd7793f..62983a48 100644 --- a/apps/python-sdk/firecrawl/__init__.py +++ b/apps/python-sdk/firecrawl/__init__.py @@ -13,7 +13,7 @@ import os from .firecrawl import FirecrawlApp # noqa -__version__ = "1.12.0" +__version__ = "1.13.0" # Define the logger for the Firecrawl project logger: logging.Logger = logging.getLogger("firecrawl") diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py index 9364635b..8531395f 100644 --- a/apps/python-sdk/firecrawl/firecrawl.py +++ b/apps/python-sdk/firecrawl/firecrawl.py @@ -12,7 +12,7 @@ Classes: import logging import os import time -from typing import Any, Dict, Optional, List, Union +from typing import Any, Dict, Optional, List, Union, Callable import json import requests @@ -41,6 +41,38 @@ class GenerateLLMsTextParams(pydantic.BaseModel): showFullText: Optional[bool] = False __experimental_stream: Optional[bool] = None +class DeepResearchParams(pydantic.BaseModel): + """ + Parameters for the deep research operation. 
+ """ + maxDepth: Optional[int] = 7 + timeLimit: Optional[int] = 270 + maxUrls: Optional[int] = 20 + __experimental_streamSteps: Optional[bool] = None + +class DeepResearchResponse(pydantic.BaseModel): + """ + Response from the deep research operation. + """ + success: bool + id: str + error: Optional[str] = None + +class DeepResearchStatusResponse(pydantic.BaseModel): + """ + Status response from the deep research operation. + """ + success: bool + data: Optional[Dict[str, Any]] = None + status: str + error: Optional[str] = None + expiresAt: str + currentDepth: int + maxDepth: int + activities: List[Dict[str, Any]] + sources: List[Dict[str, Any]] + summaries: List[str] + class FirecrawlApp: class SearchResponse(pydantic.BaseModel): """ @@ -1066,6 +1098,127 @@ class FirecrawlApp: # Raise an HTTPError with the custom message and attach the response raise requests.exceptions.HTTPError(message, response=response) + def deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None, + on_activity: Optional[Callable[[Dict[str, Any]], None]] = None, + on_source: Optional[Callable[[Dict[str, Any]], None]] = None) -> Dict[str, Any]: + """ + Initiates a deep research operation on a given query and polls until completion. + + Args: + query (str): The query to research. + params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Parameters for the deep research operation. + on_activity (Optional[Callable[[Dict[str, Any]], None]]): Optional callback to receive activity updates in real-time. + on_source (Optional[Callable[[Dict[str, Any]], None]]): Optional callback to receive source updates in real-time. + + Returns: + Dict[str, Any]: The final research results. + + Raises: + Exception: If the research operation fails. 
+ """ + if params is None: + params = {} + + if isinstance(params, dict): + research_params = DeepResearchParams(**params) + else: + research_params = params + + response = self.async_deep_research(query, research_params) + if not response.get('success') or 'id' not in response: + return response + + job_id = response['id'] + seen = {'activities': 0, 'sources': 0} # Items already delivered to each callback + while True: + status = self.check_deep_research_status(job_id) + + for key, callback in (('activities', on_activity), ('sources', on_source)): + if callback and key in status: + for item in status[key][seen[key]:]: # Only entries added since the previous poll + callback(item) + seen[key] = len(status[key]) + + if status['status'] == 'completed': + return status + elif status['status'] == 'failed': + raise Exception(f'Deep research failed. Error: {status.get("error")}') + elif status['status'] != 'processing': + break + + time.sleep(2) # Polling interval + + return {'success': False, 'error': 'Deep research job terminated unexpectedly'} + + def async_deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None) -> Dict[str, Any]: + """ + Initiates an asynchronous deep research operation. + + Args: + query (str): The query to research. + params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Parameters for the deep research operation. + + Returns: + Dict[str, Any]: The response from the deep research initiation. + + Raises: + Exception: If the research initiation fails. 
+ """ + if params is None: + params = {} + + if isinstance(params, dict): + research_params = DeepResearchParams(**params) + else: + research_params = params + + headers = self._prepare_headers() + json_data = {'query': query, **research_params.dict(exclude_none=True)} + + try: + response = self._post_request(f'{self.api_url}/v1/deep-research', json_data, headers) + if response.status_code == 200: + try: + return response.json() + except Exception: + raise Exception('Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, 'start deep research') + except Exception as e: + raise ValueError(str(e)) + + return {'success': False, 'error': 'Internal server error'} + + def check_deep_research_status(self, id: str) -> Dict[str, Any]: + """ + Check the status of a deep research operation. + + Args: + id (str): The ID of the deep research operation. + + Returns: + Dict[str, Any]: The current status and results of the research operation. + + Raises: + ValueError: If the request fails, the job is not found, or the response is not valid JSON. + """ + headers = self._prepare_headers() + try: + response = self._get_request(f'{self.api_url}/v1/deep-research/{id}', headers) + if response.status_code == 200: + try: + return response.json() + except Exception: + raise Exception('Failed to parse Firecrawl response as JSON.') + elif response.status_code == 404: + raise Exception('Deep research job not found') + else: + self._handle_error(response, 'check deep research status') + except Exception as e: + raise ValueError(str(e)) + + return {'success': False, 'error': 'Internal server error'} + class CrawlWatcher: def __init__(self, id: str, app: FirecrawlApp): self.id = id