diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index b6a7c44262..261b3d96df 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -11,6 +11,7 @@ import ( "net/http" "regexp" "slices" + "sort" "strconv" "strings" "sync" @@ -2463,15 +2464,19 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response // RegisterMessagingQueuesRoutes adds messaging-queues routes func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) { - // SubRouter for kafka - kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() - kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/onboarding-producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/onboarding-consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/onboarding-kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) + // SubRouter for kafka + kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter() + + consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter() + consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + + onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter() + onboardingRouter.HandleFunc("/producers", 
am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) + onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) + onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } @@ -2493,69 +2498,94 @@ func (aH *APIHandler) onboardProducers( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_producers") + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers") + if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { - zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) - return - } + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) - result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, err) return } - transformedSeries := &v3.Series{ - Labels: make(map[string]string), - LabelsArray: make([]map[string]string, 0), - Points: make([]v3.Point, 0), - } + var entries []mq.OnboardingResponse - var value string + for _, result := range results { - for _, result := range result { - for _, series := range result.Series { - for _, point := range series.Points { - switch point.Value { - case 0: - value = "All attributes are present and meet the conditions" - case 1: - value = "No data available in the given time range" - case 2: - value = "messaging.system attribute is not present or not equal to kafka in your spans" - case 3: - value = "check if your producer spans has kind producer" - case 4: - value = "messaging.destination.name attribute is not present in your spans" - case 5: - value = "messaging.destination.partition.id attribute is not present in your spans" - 
default: - value = "Unknown problem occurred, try increasing the time" + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "queue" { + attribute = "messaging.system" + if intValue != 0 { + status = "0" + message = "messaging.system attribute is not present or not equal to kafka in your spans" + } else { + status = "1" + } + } else if key == "kind" { + attribute = "kind" + if intValue != 0 { + status = "0" + message = "check if your producer spans has kind=4 as attribute" + } else { + status = "1" + } + } else if key == "destination" { + attribute = "messaging.destination.name" + if intValue != 0 { + status = "0" + message = "messaging.destination.name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "partition" { + attribute = "messaging.destination.partition.id" + if intValue != 0 { + status = "0" + message = "messaging.destination.partition.id attribute is not present in your spans" + } else { + status = "1" } } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + + entries = append(entries, entry) } } - transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) - for _, result := range result { - for i := range result.Series { - result.Series[i] = transformedSeries - } - } - - resp := v3.QueryRangeResponse{ - Result: result, - } - aH.Respond(w, resp) + aH.Respond(w, entries) } func (aH *APIHandler) onboardConsumers( @@ -2570,79 +2600,128 @@ func 
(aH *APIHandler) onboardConsumers( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers") + if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { - zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) - return - } + result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) - result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, err) return } - transformedSeries := &v3.Series{ - Labels: make(map[string]string), - LabelsArray: make([]map[string]string, 0), - Points: make([]v3.Point, 0), - } - - var value string + var entries []mq.OnboardingResponse for _, result := range result { - for _, series := range result.Series { - for _, point := range series.Points { - switch point.Value { - case 0: - value = "All attributes are present and meet the conditions" - case 1: - value = "No data available in the given time range" - case 2: - value = "msgSystem attribute is not present or not equal to 'kafka' in your spans" - case 3: - value = "kind attribute is not present or not equal to 5 in your spans" - case 4: - value = "serviceName attribute is not present in your spans" - case 5: - value = "messaging.destination.name attribute is not present in your spans" - case 6: - value = "messaging.destination.partition.id attribute is not present in your spans" - case 7: - value = "messaging.kafka.consumer.group attribute is not present in your spans" - case 8: - value = "messaging.message.body.size attribute is not present in your spans" - case 9: - value = "messaging.client_id attribute is not present in your spans" - case 10: - value = "service.instance.id
attribute is not present in your spans" - default: - value = "Unknown problem occurred, try increasing the time" + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "queue" { + attribute = "messaging.system" + if intValue != 0 { + status = "0" + message = "messaging.system attribute is not present or not equal to kafka in your spans" + } else { + status = "1" + } + } else if key == "kind" { + attribute = "kind" + if intValue != 0 { + status = "0" + message = "check if your consumer spans has kind=5 as attribute" + } else { + status = "1" + } + } else if key == "destination" { + attribute = "messaging.destination.name" + if intValue != 0 { + status = "0" + message = "messaging.destination.name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "partition" { + attribute = "messaging.destination.partition.id" + if intValue != 0 { + status = "0" + message = "messaging.destination.partition.id attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "svc" { + attribute = "service_name" + if intValue != 0 { + status = "0" + message = "service_name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "cgroup" { + attribute = "messaging.kafka.consumer.group" + if intValue != 0 { + status = "0" + message = "messaging.kafka.consumer.group attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "bodysize" { + attribute = "messaging.message.body.size" + if intValue != 0 { + status = "0" + message = "messaging.message.body.size attribute is not present in your
spans" + } else { + status = "1" + } + } else if key == "clientid" { + attribute = "messaging.client_id" + if intValue != 0 { + status = "0" + message = "messaging.client_id attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "instanceid" { + attribute = "service.instance.id" + if intValue != 0 { + status = "0" + message = "service.instance.id attribute is not present in your spans" + } else { + status = "1" } } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + entries = append(entries, entry) } } - transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) - - for _, result := range result { - for i := range result.Series { - result.Series[i] = transformedSeries - } - } - - resp := v3.QueryRangeResponse{ - Result: result, - } - aH.Respond(w, resp) + aH.Respond(w, entries) } func (aH *APIHandler) onboardKafka( @@ -2657,63 +2736,72 @@ func (aH *APIHandler) onboardKafka( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka") + if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { - zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) - return - } + result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) - result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, err) return } - transformedSeries := &v3.Series{ - Labels: make(map[string]string), - LabelsArray: make([]map[string]string, 0), - Points: make([]v3.Point, 0), - } - - var value string + var entries []mq.OnboardingResponse for _, result := range result { - 
for _, series := range result.Series { - for _, point := range series.Points { - switch point.Value { - case 0: - value = "All required metrics are present and meet the conditions" - case 1: - value = "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." - case 2: - value = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." - default: - value = "Unknown problem occurred, try increasing the time" + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "fetchlatency" { + attribute = "kafka_consumer_fetch_latency_avg" + if intValue != 0 { + status = "0" + message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." + } else { + status = "1" + } + } else if key == "grouplag" { + attribute = "kafka_consumer_group_lag" + if intValue != 0 { + status = "0" + message = "Metric kafka_consumer_group_lag is not present in the given time range." 
+ } else { + status = "1" } } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + entries = append(entries, entry) } } - transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) - - for _, result := range result { - for i := range result.Series { - result.Series[i] = transformedSeries - } - } - - resp := v3.QueryRangeResponse{ - Result: result, - } - aH.Respond(w, resp) + aH.Respond(w, entries) } func (aH *APIHandler) getNetworkData( diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index f587868610..803912c17f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -1,6 +1,6 @@ package kafka -const kafkaQueue = "kafka" +const KafkaQueue = "kafka" type MessagingQueue struct { Start int64 `json:"start"` @@ -14,3 +14,9 @@ type Clients struct { ServiceInstanceID []string ServiceName []string } + +type OnboardingResponse struct { + Attribute string `json:"attribute"` + Message string `json:"error_message"` + Status string `json:"status"` +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index fd50ac4e6f..a4d3191fe1 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -99,58 +99,50 @@ ORDER BY throughput DESC func onboardProducersSQL(start, end int64, queueType string) string { query := fmt.Sprintf(` SELECT - CASE - WHEN COUNT(*) = 0 THEN 1 - WHEN SUM(msgSystem = '%s') = 0 THEN 2 - WHEN SUM(kind = 4) = 0 THEN 3 - WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 4 - WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 5 - ELSE 0 - END AS result_code 
-FROM signoz_traces.distributed_signoz_index_v2 + COUNT(*) = 0 AS entries, + COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind, + COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition +FROM + signoz_traces.distributed_signoz_index_v2 WHERE - timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) return query } func onboardConsumerSQL(start, end int64, queueType string) string { query := fmt.Sprintf(` -SELECT - CASE - WHEN COUNT(*) = 0 THEN 1 - WHEN SUM(msgSystem = '%s') = 0 THEN 2 - WHEN SUM(kind = 5) = 0 THEN 3 - WHEN SUM(serviceName IS NOT NULL) = 0 THEN 4 - WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 5 - WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 6 - WHEN SUM(has(stringTagMap, 'messaging.kafka.consumer.group')) = 0 THEN 7 - WHEN SUM(has(numberTagMap, 'messaging.message.body.size')) = 0 THEN 8 - WHEN SUM(has(stringTagMap, 'messaging.client_id')) = 0 THEN 9 - WHEN SUM(has(stringTagMap, 'service.instance.id')) = 0 THEN 10 - ELSE 0 - END AS result_code +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind, + COUNT(serviceName) = 0 AS svc, + COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition, + COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup, + COUNT(IF(has(numberTagMap, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize, + COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid, + COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid FROM 
signoz_traces.distributed_signoz_index_v2 WHERE - timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) return query } func onboardKafkaSQL(start, end int64) string { query := fmt.Sprintf(` -SELECT - CASE - WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 - AND COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 1 - WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 THEN 2 - WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 3 - ELSE 0 - END AS result_code -FROM signoz_metrics.time_series_v4_1day +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency, + COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag +FROM + signoz_metrics.time_series_v4_1day WHERE - metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') - AND unix_milli >= '%d' - AND unix_milli < '%d';`, start, end) + metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') + AND unix_milli >= '%d' + AND unix_milli < '%d';`, start/1000000, end/1000000) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 196a7da081..4dca2a2cda 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -13,16 +13,16 @@ var defaultStepInterval int64 = 60 func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { // ToDo: propagate this through APIs when there are different handlers - queueType := kafkaQueue + queueType := KafkaQueue - var cq *v3.CompositeQuery - - chq, err := 
buildClickHouseQuery(messagingQueue, queueType, queryContext) + chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) if err != nil { return nil, err } + var cq *v3.CompositeQuery + cq, err = buildCompositeQuery(chq, queryContext) queryRangeParams := &v3.QueryRangeParamsV3{ @@ -37,6 +37,14 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } +func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) { + queueType := KafkaQueue + + chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) + + return chq, err +} + func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End @@ -137,7 +145,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { - queueType := kafkaQueue + queueType := KafkaQueue unixMilliStart := messagingQueue.Start / 1000000 unixMilliEnd := messagingQueue.End / 1000000 @@ -177,7 +185,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a return queryRangeParams, nil } -func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { +func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End