package kafka

import (
	"fmt"

	"github.com/SigNoz/signoz/pkg/query-service/common"
	"github.com/SigNoz/signoz/pkg/query-service/constants"
	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)

// defaultStepInterval is the step, in seconds, applied to every
// QueryRangeParamsV3 built in this file.
var defaultStepInterval int64 = 60

// BuildQueryRangeParams builds ClickHouse-SQL-backed range-query parameters
// for the given messaging queue and query context.
//
// It returns an error when the experimental span-evaluation feature is
// requested while disabled, or when the ClickHouse query cannot be built
// (e.g. a required variable such as topic/partition is missing).
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
	if constants.KafkaSpanEval == "false" && queryContext == "producer-consumer-eval" {
		return nil, fmt.Errorf("span evaluation feature is disabled and is experimental")
	}

	// ToDo: propagate this through APIs when there are different handlers
	queueType := KafkaQueue

	chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
	if err != nil {
		return nil, err
	}

	cq, err := buildCompositeQuery(chq, queryContext)
	if err != nil {
		return nil, err
	}

	return &v3.QueryRangeParamsV3{
		Start:          messagingQueue.Start,
		End:            messagingQueue.End,
		Step:           defaultStepInterval,
		CompositeQuery: cq,
		Version:        "v4",
		FormatForWeb:   true,
	}, nil
}

// buildClickHouseQueryNetwork builds the raw ClickHouse query for network
// latency/throughput of a consumer group.
//
// It requires the "consumer_group" and "partition" entries in
// messagingQueue.Variables and returns an error when either is absent.
func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
	start := messagingQueue.Start
	end := messagingQueue.End

	consumerGroup, ok := messagingQueue.Variables["consumer_group"]
	if !ok {
		return nil, fmt.Errorf("consumer_group not found in the request")
	}

	partitionID, ok := messagingQueue.Variables["partition"]
	if !ok {
		return nil, fmt.Errorf("partition not found in the request")
	}

	query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType)

	return &v3.ClickHouseQuery{
		Query: query,
	}, nil
}

// buildBuilderQueriesProducerBytes builds the metrics builder query for the
// average producer byte rate, filtered by the services and topics captured in
// attributeCache and grouped by service name and topic.
//
// Start/end are expected in milliseconds. The returned map is keyed by the
// query name ("byte_rate"). The error return is always nil today but is kept
// for signature parity with the sibling builders.
func buildBuilderQueriesProducerBytes(
	unixMilliStart, unixMilliEnd int64,
	attributeCache *Clients,
) (map[string]*v3.BuilderQuery, error) {
	// When dot metrics are enabled, use dot-notation metric/attribute names
	// instead of the normalized (underscore) names.
	normalized := !constants.IsDotMetricsEnabled

	bq := make(map[string]*v3.BuilderQuery)
	queryName := "byte_rate"

	chq := &v3.BuilderQuery{
		QueryName:    queryName,
		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
		DataSource:   v3.DataSourceMetrics,
		AggregateAttribute: v3.AttributeKey{
			Key:      getDotMetrics("kafka_producer_byte_rate", normalized),
			DataType: v3.AttributeKeyDataTypeFloat64,
			Type:     v3.AttributeKeyType("Gauge"),
			IsColumn: true,
		},
		AggregateOperator: v3.AggregateOperatorAvg,
		Temporality:       v3.Unspecified,
		TimeAggregation:   v3.TimeAggregationAvg,
		SpaceAggregation:  v3.SpaceAggregationAvg,
		Filters: &v3.FilterSet{
			Operator: "AND",
			Items: []v3.FilterItem{
				{
					Key: v3.AttributeKey{
						Key:      getDotMetrics("service_name", normalized),
						Type:     v3.AttributeKeyTypeTag,
						DataType: v3.AttributeKeyDataTypeString,
					},
					Operator: v3.FilterOperatorIn,
					Value:    attributeCache.ServiceName,
				},
				{
					Key: v3.AttributeKey{
						Key:      "topic",
						Type:     v3.AttributeKeyTypeTag,
						DataType: v3.AttributeKeyDataTypeString,
					},
					Operator: v3.FilterOperatorIn,
					Value:    attributeCache.TopicName,
				},
			},
		},
		Expression: queryName,
		ReduceTo:   v3.ReduceToOperatorAvg,
		GroupBy: []v3.AttributeKey{
			{
				Key:      getDotMetrics("service_name", normalized),
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeTag,
			},
			{
				Key:      "topic",
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeTag,
			},
		},
	}
	bq[queryName] = chq
	return bq, nil
}

