From 5cdcbef00cb01c8c3b60073409072a332ddf7db3 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Wed, 7 Aug 2024 13:51:00 +0530 Subject: [PATCH 1/8] feat: add network latency for kafka --- pkg/query-service/app/http_handler.go | 76 ++++++ .../messagingQueues/kafka/consumerLag.md | 228 ++++++++++++++++++ .../messagingQueues/kafka/model.go | 5 + .../integrations/messagingQueues/kafka/sql.go | 64 +++++ .../messagingQueues/kafka/translator.go | 72 +++++- 5 files changed, 444 insertions(+), 1 deletion(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ceb8f4e8f0..8a6066fc4c 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2496,10 +2496,86 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } +func (aH *APIHandler) getNetworkData( + w http.ResponseWriter, r *http.Request, +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + attributeCache := make([]mq.Clients, 0) + queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + result = postprocess.TransformToTableForClickHouseQueries(result) + + // iterate over the result and extract messaging.client_id + for _, res := range result { + table := res.Table + for _, row := range table.Rows { + if row.Data["consumer_id"] != nil && row.Data["serviceName"] != nil { + consumerId := row.Data["consumer_id"].(string) + serviceName := row.Data["serviceName"].(string) + attributeCache = append(attributeCache, mq.Clients{ConsumerId: consumerId, ServiceName: serviceName}) + } + } + } + + queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache) + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + resultFetchLatency, errQuriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByNameFetchLatency) + return + } + resultFetchLatency = postprocess.TransformToTableForClickHouseQueries(resultFetchLatency) + + result = append(result, resultFetchLatency...) + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + func (aH *APIHandler) getProducerData( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index c34bc7ad64..671d250fe9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -195,3 +195,231 @@ response in query range format `table` } ``` +### 4) Network Fetch Latency: + + +API endpoint: + +``` +POST /api/v1/messaging-queues/kafka/consumer-lag/network-latency +``` + +```json +{ + "start": 1721174400000000000, + "end": 1722470400000000000, + "variables": { + "consumer_group": "cg1" + } +} +``` + +response in query range format `series` +```json +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ + { + "name": "consumer_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "instance_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "serviceName", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "throughput", + "isValueColumn": true + } + ], + "rows": [ + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "serviceName": "consumer-svc", + "throughput": 0.00035 + } + }, + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "serviceName": "consumer-svc", + "throughput": 0.00027 + } + }, + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", + "serviceName": "consumer-svc-2", + "throughput": 0.00019 + } + }, + { + "data": { + "consumer_id": "consumer-cg1-1", + "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "serviceName": "consumer-svc", + "throughput": 0.00008 + } + } + ] + } + }, + { + "table": { + "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "service_instance_id", + "queryName": "", + "isValueColumn": false + }, + { + "name": "latency_0", + "queryName": "latency_0", + "isValueColumn": true + }, + { + "name": "latency_1", + "queryName": "latency_1", + "isValueColumn": true + }, + { + "name": "latency_2", + "queryName": "latency_2", + "isValueColumn": true + }, + { + "name": "latency_3", + "queryName": "latency_3", + "isValueColumn": true + } + ], + "rows": [ + { + "data": { + "latency_0": 3230.1, + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": 503, + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": 502.62, + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": 3230.1, + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": 503, + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": 502.62, + "latency_2": "n/a", + "latency_3": "n/a", + "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": 502.81, + "latency_3": "n/a", + "service_instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", + "service_name": "consumer-svc-2" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": 3230.1, + "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": 503, + "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", + "service_name": "consumer-svc" + } + }, + { + "data": { + "latency_0": "n/a", + "latency_1": "n/a", + "latency_2": "n/a", + "latency_3": 502.62, + "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", + "service_name": "consumer-svc" + } + } + ] + } + } + ] + } +} +``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index b24734cf48..2d5a55dd5f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -7,3 +7,8 @@ type MessagingQueue struct { End int64 `json:"end"` Variables map[string]string `json:"variables,omitempty"` } + +type Clients struct { + ConsumerId string + ServiceName string +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index e06e35efde..894ac26895 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -74,3 +74,67 @@ ORDER BY `, start, end, queueType, topic, partition, timeRange) return query } + +func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { + query := fmt.Sprintf(` +--- Subquery for RPS calculation, desc sorted by rps +SELECT + stringTagMap['messaging.client_id'] AS consumer_id, + stringTagMap['service.instance.id'] AS instance_id, + serviceName, + count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds +FROM signoz_traces.signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d' + AND kind = 5 + AND msgSystem = '%s' + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' +GROUP BY serviceName, consumer_id, instance_id +ORDER BY rps DESC +`, end, start, start, end, queueType, consumerGroup) + return query +} + +func generateNetworkLatencyFetchSQL(step, start, end int64, clientId, serviceName string) string { + query := fmt.Sprintf(` +--- metrics aggregation, desc sorted by value +WITH filtered_time_series AS ( + SELECT DISTINCT + JSONExtractString(labels, 'service_instance_id') as service_instance_id, + JSONExtractString(labels, 'service_name') as service_name, + fingerprint + FROM signoz_metrics.time_series_v4_1day + WHERE metric_name = 'kafka_consumer_fetch_latency_avg' + AND temporality = 'Unspecified' + AND unix_milli >= '%d' + AND unix_milli < '%d' + AND JSONExtractString(labels, 'service_name') = '%s' + AND JSONExtractString(labels, 'client_id') = '%s' +), +aggregated_data AS ( + SELECT + fingerprint, + any(service_instance_id) as service_instance_id, + any(service_name) as service_name, + toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL '%d' SECOND) as ts, + avg(value) as per_series_value + FROM signoz_metrics.distributed_samples_v4 + INNER JOIN filtered_time_series USING fingerprint + WHERE metric_name = 'kafka_consumer_fetch_latency_avg' + AND unix_milli >= '%d' + AND unix_milli < '%d' + GROUP BY fingerprint, ts + ORDER BY fingerprint, ts +) +SELECT + service_name, + service_instance_id, + avg(per_series_value) as value +FROM aggregated_data +WHERE isNaN(per_series_value) = 0 +GROUP BY service_name, service_instance_id +ORDER BY value DESC +`, start, end, serviceName, clientId, step, start, end) + return query +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 6b5d355caa..d482cfa937 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -2,7 +2,6 @@ package kafka import ( "fmt" - v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -35,6 +34,77 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } +func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { + start := messagingQueue.Start + end := messagingQueue.End + consumerGroup, ok := messagingQueue.Variables["consumer_group"] + if !ok { + return nil, fmt.Errorf("consumer_group not found in the request") + } + + query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, queueType) + + return &v3.ClickHouseQuery{ + Query: query, + }, nil +} + +func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) { + cq := make(map[string]*v3.ClickHouseQuery) + start := messagingQueue.Start + end := messagingQueue.End + + for i, clientInfo := range attributeCache { + query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) + chq := &v3.ClickHouseQuery{ + Query: query, + } + index := fmt.Sprintf("latency_%d", i) + cq[index] = chq + } + + return cq, nil +} + +func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { + + // ToDo: propagate this through APIs when there are different handlers + queueType := kafkaQueue + + var cq *v3.CompositeQuery + + if queryContext == "throughput" { + chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType) + + if err != nil { + return nil, err + } + + cq, err = buildCompositeQuery(chq, queryContext) + } else if queryContext == "fetch-latency" { + chq, err := buildClickHouseQueriesNetwork(messagingQueue, attributeCache) + if err != nil { + return nil, err + } + cq = &v3.CompositeQuery{ + QueryType: v3.QueryTypeClickHouseSQL, + ClickHouseQueries: chq, + PanelType: v3.PanelTypeTable, + } + } + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: messagingQueue.Start, + End: messagingQueue.End, + Step: defaultStepInterval, + CompositeQuery: cq, + Version: "v4", + FormatForWeb: true, + } + + return queryRangeParams, nil +} + func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End From 666f601ecd1da3a04db2e2875537b321be985db5 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 26 Aug 2024 17:56:03 +0530 Subject: [PATCH 2/8] feat: change to builder queries --- pkg/query-service/app/http_handler.go | 9 +- .../messagingQueues/kafka/consumerLag.md | 10 +- .../messagingQueues/kafka/model.go | 5 +- .../integrations/messagingQueues/kafka/sql.go | 8 +- .../messagingQueues/kafka/translator.go | 116 +++++++++++++++--- 5 files changed, 115 insertions(+), 33 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 8a6066fc4c..d51b9e2a64 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2540,10 +2540,11 @@ func (aH *APIHandler) getNetworkData( for _, res := range result { table := res.Table for _, row := range table.Rows { - if row.Data["consumer_id"] != nil && row.Data["serviceName"] != nil { - consumerId := row.Data["consumer_id"].(string) - serviceName := row.Data["serviceName"].(string) - attributeCache = append(attributeCache, mq.Clients{ConsumerId: consumerId, ServiceName: serviceName}) + if row.Data["client_id"] != nil && row.Data["service_instance_id"] != nil && row.Data["service_name"] != nil { + clientID := row.Data["client_id"].(string) + serviceInstanceId := row.Data["service_instance_id"].(string) + ServiceName := row.Data["service_name"].(string) + attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, ServiceInstanceID: serviceInstanceId, ServiceName: ServiceName}) } } } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 671d250fe9..1d0f8c93d1 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -225,7 +225,7 @@ response in query range format `series` "table": { "columns": [ { - "name": "consumer_id", + "name": "client_id", "queryName": "", "isValueColumn": false }, @@ -248,7 +248,7 @@ response in query range format `series` "rows": [ { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", "serviceName": "consumer-svc", "throughput": 0.00035 @@ -256,7 +256,7 @@ response in query range format `series` }, { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", "serviceName": "consumer-svc", "throughput": 0.00027 @@ -264,7 +264,7 @@ response in query range format `series` }, { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", "serviceName": "consumer-svc-2", "throughput": 0.00019 @@ -272,7 +272,7 @@ response in query range format `series` }, { "data": { - "consumer_id": "consumer-cg1-1", + "client_id": "consumer-cg1-1", "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", "serviceName": "consumer-svc", "throughput": 0.00008 diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index 2d5a55dd5f..cedf62eff3 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -9,6 +9,7 @@ type MessagingQueue struct { } type Clients struct { - ConsumerId string - ServiceName string + ClientID string + ServiceInstanceID string + ServiceName string } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 894ac26895..b748109767 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -79,9 +79,9 @@ func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueT query := fmt.Sprintf(` --- Subquery for RPS calculation, desc sorted by rps SELECT - stringTagMap['messaging.client_id'] AS consumer_id, - stringTagMap['service.instance.id'] AS instance_id, - serviceName, + stringTagMap['messaging.client_id'] AS client_id, + stringTagMap['service.instance.id'] AS service_instance_id, + serviceName AS service_name, count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds FROM signoz_traces.signoz_index_v2 WHERE @@ -90,7 +90,7 @@ WHERE AND kind = 5 AND msgSystem = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s' -GROUP BY serviceName, consumer_id, instance_id +GROUP BY service_name, client_id, service_instance_id ORDER BY rps DESC `, end, start, start, end, queueType, consumerGroup) return query diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index d482cfa937..c6e85b1608 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -1,6 +1,7 @@ package kafka import ( + "encoding/json" "fmt" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -49,26 +50,98 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin }, nil } -func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) { - cq := make(map[string]*v3.ClickHouseQuery) - start := messagingQueue.Start - end := messagingQueue.End +//func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) { +// cq := make(map[string]*v3.ClickHouseQuery) +// start := messagingQueue.Start +// end := messagingQueue.End +// +// for i, clientInfo := range attributeCache { +// query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) +// chq := &v3.ClickHouseQuery{ +// Query: query, +// } +// index := fmt.Sprintf("latency_%d", i) +// cq[index] = chq +// } +// +// return cq, nil +//} - for i, clientInfo := range attributeCache { - query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) - chq := &v3.ClickHouseQuery{ - Query: query, +func buildBuilderQueriesNetwork(attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { + bq := make(map[string]*v3.BuilderQuery) + + for i, instanceInfo := range attributeCache { + queryName := fmt.Sprintf("latency_%d", i) + chq := &v3.BuilderQuery{ + QueryName: queryName, + StepInterval: defaultStepInterval, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "kafka_consumer_fetch_latency_avg", + }, + AggregateOperator: v3.AggregateOperatorAvg, + Temporality: v3.Unspecified, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorEqual, + Value: instanceInfo.ServiceName, + }, + { + Key: v3.AttributeKey{ + Key: "client_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorEqual, + Value: instanceInfo.ClientID, + }, + { + Key: v3.AttributeKey{ + Key: "service_instance_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorEqual, + Value: instanceInfo.ServiceInstanceID, + }, + }, + }, + Expression: queryName, + ReduceTo: v3.ReduceToOperatorAvg, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "client_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "service_instance_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, } - index := fmt.Sprintf("latency_%d", i) - cq[index] = chq + bq[queryName] = chq } - return cq, nil + return bq, nil } func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { - // ToDo: propagate this through APIs when there are different handlers queueType := kafkaQueue var cq *v3.CompositeQuery @@ -81,26 +154,33 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a } cq, err = buildCompositeQuery(chq, queryContext) + } else if queryContext == "fetch-latency" { - chq, err := buildClickHouseQueriesNetwork(messagingQueue, attributeCache) + bhq, err := buildBuilderQueriesNetwork(attributeCache) if err != nil { return nil, err } cq = &v3.CompositeQuery{ - QueryType: v3.QueryTypeClickHouseSQL, - ClickHouseQueries: chq, - PanelType: v3.PanelTypeTable, + QueryType: v3.QueryTypeBuilder, + BuilderQueries: bhq, + PanelType: v3.PanelTypeTable, } } + unixMiliStart := messagingQueue.Start / 1000000 + unixMiliEnd := messagingQueue.End / 1000000 + queryRangeParams := &v3.QueryRangeParamsV3{ - Start: messagingQueue.Start, - End: messagingQueue.End, + Start: unixMiliStart, + End: unixMiliEnd, Step: defaultStepInterval, CompositeQuery: cq, Version: "v4", FormatForWeb: true, } + tmp, _ := json.Marshal(queryRangeParams) + xx := string(tmp) + fmt.Print(xx) return queryRangeParams, nil } From 413caad0d8628f61c2194d430aae8e57624b6f72 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 26 Aug 2024 17:58:55 +0530 Subject: [PATCH 3/8] chore: cleanup --- .../integrations/messagingQueues/kafka/sql.go | 43 ------------------- .../messagingQueues/kafka/translator.go | 17 -------- 2 files changed, 60 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index b748109767..49e50a3d97 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -95,46 +95,3 @@ ORDER BY rps DESC `, end, start, start, end, queueType, consumerGroup) return query } - -func generateNetworkLatencyFetchSQL(step, start, end int64, clientId, serviceName string) string { - query := fmt.Sprintf(` ---- metrics aggregation, desc sorted by value -WITH filtered_time_series AS ( - SELECT DISTINCT - JSONExtractString(labels, 'service_instance_id') as service_instance_id, - JSONExtractString(labels, 'service_name') as service_name, - fingerprint - FROM signoz_metrics.time_series_v4_1day - WHERE metric_name = 'kafka_consumer_fetch_latency_avg' - AND temporality = 'Unspecified' - AND unix_milli >= '%d' - AND unix_milli < '%d' - AND JSONExtractString(labels, 'service_name') = '%s' - AND JSONExtractString(labels, 'client_id') = '%s' -), -aggregated_data AS ( - SELECT - fingerprint, - any(service_instance_id) as service_instance_id, - any(service_name) as service_name, - toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL '%d' SECOND) as ts, - avg(value) as per_series_value - FROM signoz_metrics.distributed_samples_v4 - INNER JOIN filtered_time_series USING fingerprint - WHERE metric_name = 'kafka_consumer_fetch_latency_avg' - AND unix_milli >= '%d' - AND unix_milli < '%d' - GROUP BY fingerprint, ts - ORDER BY fingerprint, ts -) -SELECT - service_name, - service_instance_id, - avg(per_series_value) as value -FROM aggregated_data -WHERE isNaN(per_series_value) = 0 -GROUP BY service_name, service_instance_id -ORDER BY value DESC -`, start, end, serviceName, clientId, step, start, end) - return query -} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index c6e85b1608..ef7e7049ba 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -50,23 +50,6 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin }, nil } -//func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) { -// cq := make(map[string]*v3.ClickHouseQuery) -// start := messagingQueue.Start -// end := messagingQueue.End -// -// for i, clientInfo := range attributeCache { -// query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName) -// chq := &v3.ClickHouseQuery{ -// Query: query, -// } -// index := fmt.Sprintf("latency_%d", i) -// cq[index] = chq -// } -// -// return cq, nil -//} - func buildBuilderQueriesNetwork(attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { bq := make(map[string]*v3.BuilderQuery) From f508ee7521f76532dcb1c2f050d32036a2a3745a Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 26 Aug 2024 20:28:11 +0530 Subject: [PATCH 4/8] chore: query, response update --- pkg/query-service/app/http_handler.go | 32 +- .../messagingQueues/kafka/consumerLag.md | 378 +++++------------- .../integrations/messagingQueues/kafka/sql.go | 7 +- 3 files changed, 114 insertions(+), 303 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index d51b9e2a64..229cb7195b 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2534,17 +2534,16 @@ func (aH *APIHandler) getNetworkData( RespondError(w, apiErrObj, errQuriesByName) return } - result = postprocess.TransformToTableForClickHouseQueries(result) - // iterate over the result and extract messaging.client_id for _, res := range result { - table := res.Table - for _, row := range table.Rows { - if row.Data["client_id"] != nil && row.Data["service_instance_id"] != nil && row.Data["service_name"] != nil { - clientID := row.Data["client_id"].(string) - serviceInstanceId := row.Data["service_instance_id"].(string) - ServiceName := row.Data["service_name"].(string) - attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, ServiceInstanceID: serviceInstanceId, ServiceName: ServiceName}) + for _, series := range res.Series { + clientID, clientIDOk := series.Labels["client_id"] + serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] + serviceName, serviceNameOk := series.Labels["service_name"] + if clientIDOk && serviceInstanceIDOk && serviceNameOk { + attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, + ServiceInstanceID: serviceInstanceID, + ServiceName: serviceName}) } } } @@ -2567,12 +2566,21 @@ func (aH *APIHandler) getNetworkData( RespondError(w, apiErrObj, errQuriesByNameFetchLatency) return } - resultFetchLatency = postprocess.TransformToTableForClickHouseQueries(resultFetchLatency) - result = append(result, resultFetchLatency...) + latencyColoumn := &v3.Result{QueryName: "latency"} + var latencySeries []*v3.Series + for _, res := range resultFetchLatency { + for _, series := range res.Series { + latencySeries = append(latencySeries, series) + } + } + latencyColoumn.Series = latencySeries + result = append(result, latencyColoumn) + + resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) resp := v3.QueryRangeResponse{ - Result: result, + Result: resultFetchLatency, } aH.Respond(w, resp) } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 1d0f8c93d1..cd0e1f6324 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -1,11 +1,6 @@ ## Consumer Lag feature break down -### 1) Consumer Lag Graph - - ---- - -### 2) Consumer Group Details +### 1) Consumer Group Details API endpoint: @@ -13,75 +8,75 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details ``` +Request-Body ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, - "variables": { - "partition": "0", - "topic": "topic1", - "consumer_group": "cg1" - } + "start": 1724429217000000000, + "end": 1724431017000000000, + "variables": { + "partition": "0", + "topic": "topic1", + "consumer_group": "cg1" + } } ``` - -response in query range format `series` +Response in query range `table` format ```json { - "status": "success", - "data": { - "resultType": "", - "result": [ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "table": { + "columns": [ { - "table": { - "columns": [ - { - "name": "service_name", - "queryName": "", - "isValueColumn": false - }, - { - "name": "p99", - "queryName": "", - "isValueColumn": false - }, - { - "name": "error_rate", - "queryName": "", - "isValueColumn": false - }, - { - "name": "throughput", - "queryName": "", - "isValueColumn": false - }, - { - "name": "avg_msg_size", - "queryName": "", - "isValueColumn": false - } - ], - "rows": [ - { - "data": { - "avg_msg_size": "0", - "error_rate": "0", - "p99": "0.2942205100000016", - "service_name": "consumer-svc", - "throughput": "0.00016534391534391533" - } - } - ] - } + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, + { + "name": "p99", + "queryName": "", + "isValueColumn": false + }, + { + "name": "error_rate", + "queryName": "", + "isValueColumn": false + }, + { + "name": "throughput", + "queryName": "", + "isValueColumn": false + }, + { + "name": "avg_msg_size", + "queryName": "", + "isValueColumn": false } - ] - } + ], + "rows": [ + { + "data": { + "avg_msg_size": "15", + "error_rate": "0", + "p99": "0.47993265000000035", + "service_name": "consumer-svc", + "throughput": "39.86888888888889" + } + } + ] + } + } + ] + } } ``` +---- - -### 3) Producer Details +### 2) Producer Details API endpoint: @@ -89,18 +84,19 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details ``` +Request-Body ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, + "start": 1724429217000000000, + "end": 1724431017000000000, "variables": { - "partition": "0", + "partition": "0", "topic": "topic1" } } ``` -response in query range format `series` +Response in query range `table` format ```json { "status": "success", @@ -116,12 +112,12 @@ response in query range format `series` "isValueColumn": false }, { - "name": "p99_query.p99", + "name": "p99", "queryName": "", "isValueColumn": false }, { - "name": "error_rate", + "name": "error_percentage", "queryName": "", "isValueColumn": false }, @@ -134,56 +130,9 @@ response in query range format `series` "rows": [ { "data": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", - "service_name": "producer-svc" - } - } - ] - } - } - ] - } -} -``` -response in query range format `table` -```json -{ - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "table": { - "columns": [ - { - "name": "service_name", - "queryName": "", - "isValueColumn": false - }, - { - "name": "p99_query.p99", - "queryName": "", - "isValueColumn": false - }, - { - "name": "error_rate", - "queryName": "", - "isValueColumn": false - }, - { - "name": "rps", - "queryName": "", - "isValueColumn": false - } - ], - "rows": [ - { - "data": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", + "error_percentage": "0", + "p99": "5.51359028", + "rps": "39.86888888888889", "service_name": "producer-svc" } } @@ -195,8 +144,7 @@ response in query range format `table` } ``` -### 4) Network Fetch Latency: - +### 3) Network Fetch Latency: API endpoint: @@ -204,17 +152,18 @@ API endpoint: POST /api/v1/messaging-queues/kafka/consumer-lag/network-latency ``` +Request-Body ```json { - "start": 1721174400000000000, - "end": 1722470400000000000, + "start": 1724673937000000000, + "end": 1724675737000000000, "variables": { "consumer_group": "cg1" } } ``` -response in query range format `series` +Response in query range `table` format ```json { "status": "success", @@ -224,20 +173,25 @@ response in query range format `series` { "table": { "columns": [ + { + "name": "service_name", + "queryName": "", + "isValueColumn": false + }, { "name": "client_id", "queryName": "", "isValueColumn": false }, { - "name": "instance_id", + "name": "service_instance_id", "queryName": "", "isValueColumn": false }, { - "name": "serviceName", - "queryName": "", - "isValueColumn": false + "name": "latency", + "queryName": "latency", + "isValueColumn": true }, { "name": "throughput", @@ -249,171 +203,19 @@ response in query range format `series` { "data": { "client_id": "consumer-cg1-1", - "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "serviceName": "consumer-svc", - "throughput": 0.00035 + "latency": 25.21, + "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", + "service_name": "consumer-svc", + "throughput": 24.91 } }, { "data": { "client_id": "consumer-cg1-1", - "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "serviceName": "consumer-svc", - "throughput": 0.00027 - } - }, - { - "data": { - "client_id": "consumer-cg1-1", - "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", - "serviceName": "consumer-svc-2", - "throughput": 0.00019 - } - }, - { - "data": { - "client_id": "consumer-cg1-1", - "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "serviceName": "consumer-svc", - "throughput": 0.00008 - } - } - ] - } - }, - { - "table": { - "columns": [ - { - "name": "service_name", - "queryName": "", - "isValueColumn": false - }, - { - "name": "service_instance_id", - "queryName": "", - "isValueColumn": false - }, - { - "name": "latency_0", - "queryName": "latency_0", - "isValueColumn": true - }, - { - "name": "latency_1", - "queryName": "latency_1", - "isValueColumn": true - }, - { - "name": "latency_2", - "queryName": "latency_2", - "isValueColumn": true - }, - { - "name": "latency_3", - "queryName": "latency_3", - "isValueColumn": true - } - ], - "rows": [ - { - "data": { - "latency_0": 3230.1, - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": 503, - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": 502.62, - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": 3230.1, - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": 503, - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": 502.62, - "latency_2": "n/a", - "latency_3": "n/a", - "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": 502.81, - "latency_3": "n/a", - "service_instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d", - "service_name": "consumer-svc-2" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": 3230.1, - "service_instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": 503, - "service_instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8", - "service_name": "consumer-svc" - } - }, - { - "data": { - "latency_0": "n/a", - "latency_1": "n/a", - "latency_2": "n/a", - "latency_3": 502.62, - "service_instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400", - "service_name": "consumer-svc" + "latency": 49.68, + "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", + "service_name": "consumer-svc", + "throughput": 14.97 } } ] diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 49e50a3d97..7b38f128bd 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -76,14 +76,15 @@ ORDER BY } func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { + timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` --- Subquery for RPS calculation, desc sorted by rps SELECT stringTagMap['messaging.client_id'] AS client_id, stringTagMap['service.instance.id'] AS service_instance_id, serviceName AS service_name, - count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds -FROM signoz_traces.signoz_index_v2 + count(*) / %d AS rps -- Convert nanoseconds to seconds +FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' AND timestamp <= '%d' @@ -92,6 +93,6 @@ WHERE AND stringTagMap['messaging.kafka.consumer.group'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY rps DESC -`, end, start, start, end, queueType, consumerGroup) +`, timeRange, start, end, queueType, consumerGroup) return query } From 90cb8ba9a16357062e320fe48ac60b04a9e023de Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 26 Aug 2024 20:35:08 +0530 Subject: [PATCH 5/8] chore: modify producer output --- .../app/integrations/messagingQueues/kafka/consumerLag.md | 4 ++-- .../app/integrations/messagingQueues/kafka/sql.go | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index cd0e1f6324..b97cd9e69d 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -122,7 +122,7 @@ Response in query range `table` format "isValueColumn": false }, { - "name": "rps", + "name": "throughput", "queryName": "", "isValueColumn": false } @@ -132,7 +132,7 @@ Response in query range `table` format "data": { "error_percentage": "0", "p99": "5.51359028", - "rps": "39.86888888888889", + "throughput": "39.86888888888889", "service_name": "producer-svc" } } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 7b38f128bd..f09cf73d9b 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -26,7 +26,6 @@ WITH consumer_query AS ( GROUP BY serviceName ) --- Main query to select all metrics SELECT serviceName AS service_name, p99, @@ -65,7 +64,7 @@ SELECT serviceName AS service_name, p99, COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage, - COALESCE(total_count / %d, 0) AS rps -- Convert nanoseconds to seconds + COALESCE(total_count / %d, 0) AS throughput -- Convert nanoseconds to seconds FROM producer_query ORDER BY @@ -78,12 +77,11 @@ ORDER BY func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` ---- Subquery for RPS calculation, desc sorted by rps SELECT stringTagMap['messaging.client_id'] AS client_id, stringTagMap['service.instance.id'] AS service_instance_id, serviceName AS service_name, - count(*) / %d AS rps -- Convert nanoseconds to seconds + count(*) / %d AS throughput -- Convert nanoseconds to seconds FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' @@ -92,7 +90,7 @@ WHERE AND msgSystem = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s' GROUP BY service_name, client_id, service_instance_id -ORDER BY rps DESC +ORDER BY throughput DESC `, timeRange, start, end, queueType, consumerGroup) return query } From 8d655bf419ddd63eefc298e244b0ca76e7699eb2 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Tue, 27 Aug 2024 14:04:00 +0530 Subject: [PATCH 6/8] chore: use MinAllowedStepInterval --- .../messagingQueues/kafka/translator.go | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index ef7e7049ba..3de489a39e 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -1,8 +1,8 @@ package kafka import ( - "encoding/json" "fmt" + "go.signoz.io/signoz/pkg/query-service/common" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -50,14 +50,14 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin }, nil } -func buildBuilderQueriesNetwork(attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { +func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { bq := make(map[string]*v3.BuilderQuery) for i, instanceInfo := range attributeCache { queryName := fmt.Sprintf("latency_%d", i) chq := &v3.BuilderQuery{ QueryName: queryName, - StepInterval: defaultStepInterval, + StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), DataSource: v3.DataSourceMetrics, AggregateAttribute: v3.AttributeKey{ Key: "kafka_consumer_fetch_latency_avg", @@ -127,6 +127,9 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a queueType := kafkaQueue + unixMilliStart := messagingQueue.Start / 1000000 + unixMilliEnd := messagingQueue.End / 1000000 + var cq *v3.CompositeQuery if queryContext == "throughput" { @@ -139,7 +142,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a cq, err = buildCompositeQuery(chq, queryContext) } else if queryContext == "fetch-latency" { - bhq, err := buildBuilderQueriesNetwork(attributeCache) + bhq, err := buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd, attributeCache) if err != nil { return nil, err } @@ -150,20 +153,14 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a } } - unixMiliStart := messagingQueue.Start / 1000000 - unixMiliEnd := messagingQueue.End / 1000000 - queryRangeParams := &v3.QueryRangeParamsV3{ - Start: unixMiliStart, - End: unixMiliEnd, + Start: unixMilliStart, + End: unixMilliEnd, Step: defaultStepInterval, CompositeQuery: cq, Version: "v4", FormatForWeb: true, } - tmp, _ := json.Marshal(queryRangeParams) - xx := string(tmp) - fmt.Print(xx) return queryRangeParams, nil } From aabf364cc625c74a5a9af09ef1cc5019cdf7f323 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Tue, 27 Aug 2024 18:27:44 +0530 Subject: [PATCH 7/8] feat: add partition level granularity --- pkg/query-service/app/http_handler.go | 46 ++++-- .../messagingQueues/kafka/consumerLag.md | 15 +- .../messagingQueues/kafka/model.go | 7 +- .../integrations/messagingQueues/kafka/sql.go | 7 +- .../messagingQueues/kafka/translator.go | 134 ++++++++++-------- 5 files changed, 121 insertions(+), 88 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 229cb7195b..5d3d9affd5 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2501,9 +2501,17 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth // for other messaging queues, add SubRouters here } +// not using md5 hashing as the plain string would work +func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string { + return clientID + separator + serviceInstanceID + separator + serviceName +} + func (aH *APIHandler) getNetworkData( w http.ResponseWriter, r *http.Request, ) { + attributeCache := &mq.Clients{ + Hash: make(map[string]struct{}), + } messagingQueue, apiErr := ParseMessagingQueueBody(r) if apiErr != nil { @@ -2512,7 +2520,6 @@ func (aH *APIHandler) getNetworkData( return } - attributeCache := make([]mq.Clients, 0) queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) if err != nil { zap.L().Error(err.Error()) @@ -2526,12 +2533,12 @@ func (aH *APIHandler) getNetworkData( } var result []*v3.Result - var errQuriesByName map[string]error + var errQueriesByName map[string]error - result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByName) + RespondError(w, apiErrObj, errQueriesByName) return } @@ -2540,10 +2547,13 @@ func (aH *APIHandler) getNetworkData( clientID, clientIDOk := series.Labels["client_id"] serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] serviceName, serviceNameOk := series.Labels["service_name"] - if clientIDOk && serviceInstanceIDOk && serviceNameOk { - attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, - ServiceInstanceID: serviceInstanceID, - ServiceName: serviceName}) + hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + _, ok := attributeCache.Hash[hashKey] + if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok { + attributeCache.Hash[hashKey] = struct{}{} + attributeCache.ClientID = append(attributeCache.ClientID, clientID) + attributeCache.ServiceInstanceID = append(attributeCache.ServiceInstanceID, serviceInstanceID) + attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName) } } } @@ -2560,22 +2570,30 @@ func (aH *APIHandler) getNetworkData( return } - resultFetchLatency, errQuriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) + resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} - RespondError(w, apiErrObj, errQuriesByNameFetchLatency) + RespondError(w, apiErrObj, errQueriesByNameFetchLatency) return } - latencyColoumn := &v3.Result{QueryName: "latency"} + latencyColumn := &v3.Result{QueryName: "latency"} var latencySeries []*v3.Series for _, res := range resultFetchLatency { for _, series := range res.Series { - latencySeries = append(latencySeries, series) + clientID, clientIDOk := series.Labels["client_id"] + serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] + serviceName, serviceNameOk := series.Labels["service_name"] + hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#") + _, ok := attributeCache.Hash[hashKey] + if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok { + latencySeries = append(latencySeries, series) + } } } - latencyColoumn.Series = latencySeries - result = append(result, latencyColoumn) + + latencyColumn.Series = latencySeries + result = append(result, latencyColumn) resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index b97cd9e69d..38b61669ff 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -158,7 +158,8 @@ Request-Body "start": 1724673937000000000, "end": 1724675737000000000, "variables": { - "consumer_group": "cg1" + "consumer_group": "cg1", + "partition": "0" } } ``` @@ -203,19 +204,19 @@ Response in query range `table` format { "data": { "client_id": "consumer-cg1-1", - "latency": 25.21, - "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", + "latency": 48.99, + "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", "service_name": "consumer-svc", - "throughput": 24.91 + "throughput": 14.97 } }, { "data": { "client_id": "consumer-cg1-1", - "latency": 49.68, - "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", + "latency": 25.21, + "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", "service_name": "consumer-svc", - "throughput": 14.97 + "throughput": 24.91 } } ] diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index cedf62eff3..f587868610 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -9,7 +9,8 @@ type MessagingQueue struct { } type Clients struct { - ClientID string - ServiceInstanceID string - ServiceName string + Hash map[string]struct{} + ClientID []string + ServiceInstanceID []string + ServiceName []string } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index f09cf73d9b..eb06689ef3 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -74,14 +74,14 @@ ORDER BY return query } -func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { +func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string { timeRange := (end - start) / 1000000000 query := fmt.Sprintf(` SELECT stringTagMap['messaging.client_id'] AS client_id, stringTagMap['service.instance.id'] AS service_instance_id, serviceName AS service_name, - count(*) / %d AS throughput -- Convert nanoseconds to seconds + count(*) / %d AS throughput FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= '%d' @@ -89,8 +89,9 @@ WHERE AND kind = 5 AND msgSystem = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY service_name, client_id, service_instance_id ORDER BY throughput DESC -`, timeRange, start, end, queueType, consumerGroup) +`, timeRange, start, end, queueType, consumerGroup, partitionID) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 3de489a39e..7f4266df67 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -2,6 +2,8 @@ package kafka import ( "fmt" + "strings" + "go.signoz.io/signoz/pkg/query-service/common" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -43,87 +45,97 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin return nil, fmt.Errorf("consumer_group not found in the request") } - query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, queueType) + partitionID, ok := messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("partition not found in the request") + } + + query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType) return &v3.ClickHouseQuery{ Query: query, }, nil } -func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { - bq := make(map[string]*v3.BuilderQuery) +func formatstring(str []string) string { + joined := strings.Join(str, ", ") + if len(joined) <= 2 { + return "" + } + return joined[1 : len(joined)-1] +} - for i, instanceInfo := range attributeCache { - queryName := fmt.Sprintf("latency_%d", i) - chq := &v3.BuilderQuery{ - QueryName: queryName, - StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), - DataSource: v3.DataSourceMetrics, - AggregateAttribute: v3.AttributeKey{ - Key: "kafka_consumer_fetch_latency_avg", - }, - AggregateOperator: v3.AggregateOperatorAvg, - Temporality: v3.Unspecified, - TimeAggregation: v3.TimeAggregationAvg, - SpaceAggregation: v3.SpaceAggregationAvg, - Filters: &v3.FilterSet{ - Operator: "AND", - Items: []v3.FilterItem{ - { - Key: v3.AttributeKey{ - Key: "service_name", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }, - Operator: v3.FilterOperatorEqual, - Value: instanceInfo.ServiceName, +func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) { + bq := make(map[string]*v3.BuilderQuery) + queryName := fmt.Sprintf("latency") + + chq := &v3.BuilderQuery{ + QueryName: queryName, + StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "kafka_consumer_fetch_latency_avg", + }, + AggregateOperator: v3.AggregateOperatorAvg, + Temporality: v3.Unspecified, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service_name", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, }, - { - Key: v3.AttributeKey{ - Key: "client_id", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }, - Operator: v3.FilterOperatorEqual, - Value: instanceInfo.ClientID, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ServiceName, + }, + { + Key: v3.AttributeKey{ + Key: "client_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, }, - { - Key: v3.AttributeKey{ - Key: "service_instance_id", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }, - Operator: v3.FilterOperatorEqual, - Value: instanceInfo.ServiceInstanceID, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ClientID, + }, + { + Key: v3.AttributeKey{ + Key: "service_instance_id", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, }, + Operator: v3.FilterOperatorIn, + Value: attributeCache.ServiceInstanceID, }, }, - Expression: queryName, - ReduceTo: v3.ReduceToOperatorAvg, - GroupBy: []v3.AttributeKey{{ - Key: "service_name", + }, + Expression: queryName, + ReduceTo: v3.ReduceToOperatorAvg, + GroupBy: []v3.AttributeKey{{ + Key: "service_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "client_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, }, - { - Key: "client_id", - DataType: v3.AttributeKeyDataTypeString, - Type: v3.AttributeKeyTypeTag, - }, - { - Key: "service_instance_id", - DataType: v3.AttributeKeyDataTypeString, - Type: v3.AttributeKeyTypeTag, - }, + { + Key: "service_instance_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, }, - } - bq[queryName] = chq + }, } - + bq[queryName] = chq return bq, nil } -func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { +func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { queueType := kafkaQueue From faa6fdfcde84b011dc938eb14b7ed02b825d6146 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Tue, 27 Aug 2024 18:28:52 +0530 Subject: [PATCH 8/8] feat: bug-fix in ClickHouseFormattedValue to allow strings --- pkg/query-service/utils/format.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index bd7db15e6b..4de081940d 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -190,6 +190,19 @@ func ClickHouseFormattedValue(v interface{}) string { zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0]))) return "[]" } + case []string: + if len(x) == 0 { + return "[]" + } + str := "[" + for idx, sVal := range x { + str += fmt.Sprintf("'%s'", QuoteEscapedString(sVal)) + if idx != len(x)-1 { + str += "," + } + } + str += "]" + return str default: zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x))) return ""