Nicolas 2025-01-07 16:16:01 -03:00
parent 86e34d7c6c
commit eb254547e5
5 changed files with 78 additions and 13 deletions

View File

@@ -1,21 +1,39 @@
 import { Response } from "express";
 import { supabaseGetJobsById } from "../../lib/supabase-jobs";
 import { RequestWithAuth } from "./types";
+import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis";
 export async function extractStatusController(
   req: RequestWithAuth<{ jobId: string }, any, any>,
   res: Response,
 ) {
-  const jobData = await supabaseGetJobsById([req.params.jobId]);
-  if (!jobData || jobData.length === 0) {
+  const extract = await getExtract(req.params.jobId);
+  if (!extract) {
     return res.status(404).json({
       success: false,
-      error: "Job not found",
+      error: "Extract job not found",
     });
   }
+  let data: any[] = [];
+  if (extract.status === "completed") {
+    const jobData = await supabaseGetJobsById([req.params.jobId]);
+    if (!jobData || jobData.length === 0) {
+      return res.status(404).json({
+        success: false,
+        error: "Job not found",
+      });
+    }
+    data = jobData[0].docs;
+  }
   return res.status(200).json({
     success: true,
-    data: jobData[0].docs,
+    data: data,
+    status: extract.status,
+    expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
   });
 }
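
For reference, a sketch of the contract a polling client now sees; the base URL and route path below are assumptions, while the response fields (success, data, status, expiresAt) mirror the controller above:

async function pollExtractStatus(jobId: string) {
  // Hypothetical endpoint; assumes the controller is mounted at /v1/extract/:jobId.
  const res = await fetch(`https://api.example.com/v1/extract/${jobId}`);
  if (res.status === 404) {
    // Unknown extract id, or a completed job whose stored docs are missing.
    throw new Error("Extract job not found");
  }
  // data stays [] while status is "processing"; docs appear once "completed".
  // expiresAt reflects the 24 h TTL on the Redis record.
  return (await res.json()) as {
    success: boolean;
    data: any[];
    status: "processing" | "completed" | "failed" | "cancelled";
    expiresAt: string;
  };
}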

View File

@@ -7,7 +7,7 @@ import {
 } from "./types";
 import { getExtractQueue } from "../../services/queue-service";
 import * as Sentry from "@sentry/node";
-import { v4 as uuidv4 } from "uuid";
+import { saveExtract } from "../../lib/extract/extract-redis";
 /**
  * Extracts data from the provided URLs based on the request parameters.
@@ -23,14 +23,6 @@ export async function extractController(
   const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true";
   req.body = extractRequestSchema.parse(req.body);
-  if (!req.auth.plan) {
-    return res.status(400).json({
-      success: false,
-      error: "No plan specified",
-      urlTrace: [],
-    });
-  }
   const extractId = crypto.randomUUID();
   const jobData = {
     request: req.body,
@@ -40,6 +32,14 @@ export async function extractController(
     extractId,
   };
+  await saveExtract(extractId, {
+    id: extractId,
+    team_id: req.auth.team_id,
+    plan: req.auth.plan,
+    createdAt: Date.now(),
+    status: "processing",
+  });
   if (Sentry.isInitialized()) {
     const size = JSON.stringify(jobData).length;
     await Sentry.startSpan(
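
Two notes on this file. The removed `!req.auth.plan` guard lines up with `plan` becoming optional in the new `StoredExtract` type below. The dropped `uuid` import matches the switch to the runtime's `crypto.randomUUID()` for `extractId`, visible in the second hunk above; a minimal side-by-side sketch, assuming Node 14.17+ for node:crypto and that the uuid package is still installed:

import { v4 as uuidv4 } from "uuid";
import { randomUUID } from "node:crypto";

// Both return RFC 4122 version 4 UUID strings (36 characters, dash-separated),
// so generating extract ids no longer needs the external dependency.
const fromPackage: string = uuidv4();
const fromRuntime: string = randomUUID();
console.log(fromPackage.length, fromRuntime.length); // 36 36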

View File

@@ -0,0 +1,37 @@
+import { redisConnection } from "../../services/queue-service";
+import { logger as _logger } from "../logger";
+export type StoredExtract = {
+  id: string;
+  team_id: string;
+  plan?: string;
+  createdAt: number;
+  status: "processing" | "completed" | "failed" | "cancelled";
+};
+export async function saveExtract(id: string, extract: StoredExtract) {
+  _logger.debug("Saving extract " + id + " to Redis...");
+  await redisConnection.set("extract:" + id, JSON.stringify(extract));
+  await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
+}
+export async function getExtract(id: string): Promise<StoredExtract | null> {
+  const x = await redisConnection.get("extract:" + id);
+  return x ? JSON.parse(x) : null;
+}
+export async function updateExtract(id: string, extract: Partial<StoredExtract>) {
+  const current = await getExtract(id);
+  if (!current) return;
+  await redisConnection.set("extract:" + id, JSON.stringify({ ...current, ...extract }));
+  await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
+}
+export async function getExtractExpiry(id: string): Promise<Date> {
+  const d = new Date();
+  const ttl = await redisConnection.pttl("extract:" + id);
+  d.setMilliseconds(d.getMilliseconds() + ttl);
+  d.setMilliseconds(0);
+  return d;
+}
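
Taken together, these helpers keep one TTL-bound status record per extract job under the key extract:<id>. A usage sketch of the lifecycle, assuming redisConnection is already configured; the id and team below are made up:

import {
  saveExtract,
  updateExtract,
  getExtract,
  getExtractExpiry,
} from "./extract-redis";

async function extractLifecycleDemo() {
  const id = "00000000-0000-4000-8000-000000000000"; // hypothetical extract id

  // 1. The extract controller records the job as "processing" (24 h expiry).
  await saveExtract(id, {
    id,
    team_id: "team_123", // hypothetical
    createdAt: Date.now(),
    status: "processing",
  });

  // 2. The extraction service flips the status once the job finishes.
  await updateExtract(id, { status: "completed" });

  // 3. The status controller reads the record and its remaining TTL.
  const stored = await getExtract(id); // -> { ..., status: "completed" } or null
  const expiresAt = await getExtractExpiry(id);
  console.log(stored?.status, expiresAt.toISOString());
}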

View File

@@ -9,6 +9,7 @@ import { billTeam } from "../../services/billing/credit_billing";
 import { logJob } from "../../services/logging/log_job";
 import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
 import { saveCrawl, StoredCrawl } from "../crawl-redis";
+import { updateExtract } from "./extract-redis";
 interface ExtractServiceOptions {
   request: ExtractRequest;
@@ -202,8 +203,16 @@ export async function performExtraction(extractId, options: ExtractServiceOption
     scrapeOptions: request,
     origin: request.origin ?? "api",
     num_tokens: completions.numTokens ?? 0,
-  });
+  }).then(() => {
+    updateExtract(extractId, {
+      status: "completed",
+    }).catch((error) => {
+      logger.error(`Failed to update extract ${extractId} status to completed: ${error}`);
+    });
+  });
   return {
     success: true,
     data: completions.extract ?? {},
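
The StoredExtract status union also allows "failed"; purely as a hypothetical sketch of how the same pattern could cover an error path (this diff only adds the completed case), assuming the updateExtract helper and the shared logger module shown elsewhere in this commit:

import { updateExtract } from "./extract-redis";
import { logger } from "../logger";

// Hypothetical helper, not part of this commit: mark an extract as failed and,
// mirroring the completed path above, log rather than rethrow a status-write error.
async function markExtractFailed(extractId: string): Promise<void> {
  await updateExtract(extractId, { status: "failed" }).catch((error) => {
    logger.error(`Failed to update extract ${extractId} status to failed: ${error}`);
  });
}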

View File

@@ -353,6 +353,7 @@ const processExtractJobInternal = async (token: string, job: Job & { id: string
     await job.moveToFailed(error, token, false);
     // throw error;
   } finally {
+    clearInterval(extendLockInterval);
   }
 };
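
The fix here clears the lock-extension timer once the extract job settles, so it no longer keeps firing after the job has finished or failed. A minimal sketch of the pattern, assuming BullMQ's Job.extendLock(token, durationMs); the interval lengths are illustrative:

import { Job } from "bullmq";

async function runWithLockExtension(
  job: Job,
  token: string,
  work: () => Promise<void>,
) {
  // Periodically extend the job lock so BullMQ does not consider the job
  // stalled while the (potentially long) extraction runs.
  const extendLockInterval = setInterval(() => {
    job.extendLock(token, 60 * 1000).catch(() => {
      // Lock extension is best-effort in this sketch; errors are ignored.
    });
  }, 30 * 1000);
  try {
    await work();
  } finally {
    // Without this, the timer would leak and keep extending a settled job's lock.
    clearInterval(extendLockInterval);
  }
}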