mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-10-18 20:11:30 +08:00
318 lines
7.8 KiB
Go
318 lines
7.8 KiB
Go
package usage
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/google/uuid"
|
|
"github.com/jmoiron/sqlx"
|
|
"go.uber.org/zap"
|
|
|
|
licenseserver "go.signoz.io/signoz/ee/query-service/integrations/signozio"
|
|
"go.signoz.io/signoz/ee/query-service/license"
|
|
"go.signoz.io/signoz/ee/query-service/model"
|
|
"go.signoz.io/signoz/ee/query-service/usage/repository"
|
|
"go.signoz.io/signoz/pkg/query-service/utils/encryption"
|
|
)
|
|
|
|
// Upload retry policy for usage snapshots.
const (
	// MaxRetries is the number of upload attempts made for a single snapshot.
	MaxRetries = 3

	// RetryInterval is the base delay between upload attempts.
	RetryInterval = 5 * time.Second
)

// Values held by the package-level locker flag.
const (
	stateUnlocked uint32 = iota
	stateLocked
)
|
|
|
|
var (
|
|
// collect usage every hour
|
|
collectionFrequency = 1 * time.Hour
|
|
|
|
// send usage every 24 hour
|
|
uploadFrequency = 24 * time.Hour
|
|
|
|
locker = stateUnlocked
|
|
)
|
|
|
|
// Manager collects usage metrics from ClickHouse, stores them as snapshots in
// the usage repository, and uploads unsynced snapshots to the license server.
type Manager struct {
	// repository stores usage snapshots and tracks whether they were synced.
	repository *repository.Repository

	// clickhouseConn is used to read table sizes from system.parts.
	clickhouseConn clickhouse.Conn

	// licenseRepo supplies the active license whose activation ID is attached
	// to every collected snapshot.
	licenseRepo *license.Repo

	// done is closed by Stop to tell the UsageExporter goroutine to return.
	// The exporter only observes it between ticks, so an in-flight collection
	// or upload finishes instead of being cut off half-way.
	// Must be created with make before use: closing a nil channel panics.
	done chan struct{}

	// terminated is closed by UsageExporter when it returns; Stop waits on it.
	// Must be created with make before use: closing a nil channel panics.
	terminated chan struct{}
}
|
|
|
|
func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
|
|
repo := repository.New(db)
|
|
|
|
err := repo.Init(dbType)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initiate usage repo: %v", err)
|
|
}
|
|
|
|
m := &Manager{
|
|
repository: repo,
|
|
clickhouseConn: clickhouseConn,
|
|
licenseRepo: licenseRepo,
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// start loads collects and exports any exported snapshot and starts the exporter
|
|
func (lm *Manager) Start() error {
|
|
// compares the locker and stateUnlocked if both are same lock is applied else returns error
|
|
if !atomic.CompareAndSwapUint32(&locker, stateUnlocked, stateLocked) {
|
|
return fmt.Errorf("usage exporter is locked")
|
|
}
|
|
|
|
// check if license is present or not
|
|
license, err := lm.licenseRepo.GetActiveLicense(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get active license")
|
|
}
|
|
if license == nil {
|
|
// we will not start the usage reporting if license is not present.
|
|
zap.S().Info("no license present, skipping usage reporting")
|
|
return nil
|
|
}
|
|
|
|
// upload previous snapshots if any
|
|
err = lm.UploadUsage(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// collect snapshot if incase it wasn't collect in (t - collectionFrequency)
|
|
err = lm.CollectCurrentUsage(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go lm.UsageExporter(context.Background())
|
|
|
|
return nil
|
|
}
|
|
|
|
// CollectCurrentUsage checks if needs to collect usage data
|
|
func (lm *Manager) CollectCurrentUsage(ctx context.Context) error {
|
|
// check the DB if anything exist where timestamp > t - collectionFrequency
|
|
ts := time.Now().Add(-collectionFrequency)
|
|
alreadyCreated, err := lm.repository.CheckSnapshotGtCreatedAt(ctx, ts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !alreadyCreated {
|
|
zap.S().Info("Collecting current usage")
|
|
exportError := lm.CollectAndStoreUsage(ctx)
|
|
if exportError != nil {
|
|
return exportError
|
|
}
|
|
} else {
|
|
zap.S().Info("Nothing to collect")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (lm *Manager) UsageExporter(ctx context.Context) {
|
|
defer close(lm.terminated)
|
|
|
|
collectionTicker := time.NewTicker(collectionFrequency)
|
|
defer collectionTicker.Stop()
|
|
|
|
uploadTicker := time.NewTicker(uploadFrequency)
|
|
defer uploadTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-lm.done:
|
|
return
|
|
case <-collectionTicker.C:
|
|
lm.CollectAndStoreUsage(ctx)
|
|
case <-uploadTicker.C:
|
|
lm.UploadUsage(ctx)
|
|
// remove the old snapshots
|
|
lm.repository.DropOldSnapshots(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TableSize is one row of the system.parts aggregation run by
// GetUsageFromClickHouse: totals over the active parts of one table on one
// disk. The ch tags bind each field to the query's column names and aliases.
type TableSize struct {
	// Table is the ClickHouse table name.
	Table string `ch:"table"`

	// DiskName is the disk holding the parts. GetUsageFromClickHouse treats
	// "default" as hot storage and any other disk as cold storage.
	DiskName string `ch:"disk_name"`

	// Rows is sum(rows) over the table's active parts on this disk.
	Rows uint64 `ch:"rows"`

	// UncompressedBytes is sum(data_uncompressed_bytes) over those parts.
	UncompressedBytes uint64 `ch:"uncompressed_bytes"`
}
|
|
|
|
func (lm *Manager) CollectAndStoreUsage(ctx context.Context) error {
|
|
snap, err := lm.GetUsageFromClickHouse(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
license, err := lm.licenseRepo.GetActiveLicense(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
activationId, _ := uuid.Parse(license.ActivationId)
|
|
// TODO (nitya) : Add installation ID in the payload
|
|
payload := model.UsagePayload{
|
|
UsageBase: model.UsageBase{
|
|
ActivationId: activationId,
|
|
FailedSyncRequest: 0,
|
|
},
|
|
Metrics: *snap,
|
|
SnapshotDate: time.Now(),
|
|
}
|
|
|
|
err = lm.repository.InsertSnapshot(ctx, &payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (lm *Manager) GetUsageFromClickHouse(ctx context.Context) (*model.UsageSnapshot, error) {
|
|
tableSizes := []TableSize{}
|
|
snap := model.UsageSnapshot{}
|
|
|
|
// get usage from clickhouse
|
|
query := `
|
|
SELECT
|
|
table,
|
|
disk_name,
|
|
sum(rows) as rows,
|
|
sum(data_uncompressed_bytes) AS uncompressed_bytes
|
|
FROM system.parts
|
|
WHERE active AND (database in ('signoz_logs', 'signoz_metrics', 'signoz_traces')) AND (table in ('logs','samples_v2', 'signoz_index_v2'))
|
|
GROUP BY
|
|
table,
|
|
disk_name
|
|
ORDER BY table
|
|
`
|
|
err := lm.clickhouseConn.Select(ctx, &tableSizes, query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, val := range tableSizes {
|
|
switch val.Table {
|
|
case "logs":
|
|
if val.DiskName == "default" {
|
|
snap.CurrentLogSizeBytes = val.UncompressedBytes
|
|
} else {
|
|
snap.CurrentLogSizeBytesColdStorage = val.UncompressedBytes
|
|
}
|
|
case "samples_v2":
|
|
if val.DiskName == "default" {
|
|
snap.CurrentSamplesCount = val.Rows
|
|
} else {
|
|
snap.CurrentSamplesCountColdStorage = val.Rows
|
|
}
|
|
case "signoz_index_v2":
|
|
if val.DiskName == "default" {
|
|
snap.CurrentSpansCount = val.Rows
|
|
} else {
|
|
snap.CurrentSpansCountColdStorage = val.Rows
|
|
}
|
|
}
|
|
}
|
|
|
|
return &snap, nil
|
|
}
|
|
|
|
func (lm *Manager) UploadUsage(ctx context.Context) error {
|
|
snapshots, err := lm.repository.GetSnapshotsNotSynced(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(snapshots) <= 0 {
|
|
zap.S().Info("no snapshots to upload, skipping.")
|
|
return nil
|
|
}
|
|
|
|
zap.S().Info("uploading snapshots")
|
|
for _, snap := range snapshots {
|
|
metricsBytes, err := encryption.Decrypt([]byte(snap.ActivationId.String()[:32]), []byte(snap.Snapshot))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
metrics := model.UsageSnapshot{}
|
|
err = json.Unmarshal(metricsBytes, &metrics)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = lm.UploadUsageWithExponentalBackOff(ctx, model.UsagePayload{
|
|
UsageBase: model.UsageBase{
|
|
Id: snap.Id,
|
|
InstallationId: snap.InstallationId,
|
|
ActivationId: snap.ActivationId,
|
|
FailedSyncRequest: snap.FailedSyncRequest,
|
|
},
|
|
SnapshotDate: snap.CreatedAt,
|
|
Metrics: metrics,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) error {
|
|
for i := 1; i <= MaxRetries; i++ {
|
|
apiErr := licenseserver.SendUsage(ctx, &payload)
|
|
if apiErr != nil && i == MaxRetries {
|
|
err := lm.repository.IncrementFailedRequestCount(ctx, payload.Id)
|
|
if err != nil {
|
|
zap.S().Errorf("failed to updated the failure count for snapshot in DB : ", zap.Error(err))
|
|
return err
|
|
}
|
|
zap.S().Errorf("retries stopped : %v", zap.Error(err))
|
|
// not returning error here since it is captured in the failed count
|
|
return nil
|
|
} else if apiErr != nil {
|
|
// sleeping for exponential backoff
|
|
sleepDuration := RetryInterval * time.Duration(i)
|
|
zap.S().Errorf("failed to upload snapshot retrying after %v secs : %v", sleepDuration.Seconds(), zap.Error(apiErr.Err))
|
|
time.Sleep(sleepDuration)
|
|
|
|
// update the failed request count
|
|
err := lm.repository.IncrementFailedRequestCount(ctx, payload.Id)
|
|
if err != nil {
|
|
zap.S().Errorf("failed to updated the failure count for snapshot in DB : %v", zap.Error(err))
|
|
return err
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
// update the database that it is synced
|
|
err := lm.repository.MoveToSynced(ctx, payload.Id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (lm *Manager) Stop() {
|
|
close(lm.done)
|
|
atomic.StoreUint32(&locker, stateUnlocked)
|
|
<-lm.terminated
|
|
}
|