diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ceb8f4e8f0..5d3d9affd5 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2496,10 +2496,113 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } +// not using md5 hashing as the plain string would work +func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string { + return clientID + separator + serviceInstanceID + separator + serviceName +} + +func (aH *APIHandler) getNetworkData( + w http.ResponseWriter, r *http.Request, +) { + attributeCache := &mq.Clients{ + Hash: make(map[string]struct{}), + } + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQueriesByName map[string]error + + result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQueriesByName) + return + } + + for _, res := range result { + for _, series := range res.Series { + clientID, clientIDOk := series.Labels["client_id"] + serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] + serviceName, serviceNameOk := series.Labels["service_name"] + hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + _, ok := attributeCache.Hash[hashKey] + if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok { + attributeCache.Hash[hashKey] = struct{}{} + attributeCache.ClientID = append(attributeCache.ClientID, clientID) + attributeCache.ServiceInstanceID = append(attributeCache.ServiceInstanceID, serviceInstanceID) + attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName) + } + } + } + + queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQueriesByNameFetchLatency) + return + } + + latencyColumn := &v3.Result{QueryName: "latency"} + var latencySeries []*v3.Series + for _, res := range resultFetchLatency { + for _, series := range res.Series { + clientID, clientIDOk := series.Labels["client_id"] + serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] + serviceName, serviceNameOk := series.Labels["service_name"] + hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + _, ok := attributeCache.Hash[hashKey] + if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok { + latencySeries = append(latencySeries, series) + } + } + } + + latencyColumn.Series = latencySeries + result = append(result, latencyColumn) + + resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) + + resp := v3.QueryRangeResponse{ + Result: resultFetchLatency, + } + aH.Respond(w, resp) +} + func (aH *APIHandler) getProducerData( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index c34bc7ad64..38b61669ff 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -1,11 +1,6 @@ ## Consumer Lag feature break down -### 1) Consumer Lag Graph - - ---- - -### 2) Consumer Group Details +### 1) Consumer Group Details API endpoint: @@ -13,75 +8,75 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details ``` +Request-Body ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, - "variables": { - "partition": "0", - "topic": "topic1", - "consumer_group": "cg1" - } + "start": 1724429217000000000, + "end": 1724431017000000000, + "variables": { + "partition": "0", + "topic": "topic1", + "consumer_group": "cg1" + } } ``` - -response in query range format `series` +Response in query range `table` format ```json { - "status": "success", - "data": { - "resultType": "", - "result": [ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ { - "table": { - "columns": [ - { - "name": "service_name", - "queryName": "", - "isValueColumn": false - }, - { - "name": "p99", - "queryName": "", - "isValueColumn": false - }, - { - "name": "error_rate", - "queryName": "", - "isValueColumn": false - }, - { - "name": "throughput", - "queryName": "", - "isValueColumn": false - }, - { - "name": "avg_msg_size", - "queryName": "", - "isValueColumn": false - } - ], - "rows": [ - { - "data": { - "avg_msg_size": "0", - "error_rate": "0", - "p99": "0.2942205100000016", - "service_name": "consumer-svc", - "throughput": "0.00016534391534391533" - } - } - ] - } + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + }, + { + "name": "avg_msg_size", + "queryName": "", + "isValueColumn": false } - ] - } + ], + "rows": [ + { + "data": { + "avg_msg_size": "15", + "error_rate": "0", + "p99": "0.47993265000000035", + "service_name": "consumer-svc", + "throughput": "39.86888888888889" + } + } + ] + } + } + ] + } } ``` +---- - -### 3) Producer Details +### 2) Producer Details API endpoint: @@ -89,18 +84,19 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details ``` +Request-Body ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, + "start": 1724429217000000000, + "end": 1724431017000000000, "variables": { - "partition": "0", + "partition": "0", "topic": "topic1" } } ``` -response in query range format `series` +Response in query range `table` format ```json { "status": "success", @@ -116,17 +112,17 @@ response in query range format `series` "isValueColumn": false }, { - "name": "p99_query.p99", + "name": "p99", "queryName": "", "isValueColumn": false }, { - "name": "error_rate", + "name": "error_percentage", "queryName": "", "isValueColumn": false }, { - "name": "rps", + "name": "throughput", "queryName": "", "isValueColumn": false } @@ -134,56 +130,9 @@ response in query range format `series` "rows": [ { "data": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", - "service_name": "producer-svc" - } - } - ] - } - } - ] - } -} -``` -response in query range format `table` -```json -{ - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "table": { - "columns": [ - { - "name": "service_name", - "queryName": "", - "isValueColumn": false - }, - { - "name": "p99_query.p99", - "queryName": "", - "isValueColumn": false - }, - { - "name": "error_rate", - "queryName": "", - "isValueColumn": false - }, - { - "name": "rps", - "queryName": "", - "isValueColumn": false - } - ], - "rows": [ - { - "data": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", + "error_percentage": "0", + "p99": "5.51359028", + "throughput": "39.86888888888889", "service_name": "producer-svc" } } @@ -195,3 +144,85 @@ response in query range format `table` } ``` +### 3) Network Fetch Latency: + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/network-latency +``` + +Request-Body +```json +{ + "start": 1724673937000000000, + "end": 1724675737000000000, + "variables": { + "consumer_group": "cg1", + "partition": "0" + } +} +``` + +Response in query range `table` format +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "client_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "service_instance_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "latency", + "queryName": "latency", + "isValueColumn": true + }, + { + "name": "throughput", + "queryName": "throughput", + "isValueColumn": true + } + ], + "rows": [ + { + "data": { + "client_id": "consumer-cg1-1", + "latency": 48.99, + "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", + "service_name": "consumer-svc", + "throughput": 14.97 + } + }, + { + "data": { + "client_id": "consumer-cg1-1", + "latency": 25.21, + "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", + "service_name": "consumer-svc", + "throughput": 24.91 + } + } + ] + } + } + ] + } +} +``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index b24734cf48..f587868610 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -7,3 +7,10 @@ type MessagingQueue struct { End int64 `json:"end"` Variables map[string]string `json:"variables,omitempty"` } + +type Clients struct { + Hash map[string]struct{} + ClientID []string + ServiceInstanceID []string + ServiceName []string +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index e06e35efde..eb06689ef3 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -26,7 +26,6 @@ WITH consumer_query AS ( GROUP BY serviceName ) --- Main query to select all metrics SELECT serviceName AS service_name, p99, @@ -65,7 +64,7 @@ SELECT serviceName AS service_name, p99, COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage, - COALESCE(total_count / %d, 0) AS rps -- Convert nanoseconds to seconds + COALESCE(total_count / %d, 0) AS throughput -- Convert nanoseconds to seconds FROM producer_query ORDER BY @@ -74,3 +73,25 @@ ORDER BY `, start, end, queueType, topic, partition, timeRange) return query } + +func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string { + timeRange := (end - start) / 1000000000 + query := fmt.Sprintf(` +SELECT + stringTagMap['messaging.client_id'] AS client_id, + stringTagMap['service.instance.id'] AS service_instance_id, + serviceName AS service_name, + count(*) / %d AS throughput +FROM signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND msgSystem = '%s' + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' +GROUP BY service_name, client_id, service_instance_id +ORDER BY throughput DESC +`, timeRange, start, end, queueType, consumerGroup, partitionID) + return query +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 6b5d355caa..7f4266df67 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -2,7 +2,9 @@ package kafka import ( "fmt" + "strings" + "go.signoz.io/signoz/pkg/query-service/common" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -35,6 +37,146 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } +func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { + start := messagingQueue.Start + end := messagingQueue.End + consumerGroup, ok := messagingQueue.Variables["consumer_group"] + if !ok { + return nil, fmt.Errorf("consumer_group not found in the request") + } + + partitionID, ok := messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("partition not found in the request") + } + + query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} + +func formatstring(str []string) string { + joined := strings.Join(str, ", ") + if len(joined) <= 2 { + return "" + } + return joined[1 : len(joined)-1] +} + +func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) { + bq := make(map[string]*v3.BuilderQuery) + queryName := fmt.Sprintf("latency") + + chq := &v3.BuilderQuery{ + QueryName: queryName, + StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "kafka_consumer_fetch_latency_avg", + }, + AggregateOperator: v3.AggregateOperatorAvg, + Temporality: v3.Unspecified, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ServiceName, + }, + { + Key: v3.AttributeKey{ + Key: "client_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ClientID, + }, + { + Key: v3.AttributeKey{ + Key: "service_instance_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ServiceInstanceID, + }, + }, + }, + Expression: queryName, + ReduceTo: v3.ReduceToOperatorAvg, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "client_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "service_instance_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + } + bq[queryName] = chq + return bq, nil +} + +func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { + + queueType := kafkaQueue + + unixMilliStart := messagingQueue.Start / 1000000 + unixMilliEnd := messagingQueue.End / 1000000 + + var cq *v3.CompositeQuery + + if queryContext == "throughput" { + chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType) + + if err != nil { + return nil, err + } + + cq, err = buildCompositeQuery(chq, queryContext) + + } else if queryContext == "fetch-latency" { + bhq, err := buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd, attributeCache) + if err != nil { + return nil, err + } + cq = &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + BuilderQueries: bhq, + PanelType: v3.PanelTypeTable, + } + } + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: unixMilliStart, + End: unixMilliEnd, + Step: defaultStepInterval, + CompositeQuery: cq, + Version: "v4", + FormatForWeb: true, + } + + return queryRangeParams, nil +} + func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index bd7db15e6b..4de081940d 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -190,6 +190,19 @@ func ClickHouseFormattedValue(v interface{}) string { zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0]))) return "[]" } + case []string: + if len(x) == 0 { + return "[]" + } + str := "[" + for idx, sVal := range x { + str += fmt.Sprintf("'%s'", QuoteEscapedString(sVal)) + if idx != len(x)-1 { + str += "," + } + } + str += "]" + return str default: zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x))) return ""