chore: add functions support (#4381)

This commit is contained in:
Srikanth Chekuri 2024-01-25 01:14:45 +05:30 committed by GitHub
parent 253137a6b8
commit be27a92fc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 2284 additions and 697 deletions

View File

@ -0,0 +1,90 @@
package app
import (
"strings"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// applyHavingClause applies the having clause to the result
// each query has its own having clause
// there can be multiple having clauses for each query
func applyHavingClause(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
for _, result := range result {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
if builderQueries != nil && (builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics) {
havingClause := builderQueries[result.QueryName].Having
for i := 0; i < len(result.Series); i++ {
for j := 0; j < len(result.Series[i].Points); j++ {
if !evaluateHavingClause(havingClause, result.Series[i].Points[j].Value) {
result.Series[i].Points = append(result.Series[i].Points[:j], result.Series[i].Points[j+1:]...)
j--
}
}
}
}
}
}
func evaluateHavingClause(having []v3.Having, value float64) bool {
if len(having) == 0 {
return true
}
for _, h := range having {
switch h.Operator {
case v3.HavingOperatorEqual:
if value == h.Value.(float64) {
return true
}
case v3.HavingOperatorNotEqual:
if value != h.Value.(float64) {
return true
}
case v3.HavingOperatorGreaterThan:
if value > h.Value.(float64) {
return true
}
case v3.HavingOperatorGreaterThanOrEq:
if value >= h.Value.(float64) {
return true
}
case v3.HavingOperatorLessThan:
if value < h.Value.(float64) {
return true
}
case v3.HavingOperatorLessThanOrEq:
if value <= h.Value.(float64) {
return true
}
case v3.HavingOperatorIn, v3.HavingOperator(strings.ToLower(string(v3.HavingOperatorIn))):
values, ok := h.Value.([]interface{})
if !ok {
return false
}
for _, v := range values {
if value == v.(float64) {
return true
}
}
case v3.HavingOperatorNotIn, v3.HavingOperator(strings.ToLower(string(v3.HavingOperatorNotIn))):
values, ok := h.Value.([]interface{})
if !ok {
return true
}
found := false
for _, v := range values {
if value == v.(float64) {
found = true
break
}
}
if !found {
return true
}
}
}
return false
}

View File

@ -0,0 +1,283 @@
package app
import (
"testing"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestApplyHavingCaluse(t *testing.T) {
type testCase struct {
name string
results []*v3.Result
params *v3.QueryRangeParamsV3
want []*v3.Result
}
testCases := []testCase{
{
name: "test having equal to",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
DataSource: v3.DataSourceMetrics,
Having: []v3.Having{
{
Operator: v3.HavingOperatorEqual,
Value: 0.3,
},
},
},
},
},
},
want: []*v3.Result{
{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.3,
},
},
},
},
},
},
},
{
name: "test having `in`",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
DataSource: v3.DataSourceMetrics,
Having: []v3.Having{
{
Operator: v3.HavingOperatorIn,
Value: []interface{}{0.3, 0.4},
},
},
},
},
},
},
want: []*v3.Result{
{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.4,
},
{
Value: 0.3,
},
},
},
},
},
},
},
{
name: "test having `not in` and multiple results",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
{
QueryName: "B",
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
DataSource: v3.DataSourceMetrics,
Having: []v3.Having{
{
Operator: v3.HavingOperatorNotIn,
Value: []interface{}{0.3, 0.4},
},
},
},
"B": {
DataSource: v3.DataSourceMetrics,
Having: []v3.Having{
{
Operator: v3.HavingOperatorNotIn,
Value: []interface{}{0.1},
},
},
},
},
},
},
want: []*v3.Result{
{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
},
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
applyHavingClause(tc.results, tc.params)
got := tc.results
if len(got) != len(tc.want) {
t.Errorf("got %v, want %v", got, tc.want)
}
for i := range got {
if len(got[i].Series) != len(tc.want[i].Series) {
t.Errorf("got %v, want %v", got, tc.want)
}
for j := range got[i].Series {
if len(got[i].Series[j].Points) != len(tc.want[i].Series[j].Points) {
t.Errorf("got %v, want %v", len(got[i].Series[j].Points), len(tc.want[i].Series[j].Points))
}
for k := range got[i].Series[j].Points {
if got[i].Series[j].Points[k].Value != tc.want[i].Series[j].Points[k].Value {
t.Errorf("got %v, want %v", got, tc.want)
}
}
}
}
})
}
}

View File