// buildBuilderQueriesNetwork builds the metrics builder query for average
// consumer fetch latency, filtered by service name, client id, and service
// instance id from attributeCache and grouped by the same three attributes.
//
// Start/end are expected in milliseconds. The returned map is keyed by the
// query name ("latency"). The error return is always nil today but is kept
// for signature parity with the sibling builders.
func buildBuilderQueriesNetwork(
	unixMilliStart, unixMilliEnd int64,
	attributeCache *Clients,
) (map[string]*v3.BuilderQuery, error) {
	bq := make(map[string]*v3.BuilderQuery)
	queryName := "latency"

	// When dot metrics are enabled, use dot-notation metric/attribute names
	// instead of the normalized (underscore) names.
	normalized := !constants.IsDotMetricsEnabled

	chq := &v3.BuilderQuery{
		QueryName:    queryName,
		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
		DataSource:   v3.DataSourceMetrics,
		AggregateAttribute: v3.AttributeKey{
			Key: getDotMetrics("kafka_consumer_fetch_latency_avg", normalized),
		},
		AggregateOperator: v3.AggregateOperatorAvg,
		Temporality:       v3.Unspecified,
		TimeAggregation:   v3.TimeAggregationAvg,
		SpaceAggregation:  v3.SpaceAggregationAvg,
		Filters: &v3.FilterSet{
			Operator: "AND",
			Items: []v3.FilterItem{
				{
					Key: v3.AttributeKey{
						Key:      getDotMetrics("service_name", normalized),
						Type:     v3.AttributeKeyTypeTag,
						DataType: v3.AttributeKeyDataTypeString,
					},
					Operator: v3.FilterOperatorIn,
					Value:    attributeCache.ServiceName,
				},
				{
					Key: v3.AttributeKey{
						Key:      getDotMetrics("client_id", normalized),
						Type:     v3.AttributeKeyTypeTag,
						DataType: v3.AttributeKeyDataTypeString,
					},
					Operator: v3.FilterOperatorIn,
					Value:    attributeCache.ClientID,
				},
				{
					Key: v3.AttributeKey{
						Key:      getDotMetrics("service_instance_id", normalized),
						Type:     v3.AttributeKeyTypeTag,
						DataType: v3.AttributeKeyDataTypeString,
					},
					Operator: v3.FilterOperatorIn,
					Value:    attributeCache.ServiceInstanceID,
				},
			},
		},
		Expression: queryName,
		ReduceTo:   v3.ReduceToOperatorAvg,
		GroupBy: []v3.AttributeKey{
			{
				Key:      getDotMetrics("service_name", normalized),
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeTag,
			},
			{
				Key:      getDotMetrics("client_id", normalized),
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeTag,
			},
			{
				Key:      getDotMetrics("service_instance_id", normalized),
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeTag,
			},
		},
	}
	bq[queryName] = chq
	return bq, nil
}

