From cbdeb5ad03fedefaff43055944cfa82ceb8c81bc Mon Sep 17 00:00:00 2001 From: Ankit Nayan Date: Mon, 4 Jul 2022 17:13:36 +0530 Subject: [PATCH] chore: added metrics for analytics (#1356) --- .../app/clickhouseReader/reader.go | 74 +++++++++++++++++++ pkg/query-service/app/http_handler.go | 5 +- pkg/query-service/app/server.go | 5 +- .../{app => interfaces}/interface.go | 7 +- pkg/query-service/telemetry/ignoredPaths.go | 7 +- pkg/query-service/telemetry/telemetry.go | 26 ++++++- 6 files changed, 117 insertions(+), 7 deletions(-) rename pkg/query-service/{app => interfaces}/interface.go (92%) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 17f3e5b047..8a6e2e6d36 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2906,3 +2906,77 @@ func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([ } return seriesList, nil } + +func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) { + + var totalSpans uint64 + + queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, signozTraceTableName) + r.db.QueryRow(ctx, queryStr).Scan(&totalSpans) + + return totalSpans, nil +} + +func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) { + + var spansInLastHeartBeatInterval uint64 + + queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, 30) + + r.db.QueryRow(ctx, queryStr).Scan(&spansInLastHeartBeatInterval) + + return spansInLastHeartBeatInterval, nil +} + +// func sum(array []tsByMetricName) uint64 { +// var result uint64 +// result = 0 +// for _, v := range array { +// result += v.count +// } +// return result +// } + +func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) { + + queryStr := fmt.Sprintf("SELECT count() as count from %s.%s group by metric_name order by count desc;", signozMetricDBName, signozTSTableName) + + // r.db.Select(ctx, &tsByMetricName, queryStr) + + rows, _ := r.db.Query(ctx, queryStr) + + var totalTS uint64 + totalTS = 0 + + var maxTS uint64 + maxTS = 0 + + count := 0 + for rows.Next() { + + var value uint64 + rows.Scan(&value) + totalTS += value + if count == 0 { + maxTS = value + } + count += 1 + } + + timeSeriesData := map[string]interface{}{} + timeSeriesData["totalTS"] = totalTS + timeSeriesData["maxTS"] = maxTS + + return timeSeriesData, nil +} + +func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) { + + var totalSamples uint64 + + queryStr := fmt.Sprintf("select count() from %s.%s where timestamp_ms > toUnixTimestamp(now()-toIntervalMinute(%d))*1000;", signozMetricDBName, signozSampleTableName, 30) + + r.db.QueryRow(ctx, queryStr).Scan(&totalSamples) + + return totalSamples, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 400c9f2de4..51ff99a8a9 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -22,6 +22,7 @@ import ( "go.signoz.io/query-service/constants" "go.signoz.io/query-service/dao" am "go.signoz.io/query-service/integrations/alertManager" + "go.signoz.io/query-service/interfaces" "go.signoz.io/query-service/model" "go.signoz.io/query-service/telemetry" "go.signoz.io/query-service/version" @@ -46,14 +47,14 @@ type APIHandler struct { // queryParser queryParser basePath string apiPrefix string - reader *Reader + reader *interfaces.Reader relationalDB dao.ModelDao alertManager am.Manager ready func(http.HandlerFunc) http.HandlerFunc } // NewAPIHandler returns an APIHandler -func NewAPIHandler(reader *Reader, relationalDB dao.ModelDao) (*APIHandler, error) { +func NewAPIHandler(reader *interfaces.Reader, relationalDB dao.ModelDao) (*APIHandler, error) { alertManager := am.New("") aH := &APIHandler{ diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 7904e813e2..5bccea66e2 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -19,6 +19,7 @@ import ( "go.signoz.io/query-service/constants" "go.signoz.io/query-service/dao" "go.signoz.io/query-service/healthcheck" + "go.signoz.io/query-service/interfaces" "go.signoz.io/query-service/telemetry" "go.signoz.io/query-service/utils" "go.uber.org/zap" @@ -65,7 +66,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { localDB.SetMaxOpenConns(10) - var reader Reader + var reader interfaces.Reader storage := os.Getenv("STORAGE") if storage == "clickhouse" { zap.S().Info("Using ClickHouse as datastore ...") @@ -76,6 +77,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage) } + telemetry.GetInstance().SetReader(reader) + apiHandler, err := NewAPIHandler(&reader, dao.DB()) if err != nil { return nil, err diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/interfaces/interface.go similarity index 92% rename from pkg/query-service/app/interface.go rename to pkg/query-service/interfaces/interface.go index fadbcd1e79..9c52a4497d 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -1,4 +1,4 @@ -package app +package interfaces import ( "context" @@ -54,4 +54,9 @@ type Reader interface { GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) + + GetTotalSpans(ctx context.Context) (uint64, error) + GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) + GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) + GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) } diff --git a/pkg/query-service/telemetry/ignoredPaths.go b/pkg/query-service/telemetry/ignoredPaths.go index 00ceb1c79a..e564cce2a2 100644 --- a/pkg/query-service/telemetry/ignoredPaths.go +++ b/pkg/query-service/telemetry/ignoredPaths.go @@ -2,8 +2,11 @@ package telemetry func IgnoredPaths() map[string]struct{} { ignoredPaths := map[string]struct{}{ - "/api/v1/tags": {}, - "/api/v1/version": {}, + "/api/v1/tags": {}, + "/api/v1/version": {}, + "/api/v1/query_range": {}, + "/api/v2/metrics/query_range": {}, + "/api/v1/services/list": {}, } return ignoredPaths diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 1f5360fa40..ca402e4da8 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -1,6 +1,7 @@ package telemetry import ( + "context" "io/ioutil" "net/http" "os" @@ -8,6 +9,7 @@ import ( "time" "go.signoz.io/query-service/constants" + "go.signoz.io/query-service/interfaces" "go.signoz.io/query-service/model" "go.signoz.io/query-service/version" "gopkg.in/segmentio/analytics-go.v3" @@ -25,6 +27,10 @@ const ( const api_key = "4Gmoa4ixJAUHx2BpJxsjwA1bEfnwEeRz" const IP_NOT_FOUND_PLACEHOLDER = "NA" +const HEART_BEAT_DURATION = 6 * time.Hour + +// const HEART_BEAT_DURATION = 10 * time.Second + var telemetry *Telemetry var once sync.Once @@ -34,6 +40,7 @@ type Telemetry struct { isEnabled bool isAnonymous bool distinctId string + reader interfaces.Reader } func createTelemetry() { @@ -46,11 +53,24 @@ func createTelemetry() { telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled()) telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) - ticker := time.NewTicker(6 * time.Hour) + ticker := time.NewTicker(HEART_BEAT_DURATION) go func() { for { select { case <-ticker.C: + totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background()) + spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background()) + getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background()) + tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(context.Background()) + + data := map[string]interface{}{ + "totalSpans": totalSpans, + "spansInLastHeartBeatInterval": spansInLastHeartBeatInterval, + "getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval, + } + for key, value := range tsInfo { + data[key] = value + } telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) } } @@ -153,6 +173,10 @@ func (a *Telemetry) SetTelemetryEnabled(value bool) { a.isEnabled = value } +func (a *Telemetry) SetReader(reader interfaces.Reader) { + a.reader = reader +} + func GetInstance() *Telemetry { once.Do(func() {