mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-06 07:15:11 +08:00
fix: add bucketing (#6669)
Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
This commit is contained in:
parent
85cf4f4e2e
commit
60dc479a19
@ -6,6 +6,8 @@ import (
|
|||||||
|
|
||||||
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
|
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH consumer_query AS (
|
WITH consumer_query AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -18,6 +20,8 @@ WITH consumer_query AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 5
|
AND kind = 5
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
AND attributes_string['messaging.destination.name'] = '%s'
|
AND attributes_string['messaging.destination.name'] = '%s'
|
||||||
@ -36,13 +40,15 @@ FROM
|
|||||||
consumer_query
|
consumer_query
|
||||||
ORDER BY
|
ORDER BY
|
||||||
resource_string_service$$name;
|
resource_string_service$$name;
|
||||||
`, start, end, queueType, topic, partition, consumerGroup, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, consumerGroup, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// S1 landing
|
// S1 landing
|
||||||
func generatePartitionLatencySQL(start, end int64, queueType string) string {
|
func generatePartitionLatencySQL(start, end int64, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH partition_query AS (
|
WITH partition_query AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -54,6 +60,8 @@ WITH partition_query AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 4
|
AND kind = 4
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
GROUP BY topic, partition
|
GROUP BY topic, partition
|
||||||
@ -68,13 +76,15 @@ FROM
|
|||||||
partition_query
|
partition_query
|
||||||
ORDER BY
|
ORDER BY
|
||||||
topic;
|
topic;
|
||||||
`, start, end, queueType, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// S1 consumer
|
// S1 consumer
|
||||||
func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string {
|
func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH consumer_pl AS (
|
WITH consumer_pl AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -87,6 +97,8 @@ WITH consumer_pl AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 5
|
AND kind = 5
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
AND attributes_string['messaging.destination.name'] = '%s'
|
AND attributes_string['messaging.destination.name'] = '%s'
|
||||||
@ -104,14 +116,15 @@ FROM
|
|||||||
consumer_pl
|
consumer_pl
|
||||||
ORDER BY
|
ORDER BY
|
||||||
consumer_group;
|
consumer_group;
|
||||||
`, start, end, queueType, topic, partition, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3, producer overview
|
// S3, producer overview
|
||||||
func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string {
|
func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
// t, svc, rps, byte*, p99, err
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000 // t, svc, rps, byte*, p99, err
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH producer_latency AS (
|
WITH producer_latency AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -124,6 +137,8 @@ WITH producer_latency AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 4
|
AND kind = 4
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
GROUP BY topic, resource_string_service$$name
|
GROUP BY topic, resource_string_service$$name
|
||||||
@ -137,13 +152,15 @@ SELECT
|
|||||||
COALESCE(total_requests / %d, 0) AS throughput
|
COALESCE(total_requests / %d, 0) AS throughput
|
||||||
FROM
|
FROM
|
||||||
producer_latency
|
producer_latency
|
||||||
`, start, end, queueType, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3, producer topic/service overview
|
// S3, producer topic/service overview
|
||||||
func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string {
|
func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH consumer_latency AS (
|
WITH consumer_latency AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -155,6 +172,8 @@ WITH consumer_latency AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 4
|
AND kind = 4
|
||||||
AND resource_string_service$$name = '%s'
|
AND resource_string_service$$name = '%s'
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
@ -169,13 +188,15 @@ SELECT
|
|||||||
COALESCE(total_requests / %d, 0) AS throughput
|
COALESCE(total_requests / %d, 0) AS throughput
|
||||||
FROM
|
FROM
|
||||||
consumer_latency
|
consumer_latency
|
||||||
`, start, end, service, queueType, topic, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3 consumer overview
|
// S3 consumer overview
|
||||||
func generateConsumerLatencySQL(start, end int64, queueType string) string {
|
func generateConsumerLatencySQL(start, end int64, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH consumer_latency AS (
|
WITH consumer_latency AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -189,6 +210,8 @@ WITH consumer_latency AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 5
|
AND kind = 5
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
GROUP BY topic, resource_string_service$$name
|
GROUP BY topic, resource_string_service$$name
|
||||||
@ -205,13 +228,15 @@ FROM
|
|||||||
consumer_latency
|
consumer_latency
|
||||||
ORDER BY
|
ORDER BY
|
||||||
topic;
|
topic;
|
||||||
`, start, end, queueType, timeRange, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3 consumer topic/service
|
// S3 consumer topic/service
|
||||||
func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string {
|
func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH consumer_latency AS (
|
WITH consumer_latency AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -223,6 +248,8 @@ WITH consumer_latency AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 5
|
AND kind = 5
|
||||||
AND resource_string_service$$name = '%s'
|
AND resource_string_service$$name = '%s'
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
@ -237,7 +264,7 @@ SELECT
|
|||||||
COALESCE(total_requests / %d, 0) AS throughput
|
COALESCE(total_requests / %d, 0) AS throughput
|
||||||
FROM
|
FROM
|
||||||
consumer_latency
|
consumer_latency
|
||||||
`, start, end, service, queueType, topic, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,6 +320,8 @@ GROUP BY
|
|||||||
|
|
||||||
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
|
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
WITH producer_query AS (
|
WITH producer_query AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -304,6 +333,8 @@ WITH producer_query AS (
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 4
|
AND kind = 4
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
AND attributes_string['messaging.destination.name'] = '%s'
|
AND attributes_string['messaging.destination.name'] = '%s'
|
||||||
@ -320,12 +351,14 @@ FROM
|
|||||||
producer_query
|
producer_query
|
||||||
ORDER BY
|
ORDER BY
|
||||||
resource_string_service$$name;
|
resource_string_service$$name;
|
||||||
`, start, end, queueType, topic, partition, timeRange)
|
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string {
|
func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
SELECT
|
SELECT
|
||||||
attributes_string['messaging.client_id'] AS client_id,
|
attributes_string['messaging.client_id'] AS client_id,
|
||||||
@ -336,17 +369,21 @@ FROM signoz_traces.distributed_signoz_index_v3
|
|||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d'
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d'
|
||||||
AND kind = 5
|
AND kind = 5
|
||||||
AND attribute_string_messaging$$system = '%s'
|
AND attribute_string_messaging$$system = '%s'
|
||||||
AND attributes_string['messaging.kafka.consumer.group'] = '%s'
|
AND attributes_string['messaging.kafka.consumer.group'] = '%s'
|
||||||
AND attributes_string['messaging.destination.partition.id'] = '%s'
|
AND attributes_string['messaging.destination.partition.id'] = '%s'
|
||||||
GROUP BY service_name, client_id, service_instance_id
|
GROUP BY service_name, client_id, service_instance_id
|
||||||
ORDER BY throughput DESC
|
ORDER BY throughput DESC
|
||||||
`, timeRange, start, end, queueType, consumerGroup, partitionID)
|
`, timeRange, start, end, tsBucketStart, tsBucketEnd, queueType, consumerGroup, partitionID)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
func onboardProducersSQL(start, end int64, queueType string) string {
|
func onboardProducersSQL(start, end int64, queueType string) string {
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
SELECT
|
SELECT
|
||||||
COUNT(*) = 0 AS entries,
|
COUNT(*) = 0 AS entries,
|
||||||
@ -358,11 +395,15 @@ FROM
|
|||||||
signoz_traces.distributed_signoz_index_v3
|
signoz_traces.distributed_signoz_index_v3
|
||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d';`, queueType, start, end)
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
func onboardConsumerSQL(start, end int64, queueType string) string {
|
func onboardConsumerSQL(start, end int64, queueType string) string {
|
||||||
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
tsBucketEnd := end / 1000000000
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
SELECT
|
SELECT
|
||||||
COUNT(*) = 0 AS entries,
|
COUNT(*) = 0 AS entries,
|
||||||
@ -378,6 +419,8 @@ SELECT
|
|||||||
FROM signoz_traces.distributed_signoz_index_v3
|
FROM signoz_traces.distributed_signoz_index_v3
|
||||||
WHERE
|
WHERE
|
||||||
timestamp >= '%d'
|
timestamp >= '%d'
|
||||||
AND timestamp <= '%d';`, queueType, start, end)
|
AND timestamp <= '%d'
|
||||||
|
AND ts_bucket_start >= '%d'
|
||||||
|
AND ts_bucket_start <= '%d' ;`, queueType, start, end, tsBucketStart, tsBucketEnd)
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user