diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 229cb7195b..5d3d9affd5 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2501,9 +2501,17 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth // for other messaging queues, add SubRouters here } +// not using md5 hashing as the plain string would work +func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string { + return clientID + separator + serviceInstanceID + separator + serviceName +} + func (aH *APIHandler) getNetworkData( w http.ResponseWriter, r *http.Request, ) { + attributeCache := &mq.Clients{ + Hash: make(map[string]struct{}), + } messagingQueue, apiErr := ParseMessagingQueueBody(r) if apiErr != nil { @@ -2512,7 +2520,6 @@ func (aH *APIHandler) getNetworkData( return } - attributeCache := make([]mq.Clients, 0) queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) if err != nil { zap.L().Error(err.Error()) @@ -2526,12 +2533,12 @@ func (aH *APIHandler) getNetworkData( } var result []*v3.Result - var errQuriesByName map[string]error + var errQueriesByName map[string]error - result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, errQueriesByName) return } @@ -2540,10 +2547,13 @@ func (aH *APIHandler) getNetworkData( clientID, clientIDOk := series.Labels["client_id"] serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] serviceName, serviceNameOk := series.Labels["service_name"] - if clientIDOk && serviceInstanceIDOk && serviceNameOk { - attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, - ServiceInstanceID: serviceInstanceID, - ServiceName: serviceName}) + hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + _, ok := attributeCache.Hash[hashKey] + if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok { + attributeCache.Hash[hashKey] = struct{}{} + attributeCache.ClientID = append(attributeCache.ClientID, clientID) + attributeCache.ServiceInstanceID = append(attributeCache.ServiceInstanceID, serviceInstanceID) + attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName) } } } @@ -2560,22 +2570,30 @@ func (aH *APIHandler) getNetworkData( return } - resultFetchLatency, errQuriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByNameFetchLatency) + RespondError(w, apiErrObj, errQueriesByNameFetchLatency) return } - latencyColoumn := &v3.Result{QueryName: "latency"} + latencyColumn := &v3.Result{QueryName: "latency"} var latencySeries []*v3.Series for _, res := range resultFetchLatency { for _, series := range res.Series { - latencySeries = append(latencySeries, series) + clientID, clientIDOk := series.Labels["client_id"] + serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] + serviceName, serviceNameOk := series.Labels["service_name"] + hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + _, ok := attributeCache.Hash[hashKey] + if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok { + latencySeries = append(latencySeries, series) + } } } - latencyColoumn.Series = latencySeries - result = append(result, latencyColoumn) + + latencyColumn.Series = latencySeries + result = append(result, latencyColumn) resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index b97cd9e69d..38b61669ff 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -158,7 +158,8 @@ Request-Body "start": 1724673937000000000, "end": 1724675737000000000, "variables": { - "consumer_group": "cg1" + "consumer_group": "cg1", + "partition": "0" } } ``` @@ -203,19 +204,19 @@ Response in query range `table` format { "data": { "client_id": "consumer-cg1-1", - "latency": 25.21, - "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", + "latency": 48.99, + "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", "service_name": "consumer-svc", - "throughput": 24.91 + "throughput": 14.97 } }, { "data": { "client_id": "consumer-cg1-1", - "latency": 49.68, - "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", + "latency": 25.21, + "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", "service_name": "consumer-svc", - "throughput": 14.97 + "throughput": 24.91 } } ] diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index cedf62eff3..f587868610 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -9,7 +9,8 @@ type MessagingQueue struct { } type Clients struct { - ClientID string - ServiceInstanceID string - ServiceName string + Hash map[string]struct{} + ClientID []string + ServiceInstanceID []string + ServiceName []string } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index f09cf73d9b..eb06689ef3 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -74,14 +74,14 @@ ORDER BY return query } -func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { +func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string { timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` SELECT stringTagMap['messaging.client_id'] AS client_id, stringTagMap['service.instance.id'] AS service_instance_id, serviceName AS service_name, - count(*) / %d AS throughput -- Convert nanoseconds to seconds + count(*) / %d AS throughput FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' @@ -89,8 +89,9 @@ WHERE AND kind = 5 AND msgSystem = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY throughput DESC -`, timeRange, start, end, queueType, consumerGroup) +`, timeRange, start, end, queueType, consumerGroup, partitionID) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 3de489a39e..7f4266df67 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -2,6 +2,8 @@ package kafka import ( "fmt" + "strings" + "go.signoz.io/signoz/pkg/query-service/common" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -43,87 +45,97 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin return nil, fmt.Errorf("consumer_group not found in the request") } - query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, queueType) + partitionID, ok := messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("partition not found in the request") + } + + query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType) return &v3.ClickHouseQuery{ Query: query, }, nil } -func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { - bq := make(map[string]*v3.BuilderQuery) +func formatstring(str []string) string { + joined := strings.Join(str, ", ") + if len(joined) <= 2 { + return "" + } + return joined[1 : len(joined)-1] +} - for i, instanceInfo := range attributeCache { - queryName := fmt.Sprintf("latency_%d", i) - chq := &v3.BuilderQuery{ - QueryName: queryName, - StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), - DataSource: v3.DataSourceMetrics, - AggregateAttribute: v3.AttributeKey{ - Key: "kafka_consumer_fetch_latency_avg", - }, - AggregateOperator: v3.AggregateOperatorAvg, - Temporality: v3.Unspecified, - TimeAggregation: v3.TimeAggregationAvg, - SpaceAggregation: v3.SpaceAggregationAvg, - Filters: &v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - { - Key: v3.AttributeKey{ - Key: "service_name", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }, - Operator: v3.FilterOperatorEqual, - Value: instanceInfo.ServiceName, +func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) { + bq := make(map[string]*v3.BuilderQuery) + queryName := fmt.Sprintf("latency") + + chq := &v3.BuilderQuery{ + QueryName: queryName, + StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "kafka_consumer_fetch_latency_avg", + }, + AggregateOperator: v3.AggregateOperatorAvg, + Temporality: v3.Unspecified, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, }, - { - Key: v3.AttributeKey{ - Key: "client_id", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }, - Operator: v3.FilterOperatorEqual, - Value: instanceInfo.ClientID, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ServiceName, + }, + { + Key: v3.AttributeKey{ + Key: "client_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, }, - { - Key: v3.AttributeKey{ - Key: "service_instance_id", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }, - Operator: v3.FilterOperatorEqual, - Value: instanceInfo.ServiceInstanceID, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ClientID, + }, + { + Key: v3.AttributeKey{ + Key: "service_instance_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, }, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ServiceInstanceID, }, }, - Expression: queryName, - ReduceTo: v3.ReduceToOperatorAvg, - GroupBy: []v3.AttributeKey{{ - Key: "service_name", + }, + Expression: queryName, + ReduceTo: v3.ReduceToOperatorAvg, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "client_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, }, - { - Key: "client_id", - DataType: v3.AttributeKeyDataTypeString, - Type: v3.AttributeKeyTypeTag, - }, - { - Key: "service_instance_id", - DataType: v3.AttributeKeyDataTypeString, - Type: v3.AttributeKeyTypeTag, - }, + { + Key: "service_instance_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, }, - } - bq[queryName] = chq + }, } - + bq[queryName] = chq return bq, nil } -func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { +func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { queueType := kafkaQueue