From f38a1d9f1c3b162e3d3d0befa4aadaba62a8a829 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 30 Sep 2024 14:42:26 +0530 Subject: [PATCH 1/4] feat: add the queries --- .../integrations/messagingQueues/kafka/sql.go | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index eb06689ef3..fd50ac4e6f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -95,3 +95,62 @@ ORDER BY throughput DESC `, timeRange, start, end, queueType, consumerGroup, partitionID) return query } + +func onboardProducersSQL(start, end int64, queueType string) string { + query := fmt.Sprintf(` +SELECT + CASE + WHEN COUNT(*) = 0 THEN 1 + WHEN SUM(msgSystem = '%s') = 0 THEN 2 + WHEN SUM(kind = 4) = 0 THEN 3 + WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 4 + WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 5 + ELSE 0 + END AS result_code +FROM signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) + return query +} + +func onboardConsumerSQL(start, end int64, queueType string) string { + query := fmt.Sprintf(` +SELECT + CASE + WHEN COUNT(*) = 0 THEN 1 + WHEN SUM(msgSystem = '%s') = 0 THEN 2 + WHEN SUM(kind = 5) = 0 THEN 3 + WHEN SUM(serviceName IS NOT NULL) = 0 THEN 4 + WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 5 + WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 6 + WHEN SUM(has(stringTagMap, 'messaging.kafka.consumer.group')) = 0 THEN 7 + WHEN SUM(has(numberTagMap, 'messaging.message.body.size')) = 0 THEN 8 + WHEN SUM(has(stringTagMap, 'messaging.client_id')) = 0 THEN 9 + WHEN SUM(has(stringTagMap, 'service.instance.id')) = 0 THEN 10 + ELSE 0 + END AS result_code +FROM 
signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) + return query +} + +func onboardKafkaSQL(start, end int64) string { + query := fmt.Sprintf(` +SELECT + CASE + WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 + AND COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 1 + WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 THEN 2 + WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 3 + ELSE 0 + END AS result_code +FROM signoz_metrics.time_series_v4_1day +WHERE + metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') + AND unix_milli >= '%d' + AND unix_milli < '%d';`, start, end) + return query +} From ea0263cc73dd25383cda4dd0573c63f084e22264 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 30 Sep 2024 18:13:39 +0530 Subject: [PATCH 2/4] feat: onboarding APIs --- pkg/query-service/app/http_handler.go | 238 ++++++++++++++++++ .../messagingQueues/kafka/consumerLag.md | 90 +++++++ .../messagingQueues/kafka/translator.go | 25 +- 3 files changed, 346 insertions(+), 7 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5055913113..b6a7c44262 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2469,6 +2469,9 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/onboarding-producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) + 
kafkaSubRouter.HandleFunc("/onboarding-consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/onboarding-kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } @@ -2478,6 +2481,241 @@ func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string return clientID + separator + serviceInstanceID + separator + serviceName } +func (aH *APIHandler) onboardProducers( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_producers") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + transformedSeries := &v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + var value string + + for _, result := range result { + for _, series := range result.Series { + for _, point := range series.Points { + switch point.Value { + case 0: + value = "All attributes are present and meet the conditions" + case 1: + value = "No data available in the given time range" + case 2: + value = "messaging.system attribute is not present or not equal to kafka in your spans" + case 3: + value = "check if your producer spans has kind producer" + case 4: + value = "messaging.destination.name attribute is not present in your spans" + case 5: + value = 
"messaging.destination.partition.id attribute is not present in your spans" + default: + value = "Unknown problem occurred, try increasing the time" + } + } + } + } + + transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + + for _, result := range result { + for i := range result.Series { + result.Series[i] = transformedSeries + } + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) onboardConsumers( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + transformedSeries := &v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + var value string + + for _, result := range result { + for _, series := range result.Series { + for _, point := range series.Points { + switch point.Value { + case 0: + value = "All attributes are present and meet the conditions" + case 1: + value = "No data available in the given time range" + case 2: + value = "msgSystem attribute is not present or not equal to 'kafka' in your spans" + case 3: + value = "kind attribute is not present or not equal to 5 in your spans" + case 4: + value = "serviceName attribute is not present in your spans" + case 5: + 
value = "messaging.destination.name attribute is not present in your spans" + case 6: + value = "messaging.destination.partition.id attribute is not present in your spans" + case 7: + value = "messaging.kafka.consumer.group attribute is not present in your spans" + case 8: + value = "messaging.message.body.size attribute is not present in your spans" + case 9: + value = "messaging.client_id attribute is not present in your spans" + case 10: + value = "service.instance.id attribute is not present in your spans" + default: + value = "Unknown problem occurred, try increasing the time" + } + } + } + } + + transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + + for _, result := range result { + for i := range result.Series { + result.Series[i] = transformedSeries + } + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) onboardKafka( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + transformedSeries := &v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + var value string + + for _, result := range result { + for _, series := range result.Series { + for _, point := range 
series.Points { + switch point.Value { + case 0: + value = "All required metrics are present and meet the conditions" + case 1: + value = "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." + case 2: + value = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." + default: + value = "Unknown problem occurred, try increasing the time" + } + } + } + } + + transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + + for _, result := range result { + for i := range result.Series { + result.Series[i] = transformedSeries + } + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + func (aH *APIHandler) getNetworkData( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 38b61669ff..2d4b80ac40 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -226,3 +226,93 @@ Response in query range `table` format } } ``` + +### Onboarding APIs + +``` +/api/v1/messaging-queues/kafka/consumer-lag/onboarding-producers +``` + +``` +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "onboard_producers", + "series": [ + { + "labels": {}, + "labelsArray": [ + { + "result": "All attributes are present and meet the conditions" + } + ], + "values": [] + } + ] + } + ] + } +} +``` + + +``` +/api/v1/messaging-queues/kafka/consumer-lag/onboarding-consumers +``` + +``` +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "onboard_consumers", + "series": [ + { + "labels": {}, + "labelsArray": [ + { + "result": "All attributes are present and meet the conditions" + } + ], + "values": [] + } 
+ ] + } + ] + } +} +``` + +``` +127.0.0.1:8080/api/v1/messaging-queues/kafka/consumer-lag/onboarding-kafka +``` + +``` +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "onboard_consumers", + "series": [ + { + "labels": {}, + "labelsArray": [ + { + "result": "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." + } + ], + "values": [] + } + ] + } + ] + } +} +``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 7f4266df67..196a7da081 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -180,14 +180,19 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End - topic, ok := messagingQueue.Variables["topic"] - if !ok { - return nil, fmt.Errorf("invalid type for Topic") - } - partition, ok := messagingQueue.Variables["partition"] - if !ok { - return nil, fmt.Errorf("invalid type for Partition") + var topic, partition string + + if queryContext == "producer" || queryContext == "consumer" { + var ok bool + topic, ok = messagingQueue.Variables["topic"] + if !ok { + return nil, fmt.Errorf("invalid type for Topic") + } + partition, ok = messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("invalid type for Partition") + } } var query string @@ -199,6 +204,12 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer return nil, fmt.Errorf("invalid type for consumer group") } query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) + } else if queryContext == 
"onboard_producers" { + query = onboardProducersSQL(start, end, queueType) + } else if queryContext == "onboard_consumers" { + query = onboardConsumerSQL(start, end, queueType) + } else if queryContext == "onboard_kafka" { + query = onboardKafkaSQL(start, end) } return &v3.ClickHouseQuery{ From d380894c352d583d16d3a4dc584841db55eb465b Mon Sep 17 00:00:00 2001 From: shivanshu Date: Fri, 4 Oct 2024 01:10:13 +0530 Subject: [PATCH 3/4] feat: a bunch of advancements, query optimisation, new response format --- pkg/query-service/app/http_handler.go | 374 +++++++++++------- .../messagingQueues/kafka/model.go | 8 +- .../integrations/messagingQueues/kafka/sql.go | 70 ++-- .../messagingQueues/kafka/translator.go | 20 +- 4 files changed, 283 insertions(+), 189 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index b6a7c44262..261b3d96df 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -11,6 +11,7 @@ import ( "net/http" "regexp" "slices" + "sort" "strconv" "strings" "sync" @@ -2463,15 +2464,19 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response // RegisterMessagingQueuesRoutes adds messaging-queues routes func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) { - // SubRouter for kafka - kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() - kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/onboarding-producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/onboarding-consumers", 
am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/onboarding-kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) + // SubRouter for kafka + kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter() + + consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter() + consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + + onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter() + onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) + onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) + onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } @@ -2493,69 +2498,94 @@ func (aH *APIHandler) onboardProducers( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_producers") + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers") + if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { - zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) - return - } + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) - result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, err) return } - transformedSeries := &v3.Series{ - Labels: make(map[string]string), - 
LabelsArray: make([]map[string]string, 0), - Points: make([]v3.Point, 0), - } + var entries []mq.OnboardingResponse - var value string + for _, result := range results { - for _, result := range result { - for _, series := range result.Series { - for _, point := range series.Points { - switch point.Value { - case 0: - value = "All attributes are present and meet the conditions" - case 1: - value = "No data available in the given time range" - case 2: - value = "messaging.system attribute is not present or not equal to kafka in your spans" - case 3: - value = "check if your producer spans has kind producer" - case 4: - value = "messaging.destination.name attribute is not present in your spans" - case 5: - value = "messaging.destination.partition.id attribute is not present in your spans" - default: - value = "Unknown problem occurred, try increasing the time" + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "queue" { + attribute = "messaging.system" + if intValue != 0 { + status = "0" + message = "messaging.system attribute is not present or not equal to kafka in your spans" + } else { + status = "1" + } + } else if key == "kind" { + attribute = "kind" + if intValue != 0 { + status = "0" + message = "check if your producer spans has kind=4 as attribute" + } else { + status = "1" + } + } else if key == "destination" { + attribute = "messaging.destination.name" + if intValue != 0 { + status = "0" + message = "messaging.destination.name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "partition" { + attribute = 
"messaging.destination.partition.id" + if intValue != 0 { + status = "0" + message = "messaging.destination.partition.id attribute is not present in your spans" + } else { + status = "1" } } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + + entries = append(entries, entry) } } - transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) - for _, result := range result { - for i := range result.Series { - result.Series[i] = transformedSeries - } - } - - resp := v3.QueryRangeResponse{ - Result: result, - } - aH.Respond(w, resp) + aH.Respond(w, entries) } func (aH *APIHandler) onboardConsumers( @@ -2570,79 +2600,128 @@ func (aH *APIHandler) onboardConsumers( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers") + if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { - zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) - return - } + result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) - result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, err) return } - transformedSeries := &v3.Series{ - Labels: make(map[string]string), - LabelsArray: make([]map[string]string, 0), - Points: make([]v3.Point, 0), - } - - var value string + var entries []mq.OnboardingResponse for _, result := range result { - for _, series := range result.Series { - for _, point := range series.Points { - switch point.Value { - case 0: - value = "All attributes are 
present and meet the conditions" - case 1: - value = "No data available in the given time range" - case 2: - value = "msgSystem attribute is not present or not equal to 'kafka' in your spans" - case 3: - value = "kind attribute is not present or not equal to 5 in your spans" - case 4: - value = "serviceName attribute is not present in your spans" - case 5: - value = "messaging.destination.name attribute is not present in your spans" - case 6: - value = "messaging.destination.partition.id attribute is not present in your spans" - case 7: - value = "messaging.kafka.consumer.group attribute is not present in your spans" - case 8: - value = "messaging.message.body.size attribute is not present in your spans" - case 9: - value = "messaging.client_id attribute is not present in your spans" - case 10: - value = "service.instance.id attribute is not present in your spans" - default: - value = "Unknown problem occurred, try increasing the time" + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "queue" { + attribute = "messaging.system" + if intValue != 0 { + status = "0" + message = "messaging.system attribute is not present or not equal to kafka in your spans" + } else { + status = "1" + } + } else if key == "kind" { + attribute = "kind" + if intValue != 0 { + status = "0" + message = "check if your producer spans has kind=4 as attribute" + } else { + status = "1" + } + } else if key == "destination" { + attribute = "messaging.destination.name" + if intValue != 0 { + status = "0" + message = "messaging.destination.name attribute is not present in your spans" + } else { + status = "1" + } + 
} else if key == "partition" { + attribute = "messaging.destination.partition.id" + if intValue != 0 { + status = "0" + message = "messaging.destination.partition.id attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "svc" { + attribute = "service_name" + if intValue != 0 { + status = "0" + message = "service_name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "cgroup" { + attribute = "messaging.kafka.consumer.group" + if intValue != 0 { + status = "0" + message = "messaging.kafka.consumer.group attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "bodysize" { + attribute = "messaging.message.body.size" + if intValue != 0 { + status = "0" + message = "messaging.message.body.size attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "clientid" { + attribute = "messaging.client_id" + if intValue != 0 { + status = "0" + message = "messaging.client_id attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "instanceid" { + attribute = "service.instance.id" + if intValue != 0 { + status = "0" + message = "service.instance.id attribute is not present in your spans" + } else { + status = "1" } } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + entries = append(entries, entry) } } - transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) - - for _, result := range result { - for i := range result.Series { - result.Series[i] = transformedSeries - } - } - - resp := v3.QueryRangeResponse{ - Result: result, - } - aH.Respond(w, resp) + aH.Respond(w, entries) } func (aH *APIHandler) onboardKafka( @@ -2657,63 +2736,72 @@ func (aH *APIHandler) onboardKafka( return } - queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + chq, err := 
mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka") + if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { - zap.L().Error(err.Error()) - RespondError(w, apiErr, nil) - return - } + result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) - result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, err) return } - transformedSeries := &v3.Series{ - Labels: make(map[string]string), - LabelsArray: make([]map[string]string, 0), - Points: make([]v3.Point, 0), - } - - var value string + var entries []mq.OnboardingResponse for _, result := range result { - for _, series := range result.Series { - for _, point := range series.Points { - switch point.Value { - case 0: - value = "All required metrics are present and meet the conditions" - case 1: - value = "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." - case 2: - value = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." - default: - value = "Unknown problem occurred, try increasing the time" + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "fetchlatency" { + attribute = "kafka_consumer_fetch_latency_avg" + if intValue != 0 { + status = "0" + message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." 
+ } else { + status = "1" + } + } else if key == "grouplag" { + attribute = "kafka_consumer_group_lag" + if intValue != 0 { + status = "0" + message = "Metric kafka_consumer_group_lag is not present in the given time range." + } else { + status = "1" } } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + entries = append(entries, entry) } } - transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) - - for _, result := range result { - for i := range result.Series { - result.Series[i] = transformedSeries - } - } - - resp := v3.QueryRangeResponse{ - Result: result, - } - aH.Respond(w, resp) + aH.Respond(w, entries) } func (aH *APIHandler) getNetworkData( diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index f587868610..803912c17f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -1,6 +1,6 @@ package kafka -const kafkaQueue = "kafka" +const KafkaQueue = "kafka" type MessagingQueue struct { Start int64 `json:"start"` @@ -14,3 +14,9 @@ type Clients struct { ServiceInstanceID []string ServiceName []string } + +type OnboardingResponse struct { + Attribute string `json:"attribute"` + Message string `json:"error_message"` + Status string `json:"status"` +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index fd50ac4e6f..a4d3191fe1 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -99,58 +99,50 @@ ORDER BY throughput DESC func onboardProducersSQL(start, end int64, queueType string) string { query := fmt.Sprintf(` SELECT - CASE - WHEN COUNT(*) = 0 THEN 1 - WHEN SUM(msgSystem = '%s') 
= 0 THEN 2 - WHEN SUM(kind = 4) = 0 THEN 3 - WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 4 - WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 5 - ELSE 0 - END AS result_code -FROM signoz_traces.distributed_signoz_index_v2 + COUNT(*) = 0 AS entries, + COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind, + COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition +FROM + signoz_traces.distributed_signoz_index_v2 WHERE - timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) return query } func onboardConsumerSQL(start, end int64, queueType string) string { query := fmt.Sprintf(` -SELECT - CASE - WHEN COUNT(*) = 0 THEN 1 - WHEN SUM(msgSystem = '%s') = 0 THEN 2 - WHEN SUM(kind = 5) = 0 THEN 3 - WHEN SUM(serviceName IS NOT NULL) = 0 THEN 4 - WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 5 - WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 6 - WHEN SUM(has(stringTagMap, 'messaging.kafka.consumer.group')) = 0 THEN 7 - WHEN SUM(has(numberTagMap, 'messaging.message.body.size')) = 0 THEN 8 - WHEN SUM(has(stringTagMap, 'messaging.client_id')) = 0 THEN 9 - WHEN SUM(has(stringTagMap, 'service.instance.id')) = 0 THEN 10 - ELSE 0 - END AS result_code +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind, + COUNT(serviceName) = 0 AS svc, + COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition, + COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup, + COUNT(IF(has(numberTagMap, 
'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize, + COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid, + COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid FROM signoz_traces.distributed_signoz_index_v2 WHERE - timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) return query } func onboardKafkaSQL(start, end int64) string { query := fmt.Sprintf(` -SELECT - CASE - WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 - AND COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 1 - WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 THEN 2 - WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 3 - ELSE 0 - END AS result_code -FROM signoz_metrics.time_series_v4_1day +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency, + COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag +FROM + signoz_metrics.time_series_v4_1day WHERE - metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') - AND unix_milli >= '%d' - AND unix_milli < '%d';`, start, end) + metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') + AND unix_milli >= '%d' + AND unix_milli < '%d';`, start/1000000, end/1000000) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 196a7da081..4dca2a2cda 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -13,16 +13,16 @@ var defaultStepInterval int64 = 60 func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext 
string) (*v3.QueryRangeParamsV3, error) { // ToDo: propagate this through APIs when there are different handlers - queueType := kafkaQueue + queueType := KafkaQueue - var cq *v3.CompositeQuery - - chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext) + chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) if err != nil { return nil, err } + var cq *v3.CompositeQuery + cq, err = buildCompositeQuery(chq, queryContext) queryRangeParams := &v3.QueryRangeParamsV3{ @@ -37,6 +37,14 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } +func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) { + queueType := KafkaQueue + + chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) + + return chq, err +} + func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End @@ -137,7 +145,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { - queueType := kafkaQueue + queueType := KafkaQueue unixMilliStart := messagingQueue.Start / 1000000 unixMilliEnd := messagingQueue.End / 1000000 @@ -177,7 +185,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a return queryRangeParams, nil } -func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { +func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End From 98cbdf570fc319877b9673138bcd421f9ce10cc0 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Fri, 4 Oct 2024 01:44:52 
+0530 Subject: [PATCH 4/4] feat: api documentation and nits --- pkg/query-service/app/http_handler.go | 12 +- .../messagingQueues/kafka/consumerLag.md | 329 ++++++++++++++---- 2 files changed, 276 insertions(+), 65 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 261b3d96df..7078566eca 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2600,7 +2600,7 @@ func (aH *APIHandler) onboardConsumers( return } - chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers") + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers") if err != nil { zap.L().Error(err.Error()) @@ -2650,7 +2650,7 @@ func (aH *APIHandler) onboardConsumers( attribute = "kind" if intValue != 0 { status = "0" - message = "check if your producer spans has kind=4 as attribute" + message = "check if your consumer spans has kind=5 as attribute" } else { status = "1" } @@ -2721,6 +2721,10 @@ func (aH *APIHandler) onboardConsumers( } } + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) + aH.Respond(w, entries) } @@ -2801,6 +2805,10 @@ func (aH *APIHandler) onboardKafka( } } + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) + aH.Respond(w, entries) } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 2d4b80ac40..d338a9acb9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -230,89 +230,292 @@ Response in query range `table` format ### Onboarding APIs ``` -/api/v1/messaging-queues/kafka/consumer-lag/onboarding-producers +/api/v1/messaging-queues/kafka/onboarding/producers ``` +```json +{ + "start": 1727620544260611424, 
+ "end": 1727620556401481428 +} ``` + +```json +// everything is present { "status": "success", - "data": { - "resultType": "", - "result": [ - { - "queryName": "onboard_producers", - "series": [ - { - "labels": {}, - "labelsArray": [ - { - "result": "All attributes are present and meet the conditions" - } - ], - "values": [] - } - ] - } - ] - } + "data": [ + { + "attribute": "kind", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// partial attributes are present +{ + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "messaging.destination.name attribute is not present in your spans", + "status": "0" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// no data available +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "No data available in the given time range", + "status": "0" + } + ] } ``` ``` -/api/v1/messaging-queues/kafka/consumer-lag/onboarding-consumers +/api/v1/messaging-queues/kafka/onboarding/consumers ``` -``` +```json +// everything is present { - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "queryName": "onboard_consumers", - "series": [ - { - "labels": {}, - "labelsArray": [ - { - "result":
"All attributes are present and meet the conditions" - } - ], - "values": [] - } - ] - } - ] + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.client_id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.kafka.consumer.group", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.message.body.size", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "service.instance.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "service_name", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" } + ] +} + +// partial attributes are present +{ + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "check if your consumer spans has kind=5 as attribute", + "status": "0" + }, + { + "attribute": "messaging.client_id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.kafka.consumer.group", + "error_message": "messaging.kafka.consumer.group attribute is not present in your spans", + "status": "0" + }, + { + "attribute": "messaging.message.body.size", + "error_message": "messaging.message.body.size attribute is not present in your spans", + "status": "0" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "service.instance.id", + "error_message": "", + "status": "1" + }, + { + "attribute": 
"service_name", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// no data available +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "No data available in the given time range", + "status": "0" + } + ] } ``` ``` -127.0.0.1:8080/api/v1/messaging-queues/kafka/consumer-lag/onboarding-kafka +/api/v1/messaging-queues/kafka/onboarding/kafka ``` -``` +```json { - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "queryName": "onboard_consumers", - "series": [ - { - "labels": {}, - "labelsArray": [ - { - "result": "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." - } - ], - "values": [] - } - ] - } - ] - } + "start": 1728485200000000000, + "end": 1728749800000000000 +} +``` + +```json +// everything is present +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + }, + { + "attribute": "kafka_consumer_fetch_latency_avg", + "error_message": "", + "status": "1" + }, + { + "attribute": "kafka_consumer_group_lag", + "error_message": "", + "status": "1" + } + ] +} + +// partial attributes are present +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + }, + { + "attribute": "kafka_consumer_fetch_latency_avg", + "error_message": "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.", + "status": "0" + }, + { + "attribute": "kafka_consumer_group_lag", + "error_message": "Metric kafka_consumer_group_lag is not present in the given time range.", + "status": "0" + } + ] +} + +// no data available +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "No data available in the given time range", + "status": "0" + } + ] } ```