Merge branch 'develop' into release/v0.58.x

This commit is contained in:
Prashant Shahi 2024-11-16 04:50:33 +05:30
commit d859301d30
8 changed files with 862 additions and 89 deletions

View File

@ -12,6 +12,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
chErrors "go.signoz.io/signoz/pkg/query-service/errors" chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils" "go.signoz.io/signoz/pkg/query-service/utils"
@ -52,7 +53,8 @@ type querier struct {
returnedSeries []*v3.Series returnedSeries []*v3.Series
returnedErr error returnedErr error
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
type QuerierOptions struct { type QuerierOptions struct {
@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err return results, errQueriesByName, err
} }
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
res := make([]*v3.Result, 0) res := make([]*v3.Result, 0)
qName := "" qName := ""
pageSize := uint64(0) pageSize := uint64(0)
limit := uint64(0)
offset := uint64(0)
// se we are considering only one query // se we are considering only one query
for name, v := range params.CompositeQuery.BuilderQueries { for name, v := range params.CompositeQuery.BuilderQueries {
qName = name qName = name
pageSize = v.PageSize pageSize = v.PageSize
// for traces specifically
limit = v.Limit
offset = v.Offset
} }
data := []*v3.Row{} data := []*v3.Row{}
tracesLimit := limit + offset
for _, v := range tsRanges { for _, v := range tsRanges {
params.Start = v.Start params.Start = v.Start
params.End = v.End params.End = v.End
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) length := uint64(0)
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
// this will to run only once // this will to run only once
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query) // appending the filter to get the next set of data
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params)
if err != nil { if err != nil {
errs := []error{err} return nil, nil, err
errQuriesByName := map[string]error{ }
name: err, for name, query := range queries {
} rowList, err := q.reader.GetListResultV3(ctx, query)
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))
data = append(data, rowList...)
} }
data = append(data, rowList...)
}
// append a filter to the params if length > 0 {
if len(data) > 0 { params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ Key: v3.AttributeKey{
Key: v3.AttributeKey{ Key: "id",
Key: "id", IsColumn: true,
IsColumn: true, DataType: "string",
DataType: "string", },
}, Operator: v3.FilterOperatorLessThan,
Operator: v3.FilterOperatorLessThan, Value: data[len(data)-1].Data["id"],
Value: data[len(data)-1].Data["id"], })
}) }
}
if uint64(len(data)) >= pageSize { if uint64(len(data)) >= pageSize {
break break
}
} else {
// TRACE
// we are updating the offset and limit based on the number of traces we have found in the current timerange
// eg -
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// if 100 traces are there in [t1, t10] then 100 will return immediately.
// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
//
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
// max limit + offset is 10k for pagination
if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
}
params.CompositeQuery.BuilderQueries[qName].Offset = 0
params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))
// skip the traces unless offset is 0
for _, row := range rowList {
if offset == 0 {
data = append(data, row)
} else {
offset--
}
}
}
tracesLimit = tracesLimit - length
if uint64(len(data)) >= limit {
break
}
} }
} }
res = append(res, &v3.Result{ res = append(res, &v3.Result{
@ -368,15 +435,25 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
} }
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
// List query has support for only one query. // List query has support for only one query
if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 { // we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
if params.CompositeQuery != nil &&
len(params.CompositeQuery.BuilderQueries) == 1 &&
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
for _, v := range params.CompositeQuery.BuilderQueries { for _, v := range params.CompositeQuery.BuilderQueries {
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
break
}
// only allow of logs queries with timestamp ordering desc // only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" { // TODO(nitya): allow for timestamp asc
startEndArr := utils.GetLogsListTsRanges(params.Start, params.End) if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
if len(startEndArr) > 0 { len(v.OrderBy) == 1 &&
return q.runLogsListQuery(ctx, params, startEndArr) v.OrderBy[0].ColumnName == "timestamp" &&
} v.OrderBy[0].Order == "desc" {
startEndArr := utils.GetListTsRanges(params.Start, params.End)
return q.runWindowBasedListQuery(ctx, params, startEndArr)
} }
} }
} }
@ -408,13 +485,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
close(ch) close(ch)
var errs []error var errs []error
errQuriesByName := make(map[string]error) errQueriesByName := make(map[string]error)
res := make([]*v3.Result, 0) res := make([]*v3.Result, 0)
// read values from the channel // read values from the channel
for r := range ch { for r := range ch {
if r.Err != nil { if r.Err != nil {
errs = append(errs, r.Err) errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Err errQueriesByName[r.Name] = r.Err
continue continue
} }
res = append(res, &v3.Result{ res = append(res, &v3.Result{
@ -423,7 +500,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
}) })
} }
if len(errs) != 0 { if len(errs) != 0 {
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
} }
return res, nil, nil return res, nil, nil
} }