@ -8,7 +8,6 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"
"sync"
@ -3107,78 +3106,6 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
aH.queryRangeV3(r.Context(), queryRangeParams, w, r)
}
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
// apply limit if any for metrics
// use the grouping set points to apply the limit
for _, result := range results {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
if builderQueries != nil && (builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics ||
result.QueryName != builderQueries[result.QueryName].Expression) {
limit := builderQueries[result.QueryName].Limit
orderByList := builderQueries[result.QueryName].OrderBy
if limit >= 0 {
if len(orderByList) == 0 {
// If no orderBy is specified, sort by value in descending order
orderByList = []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "desc"}}
}
sort.SliceStable(result.Series, func(i, j int) bool {
for _, orderBy := range orderByList {
if orderBy.ColumnName == constants.SigNozOrderByValue {
// For table type queries (we rely on the fact that one value for row), sort
// based on final aggregation value
if len(result.Series[i].Points) == 1 && len(result.Series[j].Points) == 1 {
if orderBy.Order == "asc" {
return result.Series[i].Points[0].Value < result.Series[j].Points[0].Value
} else if orderBy.Order == "desc" {
return result.Series[i].Points[0].Value > result.Series[j].Points[0].Value
}
}
// For graph type queries, sort based on GroupingSetsPoint
if result.Series[i].GroupingSetsPoint == nil || result.Series[j].GroupingSetsPoint == nil {
// Handle nil GroupingSetsPoint, if needed
// Here, we assume non-nil values are always less than nil values
return result.Series[i].GroupingSetsPoint != nil
}
if orderBy.Order == "asc" {
return result.Series[i].GroupingSetsPoint.Value < result.Series[j].GroupingSetsPoint.Value
} else if orderBy.Order == "desc" {
return result.Series[i].GroupingSetsPoint.Value > result.Series[j].GroupingSetsPoint.Value
}
} else {
// Sort based on Labels map
labelI, existsI := result.Series[i].Labels[orderBy.ColumnName]
labelJ, existsJ := result.Series[j].Labels[orderBy.ColumnName]
if !existsI || !existsJ {
// Handle missing labels, if needed
// Here, we assume non-existent labels are always less than existing ones
return existsI
}
if orderBy.Order == "asc" {
return strings.Compare(labelI, labelJ) < 0
} else if orderBy.Order == "desc" {
return strings.Compare(labelI, labelJ) > 0
}
}
}
// Preserve original order if no matching orderBy is found
return i < j
})
if limit > 0 && len(result.Series) > int(limit) {
result.Series = result.Series[:limit]
}
}
}
}
}
func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
// get the param from url and add it to body
@ -3295,6 +3222,10 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
return
}
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
postProcessResult(result, queryRangeParams)
}
resp := v3.QueryRangeResponse{
Result: result,
}
@ -3322,3 +3253,48 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
aH.queryRangeV4(r.Context(), queryRangeParams, w, r)
}
// postProcessResult applies having clause, metric limit, reduce function to the result
// This function is effective for metrics data source for now, but it can be extended to other data sources
// if needed
// Much of this work can be done in the ClickHouse query, but we decided to do it here because:
// 1. Effective use of caching
// 2. Easier to add new functions
func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
// Having clause is not part of the clickhouse query, so we need to apply it here
// It's not included in the query because it doesn't work nicely with caching
// With this change, if you have a query with a having clause, and then you change the having clause
// to something else, the query will still be cached.
applyHavingClause(result, queryRangeParams)
// We apply the metric limit here because it's not part of the clickhouse query
// The limit in the context of the time series query is the number of time series
// So for the limit to work, we need to know what series to keep and what to discard
// For us to know that, we need to execute the query first, and then apply the limit
// which we found expensive, because we are executing the query twice on the same data
// So we decided to apply the limit here, after the query is executed
// The function is named applyMetricLimit because it only applies to metrics data source
// In traces and logs, the limit is achieved using subqueries
applyMetricLimit(result, queryRangeParams)
// Each series in the result produces N number of points, where N is (end - start) / step
// For the panel type table, we need to show one point for each series in the row
// We do that by applying a reduce function to each series
applyReduceTo(result, queryRangeParams)
// We apply the functions here it's easier to add new functions
applyFunctions(result, queryRangeParams)
}
// applyFunctions applies functions for each query in the composite query
// The functions can be more than one, and they are applied in the order they are defined
func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
for idx, result := range results {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
if builderQueries != nil && (builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics) {
functions := builderQueries[result.QueryName].Functions
for _, function := range functions {
results[idx] = queryBuilder.ApplyFunction(function, result)
}
}
}
}

View File