// BuildBuilderQueriesKafkaOnboarding builds the onboarding check queries:
// a count of consumer fetch-latency samples and a count of consumer-group
// lag samples, returned as a table-panel range query.
//
// messagingQueue.Start/End are expected in nanoseconds and are converted to
// milliseconds here.
func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.QueryRangeParamsV3, error) {
	bq := make(map[string]*v3.BuilderQuery)

	// Convert nanoseconds to milliseconds.
	unixMilliStart := messagingQueue.Start / 1000000
	unixMilliEnd := messagingQueue.End / 1000000

	// When dot metrics are enabled, use dot-notation metric names instead of
	// the normalized (underscore) names.
	normalized := !constants.IsDotMetricsEnabled

	builderQuery := &v3.BuilderQuery{
		QueryName:    "fetch_latency",
		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
		DataSource:   v3.DataSourceMetrics,
		AggregateAttribute: v3.AttributeKey{
			Key: getDotMetrics("kafka_consumer_fetch_latency_avg", normalized),
		},
		AggregateOperator: v3.AggregateOperatorCount,
		Temporality:       v3.Unspecified,
		TimeAggregation:   v3.TimeAggregationCount,
		SpaceAggregation:  v3.SpaceAggregationSum,
		Expression:        "fetch_latency",
	}
	bq["fetch_latency"] = builderQuery

	builderQuery = &v3.BuilderQuery{
		QueryName:    "consumer_lag",
		StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
		DataSource:   v3.DataSourceMetrics,
		AggregateAttribute: v3.AttributeKey{
			Key: getDotMetrics("kafka_consumer_group_lag", normalized),
		},
		AggregateOperator: v3.AggregateOperatorCount,
		Temporality:       v3.Unspecified,
		TimeAggregation:   v3.TimeAggregationCount,
		SpaceAggregation:  v3.SpaceAggregationSum,
		Expression:        "consumer_lag",
	}
	bq["consumer_lag"] = builderQuery

	cq := &v3.CompositeQuery{
		QueryType:      v3.QueryTypeBuilder,
		BuilderQueries: bq,
		PanelType:      v3.PanelTypeTable,
	}

	return &v3.QueryRangeParamsV3{
		Start:          unixMilliStart,
		End:            unixMilliEnd,
		Step:           defaultStepInterval,
		CompositeQuery: cq,
		Version:        "v4",
		FormatForWeb:   true,
	}, nil
}

// BuildQRParamsWithCache builds range-query parameters for query contexts
// that depend on previously cached client attributes (service names, client
// ids, topics, ...).
//
// messagingQueue.Start/End are expected in nanoseconds; the resulting params
// use milliseconds. An unrecognized queryContext yields params with a nil
// CompositeQuery, matching historical behavior.
func BuildQRParamsWithCache(
	messagingQueue *MessagingQueue,
	queryContext string,
	attributeCache *Clients,
) (*v3.QueryRangeParamsV3, error) {
	queueType := KafkaQueue

	// Convert nanoseconds to milliseconds.
	unixMilliStart := messagingQueue.Start / 1000000
	unixMilliEnd := messagingQueue.End / 1000000

	var cq *v3.CompositeQuery
	var err error

	switch queryContext {
	case "throughput":
		// NOTE(fix): the previous version declared a shadowed `err` here with
		// `:=`, silently dropping any error from buildCompositeQuery.
		chq, nerr := buildClickHouseQueryNetwork(messagingQueue, queueType)
		if nerr != nil {
			return nil, nerr
		}
		cq, err = buildCompositeQuery(chq, queryContext)

	case "fetch-latency":
		bhq, berr := buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd, attributeCache)
		if berr != nil {
			return nil, berr
		}
		cq = &v3.CompositeQuery{
			QueryType:      v3.QueryTypeBuilder,
			BuilderQueries: bhq,
			PanelType:      v3.PanelTypeTable,
		}

	case "producer-throughput-overview":
		// This SQL generator takes the raw (nanosecond) timestamps.
		start := messagingQueue.Start
		end := messagingQueue.End
		query := generateProducerPartitionThroughputSQL(start, end, queueType)
		cq, err = buildCompositeQuery(&v3.ClickHouseQuery{
			Query: query,
		}, queryContext)

	case "producer-throughput-overview-byte-rate":
		bhq, berr := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache)
		if berr != nil {
			return nil, berr
		}
		cq = &v3.CompositeQuery{
			QueryType:      v3.QueryTypeBuilder,
			BuilderQueries: bhq,
			PanelType:      v3.PanelTypeTable,
			FillGaps:       false,
		}
	}

	if err != nil {
		return nil, err
	}

	return &v3.QueryRangeParamsV3{
		Start:          unixMilliStart,
		End:            unixMilliEnd,
		Step:           defaultStepInterval,
		CompositeQuery: cq,
		Version:        "v4",
		FormatForWeb:   true,
	}, nil
}

