Merge pull request #5542 from shivanshuraj1333/api-kafka

messaging queue, consumer lag APIs
This commit is contained in:
Shivanshu Raj Shrivastava 2024-08-06 17:58:01 +05:30 committed by GitHub
commit abe65975c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 527 additions and 56 deletions

View File

@ -43,6 +43,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/dao" "go.signoz.io/signoz/pkg/query-service/dao"
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
@ -2246,9 +2247,112 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
w.Write(resp) w.Write(resp)
} }
// RegisterMessagingQueuesRoutes adds messaging-queues routes
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
// SubRouter for kafka
kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
// for other messaging queues, add SubRouters here
}
func (aH *APIHandler) getProducerData(
w http.ResponseWriter, r *http.Request,
) {
// parse the query params to retrieve the messaging queue struct
messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer")
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
var result []*v3.Result
var errQuriesByName map[string]error
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
result = postprocess.TransformToTableForClickHouseQueries(result)
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) getConsumerData(
w http.ResponseWriter, r *http.Request,
) {
messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer")
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
var result []*v3.Result
var errQuriesByName map[string]error
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
result = postprocess.TransformToTableForClickHouseQueries(result)
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
// ParseMessagingQueueBody parse for messaging queue params
func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
messagingQueue := new(mq.MessagingQueue)
if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
return messagingQueue, nil
}
// Preferences // Preferences
func (ah *APIHandler) getUserPreference( func (aH *APIHandler) getUserPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
preferenceId := mux.Vars(r)["preferenceId"] preferenceId := mux.Vars(r)["preferenceId"]
@ -2262,10 +2366,10 @@ func (ah *APIHandler) getUserPreference(
return return
} }
ah.Respond(w, preference) aH.Respond(w, preference)
} }
func (ah *APIHandler) updateUserPreference( func (aH *APIHandler) updateUserPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
preferenceId := mux.Vars(r)["preferenceId"] preferenceId := mux.Vars(r)["preferenceId"]
@ -2284,10 +2388,10 @@ func (ah *APIHandler) updateUserPreference(
return return
} }
ah.Respond(w, preference) aH.Respond(w, preference)
} }
func (ah *APIHandler) getAllUserPreferences( func (aH *APIHandler) getAllUserPreferences(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
user := common.GetUserFromContext(r.Context()) user := common.GetUserFromContext(r.Context())
@ -2299,10 +2403,10 @@ func (ah *APIHandler) getAllUserPreferences(
return return
} }
ah.Respond(w, preference) aH.Respond(w, preference)
} }
func (ah *APIHandler) getOrgPreference( func (aH *APIHandler) getOrgPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
preferenceId := mux.Vars(r)["preferenceId"] preferenceId := mux.Vars(r)["preferenceId"]
@ -2315,10 +2419,10 @@ func (ah *APIHandler) getOrgPreference(
return return
} }
ah.Respond(w, preference) aH.Respond(w, preference)
} }
func (ah *APIHandler) updateOrgPreference( func (aH *APIHandler) updateOrgPreference(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
preferenceId := mux.Vars(r)["preferenceId"] preferenceId := mux.Vars(r)["preferenceId"]
@ -2337,10 +2441,10 @@ func (ah *APIHandler) updateOrgPreference(
return return
} }
ah.Respond(w, preference) aH.Respond(w, preference)
} }
func (ah *APIHandler) getAllOrgPreferences( func (aH *APIHandler) getAllOrgPreferences(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
user := common.GetUserFromContext(r.Context()) user := common.GetUserFromContext(r.Context())
@ -2352,36 +2456,36 @@ func (ah *APIHandler) getAllOrgPreferences(
return return
} }
ah.Respond(w, preference) aH.Respond(w, preference)
} }
// Integrations // RegisterIntegrationRoutes Registers all Integrations
func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) { func (aH *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v1/integrations").Subrouter() subRouter := router.PathPrefix("/api/v1/integrations").Subrouter()
subRouter.HandleFunc( subRouter.HandleFunc(
"/install", am.ViewAccess(ah.InstallIntegration), "/install", am.ViewAccess(aH.InstallIntegration),
).Methods(http.MethodPost) ).Methods(http.MethodPost)
subRouter.HandleFunc( subRouter.HandleFunc(
"/uninstall", am.ViewAccess(ah.UninstallIntegration), "/uninstall", am.ViewAccess(aH.UninstallIntegration),
).Methods(http.MethodPost) ).Methods(http.MethodPost)
// Used for polling for status in v0 // Used for polling for status in v0
subRouter.HandleFunc( subRouter.HandleFunc(
"/{integrationId}/connection_status", am.ViewAccess(ah.GetIntegrationConnectionStatus), "/{integrationId}/connection_status", am.ViewAccess(aH.GetIntegrationConnectionStatus),
).Methods(http.MethodGet) ).Methods(http.MethodGet)
subRouter.HandleFunc( subRouter.HandleFunc(
"/{integrationId}", am.ViewAccess(ah.GetIntegration), "/{integrationId}", am.ViewAccess(aH.GetIntegration),
).Methods(http.MethodGet) ).Methods(http.MethodGet)
subRouter.HandleFunc( subRouter.HandleFunc(
"", am.ViewAccess(ah.ListIntegrations), "", am.ViewAccess(aH.ListIntegrations),
).Methods(http.MethodGet) ).Methods(http.MethodGet)
} }
func (ah *APIHandler) ListIntegrations( func (aH *APIHandler) ListIntegrations(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
params := map[string]string{} params := map[string]string{}
@ -2389,21 +2493,21 @@ func (ah *APIHandler) ListIntegrations(
params[k] = values[0] params[k] = values[0]
} }
resp, apiErr := ah.IntegrationsController.ListIntegrations( resp, apiErr := aH.IntegrationsController.ListIntegrations(
r.Context(), params, r.Context(), params,
) )
if apiErr != nil { if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch integrations") RespondError(w, apiErr, "Failed to fetch integrations")
return return
} }
ah.Respond(w, resp) aH.Respond(w, resp)
} }
func (ah *APIHandler) GetIntegration( func (aH *APIHandler) GetIntegration(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
integrationId := mux.Vars(r)["integrationId"] integrationId := mux.Vars(r)["integrationId"]
integration, apiErr := ah.IntegrationsController.GetIntegration( integration, apiErr := aH.IntegrationsController.GetIntegration(
r.Context(), integrationId, r.Context(), integrationId,
) )
if apiErr != nil { if apiErr != nil {
@ -2411,14 +2515,14 @@ func (ah *APIHandler) GetIntegration(
return return
} }
ah.Respond(w, integration) aH.Respond(w, integration)
} }
func (ah *APIHandler) GetIntegrationConnectionStatus( func (aH *APIHandler) GetIntegrationConnectionStatus(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
integrationId := mux.Vars(r)["integrationId"] integrationId := mux.Vars(r)["integrationId"]
isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled( isInstalled, apiErr := aH.IntegrationsController.IsIntegrationInstalled(
r.Context(), integrationId, r.Context(), integrationId,
) )
if apiErr != nil { if apiErr != nil {
@ -2428,11 +2532,11 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
// Do not spend resources calculating connection status unless installed. // Do not spend resources calculating connection status unless installed.
if !isInstalled { if !isInstalled {
ah.Respond(w, &integrations.IntegrationConnectionStatus{}) aH.Respond(w, &integrations.IntegrationConnectionStatus{})
return return
} }
connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests( connectionTests, apiErr := aH.IntegrationsController.GetIntegrationConnectionTests(
r.Context(), integrationId, r.Context(), integrationId,
) )
if apiErr != nil { if apiErr != nil {
@ -2446,7 +2550,7 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
lookbackSeconds = 15 * 60 lookbackSeconds = 15 * 60
} }
connectionStatus, apiErr := ah.calculateConnectionStatus( connectionStatus, apiErr := aH.calculateConnectionStatus(
r.Context(), connectionTests, lookbackSeconds, r.Context(), connectionTests, lookbackSeconds,
) )
if apiErr != nil { if apiErr != nil {
@ -2454,10 +2558,10 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
return return
} }
ah.Respond(w, connectionStatus) aH.Respond(w, connectionStatus)
} }
func (ah *APIHandler) calculateConnectionStatus( func (aH *APIHandler) calculateConnectionStatus(
ctx context.Context, ctx context.Context,
connectionTests *integrations.IntegrationConnectionTests, connectionTests *integrations.IntegrationConnectionTests,
lookbackSeconds int64, lookbackSeconds int64,
@ -2475,7 +2579,7 @@ func (ah *APIHandler) calculateConnectionStatus(
go func() { go func() {
defer wg.Done() defer wg.Done()
logsConnStatus, apiErr := ah.calculateLogsConnectionStatus( logsConnStatus, apiErr := aH.calculateLogsConnectionStatus(
ctx, connectionTests.Logs, lookbackSeconds, ctx, connectionTests.Logs, lookbackSeconds,
) )
@ -2498,7 +2602,7 @@ func (ah *APIHandler) calculateConnectionStatus(
return return
} }
statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric( statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric(
ctx, connectionTests.Metrics, ctx, connectionTests.Metrics,
) )
@ -2542,7 +2646,7 @@ func (ah *APIHandler) calculateConnectionStatus(
return result, nil return result, nil
} }
func (ah *APIHandler) calculateLogsConnectionStatus( func (aH *APIHandler) calculateLogsConnectionStatus(
ctx context.Context, ctx context.Context,
logsConnectionTest *integrations.LogsConnectionTest, logsConnectionTest *integrations.LogsConnectionTest,
lookbackSeconds int64, lookbackSeconds int64,
@ -2584,7 +2688,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus(
}, },
}, },
} }
queryRes, _, err := ah.querier.QueryRange( queryRes, _, err := aH.querier.QueryRange(
ctx, qrParams, map[string]v3.AttributeKey{}, ctx, qrParams, map[string]v3.AttributeKey{},
) )
if err != nil { if err != nil {
@ -2621,7 +2725,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus(
return nil, nil return nil, nil
} }
func (ah *APIHandler) InstallIntegration( func (aH *APIHandler) InstallIntegration(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
req := integrations.InstallIntegrationRequest{} req := integrations.InstallIntegrationRequest{}
@ -2632,7 +2736,7 @@ func (ah *APIHandler) InstallIntegration(
return return
} }
integration, apiErr := ah.IntegrationsController.Install( integration, apiErr := aH.IntegrationsController.Install(
r.Context(), &req, r.Context(), &req,
) )
if apiErr != nil { if apiErr != nil {
@ -2640,10 +2744,10 @@ func (ah *APIHandler) InstallIntegration(
return return
} }
ah.Respond(w, integration) aH.Respond(w, integration)
} }
func (ah *APIHandler) UninstallIntegration( func (aH *APIHandler) UninstallIntegration(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
req := integrations.UninstallIntegrationRequest{} req := integrations.UninstallIntegrationRequest{}
@ -2654,13 +2758,13 @@ func (ah *APIHandler) UninstallIntegration(
return return
} }
apiErr := ah.IntegrationsController.Uninstall(r.Context(), &req) apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req)
if apiErr != nil { if apiErr != nil {
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
return return
} }
ah.Respond(w, map[string]interface{}{}) aH.Respond(w, map[string]interface{}{})
} }
// logs // logs
@ -2807,7 +2911,7 @@ func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) {
return int(version64), nil return int(version64), nil
} }
func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PipelinesPreviewRequest{} req := logparsingpipeline.PipelinesPreviewRequest{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@ -2815,7 +2919,7 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http
return return
} }
resultLogs, apiErr := ah.LogsParsingPipelineController.PreviewLogsPipelines( resultLogs, apiErr := aH.LogsParsingPipelineController.PreviewLogsPipelines(
r.Context(), &req, r.Context(), &req,
) )
@ -2824,10 +2928,10 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http
return return
} }
ah.Respond(w, resultLogs) aH.Respond(w, resultLogs)
} }
func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
version, err := parseAgentConfigVersion(r) version, err := parseAgentConfigVersion(r)
if err != nil { if err != nil {
@ -2839,20 +2943,20 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
var apierr *model.ApiError var apierr *model.ApiError
if version != -1 { if version != -1 {
payload, apierr = ah.listLogsPipelinesByVersion(context.Background(), version) payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version)
} else { } else {
payload, apierr = ah.listLogsPipelines(context.Background()) payload, apierr = aH.listLogsPipelines(context.Background())
} }
if apierr != nil { if apierr != nil {
RespondError(w, apierr, payload) RespondError(w, apierr, payload)
return return
} }
ah.Respond(w, payload) aH.Respond(w, payload)
} }
// listLogsPipelines lists logs piplines for latest version // listLogsPipelines lists logs piplines for latest version
func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( func (aH *APIHandler) listLogsPipelines(ctx context.Context) (
*logparsingpipeline.PipelinesResponse, *model.ApiError, *logparsingpipeline.PipelinesResponse, *model.ApiError,
) { ) {
// get lateset agent config // get lateset agent config
@ -2866,7 +2970,7 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
latestVersion = lastestConfig.Version latestVersion = lastestConfig.Version
} }
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion) payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion)
if err != nil { if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines") return nil, model.WrapApiError(err, "failed to get pipelines")
} }
@ -2882,10 +2986,10 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
} }
// listLogsPipelinesByVersion lists pipelines along with config version history // listLogsPipelinesByVersion lists pipelines along with config version history
func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (
*logparsingpipeline.PipelinesResponse, *model.ApiError, *logparsingpipeline.PipelinesResponse, *model.ApiError,
) { ) {
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
if err != nil { if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines by version") return nil, model.WrapApiError(err, "failed to get pipelines by version")
} }
@ -2901,7 +3005,7 @@ func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in
return payload, nil return payload, nil
} }
func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PostablePipelines{} req := logparsingpipeline.PostablePipelines{}
@ -2924,7 +3028,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
} }
} }
return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable) return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
} }
res, err := createPipeline(r.Context(), req.Pipelines) res, err := createPipeline(r.Context(), req.Pipelines)
@ -2933,7 +3037,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
return return
} }
ah.Respond(w, res) aH.Respond(w, res)
} }
func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) {

View File

@ -0,0 +1,197 @@
## Consumer Lag feature break down
### 1) Consumer Lag Graph
---
### 2) Consumer Group Details
API endpoint:
```
POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details
```
```json
{
"start": 1720685296000000000,
"end": 1721290096000000000,
"variables": {
"partition": "0",
"topic": "topic1",
"consumer_group": "cg1"
}
}
```
response in query range format `series`
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "service_name",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "throughput",
"queryName": "",
"isValueColumn": false
},
{
"name": "avg_msg_size",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"avg_msg_size": "0",
"error_rate": "0",
"p99": "0.2942205100000016",
"service_name": "consumer-svc",
"throughput": "0.00016534391534391533"
}
}
]
}
}
]
}
}
```
### 3) Producer Details
API endpoint:
```
POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details
```
```json
{
"start": 1720685296000000000,
"end": 1721290096000000000,
"variables": {
"partition": "0",
"topic": "topic1"
}
}
```
response in query range format `series`
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "service_name",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99_query.p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "rps",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"error_rate": "0",
"p99_query.p99": "150.08830908000002",
"rps": "0.00016534391534391533",
"service_name": "producer-svc"
}
}
]
}
}
]
}
}
```
response in query range format `table`
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "service_name",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99_query.p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "rps",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"error_rate": "0",
"p99_query.p99": "150.08830908000002",
"rps": "0.00016534391534391533",
"service_name": "producer-svc"
}
}
]
}
}
]
}
}
```

View File

@ -0,0 +1,9 @@
package kafka
const kafkaQueue = "kafka"
type MessagingQueue struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Variables map[string]string `json:"variables,omitempty"`
}

View File

@ -0,0 +1,76 @@
package kafka
import (
"fmt"
)
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
timeRange := (end - start) / 1000000000
query := fmt.Sprintf(`
WITH consumer_query AS (
SELECT
serviceName,
quantile(0.99)(durationNano) / 1000000 AS p99,
COUNT(*) AS total_requests,
SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count,
avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName
)
-- Main query to select all metrics
SELECT
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS throughput, -- Convert nanoseconds to seconds
COALESCE(avg_msg_size, 0) AS avg_msg_size
FROM
consumer_query
ORDER BY
serviceName;
`, start, end, queueType, topic, partition, consumerGroup, timeRange)
return query
}
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
query := fmt.Sprintf(`
WITH producer_query AS (
SELECT
serviceName,
quantile(0.99)(durationNano) / 1000000 AS p99,
count(*) AS total_count,
SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName
)
SELECT
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
COALESCE(total_count / %d, 0) AS rps -- Convert nanoseconds to seconds
FROM
producer_query
ORDER BY
serviceName;
`, start, end, queueType, topic, partition, timeRange)
return query
}

View File

@ -0,0 +1,74 @@
package kafka
import (
"fmt"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
var defaultStepInterval int64 = 60
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
// ToDo: propagate this through APIs when there are different handlers
queueType := kafkaQueue
var cq *v3.CompositeQuery
chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext)
if err != nil {
return nil, err
}
cq, err = buildCompositeQuery(chq, queryContext)
queryRangeParams := &v3.QueryRangeParamsV3{
Start: messagingQueue.Start,
End: messagingQueue.End,
Step: defaultStepInterval,
CompositeQuery: cq,
Version: "v4",
FormatForWeb: true,
}
return queryRangeParams, nil
}
func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"]
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
partition, ok := messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
consumerGroup, ok := messagingQueue.Variables["consumer_group"]
if !ok {
return nil, fmt.Errorf("invalid type for consumer group")
}
var query string
if queryContext == "producer" {
query = generateProducerSQL(start, end, topic, partition, queueType)
} else if queryContext == "consumer" {
query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
}
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {
return &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
PanelType: v3.PanelTypeTable,
}, nil
}

View File

@ -0,0 +1,10 @@
## Integreation: Messaging Queue
This package contains the `api`, and `translation` logic to support messaging queue features.
Currently supported queues:
1) Kafka
For detailed setup, checkout our public docs for configuring:
1) Trace collection form Clients (Producer and Consumer)
2) Metrics collection from Kafka Brokers, Producers and Consumers

View File

@ -295,6 +295,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
api.RegisterIntegrationRoutes(r, am) api.RegisterIntegrationRoutes(r, am)
api.RegisterQueryRangeV3Routes(r, am) api.RegisterQueryRangeV3Routes(r, am)
api.RegisterQueryRangeV4Routes(r, am) api.RegisterQueryRangeV4Routes(r, am)
api.RegisterMessagingQueuesRoutes(r, am)
c := cors.New(cors.Options{ c := cors.New(cors.Options{
AllowedOrigins: []string{"*"}, AllowedOrigins: []string{"*"},