From e51f4d986d0cb109991a18de0416fc72a2649aed Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Thu, 17 Oct 2024 19:44:42 +0530 Subject: [PATCH] feat: kafka Scenario 1, 3, 4 all squashed (#6144) * feat: kafka partition level observerability features --- ee/query-service/constants/constants.go | 2 + pkg/query-service/app/http_handler.go | 393 ++++++++++++++- .../messagingQueues/kafka/consumerLag.md | 459 ++++++++++++++++++ .../messagingQueues/kafka/model.go | 2 + .../integrations/messagingQueues/kafka/sql.go | 260 +++++++++- .../messagingQueues/kafka/translator.go | 141 +++++- pkg/query-service/app/querier/v2/querier.go | 16 +- 7 files changed, 1232 insertions(+), 41 deletions(-) diff --git a/ee/query-service/constants/constants.go b/ee/query-service/constants/constants.go index c1baa6320b..c20acf5bb9 100644 --- a/ee/query-service/constants/constants.go +++ b/ee/query-service/constants/constants.go @@ -14,6 +14,8 @@ var SaasSegmentKey = GetOrDefaultEnv("SIGNOZ_SAAS_SEGMENT_KEY", "") var FetchFeatures = GetOrDefaultEnv("FETCH_FEATURES", "false") var ZeusFeaturesURL = GetOrDefaultEnv("ZEUS_FEATURES_URL", "ZeusFeaturesURL") +var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false") + func GetOrDefaultEnv(key string, fallback string) string { v := os.Getenv(key) if len(v) == 0 { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index fea82faf4c..790e64fb60 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2516,22 +2516,35 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth // SubRouter for kafka kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter() - consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter() - consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) - consumerLagRouter.HandleFunc("/consumer-details", 
am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) - consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) - onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter() onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) + partitionLatency := kafkaRouter.PathPrefix("/partition-latency").Subrouter() + partitionLatency.HandleFunc("/overview", am.ViewAccess(aH.getPartitionOverviewLatencyData)).Methods(http.MethodPost) + partitionLatency.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerPartitionLatencyData)).Methods(http.MethodPost) + + consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter() + consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + + topicThroughput := kafkaRouter.PathPrefix("/topic-throughput").Subrouter() + topicThroughput.HandleFunc("/producer", am.ViewAccess(aH.getProducerThroughputOverview)).Methods(http.MethodPost) + topicThroughput.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerThroughputDetails)).Methods(http.MethodPost) + topicThroughput.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerThroughputOverview)).Methods(http.MethodPost) + topicThroughput.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerThroughputDetails)).Methods(http.MethodPost) + + spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter() + spanEvaluation.HandleFunc("/evaluation", 
am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost) + // for other messaging queues, add SubRouters here } // not using md5 hashing as the plain string would work -func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string { - return clientID + separator + serviceInstanceID + separator + serviceName +func uniqueIdentifier(params []string, separator string) string { + return strings.Join(params, separator) } func (aH *APIHandler) onboardProducers( @@ -2874,7 +2887,7 @@ func (aH *APIHandler) getNetworkData( return } - queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) + queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -2901,7 +2914,8 @@ func (aH *APIHandler) getNetworkData( clientID, clientIDOk := series.Labels["client_id"] serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] serviceName, serviceNameOk := series.Labels["service_name"] - hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + params := []string{clientID, serviceInstanceID, serviceName} + hashKey := uniqueIdentifier(params, "#") _, ok := attributeCache.Hash[hashKey] if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok { attributeCache.Hash[hashKey] = struct{}{} @@ -2912,7 +2926,7 @@ func (aH *APIHandler) getNetworkData( } } - queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache) + queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -2938,7 +2952,8 @@ func (aH *APIHandler) getNetworkData( clientID, clientIDOk := series.Labels["client_id"] serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] serviceName, serviceNameOk := series.Labels["service_name"] - 
hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + params := []string{clientID, serviceInstanceID, serviceName} + hashKey := uniqueIdentifier(params, "#") _, ok := attributeCache.Hash[hashKey] if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok { latencySeries = append(latencySeries, series) @@ -3040,6 +3055,362 @@ func (aH *APIHandler) getConsumerData( aH.Respond(w, resp) } +// s1 +func (aH *APIHandler) getPartitionOverviewLatencyData( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "partition_latency") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +// s1 +func (aH *APIHandler) getConsumerPartitionLatencyData( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + 
zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +// s3 p overview +// fetch traces +// cache attributes +// fetch byte rate metrics +func (aH *APIHandler) getProducerThroughputOverview( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + attributeCache := &mq.Clients{ + Hash: make(map[string]struct{}), + } + + queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + for _, res := range result { + for _, series := range res.Series { + serviceName, serviceNameOk := series.Labels["service_name"] + topicName, topicNameOk := series.Labels["topic"] + params := []string{serviceName, topicName} + hashKey := uniqueIdentifier(params, "#") + _, ok := attributeCache.Hash[hashKey] + if topicNameOk && serviceNameOk && !ok { + attributeCache.Hash[hashKey] = struct{}{} + 
attributeCache.TopicName = append(attributeCache.TopicName, topicName) + attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName) + } + } + } + + queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-latency", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQueriesByNameFetchLatency) + return + } + + latencyColumn := &v3.Result{QueryName: "latency"} + var latencySeries []*v3.Series + for _, res := range resultFetchLatency { + for _, series := range res.Series { + topic, topicOk := series.Labels["topic"] + serviceName, serviceNameOk := series.Labels["service_name"] + params := []string{topic, serviceName} + hashKey := uniqueIdentifier(params, "#") + _, ok := attributeCache.Hash[hashKey] + if topicOk && serviceNameOk && ok { + latencySeries = append(latencySeries, series) + } + } + } + + latencyColumn.Series = latencySeries + result = append(result, latencyColumn) + + resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) + + resp := v3.QueryRangeResponse{ + Result: resultFetchLatency, + } + aH.Respond(w, resp) +} + +// s3 p details +func (aH *APIHandler) getProducerThroughputDetails( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-throughput-details") + if err != nil { + zap.L().Error(err.Error()) + 
RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +// s3 c overview +func (aH *APIHandler) getConsumerThroughputOverview( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +// s3 c details +func (aH *APIHandler) getConsumerThroughputDetails( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + 
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +// s4 +// needs logic to parse duration +// needs logic to get the percentage +// show 10 traces +func (aH *APIHandler) getProducerConsumerEval( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + // ParseMessagingQueueBody parse for messaging queue params func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { 
messagingQueue := new(mq.MessagingQueue) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index d338a9acb9..0b09b293ce 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -28,6 +28,7 @@ Response in query range `table` format "resultType": "", "result": [ { + "table": { "columns": [ { @@ -519,3 +520,461 @@ Response in query range `table` format ] } ``` +### Partition Latency + +```json +/api/v1/messaging-queues/kafka/partition-latency/overview +``` +```json +{ + "start": 1728287046000000000, + "end": 1728587046000000000 +} +``` + +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "topic", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "partition_latency", + "queryName": "partition_latency", + "isValueColumn": true + } + ], + "rows": [ + { + "data": { + "p99": "2", + "partition_latency": 1.18, + "topic": "topic1" + } + }, + { + "data": { + "p99": "2", + "partition_latency": 0.15, + "topic": "topic2" + } + }, + { + "data": { + "p99": "2", + "partition_latency": 0.26, + "topic": "topic3" + } + } + ] + } + } + ] + } +} +``` +--------- + +```json +/api/v1/messaging-queues/kafka/partition-latency/consumer +``` +```json +{ + "start": 1728287046000000000, + "end": 1728587046000000000, + "variables": { + "partition": "2", + "topic": "topic1" + } +} + +``` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "consumer_group", + "queryName": "", + "isValueColumn": false + }, + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { 
+ "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "consumer_group": "cg1", + "error_rate": "0", + "p99": "0.11994228000000004", + "service_name": "consumer-svc", + "throughput": "1.18116" + } + } + ] + } + } + ] + } +} +``` +--------- +### Topic throughput + +```json +/api/v1/messaging-queues/kafka/topic-throughput/producer +``` +```json +{ + "start": 1728287046000000000, + "end": 1728587046000000000 +} +``` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "topic", + "queryName": "", + "isValueColumn": false + }, + { + "name": "serviceName", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99": "8.662880220000002", + "serviceName": "producer-svc1", + "throughput": "0.41642666666666667", + "topic": "topic1" + } + }, + { + "data": { + "error_rate": "0", + "p99": "9.786847500000016", + "serviceName": "producer-svc2", + "throughput": "0.76473", + "topic": "topic1" + } + }, + { + "data": { + "error_rate": "0", + "p99": "14.432925500000021", + "serviceName": "producer-svc3", + "throughput": "0.08976", + "topic": "topic2" + } + }, + { + "data": { + "error_rate": "0", + "p99": "14.32833297000002", + "serviceName": "producer-svc2", + "throughput": "0.06449333333333333", + "topic": "topic2" + } + }, + { + "data": { + "error_rate": "0", + "p99": "13.416533810000036", + "serviceName": "producer-svc4", + "throughput": "0.14766", + "topic": "topic3" + } + }, + { + "data": { + "error_rate": "0", + "p99": "13.366232000000034", + "serviceName": "producer-svc3", + "throughput": 
"0.11166666666666666", + "topic": "topic3" + } + } + ] + } + } + ] + } +} +``` +--------- +### Topic throughput + +```json +/api/v1/messaging-queues/kafka/topic-throughput/producer-details +``` +```json +{ + "start": 1728287046000000000, + "end": 1728587046000000000, + "variables": { + "partition": "2", + "topic": "topic1", + "service_name": "producer-svc2" + } +} +``` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "partition", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99": "9.165558780000026", + "partition": "2", + "throughput": "0.76473" + } + } + ] + } + } + ] + } +} +``` +--------- +### Topic throughput + +```json +/api/v1/messaging-queues/kafka/topic-throughput/consumer +``` +```json +{ + "start": 1728287046000000000, + "end": 1728587046000000000 +} +``` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "topic", + "queryName": "", + "isValueColumn": false + }, + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "ingestion_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "byte_rate", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "byte_rate": "17.7174", + "error_rate": "0", + "ingestion_rate": "1.18116", + "p99": "0.12260112000000009", + "service_name": "consumer-svc", + "topic": "topic1" + } + }, + { + "data": { + "byte_rate": "2.1594533333333334", + "error_rate": "0", + 
"ingestion_rate": "0.15424666666666667", + "p99": "7.4079657800000005", + "service_name": "consumer-svc2", + "topic": "topic2" + } + }, + { + "data": { + "byte_rate": "3.66446", + "error_rate": "0", + "ingestion_rate": "0.25933", + "p99": "6.135769970000011", + "service_name": "consumer-svc3", + "topic": "topic3" + } + } + ] + } + } + ] + } +} +``` +--------- +### Topic throughput + +```json +/api/v1/messaging-queues/kafka/topic-throughput/consumer-details +``` +```json +{ + "start": 1728287046000000000, + "end": 1728587046000000000, + "variables": { + "topic": "topic1", + "service_name": "consumer-svc" + } +} +``` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "partition", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99": "0.11789381000000003", + "partition": "2", + "throughput": "1.18116" + } + } + ] + } + } + ] + } +} +``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index 803912c17f..6cddb1521c 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -5,6 +5,7 @@ const KafkaQueue = "kafka" type MessagingQueue struct { Start int64 `json:"start"` End int64 `json:"end"` + EvalTime int64 `json:"eval_time"` Variables map[string]string `json:"variables,omitempty"` } @@ -13,6 +14,7 @@ type Clients struct { ClientID []string ServiceInstanceID []string ServiceName []string + TopicName []string } type OnboardingResponse struct { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go 
b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index a4d3191fe1..eeb1167f23 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -12,7 +12,7 @@ WITH consumer_query AS ( serviceName, quantile(0.99)(durationNano) / 1000000 AS p99, COUNT(*) AS total_requests, - SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count, + sumIf(1, statusCode = 2) AS error_count, avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size FROM signoz_traces.distributed_signoz_index_v2 WHERE @@ -30,7 +30,7 @@ SELECT serviceName AS service_name, p99, COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, - COALESCE(total_requests / %d, 0) AS throughput, -- Convert nanoseconds to seconds + COALESCE(total_requests / %d, 0) AS throughput, COALESCE(avg_msg_size, 0) AS avg_msg_size FROM consumer_query @@ -40,6 +40,257 @@ ORDER BY return query } +// S1 landing +func generatePartitionLatencySQL(start, end int64, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH partition_query AS ( + SELECT + quantile(0.99)(durationNano) / 1000000 AS p99, + count(*) AS total_requests, + stringTagMap['messaging.destination.name'] AS topic, + stringTagMap['messaging.destination.partition.id'] AS partition + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND msgSystem = '%s' + GROUP BY topic, partition +) + +SELECT + topic, + partition, + p99, + COALESCE(total_requests / %d, 0) AS throughput +FROM + partition_query +ORDER BY + topic; +`, start, end, queueType, timeRange) + return query +} + +// S1 consumer +func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH 
consumer_pl AS ( + SELECT + stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, + serviceName, + quantile(0.99)(durationNano) / 1000000 AS p99, + COUNT(*) AS total_requests, + sumIf(1, statusCode = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND msgSystem = '%s' + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY consumer_group, serviceName +) + +SELECT + consumer_group, + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS throughput +FROM + consumer_pl +ORDER BY + consumer_group; +`, start, end, queueType, topic, partition, timeRange) + return query +} + +// S3, producer overview +func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string { + timeRange := (end - start) / 1000000000 + // t, svc, rps, byte*, p99, err + query := fmt.Sprintf(` +WITH producer_latency AS ( + SELECT + serviceName, + quantile(0.99)(durationNano) / 1000000 AS p99, + stringTagMap['messaging.destination.name'] AS topic, + COUNT(*) AS total_requests, + sumIf(1, statusCode = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND msgSystem = '%s' + GROUP BY topic, serviceName +) + +SELECT + topic, + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS throughput +FROM + producer_latency +`, start, end, queueType, timeRange) + return query +} + +// S3, producer topic/service overview +func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH consumer_latency AS ( + SELECT + quantile(0.99)(durationNano) / 1000000 
AS p99, + stringTagMap['messaging.destination.partition.id'] AS partition, + COUNT(*) AS total_requests, + sumIf(1, statusCode = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND serviceName = '%s' + AND msgSystem = '%s' + AND stringTagMap['messaging.destination.name'] = '%s' + GROUP BY partition +) + +SELECT + partition, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS throughput +FROM + consumer_latency +`, start, end, service, queueType, topic, timeRange) + return query +} + +// S3 consumer overview +func generateConsumerLatencySQL(start, end int64, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH consumer_latency AS ( + SELECT + serviceName, + stringTagMap['messaging.destination.name'] AS topic, + quantile(0.99)(durationNano) / 1000000 AS p99, + COUNT(*) AS total_requests, + sumIf(1, statusCode = 2) AS error_count, + SUM(numberTagMap['messaging.message.body.size']) AS total_bytes + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND msgSystem = '%s' + GROUP BY topic, serviceName +) + +SELECT + topic, + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS ingestion_rate, + COALESCE(total_bytes / %d, 0) AS byte_rate +FROM + consumer_latency +ORDER BY + topic; +`, start, end, queueType, timeRange, timeRange) + return query +} + +// S3 consumer topic/service +func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH consumer_latency AS ( + SELECT + quantile(0.99)(durationNano) / 1000000 AS p99, + stringTagMap['messaging.destination.partition.id'] AS partition, + COUNT(*) AS total_requests, + 
sumIf(1, statusCode = 2) AS error_count + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND serviceName = '%s' + AND msgSystem = '%s' + AND stringTagMap['messaging.destination.name'] = '%s' + GROUP BY partition +) + +SELECT + partition, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS throughput +FROM + consumer_latency +`, start, end, service, queueType, topic, timeRange) + return query +} + +// s4 +func generateProducerConsumerEvalSQL(start, end int64, queueType string, evalTime int64) string { + query := fmt.Sprintf(` +WITH trace_data AS ( + SELECT + p.serviceName AS producer_service, + c.serviceName AS consumer_service, + p.traceID, + p.timestamp AS producer_timestamp, + c.timestamp AS consumer_timestamp, + p.durationNano AS durationNano, + (toUnixTimestamp64Nano(c.timestamp) - toUnixTimestamp64Nano(p.timestamp)) + p.durationNano AS time_difference + FROM + signoz_traces.distributed_signoz_index_v2 p + INNER JOIN + signoz_traces.distributed_signoz_index_v2 c + ON p.traceID = c.traceID + AND c.parentSpanID = p.spanID + WHERE + p.kind = 4 + AND c.kind = 5 + AND toUnixTimestamp64Nano(p.timestamp) BETWEEN '%d' AND '%d' + AND toUnixTimestamp64Nano(c.timestamp) BETWEEN '%d' AND '%d' + AND c.msgSystem = '%s' + AND p.msgSystem = '%s' +) + +SELECT + producer_service, + consumer_service, + COUNT(*) AS total_spans, + SUM(time_difference > '%d') AS breached_spans, + ((breached_spans) * 100.0) / total_spans AS breach_percentage, + arraySlice( + arrayMap(x -> x.1, + arraySort( + x -> -x.2, + groupArrayIf((traceID, time_difference), time_difference > '%d') + ) + ), + 1, 10 + ) AS top_traceIDs +FROM trace_data +GROUP BY + producer_service, + consumer_service +`, start, end, start, end, queueType, queueType, evalTime, evalTime) + return query +} + func generateProducerSQL(start, end int64, topic, partition, queueType string) string { timeRange 
:= (end - start) / 1000000000 query := fmt.Sprintf(` @@ -48,7 +299,7 @@ WITH producer_query AS ( serviceName, quantile(0.99)(durationNano) / 1000000 AS p99, count(*) AS total_count, - SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count + sumIf(1, statusCode = 2) AS error_count FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' @@ -64,12 +315,11 @@ SELECT serviceName AS service_name, p99, COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage, - COALESCE(total_count / %d, 0) AS throughput -- Convert nanoseconds to seconds + COALESCE(total_count / %d, 0) AS throughput FROM producer_query ORDER BY serviceName; - `, start, end, queueType, topic, partition, timeRange) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 4dca2a2cda..14150b3ac7 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -2,8 +2,7 @@ package kafka import ( "fmt" - "strings" - + "go.signoz.io/signoz/ee/query-service/constants" "go.signoz.io/signoz/pkg/query-service/common" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -12,6 +11,9 @@ var defaultStepInterval int64 = 60 func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { + if constants.KafkaSpanEval == "false" && queryContext == "producer-consumer-eval" { + return nil, fmt.Errorf("span evaluation feature is disabled and is experimental") + } // ToDo: propagate this through APIs when there are different handlers queueType := KafkaQueue @@ -37,14 +39,6 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } -func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) { - queueType := KafkaQueue - - 
chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
-
-	return chq, err
-}
-
 func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
 	start := messagingQueue.Start
 	end := messagingQueue.End
