chore: added metrics for analytics (#1356)

This commit is contained in:
Ankit Nayan 2022-07-04 17:13:36 +05:30 committed by GitHub
parent cf0eb44143
commit cbdeb5ad03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 117 additions and 7 deletions

View File

@ -2906,3 +2906,77 @@ func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([
} }
return seriesList, nil return seriesList, nil
} }
func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) {
var totalSpans uint64
queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, signozTraceTableName)
r.db.QueryRow(ctx, queryStr).Scan(&totalSpans)
return totalSpans, nil
}
func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) {
var spansInLastHeartBeatInterval uint64
queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, 30)
r.db.QueryRow(ctx, queryStr).Scan(&spansInLastHeartBeatInterval)
return spansInLastHeartBeatInterval, nil
}
// func sum(array []tsByMetricName) uint64 {
// var result uint64
// result = 0
// for _, v := range array {
// result += v.count
// }
// return result
// }
func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
queryStr := fmt.Sprintf("SELECT count() as count from %s.%s group by metric_name order by count desc;", signozMetricDBName, signozTSTableName)
// r.db.Select(ctx, &tsByMetricName, queryStr)
rows, _ := r.db.Query(ctx, queryStr)
var totalTS uint64
totalTS = 0
var maxTS uint64
maxTS = 0
count := 0
for rows.Next() {
var value uint64
rows.Scan(&value)
totalTS += value
if count == 0 {
maxTS = value
}
count += 1
}
timeSeriesData := map[string]interface{}{}
timeSeriesData["totalTS"] = totalTS
timeSeriesData["maxTS"] = maxTS
return timeSeriesData, nil
}
func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) {
var totalSamples uint64
queryStr := fmt.Sprintf("select count() from %s.%s where timestamp_ms > toUnixTimestamp(now()-toIntervalMinute(%d))*1000;", signozMetricDBName, signozSampleTableName, 30)
r.db.QueryRow(ctx, queryStr).Scan(&totalSamples)
return totalSamples, nil
}

View File

