From c957c0f75728b65fb8ab32d7778802bf1a042df6 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 5 Aug 2024 18:14:40 +0530 Subject: [PATCH] chore: addressing review comments --- pkg/query-service/app/http_handler.go | 1 - .../messagingQueues/kafka/consumerLag.md | 2 +- .../integrations/messagingQueues/kafka/sql.go | 167 ++++-------------- 3 files changed, 37 insertions(+), 133 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 729dbf77aa..29cb807686 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2252,7 +2252,6 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth // SubRouter for kafka kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() - //kafkaSubRouter.HandleFunc("/consumer-lag", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 5e12a87e76..c34bc7ad64 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -86,7 +86,7 @@ response in query range format `series` API endpoint: ``` -POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details +POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details ``` ```json diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index f479ea5ac9..e06e35efde 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -5,167 +5,72 @@ import ( ) func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { + timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` -WITH --- Sub query for p99 calculation -p99_query AS ( +WITH consumer_query AS ( SELECT serviceName, - quantile(0.99)(durationNano) / 1000000 as p99 - FROM signoz_traces.signoz_index_v2 - WHERE - timestamp >= '%d' - AND timestamp <= '%d' - AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' - GROUP BY serviceName -), - --- Sub query for RPS calculation -rps_query AS ( - SELECT - serviceName, - count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds - FROM signoz_traces.signoz_index_v2 + quantile(0.99)(durationNano) / 1000000 AS p99, + COUNT(*) AS total_requests, + SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count, + avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size + FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND msgSystem = '%s' + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' - GROUP BY serviceName -), - --- Sub query for error rate calculation -error_rate_query AS ( - SELECT - serviceName, - count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds - FROM signoz_traces.signoz_index_v2 - WHERE - timestamp >= '%d' - AND timestamp <= '%d' - AND statusCode = 2 - AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' - GROUP BY serviceName -), - --- Sub query for average message size calculation -avg_msg_size_query AS ( - SELECT - serviceName, - avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size - FROM signoz_traces.signoz_index_v2 - WHERE - timestamp >= '%d' - AND timestamp <= '%d' - AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' GROUP BY serviceName ) --- Main query to combine all metrics +-- Main query to select all metrics SELECT - p99_query.serviceName AS service_name, - p99_query.p99 AS p99, - COALESCE(error_rate_query.error_rate, 0) AS error_rate, - COALESCE(rps_query.rps, 0) AS throughput, - COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS throughput, -- Convert nanoseconds to seconds + COALESCE(avg_msg_size, 0) AS avg_msg_size FROM - p99_query - LEFT JOIN rps_query ON p99_query.serviceName = rps_query.serviceName - LEFT JOIN error_rate_query ON p99_query.serviceName = error_rate_query.serviceName - LEFT JOIN avg_msg_size_query ON p99_query.serviceName = avg_msg_size_query.serviceName + consumer_query ORDER BY - p99_query.serviceName; -`, start, end, queueType, topic, partition, consumerGroup, end, start, start, end, queueType, topic, - partition, consumerGroup, end, start, start, end, queueType, topic, partition, - consumerGroup, end, start, queueType, topic, partition, consumerGroup) + serviceName; +`, start, end, queueType, topic, partition, consumerGroup, timeRange) return query } func generateProducerSQL(start, end int64, topic, partition, queueType string) string { + timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` - --- producer -WITH --- Subquery for p99 calculation -p99_query AS ( +WITH producer_query AS ( SELECT serviceName, - quantile(0.99)(durationNano) / 1000000 as p99 - FROM signoz_traces.signoz_index_v2 + quantile(0.99)(durationNano) / 1000000 AS p99, + count(*) AS total_count, + SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count + FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' - AND timestamp <= '%d' - AND kind = 4 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY serviceName -), - --- Subquery for RPS calculation -rps_query AS ( - SELECT - serviceName, - count(*) / ((%d - %d) / 1000000000) as rps -- Convert nanoseconds to seconds - FROM signoz_traces.signoz_index_v2 - WHERE - timestamp >= '%d' - AND timestamp <= '%d' - AND kind = 4 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY serviceName -), - --- Subquery for error rate calculation -error_rate_query AS ( - SELECT - serviceName, - count(*) / ((%d - %d) / 1000000000) as error_rate -- Convert nanoseconds to seconds - FROM signoz_traces.signoz_index_v2 - WHERE - timestamp >= '%d' - AND timestamp <= '%d' - AND statusCode = 2 - AND kind = 4 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' + AND timestamp <= '%d' + AND kind = 4 + AND msgSystem = '%s' + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName ) --- Main query to combine all metrics SELECT - p99_query.serviceName AS service_name, - p99_query.p99, - COALESCE(error_rate_query.error_rate, 0) AS error_rate, - COALESCE(rps_query.rps, 0) AS rps + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage, + COALESCE(total_count / %d, 0) AS rps -- Convert nanoseconds to seconds FROM - p99_query - LEFT JOIN - rps_query ON p99_query.serviceName = rps_query.serviceName - LEFT JOIN - error_rate_query ON p99_query.serviceName = error_rate_query.serviceName + producer_query ORDER BY - p99_query.serviceName; + serviceName; -`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, - partition, end, start, start, end, queueType, topic, partition) +`, start, end, queueType, topic, partition, timeRange) return query }