diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index b7b5e391a2..39c84b93b8 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -4167,6 +4167,48 @@ func (r *ClickHouseReader) GetTimeSeriesResultV3(ctx context.Context, query stri
 	return readRowsForTimeSeriesResult(rows, vars, columnNames)
 }
+// GetListResultV3 runs the query and returns a list of rows
+func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) {
+
+	defer utils.Elapsed("GetListResultV3", query)()
+
+	rows, err := r.db.Query(ctx, query)
+
+	if err != nil {
+		zap.S().Errorf("error while reading list result %v", err)
+		return nil, err
+	}
+	defer rows.Close()
+
+	var (
+		columnTypes = rows.ColumnTypes()
+		columnNames = rows.Columns()
+		vars        = make([]interface{}, len(columnTypes))
+	)
+	for i := range columnTypes {
+		vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
+	}
+
+	var rowList []*v3.Row
+
+	for rows.Next() {
+		if err := rows.Scan(vars...); err != nil {
+			return nil, err
+		}
+		row := map[string]interface{}{}
+		var t time.Time
+		for idx, v := range vars {
+			if columnNames[idx] == "timestamp" {
+				t = time.Unix(0, int64(*v.(*uint64)))
+			}
+			row[columnNames[idx]] = v
+		}
+		rowList = append(rowList, &v3.Row{Timestamp: t, Data: row})
+	}
+
+	return rowList, nil
+}
+
 func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error {
 	rows, err := r.db.Query(ctx, "SELECT 1")
 	if err != nil {
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index e8b4b4a6e5..9310cc5ad8 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -21,6 +21,7 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/app/dashboards"
 	"go.signoz.io/signoz/pkg/query-service/app/explorer"
 	"go.signoz.io/signoz/pkg/query-service/app/logs"
+	logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
 	"go.signoz.io/signoz/pkg/query-service/app/metrics"
 	metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
 	"go.signoz.io/signoz/pkg/query-service/app/parser"
@@ -109,9 +110,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
 		BuildTraceQuery: func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) {
 			return "", errors.New("not implemented")
 		},
-		BuildLogQuery: func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error) {
-			return "", errors.New("not implemented")
-		},
+		BuildLogQuery: logsv3.PrepareLogsQuery,
 	}
 	aH.queryBuilder = NewQueryBuilder(builderOpts)
@@ -2443,7 +2442,7 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http
 	aH.Respond(w, response)
 }
-func (aH *APIHandler) execClickHouseQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]string) {
+func (aH *APIHandler) execClickHouseGraphQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]string) {
 	type channelResult struct {
 		Series []*v3.Series
 		Err    error
@@ -2492,6 +2491,55 @@ func (aH *APIHandler) execClickHouseGraphQueries(ctx context.Context, queries map[str
 	return res, nil, nil
 }
+func (aH *APIHandler) execClickHouseListQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]string) {
+	type channelResult struct {
+		List  []*v3.Row
+		Err   error
+		Name  string
+		Query string
+	}
+
+	ch := 
make(chan channelResult, len(queries)) + var wg sync.WaitGroup + + for name, query := range queries { + wg.Add(1) + go func(name, query string) { + defer wg.Done() + rowList, err := aH.reader.GetListResultV3(ctx, query) + + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query} + return + } + ch <- channelResult{List: rowList, Name: name, Query: query} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + res := make([]*v3.Result, 0) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + res = append(res, &v3.Result{ + QueryName: r.Name, + List: r.List, + }) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName + } + return res, nil, nil +} + func (aH *APIHandler) execPromQueries(ctx context.Context, metricsQueryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) { type channelResult struct { Series []*v3.Series @@ -2572,19 +2620,83 @@ func (aH *APIHandler) execPromQueries(ctx context.Context, metricsQueryRangePara return res, nil, nil } -func (aH *APIHandler) queryRangeV3(queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) getLogFieldsV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3) (map[string]v3.AttributeKey, error) { + data := map[string]v3.AttributeKey{} + for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { + if query.DataSource == v3.DataSourceLogs { + fields, apiError := aH.reader.GetLogFields(ctx) + if apiError != nil { + return nil, apiError.Err + } + + // top level fields meta will always be present in the frontend. 
+			// (support for that can be added as an enhancement)
+			getType := func(t string) (v3.AttributeKeyType, bool) {
+				if t == "attributes" {
+					return v3.AttributeKeyTypeTag, false
+				} else if t == "resources" {
+					return v3.AttributeKeyTypeResource, false
+				}
+				return "", true
+			}
+
+			for _, selectedField := range fields.Selected {
+				fieldType, pass := getType(selectedField.Type)
+				if pass {
+					continue
+				}
+				data[selectedField.Name] = v3.AttributeKey{
+					Key:      selectedField.Name,
+					Type:     fieldType,
+					DataType: v3.AttributeKeyDataType(strings.ToLower(selectedField.DataType)),
+					IsColumn: true,
+				}
+			}
+			for _, interestingField := range fields.Interesting {
+				fieldType, pass := getType(interestingField.Type)
+				if pass {
+					continue
+				}
+				data[interestingField.Name] = v3.AttributeKey{
+					Key:      interestingField.Name,
+					Type:     fieldType,
+					DataType: v3.AttributeKeyDataType(strings.ToLower(interestingField.DataType)),
+					IsColumn: false,
+				}
+			}
+			break
+		}
+	}
+	return data, nil
+}
+
+func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
 	var result []*v3.Result
 	var err error
 	var errQuriesByName map[string]string
+	var queries map[string]string
 	switch queryRangeParams.CompositeQuery.QueryType {
 	case v3.QueryTypeBuilder:
-		queries, err := aH.queryBuilder.prepareQueries(queryRangeParams)
+		// get the fields if any logs query is present
+		var fields map[string]v3.AttributeKey
+		fields, err = aH.getLogFieldsV3(ctx, queryRangeParams)
+		if err != nil {
+			apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
+			RespondError(w, apiErrObj, errQuriesByName)
+			return
+		}
+
+		queries, err = aH.queryBuilder.prepareQueries(queryRangeParams, fields)
 		if err != nil {
 			RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
 			return
 		}
-		result, err, errQuriesByName = aH.execClickHouseQueries(r.Context(), queries)
+
+		if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeList {
+			result, err, errQuriesByName = aH.execClickHouseListQueries(r.Context(), queries)
+		} else {
+			result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries)
+		}
 	case v3.QueryTypeClickHouseSQL:
 		queries := make(map[string]string)
 		for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries {
@@ -2593,7 +2705,7 @@ func (aH *APIHandler) queryRangeV3(queryRangeParams *v3.QueryRangeParamsV3, w ht
 			}
 			queries[name] = query.Query
 		}
-		result, err, errQuriesByName = aH.execClickHouseQueries(r.Context(), queries)
+		result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries)
 	case v3.QueryTypePromQL:
 		result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams)
 	default:
@@ -2623,5 +2735,5 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
 		return
 	}
-	aH.queryRangeV3(queryRangeParams, w, r)
+	aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
 }
diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go
new file mode 100644
index 0000000000..d2b1a145e6
--- /dev/null
+++ b/pkg/query-service/app/logs/v3/query_builder.go
@@ -0,0 +1,397 @@
+package v3
+
+import (
+	"fmt"
+	"math"
+	"strings"
+
+	"go.signoz.io/signoz/pkg/query-service/constants"
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	"go.signoz.io/signoz/pkg/query-service/utils"
+)
+
+var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{
+	v3.AggregateOperatorP05: 0.05,
+	v3.AggregateOperatorP10: 0.10,
+	v3.AggregateOperatorP20: 0.20,
+	v3.AggregateOperatorP25: 0.25,
+	v3.AggregateOperatorP50: 0.50,
+	v3.AggregateOperatorP75: 0.75,
+	v3.AggregateOperatorP90: 0.90,
+	v3.AggregateOperatorP95: 0.95,
+	v3.AggregateOperatorP99: 0.99,
+}
+
+var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{
+	v3.AggregateOperatorAvg:     "avg",
+	v3.AggregateOperatorMax:     "max",
+	v3.AggregateOperatorMin:     "min",
+	v3.AggregateOperatorSum:     "sum",
+	v3.AggregateOperatorRateSum: "sum",
+	v3.AggregateOperatorRateAvg: "avg",
+	v3.AggregateOperatorRateMax: "max",
+	v3.AggregateOperatorRateMin: "min",
+}
+
+var logOperators = map[v3.FilterOperator]string{
+	v3.FilterOperatorEqual:           "=",
+	v3.FilterOperatorNotEqual:        "!=",
+	v3.FilterOperatorLessThan:        "<",
+	v3.FilterOperatorLessThanOrEq:    "<=",
+	v3.FilterOperatorGreaterThan:     ">",
+	v3.FilterOperatorGreaterThanOrEq: ">=",
+	v3.FilterOperatorLike:            "ILIKE",
+	v3.FilterOperatorNotLike:         "NOT ILIKE",
+	v3.FilterOperatorContains:        "ILIKE",
+	v3.FilterOperatorNotContains:     "NOT ILIKE",
+	v3.FilterOperatorRegex:           "REGEXP",
+	v3.FilterOperatorNotRegex:        "NOT REGEXP",
+	v3.FilterOperatorIn:              "IN",
+	v3.FilterOperatorNotIn:           "NOT IN",
+	v3.FilterOperatorExists:          "has(%s_%s_key, '%s')",
+	v3.FilterOperatorNotExists:       "not has(%s_%s_key, '%s')",
+	// (todo): check contains/not contains
+}
+
+// encrichFieldWithMetadata fills in a field's missing Type and DataType from the fields map; materialized columns are returned unchanged
+func encrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.AttributeKey) (v3.AttributeKey, error) {
+	if field.Type == "" || field.DataType == "" {
+		// check if the field is present in the fields map
+		if existingField, ok := fields[field.Key]; ok {
+			if existingField.IsColumn {
+				return field, nil
+			}
+			field.Type = existingField.Type
+			field.DataType = existingField.DataType
+		} else {
+			return field, fmt.Errorf("field not found for metadata enrichment")
+		}
+	}
+	return field, nil
+}
+
+func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string {
+	if columnType == v3.AttributeKeyTypeTag {
+		return "attributes"
+	}
+	return "resources"
+}
+
+func getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string {
+	if columnDataType == v3.AttributeKeyDataTypeFloat64 {
+		return "float64"
+	}
+	if columnDataType == v3.AttributeKeyDataTypeInt64 {
+		return "int64"
+	}
+	// bool is also returned as string, since we store bool data as string.
+	return "string"
+}
+
+// getClickhouseColumnName returns the corresponding clickhouse column name for the given attribute/resource key
+func getClickhouseColumnName(key v3.AttributeKey, fields map[string]v3.AttributeKey) (string, error) {
+	clickhouseColumn := key.Key
+	// if the key is present in the top-level columns it is used as-is,
+	// regardless of whether it is also indexed/present as a resource or attribute
+	var err error
+	_, isTopLevelCol := constants.LogsTopLevelColumnsV3[key.Key]
+	if !isTopLevelCol && !key.IsColumn {
+		key, err = encrichFieldWithMetadata(key, fields)
+		if err != nil {
+			return "", err
+		}
+		columnType := getClickhouseLogsColumnType(key.Type)
+		columnDataType := getClickhouseLogsColumnDataType(key.DataType)
+		clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
+	}
+	return clickhouseColumn, nil
+}
+
+// getSelectLabels returns the select labels for the query based on groupBy and aggregateOperator
+func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey, fields map[string]v3.AttributeKey) (string, error) {
+	var selectLabels string
+	if aggregatorOperator == v3.AggregateOperatorNoOp {
+		selectLabels = ""
+	} else {
+		for _, tag := range groupBy {
+			columnName, err := getClickhouseColumnName(tag, fields)
+			if err != nil {
+				return "", err
+			}
+			selectLabels += fmt.Sprintf(", %s as %s", columnName, tag.Key)
+		}
+	}
+	return selectLabels, nil
+}
+
+func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, fields map[string]v3.AttributeKey) (string, error) {
+	var conditions []string
+
+	if fs != nil && len(fs.Items) != 0 {
+		for _, item := range fs.Items {
+			toFormat := item.Value
+			op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
+			if logsOp, ok := logOperators[op]; ok {
+				switch op {
+				case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
+					// (todo): refactor this later
+					key, err := encrichFieldWithMetadata(item.Key, fields)
+					if err != nil {
+						return "", err
+					}
+					columnType := getClickhouseLogsColumnType(key.Type)
+					columnDataType := getClickhouseLogsColumnDataType(key.DataType)
+					conditions = append(conditions, fmt.Sprintf(logsOp, columnType, columnDataType, item.Key.Key))
+				case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
+					// generate the key
+					columnName, err := getClickhouseColumnName(item.Key, fields)
+					if err != nil {
+						return "", err
+					}
+					conditions = append(conditions, fmt.Sprintf("%s %s '%%%s%%'", columnName, logsOp, item.Value))
+				default:
+					// generate the key
+					columnName, err := getClickhouseColumnName(item.Key, fields)
+					if err != nil {
+						return "", err
+					}
+					fmtVal := utils.ClickHouseFormattedValue(toFormat)
+					conditions = append(conditions, fmt.Sprintf("%s %s %s", columnName, logsOp, fmtVal))
+				}
+			} else {
+				return "", fmt.Errorf("unsupported operator: %s", op)
+			}
+		}
+	}
+	queryString := strings.Join(conditions, " AND ")
+
+	if len(queryString) > 0 {
+		queryString = " AND " + queryString
+	}
+	return queryString, nil
+}
+
+// getZerosForEpochNano returns the power-of-ten multiplier that converts the given epoch timestamp to nanoseconds, e.g. 10^6 for a 13-digit millisecond epoch
+func getZerosForEpochNano(epoch int64) int64 {
+	count := 0
+	if epoch == 0 {
+		count = 1
+	} else {
+		for epoch != 0 {
+			epoch /= 10
+			count++
+		}
+	}
+	return int64(math.Pow(10, float64(19-count)))
+}
+
+func buildLogsQuery(start, end, step int64, mq *v3.BuilderQuery, fields map[string]v3.AttributeKey) (string, error) {
+
+	filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, fields)
+	if err != nil {
+		return "", err
+	}
+
+	// the timerange may arrive in any precision; normalize it to epoch nanoseconds
+	timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d)", start*getZerosForEpochNano(start), end*getZerosForEpochNano(end))
+
+	selectLabels, err := getSelectLabels(mq.AggregateOperator, mq.GroupBy, fields)
+	if err != nil {
+		return "", err
+	}
+
+	having := having(mq.Having)
+	if having != "" {
+		having = " having " + having
+	}
+
+	queryTmpl :=
+		"SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts" + selectLabels +
+			", %s as value " +
+			"from signoz_logs.distributed_logs " +
+			"where " + timeFilter + "%s " +
+			"group by %s%s " +
+			"order by %sts"
+
+	groupBy := groupByAttributeKeyTags(mq.GroupBy...)
+	orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
+
+	aggregationKey := ""
+	if mq.AggregateAttribute.Key != "" {
+		aggregationKey, err = getClickhouseColumnName(mq.AggregateAttribute, fields)
+		if err != nil {
+			return "", err
+		}
+	}
+
+	switch mq.AggregateOperator {
+	case v3.AggregateOperatorRate:
+		op := fmt.Sprintf("count(%s)/%d", aggregationKey, step)
+		query := fmt.Sprintf(queryTmpl, step, op, filterSubQuery, groupBy, having, orderBy)
+		return query, nil
+	case
+		v3.AggregateOperatorRateSum,
+		v3.AggregateOperatorRateMax,
+		v3.AggregateOperatorRateAvg,
+		v3.AggregateOperatorRateMin:
+		op := fmt.Sprintf("%s(%s)/%d", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, step)
+		query := fmt.Sprintf(queryTmpl, step, op, filterSubQuery, groupBy, having, orderBy)
+		return query, nil
+	case
+		v3.AggregateOperatorP05,
+		v3.AggregateOperatorP10,
+		v3.AggregateOperatorP20,
+		v3.AggregateOperatorP25,
+		v3.AggregateOperatorP50,
+		v3.AggregateOperatorP75,
+		v3.AggregateOperatorP90,
+		v3.AggregateOperatorP95,
+		v3.AggregateOperatorP99:
+		op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey)
+		query := fmt.Sprintf(queryTmpl, step, op, filterSubQuery, groupBy, having, orderBy)
+		return query, nil
+	case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax:
+		op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey)
+		query := fmt.Sprintf(queryTmpl, step, op, filterSubQuery, groupBy, having, orderBy)
+		return query, nil
+	case v3.AggregateOpeatorCount:
+		if mq.AggregateAttribute.Key != "" {
+			field, err := encrichFieldWithMetadata(mq.AggregateAttribute, fields)
+			if err != nil {
+				return "", err
+			}
+			columnType := getClickhouseLogsColumnType(field.Type)
+			columnDataType := getClickhouseLogsColumnDataType(field.DataType)
+			filterSubQuery = fmt.Sprintf("%s AND has(%s_%s_key, '%s')", filterSubQuery, columnType, columnDataType, mq.AggregateAttribute.Key)
+			// (todo): check having
+		}
+
+		op := "toFloat64(count(*))"
+		query := fmt.Sprintf(queryTmpl, step, op, filterSubQuery, groupBy, having, orderBy)
+		return query, nil
+	case v3.AggregateOperatorCountDistinct:
+		op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggregationKey)
+		query := fmt.Sprintf(queryTmpl, step, op, filterSubQuery, groupBy, having, orderBy)
+		return query, nil
+	case v3.AggregateOperatorNoOp:
+		queryTmpl := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where %s %s"
+		query := fmt.Sprintf(queryTmpl, timeFilter, filterSubQuery)
+		return query, nil
+	default:
+		return "", fmt.Errorf("unsupported aggregate operator")
+	}
+}
+
+// groupBy returns a string of comma separated tags for the group by clause
+// `ts` is always added to the group by clause
+func groupBy(tags ...string) string {
+	tags = append(tags, "ts")
+	return strings.Join(tags, ",")
+}
+
+func groupByAttributeKeyTags(tags ...v3.AttributeKey) string {
+	groupTags := []string{}
+	for _, tag := range tags {
+		groupTags = append(groupTags, tag.Key)
+	}
+	return groupBy(groupTags...)
+}
+
+// orderBy returns a string of comma separated tags for the order by clause
+// if the order is not specified, it defaults to ASC
+func orderBy(items []v3.OrderBy, tags []string) string {
+	var orderBy []string
+	for _, tag := range tags {
+		found := false
+		for _, item := range items {
+			if item.ColumnName == tag {
+				found = true
+				orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
+				break
+			}
+		}
+		if !found {
+			orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag))
+		}
+	}
+
+	// users might want to order by the value of the aggregation
+	for _, item := range items {
+		if item.ColumnName == constants.SigNozOrderByValue {
+			orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order))
+		}
+	}
+	return strings.Join(orderBy, ",")
+}
+
+func orderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
+	var groupTags []string
+	for _, tag := range tags {
+		groupTags = append(groupTags, tag.Key)
+	}
+	str := orderBy(items, groupTags)
+	if len(str) > 0 {
+		str = str + ","
+	}
+	return str
+}
+
+func having(items []v3.Having) string {
+	// aggregate something and filter on that aggregate
+	var having []string
+	for _, item := range items {
+		having = append(having, fmt.Sprintf("value %s %s", item.Operator, utils.ClickHouseFormattedValue(item.Value)))
+	}
+	return strings.Join(having, " AND ")
+}
+
+func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) {
+	// the timestamp picked is not relevant here since the final value is used to show a single chart with just the query value.
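+	// For example (illustrative): a two-point inner series (t1, 10), (t2, 20)
+	// reduced with ReduceToOperatorSum is wrapped as
+	//   SELECT sum(value) as value, any(ts) as ts FROM (<inner query>)
+	// and yields a single row with value 30.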
+ switch reduceTo { + case v3.ReduceToOperatorLast: + query = fmt.Sprintf("SELECT anyLast(value) as value, any(ts) as ts FROM (%s)", query) + case v3.ReduceToOperatorSum: + query = fmt.Sprintf("SELECT sum(value) as value, any(ts) as ts FROM (%s)", query) + case v3.ReduceToOperatorAvg: + query = fmt.Sprintf("SELECT avg(value) as value, any(ts) as ts FROM (%s)", query) + case v3.ReduceToOperatorMax: + query = fmt.Sprintf("SELECT max(value) as value, any(ts) as ts FROM (%s)", query) + case v3.ReduceToOperatorMin: + query = fmt.Sprintf("SELECT min(value) as value, any(ts) as ts FROM (%s)", query) + default: + return "", fmt.Errorf("unsupported reduce operator") + } + return query, nil +} + +func addLimitToQuery(query string, limit uint64, panelType v3.PanelType) string { + if limit == 0 { + limit = 100 + } + if panelType == v3.PanelTypeList { + return fmt.Sprintf("%s LIMIT %d", query, limit) + } + return query +} + +func addOffsetToQuery(query string, offset uint64) string { + return fmt.Sprintf("%s OFFSET %d", query, offset) +} + +func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, fields map[string]v3.AttributeKey) (string, error) { + query, err := buildLogsQuery(start, end, mq.StepInterval, mq, fields) + if err != nil { + return "", err + } + if panelType == v3.PanelTypeValue { + query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + } + + query = addLimitToQuery(query, mq.Limit, panelType) + + if mq.Offset != 0 { + query = addOffsetToQuery(query, mq.Offset) + } + + return query, err +} diff --git a/pkg/query-service/app/logs/v3/query_builder_test.go b/pkg/query-service/app/logs/v3/query_builder_test.go new file mode 100644 index 0000000000..b67d5b4f8e --- /dev/null +++ b/pkg/query-service/app/logs/v3/query_builder_test.go @@ -0,0 +1,695 @@ +package v3 + +import ( + "testing" + + . 
"github.com/smartystreets/goconvey/convey" + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +var testGetClickhouseColumnNameData = []struct { + Name string + AttributeKey v3.AttributeKey + ExpectedColumnName string +}{ + { + Name: "attribute", + AttributeKey: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + ExpectedColumnName: "attributes_string_value[indexOf(attributes_string_key, 'user_name')]", + }, + { + Name: "resource", + AttributeKey: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, + ExpectedColumnName: "resources_string_value[indexOf(resources_string_key, 'servicename')]", + }, + { + Name: "selected field", + AttributeKey: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + ExpectedColumnName: "servicename", + }, + { + Name: "top level column", + AttributeKey: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + ExpectedColumnName: "trace_id", + }, + { + Name: "top level column with isColumn ignored", + AttributeKey: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: false}, + ExpectedColumnName: "trace_id", + }, +} + +func TestGetClickhouseColumnName(t *testing.T) { + for _, tt := range testGetClickhouseColumnNameData { + Convey("testGetClickhouseColumnNameData", t, func() { + columnName, err := getClickhouseColumnName(tt.AttributeKey, map[string]v3.AttributeKey{}) + So(err, ShouldBeNil) + So(columnName, ShouldEqual, tt.ExpectedColumnName) + }) + } +} + +var testGetSelectLabelsData = []struct { + Name string + AggregateOperator v3.AggregateOperator + GroupByTags []v3.AttributeKey + SelectLabels string +}{ + { + Name: "select fields for groupBy attribute", + AggregateOperator: v3.AggregateOpeatorCount, + GroupByTags: []v3.AttributeKey{{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + SelectLabels: ", attributes_string_value[indexOf(attributes_string_key, 'user_name')] as user_name", + }, + { + Name: "select fields for groupBy resource", + AggregateOperator: v3.AggregateOpeatorCount, + GroupByTags: []v3.AttributeKey{{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}}, + SelectLabels: ", resources_string_value[indexOf(resources_string_key, 'user_name')] as user_name", + }, + { + Name: "select fields for groupBy attribute and resource", + AggregateOperator: v3.AggregateOpeatorCount, + GroupByTags: []v3.AttributeKey{ + {Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, + {Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + }, + SelectLabels: ", resources_string_value[indexOf(resources_string_key, 'user_name')] as user_name, attributes_string_value[indexOf(attributes_string_key, 'host')] as host", + }, + { + Name: "select fields for groupBy materialized columns", + AggregateOperator: v3.AggregateOpeatorCount, + GroupByTags: []v3.AttributeKey{{Key: "host", IsColumn: true}}, + SelectLabels: ", host as host", + }, +} + +func TestGetSelectLabels(t *testing.T) { + for _, tt := range testGetSelectLabelsData { + Convey("testGetSelectLabelsData", t, func() { + selectLabels, err := getSelectLabels(tt.AggregateOperator, tt.GroupByTags, 
map[string]v3.AttributeKey{}) + So(err, ShouldBeNil) + So(selectLabels, ShouldEqual, tt.SelectLabels) + }) + } +} + +var timeSeriesFilterQueryData = []struct { + Name string + FilterSet *v3.FilterSet + ExpectedFilter string +}{ + { + Name: "Test attribute and resource attribute", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "john", Operator: "="}, + {Key: v3.AttributeKey{Key: "k8s_namespace", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "my_service", Operator: "!="}, + }}, + ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'user_name')] = 'john' AND resources_string_value[indexOf(resources_string_key, 'k8s_namespace')] != 'my_service'", + }, + { + Name: "Test materialized column", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "john", Operator: "="}, + {Key: v3.AttributeKey{Key: "k8s_namespace", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "my_service", Operator: "!="}, + }}, + ExpectedFilter: " AND user_name = 'john' AND resources_string_value[indexOf(resources_string_key, 'k8s_namespace')] != 'my_service'", + }, + { + Name: "Test like", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.%", Operator: "like"}, + }}, + ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] ILIKE '102.%'", + }, + { + Name: "Test IN", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag}, Value: []interface{}{1, 2, 3, 4}, Operator: "in"}, + }}, + ExpectedFilter: " AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] IN [1,2,3,4]", + }, + { + Name: "Test DataType int64", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, Value: 10, Operator: ">"}, + }}, + ExpectedFilter: " AND attributes_int64_value[indexOf(attributes_int64_key, 'bytes')] > 10", + }, + { + Name: "Test NOT IN", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: []interface{}{"john", "bunny"}, Operator: "nin"}, + }}, + ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'name')] NOT IN ['john','bunny']", + }, + { + Name: "Test exists", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "", Operator: "exists"}, + }}, + ExpectedFilter: " AND has(attributes_string_key, 'bytes')", + }, + { + Name: "Test not exists", + FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "", Operator: "nexists"}, + }}, + ExpectedFilter: " AND not has(attributes_string_key, 'bytes')", + }, + { + Name: "Test contains", + 
+		FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
+			{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: "contains"},
+		}},
+		ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] ILIKE '%102.%'",
+	},
+	{
+		Name: "Test not contains",
+		FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
+			{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: "ncontains"},
+		}},
+		ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'",
+	},
+}
+
+func TestBuildLogsTimeSeriesFilterQuery(t *testing.T) {
+	for _, tt := range timeSeriesFilterQueryData {
+		Convey("TestBuildLogsTimeSeriesFilterQuery", t, func() {
+			query, err := buildLogsTimeSeriesFilterQuery(tt.FilterSet, map[string]v3.AttributeKey{})
+			So(err, ShouldBeNil)
+			So(query, ShouldEqual, tt.ExpectedFilter)
+		})
+	}
+}
+
+var testBuildLogsQueryData = []struct {
+	Name              string
+	Start             int64
+	End               int64
+	Step              int64
+	BuilderQuery      *v3.BuilderQuery
+	GroupByTags       []v3.AttributeKey
+	TableName         string
+	AggregateOperator v3.AggregateOperator
+	ExpectedQuery     string
+}{
+	{
+		Name:  "Test aggregate count on select field",
+		Start: 1680066360726210000,
+		End:   1680066458000000000,
+		Step:  60,
+		BuilderQuery: &v3.BuilderQuery{
+			QueryName:         "A",
+			AggregateOperator: v3.AggregateOpeatorCount,
+			Expression:        "A",
+		},
+		TableName:     "logs",
+		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) group by ts order by ts",
+	},
+	{
+		Name:  "Test aggregate count on an attribute",
+		Start: 1680066360726210000,
+		End:   1680066458000000000,
+		Step:  60,
+		BuilderQuery: &v3.BuilderQuery{
+			QueryName:          "A",
+			AggregateAttribute: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
+			AggregateOperator:  v3.AggregateOpeatorCount,
+			Expression:         "A",
+		},
+		TableName:     "logs",
+		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND has(attributes_string_key, 'user_name') group by ts order by ts",
+	},
+	{
+		Name:  "Test aggregate count on an attribute with filter",
+		Start: 1680066360726210000,
+		End:   1680066458000000000,
+		Step:  60,
+		BuilderQuery: &v3.BuilderQuery{
+			QueryName:          "A",
+			AggregateAttribute: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
+			AggregateOperator:  v3.AggregateOpeatorCount,
+			Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
+				{Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"},
+			}},
+			Expression: "A",
+		},
+		TableName:     "logs",
+		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_float64_value[indexOf(attributes_float64_key, 'bytes')] > 100 AND has(attributes_string_key, 'user_name') group by ts order by ts",
+	},
+	{
Name: "Test aggregate count distinct and order by value", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + OrderBy: []v3.OrderBy{{ColumnName: "#SIGNOZ_VALUE", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(distinct(name))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) group by ts order by value ASC,ts", + }, + { + Name: "Test aggregate count distinct on non selected field", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) group by ts order by ts", + }, + { + Name: "Test aggregate count distinct with filter and groupBy", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "x", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "abc", Operator: "!="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}, {ColumnName: "ts", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "toFloat64(count(distinct(name))) as value from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND resources_string_value[indexOf(resources_string_key, 'x')] != 'abc' " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test aggregate count with multiple filter,groupBy and orderBy", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "name", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: 
v3.AttributeKey{Key: "x", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "abc", Operator: "!="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, {Key: "x", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}, {ColumnName: "x", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "resources_string_value[indexOf(resources_string_key, 'x')] as x, " + + "toFloat64(count(distinct(name))) as value from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND resources_string_value[indexOf(resources_string_key, 'x')] != 'abc' " + + "group by method,x,ts " + + "order by method ASC,x ASC,ts", + }, + { + Name: "Test aggregate avg", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag}, + AggregateOperator: v3.AggregateOperatorAvg, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}, {ColumnName: "x", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "avg(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')]) as value " + + "from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test aggregate sum", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorSum, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "sum(bytes) as value " + + "from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "AND 
attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test aggregate min", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorMin, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "min(bytes) as value " + + "from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test aggregate max", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorMax, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "max(bytes) as value " + + "from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test aggregate PXX", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorP05, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts," + + " attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "quantile(0.05)(bytes) as value " + + "from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test 
aggregate RateSum", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", IsColumn: true}, + AggregateOperator: v3.AggregateOperatorRateSum, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method" + + ", sum(bytes)/60 as value from signoz_logs.distributed_logs " + + "where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000)" + + " group by method,ts order by method ASC,ts", + }, + { + Name: "Test aggregate rate", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeFloat64}, + AggregateOperator: v3.AggregateOperatorRate, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method" + + ", count(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')])/60 as value " + + "from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test aggregate RateSum without materialized column", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeFloat64}, + AggregateOperator: v3.AggregateOperatorRateSum, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + GroupBy: []v3.AttributeKey{{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " + + "attributes_string_value[indexOf(attributes_string_key, 'method')] as method, " + + "sum(attributes_float64_value[indexOf(attributes_float64_key, 'bytes')])/60 as value " + + "from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) " + + "group by method,ts " + + "order by method ASC,ts", + }, + { + Name: "Test Noop", + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + SelectColumns: []v3.AttributeKey{}, + QueryName: "A", + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + // GroupBy: []v3.AttributeKey{{Key: "method", DataType: 
v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}},
+			// OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "ASC"}},
+		},
+		ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string," +
+			"CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," +
+			"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string " +
+			"from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) ",
+	},
+	{
+		Name:  "Test aggregate with having clause",
+		Start: 1680066360726210000,
+		End:   1680066458000000000,
+		Step:  60,
+		BuilderQuery: &v3.BuilderQuery{
+			QueryName:          "A",
+			AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
+			AggregateOperator:  v3.AggregateOperatorCountDistinct,
+			Expression:         "A",
+			Having: []v3.Having{
+				{
+					ColumnName: "name",
+					Operator:   ">",
+					Value:      10,
+				},
+			},
+		},
+		TableName:     "logs",
+		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) group by ts having value > 10 order by ts",
+	},
+	{
+		Name:  "Test aggregate with having clause and filters",
+		Start: 1680066360726210000,
+		End:   1680066458000000000,
+		Step:  60,
+		BuilderQuery: &v3.BuilderQuery{
+			QueryName:          "A",
+			AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
+			AggregateOperator:  v3.AggregateOperatorCountDistinct,
+			Expression:         "A",
+			Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
+				{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="},
+			},
+			},
+			Having: []v3.Having{
+				{
+					ColumnName: "name",
+					Operator:   ">",
+					Value:      10,
+				},
+			},
+		},
+		TableName:     "logs",
+		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' group by ts having value > 10 order by ts",
+	},
+}
+
+func TestBuildLogsQuery(t *testing.T) {
+	for _, tt := range testBuildLogsQueryData {
+		Convey("TestBuildLogsQuery", t, func() {
+			query, err := buildLogsQuery(tt.Start, tt.End, tt.Step, tt.BuilderQuery, map[string]v3.AttributeKey{})
+			So(err, ShouldBeNil)
+			So(query, ShouldEqual, tt.ExpectedQuery)
+		})
+	}
+}
+
+var testGetZerosForEpochNanoData = []struct {
+	Name       string
+	Epoch      int64
+	Multiplier int64
+	Result     int64
+}{
+	{
+		Name:       "Test 1",
+		Epoch:      1680712080000,
+		Multiplier: 1000000,
+		Result:     1680712080000000000,
+	},
+	{
+		Name:       "Test 2",
+		Epoch:      1680712080000000000,
+		Multiplier: 1,
+		Result:     1680712080000000000,
+	},
+}
+
+func TestGetZerosForEpochNano(t *testing.T) {
+	for _, tt := range testGetZerosForEpochNanoData {
+		Convey("testGetZerosForEpochNanoData", t, func() {
+			multiplier := getZerosForEpochNano(tt.Epoch)
+			So(multiplier, ShouldEqual, tt.Multiplier)
+			So(tt.Epoch*multiplier, ShouldEqual, tt.Result)
+		})
+	}
+}
+
+var testOrderBy = []struct {
+	Name   string
+	Items  []v3.OrderBy
+	Tags   []string
+	Result string
+}{
+	{
+		Name: "Test 1",
+		Items: []v3.OrderBy{
+			{
+				ColumnName: "name",
+				Order:      "asc",
+			},
+			{
+				ColumnName: constants.SigNozOrderByValue,
+				Order:      "desc",
+			},
+		},
+		Tags:   []string{"name"},
+		Result: "name asc,value desc",
+	},
+	{
+		Name: "Test 2",
+		Items: []v3.OrderBy{
+			{
+				ColumnName: "name",
+				Order:      "asc",
+			},
+			{
+				ColumnName: "bytes",
+				Order:      "asc",
+			},
+		},
+		Tags:   []string{"name", "bytes"},
+		Result: "name asc,bytes asc",
+	},
+	{
+		Name: "Test 3",
+		Items: []v3.OrderBy{
+			{
+				ColumnName: "name",
+				Order:      "asc",
+			},
+			{
+				ColumnName: constants.SigNozOrderByValue,
+				Order:      "asc",
+			},
+			{
+				ColumnName: "bytes",
+				Order:      "asc",
+			},
+		},
+		Tags:   []string{"name", "bytes"},
+		Result: "name asc,bytes asc,value asc",
+	},
+}
+
+func TestOrderBy(t *testing.T) {
+	for _, tt := range testOrderBy {
+		Convey("testOrderBy", t, func() {
+			res := orderBy(tt.Items, tt.Tags)
+			So(res, ShouldEqual, tt.Result)
+		})
+	}
+}
diff --git a/pkg/query-service/app/query_builder.go b/pkg/query-service/app/query_builder.go
index e353f5f3df..59627ed4af 100644
--- a/pkg/query-service/app/query_builder.go
+++ b/pkg/query-service/app/query_builder.go
@@ -36,7 +36,7 @@ var SupportedFunctions = []string{
 var evalFuncs = map[string]govaluate.ExpressionFunction{}
 
 type prepareTracesQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
-type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
+type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, fields map[string]v3.AttributeKey) (string, error)
 type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
 
 type queryBuilder struct {
@@ -127,7 +127,7 @@ func expressionToQuery(qp *v3.QueryRangeParamsV3, varToQuery map[string]string,
 	return formulaQuery, nil
 }
 
-func (qb *queryBuilder) prepareQueries(params *v3.QueryRangeParamsV3) (map[string]string, error) {
+func (qb *queryBuilder) prepareQueries(params *v3.QueryRangeParamsV3, args ...interface{}) (map[string]string, error) {
 	queries := make(map[string]string)
 
 	compositeQuery := params.CompositeQuery
@@ -145,7 +145,11 @@ func (qb *queryBuilder) prepareQueries(params *v3.QueryRangeParamsV3) (map[strin
 			}
 			queries[queryName] = queryString
 		case v3.DataSourceLogs:
-			queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query)
+			fields := map[string]v3.AttributeKey{}
+			if len(args) == 1 {
+				fields = args[0].(map[string]v3.AttributeKey)
+			}
+			queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, fields)
 			if err != nil {
 				return nil, err
 			}
diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go
index 574e35b82d..d0ded00bbe 100644
--- a/pkg/query-service/constants/constants.go
+++ b/pkg/query-service/constants/constants.go
@@ -266,3 +266,5 @@ var 
LogsTopLevelColumnsV3 = map[string]struct{}{ "timestamp": {}, "id": {}, } + +const SigNozOrderByValue = "#SIGNOZ_VALUE" diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 59037fba3d..78d4031724 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -60,7 +60,10 @@ type Reader interface { GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) + + // QB V3 metrics/traces/logs GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) + GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) GetTotalSpans(ctx context.Context) (uint64, error) GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index b9ef61b9a7..c1dddc60f6 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -209,10 +209,11 @@ type FilterAttributeKeyRequest struct { type AttributeKeyDataType string const ( - AttributeKeyDataTypeString AttributeKeyDataType = "string" - AttributeKeyDataTypeInt64 AttributeKeyDataType = "int64" - AttributeKeyDataTypeFloat64 AttributeKeyDataType = "float64" - AttributeKeyDataTypeBool AttributeKeyDataType = "bool" + AttributeKeyDataTypeUnspecified AttributeKeyDataType = "" + AttributeKeyDataTypeString AttributeKeyDataType = "string" + AttributeKeyDataTypeInt64 AttributeKeyDataType = "int64" + AttributeKeyDataTypeFloat64 AttributeKeyDataType = "float64" + AttributeKeyDataTypeBool AttributeKeyDataType = "bool" ) func (q AttributeKeyDataType) Validate() error { @@ -263,7 +264,7 @@ type AttributeKey struct { func (a AttributeKey) Validate() error { switch a.DataType { - case AttributeKeyDataTypeBool, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeString: + case AttributeKeyDataTypeBool, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeString, AttributeKeyDataTypeUnspecified: break default: return fmt.Errorf("invalid attribute dataType: %s", a.DataType) @@ -516,7 +517,7 @@ type OrderBy struct { type Having struct { ColumnName string `json:"columnName"` - Operator string `json:"operator"` + Operator string `json:"op"` Value interface{} `json:"value"` } @@ -537,8 +538,8 @@ type Series struct { } type Row struct { - Timestamp time.Time `json:"timestamp"` - Data map[string]string `json:"data"` + Timestamp time.Time `json:"timestamp"` + Data map[string]interface{} `json:"data"` } type Point struct {
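
Usage sketch (illustrative, not part of the patch): how PrepareLogsQuery from this diff composes a query. It assumes the v3 model types above, that v3.PanelTypeGraph exists in the model alongside PanelTypeList and PanelTypeValue, and that the packages are importable at the paths shown; timestamps and attribute names are made up.

package main

import (
	"fmt"

	logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func main() {
	// Count log lines per 60s interval, grouped by the `method` attribute.
	// Type and DataType are supplied inline, so no enrichment lookup is
	// needed and the fields map can stay empty.
	q := &v3.BuilderQuery{
		QueryName:         "A",
		StepInterval:      60,
		AggregateOperator: v3.AggregateOpeatorCount, // spelled as in the model
		Expression:        "A",
		GroupBy: []v3.AttributeKey{
			{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
		},
	}
	query, err := logsv3.PrepareLogsQuery(
		1680066360726210000, 1680066458000000000, // start, end (epoch ns)
		v3.QueryTypeBuilder, v3.PanelTypeGraph, q,
		map[string]v3.AttributeKey{}, // enrichment map, as built by getLogFieldsV3
	)
	if err != nil {
		panic(err)
	}
	// Per the tests above this prints:
	// SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts,
	// attributes_string_value[indexOf(attributes_string_key, 'method')] as method,
	// toFloat64(count(*)) as value from signoz_logs.distributed_logs
	// where (...) group by method,ts order by method ASC,ts
	fmt.Println(query)
}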