diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 501ad96aa9..0d92d8be82 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -20,6 +20,7 @@ import ( "go.signoz.io/signoz/ee/query-service/dao" "go.signoz.io/signoz/ee/query-service/interfaces" licensepkg "go.signoz.io/signoz/ee/query-service/license" + "go.signoz.io/signoz/ee/query-service/usage" "go.signoz.io/signoz/pkg/query-service/app/dashboards" baseconst "go.signoz.io/signoz/pkg/query-service/constants" @@ -117,6 +118,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + // start the usagemanager + usageManager, err := usage.New("sqlite", localDB, lm.GetRepo(), reader.GetConn()) + if err != nil { + return nil, err + } + err = usageManager.Start() + if err != nil { + return nil, err + } + telemetry.GetInstance().SetReader(reader) apiOpts := api.APIHandlerOptions{ diff --git a/ee/query-service/integrations/signozio/signozio.go b/ee/query-service/integrations/signozio/signozio.go index ac9d4128ab..ce9410e7a0 100644 --- a/ee/query-service/integrations/signozio/signozio.go +++ b/ee/query-service/integrations/signozio/signozio.go @@ -127,7 +127,7 @@ func NewPostRequestWithCtx(ctx context.Context, url string, contentType string, } // SendUsage reports the usage of signoz to license server -func SendUsage(ctx context.Context, usage *model.UsagePayload) *model.ApiError { +func SendUsage(ctx context.Context, usage model.UsagePayload) *model.ApiError { reqString, _ := json.Marshal(usage) req, err := NewPostRequestWithCtx(ctx, C.Prefix+"/usage", APPLICATION_JSON, bytes.NewBuffer(reqString)) if err != nil { diff --git a/ee/query-service/model/usage.go b/ee/query-service/model/usage.go index 7d6eec91cc..d9129531dc 100644 --- a/ee/query-service/model/usage.go +++ b/ee/query-service/model/usage.go @@ -6,30 +6,27 @@ import ( "github.com/google/uuid" ) -type UsageSnapshot struct { - CurrentLogSizeBytes uint64 `json:"currentLogSizeBytes"` - CurrentLogSizeBytesColdStorage uint64 `json:"currentLogSizeBytesColdStorage"` - CurrentSpansCount uint64 `json:"currentSpansCount"` - CurrentSpansCountColdStorage uint64 `json:"currentSpansCountColdStorage"` - CurrentSamplesCount uint64 `json:"currentSamplesCount"` - CurrentSamplesCountColdStorage uint64 `json:"currentSamplesCountColdStorage"` -} - -type UsageBase struct { - Id uuid.UUID `json:"id" db:"id"` - InstallationId uuid.UUID `json:"installationId" db:"installation_id"` - ActivationId uuid.UUID `json:"activationId" db:"activation_id"` - CreatedAt time.Time `json:"createdAt" db:"created_at"` - FailedSyncRequest int `json:"failedSyncRequest" db:"failed_sync_request_count"` -} - type UsagePayload struct { - UsageBase - Metrics UsageSnapshot `json:"metrics"` - SnapshotDate time.Time `json:"snapshotDate"` + InstallationId uuid.UUID `json:"installationId"` + LicenseKey uuid.UUID `json:"licenseKey"` + Usage []Usage `json:"usage"` } type Usage struct { - UsageBase - Snapshot string `db:"snapshot"` + CollectorID string `json:"collectorId"` + ExporterID string `json:"exporterId"` + Type string `json:"type"` + Tenant string `json:"tenant"` + TimeStamp time.Time `json:"timestamp"` + Count int64 `json:"count"` + Size int64 `json:"size"` +} + +type UsageDB struct { + CollectorID string `ch:"collector_id" json:"collectorId"` + ExporterID string `ch:"exporter_id" json:"exporterId"` + Type string `ch:"-" json:"type"` + TimeStamp time.Time `ch:"timestamp" json:"timestamp"` + Tenant string `ch:"tenant" json:"tenant"` + Data string `ch:"data" json:"data"` } diff --git a/ee/query-service/usage/manager.go b/ee/query-service/usage/manager.go index 067b65e0b4..4f361d4651 100644 --- a/ee/query-service/usage/manager.go +++ b/ee/query-service/usage/manager.go @@ -4,18 +4,19 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync/atomic" "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/google/uuid" "github.com/jmoiron/sqlx" + "go.uber.org/zap" licenseserver "go.signoz.io/signoz/ee/query-service/integrations/signozio" "go.signoz.io/signoz/ee/query-service/license" "go.signoz.io/signoz/ee/query-service/model" - "go.signoz.io/signoz/ee/query-service/usage/repository" "go.signoz.io/signoz/pkg/query-service/utils/encryption" ) @@ -27,9 +28,6 @@ const ( ) var ( - // collect usage every hour - collectionFrequency = 1 * time.Hour - // send usage every 24 hour uploadFrequency = 24 * time.Hour @@ -37,8 +35,6 @@ var ( ) type Manager struct { - repository *repository.Repository - clickhouseConn clickhouse.Conn licenseRepo *license.Repo @@ -52,15 +48,9 @@ type Manager struct { } func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { - repo := repository.New(db) - - err := repo.Init(dbType) - if err != nil { - return nil, fmt.Errorf("failed to initiate usage repo: %v", err) - } m := &Manager{ - repository: repo, + // repository: repo, clickhouseConn: clickhouseConn, licenseRepo: licenseRepo, } @@ -74,6 +64,28 @@ func (lm *Manager) Start() error { return fmt.Errorf("usage exporter is locked") } + go lm.UsageExporter(context.Background()) + + return nil +} + +func (lm *Manager) UsageExporter(ctx context.Context) { + defer close(lm.terminated) + + uploadTicker := time.NewTicker(uploadFrequency) + defer uploadTicker.Stop() + + for { + select { + case <-lm.done: + return + case <-uploadTicker.C: + lm.UploadUsage(ctx) + } + } +} + +func (lm *Manager) UploadUsage(ctx context.Context) error { // check if license is present or not license, err := lm.licenseRepo.GetActiveLicense(context.Background()) if err != nil { @@ -85,203 +97,81 @@ func (lm *Manager) Start() error { return nil } - // upload previous snapshots if any - err = lm.UploadUsage(context.Background()) - if err != nil { - return err - } - - // collect snapshot if incase it wasn't collect in (t - collectionFrequency) - err = lm.CollectCurrentUsage(context.Background()) - if err != nil { - return err - } - - go lm.UsageExporter(context.Background()) - - return nil -} - -// CollectCurrentUsage checks if needs to collect usage data -func (lm *Manager) CollectCurrentUsage(ctx context.Context) error { - // check the DB if anything exist where timestamp > t - collectionFrequency - ts := time.Now().Add(-collectionFrequency) - alreadyCreated, err := lm.repository.CheckSnapshotGtCreatedAt(ctx, ts) - if err != nil { - return err - } - if !alreadyCreated { - zap.S().Info("Collecting current usage") - exportError := lm.CollectAndStoreUsage(ctx) - if exportError != nil { - return exportError - } - } else { - zap.S().Info("Nothing to collect") - } - return nil -} - -func (lm *Manager) UsageExporter(ctx context.Context) { - defer close(lm.terminated) - - collectionTicker := time.NewTicker(collectionFrequency) - defer collectionTicker.Stop() - - uploadTicker := time.NewTicker(uploadFrequency) - defer uploadTicker.Stop() - - for { - select { - case <-lm.done: - return - case <-collectionTicker.C: - lm.CollectAndStoreUsage(ctx) - case <-uploadTicker.C: - lm.UploadUsage(ctx) - // remove the old snapshots - lm.repository.DropOldSnapshots(ctx) - } - } -} - -type TableSize struct { - Table string `ch:"table"` - DiskName string `ch:"disk_name"` - Rows uint64 `ch:"rows"` - UncompressedBytes uint64 `ch:"uncompressed_bytes"` -} - -func (lm *Manager) CollectAndStoreUsage(ctx context.Context) error { - snap, err := lm.GetUsageFromClickHouse(ctx) - if err != nil { - return err - } - - license, err := lm.licenseRepo.GetActiveLicense(ctx) - if err != nil { - return err - } - - activationId, _ := uuid.Parse(license.ActivationId) - // TODO (nitya) : Add installation ID in the payload - payload := model.UsagePayload{ - UsageBase: model.UsageBase{ - ActivationId: activationId, - FailedSyncRequest: 0, - }, - Metrics: *snap, - SnapshotDate: time.Now(), - } - - err = lm.repository.InsertSnapshot(ctx, &payload) - if err != nil { - return err - } - - return nil -} - -func (lm *Manager) GetUsageFromClickHouse(ctx context.Context) (*model.UsageSnapshot, error) { - tableSizes := []TableSize{} - snap := model.UsageSnapshot{} + usages := []model.UsageDB{} // get usage from clickhouse + dbs := []string{"signoz_logs", "signoz_traces", "signoz_metrics"} query := ` - SELECT - table, - disk_name, - sum(rows) as rows, - sum(data_uncompressed_bytes) AS uncompressed_bytes - FROM system.parts - WHERE active AND (database in ('signoz_logs', 'signoz_metrics', 'signoz_traces')) AND (table in ('logs','samples_v2', 'signoz_index_v2')) - GROUP BY - table, - disk_name - ORDER BY table + SELECT tenant, collector_id, exporter_id, timestamp, data + FROM %s.distributed_usage as u1 + GLOBAL INNER JOIN + (SELECT + tenant, collector_id, exporter_id, MAX(timestamp) as ts + FROM %s.distributed_usage as u2 + where timestamp >= $1 + GROUP BY tenant, collector_id, exporter_id + ) as t1 + ON + u1.tenant = t1.tenant AND u1.collector_id = t1.collector_id AND u1.exporter_id = t1.exporter_id and u1.timestamp = t1.ts + order by timestamp ` - err := lm.clickhouseConn.Select(ctx, &tableSizes, query) - if err != nil { - return nil, err - } - for _, val := range tableSizes { - switch val.Table { - case "logs": - if val.DiskName == "default" { - snap.CurrentLogSizeBytes = val.UncompressedBytes - } else { - snap.CurrentLogSizeBytesColdStorage = val.UncompressedBytes - } - case "samples_v2": - if val.DiskName == "default" { - snap.CurrentSamplesCount = val.Rows - } else { - snap.CurrentSamplesCountColdStorage = val.Rows - } - case "signoz_index_v2": - if val.DiskName == "default" { - snap.CurrentSpansCount = val.Rows - } else { - snap.CurrentSpansCountColdStorage = val.Rows - } + for _, db := range dbs { + dbusages := []model.UsageDB{} + err := lm.clickhouseConn.Select(ctx, &dbusages, fmt.Sprintf(query, db, db), time.Now().Add(-(24 * time.Hour))) + if err != nil && !strings.Contains(err.Error(), "doesn't exist") { + return err + } + for _, u := range dbusages { + u.Type = db + usages = append(usages, u) } } - return &snap, nil -} - -func (lm *Manager) UploadUsage(ctx context.Context) error { - snapshots, err := lm.repository.GetSnapshotsNotSynced(ctx) - if err != nil { - return err - } - - if len(snapshots) <= 0 { + if len(usages) <= 0 { zap.S().Info("no snapshots to upload, skipping.") return nil } - zap.S().Info("uploading snapshots") - for _, snap := range snapshots { - metricsBytes, err := encryption.Decrypt([]byte(snap.ActivationId.String()[:32]), []byte(snap.Snapshot)) + zap.S().Info("uploading usage data") + + usagesPayload := []model.Usage{} + for _, usage := range usages { + usageDataBytes, err := encryption.Decrypt([]byte(usage.ExporterID[:32]), []byte(usage.Data)) if err != nil { return err } - metrics := model.UsageSnapshot{} - err = json.Unmarshal(metricsBytes, &metrics) + usageData := model.Usage{} + err = json.Unmarshal(usageDataBytes, &usageData) if err != nil { return err } - err = lm.UploadUsageWithExponentalBackOff(ctx, model.UsagePayload{ - UsageBase: model.UsageBase{ - Id: snap.Id, - InstallationId: snap.InstallationId, - ActivationId: snap.ActivationId, - FailedSyncRequest: snap.FailedSyncRequest, - }, - SnapshotDate: snap.CreatedAt, - Metrics: metrics, - }) - if err != nil { - return err - } + usageData.CollectorID = usage.CollectorID + usageData.ExporterID = usage.ExporterID + usageData.Type = usage.Type + usageData.Tenant = usage.Tenant + usagesPayload = append(usagesPayload, usageData) + } + + key, _ := uuid.Parse(license.Key) + payload := model.UsagePayload{ + LicenseKey: key, + Usage: usagesPayload, + } + err = lm.UploadUsageWithExponentalBackOff(ctx, payload) + if err != nil { + return err } return nil } func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) error { for i := 1; i <= MaxRetries; i++ { - apiErr := licenseserver.SendUsage(ctx, &payload) + apiErr := licenseserver.SendUsage(ctx, payload) if apiErr != nil && i == MaxRetries { - err := lm.repository.IncrementFailedRequestCount(ctx, payload.Id) - if err != nil { - zap.S().Errorf("failed to updated the failure count for snapshot in DB : ", zap.Error(err)) - return err - } - zap.S().Errorf("retries stopped : %v", zap.Error(err)) + zap.S().Errorf("retries stopped : %v", zap.Error(apiErr)) // not returning error here since it is captured in the failed count return nil } else if apiErr != nil { @@ -289,24 +179,10 @@ func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload sleepDuration := RetryInterval * time.Duration(i) zap.S().Errorf("failed to upload snapshot retrying after %v secs : %v", sleepDuration.Seconds(), zap.Error(apiErr.Err)) time.Sleep(sleepDuration) - - // update the failed request count - err := lm.repository.IncrementFailedRequestCount(ctx, payload.Id) - if err != nil { - zap.S().Errorf("failed to updated the failure count for snapshot in DB : %v", zap.Error(err)) - return err - } } else { break } } - - // update the database that it is synced - err := lm.repository.MoveToSynced(ctx, payload.Id) - if err != nil { - return err - } - return nil } diff --git a/ee/query-service/usage/repository/repository.go b/ee/query-service/usage/repository/repository.go deleted file mode 100644 index 57bf5388b6..0000000000 --- a/ee/query-service/usage/repository/repository.go +++ /dev/null @@ -1,139 +0,0 @@ -package repository - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/google/uuid" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" - - "go.signoz.io/signoz/ee/query-service/model" - "go.signoz.io/signoz/ee/query-service/usage/sqlite" - "go.signoz.io/signoz/pkg/query-service/utils/encryption" -) - -const ( - MaxFailedSyncCount = 9 // a snapshot will be ignored if the max failed count is greater than or equal to 9 - SnapShotLife = 3 * 24 * time.Hour -) - -// Repository is usage Repository which stores usage snapshot in a secured DB -type Repository struct { - db *sqlx.DB -} - -// New initiates a new usage Repository -func New(db *sqlx.DB) *Repository { - return &Repository{ - db: db, - } -} - -func (r *Repository) Init(engine string) error { - switch engine { - case "sqlite3", "sqlite": - return sqlite.InitDB(r.db) - default: - return fmt.Errorf("unsupported db") - } -} - -func (r *Repository) InsertSnapshot(ctx context.Context, usage *model.UsagePayload) error { - - snapshotBytes, err := json.Marshal(usage.Metrics) - if err != nil { - return err - } - - usage.Id = uuid.New() - - encryptedSnapshot, err := encryption.Encrypt([]byte(usage.ActivationId.String()[:32]), snapshotBytes) - if err != nil { - return err - } - - query := `INSERT INTO usage(id, activation_id, snapshot) - VALUES ($1, $2, $3)` - _, err = r.db.ExecContext(ctx, - query, - usage.Id, - usage.ActivationId, - string(encryptedSnapshot), - ) - if err != nil { - zap.S().Errorf("error inserting usage data: %v", zap.Error(err)) - return fmt.Errorf("failed to insert usage in db: %v", err) - } - return nil -} - -func (r *Repository) MoveToSynced(ctx context.Context, id uuid.UUID) error { - - query := `UPDATE usage - SET synced = 'true', - synced_at = $1 - WHERE id = $2` - - _, err := r.db.ExecContext(ctx, query, time.Now(), id) - - if err != nil { - zap.S().Errorf("error in updating usage: %v", zap.Error(err)) - return fmt.Errorf("failed to update usage in db: %v", err) - } - - return nil -} - -func (r *Repository) IncrementFailedRequestCount(ctx context.Context, id uuid.UUID) error { - - query := `UPDATE usage SET failed_sync_request_count = failed_sync_request_count + 1 WHERE id = $1` - _, err := r.db.ExecContext(ctx, query, id) - if err != nil { - zap.S().Errorf("error in updating usage: %v", zap.Error(err)) - return fmt.Errorf("failed to update usage in db: %v", err) - } - - return nil -} - -func (r *Repository) GetSnapshotsNotSynced(ctx context.Context) ([]*model.Usage, error) { - snapshots := []*model.Usage{} - - query := `SELECT id,created_at, activation_id, snapshot, failed_sync_request_count from usage where synced!='true' and failed_sync_request_count < $1 order by created_at asc ` - - err := r.db.SelectContext(ctx, &snapshots, query, MaxFailedSyncCount) - if err != nil { - return nil, err - } - - return snapshots, nil -} - -func (r *Repository) DropOldSnapshots(ctx context.Context) error { - query := `delete from usage where created_at <= $1` - - _, err := r.db.ExecContext(ctx, query, time.Now().Add(-(SnapShotLife))) - if err != nil { - zap.S().Errorf("failed to remove old snapshots from db: %v", zap.Error(err)) - return err - } - - return nil -} - -// CheckSnapshotGtCreatedAt checks if there is any snapshot greater than the provided timestamp -func (r *Repository) CheckSnapshotGtCreatedAt(ctx context.Context, ts time.Time) (bool, error) { - - var snapshots uint64 - query := `SELECT count() from usage where created_at > '$1'` - - err := r.db.QueryRowContext(ctx, query, ts).Scan(&snapshots) - if err != nil { - return false, err - } - - return snapshots > 0, err -} diff --git a/ee/query-service/usage/sqlite/init.go b/ee/query-service/usage/sqlite/init.go deleted file mode 100644 index 4fefa644ae..0000000000 --- a/ee/query-service/usage/sqlite/init.go +++ /dev/null @@ -1,32 +0,0 @@ -package sqlite - -import ( - "fmt" - - "github.com/jmoiron/sqlx" -) - -func InitDB(db *sqlx.DB) error { - var err error - if db == nil { - return fmt.Errorf("invalid db connection") - } - - table_schema := `CREATE TABLE IF NOT EXISTS usage( - id UUID PRIMARY KEY, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - activation_id UUID, - snapshot TEXT, - synced BOOLEAN DEFAULT 'false', - synced_at TIMESTAMP, - failed_sync_request_count INTEGER DEFAULT 0 - ); - ` - - _, err = db.Exec(table_schema) - if err != nil { - return fmt.Errorf("error in creating usage table: %v", err.Error()) - } - return nil -}