(feat/deep-research) Alpha prep + Improvements (#1284)

* Update index.ts

* Nick:

* Nick: topic -> query

* Update deep-research.ts

* Nick: bump

* Nick: add onSource callback
This commit is contained in:
Nicolas 2025-03-02 17:33:21 -03:00 committed by GitHub
parent 9ad947884d
commit 22d4f0de38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 469 additions and 16 deletions

View File

@ -0,0 +1,136 @@
import { redisConnection } from "../../../services/queue-service";
import {
saveDeepResearch,
getDeepResearch,
updateDeepResearch,
getDeepResearchExpiry,
StoredDeepResearch,
} from "../../../lib/deep-research/deep-research-redis";
// Replace the real Redis connection with jest mocks for the four commands the
// deep-research helpers issue (set/get/expire/pttl). jest.mock calls are
// hoisted above the imports, so the module under test receives this fake.
jest.mock("../../../services/queue-service", () => ({
  redisConnection: {
    set: jest.fn(),
    get: jest.fn(),
    expire: jest.fn(),
    pttl: jest.fn(),
  },
}));
// Unit tests for the deep-research Redis persistence helpers. The Redis
// connection is mocked (see jest.mock above), so each test asserts on the
// exact Redis commands and serialized payloads issued, not on real storage.
describe("Deep Research Redis Operations", () => {
  // Baseline stored-research fixture reused by every test below.
  const mockResearch: StoredDeepResearch = {
    id: "test-id",
    team_id: "team-1",
    plan: "pro",
    createdAt: Date.now(),
    status: "processing",
    currentDepth: 0,
    maxDepth: 5,
    completedSteps: 0,
    totalExpectedSteps: 25,
    findings: [],
    sources: [],
    activities: [],
    summaries: [],
  };
  beforeEach(() => {
    // Reset call history so per-test assertions on call counts are isolated.
    jest.clearAllMocks();
  });
  describe("saveDeepResearch", () => {
    it("should save research data to Redis with TTL", async () => {
      await saveDeepResearch("test-id", mockResearch);
      // Payload is stored as JSON under a "deep-research:"-prefixed key…
      expect(redisConnection.set).toHaveBeenCalledWith(
        "deep-research:test-id",
        JSON.stringify(mockResearch)
      );
      // …and expires after 6 hours.
      expect(redisConnection.expire).toHaveBeenCalledWith(
        "deep-research:test-id",
        6 * 60 * 60
      );
    });
  });
  describe("getDeepResearch", () => {
    it("should retrieve research data from Redis", async () => {
      (redisConnection.get as jest.Mock).mockResolvedValue(
        JSON.stringify(mockResearch)
      );
      const result = await getDeepResearch("test-id");
      // The stored JSON round-trips back into the original object.
      expect(result).toEqual(mockResearch);
      expect(redisConnection.get).toHaveBeenCalledWith("deep-research:test-id");
    });
    it("should return null when research not found", async () => {
      (redisConnection.get as jest.Mock).mockResolvedValue(null);
      const result = await getDeepResearch("non-existent-id");
      expect(result).toBeNull();
    });
  });
  describe("updateDeepResearch", () => {
    it("should update existing research with new data", async () => {
      (redisConnection.get as jest.Mock).mockResolvedValue(
        JSON.stringify(mockResearch)
      );
      const update = {
        status: "completed" as const,
        finalAnalysis: "Test analysis",
        activities: [
          {
            type: "search" as const,
            status: "complete" as const,
            message: "New activity",
            timestamp: new Date().toISOString(),
            depth: 1,
          },
        ],
      };
      await updateDeepResearch("test-id", update);
      // Scalar fields are overwritten, but activities are APPENDED to the
      // stored list rather than replaced. Note the assertion compares the
      // JSON.stringify output, so it is sensitive to key insertion order.
      const expectedUpdate = {
        ...mockResearch,
        ...update,
        activities: [...mockResearch.activities, ...update.activities],
      };
      expect(redisConnection.set).toHaveBeenCalledWith(
        "deep-research:test-id",
        JSON.stringify(expectedUpdate)
      );
      // The 6-hour TTL is refreshed on every update.
      expect(redisConnection.expire).toHaveBeenCalledWith(
        "deep-research:test-id",
        6 * 60 * 60
      );
    });
    it("should do nothing if research not found", async () => {
      (redisConnection.get as jest.Mock).mockResolvedValue(null);
      await updateDeepResearch("test-id", { status: "completed" });
      expect(redisConnection.set).not.toHaveBeenCalled();
      expect(redisConnection.expire).not.toHaveBeenCalled();
    });
  });
  describe("getDeepResearchExpiry", () => {
    it("should return correct expiry date", async () => {
      const mockTTL = 3600000; // 1 hour in milliseconds
      (redisConnection.pttl as jest.Mock).mockResolvedValue(mockTTL);
      const result = await getDeepResearchExpiry("test-id");
      expect(result).toBeInstanceOf(Date);
      // Expiry should be "now + TTL".
      expect(result.getTime()).toBeCloseTo(
        new Date().getTime() + mockTTL,
        -2 // precision -2 tolerates |diff| < 0.5 * 10^2 = 50 ms of clock skew
      );
    });
  });
});

View File

@ -21,7 +21,10 @@ export async function deepResearchStatusController(
let data: any = null;
if (research.status === "completed" && process.env.USE_DB_AUTHENTICATION === "true") {
if (
research.status === "completed" &&
process.env.USE_DB_AUTHENTICATION === "true"
) {
const jobData = await supabaseGetJobsById([req.params.jobId]);
if (jobData && jobData.length > 0) {
data = jobData[0].docs[0];
@ -42,8 +45,11 @@ export async function deepResearchStatusController(
currentDepth: research.currentDepth,
maxDepth: research.maxDepth,
status: research.status,
totalUrls: research.sources.length,
// DO NOT remove - backwards compatibility
//@deprecated
activities: research.activities,
//@deprecated
sources: research.sources,
// summaries: research.summaries,
});

View File

@ -6,12 +6,18 @@ import { saveDeepResearch } from "../../lib/deep-research/deep-research-redis";
import { z } from "zod";
export const deepResearchRequestSchema = z.object({
topic: z.string().describe('The topic or question to research'),
maxDepth: z.number().min(1).max(10).default(7).describe('Maximum depth of research iterations'),
query: z.string().describe('The query or topic to search for').optional(),
maxDepth: z.number().min(1).max(12).default(7).describe('Maximum depth of research iterations'),
maxUrls: z.number().min(1).max(1000).default(20).describe('Maximum number of URLs to analyze'),
timeLimit: z.number().min(30).max(600).default(300).describe('Time limit in seconds'),
__experimental_streamSteps: z.boolean().optional(),
});
// @deprecated Use query instead
topic: z.string().describe('The topic or question to research').optional(),
}).refine(data => data.query || data.topic, {
message: "Either query or topic must be provided"
}).transform(data => ({
...data,
query: data.topic || data.query // Use topic as query if provided
}));
export type DeepResearchRequest = z.infer<typeof deepResearchRequestSchema>;

View File

@ -4,14 +4,13 @@ import { PlanType } from "../../types";
import { searchAndScrapeSearchResult } from "../../controllers/v1/search";
import { ResearchLLMService, ResearchStateManager } from "./research-manager";
import { logJob } from "../../services/logging/log_job";
import { updateExtract } from "../extract/extract-redis";
import { billTeam } from "../../services/billing/credit_billing";
interface DeepResearchServiceOptions {
researchId: string;
teamId: string;
plan: string;
topic: string;
query: string;
maxDepth: number;
maxUrls: number;
timeLimit: number;
@ -21,7 +20,7 @@ interface DeepResearchServiceOptions {
export async function performDeepResearch(options: DeepResearchServiceOptions) {
const { researchId, teamId, plan, timeLimit, subId, maxUrls } = options;
const startTime = Date.now();
let currentTopic = options.topic;
let currentTopic = options.query;
let urlsAnalyzed = 0;
const logger = _logger.child({
@ -38,7 +37,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) {
plan,
options.maxDepth,
logger,
options.topic,
options.query,
);
const llmService = new ResearchLLMService(logger);
@ -260,7 +259,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) {
});
const finalAnalysis = await llmService.generateFinalAnalysis(
options.topic,
options.query,
state.getFindings(),
state.getSummaries(),
);
@ -286,7 +285,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) {
time_taken: (Date.now() - startTime) / 1000,
team_id: teamId,
mode: "deep-research",
url: options.topic,
url: options.query,
scrapeOptions: options,
origin: "api",
num_tokens: 0,
@ -308,6 +307,7 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) {
success: true,
data: {
finalAnalysis: finalAnalysis,
sources: state.getSources(),
},
};
} catch (error: any) {

View File

@ -407,7 +407,7 @@ const processDeepResearchJobInternal = async (
researchId: job.data.researchId,
teamId: job.data.teamId,
plan: job.data.plan,
topic: job.data.request.topic,
query: job.data.request.query,
maxDepth: job.data.request.maxDepth,
timeLimit: job.data.request.timeLimit,
subId: job.data.subId,

View File

@ -1,6 +1,6 @@
{
"name": "@mendable/firecrawl-js",
"version": "1.18.6",
"version": "1.19.0",
"description": "JavaScript SDK for Firecrawl API",
"main": "dist/index.js",
"types": "dist/index.d.ts",

View File

@ -351,7 +351,7 @@ export interface CrawlErrorsResponse {
/**
* Parameters for deep research operations.
* Defines options for conducting deep research on a topic.
* Defines options for conducting deep research on a query.
*/
export interface DeepResearchParams {
/**
@ -1400,6 +1400,156 @@ export default class FirecrawlApp {
}
/**
 * Starts a deep research job for the given query and polls every two seconds
 * until it completes, fails, or leaves the "processing" state.
 * @param query - The query to research.
 * @param params - Parameters for the deep research operation.
 * @param onActivity - Optional callback invoked once per newly reported activity.
 * @param onSource - Optional callback invoked once per newly discovered source.
 * @returns The final research results.
 */
async deepResearch(
  query: string,
  params: DeepResearchParams,
  onActivity?: (activity: {
    type: string;
    status: string;
    message: string;
    timestamp: string;
    depth: number;
  }) => void,
  onSource?: (source: {
    url: string;
    title?: string;
    description?: string;
    icon?: string;
  }) => void
): Promise<DeepResearchStatusResponse | ErrorResponse> {
  try {
    const startResponse = await this.asyncDeepResearch(query, params);

    // Bail out early when the kickoff request itself failed.
    if (!startResponse.success || 'error' in startResponse) {
      return { success: false, error: 'error' in startResponse ? startResponse.error : 'Unknown error' };
    }
    if (!startResponse.id) {
      throw new FirecrawlError(`Failed to start research. No job ID returned.`, 500);
    }
    const jobId = startResponse.id;

    // Track how many activities/sources have already been delivered so the
    // callbacks only ever see each entry once across polls.
    let deliveredActivities = 0;
    let deliveredSources = 0;

    while (true) {
      const researchStatus = await this.checkDeepResearchStatus(jobId);
      if ('error' in researchStatus && !researchStatus.success) {
        return researchStatus;
      }

      if (onActivity && researchStatus.activities) {
        researchStatus.activities.slice(deliveredActivities).forEach(a => onActivity(a));
        deliveredActivities = researchStatus.activities.length;
      }
      if (onSource && researchStatus.sources) {
        researchStatus.sources.slice(deliveredSources).forEach(s => onSource(s));
        deliveredSources = researchStatus.sources.length;
      }

      if (researchStatus.status === "completed") {
        return researchStatus;
      }
      if (researchStatus.status === "failed") {
        throw new FirecrawlError(
          `Research job ${researchStatus.status}. Error: ${researchStatus.error}`,
          500
        );
      }
      if (researchStatus.status !== "processing") {
        break;
      }

      // Two-second polling interval.
      await new Promise(resolve => setTimeout(resolve, 2000));
    }
    return { success: false, error: "Research job terminated unexpectedly" };
  } catch (error: any) {
    throw new FirecrawlError(error.message, 500, error.response?.data?.details);
  }
}
/**
 * Starts a deep research job for the given query without waiting for it to
 * finish; pair with checkDeepResearchStatus() to poll manually.
 * @param query - The query to research.
 * @param params - Parameters for the deep research operation.
 * @returns The response containing the research job ID.
 */
async asyncDeepResearch(query: string, params: DeepResearchParams): Promise<DeepResearchResponse | ErrorResponse> {
  const headers = this.prepareHeaders();
  try {
    const res: AxiosResponse = await this.postRequest(
      `${this.apiUrl}/v1/deep-research`,
      { query, ...params },
      headers
    );
    if (res.status !== 200) {
      // Delegates to the shared error handler (which throws).
      this.handleError(res, "start deep research");
    } else {
      return res.data;
    }
  } catch (error: any) {
    if (error.response?.data?.error) {
      throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status);
    } else {
      throw new FirecrawlError(error.message, 500);
    }
  }
  return { success: false, error: "Internal server error." };
}
/**
 * Checks the status of a deep research operation.
 * @param id - The ID of the deep research operation.
 * @returns The current status and results of the research operation.
 * @throws {FirecrawlError} With status 404 when the job does not exist.
 */
async checkDeepResearchStatus(id: string): Promise<DeepResearchStatusResponse | ErrorResponse> {
  const headers = this.prepareHeaders();
  try {
    const response: AxiosResponse = await this.getRequest(
      `${this.apiUrl}/v1/deep-research/${id}`,
      headers
    );
    if (response.status === 200) {
      return response.data;
    } else if (response.status === 404) {
      throw new FirecrawlError("Deep research job not found", 404);
    } else {
      this.handleError(response, "check deep research status");
    }
  } catch (error: any) {
    // Bug fix: FirecrawlErrors thrown above (e.g. the 404) used to fall
    // into the generic branch below and be re-wrapped as a 500, losing the
    // intended status code. Re-throw them unchanged instead.
    if (error instanceof FirecrawlError) {
      throw error;
    }
    if (error.response?.data?.error) {
      throw new FirecrawlError(`Request failed with status code ${error.response.status}. Error: ${error.response.data.error} ${error.response.data.details ? ` - ${JSON.stringify(error.response.data.details)}` : ''}`, error.response.status);
    } else {
      throw new FirecrawlError(error.message, 500);
    }
  }
  return { success: false, error: "Internal server error." };
}
/**
* @deprecated Use deepResearch() instead
* Initiates a deep research operation on a given topic and polls until completion.
* @param topic - The topic to research.
* @param params - Parameters for the deep research operation.
@ -1473,6 +1623,7 @@ export default class FirecrawlApp {
}
/**
* @deprecated Use asyncDeepResearch() instead
* Initiates a deep research operation on a given topic without polling.
* @param params - Parameters for the deep research operation.
* @returns The response containing the research job ID.
@ -1502,6 +1653,7 @@ export default class FirecrawlApp {
}
/**
* @deprecated Use checkDeepResearchStatus() instead
* Checks the status of a deep research operation.
* @param id - The ID of the deep research operation.
* @returns The current status and results of the research operation.

View File

@ -13,7 +13,7 @@ import os
from .firecrawl import FirecrawlApp # noqa
__version__ = "1.12.0"
__version__ = "1.13.0"
# Define the logger for the Firecrawl project
logger: logging.Logger = logging.getLogger("firecrawl")

View File

@ -12,7 +12,7 @@ Classes:
import logging
import os
import time
from typing import Any, Dict, Optional, List, Union
from typing import Any, Dict, Optional, List, Union, Callable
import json
import requests
@ -41,6 +41,38 @@ class GenerateLLMsTextParams(pydantic.BaseModel):
showFullText: Optional[bool] = False
__experimental_stream: Optional[bool] = None
class DeepResearchParams(pydantic.BaseModel):
    """
    Parameters for the deep research operation.
    """
    maxDepth: Optional[int] = 7    # maximum research iteration depth
    timeLimit: Optional[int] = 270  # time limit; presumably seconds, per the API schema — TODO confirm
    maxUrls: Optional[int] = 20    # maximum number of URLs to analyze
    # NOTE(review): a leading double underscore inside a class body triggers
    # Python name mangling (this becomes _DeepResearchParams__experimental...),
    # so callers may not be able to set this field by its public name — verify.
    __experimental_streamSteps: Optional[bool] = None
class DeepResearchResponse(pydantic.BaseModel):
    """
    Response from the deep research operation.
    """
    success: bool             # whether the job was accepted
    id: str                   # job ID used for subsequent status polling
    error: Optional[str] = None  # populated when success is False
class DeepResearchStatusResponse(pydantic.BaseModel):
    """
    Status response from the deep research operation.
    """
    success: bool
    data: Optional[Dict[str, Any]] = None  # final results once completed
    status: str               # e.g. "processing" / "completed" / "failed"
    error: Optional[str] = None
    expiresAt: str            # when the stored job data expires
    currentDepth: int
    maxDepth: int
    activities: List[Dict[str, Any]]  # cumulative activity log
    sources: List[Dict[str, Any]]     # cumulative discovered sources
    summaries: List[str]
class FirecrawlApp:
class SearchResponse(pydantic.BaseModel):
"""
@ -1066,6 +1098,127 @@ class FirecrawlApp:
# Raise an HTTPError with the custom message and attach the response
raise requests.exceptions.HTTPError(message, response=response)
def deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None,
                  on_activity: Optional[Callable[[Dict[str, Any]], None]] = None,
                  on_source: Optional[Callable[[Dict[str, Any]], None]] = None) -> Dict[str, Any]:
    """
    Initiates a deep research operation on a given query and polls until completion.

    Args:
        query (str): The query to research.
        params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Parameters for the deep research operation.
        on_activity (Optional[Callable[[Dict[str, Any]], None]]): Optional callback to receive activity updates in real-time.
        on_source (Optional[Callable[[Dict[str, Any]], None]]): Optional callback to receive source updates in real-time.

    Returns:
        Dict[str, Any]: The final research results.

    Raises:
        Exception: If the research operation fails.
    """
    if params is None:
        params = {}
    if isinstance(params, dict):
        research_params = DeepResearchParams(**params)
    else:
        research_params = params

    response = self.async_deep_research(query, research_params)
    if not response.get('success') or 'id' not in response:
        return response

    job_id = response['id']
    # Bug fix: the status endpoint returns the CUMULATIVE activity/source
    # lists, so track how many entries have already been delivered and only
    # forward new ones — previously every poll re-invoked the callbacks for
    # the entire history (the JS SDK already streams incrementally).
    last_activity_count = 0
    last_source_count = 0
    while True:
        status = self.check_deep_research_status(job_id)

        if on_activity and 'activities' in status:
            for activity in status['activities'][last_activity_count:]:
                on_activity(activity)
            last_activity_count = len(status['activities'])

        if on_source and 'sources' in status:
            for source in status['sources'][last_source_count:]:
                on_source(source)
            last_source_count = len(status['sources'])

        if status['status'] == 'completed':
            return status
        elif status['status'] == 'failed':
            raise Exception(f'Deep research failed. Error: {status.get("error")}')
        elif status['status'] != 'processing':
            break

        time.sleep(2)  # Polling interval

    return {'success': False, 'error': 'Deep research job terminated unexpectedly'}
def async_deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None) -> Dict[str, Any]:
    """
    Initiates an asynchronous deep research operation.

    Args:
        query (str): The query to research.
        params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Parameters for the deep research operation.

    Returns:
        Dict[str, Any]: The response from the deep research initiation.

    Raises:
        ValueError: If the research initiation fails.
    """
    if params is None:
        params = {}
    if isinstance(params, dict):
        research_params = DeepResearchParams(**params)
    else:
        research_params = params

    headers = self._prepare_headers()
    json_data = {'query': query, **research_params.dict(exclude_none=True)}
    try:
        # Bug fix: the API route is /v1/deep-research (as used by the JS
        # SDK); the previous /v1/research path targeted the wrong endpoint.
        response = self._post_request(f'{self.api_url}/v1/deep-research', json_data, headers)
        if response.status_code == 200:
            try:
                return response.json()
            except ValueError:  # narrowed from bare except: only JSON decode errors
                raise Exception('Failed to parse Firecrawl response as JSON.')
        else:
            self._handle_error(response, 'start deep research')
    except Exception as e:
        # Preserve existing contract: all failures surface as ValueError.
        raise ValueError(str(e))
    return {'success': False, 'error': 'Internal server error'}
def check_deep_research_status(self, id: str) -> Dict[str, Any]:
    """
    Check the status of a deep research operation.

    Args:
        id (str): The ID of the deep research operation.

    Returns:
        Dict[str, Any]: The current status and results of the research operation.

    Raises:
        ValueError: If the status check fails (including job-not-found).
    """
    headers = self._prepare_headers()
    try:
        # Bug fix: the API route is /v1/deep-research/{id} (as used by the
        # JS SDK); the previous /v1/research/{id} path targeted the wrong
        # endpoint.
        response = self._get_request(f'{self.api_url}/v1/deep-research/{id}', headers)
        if response.status_code == 200:
            try:
                return response.json()
            except ValueError:  # narrowed from bare except: only JSON decode errors
                raise Exception('Failed to parse Firecrawl response as JSON.')
        elif response.status_code == 404:
            raise Exception('Deep research job not found')
        else:
            self._handle_error(response, 'check deep research status')
    except Exception as e:
        # Preserve existing contract: all failures surface as ValueError.
        raise ValueError(str(e))
    return {'success': False, 'error': 'Internal server error'}
class CrawlWatcher:
def __init__(self, id: str, app: FirecrawlApp):
self.id = id