diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 84c5e6e079..2124ad8935 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -614,6 +614,101 @@ func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams spanSearchAggregatesResponseItems := []model.SpanSearchAggregatesResponseItem{} + aggregation_query := "" + if queryParams.Dimension == "duration" { + switch queryParams.AggregationOption { + case "p50": + aggregation_query = " quantile(0.50)(durationNano) as value " + break + + case "p95": + aggregation_query = " quantile(0.95)(durationNano) as value " + break + + case "p99": + aggregation_query = " quantile(0.99)(durationNano) as value " + break + } + } else if queryParams.Dimension == "calls" { + aggregation_query = " count(*) as value " + } + + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + + if len(queryParams.ServiceName) != 0 { + query = query + " AND serviceName = ?" + args = append(args, queryParams.ServiceName) + } + + if len(queryParams.OperationName) != 0 { + + query = query + " AND name = ?" + args = append(args, queryParams.OperationName) + + } + + if len(queryParams.Kind) != 0 { + query = query + " AND kind = ?" + args = append(args, queryParams.Kind) + + } + + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + + for _, item := range queryParams.Tags { + + if item.Key == "error" && item.Value == "true" { + query = query + " AND ( has(tags, 'error:true') OR statusCode>=500)" + continue + } + + if item.Operator == "equals" { + query = query + " AND has(tags, ?)" + args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value)) + + } else if item.Operator == "contains" { + query = query + " AND tagsValues[indexOf(tagsKeys, ?)] ILIKE ?" + args = append(args, item.Key) + args = append(args, fmt.Sprintf("%%%s%%", item.Value)) + } else if item.Operator == "isnotnull" { + query = query + " AND has(tagsKeys, ?)" + args = append(args, item.Key) + } else { + return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator) + } + + } + + query = query + " GROUP BY time ORDER BY time" + + err := r.db.Select(&spanSearchAggregatesResponseItems, query, args...) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range spanSearchAggregatesResponseItems { + + timeObj, _ := time.Parse(time.RFC3339Nano, spanSearchAggregatesResponseItems[i].Time) + spanSearchAggregatesResponseItems[i].Timestamp = int64(timeObj.UnixNano()) + spanSearchAggregatesResponseItems[i].Time = "" + if queryParams.AggregationOption == "rate_per_sec" { + spanSearchAggregatesResponseItems[i].Value = float32(spanSearchAggregatesResponseItems[i].Value) / float32(queryParams.StepSeconds) + } + } + return spanSearchAggregatesResponseItems, nil } diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index a586dc6f8b..59222cd592 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -16,7 +16,7 @@ var allowedDimesions = []string{"calls", "duration"} var allowedAggregations = map[string][]string{ "calls": []string{"count", "rate_per_sec"}, - "duration": []string{"avg", "p50", "p90", "p99"}, + "duration": []string{"avg", "p50", "p95", "p99"}, } func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, error) { @@ -234,6 +234,8 @@ func parseSearchSpanAggregatesRequest(r *http.Request) (*model.SpanSearchAggrega } params := &model.SpanSearchAggregatesParams{ + Start: startTime, + End: endTime, Intervals: fmt.Sprintf("%s/%s", startTimeStr, endTimeStr), GranOrigin: startTimeStr, GranPeriod: granPeriod, diff --git a/pkg/query-service/druidQuery/query.go b/pkg/query-service/druidQuery/query.go index b618b5ce1d..e040c192dd 100644 --- a/pkg/query-service/druidQuery/query.go +++ b/pkg/query-service/druidQuery/query.go @@ -240,10 +240,10 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.5}` postAggregation = godruid.PostAggRawJson(postAggregationString) break - case "p90": + case "p95": aggregationString := `{ "type": "quantilesDoublesSketch", "fieldName": "QuantileDuration", "name": "quantile_agg", "k": 128}` aggregation = godruid.AggRawJson(aggregationString) - postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.9}` + postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.95}` postAggregation = godruid.PostAggRawJson(postAggregationString) break diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 49a0ed5b28..73955a0397 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -66,6 +66,8 @@ type SpanSearchAggregatesParams struct { MinDuration string MaxDuration string Tags []TagQuery + Start *time.Time + End *time.Time GranOrigin string GranPeriod string Intervals string diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 841a8eb86c..96ea06bc20 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -164,6 +164,7 @@ type ServiceMapDependencyResponseItem struct { } type SpanSearchAggregatesResponseItem struct { - Timestamp int64 `json:"timestamp"` - Value float32 `json:"value"` + Timestamp int64 `json:"timestamp,omitempty" db:"timestamp" ` + Time string `json:"time,omitempty" db:"time"` + Value float32 `json:"value,omitempty" db:"value"` }