@@ -65,12 +59,60 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin
 	}, nil
 }
 
-func formatstring(str []string) string {
-	joined := strings.Join(str, ", ")
-	if len(joined) <= 2 {
-		return ""
+func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
+	bq := make(map[string]*v3.BuilderQuery)
+	queryName := "latency"
+
+	chq := &v3.BuilderQuery{
+		QueryName:    queryName,
+		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
+		DataSource:   v3.DataSourceMetrics,
+		AggregateAttribute: v3.AttributeKey{
+			Key: "kafka_producer_byte_rate",
+		},
+		AggregateOperator: v3.AggregateOperatorAvg,
+		Temporality:       v3.Unspecified,
+		TimeAggregation:   v3.TimeAggregationAvg,
+		SpaceAggregation:  v3.SpaceAggregationAvg,
+		Filters: &v3.FilterSet{
+			Operator: "AND",
+			Items: []v3.FilterItem{
+				{
+					Key: v3.AttributeKey{
+						Key:      "service_name",
+						Type:     v3.AttributeKeyTypeTag,
+						DataType: v3.AttributeKeyDataTypeString,
+					},
+					Operator: v3.FilterOperatorIn,
+					Value:    attributeCache.ServiceName,
+				},
+				{
+					Key: v3.AttributeKey{
+						Key:      "topic",
+						Type:     v3.AttributeKeyTypeTag,
+						DataType: v3.AttributeKeyDataTypeString,
+					},
+					Operator: v3.FilterOperatorIn,
+					Value:    attributeCache.TopicName,
+				},
+			},
+		},
+		Expression: queryName,
+		ReduceTo:   v3.ReduceToOperatorAvg,
+		GroupBy: []v3.AttributeKey{{
+			Key:      "service_name",
+			DataType: v3.AttributeKeyDataTypeString,
+			Type:     v3.AttributeKeyTypeTag,
+		},
+			{
+				Key:      "topic",
+				DataType: v3.AttributeKeyDataTypeString,
+				Type:     v3.AttributeKeyTypeTag,
+			},
+		},
 	}
-	return joined[1 : len(joined)-1]
+	bq[queryName] = chq
+	return bq, nil
 }
 
 func
buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) { @@ -143,7 +185,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac return bq, nil } -func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { +func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { queueType := KafkaQueue @@ -151,6 +193,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a unixMilliEnd := messagingQueue.End / 1000000 var cq *v3.CompositeQuery + var err error if queryContext == "throughput" { chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType) @@ -171,6 +214,24 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a BuilderQueries: bhq, PanelType: v3.PanelTypeTable, } + } else if queryContext == "producer-throughput-overview" { + start := messagingQueue.Start + end := messagingQueue.End + query := generateProducerPartitionThroughputSQL(start, end, queueType) + + cq, err = buildCompositeQuery(&v3.ClickHouseQuery{ + Query: query, + }, queryContext) + } else if queryContext == "producer-throughput-overview-latency" { + bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache) + if err != nil { + return nil, err + } + cq = &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + BuilderQueries: bhq, + PanelType: v3.PanelTypeTable, + } } queryRangeParams := &v3.QueryRangeParamsV3{ @@ -182,7 +243,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a FormatForWeb: true, } - return queryRangeParams, nil + return queryRangeParams, err } func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { @@ -190,16 +251,22 @@ func 
BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer end := messagingQueue.End var topic, partition string - - if queryContext == "producer" || queryContext == "consumer" { + if queryContext == "producer" || + queryContext == "consumer" || + queryContext == "consumer_partition_latency" || + queryContext == "producer-topic-throughput" || + queryContext == "producer-throughput-details" || + queryContext == "consumer-throughput-details" { var ok bool topic, ok = messagingQueue.Variables["topic"] if !ok { return nil, fmt.Errorf("invalid type for Topic") } - partition, ok = messagingQueue.Variables["partition"] - if !ok { - return nil, fmt.Errorf("invalid type for Partition") + if queryContext != "consumer-throughput-details" { + partition, ok = messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("invalid type for Partition") + } } } @@ -212,6 +279,26 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer return nil, fmt.Errorf("invalid type for consumer group") } query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) + } else if queryContext == "producer-topic-throughput" { + query = generatePartitionLatencySQL(start, end, queueType) + } else if queryContext == "consumer_partition_latency" { + query = generateConsumerPartitionLatencySQL(start, end, topic, partition, queueType) + } else if queryContext == "producer-throughput-details" { + svcName, ok := messagingQueue.Variables["service_name"] + if !ok { + return nil, fmt.Errorf("invalid type for service") + } + query = generateProducerTopicLatencySQL(start, end, topic, svcName, queueType) + } else if queryContext == "consumer-throughput-overview" { + query = generateConsumerLatencySQL(start, end, queueType) + } else if queryContext == "consumer-throughput-details" { + svcName, ok := messagingQueue.Variables["service_name"] + if !ok { + return nil, fmt.Errorf("invalid type for service") + } + query = 
generateConsumerServiceLatencySQL(start, end, topic, svcName, queueType)
+	} else if queryContext == "producer-consumer-eval" {
+		query = generateProducerConsumerEvalSQL(start, end, queueType, messagingQueue.EvalTime)
 	} else if queryContext == "onboard_producers" {
 		query = onboardProducersSQL(start, end, queueType)
 	} else if queryContext == "onboard_consumers" {
@@ -219,13 +306,21 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
 	} else if queryContext == "onboard_kafka" {
 		query = onboardKafkaSQL(start, end)
 	}