View File

@ -5,15 +5,21 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"regexp"
"strings" "strings"
"testing" "testing"
"time" "time"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory" "go.signoz.io/signoz/pkg/query-service/cache/inmemory"
"go.signoz.io/signoz/pkg/query-service/featureManager"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils"
) )
func minTimestamp(series []*v3.Series) int64 { func minTimestamp(series []*v3.Series) int64 {
@ -1124,3 +1130,304 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
} }
} }
} }
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
Expression: "A",
DataSource: v3.DataSourceTraces,
PageSize: 10,
Limit: 100,
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorNoOp,
SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
},
},
},
}
tsRanges := []utils.LogsListTsRange{
{
Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
},
{
Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
End: 1722259200000000000, // July 29, 2024 6:50:00 PM
},
{
Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
End: 1722252000000000000, // July 29, 2024 4:50:00 PM
},
{
Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
End: 1722237600000000000, // July 29, 2024 12:50:00 PM
},
{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722208800000000000, // July 29, 2024 4:50:00 AM
},
}
type queryParams struct {
start int64
end int64
limit uint64
offset uint64
}
type queryResponse struct {
expectedQuery string
timestamps []uint64
}
// create test struct with moc data i.e array of timestamps, limit, offset and expected results
testCases := []struct {
name string
queryResponses []queryResponse
queryParams queryParams
expectedTimestamps []int64
expectedError bool
}{
{
name: "should return correct timestamps when querying within time window",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 2,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
},
{
name: "all data not in first windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
timestamps: []uint64{1722253000000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 3,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
},
{
name: "data in multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
timestamps: []uint64{1722253000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
timestamps: []uint64{1722237700000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
},
{
name: "query with offset",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
timestamps: []uint64{1722237700000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 4,
offset: 3,
},
expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
},
{
name: "query with offset and limit- data spread across multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 6,
},
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
},
{
name: "don't allow pagination to get more than 10k spans",
queryResponses: []queryResponse{},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 10,
offset: 9991,
},
expectedError: true,
},
}
cols := []cmock.ColumnType{
{Name: "timestamp", Type: "UInt64"},
{Name: "name", Type: "String"},
}
testName := "name"
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
// iterate over test data, create reader and run test
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
require.NoError(t, err, "Failed to create ClickHouse mock")
// Configure mock responses
for _, response := range tc.queryResponses {
values := make([][]any, 0, len(response.timestamps))
for _, ts := range response.timestamps {
values = append(values, []any{&ts, &testName})
}
// if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values),
)
// }
}
// Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options,
nil,
"",
featureManager.StartManager(),
"",
true,
)
q := &querier{
reader: reader,
builder: queryBuilder.NewQueryBuilder(
queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
},
featureManager.StartManager(),
),
}
// Update query parameters
params.Start = tc.queryParams.start
params.End = tc.queryParams.end
params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
// Execute query
results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
if tc.expectedError {
require.Error(t, err)
return
}
// Assertions
require.NoError(t, err, "Query execution failed")
require.Nil(t, errMap, "Unexpected error map in results")
require.Len(t, results, 1, "Expected exactly one result set")
result := results[0]
require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
require.Len(t, result.List, len(tc.expectedTimestamps),
"Result count mismatch: got %d results, expected %d",
len(result.List), len(tc.expectedTimestamps))
for i, expected := range tc.expectedTimestamps {
require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
"Timestamp mismatch at index %d: got %d, expected %d",
i, result.List[i].Timestamp.UnixNano(), expected)
}
// Verify mock expectations
err = mock.ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met")
})
}
}