@ -8,9 +8,7 @@ import (
"strings"
"testing"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestPrepareQuery(t *testing.T) {
@ -132,619 +130,3 @@ func TestPrepareQuery(t *testing.T) {
})
}
}
func TestApplyLimitOnMetricResult(t *testing.T) {
cases := []struct {
name string
inputResult []*v3.Result
params *v3.QueryRangeParamsV3
expectedResult []*v3.Result
}{
{
name: "test limit 1 without order", // top most (latency/error) as default
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
},
},
},
},
{
name: "test limit with order asc",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
{
name: "test data source not metrics",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 69,
},
{
Timestamp: 1689220096000,
Value: 240,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 154.5,
},
},
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 420,
},
{
Timestamp: 1689220096000,
Value: 260,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 340,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "service_name"},
DataSource: v3.DataSourceTraces,
AggregateOperator: v3.AggregateOperatorSum,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 69,
},
{
Timestamp: 1689220096000,
Value: 240,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 154.5,
},
},
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 420,
},
{
Timestamp: 1689220096000,
Value: 260,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 340,
},
},
},
},
},
},
{
// ["GET /api/v1/health", "DELETE /api/v1/health"] so result should be ["DELETE /api/v1/health"] although it has lower value
name: "test limit with operation asc",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api/v1/health",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: "operation", Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
{
name: "test limit with multiple order by labels",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api/v1/health",
"status_code": "200",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "301",
"priority": "P1",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "400",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "200",
"priority": "P1",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}, {Key: "operation"}, {Key: "status_code"}, {Key: "priority"}},
Limit: 2,
OrderBy: []v3.OrderBy{
{ColumnName: "priority", Order: "asc"},
{ColumnName: "status_code", Order: "desc"},
},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api/v1/health",
"status_code": "200",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "400",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result := c.inputResult
applyMetricLimit(result, c.params)
if len(result) != len(c.expectedResult) {
t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result))
}
for i, r := range result {
if r.QueryName != c.expectedResult[i].QueryName {
t.Errorf("expected query name: %s, but got: %s", c.expectedResult[i].QueryName, r.QueryName)
}
if len(r.Series) != len(c.expectedResult[i].Series) {
t.Errorf("expected series length: %d, but got: %d", len(c.expectedResult[i].Series), len(r.Series))
}
for j, s := range r.Series {
if len(s.Points) != len(c.expectedResult[i].Series[j].Points) {
t.Errorf("expected points length: %d, but got: %d", len(c.expectedResult[i].Series[j].Points), len(s.Points))
}
for k, p := range s.Points {
if p.Timestamp != c.expectedResult[i].Series[j].Points[k].Timestamp {
t.Errorf("expected point timestamp: %d, but got: %d", c.expectedResult[i].Series[j].Points[k].Timestamp, p.Timestamp)
}
if p.Value != c.expectedResult[i].Series[j].Points[k].Value {
t.Errorf("expected point value: %f, but got: %f", c.expectedResult[i].Series[j].Points[k].Value, p.Value)
}
}
}
}
})
}
}

View File

@ -0,0 +1,81 @@
package app
import (
"sort"
"strings"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// applyMetricLimit applies limit to the metrics query results
func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
// apply limit if any for metrics
// use the grouping set points to apply the limit
for _, result := range results {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
if builderQueries != nil && (builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics) {
limit := builderQueries[result.QueryName].Limit
orderByList := builderQueries[result.QueryName].OrderBy
if limit > 0 {
if len(orderByList) == 0 {
// If no orderBy is specified, sort by value in descending order
orderByList = []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "desc"}}
}
sort.SliceStable(result.Series, func(i, j int) bool {
for _, orderBy := range orderByList {
if orderBy.ColumnName == constants.SigNozOrderByValue {
// For table type queries (we rely on the fact that one value for row), sort
// based on final aggregation value
if len(result.Series[i].Points) == 1 && len(result.Series[j].Points) == 1 {
if orderBy.Order == "asc" {
return result.Series[i].Points[0].Value < result.Series[j].Points[0].Value
} else if orderBy.Order == "desc" {
return result.Series[i].Points[0].Value > result.Series[j].Points[0].Value
}
}
// For graph type queries, sort based on GroupingSetsPoint
if result.Series[i].GroupingSetsPoint == nil || result.Series[j].GroupingSetsPoint == nil {
// Handle nil GroupingSetsPoint, if needed
// Here, we assume non-nil values are always less than nil values
return result.Series[i].GroupingSetsPoint != nil
}
if orderBy.Order == "asc" {
return result.Series[i].GroupingSetsPoint.Value < result.Series[j].GroupingSetsPoint.Value
} else if orderBy.Order == "desc" {
return result.Series[i].GroupingSetsPoint.Value > result.Series[j].GroupingSetsPoint.Value
}
} else {
// Sort based on Labels map
labelI, existsI := result.Series[i].Labels[orderBy.ColumnName]
labelJ, existsJ := result.Series[j].Labels[orderBy.ColumnName]
if !existsI || !existsJ {
// Handle missing labels, if needed
// Here, we assume non-existent labels are always less than existing ones
return existsI
}
if orderBy.Order == "asc" {
return strings.Compare(labelI, labelJ) < 0
} else if orderBy.Order == "desc" {
return strings.Compare(labelI, labelJ) > 0
}
}
}
// Preserve original order if no matching orderBy is found
return i < j
})
if limit > 0 && len(result.Series) > int(limit) {
result.Series = result.Series[:limit]
}
}
}
}
}

View File

