mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-30 15:52:00 +08:00
fix: Modifies messaging queue paylod (#6783)
* fix: use filterset Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
This commit is contained in:
parent
56b17bcfef
commit
2ead4fbb66
@ -29,6 +29,7 @@ import (
|
|||||||
"go.signoz.io/signoz/pkg/query-service/app/explorer"
|
"go.signoz.io/signoz/pkg/query-service/app/explorer"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/inframetrics"
|
"go.signoz.io/signoz/pkg/query-service/app/inframetrics"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
"go.signoz.io/signoz/pkg/query-service/app/integrations"
|
||||||
|
queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/logs"
|
"go.signoz.io/signoz/pkg/query-service/app/logs"
|
||||||
logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
|
logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
|
||||||
logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
|
logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
|
||||||
@ -50,11 +51,11 @@ import (
|
|||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
|
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||||
signozio "go.signoz.io/signoz/pkg/query-service/integrations/signozio"
|
"go.signoz.io/signoz/pkg/query-service/integrations/signozio"
|
||||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||||
"go.signoz.io/signoz/pkg/query-service/model"
|
"go.signoz.io/signoz/pkg/query-service/model"
|
||||||
"go.signoz.io/signoz/pkg/query-service/rules"
|
"go.signoz.io/signoz/pkg/query-service/rules"
|
||||||
@ -2565,14 +2566,14 @@ func (aH *APIHandler) onboardProducers(
|
|||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
|
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers")
|
chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_producers")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
@ -2588,7 +2589,7 @@ func (aH *APIHandler) onboardProducers(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var entries []mq.OnboardingResponse
|
var entries []kafka.OnboardingResponse
|
||||||
|
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
|
|
||||||
@ -2601,7 +2602,7 @@ func (aH *APIHandler) onboardProducers(
|
|||||||
attribute = "telemetry ingestion"
|
attribute = "telemetry ingestion"
|
||||||
if intValue != 0 {
|
if intValue != 0 {
|
||||||
entries = nil
|
entries = nil
|
||||||
entry := mq.OnboardingResponse{
|
entry := kafka.OnboardingResponse{
|
||||||
Attribute: attribute,
|
Attribute: attribute,
|
||||||
Message: "No data available in the given time range",
|
Message: "No data available in the given time range",
|
||||||
Status: "0",
|
Status: "0",
|
||||||
@ -2645,7 +2646,7 @@ func (aH *APIHandler) onboardProducers(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entry := mq.OnboardingResponse{
|
entry := kafka.OnboardingResponse{
|
||||||
Attribute: attribute,
|
Attribute: attribute,
|
||||||
Message: message,
|
Message: message,
|
||||||
Status: status,
|
Status: status,
|
||||||
@ -2667,14 +2668,14 @@ func (aH *APIHandler) onboardConsumers(
|
|||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
|
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers")
|
chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_consumers")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
@ -2690,7 +2691,7 @@ func (aH *APIHandler) onboardConsumers(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var entries []mq.OnboardingResponse
|
var entries []kafka.OnboardingResponse
|
||||||
|
|
||||||
for _, result := range result {
|
for _, result := range result {
|
||||||
for key, value := range result.Data {
|
for key, value := range result.Data {
|
||||||
@ -2702,7 +2703,7 @@ func (aH *APIHandler) onboardConsumers(
|
|||||||
attribute = "telemetry ingestion"
|
attribute = "telemetry ingestion"
|
||||||
if intValue != 0 {
|
if intValue != 0 {
|
||||||
entries = nil
|
entries = nil
|
||||||
entry := mq.OnboardingResponse{
|
entry := kafka.OnboardingResponse{
|
||||||
Attribute: attribute,
|
Attribute: attribute,
|
||||||
Message: "No data available in the given time range",
|
Message: "No data available in the given time range",
|
||||||
Status: "0",
|
Status: "0",
|
||||||
@ -2786,7 +2787,7 @@ func (aH *APIHandler) onboardConsumers(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entry := mq.OnboardingResponse{
|
entry := kafka.OnboardingResponse{
|
||||||
Attribute: attribute,
|
Attribute: attribute,
|
||||||
Message: message,
|
Message: message,
|
||||||
Status: status,
|
Status: status,
|
||||||
@ -2807,14 +2808,14 @@ func (aH *APIHandler) onboardKafka(
|
|||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
|
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
|
queryRangeParams, err := kafka.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
@ -2829,7 +2830,7 @@ func (aH *APIHandler) onboardKafka(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var entries []mq.OnboardingResponse
|
var entries []kafka.OnboardingResponse
|
||||||
|
|
||||||
var fetchLatencyState, consumerLagState bool
|
var fetchLatencyState, consumerLagState bool
|
||||||
|
|
||||||
@ -2853,7 +2854,7 @@ func (aH *APIHandler) onboardKafka(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !fetchLatencyState && !consumerLagState {
|
if !fetchLatencyState && !consumerLagState {
|
||||||
entries = append(entries, mq.OnboardingResponse{
|
entries = append(entries, kafka.OnboardingResponse{
|
||||||
Attribute: "telemetry ingestion",
|
Attribute: "telemetry ingestion",
|
||||||
Message: "No data available in the given time range",
|
Message: "No data available in the given time range",
|
||||||
Status: "0",
|
Status: "0",
|
||||||
@ -2861,26 +2862,26 @@ func (aH *APIHandler) onboardKafka(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !fetchLatencyState {
|
if !fetchLatencyState {
|
||||||
entries = append(entries, mq.OnboardingResponse{
|
entries = append(entries, kafka.OnboardingResponse{
|
||||||
Attribute: "kafka_consumer_fetch_latency_avg",
|
Attribute: "kafka_consumer_fetch_latency_avg",
|
||||||
Message: "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.",
|
Message: "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.",
|
||||||
Status: "0",
|
Status: "0",
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
entries = append(entries, mq.OnboardingResponse{
|
entries = append(entries, kafka.OnboardingResponse{
|
||||||
Attribute: "kafka_consumer_fetch_latency_avg",
|
Attribute: "kafka_consumer_fetch_latency_avg",
|
||||||
Status: "1",
|
Status: "1",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if !consumerLagState {
|
if !consumerLagState {
|
||||||
entries = append(entries, mq.OnboardingResponse{
|
entries = append(entries, kafka.OnboardingResponse{
|
||||||
Attribute: "kafka_consumer_group_lag",
|
Attribute: "kafka_consumer_group_lag",
|
||||||
Message: "Metric kafka_consumer_group_lag is not present in the given time range.",
|
Message: "Metric kafka_consumer_group_lag is not present in the given time range.",
|
||||||
Status: "0",
|
Status: "0",
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
entries = append(entries, mq.OnboardingResponse{
|
entries = append(entries, kafka.OnboardingResponse{
|
||||||
Attribute: "kafka_consumer_group_lag",
|
Attribute: "kafka_consumer_group_lag",
|
||||||
Status: "1",
|
Status: "1",
|
||||||
})
|
})
|
||||||
@ -2892,10 +2893,10 @@ func (aH *APIHandler) onboardKafka(
|
|||||||
func (aH *APIHandler) getNetworkData(
|
func (aH *APIHandler) getNetworkData(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
attributeCache := &mq.Clients{
|
attributeCache := &kafka.Clients{
|
||||||
Hash: make(map[string]struct{}),
|
Hash: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -2903,7 +2904,7 @@ func (aH *APIHandler) getNetworkData(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache)
|
queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -2942,7 +2943,7 @@ func (aH *APIHandler) getNetworkData(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache)
|
queryRangeParams, err = kafka.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -2992,7 +2993,7 @@ func (aH *APIHandler) getProducerData(
|
|||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
// parse the query params to retrieve the messaging queue struct
|
// parse the query params to retrieve the messaging queue struct
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3000,7 +3001,7 @@ func (aH *APIHandler) getProducerData(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3033,7 +3034,7 @@ func (aH *APIHandler) getProducerData(
|
|||||||
func (aH *APIHandler) getConsumerData(
|
func (aH *APIHandler) getConsumerData(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3041,7 +3042,7 @@ func (aH *APIHandler) getConsumerData(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3075,7 +3076,7 @@ func (aH *APIHandler) getConsumerData(
|
|||||||
func (aH *APIHandler) getPartitionOverviewLatencyData(
|
func (aH *APIHandler) getPartitionOverviewLatencyData(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3083,7 +3084,7 @@ func (aH *APIHandler) getPartitionOverviewLatencyData(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3117,7 +3118,7 @@ func (aH *APIHandler) getPartitionOverviewLatencyData(
|
|||||||
func (aH *APIHandler) getConsumerPartitionLatencyData(
|
func (aH *APIHandler) getConsumerPartitionLatencyData(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3125,7 +3126,7 @@ func (aH *APIHandler) getConsumerPartitionLatencyData(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3162,7 +3163,7 @@ func (aH *APIHandler) getConsumerPartitionLatencyData(
|
|||||||
func (aH *APIHandler) getProducerThroughputOverview(
|
func (aH *APIHandler) getProducerThroughputOverview(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3170,11 +3171,11 @@ func (aH *APIHandler) getProducerThroughputOverview(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
attributeCache := &mq.Clients{
|
attributeCache := &kafka.Clients{
|
||||||
Hash: make(map[string]struct{}),
|
Hash: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
producerQueryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
|
producerQueryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3212,7 +3213,7 @@ func (aH *APIHandler) getProducerThroughputOverview(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
|
queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3265,7 +3266,7 @@ func (aH *APIHandler) getProducerThroughputOverview(
|
|||||||
func (aH *APIHandler) getProducerThroughputDetails(
|
func (aH *APIHandler) getProducerThroughputDetails(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3273,7 +3274,7 @@ func (aH *APIHandler) getProducerThroughputDetails(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-throughput-details")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-throughput-details")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3307,7 +3308,7 @@ func (aH *APIHandler) getProducerThroughputDetails(
|
|||||||
func (aH *APIHandler) getConsumerThroughputOverview(
|
func (aH *APIHandler) getConsumerThroughputOverview(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3315,7 +3316,7 @@ func (aH *APIHandler) getConsumerThroughputOverview(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3349,7 +3350,7 @@ func (aH *APIHandler) getConsumerThroughputOverview(
|
|||||||
func (aH *APIHandler) getConsumerThroughputDetails(
|
func (aH *APIHandler) getConsumerThroughputDetails(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3357,7 +3358,7 @@ func (aH *APIHandler) getConsumerThroughputDetails(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3394,7 +3395,7 @@ func (aH *APIHandler) getConsumerThroughputDetails(
|
|||||||
func (aH *APIHandler) getProducerConsumerEval(
|
func (aH *APIHandler) getProducerConsumerEval(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
messagingQueue, apiErr := ParseKafkaQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -3402,7 +3403,7 @@ func (aH *APIHandler) getProducerConsumerEval(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval")
|
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
@ -3431,15 +3432,24 @@ func (aH *APIHandler) getProducerConsumerEval(
|
|||||||
aH.Respond(w, resp)
|
aH.Respond(w, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseMessagingQueueBody parse for messaging queue params
|
// ParseKafkaQueueBody parse for messaging queue params
|
||||||
func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
|
func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) {
|
||||||
messagingQueue := new(mq.MessagingQueue)
|
messagingQueue := new(kafka.MessagingQueue)
|
||||||
if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
|
||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||||
}
|
}
|
||||||
return messagingQueue, nil
|
return messagingQueue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseQueueBody parses for any queue
|
||||||
|
func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) {
|
||||||
|
queue := new(queues2.QueueListRequest)
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(queue); err != nil {
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||||
|
}
|
||||||
|
return queue, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Preferences
|
// Preferences
|
||||||
|
|
||||||
func (aH *APIHandler) getUserPreference(
|
func (aH *APIHandler) getUserPreference(
|
||||||
@ -4963,9 +4973,8 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
|
||||||
// ToDo: add capability of dynamic filtering based on any of the filters using QueueFilters
|
|
||||||
|
|
||||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
queueListRequest, apiErr := ParseQueueBody(r)
|
||||||
|
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Error(apiErr.Err.Error())
|
zap.L().Error(apiErr.Err.Error())
|
||||||
@ -4973,11 +4982,11 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview")
|
chq, err := queues2.BuildOverviewQuery(queueListRequest)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error(err.Error())
|
zap.L().Error(err.Error())
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4987,7 +4996,6 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
|
||||||
// TODO: Implement celery overview logic for both worker and tasks types
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
package celery
|
@ -22,37 +22,3 @@ type OnboardingResponse struct {
|
|||||||
Message string `json:"error_message"`
|
Message string `json:"error_message"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueFilters
|
|
||||||
// ToDo: add capability of dynamic filtering based on any of the filters
|
|
||||||
type QueueFilters struct {
|
|
||||||
ServiceName []string
|
|
||||||
SpanName []string
|
|
||||||
Queue []string
|
|
||||||
Destination []string
|
|
||||||
Kind []string
|
|
||||||
}
|
|
||||||
|
|
||||||
type CeleryTask struct {
|
|
||||||
kind string
|
|
||||||
status string
|
|
||||||
}
|
|
||||||
|
|
||||||
type CeleryTasks interface {
|
|
||||||
GetKind() string
|
|
||||||
GetStatus() string
|
|
||||||
Set(string, string)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *CeleryTask) GetKind() string {
|
|
||||||
return r.kind
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *CeleryTask) GetStatus() string {
|
|
||||||
return r.status
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *CeleryTask) Set(kind, status string) {
|
|
||||||
r.kind = kind
|
|
||||||
r.status = status
|
|
||||||
}
|
|
||||||
|
@ -2,7 +2,6 @@ package kafka
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
|
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
|
||||||
@ -319,139 +318,6 @@ GROUP BY
|
|||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
|
|
||||||
// If a filter slice is empty, the query does not constrain on that field.
|
|
||||||
func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
|
|
||||||
// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
|
|
||||||
startSeconds := float64(start) / 1e9
|
|
||||||
endSeconds := float64(end) / 1e9
|
|
||||||
|
|
||||||
// Compute time range difference in Go
|
|
||||||
timeRangeSecs := endSeconds - startSeconds
|
|
||||||
|
|
||||||
// Example ts_bucket boundaries (could be your own logic)
|
|
||||||
tsBucketStart := startSeconds - 1800
|
|
||||||
tsBucketEnd := endSeconds
|
|
||||||
|
|
||||||
// Build WHERE clauses for optional filters
|
|
||||||
// We always require messaging_system IN ('kafka', 'celery'), but
|
|
||||||
// we add additional AND conditions only if the slices are non-empty.
|
|
||||||
var whereClauses []string
|
|
||||||
|
|
||||||
// Mandatory base filter: show only kafka/celery
|
|
||||||
whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")
|
|
||||||
|
|
||||||
if len(filters.ServiceName) > 0 {
|
|
||||||
whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
|
|
||||||
}
|
|
||||||
if len(filters.SpanName) > 0 {
|
|
||||||
whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
|
|
||||||
}
|
|
||||||
if len(filters.Queue) > 0 {
|
|
||||||
// "queue" in the struct refers to the messaging_system in the DB
|
|
||||||
whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
|
|
||||||
}
|
|
||||||
if len(filters.Destination) > 0 {
|
|
||||||
whereClauses = append(whereClauses, inClause("destination", filters.Destination))
|
|
||||||
}
|
|
||||||
if len(filters.Kind) > 0 {
|
|
||||||
whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Combine all WHERE clauses with AND
|
|
||||||
whereSQL := strings.Join(whereClauses, "\n AND ")
|
|
||||||
|
|
||||||
if len(whereSQL) > 0 {
|
|
||||||
whereSQL = fmt.Sprintf("AND %s", whereSQL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Final query string
|
|
||||||
// Note the use of %f for float64 values in fmt.Sprintf
|
|
||||||
query := fmt.Sprintf(`
|
|
||||||
WITH
|
|
||||||
processed_traces AS (
|
|
||||||
SELECT
|
|
||||||
resource_string_service$$name AS service_name,
|
|
||||||
name AS span_name,
|
|
||||||
CASE
|
|
||||||
WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
|
|
||||||
WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
|
|
||||||
ELSE 'undefined'
|
|
||||||
END AS messaging_system,
|
|
||||||
kind_string,
|
|
||||||
COALESCE(
|
|
||||||
NULLIF(attributes_string['messaging.destination.name'], ''),
|
|
||||||
NULLIF(attributes_string['messaging.destination'], '')
|
|
||||||
) AS destination,
|
|
||||||
durationNano,
|
|
||||||
status_code
|
|
||||||
FROM signoz_traces.distributed_signoz_index_v3
|
|
||||||
WHERE
|
|
||||||
timestamp >= toDateTime64(%f, 9)
|
|
||||||
AND timestamp <= toDateTime64(%f, 9)
|
|
||||||
AND ts_bucket_start >= toDateTime64(%f, 9)
|
|
||||||
AND ts_bucket_start <= toDateTime64(%f, 9)
|
|
||||||
AND (
|
|
||||||
attribute_string_messaging$$system = 'kafka'
|
|
||||||
OR has(attributes_string, 'celery.action')
|
|
||||||
OR has(attributes_string, 'celery.task_name')
|
|
||||||
)
|
|
||||||
%s
|
|
||||||
),
|
|
||||||
aggregated_metrics AS (
|
|
||||||
SELECT
|
|
||||||
service_name,
|
|
||||||
span_name,
|
|
||||||
messaging_system,
|
|
||||||
destination,
|
|
||||||
kind_string,
|
|
||||||
count(*) AS total_count,
|
|
||||||
sumIf(1, status_code = 2) AS error_count,
|
|
||||||
quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
|
|
||||||
FROM
|
|
||||||
processed_traces
|
|
||||||
GROUP BY
|
|
||||||
service_name,
|
|
||||||
span_name,
|
|
||||||
messaging_system,
|
|
||||||
destination,
|
|
||||||
kind_string
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
aggregated_metrics.service_name,
|
|
||||||
aggregated_metrics.span_name,
|
|
||||||
aggregated_metrics.messaging_system,
|
|
||||||
aggregated_metrics.destination,
|
|
||||||
aggregated_metrics.kind_string,
|
|
||||||
COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
|
|
||||||
COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
|
|
||||||
aggregated_metrics.p95_latency
|
|
||||||
FROM
|
|
||||||
aggregated_metrics
|
|
||||||
ORDER BY
|
|
||||||
aggregated_metrics.service_name,
|
|
||||||
aggregated_metrics.span_name;
|
|
||||||
`,
|
|
||||||
startSeconds, endSeconds,
|
|
||||||
tsBucketStart, tsBucketEnd,
|
|
||||||
whereSQL, timeRangeSecs,
|
|
||||||
)
|
|
||||||
|
|
||||||
return query
|
|
||||||
}
|
|
||||||
|
|
||||||
// inClause returns SQL like "fieldName IN ('val1','val2','val3')"
|
|
||||||
func inClause(fieldName string, values []string) string {
|
|
||||||
// Quote and escape each value for safety
|
|
||||||
var quoted []string
|
|
||||||
for _, v := range values {
|
|
||||||
// Simple escape: replace any single quotes in v
|
|
||||||
safeVal := strings.ReplaceAll(v, "'", "''")
|
|
||||||
quoted = append(quoted, fmt.Sprintf("'%s'", safeVal))
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ","))
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
|
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
|
||||||
timeRange := (end - start) / 1000000000
|
timeRange := (end - start) / 1000000000
|
||||||
tsBucketStart := (start / 1000000000) - 1800
|
tsBucketStart := (start / 1000000000) - 1800
|
||||||
|
@ -2,11 +2,9 @@ package kafka
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"go.signoz.io/signoz/pkg/query-service/common"
|
"go.signoz.io/signoz/pkg/query-service/common"
|
||||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var defaultStepInterval int64 = 60
|
var defaultStepInterval int64 = 60
|
||||||
@ -21,6 +19,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
|
|||||||
queueType := KafkaQueue
|
queueType := KafkaQueue
|
||||||
|
|
||||||
chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
|
chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -321,34 +320,6 @@ func BuildQRParamsWithCache(
|
|||||||
return queryRangeParams, err
|
return queryRangeParams, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func getFilters(variables map[string]string) *QueueFilters {
|
|
||||||
return &QueueFilters{
|
|
||||||
ServiceName: parseFilter(variables["service_name"]),
|
|
||||||
SpanName: parseFilter(variables["span_name"]),
|
|
||||||
Queue: parseFilter(variables["queue"]),
|
|
||||||
Destination: parseFilter(variables["destination"]),
|
|
||||||
Kind: parseFilter(variables["kind"]),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseFilter splits a comma-separated string into a []string.
|
|
||||||
// Returns an empty slice if the input is blank.
|
|
||||||
func parseFilter(val string) []string {
|
|
||||||
if val == "" {
|
|
||||||
return []string{}
|
|
||||||
}
|
|
||||||
// Split on commas, trim whitespace around each part
|
|
||||||
parts := strings.Split(val, ",")
|
|
||||||
var out []string
|
|
||||||
for _, p := range parts {
|
|
||||||
trimmed := strings.TrimSpace(p)
|
|
||||||
if trimmed != "" {
|
|
||||||
out = append(out, trimmed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func BuildClickHouseQuery(
|
func BuildClickHouseQuery(
|
||||||
messagingQueue *MessagingQueue,
|
messagingQueue *MessagingQueue,
|
||||||
queueType string,
|
queueType string,
|
||||||
@ -385,8 +356,6 @@ func BuildClickHouseQuery(
|
|||||||
var query string
|
var query string
|
||||||
|
|
||||||
switch queryContext {
|
switch queryContext {
|
||||||
case "overview":
|
|
||||||
query = generateOverviewSQL(start, end, getFilters(messagingQueue.Variables))
|
|
||||||
case "producer":
|
case "producer":
|
||||||
query = generateProducerSQL(start, end, topic, partition, queueType)
|
query = generateProducerSQL(start, end, topic, partition, queueType)
|
||||||
case "consumer":
|
case "consumer":
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type QueueListRequest struct {
|
||||||
|
Start int64 `json:"start"` // unix nano
|
||||||
|
End int64 `json:"end"` // unix nano
|
||||||
|
Filters *v3.FilterSet `json:"filters"`
|
||||||
|
Limit int `json:"limit"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (qr *QueueListRequest) Validate() error {
|
||||||
|
|
||||||
|
err := qr.Filters.Validate()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if qr.Start < 0 || qr.End < 0 {
|
||||||
|
return fmt.Errorf("start and end must be unixnano time")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -0,0 +1,19 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
import (
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BuildOverviewQuery(queueList *QueueListRequest) (*v3.ClickHouseQuery, error) {
|
||||||
|
|
||||||
|
err := queueList.Validate()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
query := generateOverviewSQL(queueList.Start, queueList.End, queueList.Filters.Items)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
117
pkg/query-service/app/integrations/messagingQueues/queues/sql.go
Normal file
117
pkg/query-service/app/integrations/messagingQueues/queues/sql.go
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
format "go.signoz.io/signoz/pkg/query-service/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
|
||||||
|
// If a filter slice is empty, the query does not constrain on that field.
|
||||||
|
func generateOverviewSQL(start, end int64, item []v3.FilterItem) string {
|
||||||
|
// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
|
||||||
|
startSeconds := float64(start) / 1e9
|
||||||
|
endSeconds := float64(end) / 1e9
|
||||||
|
|
||||||
|
timeRangeSecs := endSeconds - startSeconds
|
||||||
|
|
||||||
|
tsBucketStart := startSeconds - 1800
|
||||||
|
tsBucketEnd := endSeconds
|
||||||
|
|
||||||
|
var whereClauses []string
|
||||||
|
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("timestamp >= toDateTime64(%f, 9)", startSeconds))
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("timestamp <= toDateTime64(%f, 9)", endSeconds))
|
||||||
|
|
||||||
|
for _, filter := range item {
|
||||||
|
switch filter.Key.Key {
|
||||||
|
case "service.name":
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "service_name", format.ClickHouseFormattedValue(filter.Value)))
|
||||||
|
case "name":
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "span_name", format.ClickHouseFormattedValue(filter.Value)))
|
||||||
|
case "destination":
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "destination", format.ClickHouseFormattedValue(filter.Value)))
|
||||||
|
case "queue":
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "messaging_system", format.ClickHouseFormattedValue(filter.Value)))
|
||||||
|
case "kind_string":
|
||||||
|
whereClauses = append(whereClauses, fmt.Sprintf("%s IN (%s)", "kind_string", format.ClickHouseFormattedValue(filter.Value)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Combine all WHERE clauses with AND
|
||||||
|
whereSQL := strings.Join(whereClauses, "\n AND ")
|
||||||
|
|
||||||
|
if len(whereSQL) > 0 {
|
||||||
|
whereSQL = fmt.Sprintf("AND %s", whereSQL)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
WITH
|
||||||
|
processed_traces AS (
|
||||||
|
SELECT
|
||||||
|
resource_string_service$$name AS service_name,
|
||||||
|
name AS span_name,
|
||||||
|
CASE
|
||||||
|
WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
|
||||||
|
WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
|
||||||
|
ELSE 'undefined'
|
||||||
|
END AS messaging_system,
|
||||||
|
kind_string,
|
||||||
|
COALESCE(
|
||||||
|
NULLIF(attributes_string['messaging.destination.name'], ''),
|
||||||
|
NULLIF(attributes_string['messaging.destination'], '')
|
||||||
|
) AS destination,
|
||||||
|
durationNano,
|
||||||
|
status_code
|
||||||
|
FROM signoz_traces.distributed_signoz_index_v3
|
||||||
|
WHERE
|
||||||
|
ts_bucket_start >= toDateTime64(%f, 9)
|
||||||
|
AND ts_bucket_start <= toDateTime64(%f, 9)
|
||||||
|
AND (
|
||||||
|
attribute_string_messaging$$system = 'kafka'
|
||||||
|
OR has(attributes_string, 'celery.action')
|
||||||
|
OR has(attributes_string, 'celery.task_name')
|
||||||
|
)
|
||||||
|
%s
|
||||||
|
),
|
||||||
|
aggregated_metrics AS (
|
||||||
|
SELECT
|
||||||
|
service_name,
|
||||||
|
span_name,
|
||||||
|
messaging_system,
|
||||||
|
destination,
|
||||||
|
kind_string,
|
||||||
|
count(*) AS total_count,
|
||||||
|
sumIf(1, status_code = 2) AS error_count,
|
||||||
|
quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
|
||||||
|
FROM
|
||||||
|
processed_traces
|
||||||
|
GROUP BY
|
||||||
|
service_name,
|
||||||
|
span_name,
|
||||||
|
messaging_system,
|
||||||
|
destination,
|
||||||
|
kind_string
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
aggregated_metrics.service_name,
|
||||||
|
aggregated_metrics.span_name,
|
||||||
|
aggregated_metrics.messaging_system,
|
||||||
|
aggregated_metrics.destination,
|
||||||
|
aggregated_metrics.kind_string,
|
||||||
|
COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
|
||||||
|
COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
|
||||||
|
aggregated_metrics.p95_latency
|
||||||
|
FROM
|
||||||
|
aggregated_metrics
|
||||||
|
ORDER BY
|
||||||
|
aggregated_metrics.service_name,
|
||||||
|
aggregated_metrics.span_name;
|
||||||
|
`, tsBucketStart, tsBucketEnd,
|
||||||
|
whereSQL, timeRangeSecs,
|
||||||
|
)
|
||||||
|
|
||||||
|
return query
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user