diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 8a6066fc4c..d51b9e2a64 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2540,10 +2540,11 @@ func (aH *APIHandler) getNetworkData( for _, res := range result { table := res.Table for _, row := range table.Rows { - if row.Data["consumer_id"] != nil && row.Data["serviceName"] != nil { - consumerId := row.Data["consumer_id"].(string) - serviceName := row.Data["serviceName"].(string) - attributeCache = append(attributeCache, mq.Clients{ConsumerId: consumerId, ServiceName: serviceName}) + if row.Data["client_id"] != nil && row.Data["service_instance_id"] != nil && row.Data["service_name"] != nil { + clientID := row.Data["client_id"].(string) + serviceInstanceId := row.Data["service_instance_id"].(string) + ServiceName := row.Data["service_name"].(string) + attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, ServiceInstanceID: serviceInstanceId, ServiceName: ServiceName}) } } } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 671d250fe9..1d0f8c93d1 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -225,7 +225,7 @@ response in query range format `series` "table": { "columns": [ { - "name": "consumer_id", + "name": "client_id", "queryName": "", "isValueColumn": false }, @@ -248,7 +248,7 @@ response in query range format `series` "rows": [ { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", "serviceName": "consumer-svc", "throughput": 0.00035 @@ -256,7 +256,7 @@ response in query range format `series` }, { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", "serviceName": "consumer-svc", "throughput": 0.00027 @@ -264,7 +264,7 @@ response in query range format `series` }, { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", "serviceName": "consumer-svc-2", "throughput": 0.00019 @@ -272,7 +272,7 @@ response in query range format `series` }, { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", "serviceName": "consumer-svc", "throughput": 0.00008 diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index 2d5a55dd5f..cedf62eff3 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -9,6 +9,7 @@ type MessagingQueue struct { } type Clients struct { - ConsumerId string - ServiceName string + ClientID string + ServiceInstanceID string + ServiceName string } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 894ac26895..b748109767 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -79,9 +79,9 @@ func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueT query := fmt.Sprintf(` --- Subquery for RPS calculation, desc sorted by rps SELECT - stringTagMap['messaging.client_id'] AS consumer_id, - stringTagMap['service.instance.id'] AS instance_id, - serviceName, + stringTagMap['messaging.client_id'] AS client_id, + stringTagMap['service.instance.id'] AS service_instance_id, + serviceName AS service_name, count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds FROM signoz_traces.signoz_index_v2 WHERE @@ -90,7 +90,7 @@ WHERE AND kind = 5 AND msgSystem = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s' -GROUP BY serviceName, consumer_id, instance_id +GROUP BY service_name, client_id, service_instance_id ORDER BY rps DESC `, end, start, start, end, queueType, consumerGroup) return query diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index d482cfa937..c6e85b1608 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -1,6 +1,7 @@ package kafka import ( + "encoding/json" "fmt" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -49,26 +50,98 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin }, nil } -func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) { - cq := make(map[string]*v3.ClickHouseQuery) - start := messagingQueue.Start - end := messagingQueue.End +//func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) { +// cq := make(map[string]*v3.ClickHouseQuery) +// start := messagingQueue.Start +// end := messagingQueue.End +// +// for i, clientInfo := range attributeCache { +// query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) +// chq := &v3.ClickHouseQuery{ +// Query: query, +// } +// index := fmt.Sprintf("latency_%d", i) +// cq[index] = chq +// } +// +// return cq, nil +//} - for i, clientInfo := range attributeCache { - query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) - chq := &v3.ClickHouseQuery{ - Query: query, +func buildBuilderQueriesNetwork(attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { + bq := make(map[string]*v3.BuilderQuery) + + for i, instanceInfo := range attributeCache { + queryName := fmt.Sprintf("latency_%d", i) + chq := &v3.BuilderQuery{ + QueryName: queryName, + StepInterval: defaultStepInterval, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "kafka_consumer_fetch_latency_avg", + }, + AggregateOperator: v3.AggregateOperatorAvg, + Temporality: v3.Unspecified, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorEqual, + Value: instanceInfo.ServiceName, + }, + { + Key: v3.AttributeKey{ + Key: "client_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorEqual, + Value: instanceInfo.ClientID, + }, + { + Key: v3.AttributeKey{ + Key: "service_instance_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorEqual, + Value: instanceInfo.ServiceInstanceID, + }, + }, + }, + Expression: queryName, + ReduceTo: v3.ReduceToOperatorAvg, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "client_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "service_instance_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, } - index := fmt.Sprintf("latency_%d", i) - cq[index] = chq + bq[queryName] = chq } - return cq, nil + return bq, nil } func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { - // ToDo: propagate this through APIs when there are different handlers queueType := kafkaQueue var cq *v3.CompositeQuery @@ -81,26 +154,33 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a } cq, err = buildCompositeQuery(chq, queryContext) + } else if queryContext == "fetch-latency" { - chq, err := buildClickHouseQueriesNetwork(messagingQueue, attributeCache) + bhq, err := buildBuilderQueriesNetwork(attributeCache) if err != nil { return nil, err } cq = &v3.CompositeQuery{ - QueryType: v3.QueryTypeClickHouseSQL, - ClickHouseQueries: chq, - PanelType: v3.PanelTypeTable, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: bhq, + PanelType: v3.PanelTypeTable, } } + unixMiliStart := messagingQueue.Start / 1000000 + unixMiliEnd := messagingQueue.End / 1000000 + queryRangeParams := &v3.QueryRangeParamsV3{ - Start: messagingQueue.Start, - End: messagingQueue.End, + Start: unixMiliStart, + End: unixMiliEnd, Step: defaultStepInterval, CompositeQuery: cq, Version: "v4", FormatForWeb: true, } + tmp, _ := json.Marshal(queryRangeParams) + xx := string(tmp) + fmt.Print(xx) return queryRangeParams, nil }