@ -0,0 +1,624 @@
package app
import (
"testing"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestApplyLimitOnMetricResult(t *testing.T) {
cases := []struct {
name string
inputResult []*v3.Result
params *v3.QueryRangeParamsV3
expectedResult []*v3.Result
}{
{
name: "test limit 1 without order", // top most (latency/error) as default
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
},
},
},
},
{
name: "test limit with order asc",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "route",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
{
name: "test data source not metrics",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 69,
},
{
Timestamp: 1689220096000,
Value: 240,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 154.5,
},
},
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 420,
},
{
Timestamp: 1689220096000,
Value: 260,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 340,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "service_name"},
DataSource: v3.DataSourceTraces,
AggregateOperator: v3.AggregateOperatorSum,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 69,
},
{
Timestamp: 1689220096000,
Value: 240,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 154.5,
},
},
{
Labels: map[string]string{
"service_name": "redis",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 420,
},
{
Timestamp: 1689220096000,
Value: 260,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 340,
},
},
},
},
},
},
{
// ["GET /api/v1/health", "DELETE /api/v1/health"] so result should be ["DELETE /api/v1/health"] although it has lower value
name: "test limit with operation asc",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api/v1/health",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}},
Limit: 1,
OrderBy: []v3.OrderBy{{ColumnName: "operation", Order: "asc"}},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
{
name: "test limit with multiple order by labels",
inputResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api/v1/health",
"status_code": "200",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "301",
"priority": "P1",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "400",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "200",
"priority": "P1",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
Start: 1689220036000,
End: 1689220096000,
Step: 60,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "signo_calls_total"},
DataSource: v3.DataSourceMetrics,
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
GroupBy: []v3.AttributeKey{{Key: "service_name"}, {Key: "operation"}, {Key: "status_code"}, {Key: "priority"}},
Limit: 2,
OrderBy: []v3.OrderBy{
{ColumnName: "priority", Order: "asc"},
{ColumnName: "status_code", Order: "desc"},
},
},
},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
},
},
expectedResult: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Labels: map[string]string{
"service_name": "frontend",
"operation": "GET /api/v1/health",
"status_code": "200",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 19.2,
},
{
Timestamp: 1689220096000,
Value: 19.5,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 19.3,
},
},
{
Labels: map[string]string{
"service_name": "route",
"operation": "DELETE /api/v1/health",
"status_code": "400",
"priority": "P0",
},
Points: []v3.Point{
{
Timestamp: 1689220036000,
Value: 8.83,
},
{
Timestamp: 1689220096000,
Value: 8.83,
},
},
GroupingSetsPoint: &v3.Point{
Timestamp: 0,
Value: 8.83,
},
},
},
},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result := c.inputResult
applyMetricLimit(result, c.params)
if len(result) != len(c.expectedResult) {
t.Errorf("expected result length: %d, but got: %d", len(c.expectedResult), len(result))
}
for i, r := range result {
if r.QueryName != c.expectedResult[i].QueryName {
t.Errorf("expected query name: %s, but got: %s", c.expectedResult[i].QueryName, r.QueryName)
}
if len(r.Series) != len(c.expectedResult[i].Series) {
t.Errorf("expected series length: %d, but got: %d", len(c.expectedResult[i].Series), len(r.Series))
}
for j, s := range r.Series {
if len(s.Points) != len(c.expectedResult[i].Series[j].Points) {
t.Errorf("expected points length: %d, but got: %d", len(c.expectedResult[i].Series[j].Points), len(s.Points))
}
for k, p := range s.Points {
if p.Timestamp != c.expectedResult[i].Series[j].Points[k].Timestamp {
t.Errorf("expected point timestamp: %d, but got: %d", c.expectedResult[i].Series[j].Points[k].Timestamp, p.Timestamp)
}
if p.Value != c.expectedResult[i].Series[j].Points[k].Value {
t.Errorf("expected point value: %f, but got: %f", c.expectedResult[i].Series[j].Points[k].Value, p.Value)
}
}
}
}
})
}
}

View File

