diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 4eff84d50c..29cb807686 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -43,6 +43,7 @@ import ( "go.uber.org/zap" + mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/dao" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" @@ -2246,9 +2247,112 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response w.Write(resp) } +// RegisterMessagingQueuesRoutes adds messaging-queues routes +func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) { + // SubRouter for kafka + kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() + + kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + + // for other messaging queues, add SubRouters here +} + +func (aH *APIHandler) getProducerData( + w http.ResponseWriter, r *http.Request, +) { + // parse the query params to retrieve the messaging queue struct + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) getConsumerData( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +// ParseMessagingQueueBody parse for messaging queue params +func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { + messagingQueue := new(mq.MessagingQueue) + if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + return messagingQueue, nil +} + // Preferences -func (ah *APIHandler) getUserPreference( +func (aH *APIHandler) getUserPreference( w http.ResponseWriter, r *http.Request, ) { preferenceId := mux.Vars(r)["preferenceId"] @@ -2262,10 +2366,10 @@ func (ah *APIHandler) getUserPreference( return } - ah.Respond(w, preference) + aH.Respond(w, preference) } -func (ah *APIHandler) updateUserPreference( +func (aH *APIHandler) updateUserPreference( w http.ResponseWriter, r *http.Request, ) { preferenceId := mux.Vars(r)["preferenceId"] @@ -2284,10 +2388,10 @@ func (ah *APIHandler) updateUserPreference( return } - ah.Respond(w, preference) + aH.Respond(w, preference) } -func (ah *APIHandler) getAllUserPreferences( +func (aH *APIHandler) getAllUserPreferences( w http.ResponseWriter, r *http.Request, ) { user := common.GetUserFromContext(r.Context()) @@ -2299,10 +2403,10 @@ func (ah *APIHandler) getAllUserPreferences( return } - ah.Respond(w, preference) + aH.Respond(w, preference) } -func (ah *APIHandler) getOrgPreference( +func (aH *APIHandler) getOrgPreference( w http.ResponseWriter, r *http.Request, ) { preferenceId := mux.Vars(r)["preferenceId"] @@ -2315,10 +2419,10 @@ func (ah *APIHandler) getOrgPreference( return } - ah.Respond(w, preference) + aH.Respond(w, preference) } -func (ah *APIHandler) updateOrgPreference( +func (aH *APIHandler) updateOrgPreference( w http.ResponseWriter, r *http.Request, ) { preferenceId := mux.Vars(r)["preferenceId"] @@ -2337,10 +2441,10 @@ func (ah *APIHandler) updateOrgPreference( return } - ah.Respond(w, preference) + aH.Respond(w, preference) } -func (ah *APIHandler) getAllOrgPreferences( +func (aH *APIHandler) getAllOrgPreferences( w http.ResponseWriter, r *http.Request, ) { user := common.GetUserFromContext(r.Context()) @@ -2352,36 +2456,36 @@ func (ah *APIHandler) getAllOrgPreferences( return } - ah.Respond(w, preference) + aH.Respond(w, preference) } -// Integrations -func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) { +// RegisterIntegrationRoutes Registers all Integrations +func (aH *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) { subRouter := router.PathPrefix("/api/v1/integrations").Subrouter() subRouter.HandleFunc( - "/install", am.ViewAccess(ah.InstallIntegration), + "/install", am.ViewAccess(aH.InstallIntegration), ).Methods(http.MethodPost) subRouter.HandleFunc( - "/uninstall", am.ViewAccess(ah.UninstallIntegration), + "/uninstall", am.ViewAccess(aH.UninstallIntegration), ).Methods(http.MethodPost) // Used for polling for status in v0 subRouter.HandleFunc( - "/{integrationId}/connection_status", am.ViewAccess(ah.GetIntegrationConnectionStatus), + "/{integrationId}/connection_status", am.ViewAccess(aH.GetIntegrationConnectionStatus), ).Methods(http.MethodGet) subRouter.HandleFunc( - "/{integrationId}", am.ViewAccess(ah.GetIntegration), + "/{integrationId}", am.ViewAccess(aH.GetIntegration), ).Methods(http.MethodGet) subRouter.HandleFunc( - "", am.ViewAccess(ah.ListIntegrations), + "", am.ViewAccess(aH.ListIntegrations), ).Methods(http.MethodGet) } -func (ah *APIHandler) ListIntegrations( +func (aH *APIHandler) ListIntegrations( w http.ResponseWriter, r *http.Request, ) { params := map[string]string{} @@ -2389,21 +2493,21 @@ func (ah *APIHandler) ListIntegrations( params[k] = values[0] } - resp, apiErr := ah.IntegrationsController.ListIntegrations( + resp, apiErr := aH.IntegrationsController.ListIntegrations( r.Context(), params, ) if apiErr != nil { RespondError(w, apiErr, "Failed to fetch integrations") return } - ah.Respond(w, resp) + aH.Respond(w, resp) } -func (ah *APIHandler) GetIntegration( +func (aH *APIHandler) GetIntegration( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] - integration, apiErr := ah.IntegrationsController.GetIntegration( + integration, apiErr := aH.IntegrationsController.GetIntegration( r.Context(), integrationId, ) if apiErr != nil { @@ -2411,14 +2515,14 @@ func (ah *APIHandler) GetIntegration( return } - ah.Respond(w, integration) + aH.Respond(w, integration) } -func (ah *APIHandler) GetIntegrationConnectionStatus( +func (aH *APIHandler) GetIntegrationConnectionStatus( w http.ResponseWriter, r *http.Request, ) { integrationId := mux.Vars(r)["integrationId"] - isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled( + isInstalled, apiErr := aH.IntegrationsController.IsIntegrationInstalled( r.Context(), integrationId, ) if apiErr != nil { @@ -2428,11 +2532,11 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( // Do not spend resources calculating connection status unless installed. if !isInstalled { - ah.Respond(w, &integrations.IntegrationConnectionStatus{}) + aH.Respond(w, &integrations.IntegrationConnectionStatus{}) return } - connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests( + connectionTests, apiErr := aH.IntegrationsController.GetIntegrationConnectionTests( r.Context(), integrationId, ) if apiErr != nil { @@ -2446,7 +2550,7 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( lookbackSeconds = 15 * 60 } - connectionStatus, apiErr := ah.calculateConnectionStatus( + connectionStatus, apiErr := aH.calculateConnectionStatus( r.Context(), connectionTests, lookbackSeconds, ) if apiErr != nil { @@ -2454,10 +2558,10 @@ func (ah *APIHandler) GetIntegrationConnectionStatus( return } - ah.Respond(w, connectionStatus) + aH.Respond(w, connectionStatus) } -func (ah *APIHandler) calculateConnectionStatus( +func (aH *APIHandler) calculateConnectionStatus( ctx context.Context, connectionTests *integrations.IntegrationConnectionTests, lookbackSeconds int64, @@ -2475,7 +2579,7 @@ func (ah *APIHandler) calculateConnectionStatus( go func() { defer wg.Done() - logsConnStatus, apiErr := ah.calculateLogsConnectionStatus( + logsConnStatus, apiErr := aH.calculateLogsConnectionStatus( ctx, connectionTests.Logs, lookbackSeconds, ) @@ -2498,7 +2602,7 @@ func (ah *APIHandler) calculateConnectionStatus( return } - statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric( + statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric( ctx, connectionTests.Metrics, ) @@ -2542,7 +2646,7 @@ func (ah *APIHandler) calculateConnectionStatus( return result, nil } -func (ah *APIHandler) calculateLogsConnectionStatus( +func (aH *APIHandler) calculateLogsConnectionStatus( ctx context.Context, logsConnectionTest *integrations.LogsConnectionTest, lookbackSeconds int64, @@ -2584,7 +2688,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus( }, }, } - queryRes, _, err := ah.querier.QueryRange( + queryRes, _, err := aH.querier.QueryRange( ctx, qrParams, map[string]v3.AttributeKey{}, ) if err != nil { @@ -2621,7 +2725,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus( return nil, nil } -func (ah *APIHandler) InstallIntegration( +func (aH *APIHandler) InstallIntegration( w http.ResponseWriter, r *http.Request, ) { req := integrations.InstallIntegrationRequest{} @@ -2632,7 +2736,7 @@ func (ah *APIHandler) InstallIntegration( return } - integration, apiErr := ah.IntegrationsController.Install( + integration, apiErr := aH.IntegrationsController.Install( r.Context(), &req, ) if apiErr != nil { @@ -2640,10 +2744,10 @@ func (ah *APIHandler) InstallIntegration( return } - ah.Respond(w, integration) + aH.Respond(w, integration) } -func (ah *APIHandler) UninstallIntegration( +func (aH *APIHandler) UninstallIntegration( w http.ResponseWriter, r *http.Request, ) { req := integrations.UninstallIntegrationRequest{} @@ -2654,13 +2758,13 @@ func (ah *APIHandler) UninstallIntegration( return } - apiErr := ah.IntegrationsController.Uninstall(r.Context(), &req) + apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req) if apiErr != nil { RespondError(w, apiErr, nil) return } - ah.Respond(w, map[string]interface{}{}) + aH.Respond(w, map[string]interface{}{}) } // logs @@ -2807,7 +2911,7 @@ func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) { return int(version64), nil } -func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PipelinesPreviewRequest{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -2815,7 +2919,7 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http return } - resultLogs, apiErr := ah.LogsParsingPipelineController.PreviewLogsPipelines( + resultLogs, apiErr := aH.LogsParsingPipelineController.PreviewLogsPipelines( r.Context(), &req, ) @@ -2824,10 +2928,10 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http return } - ah.Respond(w, resultLogs) + aH.Respond(w, resultLogs) } -func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { version, err := parseAgentConfigVersion(r) if err != nil { @@ -2839,20 +2943,20 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re var apierr *model.ApiError if version != -1 { - payload, apierr = ah.listLogsPipelinesByVersion(context.Background(), version) + payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version) } else { - payload, apierr = ah.listLogsPipelines(context.Background()) + payload, apierr = aH.listLogsPipelines(context.Background()) } if apierr != nil { RespondError(w, apierr, payload) return } - ah.Respond(w, payload) + aH.Respond(w, payload) } // listLogsPipelines lists logs piplines for latest version -func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( +func (aH *APIHandler) listLogsPipelines(ctx context.Context) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { // get lateset agent config @@ -2866,7 +2970,7 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( latestVersion = lastestConfig.Version } - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) + payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines") } @@ -2882,10 +2986,10 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( } // listLogsPipelinesByVersion lists pipelines along with config version history -func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( +func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( *logparsingpipeline.PipelinesResponse, *model.ApiError, ) { - payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) + payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) if err != nil { return nil, model.WrapApiError(err, "failed to get pipelines by version") } @@ -2901,7 +3005,7 @@ func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in return payload, nil } -func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { req := logparsingpipeline.PostablePipelines{} @@ -2924,7 +3028,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) } } - return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable) + return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable) } res, err := createPipeline(r.Context(), req.Pipelines) @@ -2933,7 +3037,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) return } - ah.Respond(w, res) + aH.Respond(w, res) } func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md new file mode 100644 index 0000000000..c34bc7ad64 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -0,0 +1,197 @@ +## Consumer Lag feature break down + +### 1) Consumer Lag Graph + + +--- + +### 2) Consumer Group Details + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details +``` + +```json +{ + "start": 1720685296000000000, + "end": 1721290096000000000, + "variables": { + "partition": "0", + "topic": "topic1", + "consumer_group": "cg1" + } +} +``` + +response in query range format `series` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + }, + { + "name": "avg_msg_size", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "avg_msg_size": "0", + "error_rate": "0", + "p99": "0.2942205100000016", + "service_name": "consumer-svc", + "throughput": "0.00016534391534391533" + } + } + ] + } + } + ] + } +} +``` + + + +### 3) Producer Details + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details +``` + +```json +{ + "start": 1720685296000000000, + "end": 1721290096000000000, + "variables": { + "partition": "0", + "topic": "topic1" + } +} +``` + +response in query range format `series` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99_query.p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "rps", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99_query.p99": "150.08830908000002", + "rps": "0.00016534391534391533", + "service_name": "producer-svc" + } + } + ] + } + } + ] + } +} +``` +response in query range format `table` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99_query.p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "rps", + "queryName": "", + "isValueColumn": false + } + ], + "rows": [ + { + "data": { + "error_rate": "0", + "p99_query.p99": "150.08830908000002", + "rps": "0.00016534391534391533", + "service_name": "producer-svc" + } + } + ] + } + } + ] + } +} +``` + diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go new file mode 100644 index 0000000000..b24734cf48 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -0,0 +1,9 @@ +package kafka + +const kafkaQueue = "kafka" + +type MessagingQueue struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Variables map[string]string `json:"variables,omitempty"` +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go new file mode 100644 index 0000000000..e06e35efde --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -0,0 +1,76 @@ +package kafka + +import ( + "fmt" +) + +func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH consumer_query AS ( + SELECT + serviceName, + quantile(0.99)(durationNano) / 1000000 AS p99, + COUNT(*) AS total_requests, + SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count, + avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND msgSystem = '%s' + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName +) + +-- Main query to select all metrics +SELECT + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate, + COALESCE(total_requests / %d, 0) AS throughput, -- Convert nanoseconds to seconds + COALESCE(avg_msg_size, 0) AS avg_msg_size +FROM + consumer_query +ORDER BY + serviceName; +`, start, end, queueType, topic, partition, consumerGroup, timeRange) + return query +} + +func generateProducerSQL(start, end int64, topic, partition, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +WITH producer_query AS ( + SELECT + serviceName, + quantile(0.99)(durationNano) / 1000000 AS p99, + count(*) AS total_count, + SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count + FROM signoz_traces.distributed_signoz_index_v2 + WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 4 + AND msgSystem = '%s' + AND stringTagMap['messaging.destination.name'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' + GROUP BY serviceName +) + +SELECT + serviceName AS service_name, + p99, + COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage, + COALESCE(total_count / %d, 0) AS rps -- Convert nanoseconds to seconds +FROM + producer_query +ORDER BY + serviceName; + +`, start, end, queueType, topic, partition, timeRange) + return query +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go new file mode 100644 index 0000000000..98414ebf0f --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -0,0 +1,74 @@ +package kafka + +import ( + "fmt" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +var defaultStepInterval int64 = 60 + +func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { + + // ToDo: propagate this through APIs when there are different handlers + queueType := kafkaQueue + + var cq *v3.CompositeQuery + + chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext) + + if err != nil { + return nil, err + } + + cq, err = buildCompositeQuery(chq, queryContext) + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: messagingQueue.Start, + End: messagingQueue.End, + Step: defaultStepInterval, + CompositeQuery: cq, + Version: "v4", + FormatForWeb: true, + } + + return queryRangeParams, nil +} + +func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { + start := messagingQueue.Start + end := messagingQueue.End + topic, ok := messagingQueue.Variables["topic"] + if !ok { + return nil, fmt.Errorf("invalid type for Topic") + } + + partition, ok := messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("invalid type for Partition") + } + + consumerGroup, ok := messagingQueue.Variables["consumer_group"] + if !ok { + return nil, fmt.Errorf("invalid type for consumer group") + } + + var query string + if queryContext == "producer" { + query = generateProducerSQL(start, end, topic, partition, queueType) + } else if queryContext == "consumer" { + query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) + } + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} + +func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) { + return &v3.CompositeQuery{ + QueryType: v3.QueryTypeClickHouseSQL, + ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq}, + PanelType: v3.PanelTypeTable, + }, nil +} diff --git a/pkg/query-service/app/integrations/messagingQueues/readme.md b/pkg/query-service/app/integrations/messagingQueues/readme.md new file mode 100644 index 0000000000..5b9ae94c8b --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/readme.md @@ -0,0 +1,10 @@ +## Integreation: Messaging Queue + +This package contains the `api`, and `translation` logic to support messaging queue features. + +Currently supported queues: +1) Kafka + +For detailed setup, checkout our public docs for configuring: +1) Trace collection form Clients (Producer and Consumer) +2) Metrics collection from Kafka Brokers, Producers and Consumers diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 5120cd0039..99fe166a70 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -295,6 +295,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { api.RegisterIntegrationRoutes(r, am) api.RegisterQueryRangeV3Routes(r, am) api.RegisterQueryRangeV4Routes(r, am) + api.RegisterMessagingQueuesRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"},