// BuildClickHouseQuery dispatches on queryContext to build the raw
// ClickHouse SQL for the given messaging queue.
//
// Contexts that operate on a specific topic (and, for some, a specific
// partition) require the corresponding entries in messagingQueue.Variables;
// a missing entry is reported as an error. An unrecognized queryContext
// yields an empty query, matching historical behavior.
func BuildClickHouseQuery(
	messagingQueue *MessagingQueue,
	queueType string,
	queryContext string,
) (*v3.ClickHouseQuery, error) {
	start := messagingQueue.Start
	end := messagingQueue.End

	var topic, partition string
	if queryContext == "producer" ||
		queryContext == "consumer" ||
		queryContext == "consumer_partition_latency" ||
		queryContext == "producer-throughput-details" ||
		queryContext == "consumer-throughput-details" {

		var ok bool
		topic, ok = messagingQueue.Variables["topic"]
		if !ok {
			return nil, fmt.Errorf("invalid type for Topic")
		}

		// The *-throughput-details contexts aggregate across partitions and
		// therefore do not need a partition.
		if !(queryContext == "consumer-throughput-details" ||
			queryContext == "producer-throughput-details") {
			partition, ok = messagingQueue.Variables["partition"]
			if !ok {
				return nil, fmt.Errorf("invalid type for Partition")
			}
		}
	}

	var query string
	switch queryContext {
	case "producer":
		query = generateProducerSQL(start, end, topic, partition, queueType)
	case "consumer":
		consumerGroup, ok := messagingQueue.Variables["consumer_group"]
		if !ok {
			return nil, fmt.Errorf("invalid type for consumer group")
		}
		query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
	case "producer-topic-throughput":
		query = generatePartitionLatencySQL(start, end, queueType)
	case "consumer_partition_latency":
		query = generateConsumerPartitionLatencySQL(start, end, topic, partition, queueType)
	case "producer-throughput-details":
		svcName, ok := messagingQueue.Variables["service_name"]
		if !ok {
			return nil, fmt.Errorf("invalid type for service")
		}
		query = generateProducerTopicLatencySQL(start, end, topic, svcName, queueType)
	case "consumer-throughput-overview":
		query = generateConsumerLatencySQL(start, end, queueType)
	case "consumer-throughput-details":
		svcName, ok := messagingQueue.Variables["service_name"]
		if !ok {
			return nil, fmt.Errorf("invalid type for service")
		}
		query = generateConsumerServiceLatencySQL(start, end, topic, svcName, queueType)
	case "producer-consumer-eval":
		query = generateProducerConsumerEvalSQL(start, end, queueType, messagingQueue.EvalTime)
	case "onboard_producers":
		query = onboardProducersSQL(start, end, queueType)
	case "onboard_consumers":
		query = onboardConsumerSQL(start, end, queueType)
	}

	return &v3.ClickHouseQuery{
		Query: query,
	}, nil
}

// buildCompositeQuery wraps a single ClickHouse query (keyed by its query
// context) into a composite query. The producer-consumer-eval context is
// rendered as a list panel; everything else as a table.
//
// The error return is always nil today but is kept for call-site symmetry.
func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {
	panelType := v3.PanelTypeTable
	if queryContext == "producer-consumer-eval" {
		panelType = v3.PanelTypeList
	}
	return &v3.CompositeQuery{
		QueryType:         v3.QueryTypeClickHouseSQL,
		ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
		PanelType:         panelType,
	}, nil
}

// dotMetricNames maps normalized (underscore) metric/attribute names to their
// dot-notation equivalents. Hoisted to package level so getDotMetrics does
// not rebuild the map on every call.
var dotMetricNames = map[string]string{
	"kafka_producer_byte_rate":         "kafka.producer.byte-rate",
	"service_name":                     "service.name",
	"kafka_consumer_fetch_latency_avg": "kafka.consumer.fetch_latency_avg",
	"service_instance_id":              "service.instance.id",
	"client_id":                        "client-id",
	"kafka_consumer_group_lag":         "kafka.consumer_group.lag",
}

// getDotMetrics returns the dot-notation name for metricName when dot
// metrics are in use (normalized == false) and a mapping exists; otherwise
// it returns metricName unchanged.
func getDotMetrics(metricName string, normalized bool) string {
	if dot, ok := dotMetricNames[metricName]; ok && !normalized {
		return dot
	}
	return metricName
}