@ -0,0 +1,286 @@
package queryBuilder
import (
"math"
"sort"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// funcCutOffMin cuts off values below the threshold and replaces them with NaN
func funcCutOffMin(result *v3.Result, threshold float64) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
if point.Value < threshold {
point.Value = math.NaN()
}
series.Points[idx] = point
}
}
return result
}
// funcCutOffMax cuts off values above the threshold and replaces them with NaN
func funcCutOffMax(result *v3.Result, threshold float64) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
if point.Value > threshold {
point.Value = math.NaN()
}
series.Points[idx] = point
}
}
return result
}
// funcClampMin cuts off values below the threshold and replaces them with the threshold
func funcClampMin(result *v3.Result, threshold float64) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
if point.Value < threshold {
point.Value = threshold
}
series.Points[idx] = point
}
}
return result
}
// funcClampMax cuts off values above the threshold and replaces them with the threshold
func funcClampMax(result *v3.Result, threshold float64) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
if point.Value > threshold {
point.Value = threshold
}
series.Points[idx] = point
}
}
return result
}
// funcAbsolute returns the absolute value of each point
func funcAbsolute(result *v3.Result) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
point.Value = math.Abs(point.Value)
series.Points[idx] = point
}
}
return result
}
// funcLog2 returns the log2 of each point
func funcLog2(result *v3.Result) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
point.Value = math.Log2(point.Value)
series.Points[idx] = point
}
}
return result
}
// funcLog10 returns the log10 of each point
func funcLog10(result *v3.Result) *v3.Result {
for _, series := range result.Series {
for idx, point := range series.Points {
point.Value = math.Log10(point.Value)
series.Points[idx] = point
}
}
return result
}
// funcCumSum returns the cumulative sum for each point in a series
func funcCumSum(result *v3.Result) *v3.Result {
for _, series := range result.Series {
var sum float64
for idx, point := range series.Points {
if !math.IsNaN(point.Value) {
sum += point.Value
}
point.Value = sum
series.Points[idx] = point
}
}
return result
}
func funcEWMA(result *v3.Result, alpha float64) *v3.Result {
for _, series := range result.Series {
var ewma float64
var initialized bool
for i, point := range series.Points {
if !initialized {
if !math.IsNaN(point.Value) {
// Initialize EWMA with the first non-NaN value
ewma = point.Value
initialized = true
}
// Continue until the EWMA is initialized
continue
}
if !math.IsNaN(point.Value) {
// Update EWMA with the current value
ewma = alpha*point.Value + (1-alpha)*ewma
}
// Set the EWMA value for the current point
series.Points[i].Value = ewma
}
}
return result
}
// funcMedian3 returns the median of 3 points for each point in a series
func funcMedian3(result *v3.Result) *v3.Result {
for _, series := range result.Series {
median3 := make([]float64, 0)
for i := 1; i < len(series.Points)-1; i++ {
values := make([]float64, 0, 3)
// Add non-NaN values to the slice
for j := -1; j <= 1; j++ {
if !math.IsNaN(series.Points[i+j].Value) {
values = append(values, series.Points[i+j].Value)
}
}
// Handle the case where there are not enough values to calculate a median
if len(values) == 0 {
median3 = append(median3, math.NaN())
continue
}
median3 = append(median3, median(values))
}
// Set the median3 values for the series
for i := 1; i < len(series.Points)-1; i++ {
series.Points[i].Value = median3[i-1]
}
}
return result
}
// funcMedian5 returns the median of 5 points for each point in a series
func funcMedian5(result *v3.Result) *v3.Result {
for _, series := range result.Series {
median5 := make([]float64, 0)
for i := 2; i < len(series.Points)-2; i++ {
values := make([]float64, 0, 5)
// Add non-NaN values to the slice
for j := -2; j <= 2; j++ {
if !math.IsNaN(series.Points[i+j].Value) {
values = append(values, series.Points[i+j].Value)
}
}
// Handle the case where there are not enough values to calculate a median
if len(values) == 0 {
median5 = append(median5, math.NaN())
continue
}
median5 = append(median5, median(values))
}
// Set the median5 values for the series
for i := 2; i < len(series.Points)-2; i++ {
series.Points[i].Value = median5[i-2]
}
}
return result
}
// funcMedian7 returns the median of 7 points for each point in a series
func funcMedian7(result *v3.Result) *v3.Result {
for _, series := range result.Series {
median7 := make([]float64, 0)
for i := 3; i < len(series.Points)-3; i++ {
values := make([]float64, 0, 7)
// Add non-NaN values to the slice
for j := -3; j <= 3; j++ {
if !math.IsNaN(series.Points[i+j].Value) {
values = append(values, series.Points[i+j].Value)
}
}
// Handle the case where there are not enough values to calculate a median
if len(values) == 0 {
median7 = append(median7, math.NaN())
continue
}
median7 = append(median7, median(values))
}
// Set the median7 values for the series
for i := 3; i < len(series.Points)-3; i++ {
series.Points[i].Value = median7[i-3]
}
}
return result
}
func median(values []float64) float64 {
sort.Float64s(values)
medianIndex := len(values) / 2
if len(values)%2 == 0 {
return (values[medianIndex-1] + values[medianIndex]) / 2
}
return values[medianIndex]
}
func ApplyFunction(fn v3.Function, result *v3.Result) *v3.Result {
switch fn.Name {
case v3.FunctionNameCutOffMin, v3.FunctionNameCutOffMax, v3.FunctionNameClampMin, v3.FunctionNameClampMax:
threshold, ok := fn.Args[0].(float64)
if !ok {
return result
}
switch fn.Name {
case v3.FunctionNameCutOffMin:
return funcCutOffMin(result, threshold)
case v3.FunctionNameCutOffMax:
return funcCutOffMax(result, threshold)
case v3.FunctionNameClampMin:
return funcClampMin(result, threshold)
case v3.FunctionNameClampMax:
return funcClampMax(result, threshold)
}
case v3.FunctionNameAbsolute:
return funcAbsolute(result)
case v3.FunctionNameLog2:
return funcLog2(result)
case v3.FunctionNameLog10:
return funcLog10(result)
case v3.FunctionNameCumSum:
return funcCumSum(result)
case v3.FunctionNameEWMA3, v3.FunctionNameEWMA5, v3.FunctionNameEWMA7:
alpha, ok := fn.Args[0].(float64)
if !ok {
// alpha = 2 / (n + 1) where n is the window size
if fn.Name == v3.FunctionNameEWMA3 {
alpha = 0.5 // 2 / (3 + 1)
} else if fn.Name == v3.FunctionNameEWMA5 {
alpha = 1 / float64(3) // 2 / (5 + 1)
} else if fn.Name == v3.FunctionNameEWMA7 {
alpha = 0.25 // 2 / (7 + 1)
}
}
return funcEWMA(result, alpha)
case v3.FunctionNameMedian3:
return funcMedian3(result)
case v3.FunctionNameMedian5:
return funcMedian5(result)
case v3.FunctionNameMedian7:
return funcMedian7(result)
}
return result
}

