This commit is contained in:
Yanlong Wang 2025-04-21 16:56:28 +08:00
parent d874c4890d
commit e8dee24a9a
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
10 changed files with 337 additions and 329 deletions

View File

@ -1,86 +0,0 @@
import { Also, Prop, parseJSONText } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import _ from 'lodash';
/**
 * Possible values of `AdaptiveCrawlTask.status`.
 * Each member maps to the lowercase string literal shown on its right-hand side.
 */
export enum AdaptiveCrawlTaskStatus {
PENDING = 'pending',
PROCESSING = 'processing',
COMPLETED = 'completed',
FAILED = 'failed',
}
/**
 * Firestore record stored under the `adaptiveCrawlTasks` collection.
 *
 * The fields named in {@link AdaptiveCrawlTask.patchedFields} are kept as JSON
 * strings in the degraded (Firestore-bound) form and as parsed values on the
 * in-memory instance; `from()` and `degradeForFireStore()` perform the two
 * directions of that conversion.
 */
@Also({
    dictOf: Object
})
export class AdaptiveCrawlTask extends FirestoreRecord {
    static override collectionName = 'adaptiveCrawlTasks';

    override _id!: string;

    /** Current state of the task. */
    @Prop({
        required: true
    })
    status!: AdaptiveCrawlTaskStatus;

    /** Free-form text accompanying `status`. */
    @Prop({
        required: true
    })
    statusText!: string;

    /** Crawl parameters; serialized to a JSON string by `degradeForFireStore()`. */
    @Prop()
    meta!: {
        useSitemap: boolean;
        maxPages: number;
        targetUrl: string;
    };

    @Prop()
    urls!: string[];

    /** Map keyed by URL; the meaning of the string value is not visible here. */
    @Prop()
    processed!: {
        [url: string]: string;
    };

    /** Map keyed by URL; the value shape is unconstrained (`any`). */
    @Prop()
    failed!: {
        [url: string]: any;
    };

    @Prop()
    createdAt!: Date;

    @Prop()
    finishedAt?: Date;

    @Prop()
    duration?: number;

    /** Names of the fields that are JSON-stringified on write and parsed on read. */
    static patchedFields = [
        'meta',
    ];

    /**
     * Builds an instance from raw input, parsing any patched field that
     * arrives as a JSON string.
     *
     * The caller's `input` object is never modified: when at least one field
     * needs parsing, a shallow copy is made and the parsed value is written
     * to the copy instead.
     *
     * @param input Raw record data; patched fields may be strings or already-parsed values.
     * @returns The result of the base class `from()`, typed as `AdaptiveCrawlTask`.
     */
    static override from(input: any) {
        let patched = input;
        for (const field of this.patchedFields) {
            if (typeof input[field] !== 'string') {
                continue;
            }
            if (patched === input) {
                // Copy lazily so inputs that need no patching are passed through untouched.
                patched = { ...input };
            }
            patched[field] = parseJSONText(input[field]);
        }

        return super.from(patched) as AdaptiveCrawlTask;
    }

    /**
     * Produces a shallow copy of this record in which every patched field
     * holding an object is replaced by its JSON string.
     *
     * Note that `typeof null === 'object'`, so a `null` patched field is
     * emitted as the string `"null"`.
     *
     * @returns A plain object copy of the own enumerable properties of this record.
     */
    override degradeForFireStore() {
        const copy: any = { ...this };

        // Read patchedFields through the runtime constructor so a subclass's list is honoured.
        for (const field of (this.constructor as typeof AdaptiveCrawlTask).patchedFields) {
            if (typeof copy[field] === 'object') {
                copy[field] = JSON.stringify(copy[field]) as any;
            }
        }

        return copy;
    }

    [k: string]: any;
}

View File

