Nick: rm scrape events

Nicolas 2025-04-23 14:19:25 -04:00
parent feda4dede7
commit 22f7efed35
2 changed files with 128 additions and 129 deletions

Changed file 1 of 2

@@ -1,109 +1,109 @@
Every line of this file is commented out by this commit; each new line is the old line prefixed with "//". The previous content was:

import { Job } from "bullmq";
import { supabase_service as supabase } from "../services/supabase";
import { logger } from "./logger";
import { configDotenv } from "dotenv";
import { Engine } from "../scraper/scrapeURL/engines";

configDotenv();

export type ScrapeErrorEvent = {
  type: "error";
  message: string;
  stack?: string;
};

export type ScrapeScrapeEvent = {
  type: "scrape";
  url: string;
  worker?: string;
  method: Engine;
  result: null | {
    success: boolean;
    response_code?: number;
    response_size?: number;
    error?: string | object;
    // proxy?: string,
    time_taken: number;
  };
};

export type ScrapeQueueEvent = {
  type: "queue";
  event:
    | "waiting"
    | "active"
    | "completed"
    | "paused"
    | "resumed"
    | "removed"
    | "failed";
  worker?: string;
};

export type ScrapeEvent =
  | ScrapeErrorEvent
  | ScrapeScrapeEvent
  | ScrapeQueueEvent;

export class ScrapeEvents {
  static async insert(jobId: string, content: ScrapeEvent) {
    if (jobId === "TEST") return null;

    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
    if (useDbAuthentication) {
      try {
        const result = await supabase
          .from("scrape_events")
          .insert({
            job_id: jobId,
            type: content.type,
            content: content,
            // created_at
          })
          .select()
          .single();
        return (result.data as any).id;
      } catch (error) {
        // logger.error(`Error inserting scrape event: ${error}`);
        return null;
      }
    }
    return null;
  }

  static async updateScrapeResult(
    logId: number | null,
    result: ScrapeScrapeEvent["result"],
  ) {
    if (logId === null) return;

    try {
      const previousLog = (
        await supabase.from("scrape_events").select().eq("id", logId).single()
      ).data as any;

      await supabase
        .from("scrape_events")
        .update({
          content: {
            ...previousLog.content,
            result,
          },
        })
        .eq("id", logId);
    } catch (error) {
      logger.error(`Error updating scrape result: ${error}`);
    }
  }

  static async logJobEvent(job: Job | any, event: ScrapeQueueEvent["event"]) {
    try {
      await this.insert(((job as any).id ? (job as any).id : job) as string, {
        type: "queue",
        event,
        worker: process.env.FLY_MACHINE_ID,
      });
    } catch (error) {
      logger.error(`Error logging job event: ${error}`);
    }
  }
}
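For context, a minimal sketch of how the removed helpers fit together, reconstructed from the commented-out code above: a "scrape" event is inserted with result: null, its row id is kept, and the outcome is attached later via updateScrapeResult; queue transitions go through logJobEvent. The job id, URL, engine name, and numbers are invented placeholders, and the import paths assume the sketch sits next to the removed module; none of this comes from the commit itself.

import { ScrapeEvents } from "./scrape-events"; // the pre-removal module shown above
import { Engine } from "../scraper/scrapeURL/engines";

async function logExampleScrape() {
  const jobId = "job-123"; // placeholder id
  const url = "https://example.com"; // placeholder URL

  // 1. Record the attempt up front; insert() returns the scrape_events row id,
  //    or null when USE_DB_AUTHENTICATION is not "true".
  const logId: number | null = await ScrapeEvents.insert(jobId, {
    type: "scrape",
    url,
    method: "fetch" as Engine, // placeholder engine identifier
    result: null,
  });

  // 2. Once the scrape finishes, merge the outcome into the same row.
  await ScrapeEvents.updateScrapeResult(logId, {
    success: true,
    response_code: 200,
    response_size: 48_123,
    time_taken: 850, // finishedAt - startedAt in the caller
  });

  // 3. Queue lifecycle transitions were logged separately; logJobEvent accepts
  //    either a BullMQ Job or a raw job id, per the code above.
  await ScrapeEvents.logJobEvent(jobId, "completed");
}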

Changed file 2 of 2

@@ -8,7 +8,6 @@ import { billTeam } from "../services/billing/credit_billing";
import { Document } from "../controllers/v1/types";
import { supabase_service } from "../services/supabase";
import { logger as _logger } from "../lib/logger";
-import { ScrapeEvents } from "../lib/scrape-events";
import { configDotenv } from "dotenv";
import {
  EngineResultsTracker,
@@ -146,35 +145,35 @@ export async function runWebScraper({
    }
  }

The block from "const engineOrder" through the closing brace of the "for" loop is commented out by this commit (each line is replaced by the same line prefixed with "//"); the surrounding lines are unchanged context:

  const engineOrder = Object.entries(engines)
    .sort((a, b) => a[1].startedAt - b[1].startedAt)
    .map((x) => x[0]) as Engine[];

  for (const engine of engineOrder) {
    const result = engines[engine] as Exclude<
      EngineResultsTracker[Engine],
      undefined
    >;

    ScrapeEvents.insert(bull_job_id, {
      type: "scrape",
      url,
      method: engine,
      result: {
        success: result.state === "success",
        response_code:
          result.state === "success" ? result.result.statusCode : undefined,
        response_size:
          result.state === "success" ? result.result.html.length : undefined,
        error:
          result.state === "error"
            ? result.error
            : result.state === "timeout"
              ? "Timed out"
              : undefined,
        time_taken: result.finishedAt - result.startedAt,
      },
    });
  }

  if (error === undefined && response?.success) {
    return response;
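For illustration, a sketch of the scrape_events row that this now commented-out block produced for one engine attempt. Every value below is an invented placeholder, and the created_at remark is an assumption (the insert in scrape-events does not send it):

// Hypothetical scrape_events row written per engine attempt (values invented):
const exampleRow = {
  id: 421337,              // primary key; ScrapeEvents.insert returns it
  job_id: "bull-job-uuid", // the bull_job_id passed by runWebScraper
  type: "scrape",
  content: {
    type: "scrape",
    url: "https://example.com",
    method: "fetch",       // placeholder for whichever Engine was tried
    result: {
      success: true,
      response_code: 200,
      response_size: 48_123, // result.result.html.length
      time_taken: 850,       // result.finishedAt - result.startedAt
    },
  },
  // created_at: presumably filled by a column default; not sent by the insert
};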
@@ -228,7 +227,7 @@ const saveJob = async (
      // // I think the job won't exist here anymore
      // }
    }
-    ScrapeEvents.logJobEvent(job, "completed");
+    // ScrapeEvents.logJobEvent(job, "completed");
  } catch (error) {
    _logger.error(`🐂 Failed to update job status`, {
      module: "runWebScraper",