diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 9ce110d545..968aca0030 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -2833,7 +2833,7 @@ func (aH *APIHandler) onboardKafka(
 		return
 	}
 
-	chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka")
+	queryRangeParams, err := mq.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
 
 	if err != nil {
 		zap.L().Error(err.Error())
@@ -2841,66 +2841,69 @@ func (aH *APIHandler) onboardKafka(
 		return
 	}
 
-	result, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
-
+	results, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
 	if err != nil {
 		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
-		RespondError(w, apiErrObj, err)
+		RespondError(w, apiErrObj, errQueriesByName)
 		return
 	}
 
 	var entries []mq.OnboardingResponse
 
-	for _, result := range result {
-		for key, value := range result.Data {
-			var message, attribute, status string
+	var fetchLatencyState, consumerLagState bool
 
-			intValue := int(*value.(*uint8))
-
-			if key == "entries" {
-				attribute = "telemetry ingestion"
-				if intValue != 0 {
-					entries = nil
-					entry := mq.OnboardingResponse{
-						Attribute: attribute,
-						Message:   "No data available in the given time range",
-						Status:    "0",
+	for _, result := range results {
+		for _, series := range result.Series {
+			for _, point := range series.Points {
+				pointValue := point.Value
+				if pointValue > 0 {
+					if result.QueryName == "fetch_latency" {
+						fetchLatencyState = true
+						break
+					}
+					if result.QueryName == "consumer_lag" {
+						consumerLagState = true
+						break
 					}
-					entries = append(entries, entry)
-					break
-				} else {
-					status = "1"
 				}
-			} else if key == "fetchlatency" {
-				attribute = "kafka_consumer_fetch_latency_avg"
-				if intValue != 0 {
-					status = "0"
-					message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range."
-				} else {
-					status = "1"
-				}
-			} else if key == "grouplag" {
-				attribute = "kafka_consumer_group_lag"
-				if intValue != 0 {
-					status = "0"
-					message = "Metric kafka_consumer_group_lag is not present in the given time range."
-				} else {
-					status = "1"
-				}
-			}
-			entry := mq.OnboardingResponse{
-				Attribute: attribute,
-				Message:   message,
-				Status:    status,
 			}
-			entries = append(entries, entry)
 		}
 	}
 
-	sort.Slice(entries, func(i, j int) bool {
-		return entries[i].Attribute < entries[j].Attribute
-	})
+	if !fetchLatencyState && !consumerLagState {
+		entries = append(entries, mq.OnboardingResponse{
+			Attribute: "telemetry ingestion",
+			Message:   "No data available in the given time range",
+			Status:    "0",
+		})
+	}
+
+	if !fetchLatencyState {
+		entries = append(entries, mq.OnboardingResponse{
+			Attribute: "kafka_consumer_fetch_latency_avg",
+			Message:   "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.",
+			Status:    "0",
+		})
+	} else {
+		entries = append(entries, mq.OnboardingResponse{
+			Attribute: "kafka_consumer_fetch_latency_avg",
+			Status:    "1",
+		})
+	}
+
+	if !consumerLagState {
+		entries = append(entries, mq.OnboardingResponse{
+			Attribute: "kafka_consumer_group_lag",
+			Message:   "Metric kafka_consumer_group_lag is not present in the given time range.",
+			Status:    "0",
+		})
+	} else {
+		entries = append(entries, mq.OnboardingResponse{
+			Attribute: "kafka_consumer_group_lag",
+			Status:    "1",
+		})
+	}
 
 	aH.Respond(w, entries)
 }
 
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
index eeb1167f23..bf07316bb2 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
@@ -381,18 +381,3 @@ WHERE
 	AND timestamp <= '%d';`, queueType, start, end)
 	return query
 }
-
-func onboardKafkaSQL(start, end int64) string {
-	query := fmt.Sprintf(`
-SELECT
-	COUNT(*) = 0 AS entries,
-	COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency,
-	COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag
-FROM
-	signoz_metrics.time_series_v4_1day
-WHERE
-	metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag')
-	AND unix_milli >= '%d'
-	AND unix_milli < '%d';`, start/1000000, end/1000000)
-	return query
-}
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
index a7d934d5a4..1de4a4bb5d 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
@@ -185,6 +185,60 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
 	return bq, nil
 }
 
+func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.QueryRangeParamsV3, error) {
+	bq := make(map[string]*v3.BuilderQuery)
+
+	unixMilliStart := messagingQueue.Start / 1000000
+	unixMilliEnd := messagingQueue.End / 1000000
+
+	buiderQuery := &v3.BuilderQuery{
+		QueryName:    "fetch_latency",
+		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
+		DataSource:   v3.DataSourceMetrics,
+		AggregateAttribute: v3.AttributeKey{
+			Key: "kafka_consumer_fetch_latency_avg",
+		},
+		AggregateOperator: v3.AggregateOperatorCount,
+		Temporality:       v3.Unspecified,
+		TimeAggregation:   v3.TimeAggregationCount,
+		SpaceAggregation:  v3.SpaceAggregationSum,
+		Expression:        "fetch_latency",
+	}
+	bq["fetch_latency"] = buiderQuery
+
+	buiderQuery = &v3.BuilderQuery{
+		QueryName:    "consumer_lag",
+		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
+		DataSource:   v3.DataSourceMetrics,
+		AggregateAttribute: v3.AttributeKey{
+			Key: "kafka_consumer_group_lag",
+		},
+		AggregateOperator: v3.AggregateOperatorCount,
+		Temporality:       v3.Unspecified,
+		TimeAggregation:   v3.TimeAggregationCount,
+		SpaceAggregation:  v3.SpaceAggregationSum,
+		Expression:        "consumer_lag",
+	}
+	bq["consumer_lag"] = buiderQuery
+
+	cq := &v3.CompositeQuery{
+		QueryType:      v3.QueryTypeBuilder,
+		BuilderQueries: bq,
+		PanelType:      v3.PanelTypeTable,
+	}
+
+	queryRangeParams := &v3.QueryRangeParamsV3{
+		Start:          unixMilliStart,
+		End:            unixMilliEnd,
+		Step:           defaultStepInterval,
+		CompositeQuery: cq,
+		Version:        "v4",
+		FormatForWeb:   true,
+	}
+
+	return queryRangeParams, nil
+}
+
 func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
 	queueType := KafkaQueue
 
@@ -302,8 +356,6 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
 		query = onboardProducersSQL(start, end, queueType)
 	} else if queryContext == "onboard_consumers" {
 		query = onboardConsumerSQL(start, end, queueType)
-	} else if queryContext == "onboard_kafka" {
-		query = onboardKafkaSQL(start, end)
 	}
 	return &v3.ClickHouseQuery{
 		Query: query,