feat: usage collection updated for ee (#1654)

* feat: usage collection updated with new schema and logic

* fix: added exporter id and common collector id

* fix: upload usage only when license is present

* fix: handle if db doesn't exists

* fix: select query updated for usage collection to support distributed table

Co-authored-by: Pranay Prateek <pranay@signoz.io>
Co-authored-by: Vishal Sharma <makeavish786@gmail.com>
Co-authored-by: Ankit Nayan <ankit@signoz.io>
This commit is contained in:
Nityananda Gohain 2022-12-06 22:52:39 +05:30 committed by GitHub
parent 87932de668
commit 1c8626e933
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 391 deletions

View File

@ -20,6 +20,7 @@ import (
"go.signoz.io/signoz/ee/query-service/dao" "go.signoz.io/signoz/ee/query-service/dao"
"go.signoz.io/signoz/ee/query-service/interfaces" "go.signoz.io/signoz/ee/query-service/interfaces"
licensepkg "go.signoz.io/signoz/ee/query-service/license" licensepkg "go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
"go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/dashboards"
baseconst "go.signoz.io/signoz/pkg/query-service/constants" baseconst "go.signoz.io/signoz/pkg/query-service/constants"
@ -117,6 +118,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err return nil, err
} }
// start the usagemanager
usageManager, err := usage.New("sqlite", localDB, lm.GetRepo(), reader.GetConn())
if err != nil {
return nil, err
}
err = usageManager.Start()
if err != nil {
return nil, err
}
telemetry.GetInstance().SetReader(reader) telemetry.GetInstance().SetReader(reader)
apiOpts := api.APIHandlerOptions{ apiOpts := api.APIHandlerOptions{

View File

@ -127,7 +127,7 @@ func NewPostRequestWithCtx(ctx context.Context, url string, contentType string,
} }
// SendUsage reports the usage of signoz to license server // SendUsage reports the usage of signoz to license server
func SendUsage(ctx context.Context, usage *model.UsagePayload) *model.ApiError { func SendUsage(ctx context.Context, usage model.UsagePayload) *model.ApiError {
reqString, _ := json.Marshal(usage) reqString, _ := json.Marshal(usage)
req, err := NewPostRequestWithCtx(ctx, C.Prefix+"/usage", APPLICATION_JSON, bytes.NewBuffer(reqString)) req, err := NewPostRequestWithCtx(ctx, C.Prefix+"/usage", APPLICATION_JSON, bytes.NewBuffer(reqString))
if err != nil { if err != nil {

View File

@ -6,30 +6,27 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
type UsageSnapshot struct {
CurrentLogSizeBytes uint64 `json:"currentLogSizeBytes"`
CurrentLogSizeBytesColdStorage uint64 `json:"currentLogSizeBytesColdStorage"`
CurrentSpansCount uint64 `json:"currentSpansCount"`
CurrentSpansCountColdStorage uint64 `json:"currentSpansCountColdStorage"`
CurrentSamplesCount uint64 `json:"currentSamplesCount"`
CurrentSamplesCountColdStorage uint64 `json:"currentSamplesCountColdStorage"`
}
type UsageBase struct {
Id uuid.UUID `json:"id" db:"id"`
InstallationId uuid.UUID `json:"installationId" db:"installation_id"`
ActivationId uuid.UUID `json:"activationId" db:"activation_id"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
FailedSyncRequest int `json:"failedSyncRequest" db:"failed_sync_request_count"`
}
type UsagePayload struct { type UsagePayload struct {
UsageBase InstallationId uuid.UUID `json:"installationId"`
Metrics UsageSnapshot `json:"metrics"` LicenseKey uuid.UUID `json:"licenseKey"`
SnapshotDate time.Time `json:"snapshotDate"` Usage []Usage `json:"usage"`
} }
type Usage struct { type Usage struct {
UsageBase CollectorID string `json:"collectorId"`
Snapshot string `db:"snapshot"` ExporterID string `json:"exporterId"`
Type string `json:"type"`
Tenant string `json:"tenant"`
TimeStamp time.Time `json:"timestamp"`
Count int64 `json:"count"`
Size int64 `json:"size"`
}
type UsageDB struct {
CollectorID string `ch:"collector_id" json:"collectorId"`
ExporterID string `ch:"exporter_id" json:"exporterId"`
Type string `ch:"-" json:"type"`
TimeStamp time.Time `ch:"timestamp" json:"timestamp"`
Tenant string `ch:"tenant" json:"tenant"`
Data string `ch:"data" json:"data"`
} }

View File

@ -4,18 +4,19 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"go.uber.org/zap" "go.uber.org/zap"
licenseserver "go.signoz.io/signoz/ee/query-service/integrations/signozio" licenseserver "go.signoz.io/signoz/ee/query-service/integrations/signozio"
"go.signoz.io/signoz/ee/query-service/license" "go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/model" "go.signoz.io/signoz/ee/query-service/model"
"go.signoz.io/signoz/ee/query-service/usage/repository"
"go.signoz.io/signoz/pkg/query-service/utils/encryption" "go.signoz.io/signoz/pkg/query-service/utils/encryption"
) )
@ -27,9 +28,6 @@ const (
) )
var ( var (
// collect usage every hour
collectionFrequency = 1 * time.Hour
// send usage every 24 hour // send usage every 24 hour
uploadFrequency = 24 * time.Hour uploadFrequency = 24 * time.Hour
@ -37,8 +35,6 @@ var (
) )
type Manager struct { type Manager struct {
repository *repository.Repository
clickhouseConn clickhouse.Conn clickhouseConn clickhouse.Conn
licenseRepo *license.Repo licenseRepo *license.Repo
@ -52,15 +48,9 @@ type Manager struct {
} }
func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
repo := repository.New(db)
err := repo.Init(dbType)
if err != nil {
return nil, fmt.Errorf("failed to initiate usage repo: %v", err)
}
m := &Manager{ m := &Manager{
repository: repo, // repository: repo,
clickhouseConn: clickhouseConn, clickhouseConn: clickhouseConn,
licenseRepo: licenseRepo, licenseRepo: licenseRepo,
} }
@ -74,6 +64,28 @@ func (lm *Manager) Start() error {
return fmt.Errorf("usage exporter is locked") return fmt.Errorf("usage exporter is locked")
} }
go lm.UsageExporter(context.Background())
return nil
}
func (lm *Manager) UsageExporter(ctx context.Context) {
defer close(lm.terminated)
uploadTicker := time.NewTicker(uploadFrequency)
defer uploadTicker.Stop()
for {
select {
case <-lm.done:
return
case <-uploadTicker.C:
lm.UploadUsage(ctx)
}
}
}
func (lm *Manager) UploadUsage(ctx context.Context) error {
// check if license is present or not // check if license is present or not
license, err := lm.licenseRepo.GetActiveLicense(context.Background()) license, err := lm.licenseRepo.GetActiveLicense(context.Background())
if err != nil { if err != nil {
@ -85,203 +97,81 @@ func (lm *Manager) Start() error {
return nil return nil
} }
// upload previous snapshots if any usages := []model.UsageDB{}
err = lm.UploadUsage(context.Background())
if err != nil {
return err
}
// collect snapshot if incase it wasn't collect in (t - collectionFrequency)
err = lm.CollectCurrentUsage(context.Background())
if err != nil {
return err
}
go lm.UsageExporter(context.Background())
return nil
}
// CollectCurrentUsage checks if needs to collect usage data
func (lm *Manager) CollectCurrentUsage(ctx context.Context) error {
// check the DB if anything exist where timestamp > t - collectionFrequency
ts := time.Now().Add(-collectionFrequency)
alreadyCreated, err := lm.repository.CheckSnapshotGtCreatedAt(ctx, ts)
if err != nil {
return err
}
if !alreadyCreated {
zap.S().Info("Collecting current usage")
exportError := lm.CollectAndStoreUsage(ctx)
if exportError != nil {
return exportError
}
} else {
zap.S().Info("Nothing to collect")
}
return nil
}
func (lm *Manager) UsageExporter(ctx context.Context) {
defer close(lm.terminated)
collectionTicker := time.NewTicker(collectionFrequency)
defer collectionTicker.Stop()
uploadTicker := time.NewTicker(uploadFrequency)
defer uploadTicker.Stop()
for {
select {
case <-lm.done:
return
case <-collectionTicker.C:
lm.CollectAndStoreUsage(ctx)
case <-uploadTicker.C:
lm.UploadUsage(ctx)
// remove the old snapshots
lm.repository.DropOldSnapshots(ctx)
}
}
}
type TableSize struct {
Table string `ch:"table"`
DiskName string `ch:"disk_name"`
Rows uint64 `ch:"rows"`
UncompressedBytes uint64 `ch:"uncompressed_bytes"`
}
func (lm *Manager) CollectAndStoreUsage(ctx context.Context) error {
snap, err := lm.GetUsageFromClickHouse(ctx)
if err != nil {
return err
}
license, err := lm.licenseRepo.GetActiveLicense(ctx)
if err != nil {
return err
}
activationId, _ := uuid.Parse(license.ActivationId)
// TODO (nitya) : Add installation ID in the payload
payload := model.UsagePayload{
UsageBase: model.UsageBase{
ActivationId: activationId,
FailedSyncRequest: 0,
},
Metrics: *snap,
SnapshotDate: time.Now(),
}
err = lm.repository.InsertSnapshot(ctx, &payload)
if err != nil {
return err
}
return nil
}
func (lm *Manager) GetUsageFromClickHouse(ctx context.Context) (*model.UsageSnapshot, error) {
tableSizes := []TableSize{}
snap := model.UsageSnapshot{}
// get usage from clickhouse // get usage from clickhouse
dbs := []string{"signoz_logs", "signoz_traces", "signoz_metrics"}
query := ` query := `
SELECT SELECT tenant, collector_id, exporter_id, timestamp, data
table, FROM %s.distributed_usage as u1
disk_name, GLOBAL INNER JOIN
sum(rows) as rows, (SELECT
sum(data_uncompressed_bytes) AS uncompressed_bytes tenant, collector_id, exporter_id, MAX(timestamp) as ts
FROM system.parts FROM %s.distributed_usage as u2
WHERE active AND (database in ('signoz_logs', 'signoz_metrics', 'signoz_traces')) AND (table in ('logs','samples_v2', 'signoz_index_v2')) where timestamp >= $1
GROUP BY GROUP BY tenant, collector_id, exporter_id
table, ) as t1
disk_name ON
ORDER BY table u1.tenant = t1.tenant AND u1.collector_id = t1.collector_id AND u1.exporter_id = t1.exporter_id and u1.timestamp = t1.ts
order by timestamp
` `
err := lm.clickhouseConn.Select(ctx, &tableSizes, query)
if err != nil {
return nil, err
}
for _, val := range tableSizes { for _, db := range dbs {
switch val.Table { dbusages := []model.UsageDB{}
case "logs": err := lm.clickhouseConn.Select(ctx, &dbusages, fmt.Sprintf(query, db, db), time.Now().Add(-(24 * time.Hour)))
if val.DiskName == "default" { if err != nil && !strings.Contains(err.Error(), "doesn't exist") {
snap.CurrentLogSizeBytes = val.UncompressedBytes return err
} else { }
snap.CurrentLogSizeBytesColdStorage = val.UncompressedBytes for _, u := range dbusages {
} u.Type = db
case "samples_v2": usages = append(usages, u)
if val.DiskName == "default" {
snap.CurrentSamplesCount = val.Rows
} else {
snap.CurrentSamplesCountColdStorage = val.Rows
}
case "signoz_index_v2":
if val.DiskName == "default" {
snap.CurrentSpansCount = val.Rows
} else {
snap.CurrentSpansCountColdStorage = val.Rows
}
} }
} }
return &snap, nil if len(usages) <= 0 {
}
func (lm *Manager) UploadUsage(ctx context.Context) error {
snapshots, err := lm.repository.GetSnapshotsNotSynced(ctx)
if err != nil {
return err
}
if len(snapshots) <= 0 {
zap.S().Info("no snapshots to upload, skipping.") zap.S().Info("no snapshots to upload, skipping.")
return nil return nil
} }
zap.S().Info("uploading snapshots") zap.S().Info("uploading usage data")
for _, snap := range snapshots {
metricsBytes, err := encryption.Decrypt([]byte(snap.ActivationId.String()[:32]), []byte(snap.Snapshot)) usagesPayload := []model.Usage{}
for _, usage := range usages {
usageDataBytes, err := encryption.Decrypt([]byte(usage.ExporterID[:32]), []byte(usage.Data))
if err != nil { if err != nil {
return err return err
} }
metrics := model.UsageSnapshot{} usageData := model.Usage{}
err = json.Unmarshal(metricsBytes, &metrics) err = json.Unmarshal(usageDataBytes, &usageData)
if err != nil { if err != nil {
return err return err
} }
err = lm.UploadUsageWithExponentalBackOff(ctx, model.UsagePayload{ usageData.CollectorID = usage.CollectorID
UsageBase: model.UsageBase{ usageData.ExporterID = usage.ExporterID
Id: snap.Id, usageData.Type = usage.Type
InstallationId: snap.InstallationId, usageData.Tenant = usage.Tenant
ActivationId: snap.ActivationId, usagesPayload = append(usagesPayload, usageData)
FailedSyncRequest: snap.FailedSyncRequest, }
},
SnapshotDate: snap.CreatedAt, key, _ := uuid.Parse(license.Key)
Metrics: metrics, payload := model.UsagePayload{
}) LicenseKey: key,
if err != nil { Usage: usagesPayload,
return err }
} err = lm.UploadUsageWithExponentalBackOff(ctx, payload)
if err != nil {
return err
} }
return nil return nil
} }
func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) error { func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) error {
for i := 1; i <= MaxRetries; i++ { for i := 1; i <= MaxRetries; i++ {
apiErr := licenseserver.SendUsage(ctx, &payload) apiErr := licenseserver.SendUsage(ctx, payload)
if apiErr != nil && i == MaxRetries { if apiErr != nil && i == MaxRetries {
err := lm.repository.IncrementFailedRequestCount(ctx, payload.Id) zap.S().Errorf("retries stopped : %v", zap.Error(apiErr))
if err != nil {
zap.S().Errorf("failed to updated the failure count for snapshot in DB : ", zap.Error(err))
return err
}
zap.S().Errorf("retries stopped : %v", zap.Error(err))
// not returning error here since it is captured in the failed count // not returning error here since it is captured in the failed count
return nil return nil
} else if apiErr != nil { } else if apiErr != nil {
@ -289,24 +179,10 @@ func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload
sleepDuration := RetryInterval * time.Duration(i) sleepDuration := RetryInterval * time.Duration(i)
zap.S().Errorf("failed to upload snapshot retrying after %v secs : %v", sleepDuration.Seconds(), zap.Error(apiErr.Err)) zap.S().Errorf("failed to upload snapshot retrying after %v secs : %v", sleepDuration.Seconds(), zap.Error(apiErr.Err))
time.Sleep(sleepDuration) time.Sleep(sleepDuration)
// update the failed request count
err := lm.repository.IncrementFailedRequestCount(ctx, payload.Id)
if err != nil {
zap.S().Errorf("failed to updated the failure count for snapshot in DB : %v", zap.Error(err))
return err
}
} else { } else {
break break
} }
} }
// update the database that it is synced
err := lm.repository.MoveToSynced(ctx, payload.Id)
if err != nil {
return err
}
return nil return nil
} }

View File

@ -1,139 +0,0 @@
package repository
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"go.signoz.io/signoz/ee/query-service/model"
"go.signoz.io/signoz/ee/query-service/usage/sqlite"
"go.signoz.io/signoz/pkg/query-service/utils/encryption"
)
const (
MaxFailedSyncCount = 9 // a snapshot will be ignored if the max failed count is greater than or equal to 9
SnapShotLife = 3 * 24 * time.Hour
)
// Repository is usage Repository which stores usage snapshot in a secured DB
type Repository struct {
db *sqlx.DB
}
// New initiates a new usage Repository
func New(db *sqlx.DB) *Repository {
return &Repository{
db: db,
}
}
func (r *Repository) Init(engine string) error {
switch engine {
case "sqlite3", "sqlite":
return sqlite.InitDB(r.db)
default:
return fmt.Errorf("unsupported db")
}
}
func (r *Repository) InsertSnapshot(ctx context.Context, usage *model.UsagePayload) error {
snapshotBytes, err := json.Marshal(usage.Metrics)
if err != nil {
return err
}
usage.Id = uuid.New()
encryptedSnapshot, err := encryption.Encrypt([]byte(usage.ActivationId.String()[:32]), snapshotBytes)
if err != nil {
return err
}
query := `INSERT INTO usage(id, activation_id, snapshot)
VALUES ($1, $2, $3)`
_, err = r.db.ExecContext(ctx,
query,
usage.Id,
usage.ActivationId,
string(encryptedSnapshot),
)
if err != nil {
zap.S().Errorf("error inserting usage data: %v", zap.Error(err))
return fmt.Errorf("failed to insert usage in db: %v", err)
}
return nil
}
func (r *Repository) MoveToSynced(ctx context.Context, id uuid.UUID) error {
query := `UPDATE usage
SET synced = 'true',
synced_at = $1
WHERE id = $2`
_, err := r.db.ExecContext(ctx, query, time.Now(), id)
if err != nil {
zap.S().Errorf("error in updating usage: %v", zap.Error(err))
return fmt.Errorf("failed to update usage in db: %v", err)
}
return nil
}
func (r *Repository) IncrementFailedRequestCount(ctx context.Context, id uuid.UUID) error {
query := `UPDATE usage SET failed_sync_request_count = failed_sync_request_count + 1 WHERE id = $1`
_, err := r.db.ExecContext(ctx, query, id)
if err != nil {
zap.S().Errorf("error in updating usage: %v", zap.Error(err))
return fmt.Errorf("failed to update usage in db: %v", err)
}
return nil
}
func (r *Repository) GetSnapshotsNotSynced(ctx context.Context) ([]*model.Usage, error) {
snapshots := []*model.Usage{}
query := `SELECT id,created_at, activation_id, snapshot, failed_sync_request_count from usage where synced!='true' and failed_sync_request_count < $1 order by created_at asc `
err := r.db.SelectContext(ctx, &snapshots, query, MaxFailedSyncCount)
if err != nil {
return nil, err
}
return snapshots, nil
}
func (r *Repository) DropOldSnapshots(ctx context.Context) error {
query := `delete from usage where created_at <= $1`
_, err := r.db.ExecContext(ctx, query, time.Now().Add(-(SnapShotLife)))
if err != nil {
zap.S().Errorf("failed to remove old snapshots from db: %v", zap.Error(err))
return err
}
return nil
}
// CheckSnapshotGtCreatedAt checks if there is any snapshot greater than the provided timestamp
func (r *Repository) CheckSnapshotGtCreatedAt(ctx context.Context, ts time.Time) (bool, error) {
var snapshots uint64
query := `SELECT count() from usage where created_at > '$1'`
err := r.db.QueryRowContext(ctx, query, ts).Scan(&snapshots)
if err != nil {
return false, err
}
return snapshots > 0, err
}

View File

@ -1,32 +0,0 @@
package sqlite
import (
"fmt"
"github.com/jmoiron/sqlx"
)
func InitDB(db *sqlx.DB) error {
var err error
if db == nil {
return fmt.Errorf("invalid db connection")
}
table_schema := `CREATE TABLE IF NOT EXISTS usage(
id UUID PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
activation_id UUID,
snapshot TEXT,
synced BOOLEAN DEFAULT 'false',
synced_at TIMESTAMP,
failed_sync_request_count INTEGER DEFAULT 0
);
`
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("error in creating usage table: %v", err.Error())
}
return nil
}