View File

@ -12,6 +12,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
chErrors "go.signoz.io/signoz/pkg/query-service/errors" chErrors "go.signoz.io/signoz/pkg/query-service/errors"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils" "go.signoz.io/signoz/pkg/query-service/utils"
@ -48,10 +49,11 @@ type querier struct {
testingMode bool testingMode bool
queriesExecuted []string queriesExecuted []string
// tuple of start and end time in milliseconds // tuple of start and end time in milliseconds
timeRanges [][]int timeRanges [][]int
returnedSeries []*v3.Series returnedSeries []*v3.Series
returnedErr error returnedErr error
UseLogsNewSchema bool UseLogsNewSchema bool
UseTraceNewSchema bool
} }
type QuerierOptions struct { type QuerierOptions struct {
@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
return results, errQueriesByName, err return results, errQueriesByName, err
} }
func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
res := make([]*v3.Result, 0) res := make([]*v3.Result, 0)
qName := "" qName := ""
pageSize := uint64(0) pageSize := uint64(0)
limit := uint64(0)
offset := uint64(0)
// se we are considering only one query // se we are considering only one query
for name, v := range params.CompositeQuery.BuilderQueries { for name, v := range params.CompositeQuery.BuilderQueries {
qName = name qName = name
pageSize = v.PageSize pageSize = v.PageSize
// for traces specifically
limit = v.Limit
offset = v.Offset
} }
data := []*v3.Row{} data := []*v3.Row{}
tracesLimit := limit + offset
for _, v := range tsRanges { for _, v := range tsRanges {
params.Start = v.Start params.Start = v.Start
params.End = v.End params.End = v.End
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) length := uint64(0)
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
// this will to run only once // this will to run only once
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query) // appending the filter to get the next set of data
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params)
if err != nil { if err != nil {
errs := []error{err} return nil, nil, err
errQuriesByName := map[string]error{ }
name: err, for name, query := range queries {
} rowList, err := q.reader.GetListResultV3(ctx, query)
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))
data = append(data, rowList...)
} }
data = append(data, rowList...)
}
// append a filter to the params if length > 0 {
if len(data) > 0 { params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ Key: v3.AttributeKey{
Key: v3.AttributeKey{ Key: "id",
Key: "id", IsColumn: true,
IsColumn: true, DataType: "string",
DataType: "string", },
}, Operator: v3.FilterOperatorLessThan,
Operator: v3.FilterOperatorLessThan, Value: data[len(data)-1].Data["id"],
Value: data[len(data)-1].Data["id"], })
}) }
}
if uint64(len(data)) >= pageSize { if uint64(len(data)) >= pageSize {
break break
}
} else {
// TRACE
// we are updating the offset and limit based on the number of traces we have found in the current timerange
// eg -
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// if 100 traces are there in [t1, t10] then 100 will return immediately.
// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
//
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
// max limit + offset is 10k for pagination
if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
}
params.CompositeQuery.BuilderQueries[qName].Offset = 0
params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))
// skip the traces unless offset is 0
for _, row := range rowList {
if offset == 0 {
data = append(data, row)
} else {
offset--
}
}
}
tracesLimit = tracesLimit - length
if uint64(len(data)) >= limit {
break
}
} }
} }
res = append(res, &v3.Result{ res = append(res, &v3.Result{
@ -369,14 +436,24 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
// List query has support for only one query. // List query has support for only one query.
if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 { // we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
if params.CompositeQuery != nil &&
len(params.CompositeQuery.BuilderQueries) == 1 &&
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
for _, v := range params.CompositeQuery.BuilderQueries { for _, v := range params.CompositeQuery.BuilderQueries {
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
break
}
// only allow of logs queries with timestamp ordering desc // only allow of logs queries with timestamp ordering desc
if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" { // TODO(nitya): allow for timestamp asc
startEndArr := utils.GetLogsListTsRanges(params.Start, params.End) if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
if len(startEndArr) > 0 { len(v.OrderBy) == 1 &&
return q.runLogsListQuery(ctx, params, startEndArr) v.OrderBy[0].ColumnName == "timestamp" &&
} v.OrderBy[0].Order == "desc" {
startEndArr := utils.GetListTsRanges(params.Start, params.End)
return q.runWindowBasedListQuery(ctx, params, startEndArr)
} }
} }
} }
@ -416,13 +493,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
close(ch) close(ch)
var errs []error var errs []error
errQuriesByName := make(map[string]error) errQueriesByName := make(map[string]error)
res := make([]*v3.Result, 0) res := make([]*v3.Result, 0)
// read values from the channel // read values from the channel
for r := range ch { for r := range ch {
if r.Err != nil { if r.Err != nil {
errs = append(errs, r.Err) errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Err errQueriesByName[r.Name] = r.Err
continue continue
} }
res = append(res, &v3.Result{ res = append(res, &v3.Result{
@ -431,7 +508,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
}) })
} }
if len(errs) != 0 { if len(errs) != 0 {
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
} }
return res, nil, nil return res, nil, nil
} }

