Nick: batch billing (#1264)

This commit is contained in:
Nicolas 2025-02-27 16:18:03 -03:00 committed by GitHub
parent 289e351c14
commit b72e21a697
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 522 additions and 54 deletions

View File

@ -10,6 +10,7 @@ import {
getIndexQueue,
getGenerateLlmsTxtQueue,
getDeepResearchQueue,
getBillingQueue,
} from "./services/queue-service";
import { v0Router } from "./routes/v0";
import os from "os";
@ -58,6 +59,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
new BullAdapter(getIndexQueue()),
new BullAdapter(getGenerateLlmsTxtQueue()),
new BullAdapter(getDeepResearchQueue()),
new BullAdapter(getBillingQueue()),
],
serverAdapter: serverAdapter,
});

View File

@ -0,0 +1,327 @@
import { logger } from "../../lib/logger";
import { redisConnection } from "../queue-service";
import { supabase_service } from "../supabase";
import * as Sentry from "@sentry/node";
import { Queue } from "bullmq";
import { withAuth } from "../../lib/withAuth";
import { getACUC, setCachedACUC } from "../../controllers/auth";
// ---- Batch billing configuration ----

/** Redis list that holds serialized billing operations awaiting processing. */
const BATCH_KEY = "billing_batch";

/** Redis key used as the mutual-exclusion lock for batch processing. */
const BATCH_LOCK_KEY = "billing_batch_lock";

/** Maximum operations handled per batch; also the threshold for immediate processing. */
const BATCH_SIZE = 50;

/** Delay between periodic batch runs, in milliseconds (15 seconds). */
const BATCH_TIMEOUT = 15 * 1000;

/** Expiry of the batch processing lock, in milliseconds (30 seconds). */
const LOCK_TIMEOUT = 30 * 1000;
// A single billing request as stored (JSON-serialized) in the Redis batch list.
interface BillingOperation {
// Team to bill; forwarded to the billing RPC as `_team_id`.
team_id: string;
// Subscription to bill against; null when the caller supplied none.
subscription_id: string | null;
// Number of credits to charge for this operation.
credits: number;
// Forwarded to the billing RPC as `is_extract_param`; operations are grouped separately by this flag.
is_extract: boolean;
// ISO-8601 time at which the operation was queued.
timestamp: string;
}
// Billing operations that share team, subscription and extract flag,
// merged so that the group is billed with a single call.
interface GroupedBillingOperation {
// Team shared by every operation in the group.
team_id: string;
// Subscription shared by every operation in the group (null if none).
subscription_id: string | null;
// Sum of `credits` over all operations in the group.
total_credits: number;
// Extract flag shared by every operation in the group.
is_extract: boolean;
// The individual operations that were merged into this group.
operations: BillingOperation[];
}
// Token stored in the lock key by the most recent successful acquireLock()
// in this process; null while this process does not hold the lock.
let heldLockToken: string | null = null;

