mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-06-21 05:58:23 +08:00
404 lines
12 KiB
Go
404 lines
12 KiB
Go
package druidQuery
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"go.signoz.io/query-service/constants"
|
|
"go.signoz.io/query-service/godruid"
|
|
"go.signoz.io/query-service/model"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// check panics with e when e is non-nil; a nil error is a no-op.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
|
|
|
|
// DurationItem is the "result" object of a single row in the Druid
// timeseries response decoded by SearchSpansAggregate.
type DurationItem struct {
	// Value is read from the "value" key, the name SearchSpansAggregate gives
	// to its count aggregation and to its quantile post-aggregation.
	Value float32 `json:"value"`
	// QuantileAgg is read from the "quantile_agg" key, the name
	// SearchSpansAggregate gives to its quantiles sketch aggregation.
	QuantileAgg int `json:"quantile_agg"`
}
|
|
|
|
type SpanSearchAggregatesDuratonReceivedItem struct {
|
|
Timestamp string `json:"timestamp"`
|
|
Result DurationItem `json:"result"`
|
|
}
|
|
|
|
// SpanSearchAggregatesResponseItem is one point of the series returned by
// SearchSpansAggregate.
type SpanSearchAggregatesResponseItem struct {
	// Timestamp is filled by SearchSpansAggregate with the bucket time in
	// nanoseconds since the Unix epoch (time.Time.UnixNano).
	Timestamp int64 `json:"timestamp"`
	// Value is the aggregated value for the bucket.
	Value float32 `json:"value"`
}
|
|
|
|
func buildFilters(queryParams *model.SpanSearchParams) (*godruid.Filter, error) {
|
|
|
|
var filter *godruid.Filter
|
|
|
|
if len(queryParams.ServiceName) != 0 {
|
|
filter = godruid.FilterSelector("ServiceName", queryParams.ServiceName)
|
|
}
|
|
|
|
if len(queryParams.OperationName) != 0 {
|
|
|
|
newFilter := godruid.FilterSelector("Name", queryParams.OperationName)
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
if len(queryParams.Kind) != 0 {
|
|
|
|
newFilter := godruid.FilterSelector("Kind", queryParams.Kind)
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
|
|
// zap.S().Debug("MinDuration: ", queryParams.MinDuration)
|
|
var lower string
|
|
var upper string
|
|
|
|
if len(queryParams.MinDuration) != 0 {
|
|
lower = queryParams.MinDuration
|
|
}
|
|
if len(queryParams.MaxDuration) != 0 {
|
|
upper = queryParams.MaxDuration
|
|
}
|
|
|
|
if len(lower) != 0 || len(upper) != 0 {
|
|
|
|
newFilter := godruid.FilterBound("DurationNano", lower, upper, false, false, "numeric")
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
|
|
for _, item := range queryParams.Tags {
|
|
|
|
var newFilter *godruid.Filter
|
|
|
|
if item.Operator == "equals" {
|
|
newFilter = godruid.FilterSelector("Tags", fmt.Sprintf("%s:%s", item.Key, item.Value))
|
|
} else if item.Operator == "contains" {
|
|
valuesFilter := godruid.FilterSearch("TagsValues", fmt.Sprintf("%s", item.Value))
|
|
keysFilter := godruid.FilterSelector("TagsKeys", fmt.Sprintf("%s", item.Key))
|
|
newFilter = godruid.FilterAnd(valuesFilter, keysFilter)
|
|
} else if item.Operator == "isnotnull" {
|
|
newFilter = godruid.FilterSelector("TagsKeys", fmt.Sprintf("%s", item.Key))
|
|
} else {
|
|
return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator)
|
|
}
|
|
|
|
if item.Key == "error" && item.Value == "true" {
|
|
statusCodeFilter := godruid.FilterBound("StatusCode", "500", "600", false, true, "numeric")
|
|
filterError := godruid.FilterOr(statusCodeFilter, newFilter)
|
|
filter = godruid.FilterAnd(filter, filterError)
|
|
continue
|
|
}
|
|
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
|
|
// if filter == nil {
|
|
// return nil, fmt.Errorf("No search criteria for spans was specified")
|
|
// }
|
|
return filter, nil
|
|
|
|
}
|
|
|
|
func buildFiltersForSpansAggregates(queryParams *model.SpanSearchAggregatesParams) (*godruid.Filter, error) {
|
|
|
|
var filter *godruid.Filter
|
|
|
|
if len(queryParams.ServiceName) != 0 {
|
|
filter = godruid.FilterSelector("ServiceName", queryParams.ServiceName)
|
|
}
|
|
|
|
if len(queryParams.OperationName) != 0 {
|
|
|
|
newFilter := godruid.FilterSelector("Name", queryParams.OperationName)
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
if len(queryParams.Kind) != 0 {
|
|
|
|
newFilter := godruid.FilterSelector("Kind", queryParams.Kind)
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
|
|
// zap.S().Debug("MinDuration: ", queryParams.MinDuration)
|
|
var lower string
|
|
var upper string
|
|
|
|
if len(queryParams.MinDuration) != 0 {
|
|
lower = queryParams.MinDuration
|
|
}
|
|
if len(queryParams.MaxDuration) != 0 {
|
|
upper = queryParams.MaxDuration
|
|
}
|
|
|
|
if len(lower) != 0 || len(upper) != 0 {
|
|
|
|
newFilter := godruid.FilterBound("DurationNano", lower, upper, false, false, "numeric")
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
|
|
for _, item := range queryParams.Tags {
|
|
|
|
var newFilter *godruid.Filter
|
|
|
|
if item.Operator == "equals" {
|
|
newFilter = godruid.FilterSelector("Tags", fmt.Sprintf("%s:%s", item.Key, item.Value))
|
|
} else if item.Operator == "contains" {
|
|
valuesFilter := godruid.FilterSearch("TagsValues", fmt.Sprintf("%s", item.Value))
|
|
keysFilter := godruid.FilterSelector("TagsKeys", fmt.Sprintf("%s", item.Key))
|
|
newFilter = godruid.FilterAnd(valuesFilter, keysFilter)
|
|
} else if item.Operator == "isnotnull" {
|
|
newFilter = godruid.FilterSelector("TagsKeys", fmt.Sprintf("%s", item.Key))
|
|
} else {
|
|
return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator)
|
|
}
|
|
|
|
if item.Key == "error" && item.Value == "true" {
|
|
statusCodeFilter := godruid.FilterBound("StatusCode", "500", "600", false, true, "numeric")
|
|
filterError := godruid.FilterOr(statusCodeFilter, newFilter)
|
|
filter = godruid.FilterAnd(filter, filterError)
|
|
continue
|
|
}
|
|
|
|
filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
}
|
|
|
|
// newFilter := godruid.FilterSelector("Kind", "2")
|
|
// filter = godruid.FilterAnd(filter, newFilter)
|
|
|
|
// if filter == nil {
|
|
// return nil, fmt.Errorf("No search criteria for spans was specified")
|
|
// }
|
|
return filter, nil
|
|
|
|
}
|
|
|
|
func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, error) {
|
|
|
|
filter := godruid.FilterSelector("TraceId", traceId)
|
|
|
|
query := &godruid.QueryScan{
|
|
DataSource: constants.DruidDatasource,
|
|
Intervals: []string{"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"},
|
|
Filter: filter,
|
|
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References"},
|
|
Order: "none",
|
|
BatchSize: 20480,
|
|
}
|
|
|
|
clientErr := client.Query(query)
|
|
// fmt.Println("requst", client.LastRequest)
|
|
if clientErr != nil {
|
|
// fmt.Println("Error: ", err)
|
|
zap.S().Error(zap.Error(clientErr))
|
|
return nil, clientErr
|
|
}
|
|
|
|
// fmt.Println("response", client.LastResponse)
|
|
|
|
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
|
|
|
return query.QueryResult, nil
|
|
}
|
|
|
|
func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]SpanSearchAggregatesResponseItem, error) {
|
|
|
|
filter, err := buildFiltersForSpansAggregates(queryParams)
|
|
var needsPostAggregation bool = true
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
granularity := godruid.GranPeriod{
|
|
Type: "period",
|
|
Period: queryParams.GranPeriod,
|
|
// Origin: queryParams.GranOrigin,
|
|
}
|
|
|
|
var aggregation godruid.Aggregation
|
|
var postAggregation godruid.PostAggregation
|
|
|
|
if queryParams.Dimension == "duration" {
|
|
switch queryParams.AggregationOption {
|
|
case "p50":
|
|
aggregationString := `{ "type": "quantilesDoublesSketch", "fieldName": "QuantileDuration", "name": "quantile_agg", "k": 128}`
|
|
aggregation = godruid.AggRawJson(aggregationString)
|
|
postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.5}`
|
|
postAggregation = godruid.PostAggRawJson(postAggregationString)
|
|
break
|
|
case "p90":
|
|
aggregationString := `{ "type": "quantilesDoublesSketch", "fieldName": "QuantileDuration", "name": "quantile_agg", "k": 128}`
|
|
aggregation = godruid.AggRawJson(aggregationString)
|
|
postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.9}`
|
|
postAggregation = godruid.PostAggRawJson(postAggregationString)
|
|
break
|
|
|
|
case "p99":
|
|
aggregationString := `{ "type": "quantilesDoublesSketch", "fieldName": "QuantileDuration", "name": "quantile_agg", "k": 128}`
|
|
aggregation = godruid.AggRawJson(aggregationString)
|
|
postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.99}`
|
|
postAggregation = godruid.PostAggRawJson(postAggregationString)
|
|
break
|
|
}
|
|
|
|
} else if queryParams.Dimension == "calls" {
|
|
|
|
aggregation = godruid.AggCount("value")
|
|
needsPostAggregation = false
|
|
}
|
|
var query *godruid.QueryTimeseries
|
|
if needsPostAggregation {
|
|
query = &godruid.QueryTimeseries{
|
|
DataSource: constants.DruidDatasource,
|
|
Intervals: []string{queryParams.Intervals},
|
|
Granularity: granularity,
|
|
Filter: filter,
|
|
Aggregations: []godruid.Aggregation{aggregation},
|
|
PostAggregations: []godruid.PostAggregation{postAggregation},
|
|
}
|
|
} else {
|
|
query = &godruid.QueryTimeseries{
|
|
DataSource: constants.DruidDatasource,
|
|
Intervals: []string{queryParams.Intervals},
|
|
Granularity: granularity,
|
|
Filter: filter,
|
|
Aggregations: []godruid.Aggregation{aggregation},
|
|
// PostAggregations: []godruid.PostAggregation{postAggregation},
|
|
}
|
|
}
|
|
|
|
clientErr := client.Query(query)
|
|
// fmt.Println("requst", client.LastRequest)
|
|
if clientErr != nil {
|
|
// fmt.Println("Error: ", err)
|
|
zap.S().Error(zap.Error(clientErr))
|
|
return nil, clientErr
|
|
}
|
|
|
|
// fmt.Println("response", client.LastResponse)
|
|
|
|
receivedResponse := new([]SpanSearchAggregatesDuratonReceivedItem)
|
|
err = json.Unmarshal([]byte(client.LastResponse), receivedResponse)
|
|
if err != nil && len(*receivedResponse) == 0 {
|
|
zap.S().Error(err)
|
|
return nil, fmt.Errorf("Error in unmarshalling response from druid")
|
|
}
|
|
|
|
var response []SpanSearchAggregatesResponseItem
|
|
|
|
for _, elem := range *receivedResponse {
|
|
|
|
value := elem.Result.Value
|
|
timeObj, _ := time.Parse(time.RFC3339Nano, elem.Timestamp)
|
|
timestamp := timeObj.UnixNano()
|
|
|
|
if queryParams.AggregationOption == "rate_per_sec" {
|
|
value = elem.Result.Value * 1.0 / float32(queryParams.StepSeconds)
|
|
}
|
|
response = append(response, SpanSearchAggregatesResponseItem{
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
})
|
|
}
|
|
return response, nil
|
|
|
|
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([]godruid.ScanResult, error) {
|
|
|
|
filter, err := buildFilters(queryParams)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
query := &godruid.QueryScan{
|
|
DataSource: constants.DruidDatasource,
|
|
Intervals: []string{queryParams.Intervals},
|
|
Filter: filter,
|
|
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues"},
|
|
Limit: queryParams.Limit,
|
|
Offset: queryParams.Offset,
|
|
Order: "descending",
|
|
BatchSize: 20480,
|
|
}
|
|
|
|
clientErr := client.Query(query)
|
|
// fmt.Println("requst", client.LastRequest)
|
|
if clientErr != nil {
|
|
// fmt.Println("Error: ", err)
|
|
zap.S().Error(zap.Error(clientErr))
|
|
return nil, clientErr
|
|
}
|
|
|
|
// fmt.Println("response", client.LastResponse)
|
|
|
|
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
|
|
|
return query.QueryResult, nil
|
|
}
|
|
|
|
func GetApplicationPercentiles(client *godruid.Client, queryParams *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) {
|
|
|
|
// query := &godruid.QueryGroupBy{
|
|
// DataSource: constants.DruidDatasource,
|
|
// Intervals: []string{"2020-12-11T05:23:00.000Z/2020-12-11T05:24:00.000Z"},
|
|
// Granularity: godruid.GranMinute,
|
|
// Filter: godruid.FilterSelector("Kind", "2"),
|
|
// Dimensions: []godruid.DimSpec{"ServiceName"},
|
|
// Aggregations: []godruid.Aggregation{godruid.AggRawJson(`{ "type" : "count", "name" : "count" }`)},
|
|
// }
|
|
|
|
granularity := godruid.GranPeriod{
|
|
Type: "period",
|
|
Period: queryParams.GranPeriod,
|
|
Origin: queryParams.GranOrigin,
|
|
}
|
|
|
|
filterKind := godruid.FilterSelector("Kind", "2")
|
|
filterService := godruid.FilterSelector("ServiceName", queryParams.ServiceName)
|
|
filter := godruid.FilterAnd(filterKind, filterService)
|
|
|
|
aggregationString := `{ "type": "quantilesDoublesSketch", "fieldName": "QuantileDuration", "name": "quantile_agg", "k": 128}`
|
|
aggregation := godruid.AggRawJson(aggregationString)
|
|
|
|
postAggregationString := `{"type":"quantilesDoublesSketchToQuantiles","name":"final_quantile","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fractions":[0.5,0.99]}`
|
|
postAggregation := godruid.PostAggRawJson(postAggregationString)
|
|
|
|
query := &godruid.QueryTimeseries{
|
|
DataSource: constants.DruidDatasource,
|
|
Intervals: []string{queryParams.Intervals},
|
|
Granularity: granularity,
|
|
Filter: filter,
|
|
Aggregations: []godruid.Aggregation{aggregation},
|
|
PostAggregations: []godruid.PostAggregation{postAggregation},
|
|
}
|
|
|
|
err := client.Query(query)
|
|
// fmt.Println("requst", client.LastRequest)
|
|
if err != nil {
|
|
// fmt.Println("Error: ", err)
|
|
zap.S().Error(zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
// fmt.Println("response", client.LastResponse)
|
|
|
|
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
|
|
|
return query.QueryResult, nil
|
|
|
|
}
|