fix: use query builder for metrics onboarding API (#6327)

This commit is contained in:
Shivanshu Raj Shrivastava 2024-10-30 23:13:56 +05:30 committed by GitHub
parent 860145fb1d
commit fbe75cd057
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 103 additions and 63 deletions

View File

@ -2833,7 +2833,7 @@ func (aH *APIHandler) onboardKafka(
return return
} }
chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka") queryRangeParams, err := mq.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
if err != nil { if err != nil {
zap.L().Error(err.Error()) zap.L().Error(err.Error())
@ -2841,66 +2841,69 @@ func (aH *APIHandler) onboardKafka(
return return
} }
result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) results, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil { if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, err) RespondError(w, apiErrObj, errQueriesByName)
return return
} }
var entries []mq.OnboardingResponse var entries []mq.OnboardingResponse
for _, result := range result { var fetchLatencyState, consumerLagState bool
for key, value := range result.Data {
var message, attribute, status string
intValue := int(*value.(*uint8)) for _, result := range results {
for _, series := range result.Series {
for _, point := range series.Points {
pointValue := point.Value
if pointValue > 0 {
if result.QueryName == "fetch_latency" {
fetchLatencyState = true
break
}
if result.QueryName == "consumer_lag" {
consumerLagState = true
break
}
}
if key == "entries" { }
attribute = "telemetry ingestion" }
if intValue != 0 { }
entries = nil
entry := mq.OnboardingResponse{ if !fetchLatencyState && !consumerLagState {
Attribute: attribute, entries = append(entries, mq.OnboardingResponse{
Attribute: "telemetry ingestion",
Message: "No data available in the given time range", Message: "No data available in the given time range",
Status: "0", Status: "0",
}
entries = append(entries, entry)
break
} else {
status = "1"
}
} else if key == "fetchlatency" {
attribute = "kafka_consumer_fetch_latency_avg"
if intValue != 0 {
status = "0"
message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range."
} else {
status = "1"
}
} else if key == "grouplag" {
attribute = "kafka_consumer_group_lag"
if intValue != 0 {
status = "0"
message = "Metric kafka_consumer_group_lag is not present in the given time range."
} else {
status = "1"
}
}
entry := mq.OnboardingResponse{
Attribute: attribute,
Message: message,
Status: status,
}
entries = append(entries, entry)
}
}
sort.Slice(entries, func(i, j int) bool {
return entries[i].Attribute < entries[j].Attribute
}) })
}
if !fetchLatencyState {
entries = append(entries, mq.OnboardingResponse{
Attribute: "kafka_consumer_fetch_latency_avg",
Message: "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.",
Status: "0",
})
} else {
entries = append(entries, mq.OnboardingResponse{
Attribute: "kafka_consumer_fetch_latency_avg",
Status: "1",
})
}
if !consumerLagState {
entries = append(entries, mq.OnboardingResponse{
Attribute: "kafka_consumer_group_lag",
Message: "Metric kafka_consumer_group_lag is not present in the given time range.",
Status: "0",
})
} else {
entries = append(entries, mq.OnboardingResponse{
Attribute: "kafka_consumer_group_lag",
Status: "1",
})
}
aH.Respond(w, entries) aH.Respond(w, entries)
} }

View File

@ -381,18 +381,3 @@ WHERE
AND timestamp <= '%d';`, queueType, start, end) AND timestamp <= '%d';`, queueType, start, end)
return query return query
} }
func onboardKafkaSQL(start, end int64) string {
query := fmt.Sprintf(`
SELECT
COUNT(*) = 0 AS entries,
COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency,
COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag
FROM
signoz_metrics.time_series_v4_1day
WHERE
metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag')
AND unix_milli >= '%d'
AND unix_milli < '%d';`, start/1000000, end/1000000)
return query
}

View File

@ -185,6 +185,60 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
return bq, nil return bq, nil
} }
func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.QueryRangeParamsV3, error) {
bq := make(map[string]*v3.BuilderQuery)
unixMilliStart := messagingQueue.Start / 1000000
unixMilliEnd := messagingQueue.End / 1000000
buiderQuery := &v3.BuilderQuery{
QueryName: "fetch_latency",
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "kafka_consumer_fetch_latency_avg",
},
AggregateOperator: v3.AggregateOperatorCount,
Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationCount,
SpaceAggregation: v3.SpaceAggregationSum,
Expression: "fetch_latency",
}
bq["fetch_latency"] = buiderQuery
buiderQuery = &v3.BuilderQuery{
QueryName: "consumer_lag",
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "kafka_consumer_group_lag",
},
AggregateOperator: v3.AggregateOperatorCount,
Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationCount,
SpaceAggregation: v3.SpaceAggregationSum,
Expression: "consumer_lag",
}
bq["consumer_lag"] = buiderQuery
cq := &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: bq,
PanelType: v3.PanelTypeTable,
}
queryRangeParams := &v3.QueryRangeParamsV3{
Start: unixMilliStart,
End: unixMilliEnd,
Step: defaultStepInterval,
CompositeQuery: cq,
Version: "v4",
FormatForWeb: true,
}
return queryRangeParams, nil
}
func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
queueType := KafkaQueue queueType := KafkaQueue
@ -302,8 +356,6 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
query = onboardProducersSQL(start, end, queueType) query = onboardProducersSQL(start, end, queueType)
} else if queryContext == "onboard_consumers" { } else if queryContext == "onboard_consumers" {
query = onboardConsumerSQL(start, end, queueType) query = onboardConsumerSQL(start, end, queueType)
} else if queryContext == "onboard_kafka" {
query = onboardKafkaSQL(start, end)
} }
return &v3.ClickHouseQuery{ return &v3.ClickHouseQuery{
Query: query, Query: query,