@ -1,15 +1,18 @@
import { Also, parseJSONText, Prop } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import { singleton, container } from 'tsyringe';
import { Also, AutoCastable, Prop } from 'civkit/civ-rpc';
import _ from 'lodash';
import type { PageSnapshot } from '../services/puppeteer';
import { MongoCollection } from '../services/mongodb';
import { ObjectId } from 'mongodb';
@Also({
dictOf: Object
})
export class Crawled extends FirestoreRecord {
static override collectionName = 'crawled';
override _id!: string;
export class Crawled extends AutoCastable {
@Prop({
defaultFactory: () => new ObjectId()
})
_id!: ObjectId;
@Prop({
required: true
@ -42,31 +45,15 @@ export class Crawled extends FirestoreRecord {
@Prop()
expireAt!: Date;
static patchedFields = [
'snapshot'
];
static override from(input: any) {
for (const field of this.patchedFields) {
if (typeof input[field] === 'string') {
input[field] = parseJSONText(input[field]);
}
}
return super.from(input) as Crawled;
}
override degradeForFireStore() {
const copy: any = { ...this };
for (const field of (this.constructor as typeof Crawled).patchedFields) {
if (typeof copy[field] === 'object') {
copy[field] = JSON.stringify(copy[field]) as any;
}
}
return copy;
}
[k: string]: any;
}
/**
 * `MongoCollection` specialization for `Crawled` documents, using the
 * collection name `pageCaches`. Registered with tsyringe via `@singleton()`.
 */
@singleton()
export class PageCacheCollection extends MongoCollection<Crawled> {
override collectionName = 'pageCaches';
override typeclass = Crawled;
}
// Resolved from the tsyringe container at module load; this instance is the module's default export.
const instance = container.resolve(PageCacheCollection);
export default instance;

View File

@ -1,13 +1,16 @@
import { Also, Prop } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import { singleton, container } from 'tsyringe';
import { Also, AutoCastable, Prop } from 'civkit/civ-rpc';
import { ObjectId } from 'mongodb';
import { MongoCollection } from '../services/mongodb';
@Also({
dictOf: Object
})
export class DomainBlockade extends FirestoreRecord {
static override collectionName = 'domainBlockades';
override _id!: string;
export class DomainBlockade extends AutoCastable {
@Prop({
defaultFactory: () => new ObjectId()
})
_id!: string;
@Prop({
required: true
@ -28,3 +31,14 @@ export class DomainBlockade extends FirestoreRecord {
[k: string]: any;
}
/**
 * `MongoCollection` specialization for `DomainBlockade` documents, using the
 * collection name `domainBlockades`. Registered with tsyringe via `@singleton()`.
 */
@singleton()
export class DomainBlockadeCollection extends MongoCollection<DomainBlockade> {
override collectionName = 'domainBlockades';
override typeclass = DomainBlockade;
}
// Resolved from the tsyringe container at module load; this instance is the module's default export.
const instance = container.resolve(DomainBlockadeCollection);
export default instance;

View File

@ -1,31 +0,0 @@
import { Also, Prop } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import { ENGINE_TYPE } from '../dto/crawler-options';
/**
 * Firestore record stored under the `domainProfiles` collection.
 * `path` and `engine` are required; the remaining properties are optional
 * at the `@Prop` level.
 */
@Also({
dictOf: Object
})
export class DomainProfile extends FirestoreRecord {
static override collectionName = 'domainProfiles';
override _id!: string;
@Prop({
required: true
})
path!: string;
@Prop()
triggerUrl?: string;
// Validated against ENGINE_TYPE (imported from ../dto/crawler-options) while being exposed as a plain string.
@Prop({ required: true, type: ENGINE_TYPE })
engine!: string;
@Prop()
createdAt!: Date;
@Prop()
expireAt?: Date;
// Index signature: arbitrary additional keys are allowed on the record.
[k: string]: any;
}

View File

@ -1,14 +1,18 @@
import { Also, Prop } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import { singleton, container } from 'tsyringe';
import { Also, AutoCastable, Prop } from 'civkit/civ-rpc';
import _ from 'lodash';
import { ObjectId } from 'mongodb';
import { MongoCollection } from '../services/mongodb';
@Also({
dictOf: Object
})
export class ImgAlt extends FirestoreRecord {
static override collectionName = 'imgAlts';
export class ImgAlt extends AutoCastable {
override _id!: string;
@Prop({
defaultFactory: () => new ObjectId()
})
_id!: ObjectId;
@Prop({
required: true
@ -40,3 +44,14 @@ export class ImgAlt extends FirestoreRecord {
[k: string]: any;
}
/**
 * `MongoCollection` specialization for `ImgAlt` documents, using the
 * collection name `imageAlts`. Registered with tsyringe via `@singleton()`.
 */
@singleton()
export class ImageAltCollection extends MongoCollection<ImgAlt> {
override collectionName = 'imageAlts';
override typeclass = ImgAlt;
}
// Resolved from the tsyringe container at module load; this instance is the module's default export.
const instance = container.resolve(ImageAltCollection);
export default instance;

View File

@ -0,0 +1,145 @@
import { singleton, container } from 'tsyringe';
import { ArrayOf, AutoCastable, Prop } from 'civkit';
import _ from 'lodash';
import { ObjectId } from 'mongodb';
import { MongoCollection } from '../services/mongodb';
/**
 * A rate-limit descriptor: an `occurrence` count and a `periodSeconds` window
 * (defaulting to 1000 and 3600), optionally bounded in time by `notBefore`
 * and `notAfter`.
 *
 * How `occurrence` and `periodSeconds` are enforced is decided by the
 * consumer of this class and is not visible here.
 */
export class RateLimitDesc extends AutoCastable {
    /**
     * Identifier of this descriptor. A fresh `ObjectId` is generated per
     * instance, matching the `_id` convention of the other record classes;
     * the previous numeric default (1000) did not match the declared type.
     */
    @Prop({
        defaultFactory: () => new ObjectId()
    })
    _id!: ObjectId;

    @Prop({
        default: 1000
    })
    occurrence!: number;

    @Prop({
        default: 3600
    })
    periodSeconds!: number;

    /** When set, the descriptor is not effective before this instant. */
    @Prop()
    notBefore?: Date;

    /** When set, the descriptor is not effective after this instant. */
    @Prop()
    notAfter?: Date;

    /**
     * Tells whether this descriptor applies at a given moment.
     *
     * @param now Instant to evaluate against; defaults to the current time.
     * @returns `false` when `now` is earlier than `notBefore` or later than
     *          `notAfter`; `true` otherwise (including when neither bound is set).
     */
    isEffective(now: Date = new Date()) {
        if (this.notBefore && this.notBefore > now) {
            return false;
        }
        if (this.notAfter && this.notAfter < now) {
            return false;
        }

        return true;
    }
}
/**
 * Wallet data embedded in `JinaEmbeddingsTokenAccount.wallet`.
 * `user_id` defaults to an empty string and every balance defaults to 0.
 * NOTE(review): the relationship between `trial_balance`, `regular_balance`
 * and `total_balance` is not established by this class — confirm upstream.
 */
export class JinaWallet extends AutoCastable {
@Prop({
default: ''
})
user_id!: string;
@Prop({
default: 0
})
trial_balance!: number;
@Prop()
trial_start?: Date;
@Prop()
trial_end?: Date;
@Prop({
default: 0
})
regular_balance!: number;
@Prop({
default: 0
})
total_balance!: number;
}
/**
 * Account record handled by `JinaEmbeddingsTokenAccountCollection`.
 * `user_id` and `wallet` are required; the profile fields (`email`,
 * `full_name`, `customer_id`, `avatar_url`) are nullable strings.
 */
export class JinaEmbeddingsTokenAccount extends AutoCastable {
// NOTE(review): the declared type is `string` while the default factory yields an ObjectId —
// confirm which key type is intended for this collection.
@Prop({
defaultFactory: () => new ObjectId()
})
_id!: string;
@Prop({
required: true
})
user_id!: string;
@Prop({
nullable: true,
type: String,
})
email?: string;
@Prop({
nullable: true,
type: String,
})
full_name?: string;
@Prop({
nullable: true,
type: String,
})
customer_id?: string;
@Prop({
nullable: true,
type: String,
})
avatar_url?: string;
// Not keeping sensitive info for now
// @Prop()
// billing_address?: object;
// @Prop()
// payment_method?: object;
@Prop({
required: true
})
wallet!: JinaWallet;
@Prop({
type: Object
})
metadata?: { [k: string]: any; };
// Defaults to the time the instance is created when no value is supplied.
@Prop({
defaultFactory: () => new Date()
})
lastSyncedAt!: Date;
// Dictionary whose values are arrays of RateLimitDesc; the meaning of the keys is not visible here.
@Prop({
dictOf: [ArrayOf(RateLimitDesc)]
})
customRateLimits?: { [k: string]: RateLimitDesc[]; };
// Index signature: arbitrary additional keys are allowed on the record.
[k: string]: any;
}
/**
 * `MongoCollection` specialization for `JinaEmbeddingsTokenAccount` documents,
 * using the collection name `embeddingsTokenAccounts`. Registered with
 * tsyringe via `@singleton()`.
 */
@singleton()
export class JinaEmbeddingsTokenAccountCollection extends MongoCollection<JinaEmbeddingsTokenAccount> {
override collectionName = 'embeddingsTokenAccounts';
override typeclass = JinaEmbeddingsTokenAccount;
}
// Resolved from the tsyringe container at module load; this instance is the module's default export.
const instance = container.resolve(JinaEmbeddingsTokenAccountCollection);
export default instance;

View File

@ -1,14 +1,18 @@
import { Also, Prop, parseJSONText } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import { singleton, container } from 'tsyringe';
import { Also, AutoCastable, Prop } from 'civkit/civ-rpc';
import _ from 'lodash';
import { ObjectId } from 'mongodb';
import { MongoCollection } from '../services/mongodb';
@Also({
dictOf: Object
})
export class PDFContent extends FirestoreRecord {
static override collectionName = 'pdfs';
export class PDFContent extends AutoCastable {
override _id!: string;
@Prop({
defaultFactory: () => new ObjectId()
})
_id!: ObjectId;
@Prop({
required: true
@ -35,31 +39,16 @@ export class PDFContent extends FirestoreRecord {
@Prop()
expireAt?: Date;
static patchedFields = [
'meta'
];
static override from(input: any) {
for (const field of this.patchedFields) {
if (typeof input[field] === 'string') {
input[field] = parseJSONText(input[field]);
}
}
return super.from(input) as PDFContent;
}
override degradeForFireStore() {
const copy: any = { ...this };
for (const field of (this.constructor as typeof PDFContent).patchedFields) {
if (typeof copy[field] === 'object') {
copy[field] = JSON.stringify(copy[field]) as any;
}
}
return copy;
}
[k: string]: any;
}
/**
 * `MongoCollection` specialization for `PDFContent` documents, using the
 * collection name `pdfs`. Registered with tsyringe via `@singleton()`.
 */
@singleton()
export class PDFContentCollection extends MongoCollection<PDFContent> {
override collectionName = 'pdfs';
override typeclass = PDFContent;
}
// Resolved from the tsyringe container at module load; this instance is the module's default export.
const instance = container.resolve(PDFContentCollection);
export default instance;

View File

@ -1,43 +1,11 @@
import { singleton } from 'tsyringe';
import { Also, AutoCastable, Prop } from 'civkit/civ-rpc';
import { container, singleton } from 'tsyringe';
import { Also, ApplicationError, AutoCastable, Prop, ResourcePolicyDenyError, RPCReflection } from 'civkit/civ-rpc';
import { ObjectId } from 'mongodb';
import { MongoCollection } from '../services/mongodb';
import { getTraceId } from 'civkit/async-context';
import { RateLimitTriggeredError } from '../services/errors';
import type { RateLimitDesc } from './jina-embeddings-token-account';
/**
 * A rate-limit descriptor: an `occurrence` count and a `periodSeconds` window
 * (defaulting to 1000 and 3600), optionally bounded in time by `notBefore`
 * and `notAfter`.
 *
 * How `occurrence` and `periodSeconds` are enforced is decided by the
 * consumer of this class and is not visible here.
 */
export class RateLimitDesc extends AutoCastable {
    /**
     * Identifier of this descriptor. A fresh `ObjectId` is generated per
     * instance; the previous numeric default (1000) did not match the
     * declared type.
     */
    @Prop({
        defaultFactory: () => new ObjectId()
    })
    _id!: ObjectId;

    @Prop({
        default: 1000
    })
    occurrence!: number;

    @Prop({
        default: 3600
    })
    periodSeconds!: number;

    /** When set, the descriptor is not effective before this instant. */
    @Prop()
    notBefore?: Date;

    /** When set, the descriptor is not effective after this instant. */
    @Prop()
    notAfter?: Date;

    /**
     * Tells whether this descriptor applies at a given moment.
     *
     * @param now Instant to evaluate against; defaults to the current time.
     * @returns `false` when `now` is earlier than `notBefore` or later than
     *          `notAfter`; `true` otherwise (including when neither bound is set).
     */
    isEffective(now: Date = new Date()) {
        if (this.notBefore && this.notBefore > now) {
            return false;
        }
        if (this.notAfter && this.notAfter < now) {
            return false;
        }

        return true;
    }
}
export enum API_CALL_STATUS {
SUCCESS = 'success',
@ -105,8 +73,9 @@ export class APICallLog extends AutoCastable {
@singleton()
export class RateLimitControl extends MongoCollection<APICallLog> {
override collectionName = 'apiCallLogs'
export class RateLimitCollection extends MongoCollection<APICallLog> {
override collectionName = 'apiCallLogs';
override typeclass = APICallLog;
override async init() {
await this.dependencyReady();
@ -114,58 +83,44 @@ export class RateLimitControl extends MongoCollection<APICallLog> {
this.emit('ready');
}
async queryByUid(uid: string, pointInTime: Date, ...tags: string[]) {
let q = APICall.COLLECTION
.orderBy('createdAt', 'asc')
.where('createdAt', '>=', pointInTime)
.where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING])
.where('uid', '==', uid);
if (tags.length) {
q = q.where('tags', 'array-contains-any', tags);
}
return APICall.fromFirestoreQuery(q);
}
async queryByIp(ip: string, pointInTime: Date, ...tags: string[]) {
let q = APICall.COLLECTION
.orderBy('createdAt', 'asc')
.where('createdAt', '>=', pointInTime)
.where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING])
.where('ip', '==', ip);
if (tags.length) {
q = q.where('tags', 'array-contains-any', tags);
}
return APICall.fromFirestoreQuery(q);
}
async assertUidPeriodicLimit(uid: string, pointInTime: Date, limit: number, ...tags: string[]) {
if (limit <= 0) {
throw new ResourcePolicyDenyError(`This UID(${uid}) is not allowed to call this endpoint (rate limit quota is 0).`);
}
let q = APICall.COLLECTION
.orderBy('createdAt', 'asc')
.where('createdAt', '>=', pointInTime)
.where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING])
.where('uid', '==', uid);
const query: any = {
createdAt: {
$gte: pointInTime,
},
status: {
$in: [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING],
},
uid,
};
if (tags.length) {
q = q.where('tags', 'array-contains-any', tags);
query.tags = {
$in: tags,
};
}
try {
const count = (await q.count().get()).data().count;
const count = await this.collection.countDocuments(query);
if (count >= limit) {
const r = await APICall.fromFirestoreQuery(q.limit(1));
const [r1] = r;
const r = await this.findOne(query, { sort: { createdAt: 1 } });
if (!r) {
throw RateLimitTriggeredError.from({
message: `Per UID rate limit exceeded (${tags.join(',') || 'called'} ${limit} times since ${pointInTime})`,
});
}
const dtMs = Math.abs(r1.createdAt?.valueOf() - pointInTime.valueOf());
const dtMs = Math.abs(r?.createdAt?.valueOf() - pointInTime.valueOf());
const dtSec = Math.ceil(dtMs / 1000);
throw RateLimitTriggeredError.from({
message: `Per UID rate limit exceeded (${tags.join(',') || 'called'} ${limit} times since ${pointInTime})`,
retryAfter: dtSec,
retryAfterDate: new Date(Date.now() + dtMs),
});
}
@ -181,28 +136,40 @@ export class RateLimitControl extends MongoCollection<APICallLog> {
}
async assertIPPeriodicLimit(ip: string, pointInTime: Date, limit: number, ...tags: string[]) {
let q = APICall.COLLECTION
.orderBy('createdAt', 'asc')
.where('createdAt', '>=', pointInTime)
.where('status', 'in', [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING])
.where('ip', '==', ip);
const query: any = {
createdAt: {
$gte: pointInTime,
},
status: {
$in: [API_CALL_STATUS.SUCCESS, API_CALL_STATUS.PENDING],
},
ip,
};
if (tags.length) {
q = q.where('tags', 'array-contains-any', tags);
query.tags = {
$in: tags,
};
}
try {
const count = (await q.count().get()).data().count;
const count = await this.collection.countDocuments(query);
if (count >= limit) {
const r = await APICall.fromFirestoreQuery(q.limit(1));
const [r1] = r;
const r = await this.collection.findOne(query, { sort: { createdAt: 1 } });
const dtMs = Math.abs(r1.createdAt?.valueOf() - pointInTime.valueOf());
if (!r) {
throw RateLimitTriggeredError.from({
message: `Per IP rate limit exceeded (${tags.join(',') || 'called'} ${limit} times since ${pointInTime})`,
});
}
const dtMs = Math.abs(r.createdAt?.valueOf() - pointInTime.valueOf());
const dtSec = Math.ceil(dtMs / 1000);
throw RateLimitTriggeredError.from({
message: `Per IP rate limit exceeded (${tags.join(',') || 'called'} ${limit} times since ${pointInTime})`,
retryAfter: dtSec,
retryAfterDate: new Date(Date.now() + dtMs),
});
}
@ -217,17 +184,13 @@ export class RateLimitControl extends MongoCollection<APICallLog> {
return 0;
}
record(partialRecord: Partial<APICall>) {
record(partialRecord: Partial<APICallLog>) {
if (partialRecord.uid) {
const record = APICall.from(partialRecord);
const newId = APICall.COLLECTION.doc().id;
record._id = newId;
const record = APICallLog.from(partialRecord);
return record;
}
const record = APICall.from(partialRecord);
const newId = APICall.COLLECTION.doc().id;
record._id = newId;
const record = APICallLog.from(partialRecord);
return record;
}
@ -244,16 +207,17 @@ export class RateLimitControl extends MongoCollection<APICallLog> {
tags,
});
r.save().catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
this.insertOne(r).catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
rpcReflect.then(() => {
r.status = API_CALL_STATUS.SUCCESS;
r.save()
this.updateOne({ _id: r._id }, { $set: { status: API_CALL_STATUS.SUCCESS } })
.catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
});
rpcReflect.catch((err) => {
r.status = API_CALL_STATUS.ERROR;
r.error = err.toString();
r.save()
this.updateOne({ _id: r._id }, { $set: { status: API_CALL_STATUS.ERROR, error: err.toString() } })
.catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
});
@ -274,17 +238,16 @@ export class RateLimitControl extends MongoCollection<APICallLog> {
ip,
tags,
});
r.save().catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
this.collection.insertOne(r).catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
rpcReflect.then(() => {
r.status = API_CALL_STATUS.SUCCESS;
r.save()
this.collection.updateOne({ _id: r._id }, { $set: { status: API_CALL_STATUS.SUCCESS } })
.catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
});
rpcReflect.catch((err) => {
r.status = API_CALL_STATUS.ERROR;
r.error = err.toString();
r.save()
this.collection.updateOne({ _id: r._id }, { $set: { status: API_CALL_STATUS.ERROR, error: err.toString() } })
.catch((err) => this.logger.warn(`Failed to save rate limit record`, { err }));
});
@ -292,6 +255,6 @@ export class RateLimitControl extends MongoCollection<APICallLog> {
}
}
const instance = container.resolve(RateLimitControl);
const instance = container.resolve(RateLimitCollection);
export default instance;

View File

@ -1,14 +1,17 @@
import { Also, parseJSONText, Prop } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import { singleton, container } from 'tsyringe';
import { Also, Prop, AutoCastable } from 'civkit/civ-rpc';
import _ from 'lodash';
import { ObjectId } from 'mongodb';
import { MongoCollection } from '../services/mongodb';
@Also({
dictOf: Object
})
export class SearchResult extends FirestoreRecord {
static override collectionName = 'searchResults';
override _id!: string;
export class SearchResult extends AutoCastable {
@Prop({
defaultFactory: () => new ObjectId()
})
_id!: string;
@Prop({
required: true
@ -30,39 +33,20 @@ export class SearchResult extends FirestoreRecord {
expireAt?: Date;
[k: string]: any;
static patchedFields = [
'query',
'response',
];
static override from(input: any) {
for (const field of this.patchedFields) {
if (typeof input[field] === 'string') {
input[field] = parseJSONText(input[field]);
}
}
return super.from(input) as SearchResult;
}
override degradeForFireStore() {
const copy: any = { ...this };
for (const field of (this.constructor as typeof SearchResult).patchedFields) {
if (typeof copy[field] === 'object') {
copy[field] = JSON.stringify(copy[field]) as any;
}
}
return copy;
}
}
export class SerperSearchResult extends SearchResult {
static override collectionName = 'serperSearchResults';
@singleton()
export class SerperResultsCollection extends MongoCollection<SearchResult> {
override collectionName = 'serperSearchResults';
override typeclass = SearchResult;
}
export class SERPResult extends SearchResult {
static override collectionName = 'SERPResults';
}
/**
 * `MongoCollection` specialization for `SearchResult` documents, using the
 * collection name `SERPResults`. Registered with tsyringe via `@singleton()`.
 */
@singleton()
export class SERPResultsCollection extends MongoCollection<SearchResult> {
override collectionName = 'SERPResults';
override typeclass = SearchResult;
}
// Resolved from the tsyringe container at module load; this instance is the module's default export.
const instance = container.resolve(SERPResultsCollection);
export default instance;

View File

@ -1,4 +1,4 @@
import { ApplicationError, StatusCode } from 'civkit/civ-rpc';
import { ApplicationError, Prop, RPC_TRANSFER_PROTOCOL_META_SYMBOL, StatusCode } from 'civkit/civ-rpc';
import _ from 'lodash';
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
@ -46,3 +46,31 @@ export class SecurityCompromiseError extends ApplicationError { }
/** Application error tagged with status code 41201; declares no fields of its own. */
@StatusCode(41201)
export class BatchSizeTooLargeError extends ApplicationError { }
/**
 * Application error tagged with status code 42903, optionally carrying a
 * retry hint either as a number of seconds or as an absolute date.
 *
 * When a hint is present, the transfer-protocol metadata of this error is
 * extended with a `Retry-After` header.
 */
@StatusCode(42903)
export class RateLimitTriggeredError extends ApplicationError {

    @Prop({
        desc: 'Retry after seconds',
    })
    retryAfter?: number;

    @Prop({
        desc: 'Retry after date',
    })
    retryAfterDate?: Date;

    /**
     * Transfer-protocol metadata for this error.
     *
     * @returns The inherited metadata unchanged when neither `retryAfter`
     *          nor `retryAfterDate` is truthy; otherwise a deep clone of it
     *          merged with a `Retry-After` header. `retryAfter` wins over
     *          `retryAfterDate` whenever it is truthy.
     */
    protected override get [RPC_TRANSFER_PROTOCOL_META_SYMBOL]() {
        const hint = this.retryAfter || this.retryAfterDate;
        const inherited = super[RPC_TRANSFER_PROTOCOL_META_SYMBOL];

        if (!hint) {
            return inherited;
        }

        // A Date hint is rendered as a GMT timestamp; a numeric hint is emitted verbatim.
        const headerValue = hint instanceof Date
            ? dayjs(hint).utc().format('ddd, DD MMM YYYY HH:mm:ss [GMT]')
            : hint;

        // Clone before merging so the inherited metadata object is left untouched.
        const extended = _.cloneDeep(inherited);

        return _.merge(extended, {
            headers: {
                'Retry-After': `${headerValue}`,
            }
        });
    }
}