View File

@ -5,15 +5,21 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"regexp"
"strings" "strings"
"testing" "testing"
"time" "time"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory" "go.signoz.io/signoz/pkg/query-service/cache/inmemory"
"go.signoz.io/signoz/pkg/query-service/featureManager"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils"
) )
func minTimestamp(series []*v3.Series) int64 { func minTimestamp(series []*v3.Series) int64 {
@ -798,8 +804,8 @@ func TestV2QueryRangeValueType(t *testing.T) {
} }
q := NewQuerier(opts) q := NewQuerier(opts)
expectedTimeRangeInQueryString := []string{ expectedTimeRangeInQueryString := []string{
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00 fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00
fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00 fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00 fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
} }
@ -1178,3 +1184,304 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
} }
} }
} }
type regexMatcher struct {
}
func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
re, err := regexp.Compile(expectedSQL)
if err != nil {
return err
}
if !re.MatchString(actualSQL) {
return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
}
return nil
}
func Test_querier_runWindowBasedListQuery(t *testing.T) {
params := &v3.QueryRangeParamsV3{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
CompositeQuery: &v3.CompositeQuery{
PanelType: v3.PanelTypeList,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
Expression: "A",
DataSource: v3.DataSourceTraces,
PageSize: 10,
Limit: 100,
StepInterval: 60,
AggregateOperator: v3.AggregateOperatorNoOp,
SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
},
},
},
}
tsRanges := []utils.LogsListTsRange{
{
Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
End: 1722262800000000000, // July 29, 2024 7:50:00 PM
},
{
Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
End: 1722259200000000000, // July 29, 2024 6:50:00 PM
},
{
Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
End: 1722252000000000000, // July 29, 2024 4:50:00 PM
},
{
Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
End: 1722237600000000000, // July 29, 2024 12:50:00 PM
},
{
Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
End: 1722208800000000000, // July 29, 2024 4:50:00 AM
},
}
type queryParams struct {
start int64
end int64
limit uint64
offset uint64
}
type queryResponse struct {
expectedQuery string
timestamps []uint64
}
// create test struct with moc data i.e array of timestamps, limit, offset and expected results
testCases := []struct {
name string
queryResponses []queryResponse
queryParams queryParams
expectedTimestamps []int64
expectedError bool
}{
{
name: "should return correct timestamps when querying within time window",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 2,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
},
{
name: "all data not in first windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
timestamps: []uint64{1722253000000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 3,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
},
{
name: "data in multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
timestamps: []uint64{1722259300000000000, 1722259400000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
timestamps: []uint64{1722253000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
timestamps: []uint64{1722237700000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
timestamps: []uint64{},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 0,
},
expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
},
{
name: "query with offset",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
timestamps: []uint64{1722237700000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 4,
offset: 3,
},
expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
},
{
name: "query with offset and limit- data spread across multiple windows",
queryResponses: []queryResponse{
{
expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
timestamps: []uint64{},
},
{
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
},
{
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
},
{
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
},
},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 5,
offset: 6,
},
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
},
{
name: "don't allow pagination to get more than 10k spans",
queryResponses: []queryResponse{},
queryParams: queryParams{
start: 1722171576000000000,
end: 1722262800000000000,
limit: 10,
offset: 9991,
},
expectedError: true,
},
}
cols := []cmock.ColumnType{
{Name: "timestamp", Type: "UInt64"},
{Name: "name", Type: "String"},
}
testName := "name"
options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")
// iterate over test data, create reader and run test
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Setup mock
mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
require.NoError(t, err, "Failed to create ClickHouse mock")
// Configure mock responses
for _, response := range tc.queryResponses {
values := make([][]any, 0, len(response.timestamps))
for _, ts := range response.timestamps {
values = append(values, []any{&ts, &testName})
}
// if len(values) > 0 {
mock.ExpectQuery(response.expectedQuery).WillReturnRows(
cmock.NewRows(cols, values),
)
// }
}
// Create reader and querier
reader := clickhouseReader.NewReaderFromClickhouseConnection(
mock,
options,
nil,
"",
featureManager.StartManager(),
"",
true,
)
q := &querier{
reader: reader,
builder: queryBuilder.NewQueryBuilder(
queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
},
featureManager.StartManager(),
),
}
// Update query parameters
params.Start = tc.queryParams.start
params.End = tc.queryParams.end
params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
// Execute query
results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
if tc.expectedError {
require.Error(t, err)
return
}
// Assertions
require.NoError(t, err, "Query execution failed")
require.Nil(t, errMap, "Unexpected error map in results")
require.Len(t, results, 1, "Expected exactly one result set")
result := results[0]
require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
require.Len(t, result.List, len(tc.expectedTimestamps),
"Result count mismatch: got %d results, expected %d",
len(result.List), len(tc.expectedTimestamps))
for i, expected := range tc.expectedTimestamps {
require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
"Timestamp mismatch at index %d: got %d, expected %d",
i, result.List[i].Timestamp.UnixNano(), expected)
}
// Verify mock expectations
err = mock.ExpectationsWereMet()
require.NoError(t, err, "Mock expectations were not met")
})
}
}

