Added implementation for saving docs on Supabase

- TODO: remove the comments on `log_job.ts` before deploying to prod
rafaelsideguide 2024-06-26 18:23:28 -03:00
parent 3b92fb8433
commit c40da77be0
6 changed files with 64 additions and 12 deletions

View File (crawl status controller)

@@ -3,6 +3,7 @@ import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
 import { addWebScraperJob } from "../../src/services/queue-jobs";
 import { getWebScraperQueue } from "../../src/services/queue-service";
+import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
 
 export async function crawlStatusController(req: Request, res: Response) {
   try {
@@ -20,15 +21,27 @@ export async function crawlStatusController(req: Request, res: Response) {
     }
 
     const { current, current_url, total, current_step, partialDocs } = await job.progress();
+
+    let data = job.returnvalue;
+    if (process.env.USE_DB_AUTHENTICATION) {
+      const supabaseData = await supabaseGetJobById(req.params.jobId);
+
+      if (supabaseData) {
+        data = supabaseData.docs;
+      }
+    }
+
+    const jobStatus = await job.getState();
+
     res.json({
-      status: await job.getState(),
+      status: jobStatus,
       // progress: job.progress(),
-      current: current,
-      current_url: current_url,
-      current_step: current_step,
-      total: total,
-      data: job.returnvalue,
-      partial_data: partialDocs ?? [],
+      current,
+      current_url,
+      current_step,
+      total,
+      data,
+      partial_data: jobStatus == 'completed' ? [] : partialDocs,
     });
   } catch (error) {
     console.error(error);
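
Note: for reference, a minimal sketch of what a client polling the crawl status endpoint would receive after this change. The response field names come from the res.json call above; the route path, port, and the CrawlStatusResponse type are assumptions for illustration, not part of this commit.

// Hypothetical response shape; field names taken from res.json above.
type CrawlStatusResponse = {
  status: string;        // Bull job state, e.g. "active" or "completed"
  current: number;
  current_url: string;
  current_step: string;
  total: number;
  data: any;             // docs from Supabase when USE_DB_AUTHENTICATION is set, else job.returnvalue
  partial_data: any[];   // emptied once the job reports completed
};

// Hypothetical poller; the /v0/crawl/status/:jobId path and port are assumptions.
async function pollCrawlStatus(jobId: string): Promise<CrawlStatusResponse> {
  const res = await fetch(`http://localhost:3002/v0/crawl/status/${jobId}`);
  if (!res.ok) throw new Error(`Status request failed: ${res.status}`);
  return (await res.json()) as CrawlStatusResponse;
}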

View File (src/lib/supabase-jobs.ts, new file)

@@ -0,0 +1,20 @@
+import { supabase_service } from "../services/supabase";
+
+export const supabaseGetJobById = async (jobId: string) => {
+  const { data, error } = await supabase_service
+    .from('firecrawl_jobs')
+    .select('*')
+    .eq('job_id', jobId)
+    .single();
+
+  if (error) {
+    console.error('Error while fetching supabase for job:', jobId, 'error:', error);
+    return null;
+  }
+
+  if (!data) {
+    return null;
+  }
+
+  return data;
+}
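
Note: a quick usage sketch for the new helper; the caller and the docs column access are assumptions based on how crawlStatusController uses it above.

// Hypothetical caller: fetch the docs stored for a job, or null if the row is missing.
import { supabaseGetJobById } from "./supabase-jobs"; // adjust path to the caller's location

async function getStoredDocs(jobId: string): Promise<any | null> {
  const row = await supabaseGetJobById(jobId);
  return row ? row.docs : null;
}

Worth knowing: .single() already returns an error when the query matches zero rows (or more than one), so the !data check is belt-and-braces rather than load-bearing.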

View File (web scraper pipeline)

@@ -4,6 +4,7 @@ import { WebScraperDataProvider } from "../scraper/WebScraper";
 import { DocumentUrl, Progress } from "../lib/entities";
 import { billTeam } from "../services/billing/credit_billing";
 import { Document } from "../lib/entities";
+import { supabase_service } from "../services/supabase";
 
 export async function startWebScraperPipeline({
   job,
@@ -26,7 +27,7 @@ export async function startWebScraperPipeline({
       }
     },
     onSuccess: (result) => {
-      job.moveToCompleted(result);
+      saveJob(job, result);
     },
     onError: (error) => {
       job.moveToFailed(error);
@@ -107,3 +108,17 @@ export async function runWebScraper({
     return { success: false, message: error.message, docs: [] };
   }
 }
+
+const saveJob = async (job: Job, result: any) => {
+  if (process.env.USE_DB_AUTHENTICATION) {
+    const { data, error } = await supabase_service
+      .from("firecrawl_jobs")
+      .update({ docs: result })
+      .eq("job_id", job.id);
+
+    job.moveToCompleted(null); // returnvalue
+  } else {
+    job.moveToCompleted(result); // returnvalue
+  }
+}
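
Note: two details worth flagging in saveJob. The error from the update is destructured but never inspected, and moveToCompleted(null) intentionally keeps the potentially large docs payload out of the queue's stored return value once Supabase holds it. Also, env vars are strings, so USE_DB_AUTHENTICATION=false still enters the Supabase branch. Below is a sketch of an error-aware variant, under the assumption that logging and completing anyway is the desired behavior; this is not what the commit does.

// Sketch only: same flow as saveJob above, but surfacing the Supabase error.
const saveJobChecked = async (job: Job, result: any) => {
  if (process.env.USE_DB_AUTHENTICATION === "true") {
    const { error } = await supabase_service
      .from("firecrawl_jobs")
      .update({ docs: result })
      .eq("job_id", job.id);

    if (error) {
      // Assumption: log and complete anyway, so a DB hiccup doesn't stall the queue.
      console.error("Failed to persist docs for job:", job.id, "error:", error);
    }
    job.moveToCompleted(null); // docs now live in Supabase, not in Redis
  } else {
    job.moveToCompleted(result);
  }
};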

View File (log_job.ts)

@@ -7,14 +7,15 @@ import "dotenv/config";
 export async function logJob(job: FirecrawlJob) {
   try {
     // Only log jobs in production
-    if (process.env.ENV !== "production") {
-      return;
-    }
+    // if (process.env.ENV !== "production") {
+    //   return;
+    // }
 
     const { data, error } = await supabase_service
       .from("firecrawl_jobs")
       .insert([
         {
+          job_id: job.job_id ? job.job_id : null,
           success: job.success,
           message: job.message,
           num_docs: job.num_docs,
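
Note: this is the change the commit's TODO points at. The production-only guard is commented out so logJob also writes rows during local testing, and it must be restored before deploying. Restoring it means uncommenting exactly this:

if (process.env.ENV !== "production") {
  return;
}

As a minor nit, the fallback could be written as job.job_id ?? null, which differs from the ternary only when job_id is a falsy non-null value such as an empty string.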

View File (queue worker)

@@ -41,6 +41,7 @@ getWebScraperQueue().process(
     await callWebhook(job.data.team_id, job.id as string, data);
 
     await logJob({
+      job_id: job.id as string,
       success: success,
       message: message,
       num_docs: docs.length,
@@ -80,6 +81,7 @@ getWebScraperQueue().process(
     };
     await callWebhook(job.data.team_id, job.id as string, data);
     await logJob({
+      job_id: job.id as string,
      success: false,
      message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
      num_docs: 0,

View File (shared types)

@@ -48,6 +48,7 @@ export interface RunWebScraperResult {
 }
 
 export interface FirecrawlJob {
+  job_id?: string;
   success: boolean;
   message: string;
   num_docs: number;
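
Note: because job_id is optional, pre-existing FirecrawlJob call sites keep compiling; only the two queue-worker calls above actually pass it, and log_job.ts inserts null when it is absent. A minimal sketch of the shape both call sites now produce (values are hypothetical, and any interface fields beyond this hunk's context are not shown, hence Partial):

// Hypothetical example values; Partial elides fields outside this diff.
const logged: Partial<FirecrawlJob> = {
  job_id: "42",   // Bull's job.id, cast to string at the call site
  success: true,
  message: "Crawl completed",
  num_docs: 17,
};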