chore: update heartbeat, language event and add serviceName event (#4571)

* chore: update heartbeat, language event and add serviceName event

* chore: update tagsInfo
This commit is contained in:
Vishal Sharma 2024-02-21 14:49:33 +05:30 committed by GitHub
parent ecd5ce92c2
commit 0cb60e1c10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 63 additions and 21 deletions

View File

@ -3273,17 +3273,27 @@ func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) {
return totalSpans, nil
}
func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) {
func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
var spansInLastHeartBeatInterval uint64
queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, 30)
queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, int(interval.Minutes()))
r.db.QueryRow(ctx, queryStr).Scan(&spansInLastHeartBeatInterval)
return spansInLastHeartBeatInterval, nil
}
func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {
var totalLogs uint64
queryStr := fmt.Sprintf("SELECT count() from %s.%s;", r.logsDB, r.logsTable)
r.db.QueryRow(ctx, queryStr).Scan(&totalLogs)
return totalLogs, nil
}
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
@ -3312,9 +3322,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
queryStr := fmt.Sprintf("SELECT count() as count from %s.%s group by metric_name order by count desc;", signozMetricDBName, signozTSTableName)
// r.db.Select(ctx, &tsByMetricName, queryStr)
queryStr := fmt.Sprintf("SELECT count() as count from %s.%s where metric_name not like 'signoz_%%' group by metric_name order by count desc;", signozMetricDBName, signozTSTableName)
rows, _ := r.db.Query(ctx, queryStr)
@ -3343,11 +3351,21 @@ func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]in
return timeSeriesData, nil
}
func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) {
func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
var totalSamples uint64
queryStr := fmt.Sprintf("select count() from %s.%s where timestamp_ms > toUnixTimestamp(now()-toIntervalMinute(%d))*1000;", signozMetricDBName, signozSampleTableName, 30)
queryStr := fmt.Sprintf("select count() from %s.%s where metric_name not like 'signoz_%%' and timestamp_ms > toUnixTimestamp(now()-toIntervalMinute(%d))*1000;", signozMetricDBName, signozSampleTableName, int(interval.Minutes()))
r.db.QueryRow(ctx, queryStr).Scan(&totalSamples)
return totalSamples, nil
}
func (r *ClickHouseReader) GetTotalSamples(ctx context.Context) (uint64, error) {
var totalSamples uint64
queryStr := fmt.Sprintf("select count() from %s.%s where metric_name not like 'signoz_%%';", signozMetricDBName, signozSampleTableName)
r.db.QueryRow(ctx, queryStr).Scan(&totalSamples)
@ -3367,23 +3385,23 @@ func (r *ClickHouseReader) GetDistributedInfoInLastHeartBeatInterval(ctx context
return nil, nil
}
func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) {
func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) {
var totalLogLines uint64
queryStr := fmt.Sprintf("select count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))*1000000000;", r.logsDB, r.logsTable, 30)
queryStr := fmt.Sprintf("select count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))*1000000000;", r.logsDB, r.logsTable, int(interval.Minutes()))
err := r.db.QueryRow(ctx, queryStr).Scan(&totalLogLines)
return totalLogLines, err
}
func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context) (*model.TagsInfo, error) {
func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env,
stringTagMap['telemetry.sdk.language'] as language from %s.%s
where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
group by serviceName, env, language;`, r.TraceDB, r.indexTable, 1)
group by serviceName, env, language;`, r.TraceDB, r.indexTable, int(interval.Minutes()))
tagTelemetryDataList := []model.TagTelemetryData{}
err := r.db.Select(ctx, &tagTelemetryDataList, queryStr)
@ -3396,6 +3414,7 @@ func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Contex
tagsInfo := model.TagsInfo{
Languages: make(map[string]interface{}),
Services: make(map[string]interface{}),
}
for _, tagTelemetryData := range tagTelemetryDataList {
@ -3409,6 +3428,9 @@ func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Contex
if len(tagTelemetryData.Language) != 0 {
tagsInfo.Languages[tagTelemetryData.Language] = struct{}{}
}
if len(tagTelemetryData.ServiceName) != 0 {
tagsInfo.Services[tagTelemetryData.ServiceName] = struct{}{}
}
}

