diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5055913113..7078566eca 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -11,6 +11,7 @@ import ( "net/http" "regexp" "slices" + "sort" "strconv" "strings" "sync" @@ -2463,12 +2464,19 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response // RegisterMessagingQueuesRoutes adds messaging-queues routes func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) { - // SubRouter for kafka - kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() - kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) - kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + // SubRouter for kafka + kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter() + + consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter() + consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + + onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter() + onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) + onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) + onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } @@ -2478,6 +2486,332 @@ func 
uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string return clientID + separator + serviceInstanceID + separator + serviceName } +func (aH *APIHandler) onboardProducers( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers") + + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + results, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, err) + return + } + + var entries []mq.OnboardingResponse + + for _, result := range results { + + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "queue" { + attribute = "messaging.system" + if intValue != 0 { + status = "0" + message = "messaging.system attribute is not present or not equal to kafka in your spans" + } else { + status = "1" + } + } else if key == "kind" { + attribute = "kind" + if intValue != 0 { + status = "0" + message = "check if your producer spans has kind=4 as attribute" + } else { + status = "1" + } + } else if key == "destination" { + attribute = "messaging.destination.name" + if intValue != 0 { + status = "0" + message = "messaging.destination.name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "partition" { + attribute = "messaging.destination.partition.id" 
+ if intValue != 0 { + status = "0" + message = "messaging.destination.partition.id attribute is not present in your spans" + } else { + status = "1" + } + } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + + entries = append(entries, entry) + } + } + + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) + + aH.Respond(w, entries) +} + +func (aH *APIHandler) onboardConsumers( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers") + + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, err) + return + } + + var entries []mq.OnboardingResponse + + for _, result := range result { + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "queue" { + attribute = "messaging.system" + if intValue != 0 { + status = "0" + message = "messaging.system attribute is not present or not equal to kafka in your spans" + } else { + status = "1" + } + } else if key == "kind" { + attribute = "kind" + if intValue != 0 { + status = "0" + message = "check if your consumer spans has kind=5 as attribute" + } else { + status = "1" + } + } else if key == 
"destination" { + attribute = "messaging.destination.name" + if intValue != 0 { + status = "0" + message = "messaging.destination.name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "partition" { + attribute = "messaging.destination.partition.id" + if intValue != 0 { + status = "0" + message = "messaging.destination.partition.id attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "svc" { + attribute = "service_name" + if intValue != 0 { + status = "0" + message = "service_name attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "cgroup" { + attribute = "messaging.kafka.consumer.group" + if intValue != 0 { + status = "0" + message = "messaging.kafka.consumer.group attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "bodysize" { + attribute = "messaging.message.body.size" + if intValue != 0 { + status = "0" + message = "messaging.message.body.size attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "clientid" { + attribute = "messaging.client_id" + if intValue != 0 { + status = "0" + message = "messaging.client_id attribute is not present in your spans" + } else { + status = "1" + } + } else if key == "instanceid" { + attribute = "service.instance.id" + if intValue != 0 { + status = "0" + message = "service.instance.id attribute is not present in your spans" + } else { + status = "1" + } + } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + entries = append(entries, entry) + } + } + + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) + + aH.Respond(w, entries) +} + +func (aH *APIHandler) onboardKafka( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, 
apiErr, nil) + return + } + + chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka") + + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, err := aH.reader.GetListResultV3(r.Context(), chq.Query) + + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, err) + return + } + + var entries []mq.OnboardingResponse + + for _, result := range result { + for key, value := range result.Data { + var message, attribute, status string + + intValue := int(*value.(*uint8)) + + if key == "entries" { + attribute = "telemetry ingestion" + if intValue != 0 { + entries = nil + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: "No data available in the given time range", + Status: "0", + } + entries = append(entries, entry) + break + } else { + status = "1" + } + } else if key == "fetchlatency" { + attribute = "kafka_consumer_fetch_latency_avg" + if intValue != 0 { + status = "0" + message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." + } else { + status = "1" + } + } else if key == "grouplag" { + attribute = "kafka_consumer_group_lag" + if intValue != 0 { + status = "0" + message = "Metric kafka_consumer_group_lag is not present in the given time range." 
+ } else { + status = "1" + } + } + + entry := mq.OnboardingResponse{ + Attribute: attribute, + Message: message, + Status: status, + } + entries = append(entries, entry) + } + } + + sort.Slice(entries, func(i, j int) bool { + return entries[i].Attribute < entries[j].Attribute + }) + + aH.Respond(w, entries) +} + func (aH *APIHandler) getNetworkData( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 38b61669ff..d338a9acb9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -226,3 +226,296 @@ Response in query range `table` format } } ``` + +### Onboarding APIs + +``` +/api/v1/messaging-queues/kafka/onboarding/producers +``` + +```json +{ + "start": 1727620544260611424, + "end": 1727620556401481428 +} +``` + +```json +// everything is present +{ + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "messaging.destination.partition.id attribute is not present in your spans", + "status": "0" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// partial attributes are present +{ + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "messaging.destination.name attribute is not present in your spans", + "status": "0" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": 
"messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// no data available +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "No data available in the given time range", + "status": "0" + } + ] +} +``` + + +``` +/api/v1/messaging-queues/kafka/onboarding/consumers +``` + +```json +// everything is present +{ + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.client_id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.kafka.consumer.group", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.message.body.size", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "service.instance.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "service_name", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// partial attributes are present +{ + "status": "success", + "data": [ + { + "attribute": "kind", + "error_message": "check if your consumer spans has kind=5 as attribute", + "status": "0" + }, + { + "attribute": "messaging.client_id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.name", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.destination.partition.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "messaging.kafka.consumer.group", + "error_message": "messaging.kafka.consumer.group attribute is not present in your 
spans", + "status": "0" + }, + { + "attribute": "messaging.message.body.size", + "error_message": "messaging.message.body.size attribute is not present in your spans", + "status": "0" + }, + { + "attribute": "messaging.system", + "error_message": "", + "status": "1" + }, + { + "attribute": "service.instance.id", + "error_message": "", + "status": "1" + }, + { + "attribute": "service_name", + "error_message": "", + "status": "1" + }, + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + } + ] +} + +// no data available +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "No data available in the given time range", + "status": "0" + } + ] +} +``` + +``` +/api/v1/messaging-queues/kafka/onboarding/kafka +``` + +```json +{ + "start": 1728485200000000000, + "end": 1728749800000000000 +} +``` + +```json +// partial attributes are present +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + }, + { + "attribute": "kafka_consumer_fetch_latency_avg", + "error_message": "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.", + "status": "0" + }, + { + "attribute": "kafka_consumer_group_lag", + "error_message": "Metric kafka_consumer_group_lag is not present in the given time range.", + "status": "0" + } + ] +} + +// everything is present +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "", + "status": "1" + }, + { + "attribute": "kafka_consumer_fetch_latency_avg", + "error_message": "", + "status": "1" + }, + { + "attribute": "kafka_consumer_group_lag", + "error_message": "", + "status": "1" + } + ] +} + +// no data available +{ + "status": "success", + "data": [ + { + "attribute": "telemetry ingestion", + "error_message": "No data available in the given time range", + "status": "0" + } + ] +} +``` diff --git 
a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index f587868610..803912c17f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -1,6 +1,6 @@ package kafka -const kafkaQueue = "kafka" +const KafkaQueue = "kafka" type MessagingQueue struct { Start int64 `json:"start"` @@ -14,3 +14,9 @@ type Clients struct { ServiceInstanceID []string ServiceName []string } + +type OnboardingResponse struct { + Attribute string `json:"attribute"` + Message string `json:"error_message"` + Status string `json:"status"` +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index eb06689ef3..a4d3191fe1 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -95,3 +95,54 @@ ORDER BY throughput DESC `, timeRange, start, end, queueType, consumerGroup, partitionID) return query } + +func onboardProducersSQL(start, end int64, queueType string) string { + query := fmt.Sprintf(` +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind, + COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition +FROM + signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) + return query +} + +func onboardConsumerSQL(start, end int64, queueType string) string { + query := fmt.Sprintf(` +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue, + COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind, + COUNT(serviceName) = 0 AS svc, + COUNT(IF(has(stringTagMap, 
'messaging.destination.name'), 1, NULL)) = 0 AS destination, + COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition, + COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup, + COUNT(IF(has(numberTagMap, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize, + COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid, + COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid +FROM signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) + return query +} + +func onboardKafkaSQL(start, end int64) string { + query := fmt.Sprintf(` +SELECT + COUNT(*) = 0 AS entries, + COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency, + COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag +FROM + signoz_metrics.time_series_v4_1day +WHERE + metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') + AND unix_milli >= '%d' + AND unix_milli < '%d';`, start/1000000, end/1000000) + return query +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 7f4266df67..4dca2a2cda 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -13,16 +13,16 @@ var defaultStepInterval int64 = 60 func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { // ToDo: propagate this through APIs when there are different handlers - queueType := kafkaQueue + queueType := KafkaQueue - var cq *v3.CompositeQuery - - chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext) + chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) if err != nil { return nil, 
err } + var cq *v3.CompositeQuery + cq, err = buildCompositeQuery(chq, queryContext) queryRangeParams := &v3.QueryRangeParamsV3{ @@ -37,6 +37,14 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } +func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) { + queueType := KafkaQueue + + chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext) + + return chq, err +} + func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End @@ -137,7 +145,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { - queueType := kafkaQueue + queueType := KafkaQueue unixMilliStart := messagingQueue.Start / 1000000 unixMilliEnd := messagingQueue.End / 1000000 @@ -177,17 +185,22 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a return queryRangeParams, nil } -func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { +func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End - topic, ok := messagingQueue.Variables["topic"] - if !ok { - return nil, fmt.Errorf("invalid type for Topic") - } - partition, ok := messagingQueue.Variables["partition"] - if !ok { - return nil, fmt.Errorf("invalid type for Partition") + var topic, partition string + + if queryContext == "producer" || queryContext == "consumer" { + var ok bool + topic, ok = messagingQueue.Variables["topic"] + if !ok { + return nil, fmt.Errorf("invalid type for Topic") + } + partition, ok = 
messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("invalid type for Partition") + } } var query string @@ -199,6 +212,12 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer return nil, fmt.Errorf("invalid type for consumer group") } query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) + } else if queryContext == "onboard_producers" { + query = onboardProducersSQL(start, end, queueType) + } else if queryContext == "onboard_consumers" { + query = onboardConsumerSQL(start, end, queueType) + } else if queryContext == "onboard_kafka" { + query = onboardKafkaSQL(start, end) } return &v3.ClickHouseQuery{