View File

@ -0,0 +1,604 @@
package queryBuilder
import (
"math"
"testing"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestFuncCutOffMin(t *testing.T) {
type args struct {
result *v3.Result
threshold float64
}
tests := []struct {
name string
args args
want *v3.Result
}{
{
name: "test funcCutOffMin",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
threshold: 0.3,
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: math.NaN(),
},
{
Value: math.NaN(),
},
},
},
},
},
},
{
name: "test funcCutOffMin with threshold 0",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
threshold: 0,
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
},
}
for _, tt := range tests {
newResult := funcCutOffMin(tt.args.result, tt.args.threshold)
for j, series := range newResult.Series {
for k, point := range series.Points {
if math.IsNaN(tt.want.Series[j].Points[k].Value) {
if !math.IsNaN(point.Value) {
t.Errorf("funcCutOffMin() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
continue
}
if point.Value != tt.want.Series[j].Points[k].Value {
t.Errorf("funcCutOffMin() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
}
}
}
}
func TestFuncCutOffMax(t *testing.T) {
type args struct {
result *v3.Result
threshold float64
}
tests := []struct {
name string
args args
want *v3.Result
}{
{
name: "test funcCutOffMax",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
threshold: 0.3,
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: math.NaN(),
},
{
Value: math.NaN(),
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
},
{
name: "test funcCutOffMax with threshold 0",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
threshold: 0,
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: math.NaN(),
},
{
Value: math.NaN(),
},
{
Value: math.NaN(),
},
{
Value: math.NaN(),
},
{
Value: math.NaN(),
},
},
},
},
},
},
}
for _, tt := range tests {
newResult := funcCutOffMax(tt.args.result, tt.args.threshold)
for j, series := range newResult.Series {
for k, point := range series.Points {
if math.IsNaN(tt.want.Series[j].Points[k].Value) {
if !math.IsNaN(point.Value) {
t.Errorf("funcCutOffMax() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
continue
}
if point.Value != tt.want.Series[j].Points[k].Value {
t.Errorf("funcCutOffMax() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
}
}
}
}
func TestCutOffMinCumSum(t *testing.T) {
type args struct {
result *v3.Result
threshold float64
}
tests := []struct {
name string
args args
want *v3.Result
}{
{
name: "test funcCutOffMin followed by funcCumulativeSum",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
},
},
},
},
threshold: 0.3,
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.5,
},
{
Value: 0.5,
},
{
Value: 0.9,
},
{
Value: 1.2,
},
},
},
},
},
},
}
for _, tt := range tests {
newResult := funcCutOffMin(tt.args.result, tt.args.threshold)
newResult = funcCumSum(newResult)
for j, series := range newResult.Series {
for k, point := range series.Points {
if math.IsNaN(tt.want.Series[j].Points[k].Value) {
if !math.IsNaN(point.Value) {
t.Errorf("funcCutOffMin() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
continue
}
if point.Value != tt.want.Series[j].Points[k].Value {
t.Errorf("funcCutOffMin() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
}
}
}
}
func TestFuncMedian3(t *testing.T) {
type args struct {
result *v3.Result
}
tests := []struct {
name string
args args
want *v3.Result
}{
{
name: "Values",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 5}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: 8}, {Timestamp: 4, Value: 2}, {Timestamp: 5, Value: 7}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 5}, {Timestamp: 2, Value: 5}, {Timestamp: 3, Value: 3}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 7}},
},
},
},
},
{
name: "NaNHandling",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: math.NaN()}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: math.NaN()}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 9}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: math.NaN()}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: 5}, {Timestamp: 4, Value: 8}, {Timestamp: 5, Value: 9}},
},
},
},
},
{
name: "UniformValues",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 7}, {Timestamp: 2, Value: 7}, {Timestamp: 3, Value: 7}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 7}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 7}, {Timestamp: 2, Value: 7}, {Timestamp: 3, Value: 7}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 7}},
},
},
},
},
{
name: "SingleValueSeries",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 9}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 9}},
},
},
},
},
{
name: "EmptySeries",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := funcMedian3(tt.args.result)
for j, series := range got.Series {
for k, point := range series.Points {
if point.Value != tt.want.Series[j].Points[k].Value && !math.IsNaN(tt.want.Series[j].Points[k].Value) {
t.Errorf("funcMedian3() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
}
}
})
}
}
func TestFuncMedian5(t *testing.T) {
type args struct {
result *v3.Result
}
tests := []struct {
name string
args args
want *v3.Result
}{
{
name: "Values",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 5}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: 8}, {Timestamp: 4, Value: 2}, {Timestamp: 5, Value: 7}, {Timestamp: 6, Value: 9}, {Timestamp: 7, Value: 1}, {Timestamp: 8, Value: 4}, {Timestamp: 9, Value: 6}, {Timestamp: 10, Value: 10}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 5}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: 5}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 7}, {Timestamp: 6, Value: 4}, {Timestamp: 7, Value: 6}, {Timestamp: 8, Value: 6}, {Timestamp: 9, Value: 6}, {Timestamp: 10, Value: 10}},
},
},
},
},
{
name: "NaNHandling",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: math.NaN()}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: math.NaN()}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 9}, {Timestamp: 6, Value: 1}, {Timestamp: 7, Value: 4}, {Timestamp: 8, Value: 6}, {Timestamp: 9, Value: 10}, {Timestamp: 10, Value: 2}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: math.NaN()}, {Timestamp: 2, Value: 3}, {Timestamp: 3, Value: 7}, {Timestamp: 4, Value: 5}, {Timestamp: 5, Value: 5.5}, {Timestamp: 6, Value: 6}, {Timestamp: 7, Value: 6}, {Timestamp: 8, Value: 4}, {Timestamp: 9, Value: 10}, {Timestamp: 10, Value: 2}},
},
},
},
},
{
name: "UniformValues",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 7}, {Timestamp: 2, Value: 7}, {Timestamp: 3, Value: 7}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 7}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 7}, {Timestamp: 2, Value: 7}, {Timestamp: 3, Value: 7}, {Timestamp: 4, Value: 7}, {Timestamp: 5, Value: 7}},
},
},
},
},
{
name: "SingleValueSeries",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 9}},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{{Timestamp: 1, Value: 9}},
},
},
},
},
{
name: "EmptySeries",
args: args{
result: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{},
},
},
},
},
want: &v3.Result{
Series: []*v3.Series{
{
Points: []v3.Point{},
},
},
},
},
}
for _, tt := range tests {
got := funcMedian5(tt.args.result)
for j, series := range got.Series {
for k, point := range series.Points {
if point.Value != tt.want.Series[j].Points[k].Value && !math.IsNaN(tt.want.Series[j].Points[k].Value) {
t.Errorf("funcMedian5() = %v, want %v", point.Value, tt.want.Series[j].Points[k].Value)
}
}
}
}
}