@ -22,6 +22,7 @@ import (
"go.signoz.io/query-service/constants" "go.signoz.io/query-service/constants"
"go.signoz.io/query-service/dao" "go.signoz.io/query-service/dao"
am "go.signoz.io/query-service/integrations/alertManager" am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/interfaces"
"go.signoz.io/query-service/model" "go.signoz.io/query-service/model"
"go.signoz.io/query-service/telemetry" "go.signoz.io/query-service/telemetry"
"go.signoz.io/query-service/version" "go.signoz.io/query-service/version"
@ -46,14 +47,14 @@ type APIHandler struct {
// queryParser queryParser // queryParser queryParser
basePath string basePath string
apiPrefix string apiPrefix string
reader *Reader reader *interfaces.Reader
relationalDB dao.ModelDao relationalDB dao.ModelDao
alertManager am.Manager alertManager am.Manager
ready func(http.HandlerFunc) http.HandlerFunc ready func(http.HandlerFunc) http.HandlerFunc
} }
// NewAPIHandler returns an APIHandler // NewAPIHandler returns an APIHandler
func NewAPIHandler(reader *Reader, relationalDB dao.ModelDao) (*APIHandler, error) { func NewAPIHandler(reader *interfaces.Reader, relationalDB dao.ModelDao) (*APIHandler, error) {
alertManager := am.New("") alertManager := am.New("")
aH := &APIHandler{ aH := &APIHandler{

View File

@ -19,6 +19,7 @@ import (
"go.signoz.io/query-service/constants" "go.signoz.io/query-service/constants"
"go.signoz.io/query-service/dao" "go.signoz.io/query-service/dao"
"go.signoz.io/query-service/healthcheck" "go.signoz.io/query-service/healthcheck"
"go.signoz.io/query-service/interfaces"
"go.signoz.io/query-service/telemetry" "go.signoz.io/query-service/telemetry"
"go.signoz.io/query-service/utils" "go.signoz.io/query-service/utils"
"go.uber.org/zap" "go.uber.org/zap"
@ -65,7 +66,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
localDB.SetMaxOpenConns(10) localDB.SetMaxOpenConns(10)
var reader Reader var reader interfaces.Reader
storage := os.Getenv("STORAGE") storage := os.Getenv("STORAGE")
if storage == "clickhouse" { if storage == "clickhouse" {
zap.S().Info("Using ClickHouse as datastore ...") zap.S().Info("Using ClickHouse as datastore ...")
@ -76,6 +77,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage) return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
} }
telemetry.GetInstance().SetReader(reader)
apiHandler, err := NewAPIHandler(&reader, dao.DB()) apiHandler, err := NewAPIHandler(&reader, dao.DB())
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -1,4 +1,4 @@
package app package interfaces
import ( import (
"context" "context"
@ -54,4 +54,9 @@ type Reader interface {
GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error)
GetTotalSpans(ctx context.Context) (uint64, error)
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)
GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error)
GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
} }

View File

@ -2,8 +2,11 @@ package telemetry
func IgnoredPaths() map[string]struct{} { func IgnoredPaths() map[string]struct{} {
ignoredPaths := map[string]struct{}{ ignoredPaths := map[string]struct{}{
"/api/v1/tags": {}, "/api/v1/tags": {},
"/api/v1/version": {}, "/api/v1/version": {},
"/api/v1/query_range": {},
"/api/v2/metrics/query_range": {},
"/api/v1/services/list": {},
} }
return ignoredPaths return ignoredPaths

View File

@ -1,6 +1,7 @@
package telemetry package telemetry
import ( import (
"context"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
@ -8,6 +9,7 @@ import (
"time" "time"
"go.signoz.io/query-service/constants" "go.signoz.io/query-service/constants"
"go.signoz.io/query-service/interfaces"
"go.signoz.io/query-service/model" "go.signoz.io/query-service/model"
"go.signoz.io/query-service/version" "go.signoz.io/query-service/version"
"gopkg.in/segmentio/analytics-go.v3" "gopkg.in/segmentio/analytics-go.v3"
@ -25,6 +27,10 @@ const (
const api_key = "4Gmoa4ixJAUHx2BpJxsjwA1bEfnwEeRz" const api_key = "4Gmoa4ixJAUHx2BpJxsjwA1bEfnwEeRz"
const IP_NOT_FOUND_PLACEHOLDER = "NA" const IP_NOT_FOUND_PLACEHOLDER = "NA"
const HEART_BEAT_DURATION = 6 * time.Hour
// const HEART_BEAT_DURATION = 10 * time.Second
var telemetry *Telemetry var telemetry *Telemetry
var once sync.Once var once sync.Once
@ -34,6 +40,7 @@ type Telemetry struct {
isEnabled bool isEnabled bool
isAnonymous bool isAnonymous bool
distinctId string distinctId string
reader interfaces.Reader
} }
func createTelemetry() { func createTelemetry() {
@ -46,11 +53,24 @@ func createTelemetry() {
telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled()) telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled())
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data)
ticker := time.NewTicker(6 * time.Hour) ticker := time.NewTicker(HEART_BEAT_DURATION)
go func() { go func() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background())
spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background())
getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background())
tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(context.Background())
data := map[string]interface{}{
"totalSpans": totalSpans,
"spansInLastHeartBeatInterval": spansInLastHeartBeatInterval,
"getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval,
}
for key, value := range tsInfo {
data[key] = value
}
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data)
} }
} }
@ -153,6 +173,10 @@ func (a *Telemetry) SetTelemetryEnabled(value bool) {
a.isEnabled = value a.isEnabled = value
} }
func (a *Telemetry) SetReader(reader interfaces.Reader) {
a.reader = reader
}
func GetInstance() *Telemetry { func GetInstance() *Telemetry {
once.Do(func() { once.Do(func() {