diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts
index feba30ec..a42f17fa 100644
--- a/apps/api/src/controllers/v1/extract-status.ts
+++ b/apps/api/src/controllers/v1/extract-status.ts
@@ -1,21 +1,39 @@
 import { Response } from "express";
 import { supabaseGetJobsById } from "../../lib/supabase-jobs";
 import { RequestWithAuth } from "./types";
+import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis";
 
 export async function extractStatusController(
   req: RequestWithAuth<{ jobId: string }, any, any>,
   res: Response,
 ) {
-  const jobData = await supabaseGetJobsById([req.params.jobId]);
-  if (!jobData || jobData.length === 0) {
+  const extract = await getExtract(req.params.jobId);
+
+  if (!extract) {
     return res.status(404).json({
       success: false,
-      error: "Job not found",
+      error: "Extract job not found",
     });
   }
 
+  let data: any[] = [];
+
+  if (extract.status === "completed") {
+    const jobData = await supabaseGetJobsById([req.params.jobId]);
+    if (!jobData || jobData.length === 0) {
+      return res.status(404).json({
+        success: false,
+        error: "Job not found",
+      });
+    }
+
+    data = jobData[0].docs;
+  }
+
   return res.status(200).json({
     success: true,
-    data: jobData[0].docs,
+    data: data,
+    status: extract.status,
+    expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
   });
 }
diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts
index 2838779b..6668d33b 100644
--- a/apps/api/src/controllers/v1/extract.ts
+++ b/apps/api/src/controllers/v1/extract.ts
@@ -7,7 +7,7 @@
 } from "./types";
 import { getExtractQueue } from "../../services/queue-service";
 import * as Sentry from "@sentry/node";
-import { v4 as uuidv4 } from "uuid";
+import { saveExtract } from "../../lib/extract/extract-redis";
 
 /**
  * Extracts data from the provided URLs based on the request parameters.
@@ -23,14 +23,6 @@ export async function extractController(
   const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true";
   req.body = extractRequestSchema.parse(req.body);
 
-  if (!req.auth.plan) {
-    return res.status(400).json({
-      success: false,
-      error: "No plan specified",
-      urlTrace: [],
-    });
-  }
-
   const extractId = crypto.randomUUID();
   const jobData = {
     request: req.body,
@@ -40,6 +32,14 @@ export async function extractController(
     extractId,
   };
 
+  await saveExtract(extractId, {
+    id: extractId,
+    team_id: req.auth.team_id,
+    plan: req.auth.plan,
+    createdAt: Date.now(),
+    status: "processing",
+  });
+
   if (Sentry.isInitialized()) {
     const size = JSON.stringify(jobData).length;
     await Sentry.startSpan(
diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts
new file mode 100644
index 00000000..2d9410f9
--- /dev/null
+++ b/apps/api/src/lib/extract/extract-redis.ts
@@ -0,0 +1,37 @@
+import { redisConnection } from "../../services/queue-service";
+import { logger as _logger } from "../logger";
+
+export type StoredExtract = {
+  id: string;
+  team_id: string;
+  plan?: string;
+  createdAt: number;
+  status: "processing" | "completed" | "failed" | "cancelled";
+};
+
+export async function saveExtract(id: string, extract: StoredExtract) {
+  _logger.debug("Saving extract " + id + " to Redis...");
+  await redisConnection.set("extract:" + id, JSON.stringify(extract));
+  await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
+}
+
+export async function getExtract(id: string): Promise<StoredExtract | null> {
+  const x = await redisConnection.get("extract:" + id);
+  return x ? JSON.parse(x) : null;
+}
+
+export async function updateExtract(id: string, extract: Partial<StoredExtract>) {
+  const current = await getExtract(id);
+  if (!current) return;
+  await redisConnection.set("extract:" + id, JSON.stringify({ ...current, ...extract }));
+  await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
+}
+
+
+export async function getExtractExpiry(id: string): Promise<Date> {
+  const d = new Date();
+  const ttl = await redisConnection.pttl("extract:" + id);
+  d.setMilliseconds(d.getMilliseconds() + ttl);
+  d.setMilliseconds(0);
+  return d;
+}
diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts
index 6a98ef94..e4893bc9 100644
--- a/apps/api/src/lib/extract/extraction-service.ts
+++ b/apps/api/src/lib/extract/extraction-service.ts
@@ -9,6 +9,7 @@ import { billTeam } from "../../services/billing/credit_billing";
 import { logJob } from "../../services/logging/log_job";
 import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
 import { saveCrawl, StoredCrawl } from "../crawl-redis";
+import { updateExtract } from "./extract-redis";
 
 interface ExtractServiceOptions {
   request: ExtractRequest;
@@ -202,8 +203,16 @@ export async function performExtraction(extractId, options: ExtractServiceOption
     scrapeOptions: request,
     origin: request.origin ?? "api",
     num_tokens: completions.numTokens ?? 0,
+  }).then(() => {
+    updateExtract(extractId, {
+      status: "completed",
+    }).catch((error) => {
+      logger.error(`Failed to update extract ${extractId} status to completed: ${error}`);
+    });
   });
+
+
 
   return {
     success: true,
     data: completions.extract ?? {},
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index a8a2e408..96fad2d2 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -353,6 +353,7 @@ const processExtractJobInternal = async (token: string, job: Job & { id: string
     await job.moveToFailed(error, token, false);
     // throw error;
   } finally {
+    clearInterval(extendLockInterval);
   }
 };
 