View File

@ -0,0 +1,71 @@
package app
import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// applyReduceTo applies the reduceTo operator to each series
// and returns a new series with the reduced value
// reduceTo can be one of the following:
// - last
// - sum
// - avg
// - min
// - max
func applyReduceTo(result []*v3.Result, queryRangeParams *v3.QueryRangeParamsV3) {
for _, result := range result {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
// reduceTo is only applicable for metrics data source
// and for table and value panels
if builderQueries[result.QueryName] != nil && (builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics &&
(queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable || queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeValue)) {
reduceTo := builderQueries[result.QueryName].ReduceTo
switch reduceTo {
case v3.ReduceToOperatorLast:
for i := 0; i < len(result.Series); i++ {
if len(result.Series[i].Points) > 0 {
result.Series[i].Points = []v3.Point{result.Series[i].Points[len(result.Series[i].Points)-1]}
}
}
case v3.ReduceToOperatorSum:
for i := 0; i < len(result.Series); i++ {
var sum float64
for j := 0; j < len(result.Series[i].Points); j++ {
sum += result.Series[i].Points[j].Value
}
result.Series[i].Points = []v3.Point{{Value: sum}}
}
case v3.ReduceToOperatorAvg:
for i := 0; i < len(result.Series); i++ {
var sum float64
for j := 0; j < len(result.Series[i].Points); j++ {
sum += result.Series[i].Points[j].Value
}
result.Series[i].Points = []v3.Point{{Value: sum / float64(len(result.Series[i].Points))}}
}
case v3.ReduceToOperatorMin:
for i := 0; i < len(result.Series); i++ {
var min float64
for j := 0; j < len(result.Series[i].Points); j++ {
if j == 0 || result.Series[i].Points[j].Value < min {
min = result.Series[i].Points[j].Value
}
}
result.Series[i].Points = []v3.Point{{Value: min}}
}
case v3.ReduceToOperatorMax:
for i := 0; i < len(result.Series); i++ {
var max float64
for j := 0; j < len(result.Series[i].Points); j++ {
if j == 0 || result.Series[i].Points[j].Value > max {
max = result.Series[i].Points[j].Value
}
}
result.Series[i].Points = []v3.Point{{Value: max}}
}
}
}
}
}

View File