-
 	return &v3.ClickHouseQuery{
 		Query: query,
 	}, nil
 }
 
 func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {
+
+	if queryContext == "producer-consumer-eval" {
+		return &v3.CompositeQuery{
+			QueryType:         v3.QueryTypeClickHouseSQL,
+			ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
+			PanelType:         v3.PanelTypeList,
+		}, nil
+	}
+
 	return &v3.CompositeQuery{
 		QueryType:         v3.QueryTypeClickHouseSQL,
 		ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go
index f41fd761dd..311d213656 100644
--- a/pkg/query-service/app/querier/v2/querier.go
+++ b/pkg/query-service/app/querier/v2/querier.go
@@ -381,7 +381,15 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
 		}
 	}
 
-	queries, err := q.builder.PrepareQueries(params)
+	queries := make(map[string]string)
+	var err error
+	if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
+		queries, err = q.builder.PrepareQueries(params)
+	} else if params.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL {
+		for name, chQuery := range params.CompositeQuery.ClickHouseQueries {
+			queries[name] = chQuery.Query
+		}
+	}
 
 	if err != nil {
 		return nil, nil, err
@@ -452,7 +460,11 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3)
 	case v3.QueryTypePromQL:
 		results,
errQueriesByName, err = q.runPromQueries(ctx, params) case v3.QueryTypeClickHouseSQL: - results, errQueriesByName, err = q.runClickHouseQueries(ctx, params) + if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace { + results, errQueriesByName, err = q.runBuilderListQueries(ctx, params) + } else { + results, errQueriesByName, err = q.runClickHouseQueries(ctx, params) + } default: err = fmt.Errorf("invalid query type") }