View File

@ -590,3 +590,5 @@ var StaticFieldsTraces = map[string]v3.AttributeKey{
IsColumn: true, IsColumn: true,
}, },
} }
const TRACE_V4_MAX_PAGINATION_LIMIT = 10000

View File

@ -546,6 +546,9 @@ type SignozLogV2 struct {
SeverityText string `json:"severity_text" ch:"severity_text"` SeverityText string `json:"severity_text" ch:"severity_text"`
SeverityNumber uint8 `json:"severity_number" ch:"severity_number"` SeverityNumber uint8 `json:"severity_number" ch:"severity_number"`
Body string `json:"body" ch:"body"` Body string `json:"body" ch:"body"`
ScopeName string `json:"scope_name" ch:"scope_name"`
ScopeVersion string `json:"scope_version" ch:"scope_version"`
ScopeString map[string]string `json:"scope_string" ch:"scope_string"`
Resources_string map[string]string `json:"resources_string" ch:"resources_string"` Resources_string map[string]string `json:"resources_string" ch:"resources_string"`
Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"` Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"`
Attributes_number map[string]float64 `json:"attributes_float" ch:"attributes_number"` Attributes_number map[string]float64 `json:"attributes_float" ch:"attributes_number"`

View File

@ -9,7 +9,7 @@ type LogsListTsRange struct {
End int64 End int64
} }
func GetLogsListTsRanges(start, end int64) []LogsListTsRange { func GetListTsRanges(start, end int64) []LogsListTsRange {
startNano := GetEpochNanoSecs(start) startNano := GetEpochNanoSecs(start)
endNano := GetEpochNanoSecs(end) endNano := GetEpochNanoSecs(end)
result := []LogsListTsRange{} result := []LogsListTsRange{}

View File

@ -7,7 +7,7 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
) )
func TestLogsListTsRange(t *testing.T) { func TestListTsRange(t *testing.T) {
startEndData := []struct { startEndData := []struct {
name string name string
start int64 start int64
@ -44,7 +44,7 @@ func TestLogsListTsRange(t *testing.T) {
} }
for _, test := range startEndData { for _, test := range startEndData {
res := GetLogsListTsRanges(test.start, test.end) res := GetListTsRanges(test.start, test.end)
for i, v := range res { for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End { if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End) t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End)