crawl status and document stuff

Gergő Móricz 2024-08-16 22:47:56 +02:00
parent 0c057bb649
commit f20328bdbb
7 changed files with 144 additions and 88 deletions

View File

@@ -1,68 +1,85 @@
 import { Response } from "express";
-import { v4 as uuidv4 } from "uuid";
-import { RequestWithAuth } from "./types";
+import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
+import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis";
+import { getScrapeQueue } from "../../services/queue-service";
+import { supabaseGetJobById } from "../../lib/supabase-jobs";

-export async function crawlStatusController(req: RequestWithAuth, res: Response) {
-  // const job = await getWebScraperQueue().getJob(req.params.jobId);
-  // if (!job) {
-  //   return res.status(404).json({ error: "Job not found" });
-  // }
-
-  // const { current, current_url, total, current_step, partialDocs } = await job.progress();
-
-  // let data = job.returnvalue;
-  // if (process.env.USE_DB_AUTHENTICATION === "true") {
-  //   const supabaseData = await supabaseGetJobById(req.params.jobId);
-  //   if (supabaseData) {
-  //     data = supabaseData.docs;
-  //   }
-  // }
-
-  // const jobStatus = await job.getState();
-
-  // mock:
-  const id = uuidv4();
-  const result = {
-    totalCount: 100,
-    creditsUsed: 2,
-    expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000).getTime(),
-    status: "scraping", // scraping, completed, failed
-    next: `${req.protocol}://${req.get("host")}/v1/crawl/${id}`,
-    data: [{
-      markdown: "test",
-      content: "test",
-      html: "test",
-      rawHtml: "test",
-      linksOnPage: ["test1", "test2"],
-      screenshot: "test",
-      metadata: {
-        title: "test",
-        description: "test",
-        language: "test",
-        sourceURL: "test",
-        statusCode: 200,
-        error: "test"
-      }
-    },
-    {
-      markdown: "test",
-      content: "test",
-      html: "test",
-      rawHtml: "test",
-      linksOnPage: ["test1", "test2"],
-      screenshot: "test",
-      metadata: {
-        title: "test",
-        description: "test",
-        language: "test",
-        sourceURL: "test",
-        statusCode: 200,
-        error: "test"
-      }
-    }]
-  }
-  res.status(200).json(result);
+async function getJob(id: string) {
+  console.log("getting job", id);
+  const job = await getScrapeQueue().getJob(id);
+  if (!job) return job;
+
+  if (process.env.USE_DB_AUTHENTICATION === "true") {
+    const supabaseData = await supabaseGetJobById(id);
+
+    if (supabaseData) {
+      job.returnvalue = supabaseData.docs;
+    }
+  }
+
+  job.returnvalue = Array.isArray(job.returnvalue) ? job.returnvalue[0] : job.returnvalue;
+
+  return job;
+}
+
+export async function crawlStatusController(req: RequestWithAuth<CrawlStatusParams, undefined, CrawlStatusResponse>, res: Response<CrawlStatusResponse>) {
+  const sc = await getCrawl(req.params.jobId);
+  if (!sc) {
+    return res.status(404).json({ success: false, error: "Job not found" });
+  }
+
+  if (sc.team_id !== req.auth.team_id) {
+    return res.status(403).json({ success: false, error: "Forbidden" });
+  }
+
+  const start = typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : 0;
+  const end = typeof req.query.limit === "string" ? (start + parseInt(req.query.limit, 10) - 1) : undefined;
+
+  const jobIDs = await getCrawlJobs(req.params.jobId);
+  const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
+  const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
+  const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
+  const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);
+
+  let doneJobs = [];
+
+  if (end === undefined) { // determine 10 megabyte limit
+    let bytes = 0, used = 0;
+
+    while (bytes < 10485760 && used < doneJobsOrder.length) {
+      const job = await getJob(doneJobsOrder[used]);
+      doneJobs.push(job);
+      bytes += JSON.stringify(legacyDocumentConverter(job.returnvalue)).length;
+      used++;
+    }
+
+    doneJobs.splice(doneJobs.length - 1, 1);
+    used--;
+  } else {
+    doneJobs = (await Promise.all(doneJobsOrder.map(async x => await getJob(x))));
+  }
+
+  const data = doneJobs.map(x => x.returnvalue);
+
+  const nextURL = new URL(`${req.protocol}://${req.get("host")}/v1/crawl/${req.params.jobId}`);
+
+  nextURL.searchParams.set("skip", (start + data.length).toString());
+
+  if (typeof req.query.limit === "string") {
+    nextURL.searchParams.set("limit", req.query.limit);
+  }
+
+  res.status(200).json({
+    status,
+    totalCount: jobIDs.length,
+    creditsUsed: jobIDs.length,
+    expiresAt: (await getCrawlExpiry(req.params.jobId)).toISOString(),
+    next:
+      status !== "scraping" && (start + data.length) === doneJobsLength // if there's not gonna be any documents after this
+        ? undefined
+        : nextURL.href,
+    data: data.map(x => legacyDocumentConverter(x)),
+  });
 }
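For context, a minimal consumer sketch (not part of this commit) of the paginated status endpoint above. baseUrl, jobId, and fetchAllDocuments are illustrative names; the response shape is the CrawlStatusResponse type added in types.ts below. Since the controller keeps next set while the crawl is still scraping, a client has to poll rather than loop tightly:

// Hypothetical client for GET /v1/crawl/:jobId using the new skip/limit paging.
async function fetchAllDocuments(baseUrl: string, jobId: string) {
  const docs: unknown[] = [];
  let url: string | undefined = `${baseUrl}/v1/crawl/${jobId}?limit=50`;
  while (url) {
    const res = await fetch(url);
    const body = await res.json();
    if (body.success === false) throw new Error(body.error); // ErrorResponse branch
    docs.push(...body.data);
    if (body.status === "scraping" && body.data.length === 0) {
      // crawl still running and nothing new yet: back off before following next again
      await new Promise((r) => setTimeout(r, 1000));
    }
    url = body.next; // the server omits next once every done job has been served
  }
  return docs;
}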

View File

@@ -7,7 +7,7 @@ import { getScrapeQueue } from "../../services/queue-service";
 import { addScrapeJob } from "../../services/queue-jobs";
 import { Logger } from "../../lib/logger";

-export async function crawlController(req: RequestWithAuth<CrawlResponse, CrawlRequest>, res: Response<CrawlResponse>) {
+export async function crawlController(req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>, res: Response<CrawlResponse>) {
   req.body = crawlRequestSchema.parse(req.body);

   const id = uuidv4();

View File

@@ -4,7 +4,7 @@ import { checkAndUpdateURL } from "../../../src/lib/validateUrl";
 import { MapRequest, mapRequestSchema, MapResponse, RequestWithAuth } from "./types";
 import { checkTeamCredits } from "../../services/billing/credit_billing";

-export async function mapController(req: RequestWithAuth<MapResponse, MapRequest>, res: Response<MapResponse>) {
+export async function mapController(req: RequestWithAuth<{}, MapResponse, MapRequest>, res: Response<MapResponse>) {
   req.body = mapRequestSchema.parse(req.body);
   console.log(req.body);
   // expected req.body

View File

@@ -1,6 +1,6 @@
 import { Request, Response } from "express";
 import { Logger } from '../../lib/logger';
-import { Document, legacyScrapeOptions, RequestWithAuth, ScrapeRequest, scrapeRequestSchema, ScrapeResponse } from "./types";
+import { Document, legacyDocumentConverter, legacyScrapeOptions, RequestWithAuth, ScrapeRequest, scrapeRequestSchema, ScrapeResponse } from "./types";
 import { billTeam } from "../../services/billing/credit_billing";
 import { v4 as uuidv4 } from 'uuid';
 import { numTokensFromString } from "../../lib/LLM-extraction/helpers";
@@ -8,7 +8,7 @@ import { addScrapeJob } from "../../services/queue-jobs";
 import { scrapeQueueEvents } from '../../services/queue-service';
 import { logJob } from "../../services/logging/log_job";

-export async function scrapeController(req: RequestWithAuth<ScrapeResponse, ScrapeRequest>, res: Response<ScrapeResponse>) {
+export async function scrapeController(req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>, res: Response<ScrapeResponse>) {
   req.body = scrapeRequestSchema.parse(req.body);

   let earlyReturn = false;
@@ -101,20 +101,6 @@ export async function scrapeController(req: RequestWithAuth<ScrapeResponse, Scra
   return res.status(200).json({
     success: true,
-    data: {
-      markdown: doc.markdown,
-      links: doc.linksOnPage,
-      rawHtml: doc.rawHtml,
-      html: doc.html,
-      screenshot: doc.screenshot,
-      fullPageScreenshot: doc.fullPageScreenshot,
-      metadata: {
-        ...doc.metadata,
-        pageError: undefined,
-        pageStatusCode: undefined,
-        error: doc.metadata.pageError,
-        statusCode: doc.metadata.pageStatusCode,
-      },
-    } as Document
+    data: legacyDocumentConverter(doc),
   });
 }

View File

@@ -171,16 +171,29 @@ export type MapResponse = ErrorResponse | {
   links: string[];
 }

+export type CrawlStatusParams = {
+  jobId: string;
+}
+
+export type CrawlStatusResponse = ErrorResponse | {
+  status: "scraping" | "completed" | "failed" | "cancelled",
+  totalCount: number;
+  creditsUsed: number;
+  expiresAt: string;
+  next?: string;
+  data: Document[];
+}
+
 type AuthObject = {
   team_id: string;
   plan: string;
 }

-export interface RequestWithMaybeAuth<ReqBody = undefined, ResBody = undefined> extends Request<{}, ReqBody, ResBody> {
+export interface RequestWithMaybeAuth<ReqParams = {}, ReqBody = undefined, ResBody = undefined> extends Request<ReqParams, ReqBody, ResBody> {
   auth?: AuthObject;
 }

-export interface RequestWithAuth<ReqBody = undefined, ResBody = undefined> extends Request<{}, ReqBody, ResBody> {
+export interface RequestWithAuth<ReqParams = {}, ReqBody = undefined, ResBody = undefined> extends Request<ReqParams, ReqBody, ResBody> {
   auth: AuthObject;
 }
@@ -211,3 +224,20 @@ export function legacyScrapeOptions(x: ScrapeOptions): PageOptions {
     parsePDF: x.parsePDF
   };
 }
+
+export function legacyDocumentConverter(doc: any): Document {
+  return {
+    markdown: doc.markdown,
+    links: doc.linksOnPage,
+    rawHtml: doc.rawHtml,
+    html: doc.html,
+    screenshot: doc.screenshot ?? doc.fullPageScreenshot,
+    metadata: {
+      ...doc.metadata,
+      pageError: undefined,
+      pageStatusCode: undefined,
+      error: doc.metadata.pageError,
+      statusCode: doc.metadata.pageStatusCode,
+    },
+  }
+}
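Two notes on this file. The RequestWithAuth/RequestWithMaybeAuth interfaces gain a leading ReqParams type parameter, which is what lets crawlStatusController type req.params.jobId through CrawlStatusParams; existing call sites pass {} in that slot. And legacyDocumentConverter centralizes the v0-to-v1 document mapping that scrape.ts previously inlined. A sketch with made-up values (not from the commit) of what the conversion does:

// Hypothetical v0-style document as the scraper produces it.
const v0Doc = {
  markdown: "# Example",
  html: "<h1>Example</h1>",
  rawHtml: "<html><body><h1>Example</h1></body></html>",
  linksOnPage: ["https://example.com/a"],
  screenshot: undefined,
  fullPageScreenshot: "https://storage.example/shot.png",
  metadata: { sourceURL: "https://example.com", pageStatusCode: 200, pageError: undefined },
};

const v1Doc = legacyDocumentConverter(v0Doc);
// v1Doc.links === ["https://example.com/a"]                (renamed from linksOnPage)
// v1Doc.screenshot === "https://storage.example/shot.png"  (?? falls back to fullPageScreenshot)
// v1Doc.metadata.statusCode === 200                        (renamed from pageStatusCode)
// v1Doc.metadata.pageStatusCode and pageError are explicitly set to undefined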

View File

@@ -26,6 +26,13 @@ export async function getCrawl(id: string): Promise<StoredCrawl | null> {
   return JSON.parse(x);
 }

+export async function getCrawlExpiry(id: string): Promise<Date> {
+  const d = new Date();
+  const ttl = await redisConnection.ttl(id);
+  d.setSeconds(d.getSeconds() + ttl);
+  return d;
+}
+
 export async function addCrawlJob(id: string, job_id: string) {
   await redisConnection.sadd("crawl:" + id + ":jobs", job_id);
   await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
@@ -38,7 +45,17 @@ export async function addCrawlJobs(id: string, job_ids: string[]) {
 export async function addCrawlJobDone(id: string, job_id: string) {
   await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id);
+  await redisConnection.lpush("crawl:" + id + ":jobs_done_ordered", job_id);
   await redisConnection.expire("crawl:" + id + ":jobs_done", 24 * 60 * 60, "NX");
+  await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60, "NX");
 }
+
+export async function getDoneJobsOrderedLength(id: string): Promise<number> {
+  return await redisConnection.llen("crawl:" + id + ":jobs_done_ordered");
+}
+
+export async function getDoneJobsOrdered(id: string, start = 0, end = -1): Promise<string[]> {
+  return await redisConnection.lrange("crawl:" + id + ":jobs_done_ordered", start, end);
+}

 export async function isCrawlFinished(id: string) {
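A sketch of the new Redis bookkeeping, assuming the ioredis redisConnection used above and a made-up crawl id. getCrawlExpiry converts the key's remaining TTL in seconds into an absolute Date, and because addCrawlJobDone uses LPUSH, the jobs_done_ordered list holds the most recently finished job IDs first; LRANGE's inclusive start/end (-1 meaning the last element) is what getDoneJobsOrdered pages over:

// "crawl-1", "job-a", and "job-b" are placeholder values.
await addCrawlJobDone("crawl-1", "job-a");
await addCrawlJobDone("crawl-1", "job-b");

await getDoneJobsOrderedLength("crawl-1"); // 2
await getDoneJobsOrdered("crawl-1");       // ["job-b", "job-a"] (newest first)
await getDoneJobsOrdered("crawl-1", 0, 0); // ["job-b"]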

View File

@@ -60,12 +60,7 @@ function idempotencyMiddleware(req: Request, res: Response, next: NextFunction)
       if (!isIdempotencyValid) {
         return res.status(409).json({ success: false, error: "Idempotency key already used" });
       }
-      // try {
       createIdempotencyKey(req);
-      // } catch (error) {
-      //   Logger.error(error);
-      //   return res.status(500).json({ success: false, error: error.message });
-      // }
     }
     next();
   })()
@@ -128,7 +123,18 @@ v1Router.use((err: unknown, req: Request<{}, ErrorResponse, undefined>, res: Res
     res.status(400).json({ success: false, error: "Bad Request", details: err.errors });
   } else {
     const id = uuidv4();
-    Logger.error("Error occurred in request! (" + req.path + ") -- ID " + id + " -- " + JSON.stringify(err));
+    let verbose = JSON.stringify(err);
+    if (verbose === "{}") {
+      if (err instanceof Error) {
+        verbose = JSON.stringify({
+          message: err.message,
+          name: err.name,
+          stack: err.stack,
+        });
+      }
+    }
+
+    Logger.error("Error occurred in request! (" + req.path + ") -- ID " + id + " -- " + verbose);
     res.status(500).json({ success: false, error: "An unexpected error occurred. Please contact hello@firecrawl.com for help. Your exception ID is " + id + "" });
   }
 });
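The motivation for the verbose fallback, reproducible in any Node REPL: message and stack are not own enumerable properties of an Error (and name lives on the prototype), so JSON.stringify drops them all, and the old log line printed an empty object for every thrown Error:

JSON.stringify(new Error("boom"));
// -> '{}'

// Copying the fields onto a plain object makes them serializable;
// the stack string here is a made-up example.
JSON.stringify({ message: "boom", name: "Error", stack: "Error: boom\n    at handler" });
// -> '{"message":"boom","name":"Error","stack":"Error: boom\\n    at handler"}'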