View File

@ -2,6 +2,7 @@ package interfaces
import (
"context"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/prometheus/prometheus/promql"
@ -74,11 +75,13 @@ type Reader interface {
GetDashboardsInfo(ctx context.Context) (*model.DashboardsInfo, error)
GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error)
GetTotalSpans(ctx context.Context) (uint64, error)
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)
GetTotalLogs(ctx context.Context) (uint64, error)
GetTotalSamples(ctx context.Context) (uint64, error)
GetSpansInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error)
GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error)
GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
GetLogsInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
GetTagsInfoInLastHeartBeatInterval(ctx context.Context) (*model.TagsInfo, error)
GetSamplesInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error)
GetLogsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error)
GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error)
GetDistributedInfoInLastHeartBeatInterval(ctx context.Context) (map[string]interface{}, error)
// Logs
GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)

View File

@ -613,6 +613,7 @@ type DashboardVar struct {
type TagsInfo struct {
Languages map[string]interface{} `json:"languages"`
Env string `json:"env"`
Services map[string]interface{} `json:"services"`
}
type AlertsInfo struct {

View File

@ -35,6 +35,7 @@ const (
TELEMETRY_LICENSE_ACT_FAILED = "License Activation Failed"
TELEMETRY_EVENT_ENVIRONMENT = "Environment"
TELEMETRY_EVENT_LANGUAGE = "Language"
TELEMETRY_EVENT_SERVICE = "ServiceName"
TELEMETRY_EVENT_LOGS_FILTERS = "Logs Filters"
TELEMETRY_EVENT_DISTRIBUTED = "Distributed"
TELEMETRY_EVENT_QUERY_RANGE_V3 = "Query Range V3 Metadata"
@ -51,6 +52,7 @@ var SAAS_EVENTS_LIST = map[string]struct{}{
TELEMETRY_EVENT_ACTIVE_USER: {},
TELEMETRY_EVENT_HEART_BEAT: {},
TELEMETRY_EVENT_LANGUAGE: {},
TELEMETRY_EVENT_SERVICE: {},
TELEMETRY_EVENT_ENVIRONMENT: {},
TELEMETRY_EVENT_USER_INVITATION_SENT: {},
TELEMETRY_EVENT_USER_INVITATION_ACCEPTED: {},
@ -201,11 +203,17 @@ func createTelemetry() {
select {
case <-activeUserTicker.C:
if telemetry.activeUser["logs"] != 0 {
getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background())
getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION)
if err != nil && getLogsInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["logs"] = 0
}
}
if telemetry.activeUser["metrics"] != 0 {
getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION)
if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["metrics"] = 0
}
}
if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) {
telemetry.activeUser["any"] = 1
}
@ -214,7 +222,7 @@ func createTelemetry() {
case <-ticker.C:
tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background())
tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
if len(tagsInfo.Env) != 0 {
telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "")
@ -224,12 +232,18 @@ func createTelemetry() {
telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": language}, "")
}
for service, _ := range tagsInfo.Services {
telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": service}, "")
}
totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background())
spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background())
getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background())
totalLogs, _ := telemetry.reader.GetTotalLogs(context.Background())
spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
totalSamples, _ := telemetry.reader.GetTotalSamples(context.Background())
tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(context.Background())
getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background())
getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
traceTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.TraceTTL})
metricsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.MetricsTTL})
@ -238,7 +252,9 @@ func createTelemetry() {
data := map[string]interface{}{
"totalSpans": totalSpans,
"spansInLastHeartBeatInterval": spansInLastHeartBeatInterval,
"totalSamples": totalSamples,
"getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval,
"totalLogs": totalLogs,
"getLogsInfoInLastHeartBeatInterval": getLogsInfoInLastHeartBeatInterval,
"countUsers": telemetry.countUsers,
"metricsTTLStatus": metricsTTL.Status,