// Deletes the lock key only if it still contains the caller's token, so a
// processor whose lock already expired cannot remove a lock that another
// worker has acquired in the meantime.
const RELEASE_LOCK_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
end
return 0
`;

/**
 * Tries to take the batch processing lock.
 *
 * The lock is a Redis key written with NX (only if absent) and PX
 * (millisecond expiry), holding a token unique to this acquisition.
 *
 * @returns true when the lock was acquired, false when it is already held.
 */
async function acquireLock(): Promise<boolean> {
  const token = `${process.pid}:${Date.now()}:${Math.random().toString(36).slice(2)}`;
  const result = await redisConnection.set(
    BATCH_LOCK_KEY,
    token,
    "PX",
    LOCK_TIMEOUT,
    "NX",
  );
  const acquired = result === "OK";
  if (acquired) {
    heldLockToken = token;
    logger.info("🔒 Acquired billing batch processing lock");
  }
  return acquired;
}

/**
 * Releases the batch processing lock if this process still owns it.
 *
 * When the lock expired and was taken over by another worker, the key is
 * left untouched and a warning is logged instead.
 *
 * NOTE: ownership is tracked per process; two overlapping runs inside the
 * same process are not distinguished from each other.
 */
async function releaseLock() {
  const token = heldLockToken;
  heldLockToken = null;
  if (token === null) {
    // Nothing was acquired by this process, so there is nothing to release.
    return;
  }

  const deleted = await redisConnection.eval(
    RELEASE_LOCK_SCRIPT,
    1,
    BATCH_LOCK_KEY,
    token,
  );
  if (deleted === 1) {
    logger.info("🔓 Released billing batch processing lock");
  } else {
    logger.warn(
      "Billing batch processing lock expired before release; not deleting the current holder's lock",
    );
  }
}
// Pops up to BATCH_SIZE raw entries from the Redis list and parses them.
// A malformed entry is logged and skipped so it cannot abort the batch and
// drop the operations that were already popped.
async function popQueuedBillingOperations(): Promise<BillingOperation[]> {
  const operations: BillingOperation[] = [];
  // Bound the loop by entries popped (not parsed) so bad entries cannot
  // extend the drain beyond one batch.
  for (let popped = 0; popped < BATCH_SIZE; popped++) {
    const raw = await redisConnection.lpop(BATCH_KEY);
    if (!raw) break;
    try {
      operations.push(JSON.parse(raw));
    } catch (error) {
      logger.error("Discarding malformed billing operation", { error, raw });
      Sentry.captureException(error, {
        data: {
          operation: "batch_billing_parse",
        },
      });
    }
  }
  return operations;
}

// Merges operations sharing team, subscription and extract flag into one
// group whose total_credits is the sum of the members' credits.
function groupBillingOperations(
  operations: BillingOperation[],
): Map<string, GroupedBillingOperation> {
  const grouped = new Map<string, GroupedBillingOperation>();
  for (const op of operations) {
    const key = `${op.team_id}:${op.subscription_id ?? "null"}:${op.is_extract}`;
    let group = grouped.get(key);
    if (!group) {
      group = {
        team_id: op.team_id,
        subscription_id: op.subscription_id,
        total_credits: 0,
        is_extract: op.is_extract,
        operations: [],
      };
      grouped.set(key, group);
    }
    group.total_credits += op.credits;
    group.operations.push(op);
  }
  return grouped;
}

// Bills one group. Returns true when the group was billed (or skipped as a
// preview team) and false when billing failed.
async function billGroupedOperation(
  group: GroupedBillingOperation,
): Promise<boolean> {
  logger.info(`🔄 Billing team ${group.team_id} for ${group.total_credits} credits`, {
    team_id: group.team_id,
    subscription_id: group.subscription_id,
    total_credits: group.total_credits,
    operation_count: group.operations.length,
    is_extract: group.is_extract,
  });

  // Skip billing for preview teams
  if (group.team_id === "preview" || group.team_id.startsWith("preview_")) {
    logger.info(`Skipping billing for preview team ${group.team_id}`);
    return true;
  }

  try {
    const result = await withAuth(supaBillTeam, {
      success: true,
      message: "No DB, bypassed.",
    })(
      group.team_id,
      group.subscription_id,
      group.total_credits,
      logger,
      group.is_extract,
    );

    // supaBillTeam reports database failures through its return value rather
    // than by throwing (and has already sent the error to Sentry), so the
    // result must be inspected before claiming success.
    if (result?.success === false) {
      logger.error(`❌ Failed to bill team ${group.team_id}`, {
        error: "error" in result ? result.error : undefined,
        group,
      });
      return false;
    }

    logger.info(
      `✅ Successfully billed team ${group.team_id} for ${group.total_credits} credits`,
    );
    return true;
  } catch (error) {
    logger.error(`❌ Failed to bill team ${group.team_id}`, { error, group });
    Sentry.captureException(error, {
      data: {
        operation: "batch_billing",
        team_id: group.team_id,
        credits: group.total_credits,
      },
    });
    return false;
  }
}

/**
 * Processes one batch of queued billing operations.
 *
 * Takes the batch lock (returning immediately if it is already held), pops
 * up to BATCH_SIZE operations from Redis, groups them by team, subscription
 * and extract flag, and bills every group once. A failure in one group is
 * logged and does not prevent the remaining groups from being billed.
 * Operations of a failed group are not re-queued; they are included in the
 * error log entry.
 *
 * Errors raised while draining or grouping are logged and reported to
 * Sentry; the lock is released in every case once it has been acquired.
 */
export async function processBillingBatch() {
  // Only one processor may drain the queue at a time.
  if (!(await acquireLock())) {
    return;
  }

  try {
    const operations = await popQueuedBillingOperations();
    if (operations.length === 0) {
      logger.info("No billing operations to process in batch");
      return;
    }
    logger.info(`📦 Processing batch of ${operations.length} billing operations`);

    let failedGroups = 0;
    for (const group of groupBillingOperations(operations).values()) {
      const billed = await billGroupedOperation(group);
      if (!billed) {
        failedGroups++;
      }
    }

    if (failedGroups === 0) {
      logger.info("✅ Billing batch processing completed successfully");
    } else {
      logger.warn(
        `⚠️ Billing batch processing completed with ${failedGroups} failed group(s)`,
      );
    }
  } catch (error) {
    logger.error("Error processing billing batch", { error });
    Sentry.captureException(error, {
      data: {
        operation: "batch_billing_process",
      },
    });
  } finally {
    await releaseLock();
  }
}
// Handle of the periodic batch timer; null while periodic processing is not running.
let batchInterval: NodeJS.Timeout | null = null;

/**
 * Starts the periodic batch processor. Calling it again while the timer is
 * already running is a no-op.
 *
 * Every BATCH_TIMEOUT milliseconds the pending queue length is logged and
 * one batch is processed. Errors raised inside a tick are logged and
 * reported to Sentry instead of surfacing as unhandled promise rejections.
 */
export function startBillingBatchProcessing() {
  if (batchInterval) return;

  logger.info("🔄 Starting periodic billing batch processing");
  batchInterval = setInterval(async () => {
    try {
      const queueLength = await redisConnection.llen(BATCH_KEY);
      logger.info(`Checking billing batch queue (${queueLength} items pending)`);
      await processBillingBatch();
    } catch (error) {
      // The timer ignores the callback's promise, so a rejection here would
      // otherwise be unhandled.
      logger.error("Error in periodic billing batch processing", { error });
      Sentry.captureException(error, {
        data: {
          operation: "batch_billing_interval",
        },
      });
    }
  }, BATCH_TIMEOUT);

  // Unref to not keep process alive
  batchInterval.unref();
}
/**
 * Adds a billing operation to the Redis batch queue.
 *
 * Preview teams are never billed and are acknowledged without queuing.
 * After a successful push the periodic processor is started (if needed) and,
 * once the queue holds at least BATCH_SIZE entries, a batch is processed
 * immediately.
 *
 * NOTE(review): an `undefined` subscription_id is stored as `null`, so the
 * batch billing call later sees `null` and computes
 * `fetch_subscription: subscription_id === undefined` as false — confirm
 * that this is intended for callers that pass `undefined`.
 *
 * @param team_id Team to bill.
 * @param subscription_id Subscription to bill against, if known.
 * @param credits Number of credits to charge.
 * @param is_extract Forwarded with the operation to the billing call.
 * @returns `{ success: true }` once the operation is queued (or skipped for
 *   a preview team); `{ success: false, error }` only when the operation
 *   could not be queued.
 */
export async function queueBillingOperation(
  team_id: string,
  subscription_id: string | null | undefined,
  credits: number,
  is_extract: boolean = false
) {
  // Skip queuing for preview teams
  if (team_id === "preview" || team_id.startsWith("preview_")) {
    logger.info(`Skipping billing queue for preview team ${team_id}`);
    return { success: true, message: "Preview team, no credits used" };
  }

  logger.info(`Queueing billing operation for team ${team_id}`, {
    team_id,
    subscription_id,
    credits,
    is_extract,
  });

  let queueLength: number;
  try {
    const operation: BillingOperation = {
      team_id,
      subscription_id: subscription_id ?? null,
      credits,
      is_extract,
      timestamp: new Date().toISOString(),
    };

    // RPUSH replies with the list length after the push, so no separate
    // LLEN round-trip is needed.
    queueLength = await redisConnection.rpush(BATCH_KEY, JSON.stringify(operation));
    logger.info(`📥 Added billing operation to queue (${queueLength} total pending)`, {
      team_id,
      credits,
    });
  } catch (error) {
    logger.error("Error queueing billing operation", { error, team_id });
    Sentry.captureException(error, {
      data: {
        operation: "queue_billing",
        team_id,
        credits,
      },
    });
    return { success: false, error };
  }

  // The operation is now queued. A failure while kicking off processing must
  // not be reported as a queueing failure: the caller could retry and the
  // team would be billed twice.
  try {
    // Start batch processing if not already started
    startBillingBatchProcessing();

    // If we have enough items, trigger immediate processing
    if (queueLength >= BATCH_SIZE) {
      logger.info("🔄 Billing queue reached batch size, triggering immediate processing");
      await processBillingBatch();
    }
  } catch (error) {
    logger.error("Error triggering billing batch processing", { error, team_id });
    Sentry.captureException(error, {
      data: {
        operation: "queue_billing_trigger",
        team_id,
      },
    });
  }

  // TODO: consider optimistically updating the cached credit usage
  // (credits_used, adjusted_credits_used, remaining_credits) for the team's
  // API keys here so users get immediate feedback; currently the cache is
  // only updated when the batch is actually billed, which is assumed to be
  // fast enough.

  return { success: true };
}
/**
 * Bills a team through the `bill_team_w_extract_3` RPC and then refreshes
 * the cached credit usage of the API keys returned by that call.
 *
 * Batch-processing variant of the billing function.
 *
 * @param team_id Team to bill; preview teams are acknowledged without billing.
 * @param subscription_id Subscription to bill against; `undefined` makes the
 *   RPC call pass `fetch_subscription: true`.
 * @param credits Number of credits to charge.
 * @param __logger Optional parent logger; the module logger is used otherwise.
 * @param is_extract Forwarded to the RPC as `is_extract_param`.
 * @returns `{ success: true, ... }` on success or `{ success: false, error }`
 *   when the RPC reports an error.
 */
async function supaBillTeam(
  team_id: string,
  subscription_id: string | null | undefined,
  credits: number,
  __logger?: any,
  is_extract: boolean = false,
) {
  const parentLogger = __logger ?? logger;
  const _logger = parentLogger.child({
    module: "credit_billing",
    method: "supaBillTeam",
    teamId: team_id,
    subscriptionId: subscription_id,
    credits,
  });

  const isPreviewTeam = team_id === "preview" || team_id.startsWith("preview_");
  if (isPreviewTeam) {
    return { success: true, message: "Preview team, no credits used" };
  }

  _logger.info(`Batch billing team ${team_id} for ${credits} credits`);

  // Perform the actual database operation
  const rpcArgs = {
    _team_id: team_id,
    sub_id: subscription_id ?? null,
    fetch_subscription: subscription_id === undefined,
    credits,
    is_extract_param: is_extract,
  };
  const { data, error } = await supabase_service.rpc("bill_team_w_extract_3", rpcArgs);

  if (error) {
    Sentry.captureException(error);
    _logger.error("Failed to bill team.", { error });
    return { success: false, error };
  }

  // Refresh the cached ACUC entries in the background; billing has already
  // succeeded, so a cache failure is only logged.
  const refreshCachedCredits = async () => {
    const apiKeys = (data ?? []).map((row) => row.api_key);
    for (const apiKey of apiKeys) {
      await setCachedACUC(apiKey, (acuc) => {
        if (!acuc) {
          return null;
        }
        return {
          ...acuc,
          credits_used: acuc.credits_used + credits,
          adjusted_credits_used: acuc.adjusted_credits_used + credits,
          remaining_credits: acuc.remaining_credits - credits,
        };
      });
    }
  };
  refreshCachedCredits().catch((error) => {
    _logger.error("Failed to update cached credits", { error, team_id });
  });

  return { success: true, data };
}
// Cleanup on exit: stop the periodic timer and flush pending operations once.
//
// Registered with `once` because 'beforeExit' is emitted again every time the
// event loop drains; a handler that always schedules asynchronous work would
// otherwise re-trigger itself and keep the process from exiting.
process.once("beforeExit", async () => {
  if (batchInterval) {
    clearInterval(batchInterval);
    batchInterval = null;
    logger.info("Stopped periodic billing batch processing");
  }

  try {
    await processBillingBatch();
  } catch (error) {
    // The event emitter ignores the returned promise, so failures must be
    // handled here to avoid an unhandled rejection during shutdown.
    logger.error("Error flushing billing batch on exit", { error });
  }
});

View File

@ -10,6 +10,7 @@ import { issueCredits } from "./issue_credits";
import { redlock } from "../redlock";
import { autoCharge } from "./auto_charge";
import { getValue, setValue } from "../redis";
import { queueBillingOperation } from "./batch_billing";
import type { Logger } from "winston";
// Deprecated, done via rpc
@ -25,14 +26,16 @@ export async function billTeam(
logger?: Logger,
is_extract: boolean = false,
) {
return withAuth(supaBillTeam, { success: true, message: "No DB, bypassed." })(
team_id,
subscription_id,
credits,
logger,
is_extract,
);
// Maintain the withAuth wrapper for authentication
return withAuth(
async (team_id, subscription_id, credits, logger, is_extract) => {
// Within the authenticated context, queue the billing operation
return queueBillingOperation(team_id, subscription_id, credits, is_extract);
},
{ success: true, message: "No DB, bypassed." }
)(team_id, subscription_id, credits, logger, is_extract);
}
export async function supaBillTeam(
team_id: string,
subscription_id: string | null | undefined,
@ -40,6 +43,8 @@ export async function supaBillTeam(
__logger?: Logger,
is_extract: boolean = false,
) {
// This function should no longer be called directly
// It has been moved to batch_billing.ts
const _logger = (__logger ?? logger).child({
module: "credit_billing",
method: "supaBillTeam",
@ -48,39 +53,16 @@ export async function supaBillTeam(
credits,
});
if (team_id === "preview" || team_id.startsWith("preview_")) {
return { success: true, message: "Preview team, no credits used" };
}
_logger.info(`Billing team ${team_id} for ${credits} credits`);
const { data, error } = await supabase_service.rpc("bill_team_w_extract_3", {
_team_id: team_id,
sub_id: subscription_id ?? null,
fetch_subscription: subscription_id === undefined,
credits,
is_extract_param: is_extract,
_logger.warn("supaBillTeam was called directly. This function is deprecated and should only be called from batch_billing.ts");
queueBillingOperation(team_id, subscription_id, credits, is_extract).catch((err) => {
_logger.error("Error queuing billing operation", { err });
Sentry.captureException(err);
});
if (error) {
Sentry.captureException(error);
_logger.error("Failed to bill team.", { error });
return;
}
(async () => {
for (const apiKey of (data ?? []).map((x) => x.api_key)) {
await setCachedACUC(apiKey, (acuc) =>
acuc
? {
...acuc,
credits_used: acuc.credits_used + credits,
adjusted_credits_used: acuc.adjusted_credits_used + credits,
remaining_credits: acuc.remaining_credits - credits,
}
: null,
);
}
})();
// Forward to the batch billing system
return {
success: true,
message: "Billing operation queued",
};
}
export type CheckTeamCreditsResponse = {

View File

@ -0,0 +1,49 @@
import { logger } from "../../lib/logger";
import { getBillingQueue } from "../queue-service";
import { v4 as uuidv4 } from "uuid";
import * as Sentry from "@sentry/node";
/**
 * Enqueues a "process-batch" job on the billing queue so that pending
 * billing operations are handled without waiting for the next interval.
 *
 * @returns `{ success: true, jobId }` when the job was added, otherwise
 *   `{ success: false, error }`; errors are logged and sent to Sentry.
 */
export async function addBillingBatchJob() {
  try {
    const jobId = uuidv4();
    logger.info("Adding billing batch job to queue", { jobId });

    const payload = { timestamp: new Date().toISOString() };
    const jobOptions = { jobId, priority: 10 };
    await getBillingQueue().add("process-batch", payload, jobOptions);

    return { success: true, jobId };
  } catch (error) {
    logger.error("Error adding billing batch job", { error });
    Sentry.captureException(error, {
      data: { operation: "add_billing_batch_job" },
    });
    return { success: false, error };
  }
}
/**
 * Requests immediate processing of pending billing operations by adding a
 * batch job to the billing queue.
 *
 * @returns The result of addBillingBatchJob(), or `{ success: false, error }`
 *   if that call throws.
 */
export async function triggerImmediateBillingProcess() {
  try {
    const outcome = await addBillingBatchJob();
    return outcome;
  } catch (error) {
    logger.error("Error triggering immediate billing process", { error });
    return { success: false, error };
  }
}

View File

@ -7,8 +7,11 @@ import {
redisConnection,
indexQueueName,
getIndexQueue,
billingQueueName,
getBillingQueue,
} from "../queue-service";
import { saveCrawlMap } from "./crawl-maps-index";
import { processBillingBatch, queueBillingOperation, startBillingBatchProcessing } from "../billing/batch_billing";
import systemMonitor from "../system-monitor";
import { v4 as uuidv4 } from "uuid";
@ -61,6 +64,59 @@ const processJobInternal = async (token: string, job: Job) => {
return err;
};
/**
 * Processor for jobs on the billing queue.
 *
 * Handles two job names:
 *  - "process-batch": processes one batch of queued billing operations;
 *  - "bill_team": pushes the job's billing operation onto the Redis batch queue.
 * Any other job name is logged as a warning and the job is completed.
 *
 * While the job runs, its lock is extended periodically. The job is moved to
 * completed on success and to failed on error.
 *
 * @param token Lock token of the job.
 * @param job The billing job to process.
 * @returns null on success, otherwise the error that made the job fail.
 * @throws Error when the job has no ID.
 */
const processBillingJobInternal = async (token: string, job: Job) => {
  if (!job.id) {
    throw new Error("Job has no ID");
  }

  const logger = _logger.child({
    module: "billing-worker",
    method: "processBillingJobInternal",
    jobId: job.id,
  });

  const extendLockInterval = setInterval(async () => {
    logger.info(`🔄 Worker extending lock on billing job ${job.id}`);
    try {
      await job.extendLock(token, jobLockExtensionTime);
    } catch (error) {
      // The timer ignores the callback's promise; handle the failure here so
      // it does not become an unhandled rejection.
      logger.error("Failed to extend lock on billing job", { error });
    }
  }, jobLockExtendInterval);

  let err: unknown = null;
  try {
    // Check job type - it could be either a batch processing trigger or an individual billing operation
    if (job.name === "process-batch") {
      // Process the entire batch
      logger.info("Received batch process trigger job");
      await processBillingBatch();
    } else if (job.name === "bill_team") {
      // This is an individual billing operation that should be queued for batch processing
      const { team_id, subscription_id, credits, is_extract } = job.data;
      logger.info(`Adding team ${team_id} billing operation to batch queue`, {
        credits,
        is_extract,
        originating_job_id: job.data.originating_job_id,
      });

      // Add to the REDIS batch queue. queueBillingOperation reports failure
      // through its return value, so it has to be checked: otherwise the job
      // would be marked completed while the billing operation was lost.
      const result = await queueBillingOperation(team_id, subscription_id, credits, is_extract);
      if (!result.success) {
        const cause = "error" in result ? result.error : undefined;
        throw cause instanceof Error
          ? cause
          : new Error(`Failed to queue billing operation for team ${team_id}`);
      }
    } else {
      logger.warn(`Unknown billing job type: ${job.name}`);
    }

    await job.moveToCompleted({ success: true }, token, false);
  } catch (error) {
    logger.error("Error processing billing job", { error });
    Sentry.captureException(error);
    err = error;
    // moveToFailed expects an Error instance; wrap anything else.
    const failure = error instanceof Error ? error : new Error(String(error));
    await job.moveToFailed(failure, token, false);
  } finally {
    clearInterval(extendLockInterval);
  }

  return err;
};
let isShuttingDown = false;
process.on("SIGINT", () => {
@ -75,7 +131,8 @@ process.on("SIGTERM", () => {
let cantAcceptConnectionCount = 0;
const workerFun = async (queue: Queue) => {
// Generic worker function that can process different job types
const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) => Promise<any>) => {
const logger = _logger.child({ module: "index-worker", method: "workerFun" });
const worker = new Worker(queue.name, null, {
@ -139,13 +196,13 @@ const workerFun = async (queue: Queue) => {
},
},
async () => {
await processJobInternal(token, job);
await jobProcessor(token, job);
},
);
},
);
} else {
await processJobInternal(token, job);
await jobProcessor(token, job);
}
if (job.id) {
@ -168,7 +225,15 @@ const workerFun = async (queue: Queue) => {
process.exit(0);
};
// Start the worker
// Start the workers
(async () => {
await workerFun(getIndexQueue());
// Start index worker
const indexWorkerPromise = workerFun(getIndexQueue(), processJobInternal);
// Start billing worker and batch processing
startBillingBatchProcessing();
const billingWorkerPromise = workerFun(getBillingQueue(), processBillingJobInternal);
// Wait for both workers to complete (which should only happen on shutdown)
await Promise.all([indexWorkerPromise, billingWorkerPromise]);
})();

View File

@ -10,6 +10,7 @@ let loggingQueue: Queue;
let indexQueue: Queue;
let deepResearchQueue: Queue;
let generateLlmsTxtQueue: Queue;
let billingQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
@ -21,6 +22,7 @@ export const loggingQueueName = "{loggingQueue}";
export const indexQueueName = "{indexQueue}";
export const generateLlmsTxtQueueName = "{generateLlmsTxtQueue}";
export const deepResearchQueueName = "{deepResearchQueue}";
export const billingQueueName = "{billingQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
@ -112,6 +114,24 @@ export function getDeepResearchQueue() {
return deepResearchQueue;
}
/**
 * Returns the billing queue, creating it on first use.
 *
 * Completed and failed jobs are removed once they are older than 25 hours.
 */
export function getBillingQueue() {
  if (billingQueue) {
    return billingQueue;
  }

  const retentionAge = 90000; // 25 hours
  billingQueue = new Queue(billingQueueName, {
    connection: redisConnection,
    defaultJobOptions: {
      removeOnComplete: { age: retentionAge },
      removeOnFail: { age: retentionAge },
    },
  });
  logger.info("Billing queue created");

  return billingQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });

View File

@ -12,6 +12,7 @@ import {
deepResearchQueueName,
getIndexQueue,
getGenerateLlmsTxtQueue,
getBillingQueue,
} from "./queue-service";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
@ -1119,16 +1120,38 @@ async function processJob(job: Job & { id: string }, token: string) {
creditsToBeBilled = 5;
}
if (job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID!) {
billTeam(job.data.team_id, undefined, creditsToBeBilled, logger).catch(
(error) => {
logger.error(
`Failed to bill team ${job.data.team_id} for ${creditsToBeBilled} credits`,
{ error },
);
// Optionally, you could notify an admin or add to a retry queue here
},
);
if (job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! && process.env.USE_DB_AUTHENTICATION === "true") {
try {
const billingJobId = uuidv4();
logger.debug(`Adding billing job to queue for team ${job.data.team_id}`, {
billingJobId,
credits: creditsToBeBilled,
is_extract: job.data.scrapeOptions.extract,
});
// Add directly to the billing queue - the billing worker will handle the rest
await getBillingQueue().add(
"bill_team",
{
team_id: job.data.team_id,
subscription_id: undefined,
credits: creditsToBeBilled,
is_extract: job.data.scrapeOptions.extract,
timestamp: new Date().toISOString(),
originating_job_id: job.id
},
{
jobId: billingJobId,
priority: 10,
}
);
} catch (error) {
logger.error(
`Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
{ error },
);
Sentry.captureException(error);
}
}
}