From 481bb6e8b8d68b40d5b6706b91bb78b71d59a3c7 Mon Sep 17 00:00:00 2001
From: shivanshu
Date: Fri, 26 Jul 2024 11:50:02 +0530
Subject: [PATCH] feat: add consumer and producer APIs

---
 pkg/query-service/app/http_handler.go         | 217 +++++++++++----
 .../messagingQueues/kafka/consumerLag.md      | 251 ++++++++++++++++++
 .../messagingQueues/kafka/model.go            |   7 +
 .../integrations/messagingQueues/kafka/sql.go | 165 ++++++++++++
 .../messagingQueues/kafka/translator.go       | 101 +++++++
 .../integrations/messagingQueues/readme.md    |  10 +
 pkg/query-service/app/server.go               |   1 +
 7 files changed, 696 insertions(+), 56 deletions(-)
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/model.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/translator.go
 create mode 100644 pkg/query-service/app/integrations/messagingQueues/readme.md

diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 4eff84d50c..b74cafabfb 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -43,6 +43,7 @@ import (

 	"go.uber.org/zap"

+	mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
 	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
 	"go.signoz.io/signoz/pkg/query-service/dao"
 	am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
@@ -2246,9 +2247,113 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
 	w.Write(resp)
 }

+// RegisterMessagingQueuesRoutes adds messaging-queues routes
+func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
+	// SubRouter for kafka
+	kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
+
+	kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
+	kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
+
+	// for other messaging queues, add SubRouters here
+}
+
+func (aH *APIHandler) getProducerData(
+	w http.ResponseWriter, r *http.Request,
+) {
+	// parse the request body into the messaging queue struct
+	messagingQueue, apiErr := ParseMessagingQueueParams(r)
+	if apiErr != nil {
+		zap.L().Error(apiErr.Err.Error())
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer")
+	if err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	var result []*v3.Result
+	var errQueriesByName map[string]error
+
+	result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
+	if err != nil {
+		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		RespondError(w, apiErrObj, errQueriesByName)
+		return
+	}
+	result = postprocess.TransformToTableForClickHouseQueries(result)
+
+	resp := v3.QueryRangeResponse{
+		Result: result,
+	}
+	aH.Respond(w, resp)
+}
+
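+// getConsumerData accepts the same request shape as getProducerData; for
+// example ("start"/"end" are unix timestamps in nanoseconds, as shown in
+// consumerLag.md):
+//
+//	{
+//	  "start": 1720685296000000000,
+//	  "end": 1721290096000000000,
+//	  "variables": {"topic": "topic1", "partition": "0"}
+//	}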
+func (aH *APIHandler) getConsumerData(
+	w http.ResponseWriter, r *http.Request,
+) {
+	messagingQueue, apiErr := ParseMessagingQueueParams(r)
+	if apiErr != nil {
+		zap.L().Error(apiErr.Err.Error())
+		RespondError(w, apiErr, nil)
+		return
+	}
+
+	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer")
+	if err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
+		zap.L().Error(err.Error())
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	var result []*v3.Result
+	var errQueriesByName map[string]error
+
+	result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
+	if err != nil {
+		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		RespondError(w, apiErrObj, errQueriesByName)
+		return
+	}
+	result = postprocess.TransformToTableForClickHouseQueries(result)
+
+	resp := v3.QueryRangeResponse{
+		Result: result,
+	}
+	aH.Respond(w, resp)
+}
+
+// ParseMessagingQueueParams parses the request body into a MessagingQueue struct
+func ParseMessagingQueueParams(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
+	messagingQueue := &mq.MessagingQueue{}
+	if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
+	}
+	return messagingQueue, nil
+}
+
 // Preferences

-func (ah *APIHandler) getUserPreference(
+func (aH *APIHandler) getUserPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2262,10 +2367,10 @@ func (ah *APIHandler) getUserPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) updateUserPreference(
+func (aH *APIHandler) updateUserPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2284,10 +2389,10 @@ func (ah *APIHandler) updateUserPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) getAllUserPreferences(
+func (aH *APIHandler) getAllUserPreferences(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	user := common.GetUserFromContext(r.Context())
@@ -2299,10 +2404,10 @@ func (ah *APIHandler) getAllUserPreferences(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) getOrgPreference(
+func (aH *APIHandler) getOrgPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2315,10 +2420,10 @@ func (ah *APIHandler) getOrgPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) updateOrgPreference(
+func (aH *APIHandler) updateOrgPreference(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	preferenceId := mux.Vars(r)["preferenceId"]
@@ -2337,10 +2442,10 @@ func (ah *APIHandler) updateOrgPreference(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-func (ah *APIHandler) getAllOrgPreferences(
+func (aH *APIHandler) getAllOrgPreferences(
 	w http.ResponseWriter, r *http.Request,
 ) {
 	user := common.GetUserFromContext(r.Context())
@@ -2352,36 +2457,36 @@ func (ah *APIHandler) getAllOrgPreferences(
 		return
 	}

-	ah.Respond(w, preference)
+	aH.Respond(w, preference)
 }

-// Integrations
-func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
+// RegisterIntegrationRoutes registers all integration routes
+func (aH *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
 	subRouter := router.PathPrefix("/api/v1/integrations").Subrouter()

 	subRouter.HandleFunc(
-
"/install", am.ViewAccess(ah.InstallIntegration), + "/install", am.ViewAccess(aH.InstallIntegration), ).Methods(http.MethodPost) subRouter.HandleFunc( - "/uninstall", am.ViewAccess(ah.UninstallIntegration), + "/uninstall", am.ViewAccess(aH.UninstallIntegration), ).Methods(http.MethodPost) // Used for polling for status in v0 subRouter.HandleFunc( - "/{integrationId}/connection_status", am.ViewAccess(ah.GetIntegrationConnectionStatus), + "/{integrationId}/connection_status", am.ViewAccess(aH.GetIntegrationConnectionStatus), ).Methods(http.MethodGet) subRouter.HandleFunc( - "/{integrationId}", am.ViewAccess(ah.GetIntegration), + "/{integrationId}", am.ViewAccess(aH.GetIntegration), ).Methods(http.MethodGet) subRouter.HandleFunc( - "", am.ViewAccess(ah.ListIntegrations), + "", am.ViewAccess(aH.ListIntegrations), ).Methods(http.MethodGet) } -func (ah *APIHandler) ListIntegrations( +func (aH *APIHandler) ListIntegrations( w http.ResponseWriter, r *http.Request, ) { params := map[string]string{} @@ -2389,21 +2494,21 @@ func (ah *APIHandler) ListIntegrations( params[k] = values[0] } - resp, apiErr := ah.IntegrationsController.ListIntegrations( + resp, apiErr := aH.IntegrationsController.ListIntegrations( r.Context(), params, ) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch integrations") return } - ah.Respond(w, resp) + aH.Respond(w, resp) } -func (ah *APIHandler) GetIntegration( +func (aH *APIHandler) GetIntegration( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] - integration, apiErr := ah.IntegrationsController.GetIntegration( + integration, apiErr := aH.IntegrationsController.GetIntegration( r.Context(), integrationId, ) if apiErr != nil { @@ -2411,14 +2516,14 @@ func (ah *APIHandler) GetIntegration( return } - ah.Respond(w, integration) + aH.Respond(w, integration) } -func (ah *APIHandler) GetIntegrationConnectionStatus( +func (aH *APIHandler) GetIntegrationConnectionStatus( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] - isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled( + isInstalled, apiErr := aH.IntegrationsController.IsIntegrationInstalled( r.Context(), integrationId, ) if apiErr != nil { @@ -2428,11 +2533,11 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( // Do not spend resources calculating connection status unless installed. 
if !isInstalled { - ah.Respond(w, &integrations.IntegrationConnectionStatus{}) + aH.Respond(w, &integrations.IntegrationConnectionStatus{}) return } - connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests( + connectionTests, apiErr := aH.IntegrationsController.GetIntegrationConnectionTests( r.Context(), integrationId, ) if apiErr != nil { @@ -2446,7 +2551,7 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( lookbackSeconds = 15 * 60 } - connectionStatus, apiErr := ah.calculateConnectionStatus( + connectionStatus, apiErr := aH.calculateConnectionStatus( r.Context(), connectionTests, lookbackSeconds, ) if apiErr != nil { @@ -2454,10 +2559,10 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( return } - ah.Respond(w, connectionStatus) + aH.Respond(w, connectionStatus) } -func (ah *APIHandler) calculateConnectionStatus( +func (aH *APIHandler) calculateConnectionStatus( ctx context.Context, connectionTests *integrations.IntegrationConnectionTests, lookbackSeconds int64, @@ -2475,7 +2580,7 @@ func (ah *APIHandler) calculateConnectionStatus( go func() { defer wg.Done() - logsConnStatus, apiErr := ah.calculateLogsConnectionStatus( + logsConnStatus, apiErr := aH.calculateLogsConnectionStatus( ctx, connectionTests.Logs, lookbackSeconds, ) @@ -2498,7 +2603,7 @@ func (ah *APIHandler) calculateConnectionStatus( return } - statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric( + statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric( ctx, connectionTests.Metrics, ) @@ -2542,7 +2647,7 @@ func (ah *APIHandler) calculateConnectionStatus( return result, nil } -func (ah *APIHandler) calculateLogsConnectionStatus( +func (aH *APIHandler) calculateLogsConnectionStatus( ctx context.Context, logsConnectionTest *integrations.LogsConnectionTest, lookbackSeconds int64, @@ -2584,7 +2689,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus( }, }, } - queryRes, _, err := ah.querier.QueryRange( + queryRes, _, err := aH.querier.QueryRange( ctx, qrParams, map[string]v3.AttributeKey{}, ) if err != nil { @@ -2621,7 +2726,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus( return nil, nil } -func (ah *APIHandler) InstallIntegration( +func (aH *APIHandler) InstallIntegration( w http.ResponseWriter, r *http.Request, ) { req := integrations.InstallIntegrationRequest{} @@ -2632,7 +2737,7 @@ func (ah *APIHandler) InstallIntegration( return } - integration, apiErr := ah.IntegrationsController.Install( + integration, apiErr := aH.IntegrationsController.Install( r.Context(), &req, ) if apiErr != nil { @@ -2640,10 +2745,10 @@ func (ah *APIHandler) InstallIntegration( return } - ah.Respond(w, integration) + aH.Respond(w, integration) } -func (ah *APIHandler) UninstallIntegration( +func (aH *APIHandler) UninstallIntegration( w http.ResponseWriter, r *http.Request, ) { req := integrations.UninstallIntegrationRequest{} @@ -2654,13 +2759,13 @@ func (ah *APIHandler) UninstallIntegration( return } - apiErr := ah.IntegrationsController.Uninstall(r.Context(), &req) + apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req) if apiErr != nil { RespondError(w, apiErr, nil) return } - ah.Respond(w, map[string]interface{}{}) + aH.Respond(w, map[string]interface{}{}) } // logs @@ -2807,7 +2912,7 @@ func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) { return int(version64), nil } -func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) PreviewLogsPipelinesHandler(w 
http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PipelinesPreviewRequest{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -2815,7 +2920,7 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http return } - resultLogs, apiErr := ah.LogsParsingPipelineController.PreviewLogsPipelines( + resultLogs, apiErr := aH.LogsParsingPipelineController.PreviewLogsPipelines( r.Context(), &req, ) @@ -2824,10 +2929,10 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http return } - ah.Respond(w, resultLogs) + aH.Respond(w, resultLogs) } -func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { version, err := parseAgentConfigVersion(r) if err != nil { @@ -2839,20 +2944,20 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re var apierr *model.ApiError if version != -1 { - payload, apierr = ah.listLogsPipelinesByVersion(context.Background(), version) + payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version) } else { - payload, apierr = ah.listLogsPipelines(context.Background()) + payload, apierr = aH.listLogsPipelines(context.Background()) } if apierr != nil { RespondError(w, apierr, payload) return } - ah.Respond(w, payload) + aH.Respond(w, payload) } // listLogsPipelines lists logs piplines for latest version -func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( +func (aH *APIHandler) listLogsPipelines(ctx context.Context) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { // get lateset agent config @@ -2866,7 +2971,7 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( latestVersion = lastestConfig.Version } - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) + payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines") } @@ -2882,10 +2987,10 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( } // listLogsPipelinesByVersion lists pipelines along with config version history -func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( +func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) + payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines by version") } @@ -2901,7 +3006,7 @@ func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in return payload, nil } -func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PostablePipelines{} @@ -2924,7 +3029,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) } } - return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable) + return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable) } res, err := createPipeline(r.Context(), req.Pipelines) @@ -2933,7 +3038,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) return } - ah.Respond(w, res) + aH.Respond(w, res) 
}

 func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) {
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md
new file mode 100644
index 0000000000..c3691ecc36
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md
@@ -0,0 +1,251 @@
+## Consumer Lag feature breakdown
+
+### 1) Consumer Lag Graph
+
+
+---
+
+### 2) Consumer Group Details
+
+API endpoint:
+
+```
+POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details
+```
+
+Request body:
+```json
+{
+    "start": 1720685296000000000,
+    "end": 1721290096000000000,
+    "variables": {
+        "partition": "0",
+        "topic": "topic1"
+    }
+}
+```
+
+response in query range format `table`
+```json
+{
+  "status": "success",
+  "data": {
+    "resultType": "",
+    "result": [
+      {
+        "table": {
+          "columns": [
+            {
+              "name": "consumer_group",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "service_name",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "p99",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "error_rate",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "throughput",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "avg_msg_size",
+              "queryName": "",
+              "isValueColumn": false
+            }
+          ],
+          "rows": [
+            {
+              "data": {
+                "avg_msg_size": "0",
+                "consumer_group": "cg1",
+                "error_rate": "0",
+                "p99": "0.2942205100000016",
+                "service_name": "consumer-svc",
+                "throughput": "0.00016534391534391533"
+              }
+            },
+            {
+              "data": {
+                "avg_msg_size": "0",
+                "consumer_group": "cg3",
+                "error_rate": "0",
+                "p99": "0.216600410000002",
+                "service_name": "consumer-svc",
+                "throughput": "0.00016534391534391533"
+              }
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+```
+
+
+
+### 3) Producer Details
+
+API endpoint:
+
+```
+POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details
+```
+
+Request body:
+```json
+{
+    "start": 1720685296000000000,
+    "end": 1721290096000000000,
+    "variables": {
+        "partition": "0",
+        "topic": "topic1"
+    }
+}
+```
+
+response in query range format `series`
+```json
+{
+  "status": "success",
+  "data": {
+    "resultType": "",
+    "result": [
+      {
+        "queryName": "producer",
+        "series": [
+          {
+            "labels": {
+              "error_rate": "0",
+              "p99_query.p99": "150.08830908000002",
+              "rps": "0.00016534391534391533",
+              "service_name": "producer-svc"
+            },
+            "labelsArray": [
+              {
+                "service_name": "producer-svc"
+              },
+              {
+                "p99_query.p99": "150.08830908000002"
+              },
+              {
+                "error_rate": "0"
+              },
+              {
+                "rps": "0.00016534391534391533"
+              }
+            ],
+            "values": []
+          }
+        ]
+      }
+    ]
+  }
+}
+```
+response in query range format `table`
+```json
+{
+  "status": "success",
+  "data": {
+    "resultType": "",
+    "result": [
+      {
+        "table": {
+          "columns": [
+            {
+              "name": "service_name",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "p99_query.p99",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "error_rate",
+              "queryName": "",
+              "isValueColumn": false
+            },
+            {
+              "name": "rps",
+              "queryName": "",
+              "isValueColumn": false
+            }
+          ],
+          "rows": [
+            {
+              "data": {
+                "error_rate": "0",
+                "p99_query.p99": "150.08830908000002",
+                "rps": "0.00016534391534391533",
+                "service_name": "producer-svc"
+              }
+            }
+          ]
+        }
+      }
+    ]
+  }
+}
+```
+
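+---
+
+### Example request
+
+Both endpoints accept the same body: `start` and `end` are unix timestamps in
+nanoseconds, and `variables` must carry the `topic` and `partition` to filter on.
+A minimal sketch with curl (the host and port below assume a locally running
+query-service):
+
+```bash
+curl -X POST \
+  'http://localhost:8080/api/v1/messaging-queues/kafka/consumer-lag/producer-details' \
+  -H 'Content-Type: application/json' \
+  -d '{
+    "start": 1720685296000000000,
+    "end": 1721290096000000000,
+    "variables": {"topic": "topic1", "partition": "0"}
+  }'
+```
+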
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
new file mode 100644
index 0000000000..61a504eaee
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go
@@ -0,0 +1,7 @@
+package kafka
+
+type MessagingQueue struct {
+	Start     int64             `json:"start"`
+	End       int64             `json:"end"`
+	Variables map[string]string `json:"variables,omitempty"`
+}
diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
new file mode 100644
index 0000000000..b22cc3f9aa
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
@@ -0,0 +1,165 @@
+package kafka
+
+import (
+	"fmt"
+)
+
+func generateConsumerSQL(start, end int64, topic, partition string) string {
+	query := fmt.Sprintf(`
+WITH
+-- Subquery for p99 calculation
+p99_query AS (
+    SELECT
+        stringTagMap['messaging.kafka.consumer.group'] as consumer_group,
+        serviceName,
+        quantile(0.99)(durationNano) / 1000000 as p99
+    FROM signoz_traces.signoz_index_v2
+    WHERE
+        timestamp >= '%d'
+        AND timestamp <= '%d'
+        AND kind = 5
+        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+    GROUP BY consumer_group, serviceName
+),
+
+-- Subquery for RPS calculation
+rps_query AS (
+    SELECT
+        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
+        serviceName,
+        count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
+    FROM signoz_traces.signoz_index_v2
+    WHERE
+        timestamp >= '%d'
+        AND timestamp <= '%d'
+        AND kind = 5
+        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+    GROUP BY consumer_group, serviceName
+),
+
+-- Subquery for error rate calculation
+error_rate_query AS (
+    SELECT
+        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
+        serviceName,
+        count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds
+    FROM signoz_traces.signoz_index_v2
+    WHERE
+        timestamp >= '%d'
+        AND timestamp <= '%d'
+        AND statusCode = 2
+        AND kind = 5
+        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+    GROUP BY consumer_group, serviceName
+),
+
+-- Subquery for average message size calculation
+avg_msg_size_query AS (
+    SELECT
+        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
+        serviceName,
+        avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size
+    FROM signoz_traces.signoz_index_v2
+    WHERE
+        timestamp >= '%d'
+        AND timestamp <= '%d'
+        AND kind = 5
+        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+    GROUP BY consumer_group, serviceName
+)
+
+-- Main query to combine all metrics
+SELECT
+    p99_query.consumer_group AS consumer_group,
+    p99_query.serviceName AS service_name,
+    p99_query.p99 AS p99,
+    COALESCE(error_rate_query.error_rate, 0) AS error_rate,
+    COALESCE(rps_query.rps, 0) AS throughput,
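+    -- the LEFT JOINs below can find no match for a (consumer_group, serviceName)
+    -- pair, e.g. no error spans in the window, so COALESCE defaults those metrics to 0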
COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size +FROM + p99_query + LEFT JOIN rps_query ON p99_query.consumer_group = rps_query.consumer_group + AND p99_query.serviceName = rps_query.serviceName + LEFT JOIN error_rate_query ON p99_query.consumer_group = error_rate_query.consumer_group + AND p99_query.serviceName = error_rate_query.serviceName + LEFT JOIN avg_msg_size_query ON p99_query.consumer_group = avg_msg_size_query.consumer_group + AND p99_query.serviceName = avg_msg_size_query.serviceName +ORDER BY + p99_query.consumer_group; +`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition, end, start, topic, partition) + return query +} + +func generateProducerSQL(start, end int64, topic, partition string) string { + query := fmt.Sprintf(` + +-- producer +WITH +-- Subquery for p99 calculation +p99_query AS ( + SELECT + serviceName, + quantile(0.99)(durationNano) / 1000000 as p99 + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +), + +-- Subquery for RPS calculation +rps_query AS ( + SELECT + serviceName, + count(*) / ((%d - %d) / 1000000000) as rps -- Convert nanoseconds to seconds + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +), + +-- Subquery for error rate calculation +error_rate_query AS ( + SELECT + serviceName, + count(*) / ((%d - %d) / 1000000000) as error_rate -- Convert nanoseconds to seconds + FROM signoz_traces.signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND statusCode = 2 + AND kind = 4 + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +) + +-- Main query to combine all metrics +SELECT + p99_query.serviceName AS service_name, + p99_query.p99, + COALESCE(error_rate_query.error_rate, 0) AS error_rate, + COALESCE(rps_query.rps, 0) AS rps +FROM + p99_query + LEFT JOIN + rps_query ON p99_query.serviceName = rps_query.serviceName + LEFT JOIN + error_rate_query ON p99_query.serviceName = error_rate_query.serviceName +ORDER BY + p99_query.serviceName; + +`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition) + return query +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go new file mode 100644 index 0000000000..c8e4abf9fa --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -0,0 +1,101 @@ +package kafka + +import ( + "fmt" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +var defaultStepInterval int64 = 60 + +func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { + + var cq *v3.CompositeQuery + if queryContext == "producer" { + chq, err := buildProducerClickHouseQuery(messagingQueue) + if err != nil { + return nil, err + } + cq, err = buildCompositeQueryProducer(chq) + + if err != nil { + return nil, err + } + } else if queryContext == "consumer" { + chq, err := buildConsumerClickHouseQuery(messagingQueue) + if err != nil { + 
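+			// a missing "topic" or "partition" variable surfaces here and is
+			// returned to the handler as a bad-request error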
return nil, err
+		}
+		cq, err = buildCompositeQueryConsumer(chq)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		return nil, fmt.Errorf("unsupported query context: %s", queryContext)
+	}
+
+	queryRangeParams := &v3.QueryRangeParamsV3{
+		Start:          messagingQueue.Start,
+		End:            messagingQueue.End,
+		Step:           defaultStepInterval,
+		CompositeQuery: cq,
+		Version:        "v4",
+		FormatForWeb:   true,
+	}
+
+	return queryRangeParams, nil
+}
+
+func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
+	start := messagingQueue.Start
+	end := messagingQueue.End
+	topic, ok := messagingQueue.Variables["topic"]
+	if !ok {
+		return nil, fmt.Errorf("missing required variable: topic")
+	}
+
+	partition, ok := messagingQueue.Variables["partition"]
+	if !ok {
+		return nil, fmt.Errorf("missing required variable: partition")
+	}
+	query := generateProducerSQL(start, end, topic, partition)
+
+	return &v3.ClickHouseQuery{
+		Query: query,
+	}, nil
+}
+
+func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
+	start := messagingQueue.Start
+	end := messagingQueue.End
+	topic, ok := messagingQueue.Variables["topic"]
+	if !ok {
+		return nil, fmt.Errorf("missing required variable: topic")
+	}
+
+	partition, ok := messagingQueue.Variables["partition"]
+	if !ok {
+		return nil, fmt.Errorf("missing required variable: partition")
+	}
+	query := generateConsumerSQL(start, end, topic, partition)
+
+	return &v3.ClickHouseQuery{
+		Query: query,
+	}, nil
+}
+
+func buildCompositeQueryProducer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
+	return &v3.CompositeQuery{
+		QueryType:         v3.QueryTypeClickHouseSQL,
+		ClickHouseQueries: map[string]*v3.ClickHouseQuery{"producer": chq},
+		PanelType:         v3.PanelTypeTable,
+	}, nil
+}
+
+func buildCompositeQueryConsumer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
+	return &v3.CompositeQuery{
+		QueryType:         v3.QueryTypeClickHouseSQL,
+		ClickHouseQueries: map[string]*v3.ClickHouseQuery{"consumer": chq},
+		PanelType:         v3.PanelTypeTable,
+	}, nil
+}
diff --git a/pkg/query-service/app/integrations/messagingQueues/readme.md b/pkg/query-service/app/integrations/messagingQueues/readme.md
new file mode 100644
index 0000000000..5b9ae94c8b
--- /dev/null
+++ b/pkg/query-service/app/integrations/messagingQueues/readme.md
@@ -0,0 +1,10 @@
+## Integration: Messaging Queue
+
+This package contains the `api` and `translation` logic to support messaging queue features.
+
+Currently supported queues:
+1) Kafka
+
+For detailed setup, check out our public docs for configuring:
+1) Trace collection from Clients (Producer and Consumer)
+2) Metrics collection from Kafka Brokers, Producers and Consumers
diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go
index 5120cd0039..99fe166a70 100644
--- a/pkg/query-service/app/server.go
+++ b/pkg/query-service/app/server.go
@@ -295,6 +295,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
 	api.RegisterIntegrationRoutes(r, am)
 	api.RegisterQueryRangeV3Routes(r, am)
 	api.RegisterQueryRangeV4Routes(r, am)
+	api.RegisterMessagingQueuesRoutes(r, am)

 	c := cors.New(cors.Options{
 		AllowedOrigins: []string{"*"},