diff --git a/package-lock.json b/package-lock.json index 35f1d1e..854f457 100644 --- a/package-lock.json +++ b/package-lock.json @@ -32,6 +32,7 @@ "lru-cache": "^11.0.2", "maxmind": "^4.3.18", "minio": "^7.1.3", + "mongodb": "^6.15.0", "node-libcurl": "^4.1.0", "openai": "^4.20.0", "pdfjs-dist": "^4.10.38", @@ -1685,7 +1686,6 @@ "version": "1.1.9", "resolved": "https://registry.npmjs.org/@mongodb-js/saslprep/-/saslprep-1.1.9.tgz", "integrity": "sha512-tVkljjeEaAhCqTzajSdgbQ6gE6f3oneVwa3iXR6csiEwXXOFsiC6Uh9iAjAhXPtqa/XMDHWjjeNH/77m/Yq2dw==", - "peer": true, "dependencies": { "sparse-bitfield": "^3.0.3" } @@ -2656,14 +2656,12 @@ "node_modules/@types/webidl-conversions": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/@types/webidl-conversions/-/webidl-conversions-7.0.3.tgz", - "integrity": "sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==", - "peer": true + "integrity": "sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==" }, "node_modules/@types/whatwg-url": { "version": "11.0.5", "resolved": "https://registry.npmjs.org/@types/whatwg-url/-/whatwg-url-11.0.5.tgz", "integrity": "sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==", - "peer": true, "dependencies": { "@types/webidl-conversions": "*" } @@ -3727,10 +3725,10 @@ } }, "node_modules/bson": { - "version": "6.8.0", - "resolved": "https://registry.npmjs.org/bson/-/bson-6.8.0.tgz", - "integrity": "sha512-iOJg8pr7wq2tg/zSlCCHMi3hMm5JTOxLTagf3zxhcenHsFp+c6uOs6K7W5UE7A4QIJGtqh/ZovFNMP4mOPJynQ==", - "peer": true, + "version": "6.10.3", + "resolved": "https://registry.npmjs.org/bson/-/bson-6.10.3.tgz", + "integrity": "sha512-MTxGsqgYTwfshYWTRdmZRC+M7FnG1b4y7RO7p2k3X24Wq0yv1m77Wsj0BzlPzd/IowgESfsruQCUToa7vbOpPQ==", + "license": "Apache-2.0", "engines": { "node": ">=16.20.1" } @@ -8771,8 +8769,7 @@ "node_modules/memory-pager": { "version": "1.5.0", "resolved": "https://registry.npmjs.org/memory-pager/-/memory-pager-1.5.0.tgz", - "integrity": "sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==", - "peer": true + "integrity": "sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==" }, "node_modules/merge-deep": { "version": "3.0.3", @@ -9129,13 +9126,13 @@ } }, "node_modules/mongodb": { - "version": "6.8.0", - "resolved": "https://registry.npmjs.org/mongodb/-/mongodb-6.8.0.tgz", - "integrity": "sha512-HGQ9NWDle5WvwMnrvUxsFYPd3JEbqD3RgABHBQRuoCEND0qzhsd0iH5ypHsf1eJ+sXmvmyKpP+FLOKY8Il7jMw==", - "peer": true, + "version": "6.15.0", + "resolved": "https://registry.npmjs.org/mongodb/-/mongodb-6.15.0.tgz", + "integrity": "sha512-ifBhQ0rRzHDzqp9jAQP6OwHSH7dbYIQjD3SbJs9YYk9AikKEettW/9s/tbSFDTpXcRbF+u1aLrhHxDFaYtZpFQ==", + "license": "Apache-2.0", "dependencies": { - "@mongodb-js/saslprep": "^1.1.5", - "bson": "^6.7.0", + "@mongodb-js/saslprep": "^1.1.9", + "bson": "^6.10.3", "mongodb-connection-string-url": "^3.0.0" }, "engines": { @@ -9143,7 +9140,7 @@ }, "peerDependencies": { "@aws-sdk/credential-providers": "^3.188.0", - "@mongodb-js/zstd": "^1.1.0", + "@mongodb-js/zstd": "^1.1.0 || ^2.0.0", "gcp-metadata": "^5.2.0", "kerberos": "^2.0.1", "mongodb-client-encryption": ">=6.0.0 <7", @@ -9178,7 +9175,6 @@ "version": "3.0.1", "resolved": "https://registry.npmjs.org/mongodb-connection-string-url/-/mongodb-connection-string-url-3.0.1.tgz", "integrity": "sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==", - "peer": true, "dependencies": { "@types/whatwg-url": "^11.0.2", "whatwg-url": "^13.0.0" @@ -11684,7 +11680,6 @@ "version": "3.0.3", "resolved": "https://registry.npmjs.org/sparse-bitfield/-/sparse-bitfield-3.0.3.tgz", "integrity": "sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==", - "peer": true, "dependencies": { "memory-pager": "^1.0.2" } @@ -12212,7 +12207,6 @@ "version": "4.1.1", "resolved": "https://registry.npmjs.org/tr46/-/tr46-4.1.1.tgz", "integrity": "sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==", - "peer": true, "dependencies": { "punycode": "^2.3.0" }, @@ -12697,7 +12691,6 @@ "version": "7.0.0", "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", "integrity": "sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==", - "peer": true, "engines": { "node": ">=12" } @@ -12727,7 +12720,6 @@ "version": "13.0.0", "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-13.0.0.tgz", "integrity": "sha512-9WWbymnqj57+XEuqADHrCJ2eSXzn8WXIW/YSGaZtb2WKAInQ6CHfaUUcTyyver0p8BDg5StLQq8h1vtZuwmOig==", - "peer": true, "dependencies": { "tr46": "^4.1.1", "webidl-conversions": "^7.0.0" diff --git a/package.json b/package.json index ae35f72..cedf320 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "lru-cache": "^11.0.2", "maxmind": "^4.3.18", "minio": "^7.1.3", + "mongodb": "^6.15.0", "node-libcurl": "^4.1.0", "openai": "^4.20.0", "pdfjs-dist": "^4.10.38", diff --git a/src/db/rate-limit.ts b/src/db/rate-limit.ts new file mode 100644 index 0000000..9a703ae --- /dev/null +++ b/src/db/rate-limit.ts @@ -0,0 +1,297 @@ +import { singleton } from 'tsyringe'; +import { Also, AutoCastable, Prop } from 'civkit/civ-rpc'; +import { ObjectId } from 'mongodb'; +import { MongoCollection } from '../services/mongodb'; +import { getTraceId } from 'civkit/async-context'; + +export class RateLimitDesc extends AutoCastable { + @Prop({ + default: 1000 + }) + _id!: ObjectId; + + @Prop({ + default: 1000 + }) + occurrence!: number; + + @Prop({ + default: 3600 + }) + periodSeconds!: number; + + @Prop() + notBefore?: Date; + + @Prop() + notAfter?: Date; + + isEffective() { + const now = new Date(); + if (this.notBefore && this.notBefore > now) { + return false; + } + if (this.notAfter && this.notAfter < now) { + return false; + } + + return true; + } +} + +export enum API_CALL_STATUS { + SUCCESS = 'success', + ERROR = 'error', + PENDING = 'pending', +} + +@Also({ dictOf: Object }) +export class APICallLog extends AutoCastable { + + @Prop({ + defaultFactory: () => new ObjectId() + }) + _id!: ObjectId; + + @Prop({ + required: true, + defaultFactory: () => getTraceId() + }) + traceId!: string; + + @Prop() + uid?: string; + + @Prop() + ip?: string; + + @Prop({ + arrayOf: String, + default: [], + }) + tags!: string[]; + + @Prop({ + required: true, + defaultFactory: () => new Date(), + }) + createdAt!: Date; + + @Prop() + completedAt?: Date; + + @Prop({ + required: true, + default: API_CALL_STATUS.PENDING, + }) + status!: API_CALL_STATUS; + + @Prop({ + required: true, + defaultFactory: () => new Date(Date.now() + 1000 * 60 * 60 * 24 * 90), + }) + expireAt!: Date; + + [k: string]: any; + + tag(...tags: string[]) { + for (const t of tags) { + if (!this.tags.includes(t)) { + this.tags.push(t); + } + } + } +} + + +@singleton() +export class RateLimitControl extends MongoCollection { + override collectionName = 'apiCallLogs' + + override async init() { + await this.dependencyReady(); + + this.emit('ready'); + } + + async queryByUid(uid: string, pointInTime: Date, ...tags: string[]) { + let q = APICall.COLLECTION + .orderBy('createdAt', 'asc') + .where('createdAt', '>=', pointInTime) + .where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING]) + .where('uid', '==', uid); + if (tags.length) { + q = q.where('tags', 'array-contains-any', tags); + } + + return APICall.fromFirestoreQuery(q); + } + + async queryByIp(ip: string, pointInTime: Date, ...tags: string[]) { + let q = APICall.COLLECTION + .orderBy('createdAt', 'asc') + .where('createdAt', '>=', pointInTime) + .where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING]) + .where('ip', '==', ip); + if (tags.length) { + q = q.where('tags', 'array-contains-any', tags); + } + + return APICall.fromFirestoreQuery(q); + } + + async assertUidPeriodicLimit(uid: string, pointInTime: Date, limit: number, ...tags: string[]) { + if (limit <= 0) { + throw new ResourcePolicyDenyError(`This UID(${uid}) is not allowed to call this endpoint (rate limit quota is 0).`); + } + + let q = APICall.COLLECTION + .orderBy('createdAt', 'asc') + .where('createdAt', '>=', pointInTime) + .where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING]) + .where('uid', '==', uid); + if (tags.length) { + q = q.where('tags', 'array-contains-any', tags); + } + try { + const count = (await q.count().get()).data().count; + + if (count >= limit) { + const r = await APICall.fromFirestoreQuery(q.limit(1)); + const [r1] = r; + + const dtMs = Math.abs(r1.createdAt?.valueOf() - pointInTime.valueOf()); + const dtSec = Math.ceil(dtMs / 1000); + + throw RateLimitTriggeredError.from({ + message: `Per UID rate limit exceeded (${tags.join(',') || 'called'} ${limit} times since ${pointInTime})`, + retryAfter: dtSec, + }); + } + + return count + 1; + } catch (err) { + if (err instanceof ApplicationError) { + throw err; + } + this.logger.error(`Failed to query rate limit, firebase just cant handle it. Ignoring and just continue.`, { err }); + } + + return 0; + } + + async assertIPPeriodicLimit(ip: string, pointInTime: Date, limit: number, ...tags: string[]) { + let q = APICall.COLLECTION + .orderBy('createdAt', 'asc') + .where('createdAt', '>=', pointInTime) + .where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING]) + .where('ip', '==', ip); + if (tags.length) { + q = q.where('tags', 'array-contains-any', tags); + } + + try { + const count = (await q.count().get()).data().count; + + if (count >= limit) { + const r = await APICall.fromFirestoreQuery(q.limit(1)); + const [r1] = r; + + const dtMs = Math.abs(r1.createdAt?.valueOf() - pointInTime.valueOf()); + const dtSec = Math.ceil(dtMs / 1000); + + throw RateLimitTriggeredError.from({ + message: `Per IP rate limit exceeded (${tags.join(',') || 'called'} ${limit} times since ${pointInTime})`, + retryAfter: dtSec, + }); + } + + return count + 1; + } catch (err) { + if (err instanceof ApplicationError) { + throw err; + } + this.logger.error(`Failed to query rate limit, firebase just cant handle it. Ignoring and just continue.`, { err }); + } + + return 0; + } + + record(partialRecord: Partial) { + if (partialRecord.uid) { + const record = APICall.from(partialRecord); + const newId = APICall.COLLECTION.doc().id; + record._id = newId; + + return record; + } + const record = APICall.from(partialRecord); + const newId = APICall.COLLECTION.doc().id; + record._id = newId; + + return record; + } + + async simpleRPCUidBasedLimit(rpcReflect: RPCReflection, uid: string, tags: string[] = [], + ...inputCriterion: RateLimitDesc[] | [Date, number][]) { + const criterion = inputCriterion.map((c) => { return Array.isArray(c) ? c : this.rateLimitDescToCriterion(c); }); + + await Promise.all(criterion.map(([pointInTime, n]) => + this.assertUidPeriodicLimit(uid, pointInTime, n, ...tags))); + + const r = this.record({ + uid, + tags, + }); + + r.save().catch((err) => this.logger.warn(`Failed to save rate limit record`, { err })); + rpcReflect.then(() => { + r.status = API_CALL_STATUS.SUCCESS; + r.save() + .catch((err) => this.logger.warn(`Failed to save rate limit record`, { err })); + }); + rpcReflect.catch((err) => { + r.status = API_CALL_STATUS.ERROR; + r.error = err.toString(); + r.save() + .catch((err) => this.logger.warn(`Failed to save rate limit record`, { err })); + }); + + return r; + } + + rateLimitDescToCriterion(rateLimitDesc: RateLimitDesc) { + return [new Date(Date.now() - rateLimitDesc.periodSeconds * 1000), rateLimitDesc.occurrence] as [Date, number]; + } + + async simpleRpcIPBasedLimit(rpcReflect: RPCReflection, ip: string, tags: string[] = [], + ...inputCriterion: RateLimitDesc[] | [Date, number][]) { + const criterion = inputCriterion.map((c) => { return Array.isArray(c) ? c : this.rateLimitDescToCriterion(c); }); + await Promise.all(criterion.map(([pointInTime, n]) => + this.assertIPPeriodicLimit(ip, pointInTime, n, ...tags))); + + const r = this.record({ + ip, + tags, + }); + + r.save().catch((err) => this.logger.warn(`Failed to save rate limit record`, { err })); + rpcReflect.then(() => { + r.status = API_CALL_STATUS.SUCCESS; + r.save() + .catch((err) => this.logger.warn(`Failed to save rate limit record`, { err })); + }); + rpcReflect.catch((err) => { + r.status = API_CALL_STATUS.ERROR; + r.error = err.toString(); + r.save() + .catch((err) => this.logger.warn(`Failed to save rate limit record`, { err })); + }); + + return r; + } +} + +const instance = container.resolve(RateLimitControl); + +export default instance; \ No newline at end of file