@ -0,0 +1,99 @@
package app
import (
"testing"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestApplyReduceTo(t *testing.T) {
type testCase struct {
name string
results []*v3.Result
params *v3.QueryRangeParamsV3
want []*v3.Result
}
testCases := []testCase{
{
name: "test reduce to",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 0.5,
},
{
Value: 0.4,
},
{
Value: 0.3,
},
{
Value: 0.2,
},
{
Value: 0.1,
},
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeValue,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
DataSource: v3.DataSourceMetrics,
ReduceTo: v3.ReduceToOperatorSum,
},
},
},
},
want: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{
Value: 1.5,
},
},
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
applyReduceTo(tc.results, tc.params)
got := tc.results
for _, gotResult := range got {
for _, wantResult := range tc.want {
if gotResult.QueryName == wantResult.QueryName {
if len(gotResult.Series) != len(wantResult.Series) {
t.Errorf("got %v, want %v", gotResult.Series, wantResult.Series)
} else {
for i, gotSeries := range gotResult.Series {
for j, gotPoint := range gotSeries.Points {
if gotPoint.Value != wantResult.Series[i].Points[j].Value {
t.Errorf("got %v, want %v", gotPoint.Value, wantResult.Series[i].Points[j].Value)
}
}
}
}
}
}
}
})
}
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/google/uuid"
@ -482,10 +483,50 @@ const (
SpaceAggregationCount SpaceAggregation = "count"
)
type FunctionName string
const (
FunctionNameCutOffMin FunctionName = "cutOffMin"
FunctionNameCutOffMax FunctionName = "cutOffMax"
FunctionNameClampMin FunctionName = "clampMin"
FunctionNameClampMax FunctionName = "clampMax"
FunctionNameAbsolute FunctionName = "absolute"
FunctionNameLog2 FunctionName = "log2"
FunctionNameLog10 FunctionName = "log10"
FunctionNameCumSum FunctionName = "cumSum"
FunctionNameEWMA3 FunctionName = "ewma3"
FunctionNameEWMA5 FunctionName = "ewma5"
FunctionNameEWMA7 FunctionName = "ewma7"
FunctionNameMedian3 FunctionName = "median3"
FunctionNameMedian5 FunctionName = "median5"
FunctionNameMedian7 FunctionName = "median7"
)
func (f FunctionName) Validate() error {
switch f {
case FunctionNameCutOffMin,
FunctionNameCutOffMax,
FunctionNameClampMin,
FunctionNameClampMax,
FunctionNameAbsolute,
FunctionNameLog2,
FunctionNameLog10,
FunctionNameCumSum,
FunctionNameEWMA3,
FunctionNameEWMA5,
FunctionNameEWMA7,
FunctionNameMedian3,
FunctionNameMedian5,
FunctionNameMedian7:
return nil
default:
return fmt.Errorf("invalid function name: %s", f)
}
}
type Function struct {
Category string `json:"category"`
Name string `json:"name"`
Args []interface{} `json:"args,omitempty"`
Name FunctionName `json:"name"`
Args []interface{} `json:"args,omitempty"`
}
type BuilderQuery struct {
@ -562,6 +603,14 @@ func (b *BuilderQuery) Validate() error {
}
}
if b.Having != nil {
for _, having := range b.Having {
if err := having.Operator.Validate(); err != nil {
return fmt.Errorf("having operator is invalid: %w", err)
}
}
}
for _, selectColumn := range b.SelectColumns {
if err := selectColumn.Validate(); err != nil {
return fmt.Errorf("select column is invalid %w", err)
@ -571,6 +620,15 @@ func (b *BuilderQuery) Validate() error {
if b.Expression == "" {
return fmt.Errorf("expression is required")
}
if len(b.Functions) > 0 {
for _, function := range b.Functions {
if err := function.Name.Validate(); err != nil {
return fmt.Errorf("function name is invalid: %w", err)
}
}
}
return nil
}
@ -655,10 +713,43 @@ type OrderBy struct {
IsColumn bool `json:"-"`
}
// See HAVING_OPERATORS in queryBuilder.ts
type HavingOperator string
const (
HavingOperatorEqual HavingOperator = "="
HavingOperatorNotEqual HavingOperator = "!="
HavingOperatorGreaterThan HavingOperator = ">"
HavingOperatorGreaterThanOrEq HavingOperator = ">="
HavingOperatorLessThan HavingOperator = "<"
HavingOperatorLessThanOrEq HavingOperator = "<="
HavingOperatorIn HavingOperator = "IN"
HavingOperatorNotIn HavingOperator = "NOT_IN"
)
func (h HavingOperator) Validate() error {
switch h {
case HavingOperatorEqual,
HavingOperatorNotEqual,
HavingOperatorGreaterThan,
HavingOperatorGreaterThanOrEq,
HavingOperatorLessThan,
HavingOperatorLessThanOrEq,
HavingOperatorIn,
HavingOperatorNotIn,
HavingOperator(strings.ToLower(string(HavingOperatorIn))),
HavingOperator(strings.ToLower(string(HavingOperatorNotIn))):
return nil
default:
return fmt.Errorf("invalid having operator: %s", h)
}
}
type Having struct {
ColumnName string `json:"columnName"`
Operator string `json:"op"`
Value interface{} `json:"value"`
ColumnName string `json:"columnName"`
Operator HavingOperator `json:"op"`
Value interface{} `json:"value"`
}
func (h *Having) CacheKey() string {