chore: update heartbeat interval logic (#5507)

* chore: update heartbeat interval logic

* chore: address review comment
This commit is contained in:
Vishal Sharma 2024-07-17 17:05:15 +05:30 committed by GitHub
parent 77eba9a558
commit d3b83f5a41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 171 additions and 129 deletions

View File

@ -55,6 +55,10 @@ func GetAlertManagerApiPrefix() string {
return "http://alertmanager:9093/api/" return "http://alertmanager:9093/api/"
} }
var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)
var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)
var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html") var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")
// Alert manager channel subpath // Alert manager channel subpath
@ -232,6 +236,18 @@ func GetOrDefaultEnv(key string, fallback string) string {
return v return v
} }
func GetOrDefaultEnvInt(key string, fallback int) int {
v := os.Getenv(key)
if len(v) == 0 {
return fallback
}
intVal, err := strconv.Atoi(v)
if err != nil {
return fallback
}
return intVal
}
const ( const (
STRING = "String" STRING = "String"
UINT32 = "UInt32" UINT32 = "UInt32"

View File

@ -11,6 +11,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/go-co-op/gocron"
"gopkg.in/segmentio/analytics-go.v3" "gopkg.in/segmentio/analytics-go.v3"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
@ -85,19 +86,11 @@ const api_key = "9kRrJ7oPCGPEJLF6QjMPLt5bljFhRQBr"
const IP_NOT_FOUND_PLACEHOLDER = "NA" const IP_NOT_FOUND_PLACEHOLDER = "NA"
const DEFAULT_NUMBER_OF_SERVICES = 6 const DEFAULT_NUMBER_OF_SERVICES = 6
const HEART_BEAT_DURATION = 12 * time.Hour const SCHEDULE_START_TIME = "04:00" // 4 AM UTC
const ACTIVE_USER_DURATION = 6 * time.Hour
// const HEART_BEAT_DURATION = 30 * time.Second
// const ACTIVE_USER_DURATION = 30 * time.Second
const RATE_LIMIT_CHECK_DURATION = 1 * time.Minute const RATE_LIMIT_CHECK_DURATION = 1 * time.Minute
const RATE_LIMIT_VALUE = 1 const RATE_LIMIT_VALUE = 1
// const RATE_LIMIT_CHECK_DURATION = 20 * time.Second
// const RATE_LIMIT_VALUE = 5
var telemetry *Telemetry var telemetry *Telemetry
var once sync.Once var once sync.Once
@ -213,8 +206,11 @@ func createTelemetry() {
telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled()) telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled())
ticker := time.NewTicker(HEART_BEAT_DURATION) // Create a new scheduler
activeUserTicker := time.NewTicker(ACTIVE_USER_DURATION) s := gocron.NewScheduler(time.UTC)
HEART_BEAT_DURATION := time.Duration(constants.TELEMETRY_HEART_BEAT_DURATION_MINUTES) * time.Minute
ACTIVE_USER_DURATION := time.Duration(constants.TELEMETRY_ACTIVE_USER_DURATION_MINUTES) * time.Minute
rateLimitTicker := time.NewTicker(RATE_LIMIT_CHECK_DURATION) rateLimitTicker := time.NewTicker(RATE_LIMIT_CHECK_DURATION)
@ -227,36 +223,10 @@ func createTelemetry() {
} }
} }
}() }()
go func() { ctx := context.Background()
for { // Define heartbeat function
select { heartbeatFunc := func() {
case <-activeUserTicker.C: tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
if telemetry.activeUser["logs"] != 0 {
getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION)
if err != nil && getLogsInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["logs"] = 0
}
}
if telemetry.activeUser["metrics"] != 0 {
getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION)
if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["metrics"] = 0
}
}
if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) {
telemetry.activeUser["any"] = 1
}
telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{
"traces": telemetry.activeUser["traces"],
"metrics": telemetry.activeUser["metrics"],
"logs": telemetry.activeUser["logs"],
"any": telemetry.activeUser["any"]},
"", true, false)
telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0}
case <-ticker.C:
tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
if len(tagsInfo.Env) != 0 { if len(tagsInfo.Env) != 0 {
telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "", true, false) telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "", true, false)
@ -276,18 +246,18 @@ func createTelemetry() {
if len(services) > 0 { if len(services) > 0 {
telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": services}, "", true, false) telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": services}, "", true, false)
} }
totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background()) totalSpans, _ := telemetry.reader.GetTotalSpans(ctx)
totalLogs, _ := telemetry.reader.GetTotalLogs(context.Background()) totalLogs, _ := telemetry.reader.GetTotalLogs(ctx)
spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
totalSamples, _ := telemetry.reader.GetTotalSamples(context.Background()) totalSamples, _ := telemetry.reader.GetTotalSamples(ctx)
tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(context.Background()) tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(ctx)
getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
traceTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.TraceTTL}) traceTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.TraceTTL})
metricsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.MetricsTTL}) metricsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.MetricsTTL})
logsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.LogsTTL}) logsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.LogsTTL})
data := map[string]interface{}{ data := map[string]interface{}{
"totalSpans": totalSpans, "totalSpans": totalSpans,
@ -307,7 +277,7 @@ func createTelemetry() {
data[key] = value data[key] = value
} }
users, apiErr := telemetry.reader.GetUsers(context.Background()) users, apiErr := telemetry.reader.GetUsers(ctx)
if apiErr == nil { if apiErr == nil {
for _, user := range users { for _, user := range users {
if user.Email == DEFAULT_CLOUD_EMAIL { if user.Email == DEFAULT_CLOUD_EMAIL {
@ -316,13 +286,14 @@ func createTelemetry() {
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, user.Email, false, false) telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, user.Email, false, false)
} }
} }
alertsInfo, err := telemetry.alertsInfoCallback(context.Background())
alertsInfo, err := telemetry.alertsInfoCallback(ctx)
if err == nil { if err == nil {
dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(context.Background()) dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(ctx)
if err == nil { if err == nil {
channels, err := telemetry.reader.GetChannels() channels, err := telemetry.reader.GetChannels()
if err == nil { if err == nil {
savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(context.Background()) savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(ctx)
if err == nil { if err == nil {
dashboardsAlertsData := map[string]interface{}{ dashboardsAlertsData := map[string]interface{}{
"totalDashboards": dashboardsInfo.TotalDashboards, "totalDashboards": dashboardsInfo.TotalDashboards,
@ -356,12 +327,67 @@ func createTelemetry() {
telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, map[string]interface{}{"error": err.Error()}, "", true, false) telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, map[string]interface{}{"error": err.Error()}, "", true, false)
} }
getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(context.Background()) getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(ctx)
telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "", true, false) telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "", true, false)
} }
}
}()
// Define active user function
activeUserFunc := func() {
if telemetry.activeUser["logs"] != 0 {
getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, ACTIVE_USER_DURATION)
if err != nil && getLogsInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["logs"] = 0
}
}
if telemetry.activeUser["metrics"] != 0 {
getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(ctx, ACTIVE_USER_DURATION)
if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["metrics"] = 0
}
}
if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) {
telemetry.activeUser["any"] = 1
}
telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{
"traces": telemetry.activeUser["traces"],
"metrics": telemetry.activeUser["metrics"],
"logs": telemetry.activeUser["logs"],
"any": telemetry.activeUser["any"]},
"", true, false)
telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0}
}
// Calculate next run time based on duration and start time
calculateNextRun := func(duration time.Duration, startTimeStr string) time.Time {
now := time.Now().UTC()
startTime, _ := time.Parse("15:04", startTimeStr)
todayStartTime := time.Date(now.Year(), now.Month(), now.Day(), startTime.Hour(), startTime.Minute(), 0, 0, time.UTC)
if now.Before(todayStartTime) {
todayStartTime = todayStartTime.Add(-24 * time.Hour)
}
diff := now.Sub(todayStartTime)
intervalsPassed := int(diff / duration)
nextRun := todayStartTime.Add(time.Duration(intervalsPassed+1) * duration)
return nextRun
}
// Schedule next runs
scheduleNextRuns := func() {
nextHeartbeat := calculateNextRun(HEART_BEAT_DURATION, SCHEDULE_START_TIME)
nextActiveUser := calculateNextRun(ACTIVE_USER_DURATION, SCHEDULE_START_TIME)
s.Every(HEART_BEAT_DURATION).StartAt(nextHeartbeat).Do(heartbeatFunc)
s.Every(ACTIVE_USER_DURATION).StartAt(nextActiveUser).Do(activeUserFunc)
}
// Schedule immediate execution and subsequent runs
scheduleNextRuns()
// Start the scheduler in a separate goroutine
go s.StartBlocking()
} }
// Get preferred outbound ip of this machine // Get preferred outbound ip of this machine