diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index d7da4a41e5..0711382acf 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3068,6 +3068,20 @@ func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Con return totalSamples, nil } + +func (r *ClickHouseReader) GetDistributedInfoInLastHeartBeatInterval(ctx context.Context) (map[string]interface{}, error) { + + clusterInfo := []model.ClusterInfo{} + + queryStr := `SELECT shard_num, shard_weight, replica_num, errors_count, slowdowns_count, estimated_recovery_time FROM system.clusters where cluster='cluster';` + r.db.Select(ctx, &clusterInfo, queryStr) + if len(clusterInfo) == 1 { + return clusterInfo[0].GetMapFromStruct(), nil + } + + return nil, nil +} + func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) { var totalLogLines uint64 diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 3200b10c2f..bcaf889ab6 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -63,6 +63,7 @@ type Reader interface { GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) GetLogsInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) GetTagsInfoInLastHeartBeatInterval(ctx context.Context) (*model.TagsInfo, error) + GetDistributedInfoInLastHeartBeatInterval(ctx context.Context) (map[string]interface{}, error) // Logs GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 7ae79be456..0f1d60d9c2 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -564,3 +564,19 @@ type TagTelemetryData struct { Env string `json:"env" ch:"env"` Language string `json:"language" ch:"language"` } + +type ClusterInfo struct { + ShardNum uint32 `json:"shard_num" ch:"shard_num"` + ShardWeight uint32 `json:"shard_weight" ch:"shard_weight"` + ReplicaNum uint32 `json:"replica_num" ch:"replica_num"` + ErrorsCount uint32 `json:"errors_count" ch:"errors_count"` + SlowdownsCount uint32 `json:"slowdowns_count" ch:"slowdowns_count"` + EstimatedRecoveryTime uint32 `json:"estimated_recovery_time" ch:"estimated_recovery_time"` +} + +func (ci *ClusterInfo) GetMapFromStruct() map[string]interface{} { + var clusterInfoMap map[string]interface{} + data, _ := json.Marshal(*ci) + json.Unmarshal(data, &clusterInfoMap) + return clusterInfoMap +} diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 17f93f6339..e90207475a 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -33,6 +33,7 @@ const ( TELEMETRY_EVENT_ENVIRONMENT = "Environment" TELEMETRY_EVENT_LANGUAGE = "Language" TELEMETRY_EVENT_LOGS_FILTERS = "Logs Filters" + TELEMETRY_EVENT_DISTRIBUTED = "Distributed" ) const api_key = "4Gmoa4ixJAUHx2BpJxsjwA1bEfnwEeRz" @@ -139,6 +140,10 @@ func createTelemetry() { data[key] = value } telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data) + + getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(context.Background()) + telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval) + } } }()