From 60dc479a195cab7419528096e82bad6b9bc1e78f Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 18 Dec 2024 19:57:33 +0530 Subject: [PATCH] fix: add bucketing (#6669) Co-authored-by: Nityananda Gohain --- .../integrations/messagingQueues/kafka/sql.go | 67 +++++++++++++++---- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 05577ab8b9..9b943acbc8 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -6,6 +6,8 @@ import ( func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_query AS ( SELECT @@ -18,6 +20,8 @@ WITH consumer_query AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.destination.name'] = '%s' @@ -36,13 +40,15 @@ FROM consumer_query ORDER BY resource_string_service$$name; -`, start, end, queueType, topic, partition, consumerGroup, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, consumerGroup, timeRange) return query } // S1 landing func generatePartitionLatencySQL(start, end int64, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH partition_query AS ( SELECT @@ -54,6 +60,8 @@ WITH partition_query AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND attribute_string_messaging$$system = '%s' GROUP BY topic, partition @@ -68,13 +76,15 @@ FROM partition_query ORDER BY topic; -`, start, end, queueType, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange) return query } // S1 consumer func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_pl AS ( SELECT @@ -87,6 +97,8 @@ WITH consumer_pl AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.destination.name'] = '%s' @@ -104,14 +116,15 @@ FROM consumer_pl ORDER BY consumer_group; -`, start, end, queueType, topic, partition, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange) return query } // S3, producer overview func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string { timeRange := (end - start) / 1000000000 - // t, svc, rps, byte*, p99, err + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 // t, svc, rps, byte*, p99, err query := fmt.Sprintf(` WITH producer_latency AS ( SELECT @@ -124,6 +137,8 @@ WITH producer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND attribute_string_messaging$$system = '%s' GROUP BY topic, resource_string_service$$name @@ -137,13 +152,15 @@ SELECT COALESCE(total_requests / %d, 0) AS throughput FROM producer_latency -`, start, end, queueType, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange) return query } // S3, producer topic/service overview func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT @@ -155,6 +172,8 @@ WITH consumer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND resource_string_service$$name = '%s' AND attribute_string_messaging$$system = '%s' @@ -169,13 +188,15 @@ SELECT COALESCE(total_requests / %d, 0) AS throughput FROM consumer_latency -`, start, end, service, queueType, topic, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange) return query } // S3 consumer overview func generateConsumerLatencySQL(start, end int64, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT @@ -189,6 +210,8 @@ WITH consumer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' GROUP BY topic, resource_string_service$$name @@ -205,13 +228,15 @@ FROM consumer_latency ORDER BY topic; -`, start, end, queueType, timeRange, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange, timeRange) return query } // S3 consumer topic/service func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH consumer_latency AS ( SELECT @@ -223,6 +248,8 @@ WITH consumer_latency AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND resource_string_service$$name = '%s' AND attribute_string_messaging$$system = '%s' @@ -237,7 +264,7 @@ SELECT COALESCE(total_requests / %d, 0) AS throughput FROM consumer_latency -`, start, end, service, queueType, topic, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange) return query } @@ -293,6 +320,8 @@ GROUP BY func generateProducerSQL(start, end int64, topic, partition, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` WITH producer_query AS ( SELECT @@ -304,6 +333,8 @@ WITH producer_query AS ( WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 4 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.destination.name'] = '%s' @@ -320,12 +351,14 @@ FROM producer_query ORDER BY resource_string_service$$name; -`, start, end, queueType, topic, partition, timeRange) +`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange) return query } func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string { timeRange := (end - start) / 1000000000 + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` SELECT attributes_string['messaging.client_id'] AS client_id, @@ -336,17 +369,21 @@ FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' AND kind = 5 AND attribute_string_messaging$$system = '%s' AND attributes_string['messaging.kafka.consumer.group'] = '%s' AND attributes_string['messaging.destination.partition.id'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY throughput DESC -`, timeRange, start, end, queueType, consumerGroup, partitionID) +`, timeRange, start, end, tsBucketStart, tsBucketEnd, queueType, consumerGroup, partitionID) return query } func onboardProducersSQL(start, end int64, queueType string) string { + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` SELECT COUNT(*) = 0 AS entries, @@ -358,11 +395,15 @@ FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd) return query } func onboardConsumerSQL(start, end int64, queueType string) string { + tsBucketStart := (start / 1000000000) - 1800 + tsBucketEnd := end / 1000000000 query := fmt.Sprintf(` SELECT COUNT(*) = 0 AS entries, @@ -378,6 +419,8 @@ SELECT FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= '%d' - AND timestamp <= '%d';`, queueType, start, end) + AND timestamp <= '%d' + AND ts_bucket_start >= '%d' + AND ts_bucket_start <= '%d' ;`, queueType, start, end, tsBucketStart, tsBucketEnd) return query }