From 481bb6e8b8d68b40d5b6706b91bb78b71d59a3c7 Mon Sep 17 00:00:00 2001
From: shivanshu
Date: Fri, 26 Jul 2024 11:50:02 +0530
Subject: [PATCH 1/5] feat: add consumer and producer APIs

---
 pkg/query-service/app/http_handler.go         | 217 +++++++++++----
 .../messagingQueues/kafka/consumerLag.md      | 251 ++++++++++++++++++
 .../messagingQueues/kafka/model.go            |   7 +
 .../integrations/messagingQueues/kafka/sql.go | 165 ++++++++++++
 .../messagingQueues/kafka/translator.go       | 101 +++++++
 .../integrations/messagingQueues/readme.md    |  10 +
 pkg/query-service/app/server.go               |   1 +
 7 files changed, 696 insertions(+), 56 deletions(-)
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/model.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/readme.md

diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 4eff84d50c..b74cafabfb 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -43,6 +43,7 @@ import (

 	"go.uber.org/zap"

+	mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
 	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
 	"go.signoz.io/signoz/pkg/query-service/dao"
 	am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
@@ -2246,9 +2247,113 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
 	w.Write(resp)
 }

+// RegisterMessagingQueuesRoutes adds messaging-queues routes
+func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
+	// SubRouter for kafka
+	kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
+
+	//kafkaSubRouter.HandleFunc("/consumer-lag", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost)
+	kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
+	kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
+
+	// for other messaging queues, add SubRouters here
+}
+
+func (aH *APIHandler) getProducerData(
+	w http.ResponseWriter, r *http.Request,
+) {
+	// parse the query params to retrieve the messaging queue struct
+	messagingQueue, apiErr := ParseMessagingQueueParams(r)
+
+	if apiErr != nil {
+		zap.L().Error(apiErr.Err.Error())
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer")
+	if err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	var result []*v3.Result
+	var errQueriesByName map[string]error
+
+	result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
+	if err != nil {
+		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		RespondError(w, apiErrObj, errQueriesByName)
+		return
+	}
+	result = postprocess.TransformToTableForClickHouseQueries(result)
+
+	resp := v3.QueryRangeResponse{
+		Result: result,
+	}
+	aH.Respond(w, resp)
+}
+
+func (aH *APIHandler) getConsumerData(
+	w http.ResponseWriter, r *http.Request,
+) {
+	messagingQueue, apiErr := ParseMessagingQueueParams(r)
+
+	if apiErr != nil {
+		zap.L().Error(apiErr.Err.Error())
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer")
+	if err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	var result []*v3.Result
+	var errQueriesByName map[string]error
+
+	result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
+	if err != nil {
+		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		RespondError(w, apiErrObj, errQueriesByName)
+		return
+	}
+	result = postprocess.TransformToTableForClickHouseQueries(result)
+
+	resp := v3.QueryRangeResponse{
+		Result: result,
+	}
+	aH.Respond(w, resp)
+}
+
+// ParseMessagingQueueParams parse for messaging queue params
+func ParseMessagingQueueParams(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
+	var messagingQueue *mq.MessagingQueue
+	if err := json.NewDecoder(r.Body).Decode(&messagingQueue); err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
+	}
+	return messagingQueue, nil
+}
+
 // Preferences

-func (ah *APIHandler) getUserPreference(
+func (aH *APIHandler) getUserPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2262,10 +2367,10 @@ func (ah *APIHandler) getUserPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) updateUserPreference(
+func (aH *APIHandler) updateUserPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2284,10 +2389,10 @@ func (ah *APIHandler) updateUserPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) getAllUserPreferences(
+func (aH *APIHandler) getAllUserPreferences(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	user := common.GetUserFromContext(r.Context())
@@ -2299,10 +2404,10 @@ func (ah *APIHandler) getAllUserPreferences(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) getOrgPreference(
+func (aH *APIHandler) getOrgPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2315,10 +2420,10 @@ func (ah *APIHandler) getOrgPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) updateOrgPreference(
+func (aH *APIHandler) updateOrgPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2337,10 +2442,10 @@ func (ah *APIHandler) updateOrgPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) getAllOrgPreferences(
+func (aH *APIHandler) getAllOrgPreferences(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	user := common.GetUserFromContext(r.Context())
@@ -2352,36 +2457,36 @@ func (ah *APIHandler) getAllOrgPreferences(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-// Integrations
-func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
+// RegisterIntegrationRoutes Registers all Integrations
+func (aH *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
 	subRouter := router.PathPrefix("/api/v1/integrations").Subrouter()

 	subRouter.HandleFunc(
-		"/install", am.ViewAccess(ah.InstallIntegration),
+		
"/install", am.ViewAccess(ah.InstallIntegration), + "/install", am.ViewAccess(aH.InstallIntegration), ).Methods(http.MethodPost) subRouter.HandleFunc( - "/uninstall", am.ViewAccess(ah.UninstallIntegration), + "/uninstall", am.ViewAccess(aH.UninstallIntegration), ).Methods(http.MethodPost) // Used for polling for status in v0 subRouter.HandleFunc( - "/{integrationId}/connection_status", am.ViewAccess(ah.GetIntegrationConnectionStatus), + "/{integrationId}/connection_status", am.ViewAccess(aH.GetIntegrationConnectionStatus), ).Methods(http.MethodGet) subRouter.HandleFunc( - "/{integrationId}", am.ViewAccess(ah.GetIntegration), + "/{integrationId}", am.ViewAccess(aH.GetIntegration), ).Methods(http.MethodGet) subRouter.HandleFunc( - "", am.ViewAccess(ah.ListIntegrations), + "", am.ViewAccess(aH.ListIntegrations), ).Methods(http.MethodGet) } -func (ah *APIHandler) ListIntegrations( +func (aH *APIHandler) ListIntegrations( w http.ResponseWriter, r *http.Request, ) { params := map[string]string{} @@ -2389,21 +2494,21 @@ func (ah *APIHandler) ListIntegrations( params[k] = values[0] } - resp, apiErr := ah.IntegrationsController.ListIntegrations( + resp, apiErr := aH.IntegrationsController.ListIntegrations( r.Context(), params, ) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch integrations") return } - ah.Respond(w, resp) + aH.Respond(w, resp) } -func (ah *APIHandler) GetIntegration( +func (aH *APIHandler) GetIntegration( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] - integration, apiErr := ah.IntegrationsController.GetIntegration( + integration, apiErr := aH.IntegrationsController.GetIntegration( r.Context(), integrationId, ) if apiErr != nil { @@ -2411,14 +2516,14 @@ func (ah *APIHandler) GetIntegration( return } - ah.Respond(w, integration) + aH.Respond(w, integration) } -func (ah *APIHandler) GetIntegrationConnectionStatus( +func (aH *APIHandler) GetIntegrationConnectionStatus( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] - isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled( + isInstalled, apiErr := aH.IntegrationsController.IsIntegrationInstalled( r.Context(), integrationId, ) if apiErr != nil { @@ -2428,11 +2533,11 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( // Do not spend resources calculating connection status unless installed. 
if !isInstalled { - ah.Respond(w, &integrations.IntegrationConnectionStatus{}) + aH.Respond(w, &integrations.IntegrationConnectionStatus{}) return } - connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests( + connectionTests, apiErr := aH.IntegrationsController.GetIntegrationConnectionTests( r.Context(), integrationId, ) if apiErr != nil { @@ -2446,7 +2551,7 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( lookbackSeconds = 15 * 60 } - connectionStatus, apiErr := ah.calculateConnectionStatus( + connectionStatus, apiErr := aH.calculateConnectionStatus( r.Context(), connectionTests, lookbackSeconds, ) if apiErr != nil { @@ -2454,10 +2559,10 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( return } - ah.Respond(w, connectionStatus) + aH.Respond(w, connectionStatus) } -func (ah *APIHandler) calculateConnectionStatus( +func (aH *APIHandler) calculateConnectionStatus( ctx context.Context, connectionTests *integrations.IntegrationConnectionTests, lookbackSeconds int64, @@ -2475,7 +2580,7 @@ func (ah *APIHandler) calculateConnectionStatus( go func() { defer wg.Done() - logsConnStatus, apiErr := ah.calculateLogsConnectionStatus( + logsConnStatus, apiErr := aH.calculateLogsConnectionStatus( ctx, connectionTests.Logs, lookbackSeconds, ) @@ -2498,7 +2603,7 @@ func (ah *APIHandler) calculateConnectionStatus( return } - statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric( + statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric( ctx, connectionTests.Metrics, ) @@ -2542,7 +2647,7 @@ func (ah *APIHandler) calculateConnectionStatus( return result, nil } -func (ah *APIHandler) calculateLogsConnectionStatus( +func (aH *APIHandler) calculateLogsConnectionStatus( ctx context.Context, logsConnectionTest *integrations.LogsConnectionTest, lookbackSeconds int64, @@ -2584,7 +2689,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus( }, }, } - queryRes, _, err := ah.querier.QueryRange( + queryRes, _, err := aH.querier.QueryRange( ctx, qrParams, map[string]v3.AttributeKey{}, ) if err != nil { @@ -2621,7 +2726,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus( return nil, nil } -func (ah *APIHandler) InstallIntegration( +func (aH *APIHandler) InstallIntegration( w http.ResponseWriter, r *http.Request, ) { req := integrations.InstallIntegrationRequest{} @@ -2632,7 +2737,7 @@ func (ah *APIHandler) InstallIntegration( return } - integration, apiErr := ah.IntegrationsController.Install( + integration, apiErr := aH.IntegrationsController.Install( r.Context(), &req, ) if apiErr != nil { @@ -2640,10 +2745,10 @@ func (ah *APIHandler) InstallIntegration( return } - ah.Respond(w, integration) + aH.Respond(w, integration) } -func (ah *APIHandler) UninstallIntegration( +func (aH *APIHandler) UninstallIntegration( w http.ResponseWriter, r *http.Request, ) { req := integrations.UninstallIntegrationRequest{} @@ -2654,13 +2759,13 @@ func (ah *APIHandler) UninstallIntegration( return } - apiErr := ah.IntegrationsController.Uninstall(r.Context(), &req) + apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req) if apiErr != nil { RespondError(w, apiErr, nil) return } - ah.Respond(w, map[string]interface{}{}) + aH.Respond(w, map[string]interface{}{}) } // logs @@ -2807,7 +2912,7 @@ func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) { return int(version64), nil } -func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) PreviewLogsPipelinesHandler(w 
http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PipelinesPreviewRequest{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -2815,7 +2920,7 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http return } - resultLogs, apiErr := ah.LogsParsingPipelineController.PreviewLogsPipelines( + resultLogs, apiErr := aH.LogsParsingPipelineController.PreviewLogsPipelines( r.Context(), &req, ) @@ -2824,10 +2929,10 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http return } - ah.Respond(w, resultLogs) + aH.Respond(w, resultLogs) } -func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { version, err := parseAgentConfigVersion(r) if err != nil { @@ -2839,20 +2944,20 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re var apierr *model.ApiError if version != -1 { - payload, apierr = ah.listLogsPipelinesByVersion(context.Background(), version) + payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version) } else { - payload, apierr = ah.listLogsPipelines(context.Background()) + payload, apierr = aH.listLogsPipelines(context.Background()) } if apierr != nil { RespondError(w, apierr, payload) return } - ah.Respond(w, payload) + aH.Respond(w, payload) } // listLogsPipelines lists logs piplines for latest version -func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( +func (aH *APIHandler) listLogsPipelines(ctx context.Context) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { // get lateset agent config @@ -2866,7 +2971,7 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( latestVersion = lastestConfig.Version } - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) + payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines") } @@ -2882,10 +2987,10 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( } // listLogsPipelinesByVersion lists pipelines along with config version history -func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( +func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) + payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines by version") } @@ -2901,7 +3006,7 @@ func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in return payload, nil } -func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PostablePipelines{} @@ -2924,7 +3029,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) } } - return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable) + return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable) } res, err := createPipeline(r.Context(), req.Pipelines) @@ -2933,7 +3038,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) return } - ah.Respond(w, res) + aH.Respond(w, res) 
} func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md new file mode 100644 index 0000000000..c3691ecc36 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -0,0 +1,251 @@ +## Consumer Lag feature break down + +### 1) Consumer Lag Graph + + +--- + +### 2) Consumer Group Details + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details +``` + +```json +{ + "start": 1720685296000000000, + "end": 1721290096000000000, + "variables": { + "partition": "0", + "topic": "topic1" + } +} +``` + +response in query range format `series` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "producer", + "series": [ + { + "labels": { + "error_rate": "0", + "p99_query.p99": "150.08830908000002", + "rps": "0.00016534391534391533", + "service_name": "producer-svc" + }, + "labelsArray": [ + { + "service_name": "producer-svc" + }, + { + "p99_query.p99": "150.08830908000002" + }, + { + "error_rate": "0" + }, + { + "rps": "0.00016534391534391533" + } + ], + "values": [] + } + ] + } + ] + } +} +``` +response in query range format `table` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "consumer_group", + "queryName": "", + "isValueColumn": false + }, + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + }, + { + "name": "avg_msg_size", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "avg_msg_size": "0", + "consumer_group": "cg1", + "error_rate": "0", + "p99": "0.2942205100000016", + "service_name": "consumer-svc", + "throughput": "0.00016534391534391533" + } + }, + { + "data": { + "avg_msg_size": "0", + "consumer_group": "cg3", + "error_rate": "0", + "p99": "0.216600410000002", + "service_name": "consumer-svc", + "throughput": "0.00016534391534391533" + } + } + ] + } + } + ] + } +} +``` + + + +### 3) Producer Details + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details +``` + +```json +{ + "start": 1720685296000000000, + "end": 1721290096000000000, + "variables": { + "partition": "0", + "topic": "topic1" + } +} +``` + +response in query range format `series` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99_query.p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "rps", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99_query.p99": "150.08830908000002", + "rps": "0.00016534391534391533", + "service_name": "producer-svc" + } + } + ] + } + } + ] + } +} +``` +response in query range format `table` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99_query.p99", 
+ "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "rps", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99_query.p99": "150.08830908000002", + "rps": "0.00016534391534391533", + "service_name": "producer-svc" + } + } + ] + } + } + ] + } +} +``` + diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go new file mode 100644 index 0000000000..61a504eaee --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -0,0 +1,7 @@ +package kafka + +type MessagingQueue struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Variables map[string]string `json:"variables,omitempty"` +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go new file mode 100644 index 0000000000..b22cc3f9aa --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -0,0 +1,165 @@ +package kafka + +import ( + "fmt" +) + +func generateConsumerSQL(start, end int64, topic, partition string) string { + query := fmt.Sprintf(` +WITH +-- Sub query for p99 calculation +p99_query AS ( + SELECT + stringTagMap['messaging.kafka.consumer.group'] as consumer_group, + serviceName, + quantile(0.99)(durationNano) / 1000000 as p99 + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY consumer_group, serviceName +), + +-- Sub query for RPS calculation +rps_query AS ( + SELECT + stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, + serviceName, + count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY consumer_group, serviceName +), + +-- Sub query for error rate calculation +error_rate_query AS ( + SELECT + stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, + serviceName, + count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND statusCode = 2 + AND kind = 5 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY consumer_group, serviceName +), + +-- Sub query for average message size calculation +avg_msg_size_query AS ( + SELECT + stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, + serviceName, + avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY consumer_group, serviceName +) + +-- Main query to combine all metrics +SELECT + p99_query.consumer_group AS consumer_group, + p99_query.serviceName AS service_name, + p99_query.p99 AS p99, + COALESCE(error_rate_query.error_rate, 0) AS error_rate, + COALESCE(rps_query.rps, 0) AS throughput, + 
COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size +FROM + p99_query + LEFT JOIN rps_query ON p99_query.consumer_group = rps_query.consumer_group + AND p99_query.serviceName = rps_query.serviceName + LEFT JOIN error_rate_query ON p99_query.consumer_group = error_rate_query.consumer_group + AND p99_query.serviceName = error_rate_query.serviceName + LEFT JOIN avg_msg_size_query ON p99_query.consumer_group = avg_msg_size_query.consumer_group + AND p99_query.serviceName = avg_msg_size_query.serviceName +ORDER BY + p99_query.consumer_group; +`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition, end, start, topic, partition) + return query +} + +func generateProducerSQL(start, end int64, topic, partition string) string { + query := fmt.Sprintf(` + +-- producer +WITH +-- Subquery for p99 calculation +p99_query AS ( + SELECT + serviceName, + quantile(0.99)(durationNano) / 1000000 as p99 + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +), + +-- Subquery for RPS calculation +rps_query AS ( + SELECT + serviceName, + count(*) / ((%d - %d) / 1000000000) as rps -- Convert nanoseconds to seconds + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +), + +-- Subquery for error rate calculation +error_rate_query AS ( + SELECT + serviceName, + count(*) / ((%d - %d) / 1000000000) as error_rate -- Convert nanoseconds to seconds + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND statusCode = 2 + AND kind = 4 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +) + +-- Main query to combine all metrics +SELECT + p99_query.serviceName AS service_name, + p99_query.p99, + COALESCE(error_rate_query.error_rate, 0) AS error_rate, + COALESCE(rps_query.rps, 0) AS rps +FROM + p99_query + LEFT JOIN + rps_query ON p99_query.serviceName = rps_query.serviceName + LEFT JOIN + error_rate_query ON p99_query.serviceName = error_rate_query.serviceName +ORDER BY + p99_query.serviceName; + +`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition) + return query +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go new file mode 100644 index 0000000000..c8e4abf9fa --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -0,0 +1,101 @@ +package kafka + +import ( + "fmt" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +var defaultStepInterval int64 = 60 + +func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { + + var cq *v3.CompositeQuery + if queryContext == "producer" { + chq, err := buildProducerClickHouseQuery(messagingQueue) + if err != nil { + return nil, err + } + cq, err = buildCompositeQueryProducer(chq) + + if err != nil { + return nil, err + } + } else if queryContext == "consumer" { + chq, err := buildConsumerClickHouseQuery(messagingQueue) + if err != nil { + 
return nil, err
+		}
+		cq, err = buildCompositeQueryConsumer(chq)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	queryRangeParams := &v3.QueryRangeParamsV3{
+		Start:          messagingQueue.Start,
+		End:            messagingQueue.End,
+		Step:           defaultStepInterval,
+		CompositeQuery: cq,
+		Version:        "v4",
+		FormatForWeb:   true,
+	}
+
+	return queryRangeParams, nil
+}
+
+func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
+	start := messagingQueue.Start
+	end := messagingQueue.End
+	topic, ok := messagingQueue.Variables["topic"]
+	if !ok {
+		return nil, fmt.Errorf("invalid type for Topic")
+	}
+
+	partition, ok := messagingQueue.Variables["partition"]
+
+	if !ok {
+		return nil, fmt.Errorf("invalid type for Partition")
+	}
+	query := generateProducerSQL(start, end, topic, partition)
+
+	return &v3.ClickHouseQuery{
+		Query: query,
+	}, nil
+}
+
+func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
+	start := messagingQueue.Start
+	end := messagingQueue.End
+	topic, ok := messagingQueue.Variables["topic"]
+	if !ok {
+		return nil, fmt.Errorf("invalid type for Topic")
+	}
+
+	partition, ok := messagingQueue.Variables["partition"]
+
+	if !ok {
+		return nil, fmt.Errorf("invalid type for Partition")
+	}
+	query := generateConsumerSQL(start, end, topic, partition)
+
+	return &v3.ClickHouseQuery{
+		Query: query,
+	}, nil
+}
+
+func buildCompositeQueryProducer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
+	return &v3.CompositeQuery{
+		QueryType:         v3.QueryTypeClickHouseSQL,
+		ClickHouseQueries: map[string]*v3.ClickHouseQuery{"producer": chq},
+		PanelType:         v3.PanelTypeTable,
+	}, nil
+}
+
+func buildCompositeQueryConsumer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
+	return &v3.CompositeQuery{
+		QueryType:         v3.QueryTypeClickHouseSQL,
+		ClickHouseQueries: map[string]*v3.ClickHouseQuery{"consumer": chq},
+		PanelType:         v3.PanelTypeTable,
+	}, nil
+}
diff --git a/pkg/query-service/app/integrations/messagingQueues/readme.md b/pkg/query-service/app/integrations/messagingQueues/readme.md
new file mode 100644
index 0000000000..5b9ae94c8b
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/readme.md
@@ -0,0 +1,10 @@
+## Integration: Messaging Queue
+
+This package contains the `api` and `translation` logic to support messaging queue features.
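+
+As a quick sketch of the request shape (values taken from the examples in
+`kafka/consumerLag.md`; timestamps are Unix nanoseconds), the consumer-lag
+endpoints accept a JSON body like:
+
+```json
+{
+    "start": 1720685296000000000,
+    "end": 1721290096000000000,
+    "variables": {
+        "partition": "0",
+        "topic": "topic1"
+    }
+}
+```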
+
+Currently supported queues:
+1) Kafka
+
+For detailed setup, check out our public docs for configuring:
+1) Trace collection from Clients (Producer and Consumer)
+2) Metrics collection from Kafka Brokers, Producers and Consumers
diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go
index 5120cd0039..99fe166a70 100644
--- a/pkg/query-service/app/server.go
+++ b/pkg/query-service/app/server.go
@@ -295,6 +295,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
 	api.RegisterIntegrationRoutes(r, am)
 	api.RegisterQueryRangeV3Routes(r, am)
 	api.RegisterQueryRangeV4Routes(r, am)
+	api.RegisterMessagingQueuesRoutes(r, am)

 	c := cors.New(cors.Options{
 		AllowedOrigins: []string{"*"},

From 5c3ce146fa2468e914c08666cd3c6723489e3a2e Mon Sep 17 00:00:00 2001
From: shivanshu
Date: Fri, 26 Jul 2024 13:02:45 +0530
Subject: [PATCH 2/5] chore: add queue type

---
 .../integrations/messagingQueues/kafka/model.go   |  6 ++++--
 .../app/integrations/messagingQueues/kafka/sql.go | 15 +++++++++++----
 .../messagingQueues/kafka/translator.go           | 15 +++++++++------
 3 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
index 61a504eaee..b24734cf48 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
@@ -1,7 +1,9 @@
 package kafka

+const kafkaQueue = "kafka"
+
 type MessagingQueue struct {
-	Start int64 `json:"start"`
-	End int64 `json:"end"`
+	Start     int64             `json:"start"`
+	End       int64             `json:"end"`
 	Variables map[string]string `json:"variables,omitempty"`
 }
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
index b22cc3f9aa..552fdc5d7a 100644
--- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 )

-func generateConsumerSQL(start, end int64, topic, partition string) string {
+func generateConsumerSQL(start, end int64, topic, partition, queueType string) string {
 	query := fmt.Sprintf(`
 WITH
 -- Sub query for p99 calculation
@@ -18,6 +18,7 @@ p99_query AS (
         timestamp >= '%d'
         AND timestamp <= '%d'
         AND kind = 5
+        AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
     GROUP BY consumer_group, serviceName
@@ -34,6 +35,7 @@ rps_query AS (
         timestamp >= '%d'
         AND timestamp <= '%d'
         AND kind = 5
+        AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
     GROUP BY consumer_group, serviceName
@@ -51,6 +53,7 @@ error_rate_query AS (
         AND timestamp <= '%d'
         AND statusCode = 2
         AND kind = 5
+        AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
     GROUP BY consumer_group, serviceName
@@ -67,6 +70,7 @@ avg_msg_size_query AS (
         timestamp >= '%d'
         AND timestamp <= '%d'
         AND kind = 5
+        AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
     GROUP BY consumer_group, serviceName
@@ -90,11 +94,11 @@ FROM
         AND p99_query.serviceName = avg_msg_size_query.serviceName
 ORDER BY
     p99_query.consumer_group;
-`, start, end, topic, partition, end, start, start, end, 
topic, partition, end, start, start, end, topic, partition, end, start, topic, partition) +`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, queueType, topic, partition) return query } -func generateProducerSQL(start, end int64, topic, partition string) string { +func generateProducerSQL(start, end int64, topic, partition, queueType string) string { query := fmt.Sprintf(` -- producer @@ -109,6 +113,7 @@ p99_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName @@ -124,6 +129,7 @@ rps_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName @@ -140,6 +146,7 @@ error_rate_query AS ( AND timestamp <= '%d' AND statusCode = 2 AND kind = 4 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName @@ -160,6 +167,6 @@ FROM ORDER BY p99_query.serviceName; -`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition) +`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index c8e4abf9fa..5cb67d7bf0 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -10,9 +10,12 @@ var defaultStepInterval int64 = 60 func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { + // ToDo: propagate this through APIs when there are different handlers + queueType := kafkaQueue + var cq *v3.CompositeQuery if queryContext == "producer" { - chq, err := buildProducerClickHouseQuery(messagingQueue) + chq, err := buildProducerClickHouseQuery(messagingQueue, queueType) if err != nil { return nil, err } @@ -22,7 +25,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return nil, err } } else if queryContext == "consumer" { - chq, err := buildConsumerClickHouseQuery(messagingQueue) + chq, err := buildConsumerClickHouseQuery(messagingQueue, queueType) if err != nil { return nil, err } @@ -44,7 +47,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } -func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) { +func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End topic, ok := messagingQueue.Variables["topic"] @@ -57,14 +60,14 @@ func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHous if !ok { return nil, fmt.Errorf("invalid type for Partition") } - query := generateProducerSQL(start, end, topic, partition) + query := generateProducerSQL(start, end, topic, partition, queueType) return &v3.ClickHouseQuery{ Query: query, }, nil } -func 
buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) { +func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End topic, ok := messagingQueue.Variables["topic"] @@ -77,7 +80,7 @@ func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHous if !ok { return nil, fmt.Errorf("invalid type for Partition") } - query := generateConsumerSQL(start, end, topic, partition) + query := generateConsumerSQL(start, end, topic, partition, queueType) return &v3.ClickHouseQuery{ Query: query, From 063c9adba60d33a4522c517d89b590cd8f2c80fa Mon Sep 17 00:00:00 2001 From: shivanshu Date: Fri, 26 Jul 2024 15:23:31 +0530 Subject: [PATCH 3/5] chore: pr-reviews --- pkg/query-service/app/http_handler.go | 12 ++-- .../messagingQueues/kafka/translator.go | 66 +++++-------------- 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index b74cafabfb..729dbf77aa 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2263,7 +2263,7 @@ func (aH *APIHandler) getProducerData( w http.ResponseWriter, r *http.Request, ) { // parse the query params to retrieve the messaging queue struct - messagingQueue, apiErr := ParseMessagingQueueParams(r) + messagingQueue, apiErr := ParseMessagingQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -2304,7 +2304,7 @@ func (aH *APIHandler) getProducerData( func (aH *APIHandler) getConsumerData( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueParams(r) + messagingQueue, apiErr := ParseMessagingQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -2342,10 +2342,10 @@ func (aH *APIHandler) getConsumerData( aH.Respond(w, resp) } -// ParseMessagingQueueParams parse for messaging queue params -func ParseMessagingQueueParams(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { - var messagingQueue *mq.MessagingQueue - if err := json.NewDecoder(r.Body).Decode(&messagingQueue); err != nil { +// ParseMessagingQueueBody parse for messaging queue params +func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { + messagingQueue := new(mq.MessagingQueue) + if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil { return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} } return messagingQueue, nil diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 5cb67d7bf0..99760d7fbb 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -14,27 +14,15 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) queueType := kafkaQueue var cq *v3.CompositeQuery - if queryContext == "producer" { - chq, err := buildProducerClickHouseQuery(messagingQueue, queueType) - if err != nil { - return nil, err - } - cq, err = buildCompositeQueryProducer(chq) - if err != nil { - return nil, err - } - } else if queryContext == "consumer" { - chq, err := buildConsumerClickHouseQuery(messagingQueue, queueType) - if err != nil { - return nil, err - } - cq, err = buildCompositeQueryConsumer(chq) - if err != nil { - return nil, err 
- } + chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext) + + if err != nil { + return nil, err } + cq, err = buildCompositeQuery(chq, queryContext) + queryRangeParams := &v3.QueryRangeParamsV3{ Start: messagingQueue.Start, End: messagingQueue.End, @@ -47,7 +35,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } -func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { +func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End topic, ok := messagingQueue.Variables["topic"] @@ -60,45 +48,23 @@ func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType stri if !ok { return nil, fmt.Errorf("invalid type for Partition") } - query := generateProducerSQL(start, end, topic, partition, queueType) + + var query string + if queryContext == "producer" { + query = generateProducerSQL(start, end, topic, partition, queueType) + } else if queryContext == "consumer" { + query = generateConsumerSQL(start, end, topic, partition, queueType) + } return &v3.ClickHouseQuery{ Query: query, }, nil } -func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { - start := messagingQueue.Start - end := messagingQueue.End - topic, ok := messagingQueue.Variables["topic"] - if !ok { - return nil, fmt.Errorf("invalid type for Topic") - } - - partition, ok := messagingQueue.Variables["partition"] - - if !ok { - return nil, fmt.Errorf("invalid type for Partition") - } - query := generateConsumerSQL(start, end, topic, partition, queueType) - - return &v3.ClickHouseQuery{ - Query: query, - }, nil -} - -func buildCompositeQueryProducer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) { +func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) { return &v3.CompositeQuery{ QueryType: v3.QueryTypeClickHouseSQL, - ClickHouseQueries: map[string]*v3.ClickHouseQuery{"producer": chq}, - PanelType: v3.PanelTypeTable, - }, nil -} - -func buildCompositeQueryConsumer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) { - return &v3.CompositeQuery{ - QueryType: v3.QueryTypeClickHouseSQL, - ClickHouseQueries: map[string]*v3.ClickHouseQuery{"consumer": chq}, + ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq}, PanelType: v3.PanelTypeTable, }, nil } From 3ff0aa4b4bc271d5fa83d433e8d7afadbe433a1a Mon Sep 17 00:00:00 2001 From: shivanshu Date: Wed, 31 Jul 2024 17:55:13 +0530 Subject: [PATCH 4/5] chore: consumer group filtering --- .../messagingQueues/kafka/consumerLag.md | 70 +++---------------- .../integrations/messagingQueues/kafka/sql.go | 37 +++++----- .../messagingQueues/kafka/translator.go | 8 ++- 3 files changed, 32 insertions(+), 83 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index c3691ecc36..5e12a87e76 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -19,52 +19,14 @@ POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details "end": 1721290096000000000, "variables": { "partition": "0", - "topic": "topic1" + "topic": "topic1", + "consumer_group": "cg1" } } ``` response in query range 
format `series` ```json -{ - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "queryName": "producer", - "series": [ - { - "labels": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", - "service_name": "producer-svc" - }, - "labelsArray": [ - { - "service_name": "producer-svc" - }, - { - "p99_query.p99": "150.08830908000002" - }, - { - "error_rate": "0" - }, - { - "rps": "0.00016534391534391533" - } - ], - "values": [] - } - ] - } - ] - } -} -``` -response in query range format `table` -```json { "status": "success", "data": { @@ -73,11 +35,6 @@ response in query range format `table` { "table": { "columns": [ - { - "name": "consumer_group", - "queryName": "", - "isValueColumn": false - }, { "name": "service_name", "queryName": "", @@ -108,22 +65,11 @@ response in query range format `table` { "data": { "avg_msg_size": "0", - "consumer_group": "cg1", "error_rate": "0", "p99": "0.2942205100000016", "service_name": "consumer-svc", "throughput": "0.00016534391534391533" } - }, - { - "data": { - "avg_msg_size": "0", - "consumer_group": "cg3", - "error_rate": "0", - "p99": "0.216600410000002", - "service_name": "consumer-svc", - "throughput": "0.00016534391534391533" - } } ] } @@ -145,12 +91,12 @@ POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, - "variables": { - "partition": "0", - "topic": "topic1" - } + "start": 1720685296000000000, + "end": 1721290096000000000, + "variables": { + "partition": "0", + "topic": "topic1" + } } ``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 552fdc5d7a..f479ea5ac9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -4,13 +4,12 @@ import ( "fmt" ) -func generateConsumerSQL(start, end int64, topic, partition, queueType string) string { +func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { query := fmt.Sprintf(` WITH -- Sub query for p99 calculation p99_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] as consumer_group, serviceName, quantile(0.99)(durationNano) / 1000000 as p99 FROM signoz_traces.signoz_index_v2 @@ -21,13 +20,13 @@ p99_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ), -- Sub query for RPS calculation rps_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, serviceName, count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds FROM signoz_traces.signoz_index_v2 @@ -38,13 +37,13 @@ rps_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ), -- Sub query for error rate calculation error_rate_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, serviceName, count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds FROM signoz_traces.signoz_index_v2 @@ -56,13 +55,13 @@ error_rate_query AS ( 
AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ), -- Sub query for average message size calculation avg_msg_size_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, serviceName, avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size FROM signoz_traces.signoz_index_v2 @@ -73,12 +72,12 @@ avg_msg_size_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ) -- Main query to combine all metrics SELECT - p99_query.consumer_group AS consumer_group, p99_query.serviceName AS service_name, p99_query.p99 AS p99, COALESCE(error_rate_query.error_rate, 0) AS error_rate, @@ -86,15 +85,14 @@ SELECT COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size FROM p99_query - LEFT JOIN rps_query ON p99_query.consumer_group = rps_query.consumer_group - AND p99_query.serviceName = rps_query.serviceName - LEFT JOIN error_rate_query ON p99_query.consumer_group = error_rate_query.consumer_group - AND p99_query.serviceName = error_rate_query.serviceName - LEFT JOIN avg_msg_size_query ON p99_query.consumer_group = avg_msg_size_query.consumer_group - AND p99_query.serviceName = avg_msg_size_query.serviceName + LEFT JOIN rps_query ON p99_query.serviceName = rps_query.serviceName + LEFT JOIN error_rate_query ON p99_query.serviceName = error_rate_query.serviceName + LEFT JOIN avg_msg_size_query ON p99_query.serviceName = avg_msg_size_query.serviceName ORDER BY - p99_query.consumer_group; -`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, queueType, topic, partition) + p99_query.serviceName; +`, start, end, queueType, topic, partition, consumerGroup, end, start, start, end, queueType, topic, + partition, consumerGroup, end, start, start, end, queueType, topic, partition, + consumerGroup, end, start, queueType, topic, partition, consumerGroup) return query } @@ -167,6 +165,7 @@ FROM ORDER BY p99_query.serviceName; -`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition) +`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, + partition, end, start, start, end, queueType, topic, partition) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 99760d7fbb..98414ebf0f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -44,16 +44,20 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer } partition, ok := messagingQueue.Variables["partition"] - if !ok { return nil, fmt.Errorf("invalid type for Partition") } + consumerGroup, ok := messagingQueue.Variables["consumer_group"] + if !ok { + return nil, fmt.Errorf("invalid type for consumer group") + } + var query string if queryContext == "producer" { query = generateProducerSQL(start, end, topic, partition, queueType) } 
else if queryContext == "consumer" { - query = generateConsumerSQL(start, end, topic, partition, queueType) + query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) } return &v3.ClickHouseQuery{ From c957c0f75728b65fb8ab32d7778802bf1a042df6 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 5 Aug 2024 18:14:40 +0530 Subject: [PATCH 5/5] chore: addressing review comments --- pkg/query-service/app/http_handler.go | 1 - .../messagingQueues/kafka/consumerLag.md | 2 +- .../integrations/messagingQueues/kafka/sql.go | 167 ++++-------------- 3 files changed, 37 insertions(+), 133 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 729dbf77aa..29cb807686 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2252,7 +2252,6 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth // SubRouter for kafka kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() - //kafkaSubRouter.HandleFunc("/consumer-lag", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 5e12a87e76..c34bc7ad64 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -86,7 +86,7 @@ response in query range format `series` API endpoint: ``` -POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details +POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details ``` ```json diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index f479ea5ac9..e06e35efde 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -5,167 +5,72 @@ import ( ) func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { + timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` -WITH --- Sub query for p99 calculation -p99_query AS ( +WITH consumer_query AS ( SELECT serviceName, - quantile(0.99)(durationNano) / 1000000 as p99 - FROM signoz_traces.signoz_index_v2 - WHERE - timestamp >= '%d' - AND timestamp <= '%d' - AND kind = 5 - AND msgSystem = '%s' - AND stringTagMap['messaging.destination.name'] = '%s' - AND stringTagMap['messaging.destination.partition.id'] = '%s' - AND stringTagMap['messaging.kafka.consumer.group'] = '%s' - GROUP BY serviceName -), - --- Sub query for RPS calculation -rps_query AS ( - SELECT - serviceName, - count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds - FROM signoz_traces.signoz_index_v2 + quantile(0.99)(durationNano) / 1000000 AS p99, + COUNT(*) AS total_requests, + SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count, + avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size + FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 - AND 
msgSystem = '%s'
+        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+        AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+    GROUP BY serviceName
+)
+
+-- Main query to select all metrics
+SELECT
+    serviceName AS service_name,
+    p99,
+    COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
+    COALESCE(total_requests / %d, 0) AS throughput, -- requests per second over the queried time range (in seconds)
+    COALESCE(avg_msg_size, 0) AS avg_msg_size
+FROM
+    consumer_query
+ORDER BY
+    serviceName;
+`, start, end, queueType, topic, partition, consumerGroup, timeRange)
 	return query
 }

 func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
+	timeRange := (end - start) / 1000000000
 	query := fmt.Sprintf(`
-
--- producer
-WITH
--- Subquery for p99 calculation
-p99_query AS (
+WITH producer_query AS (
+    SELECT
+        serviceName,
+        quantile(0.99)(durationNano) / 1000000 AS p99,
+        count(*) AS total_count,
+        SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count
+    FROM signoz_traces.distributed_signoz_index_v2
+    WHERE
+        timestamp >= '%d'
+        AND timestamp <= '%d'
+        AND kind = 4
+        AND msgSystem = '%s'
+        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+    GROUP BY serviceName
+)
+
+SELECT
+    serviceName AS service_name,
+    p99,
+    COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
+    COALESCE(total_count / %d, 0) AS rps -- requests per second over the queried time range (in seconds)
+FROM
+    producer_query
+ORDER BY
+    serviceName;
+
+`, start, end, queueType, topic, partition, timeRange)
 	return query
 }
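
Usage sketch (illustrative, not part of the patch series): the translator added above can be exercised end to end from Go. This assumes only the exported names introduced in these patches (`MessagingQueue`, `BuildQueryRangeParams`) and the import path used in `http_handler.go`; timestamps are Unix nanoseconds, matching the request bodies in `consumerLag.md`.

```go
package main

import (
	"fmt"

	mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
)

func main() {
	// Request body shape accepted by the consumer-details endpoint.
	queue := &mq.MessagingQueue{
		Start: 1720685296000000000, // nanoseconds
		End:   1721290096000000000, // nanoseconds
		Variables: map[string]string{
			"topic":          "topic1",
			"partition":      "0",
			"consumer_group": "cg1", // required by the translator as of PATCH 4
		},
	}

	// "consumer" selects generateConsumerSQL; "producer" selects generateProducerSQL.
	params, err := mq.BuildQueryRangeParams(queue, "consumer")
	if err != nil {
		panic(err)
	}

	// The composite query carries a single ClickHouse query keyed by the query context.
	fmt.Println(params.CompositeQuery.ClickHouseQueries["consumer"].Query)
}
```

In the handlers, these params are then validated with `validateQueryRangeParamsV3` and executed via `querierV2.QueryRange`, and the result is reshaped with `postprocess.TransformToTableForClickHouseQueries` because the composite query sets `FormatForWeb: true` with a table panel type.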