From aadf2a3ac700112e6cfd42c2f9c08251c0bcbb52 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Thu, 22 May 2025 15:05:41 +0530 Subject: [PATCH] =?UTF-8?q?fix:=20logs=20window=20based=20pagination=20to?= =?UTF-8?q?=20pageSize=20offset=20instead=20of=20using=E2=80=A6=20(#6830)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: logs window based pagination to pageSize offset instead of using id filter * fix: add support for asc * fix: remove unwanted code --------- Co-authored-by: Srikanth Chekuri --- .../app/logs/v4/query_builder.go | 10 +- .../app/logs/v4/query_builder_test.go | 4 +- pkg/query-service/app/querier/querier.go | 176 ++-- pkg/query-service/app/querier/querier_test.go | 795 +++++++++++++++++- pkg/query-service/app/querier/v2/querier.go | 177 ++-- .../app/querier/v2/querier_test.go | 793 ++++++++++++++++- 6 files changed, 1739 insertions(+), 216 deletions(-) diff --git a/pkg/query-service/app/logs/v4/query_builder.go b/pkg/query-service/app/logs/v4/query_builder.go index e97ec9a75d..470f674aa3 100644 --- a/pkg/query-service/app/logs/v4/query_builder.go +++ b/pkg/query-service/app/logs/v4/query_builder.go @@ -284,7 +284,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags [] if len(orderByArray) == 0 { if panelType == v3.PanelTypeList { - orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC") + orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC", "id DESC") } else { orderByArray = append(orderByArray, "value DESC") } @@ -541,6 +541,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan return "", fmt.Errorf("max limit exceeded") } + // when pageSize is provided, we need to fetch the logs in chunks if mq.PageSize > 0 { if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit { query = logsV3.AddLimitToQuery(query, mq.Limit-mq.Offset) @@ -548,12 +549,9 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan query = logsV3.AddLimitToQuery(query, mq.PageSize) } - // add offset to the query only if it is not orderd by timestamp. 
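With this change the page window is always expressed as LIMIT plus an explicit OFFSET (the updated expectations in the test hunk below now end in "LIMIT 1 OFFSET 0" and "LIMIT 10 OFFSET 10"), rather than relying on an id filter when the query is ordered by timestamp. A minimal sketch of the two logsV3 helpers the code above leans on; the bodies here are assumptions for illustration, not the actual logs/v3 implementations:

    package logsv3sketch

    import "fmt"

    // AddLimitToQuery appends a LIMIT clause to an already-built list query
    // (assumed body; the real helper lives in logs/v3).
    func AddLimitToQuery(query string, limit uint64) string {
        return fmt.Sprintf("%s LIMIT %d", query, limit)
    }

    // AddOffsetToQuery appends an OFFSET clause after the LIMIT clause
    // (assumed body; the real helper lives in logs/v3).
    func AddOffsetToQuery(query string, offset uint64) string {
        return fmt.Sprintf("%s OFFSET %d", query, offset)
    }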
- if !logsV3.IsOrderByTs(mq.OrderBy) { - query = logsV3.AddOffsetToQuery(query, mq.Offset) - } - + query = logsV3.AddOffsetToQuery(query, mq.Offset) } else { + // when pageSize is not provided, we fetch all the logs in the limit query = logsV3.AddLimitToQuery(query, mq.Limit) } } else if panelType == v3.PanelTypeTable { diff --git a/pkg/query-service/app/logs/v4/query_builder_test.go b/pkg/query-service/app/logs/v4/query_builder_test.go index 75165bab9b..d89cb16ade 100644 --- a/pkg/query-service/app/logs/v4/query_builder_test.go +++ b/pkg/query-service/app/logs/v4/query_builder_test.go @@ -1097,7 +1097,7 @@ func TestPrepareLogsQuery(t *testing.T) { }, want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string from " + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + - "order by timestamp desc LIMIT 1", + "order by timestamp desc LIMIT 1 OFFSET 0", }, { name: "Test limit greater than pageSize - order by ts", @@ -1122,7 +1122,7 @@ func TestPrepareLogsQuery(t *testing.T) { }, want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string from " + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + - "AND id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by timestamp desc LIMIT 10", + "AND id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by timestamp desc LIMIT 10 OFFSET 10", }, { name: "Test limit less than pageSize - order by custom", diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 1fdc249cd5..67fe8928f5 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "strings" "sync" "time" @@ -307,114 +308,108 @@ func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryR limit := uint64(0) offset := uint64(0) - // se we are considering only one query + // Get query details and check order direction for name, v := range params.CompositeQuery.BuilderQueries { qName = name pageSize = v.PageSize - - // for traces specifically limit = v.Limit offset = v.Offset + } + + // Check if order is ascending + if strings.ToLower(string(params.CompositeQuery.BuilderQueries[qName].OrderBy[0].Order)) == "asc" { + // Reverse the time ranges for ascending order + for i, j := 0, len(tsRanges)-1; i < j; i, j = i+1, j-1 { + tsRanges[i], tsRanges[j] = tsRanges[j], tsRanges[i] + } + } + + // check if it is a logs query + isLogs := false + if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { + isLogs = true + } + data := []*v3.Row{} - tracesLimit := limit + offset + limitWithOffset := limit + offset + if isLogs { + // for logs we use pageSize to define the current limit and limit to define the absolute limit + limitWithOffset = pageSize + offset + if limit > 0 && offset >= limit { + return nil, nil, fmt.Errorf("max limit exceeded") + } + } for _, v := range tsRanges { params.Start = v.Start params.End = v.End - length := uint64(0) - // this will to run only once - // 
appending the filter to get the next set of data - if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { - params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) - queries, err := q.builder.PrepareQueries(params) - if err != nil { - return nil, nil, err - } - for name, query := range queries { - rowList, err := q.reader.GetListResultV3(ctx, query) - if err != nil { - errs := []error{err} - errQueriesByName := map[string]error{ - name: err, - } - return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) - } - length += uint64(len(rowList)) - data = append(data, rowList...) - } + // max limit + offset is 10k for pagination for traces/logs + // TODO(nitya): define something for logs + if !isLogs && limitWithOffset > constants.TRACE_V4_MAX_PAGINATION_LIMIT { + return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000") + } - if length > 0 { - params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{ - Key: "id", - IsColumn: true, - DataType: "string", - }, - Operator: v3.FilterOperatorLessThan, - Value: data[len(data)-1].Data["id"], - }) - } + // we are updating the offset and limit based on the number of traces/logs we have found in the current timerange + // eg - + // 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // if 100 traces/logs are there in [t1, t10] then 100 will return immediately. + // if 10 traces/logs are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100 - if uint64(len(data)) >= pageSize { - break - } + // + // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // If we find 150 traces/logs with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces/logs + // If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0 + + params.CompositeQuery.BuilderQueries[qName].Offset = 0 + // if datasource is logs + if isLogs { + // for logs we use limit to define the absolute limit and pagesize to define the current limit + params.CompositeQuery.BuilderQueries[qName].PageSize = limitWithOffset } else { - // TRACE - // we are updating the offset and limit based on the number of traces we have found in the current timerange - // eg - - // 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] - // - // if 100 traces are there in [t1, t10] then 100 will return immediately. 
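The trace-only comment block being deleted here is generalized by the new code: one windowed walk now serves both traces and logs, asking each window for whatever is still owed and consuming the offset before keeping rows. A condensed, self-contained sketch of that walk; row, window and fetch are hypothetical stand-ins for v3.Row, utils.LogsListTsRange and one PrepareQueries/GetListResultV3 round trip:

    package querysketch

    type row struct{}
    type window struct{ Start, End int64 }

    // paginateAcrossWindows mirrors the loop above: ask each window for the rows
    // still owed (limit + offset not yet consumed), drop rows while offset
    // remains, keep the rest, and stop once a full page has been collected.
    func paginateAcrossWindows(windows []window, fetch func(w window, n uint64) []row, limit, offset uint64) []row {
        data := []row{}
        remaining := limit + offset // limitWithOffset in the patch
        for _, w := range windows {
            rows := fetch(w, remaining) // fetch returns at most remaining rows
            for _, r := range rows {
                if offset == 0 {
                    data = append(data, r) // offset already consumed: keep the row
                } else {
                    offset-- // row is skipped, consuming one unit of offset
                }
            }
            remaining -= uint64(len(rows))
            if uint64(len(data)) >= limit {
                break // collected a full page
            }
        }
        return data
    }

Requesting limit+offset from the first window is what allows Offset to be set to 0 on the generated query itself; rows the caller asked to skip are discarded client-side as they stream in.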
- // if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20] - // if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100 + params.CompositeQuery.BuilderQueries[qName].Limit = limitWithOffset + } - // - // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] - // - // If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces - // If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20] - // if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0 - - // max limit + offset is 10k for pagination - if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT { - return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000") - } - - params.CompositeQuery.BuilderQueries[qName].Offset = 0 - params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit - queries, err := q.builder.PrepareQueries(params) + queries, err := q.builder.PrepareQueries(params) + if err != nil { + return nil, nil, err + } + for name, query := range queries { + rowList, err := q.reader.GetListResultV3(ctx, query) if err != nil { - return nil, nil, err - } - for name, query := range queries { - rowList, err := q.reader.GetListResultV3(ctx, query) - if err != nil { - errs := []error{err} - errQueriesByName := map[string]error{ - name: err, - } - return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + errs := []error{err} + errQueriesByName := map[string]error{ + name: err, } - length += uint64(len(rowList)) + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + } + length += uint64(len(rowList)) - // skip the traces unless offset is 0 - for _, row := range rowList { - if offset == 0 { - data = append(data, row) - } else { - offset-- - } + // skip the traces unless offset is 0 + for _, row := range rowList { + if offset == 0 { + data = append(data, row) + } else { + offset-- } } - tracesLimit = tracesLimit - length + } - if uint64(len(data)) >= limit { - break - } + limitWithOffset = limitWithOffset - length + + if isLogs && uint64(len(data)) >= pageSize { + // for logs + break + } else if !isLogs && uint64(len(data)) >= limit { + // for traces + break } } res = append(res, &v3.Result{ @@ -431,12 +426,17 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan len(params.CompositeQuery.BuilderQueries) == 1 && params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { - // only allow of logs queries with timestamp ordering desc - // TODO(nitya): allow for timestamp asc - if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && + // for logs: allow only when order of timestamp and id is same + if v.DataSource == v3.DataSourceTraces && len(v.OrderBy) == 1 && + v.OrderBy[0].ColumnName == "timestamp" { + startEndArr := utils.GetListTsRanges(params.Start, params.End) + return q.runWindowBasedListQuery(ctx, params, startEndArr) + } else if v.DataSource == v3.DataSourceLogs && + len(v.OrderBy) == 2 && v.OrderBy[0].ColumnName == "timestamp" && - v.OrderBy[0].Order == "desc" { + v.OrderBy[1].ColumnName == "id" && + v.OrderBy[1].Order == v.OrderBy[0].Order { startEndArr := utils.GetListTsRanges(params.Start, params.End) return 
q.runWindowBasedListQuery(ctx, params, startEndArr) } diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 20e9254ed5..8d33a7cc43 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -15,8 +15,10 @@ import ( "github.com/SigNoz/signoz/pkg/prometheus" "github.com/SigNoz/signoz/pkg/prometheus/prometheustest" "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" + logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" + tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/querycache" "github.com/SigNoz/signoz/pkg/query-service/utils" @@ -1164,7 +1166,7 @@ func TestQueryRangeValueTypePromQL(t *testing.T) { } } -func Test_querier_runWindowBasedListQuery(t *testing.T) { +func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) { params := &v3.QueryRangeParamsV3{ Start: 1722171576000000000, // July 28, 2024 6:29:36 PM End: 1722262800000000000, // July 29, 2024 7:50:00 PM @@ -1184,6 +1186,13 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { Operator: "AND", Items: []v3.FilterItem{}, }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + IsColumn: true, + Order: "DESC", + }, + }, }, }, }, @@ -1237,7 +1246,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2", - timestamps: []uint64{1722259300000000000, 1722259400000000000}, + timestamps: []uint64{1722259400000000000, 1722259300000000000}, }, }, queryParams: queryParams{ @@ -1246,14 +1255,14 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 2, offset: 0, }, - expectedTimestamps: []int64{1722259300000000000, 1722259400000000000}, + expectedTimestamps: []int64{1722259400000000000, 1722259300000000000}, }, { name: "all data not in first windows", queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3", - timestamps: []uint64{1722259300000000000, 1722259400000000000}, + timestamps: []uint64{1722259400000000000, 1722259300000000000}, }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1", @@ -1266,14 +1275,14 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 3, offset: 0, }, - expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000}, + expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000}, }, { name: "data in multiple windows", queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5", - timestamps: []uint64{1722259300000000000, 1722259400000000000}, + timestamps: []uint64{1722259400000000000, 1722259300000000000}, }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3", @@ -1298,18 +1307,18 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 5, offset: 0, }, - expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000}, + 
expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000}, }, { name: "query with offset", queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7", - timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000}, + timestamps: []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000}, }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4", - timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000}, }, { expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1", @@ -1322,7 +1331,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 4, offset: 3, }, - expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000}, + expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000}, }, { name: "query with offset and limit- data spread across multiple windows", @@ -1333,15 +1342,15 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11", - timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000}, }, { expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8", - timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000}, + timestamps: []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000}, }, { expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3", - timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000}, + timestamps: []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000}, }, }, queryParams: queryParams{ @@ -1350,7 +1359,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 5, offset: 6, }, - expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000}, + expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000}, }, { name: "don't allow pagination to get more than 10k spans", @@ -1385,11 +1394,11 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { for _, ts := range response.timestamps { values = append(values, []any{&ts, &testName}) } + // mock.ExpectQuery(response.expectedQuery).WillReturnRows( // if len(values) > 0 { telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows( cmock.NewRows(cols, values), ) - // } } // Create reader and querier @@ -1407,7 +1416,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { reader: reader, builder: queryBuilder.NewQueryBuilder( queryBuilder.QueryBuilderOptions{ - BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildTraceQuery: tracesV4.PrepareTracesQuery, }, ), } @@ -1448,3 +1457,757 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { }) } 
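This DESC test and the ASC variants added below all reach runWindowBasedListQuery through the gate rewritten in runBuilderListQueries earlier in this patch. Condensed into a sketch for reference (the real check lives inline in the querier; the v3 types are the ones already imported in these files):

    package querysketch

    import (
        v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
    )

    // windowedListEligible condenses the new gate: traces need a single
    // ORDER BY on timestamp; logs need ORDER BY timestamp, id with both
    // directions matching.
    func windowedListEligible(q *v3.BuilderQuery) bool {
        switch q.DataSource {
        case v3.DataSourceTraces:
            return len(q.OrderBy) == 1 && q.OrderBy[0].ColumnName == "timestamp"
        case v3.DataSourceLogs:
            return len(q.OrderBy) == 2 &&
                q.OrderBy[0].ColumnName == "timestamp" &&
                q.OrderBy[1].ColumnName == "id" &&
                q.OrderBy[1].Order == q.OrderBy[0].Order
        }
        return false
    }

Requiring the id direction to match timestamp presumably keeps the id tiebreaker consistent with the page order, in line with the "id DESC" default added to orderByAttributeKeyTags in the first hunk.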
} + +func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + Expression: "A", + DataSource: v3.DataSourceTraces, + PageSize: 10, + Limit: 100, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + SelectColumns: []v3.AttributeKey{{Key: "serviceName"}}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + IsColumn: true, + Key: "timestamp", + Order: "asc", + }, + }, + }, + }, + }, + } + + tsRanges := []utils.LogsListTsRange{ + { + Start: 1722259200000000000, // July 29, 2024 6:50:00 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + }, + { + Start: 1722252000000000000, // July 29, 2024 4:50:00 PM + End: 1722259200000000000, // July 29, 2024 6:50:00 PM + }, + { + Start: 1722237600000000000, // July 29, 2024 12:50:00 PM + End: 1722252000000000000, // July 29, 2024 4:50:00 PM + }, + { + Start: 1722208800000000000, // July 29, 2024 4:50:00 AM + End: 1722237600000000000, // July 29, 2024 12:50:00 PM + }, + { + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722208800000000000, // July 29, 2024 4:50:00 AM + }, + } + + type queryParams struct { + start int64 + end int64 + limit uint64 + offset uint64 + } + + type queryResponse struct { + expectedQuery string + timestamps []uint64 + } + + // create test struct with moc data i.e array of timestamps, limit, offset and expected results + testCases := []struct { + name string + queryResponses []queryResponse + queryParams queryParams + expectedTimestamps []int64 + expectedError bool + }{ + { + name: "should return correct timestamps when querying within time window", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 2", + timestamps: []uint64{1722171576000000000, 1722171577000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 2, + offset: 0, + }, + expectedTimestamps: []int64{1722171576000000000, 1722171577000000000}, + }, + { + name: "all data not in first windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 3", + timestamps: []uint64{1722259200000000000, 1722259201000000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 1", + timestamps: []uint64{1722208800100000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 3, + offset: 0, + }, + expectedTimestamps: []int64{1722259200000000000, 1722259201000000000, 1722208800100000000}, + }, + { + name: "query with offset and limit- data spread across multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 11", + timestamps: []uint64{}, + }, + { + expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 11", + timestamps: []uint64{1722208801000000000, 1722208802000000000, 1722208803000000000}, + }, + { + expectedQuery: ".*(timestamp >= 
'1722237600000000000' AND timestamp <= '1722252000000000000').* asc LIMIT 8", + timestamps: []uint64{1722237600010000000, 1722237600020000000, 1722237600030000000, 1722237600040000000, 1722237600050000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* asc LIMIT 3", + timestamps: []uint64{1722252000100000000, 1722252000200000000, 1722252000300000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 5, + offset: 6, + }, + expectedTimestamps: []int64{1722237600040000000, 1722237600050000000, 1722252000100000000, 1722252000200000000, 1722252000300000000}, + }, + } + + cols := []cmock.ColumnType{ + {Name: "timestamp", Type: "UInt64"}, + {Name: "name", Type: "String"}, + } + testName := "name" + + options := clickhouseReader.NewOptions("", "", "archiveNamespace") + + // iterate over test data, create reader and run test + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock + telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp) + + // Configure mock responses + for _, response := range tc.queryResponses { + values := make([][]any, 0, len(response.timestamps)) + for _, ts := range response.timestamps { + values = append(values, []any{&ts, &testName}) + } + telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows( + cmock.NewRows(cols, values), + ) + } + + // Create reader and querier + reader := clickhouseReader.NewReaderFromClickhouseConnection( + options, + nil, + telemetryStore, + prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + "", + time.Duration(time.Second), + nil, + ) + + q := &querier{ + reader: reader, + builder: queryBuilder.NewQueryBuilder( + queryBuilder.QueryBuilderOptions{ + BuildTraceQuery: tracesV3.PrepareTracesQuery, + }, + ), + } + + // Update query parameters + params.Start = tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + + // Create a copy of tsRanges before passing to the function + // required because tsRanges is modified in the function + tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges)) + copy(tsRangesCopy, tsRanges) + + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = telemetryStore.Mock().ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +} + +func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 
2024 6:29:36 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + Expression: "A", + DataSource: v3.DataSourceLogs, + PageSize: 10, + Limit: 100, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "DESC", + }, + { + ColumnName: "id", + Order: "DESC", + }, + }, + }, + }, + }, + } + + tsRanges := []utils.LogsListTsRange{ + { + Start: 1722259200000000000, // July 29, 2024 6:50:00 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + }, + { + Start: 1722252000000000000, // July 29, 2024 4:50:00 PM + End: 1722259200000000000, // July 29, 2024 6:50:00 PM + }, + { + Start: 1722237600000000000, // July 29, 2024 12:50:00 PM + End: 1722252000000000000, // July 29, 2024 4:50:00 PM + }, + { + Start: 1722208800000000000, // July 29, 2024 4:50:00 AM + End: 1722237600000000000, // July 29, 2024 12:50:00 PM + }, + { + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722208800000000000, // July 29, 2024 4:50:00 AM + }, + } + + type queryParams struct { + start int64 + end int64 + limit uint64 + offset uint64 + pageSize uint64 + } + + type queryResponse struct { + expectedQuery string + timestamps []uint64 + } + + // create test struct with moc data i.e array of timestamps, limit, offset and expected results + testCases := []struct { + name string + queryResponses []queryResponse + queryParams queryParams + expectedTimestamps []int64 + expectedError bool + }{ + { + name: "should return correct timestamps when querying within time window", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 2", + timestamps: []uint64{1722259400000000000, 1722259300000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + pageSize: 2, + offset: 0, + }, + expectedTimestamps: []int64{1722259400000000000, 1722259300000000000}, + }, + { + name: "all data not in first windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 3", + timestamps: []uint64{1722259400000000000, 1722259300000000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 1", + timestamps: []uint64{1722253000000000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + pageSize: 3, + offset: 0, + }, + expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000}, + }, + { + name: "data in multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 5", + timestamps: []uint64{1722259400000000000, 1722259300000000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 3", + timestamps: []uint64{1722253000000000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 2", + timestamps: []uint64{1722237700000000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 1", + 
timestamps: []uint64{},
+                },
+                {
+                    expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* DESC LIMIT 1",
+                    timestamps: []uint64{},
+                },
+            },
+            queryParams: queryParams{
+                start: 1722171576000000000,
+                end: 1722262800000000000,
+                pageSize: 5,
+                offset: 0,
+            },
+            expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000},
+        },
+        {
+            name: "query with offset",
+            queryResponses: []queryResponse{
+                {
+                    expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 7",
+                    timestamps: []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000},
+                },
+                {
+                    expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 4",
+                    timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+                },
+                {
+                    expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 1",
+                    timestamps: []uint64{1722237700000000000},
+                },
+            },
+            queryParams: queryParams{
+                start: 1722171576000000000,
+                end: 1722262800000000000,
+                pageSize: 4,
+                offset: 3,
+            },
+            expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000},
+        },
+        {
+            name: "query with offset and limit- data spread across multiple windows",
+            queryResponses: []queryResponse{
+                {
+                    expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 11",
+                    timestamps: []uint64{},
+                },
+                {
+                    expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 11",
+                    timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+                },
+                {
+                    expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 8",
+                    timestamps: []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000},
+                },
+                {
+                    expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 3",
+                    timestamps: []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000},
+                },
+            },
+            queryParams: queryParams{
+                start: 1722171576000000000,
+                end: 1722262800000000000,
+                pageSize: 5,
+                offset: 6,
+            },
+            expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000},
+        },
+        {
+            name: "don't allow pagination to get more than specified limit",
+            queryResponses: []queryResponse{},
+            queryParams: queryParams{
+                start: 1722171576000000000,
+                end: 1722262800000000000,
+                limit: 200,
+                offset: 210,
+                pageSize: 30,
+            },
+            expectedError: true,
+        },
+    }
+
+    cols := []cmock.ColumnType{
+        {Name: "timestamp", Type: "UInt64"},
+        {Name: "name", Type: "String"},
+    }
+    testName := "name"
+
+    options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+    // iterate over test data, create reader and run test
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            // Setup mock
+            telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+            // Configure mock responses
+            for _, response := range tc.queryResponses {
+                values := make([][]any, 0, len(response.timestamps))
+                for _, ts := range response.timestamps {
+                    values = append(values, []any{&ts, &testName})
+                }
+                telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+                    cmock.NewRows(cols,
values), + ) + } + + // Create reader and querier + reader := clickhouseReader.NewReaderFromClickhouseConnection( + options, + nil, + telemetryStore, + prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + "", + time.Duration(time.Second), + nil, + ) + + q := &querier{ + reader: reader, + builder: queryBuilder.NewQueryBuilder( + queryBuilder.QueryBuilderOptions{ + BuildLogQuery: logsV4.PrepareLogsQuery, + }, + ), + } + // Update query parameters + params.Start = tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize + // Execute query + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = telemetryStore.Mock().ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +} + +func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + Expression: "A", + DataSource: v3.DataSourceLogs, + PageSize: 10, + Limit: 100, + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "asc", + }, + { + ColumnName: "id", + Order: "asc", + }, + }, + }, + }, + }, + } + + tsRanges := []utils.LogsListTsRange{ + { + Start: 1722259200000000000, // July 29, 2024 6:50:00 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + }, + { + Start: 1722252000000000000, // July 29, 2024 4:50:00 PM + End: 1722259200000000000, // July 29, 2024 6:50:00 PM + }, + { + Start: 1722237600000000000, // July 29, 2024 12:50:00 PM + End: 1722252000000000000, // July 29, 2024 4:50:00 PM + }, + { + Start: 1722208800000000000, // July 29, 2024 4:50:00 AM + End: 1722237600000000000, // July 29, 2024 12:50:00 PM + }, + { + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722208800000000000, // July 29, 2024 4:50:00 AM + }, + } + + type queryParams struct { + start int64 + end int64 + limit uint64 + offset uint64 + pageSize uint64 + } + + type queryResponse struct { + expectedQuery string + timestamps []uint64 + } + + // create test struct with moc data i.e array of timestamps, limit, offset and expected results + testCases := []struct { + name string + queryResponses []queryResponse 
+ queryParams queryParams + expectedTimestamps []int64 + expectedError bool + }{ + { + name: "should return correct timestamps when querying within time window", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 2", + timestamps: []uint64{1722171576010000000, 1722171576020000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + pageSize: 2, + offset: 0, + }, + expectedTimestamps: []int64{1722171576010000000, 1722171576020000000}, + }, + { + name: "all data not in first windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 3", + timestamps: []uint64{1722171576001000000, 1722171576002000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 1", + timestamps: []uint64{1722208800100000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + pageSize: 3, + offset: 0, + }, + expectedTimestamps: []int64{1722171576001000000, 1722171576002000000, 1722208800100000000}, + }, + { + name: "query with offset and limit- data spread across multiple windows", + queryResponses: []queryResponse{ + { + expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 11", + timestamps: []uint64{}, + }, + { + expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 11", + timestamps: []uint64{1722208800100000000, 1722208800200000000, 1722208800300000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* asc LIMIT 8", + timestamps: []uint64{1722237600100000000, 1722237600200000000, 1722237600300000000, 1722237600400000000, 1722237600500000000}, + }, + { + expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* asc LIMIT 3", + timestamps: []uint64{1722252000000100000, 1722252000000200000, 1722252000000300000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + pageSize: 5, + offset: 6, + }, + expectedTimestamps: []int64{1722237600400000000, 1722237600500000000, 1722252000000100000, 1722252000000200000, 1722252000000300000}, + }, + } + + cols := []cmock.ColumnType{ + {Name: "timestamp", Type: "UInt64"}, + {Name: "name", Type: "String"}, + } + testName := "name" + + options := clickhouseReader.NewOptions("", "", "archiveNamespace") + + // iterate over test data, create reader and run test + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock + telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp) + + // Configure mock responses + for _, response := range tc.queryResponses { + values := make([][]any, 0, len(response.timestamps)) + for _, ts := range response.timestamps { + values = append(values, []any{&ts, &testName}) + } + telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows( + cmock.NewRows(cols, values), + ) + } + + // Create reader and querier + reader := clickhouseReader.NewReaderFromClickhouseConnection( + options, + nil, + telemetryStore, + prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + "", + time.Duration(time.Second), + nil, + ) + + q := &querier{ + reader: reader, + builder: 
queryBuilder.NewQueryBuilder( + queryBuilder.QueryBuilderOptions{ + BuildLogQuery: logsV4.PrepareLogsQuery, + }, + ), + } + // Update query parameters + params.Start = tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize + + // Create a copy of tsRanges before passing to the function + // required because tsRanges is modified in the function + tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges)) + copy(tsRangesCopy, tsRanges) + // Execute query + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = telemetryStore.Mock().ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +} diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index a669cbb403..322b25be91 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -3,6 +3,7 @@ package v2 import ( "context" "fmt" + "strings" "sync" "time" @@ -308,114 +309,108 @@ func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryR limit := uint64(0) offset := uint64(0) - // se we are considering only one query + // Get query details and check order direction for name, v := range params.CompositeQuery.BuilderQueries { qName = name pageSize = v.PageSize - - // for traces specifically limit = v.Limit offset = v.Offset + } + + // Check if order is ascending + if strings.ToLower(string(params.CompositeQuery.BuilderQueries[qName].OrderBy[0].Order)) == "asc" { + // Reverse the time ranges for ascending order + for i, j := 0, len(tsRanges)-1; i < j; i, j = i+1, j-1 { + tsRanges[i], tsRanges[j] = tsRanges[j], tsRanges[i] + } + } + + // check if it is a logs query + isLogs := false + if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { + isLogs = true + } + data := []*v3.Row{} - tracesLimit := limit + offset + limitWithOffset := limit + offset + if isLogs { + // for logs we use pageSize to define the current limit and limit to define the absolute limit + limitWithOffset = pageSize + offset + if limit > 0 && offset >= limit { + return nil, nil, fmt.Errorf("max limit exceeded") + } + } for _, v := range tsRanges { params.Start = v.Start params.End = v.End - length := uint64(0) - // this will to run only once - // appending the filter to get the next set of data - if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { - params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data)) - 
queries, err := q.builder.PrepareQueries(params) - if err != nil { - return nil, nil, err - } - for name, query := range queries { - rowList, err := q.reader.GetListResultV3(ctx, query) - if err != nil { - errs := []error{err} - errQueriesByName := map[string]error{ - name: err, - } - return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) - } - length += uint64(len(rowList)) - data = append(data, rowList...) - } + // max limit + offset is 10k for pagination for traces/logs + // TODO(nitya): define something for logs + if !isLogs && limitWithOffset > constants.TRACE_V4_MAX_PAGINATION_LIMIT { + return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000") + } - if length > 0 { - params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{ - Key: "id", - IsColumn: true, - DataType: "string", - }, - Operator: v3.FilterOperatorLessThan, - Value: data[len(data)-1].Data["id"], - }) - } + // we are updating the offset and limit based on the number of traces/logs we have found in the current timerange + // eg - + // 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // if 100 traces/logs are there in [t1, t10] then 100 will return immediately. + // if 10 traces/logs are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100 - if uint64(len(data)) >= pageSize { - break - } + // + // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] + // + // If we find 150 traces/logs with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces/logs + // If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20] + // if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0 + + params.CompositeQuery.BuilderQueries[qName].Offset = 0 + // if datasource is logs + if isLogs { + // for logs we use limit to define the absolute limit and pagesize to define the current limit + params.CompositeQuery.BuilderQueries[qName].PageSize = limitWithOffset } else { - // TRACE - // we are updating the offset and limit based on the number of traces we have found in the current timerange - // eg - - // 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] - // - // if 100 traces are there in [t1, t10] then 100 will return immediately. 
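The tsRanges walked here come from utils.GetListTsRanges. Judging by the fixtures in the tests (windows of 1h, 2h, 4h, 8h counted back from End, with the oldest window clamped to Start), it appears to behave roughly like the sketch below; this is an assumption drawn from those fixtures, not the verified implementation:

    package querysketch

    import (
        "time"

        "github.com/SigNoz/signoz/pkg/query-service/utils"
    )

    // listTsRanges sketches the assumed windowing: windows double in size
    // (1h, 2h, 4h, ...) walking backwards from end, the last one clamped to start.
    func listTsRanges(start, end int64) []utils.LogsListTsRange {
        ranges := []utils.LogsListTsRange{}
        window := int64(time.Hour) // timestamps are in nanoseconds
        for cur := end; cur > start; window *= 2 {
            s := cur - window
            if s < start {
                s = start // clamp the oldest window to the requested start
            }
            ranges = append(ranges, utils.LogsListTsRange{Start: s, End: cur})
            cur = s
        }
        return ranges
    }

Run against the Start/End used in these tests, this reproduces the five fixture ranges above, which is why the newest window is searched first for DESC and the slice is reversed for ASC.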
- // if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20] - // if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100 + params.CompositeQuery.BuilderQueries[qName].Limit = limitWithOffset + } - // - // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30] - // - // If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces - // If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20] - // if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0 - - // max limit + offset is 10k for pagination - if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT { - return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000") - } - - params.CompositeQuery.BuilderQueries[qName].Offset = 0 - params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit - queries, err := q.builder.PrepareQueries(params) + queries, err := q.builder.PrepareQueries(params) + if err != nil { + return nil, nil, err + } + for name, query := range queries { + rowList, err := q.reader.GetListResultV3(ctx, query) if err != nil { - return nil, nil, err - } - for name, query := range queries { - rowList, err := q.reader.GetListResultV3(ctx, query) - if err != nil { - errs := []error{err} - errQueriesByName := map[string]error{ - name: err, - } - return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + errs := []error{err} + errQueriesByName := map[string]error{ + name: err, } - length += uint64(len(rowList)) + return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)) + } + length += uint64(len(rowList)) - // skip the traces unless offset is 0 - for _, row := range rowList { - if offset == 0 { - data = append(data, row) - } else { - offset-- - } + // skip the traces unless offset is 0 + for _, row := range rowList { + if offset == 0 { + data = append(data, row) + } else { + offset-- } } - tracesLimit = tracesLimit - length + } - if uint64(len(data)) >= limit { - break - } + limitWithOffset = limitWithOffset - length + + if isLogs && uint64(len(data)) >= pageSize { + // for logs + break + } else if !isLogs && uint64(len(data)) >= limit { + // for traces + break } } res = append(res, &v3.Result{ @@ -432,12 +427,18 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan len(params.CompositeQuery.BuilderQueries) == 1 && params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { - // only allow of logs queries with timestamp ordering desc - // TODO(nitya): allow for timestamp asc - if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && + + // for logs: allow only when order of timestamp and id is same + if v.DataSource == v3.DataSourceTraces && len(v.OrderBy) == 1 && + v.OrderBy[0].ColumnName == "timestamp" { + startEndArr := utils.GetListTsRanges(params.Start, params.End) + return q.runWindowBasedListQuery(ctx, params, startEndArr) + } else if v.DataSource == v3.DataSourceLogs && + len(v.OrderBy) == 2 && v.OrderBy[0].ColumnName == "timestamp" && - v.OrderBy[0].Order == "desc" { + v.OrderBy[1].ColumnName == "id" && + v.OrderBy[1].Order == v.OrderBy[0].Order { startEndArr := utils.GetListTsRanges(params.Start, params.End) return 
q.runWindowBasedListQuery(ctx, params, startEndArr) } diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 8787cfa60b..c0cc7670bc 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -15,8 +15,10 @@ import ( "github.com/SigNoz/signoz/pkg/prometheus" "github.com/SigNoz/signoz/pkg/prometheus/prometheustest" "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" + logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" + tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/querycache" "github.com/SigNoz/signoz/pkg/query-service/utils" @@ -1217,7 +1219,7 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) { } } -func Test_querier_runWindowBasedListQuery(t *testing.T) { +func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) { params := &v3.QueryRangeParamsV3{ Start: 1722171576000000000, // July 28, 2024 6:29:36 PM End: 1722262800000000000, // July 29, 2024 7:50:00 PM @@ -1237,6 +1239,13 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { Operator: "AND", Items: []v3.FilterItem{}, }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + IsColumn: true, + Order: "DESC", + }, + }, }, }, }, @@ -1290,7 +1299,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2", - timestamps: []uint64{1722259300000000000, 1722259400000000000}, + timestamps: []uint64{1722259400000000000, 1722259300000000000}, }, }, queryParams: queryParams{ @@ -1299,14 +1308,14 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 2, offset: 0, }, - expectedTimestamps: []int64{1722259300000000000, 1722259400000000000}, + expectedTimestamps: []int64{1722259400000000000, 1722259300000000000}, }, { name: "all data not in first windows", queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3", - timestamps: []uint64{1722259300000000000, 1722259400000000000}, + timestamps: []uint64{1722259400000000000, 1722259300000000000}, }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1", @@ -1319,14 +1328,14 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 3, offset: 0, }, - expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000}, + expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000}, }, { name: "data in multiple windows", queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5", - timestamps: []uint64{1722259300000000000, 1722259400000000000}, + timestamps: []uint64{1722259400000000000, 1722259300000000000}, }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3", @@ -1351,18 +1360,18 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 5, offset: 0, }, - expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000}, + 
expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000}, }, { name: "query with offset", queryResponses: []queryResponse{ { expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7", - timestamps: []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000}, + timestamps: []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000}, }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4", - timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000}, }, { expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1", @@ -1375,7 +1384,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 4, offset: 3, }, - expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000}, + expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000}, }, { name: "query with offset and limit- data spread across multiple windows", @@ -1386,15 +1395,15 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { }, { expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11", - timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000}, + timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000}, }, { expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8", - timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000}, + timestamps: []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000}, }, { expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3", - timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000}, + timestamps: []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000}, }, }, queryParams: queryParams{ @@ -1403,7 +1412,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { limit: 5, offset: 6, }, - expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000}, + expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000}, }, { name: "don't allow pagination to get more than 10k spans", @@ -1442,7 +1451,6 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows( cmock.NewRows(cols, values), ) - // } } // Create reader and querier @@ -1460,7 +1468,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { reader: reader, builder: queryBuilder.NewQueryBuilder( queryBuilder.QueryBuilderOptions{ - BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildTraceQuery: tracesV4.PrepareTracesQuery, }, ), } @@ -1501,3 +1509,756 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { }) } } + +func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 
1722262800000000000, // July 29, 2024 7:50:00 PM
+ CompositeQuery: &v3.CompositeQuery{
+ PanelType: v3.PanelTypeList,
+ BuilderQueries: map[string]*v3.BuilderQuery{
+ "A": {
+ QueryName: "A",
+ Expression: "A",
+ DataSource: v3.DataSourceTraces,
+ PageSize: 10,
+ Limit: 100,
+ StepInterval: 60,
+ AggregateOperator: v3.AggregateOperatorNoOp,
+ SelectColumns: []v3.AttributeKey{{Key: "serviceName"}},
+ Filters: &v3.FilterSet{
+ Operator: "AND",
+ Items: []v3.FilterItem{},
+ },
+ OrderBy: []v3.OrderBy{
+ {
+ ColumnName: "timestamp",
+ IsColumn: true,
+ Key: "timestamp",
+ Order: "asc",
+ },
+ },
+ },
+ },
+ },
+ }
+
+ tsRanges := []utils.LogsListTsRange{
+ {
+ Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+ End: 1722262800000000000, // July 29, 2024 7:50:00 PM
+ },
+ {
+ Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+ End: 1722259200000000000, // July 29, 2024 6:50:00 PM
+ },
+ {
+ Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+ End: 1722252000000000000, // July 29, 2024 4:50:00 PM
+ },
+ {
+ Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+ End: 1722237600000000000, // July 29, 2024 12:50:00 PM
+ },
+ {
+ Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+ End: 1722208800000000000, // July 29, 2024 4:50:00 AM
+ },
+ }
+
+ type queryParams struct {
+ start int64
+ end int64
+ limit uint64
+ offset uint64
+ }
+
+ type queryResponse struct {
+ expectedQuery string
+ timestamps []uint64
+ }
+
+ // create a test struct with mock data, i.e., an array of timestamps, limit, offset, and expected results
+ testCases := []struct {
+ name string
+ queryResponses []queryResponse
+ queryParams queryParams
+ expectedTimestamps []int64
+ expectedError bool
+ }{
+ {
+ name: "should return correct timestamps when querying within time window",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 2",
+ timestamps: []uint64{1722171576000000000, 1722171577000000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ limit: 2,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722171576000000000, 1722171577000000000},
+ },
+ {
+ name: "all data not in first windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 3",
+ timestamps: []uint64{1722259200000000000, 1722259201000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 1",
+ timestamps: []uint64{1722208800100000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ limit: 3,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722259200000000000, 1722259201000000000, 1722208800100000000},
+ },
+ {
+ name: "query with offset and limit- data spread across multiple windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 11",
+ timestamps: []uint64{},
+ },
+ {
+ expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 11",
+ timestamps: []uint64{1722208801000000000, 1722208802000000000, 1722208803000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* asc LIMIT 8",
+ timestamps: []uint64{1722237600010000000, 1722237600020000000, 1722237600030000000, 
1722237600040000000, 1722237600050000000}, + }, + { + expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* asc LIMIT 3", + timestamps: []uint64{1722252000100000000, 1722252000200000000, 1722252000300000000}, + }, + }, + queryParams: queryParams{ + start: 1722171576000000000, + end: 1722262800000000000, + limit: 5, + offset: 6, + }, + expectedTimestamps: []int64{1722237600040000000, 1722237600050000000, 1722252000100000000, 1722252000200000000, 1722252000300000000}, + }, + } + + cols := []cmock.ColumnType{ + {Name: "timestamp", Type: "UInt64"}, + {Name: "name", Type: "String"}, + } + testName := "name" + + options := clickhouseReader.NewOptions("", "", "archiveNamespace") + + // iterate over test data, create reader and run test + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock + telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp) + + // Configure mock responses + for _, response := range tc.queryResponses { + values := make([][]any, 0, len(response.timestamps)) + for _, ts := range response.timestamps { + values = append(values, []any{&ts, &testName}) + } + telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows( + cmock.NewRows(cols, values), + ) + } + + // Create reader and querier + reader := clickhouseReader.NewReaderFromClickhouseConnection( + options, + nil, + telemetryStore, + prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + "", + time.Duration(time.Second), + nil, + ) + + q := &querier{ + reader: reader, + builder: queryBuilder.NewQueryBuilder( + queryBuilder.QueryBuilderOptions{ + BuildTraceQuery: tracesV4.PrepareTracesQuery, + }, + ), + } + // Update query parameters + params.Start = tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + + // Create a copy of tsRanges before passing to the function + // required because tsRanges is modified in the function + tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges)) + copy(tsRangesCopy, tsRanges) + + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = telemetryStore.Mock().ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +} + +func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) { + params := &v3.QueryRangeParamsV3{ + Start: 1722171576000000000, // July 28, 2024 6:29:36 PM + End: 1722262800000000000, // July 29, 2024 7:50:00 PM + CompositeQuery: &v3.CompositeQuery{ + PanelType: v3.PanelTypeList, + BuilderQueries: 
map[string]*v3.BuilderQuery{
+ "A": {
+ QueryName: "A",
+ Expression: "A",
+ DataSource: v3.DataSourceLogs,
+ PageSize: 10,
+ Limit: 100,
+ StepInterval: 60,
+ AggregateOperator: v3.AggregateOperatorNoOp,
+ Filters: &v3.FilterSet{
+ Operator: "AND",
+ Items: []v3.FilterItem{},
+ },
+ OrderBy: []v3.OrderBy{
+ {
+ ColumnName: "timestamp",
+ Order: "DESC",
+ },
+ {
+ ColumnName: "id",
+ Order: "DESC",
+ },
+ },
+ },
+ },
+ },
+ }
+
+ tsRanges := []utils.LogsListTsRange{
+ {
+ Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+ End: 1722262800000000000, // July 29, 2024 7:50:00 PM
+ },
+ {
+ Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+ End: 1722259200000000000, // July 29, 2024 6:50:00 PM
+ },
+ {
+ Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+ End: 1722252000000000000, // July 29, 2024 4:50:00 PM
+ },
+ {
+ Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+ End: 1722237600000000000, // July 29, 2024 12:50:00 PM
+ },
+ {
+ Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+ End: 1722208800000000000, // July 29, 2024 4:50:00 AM
+ },
+ }
+
+ type queryParams struct {
+ start int64
+ end int64
+ limit uint64
+ offset uint64
+ pageSize uint64
+ }
+
+ type queryResponse struct {
+ expectedQuery string
+ timestamps []uint64
+ }
+
+ // create a test struct with mock data, i.e., an array of timestamps, limit, offset, and expected results
+ testCases := []struct {
+ name string
+ queryResponses []queryResponse
+ queryParams queryParams
+ expectedTimestamps []int64
+ expectedError bool
+ }{
+ {
+ name: "should return correct timestamps when querying within time window",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 2",
+ timestamps: []uint64{1722259400000000000, 1722259300000000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 2,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722259400000000000, 1722259300000000000},
+ },
+ {
+ name: "all data not in first windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 3",
+ timestamps: []uint64{1722259400000000000, 1722259300000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 1",
+ timestamps: []uint64{1722253000000000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 3,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000},
+ },
+ {
+ name: "data in multiple windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 5",
+ timestamps: []uint64{1722259400000000000, 1722259300000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 3",
+ timestamps: []uint64{1722253000000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 2",
+ timestamps: []uint64{1722237700000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 1",
+ timestamps: []uint64{},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* DESC LIMIT 1",
+ timestamps: []uint64{},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 5,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000},
+ },
+ {
+ name: "query with offset",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 7",
+ timestamps: []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 4",
+ timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 1",
+ timestamps: []uint64{1722237700000000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 4,
+ offset: 3,
+ },
+ expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000},
+ },
+ {
+ name: "query with offset and limit- data spread across multiple windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 11",
+ timestamps: []uint64{},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 11",
+ timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 8",
+ timestamps: []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 3",
+ timestamps: []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 5,
+ offset: 6,
+ },
+ expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000},
+ },
+ {
+ name: "don't allow pagination to get more than the specified limit",
+ queryResponses: []queryResponse{},
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ limit: 200,
+ offset: 210,
+ pageSize: 30,
+ },
+ expectedError: true,
+ },
+ }
+
+ cols := []cmock.ColumnType{
+ {Name: "timestamp", Type: "UInt64"},
+ {Name: "name", Type: "String"},
+ }
+ testName := "name"
+
+ options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+ // iterate over test data, create reader and run test
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Setup mock
+ telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+ // Configure mock responses
+ for _, response := range tc.queryResponses {
+ values := make([][]any, 0, len(response.timestamps))
+ for _, ts := range response.timestamps {
+ values = append(values, []any{&ts, &testName})
+ }
+ telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+ cmock.NewRows(cols, values),
+ )
+ }
+
+ // Create reader and querier
+ reader := clickhouseReader.NewReaderFromClickhouseConnection(
+ options,
+ nil,
+ telemetryStore,
+ prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+ "",
+ time.Duration(time.Second),
+ nil,
+ )
+
+ q := &querier{
+ reader: reader,
+ builder: queryBuilder.NewQueryBuilder(
+ queryBuilder.QueryBuilderOptions{
+ BuildLogQuery: logsV4.PrepareLogsQuery,
+ },
+ ),
+ }
+ // Update query parameters
+ params.Start = tc.queryParams.start
+ params.End = tc.queryParams.end
+ params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+ params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+ params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize
+ // Execute query
+ results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
+
+ if tc.expectedError {
+ require.Error(t, err)
+ return
+ }
+
+ // Assertions
+ require.NoError(t, err, "Query execution failed")
+ require.Nil(t, errMap, "Unexpected error map in results")
+ require.Len(t, results, 1, "Expected exactly one result set")
+
+ result := results[0]
+ require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+ require.Len(t, result.List, len(tc.expectedTimestamps),
+ "Result count mismatch: got %d results, expected %d",
+ len(result.List), len(tc.expectedTimestamps))
+
+ for i, expected := range tc.expectedTimestamps {
+ require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+ "Timestamp mismatch at index %d: got %d, expected %d",
+ i, result.List[i].Timestamp.UnixNano(), expected)
+ }
+
+ // Verify mock expectations
+ err = telemetryStore.Mock().ExpectationsWereMet()
+ require.NoError(t, err, "Mock expectations were not met")
+ })
+ }
+}
+
+func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
+ params := &v3.QueryRangeParamsV3{
+ Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+ End: 1722262800000000000, // July 29, 2024 7:50:00 PM
+ CompositeQuery: &v3.CompositeQuery{
+ PanelType: v3.PanelTypeList,
+ BuilderQueries: map[string]*v3.BuilderQuery{
+ "A": {
+ QueryName: "A",
+ Expression: "A",
+ DataSource: v3.DataSourceLogs,
+ PageSize: 10,
+ Limit: 100,
+ StepInterval: 60,
+ AggregateOperator: v3.AggregateOperatorNoOp,
+ Filters: &v3.FilterSet{
+ Operator: "AND",
+ Items: []v3.FilterItem{},
+ },
+ OrderBy: []v3.OrderBy{
+ {
+ ColumnName: "timestamp",
+ Order: "asc",
+ },
+ {
+ ColumnName: "id",
+ Order: "asc",
+ },
+ },
+ },
+ },
+ },
+ }
+
+ tsRanges := []utils.LogsListTsRange{
+ {
+ Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+ End: 1722262800000000000, // July 29, 2024 7:50:00 PM
+ },
+ {
+ Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+ End: 1722259200000000000, // July 29, 2024 6:50:00 PM
+ },
+ {
+ Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+ End: 1722252000000000000, // July 29, 2024 4:50:00 PM
+ },
+ {
+ Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+ End: 1722237600000000000, // July 29, 2024 12:50:00 PM
+ },
+ {
+ Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+ End: 1722208800000000000, // July 29, 2024 4:50:00 AM
+ },
+ }
+
+ type queryParams struct {
+ start int64
+ end int64
+ limit uint64
+ offset uint64
+ pageSize uint64
+ }
+
+ type queryResponse struct {
+ expectedQuery string
+ timestamps []uint64
+ }
+
+ // create a test struct with mock data, i.e., an array of timestamps, limit, offset, and expected results
+ testCases := []struct {
+ name string
+ queryResponses []queryResponse
+ queryParams queryParams
+ expectedTimestamps []int64
+ expectedError bool
+ }{
+ {
+ name: "should return correct timestamps when querying within time window",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 2",
+ timestamps: []uint64{1722171576010000000, 1722171576020000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 2,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722171576010000000, 1722171576020000000},
+ },
+ {
+ name: "all data not in first windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 3",
+ timestamps: []uint64{1722171576001000000, 1722171576002000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 1",
+ timestamps: []uint64{1722208800100000000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 3,
+ offset: 0,
+ },
+ expectedTimestamps: []int64{1722171576001000000, 1722171576002000000, 1722208800100000000},
+ },
+ {
+ name: "query with offset and limit- data spread across multiple windows",
+ queryResponses: []queryResponse{
+ {
+ expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 11",
+ timestamps: []uint64{},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 11",
+ timestamps: []uint64{1722208800100000000, 1722208800200000000, 1722208800300000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* asc LIMIT 8",
+ timestamps: []uint64{1722237600100000000, 1722237600200000000, 1722237600300000000, 1722237600400000000, 1722237600500000000},
+ },
+ {
+ expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* asc LIMIT 3",
+ timestamps: []uint64{1722252000000100000, 1722252000000200000, 1722252000000300000},
+ },
+ },
+ queryParams: queryParams{
+ start: 1722171576000000000,
+ end: 1722262800000000000,
+ pageSize: 5,
+ offset: 6,
+ },
+ expectedTimestamps: []int64{1722237600400000000, 1722237600500000000, 1722252000000100000, 1722252000000200000, 1722252000000300000},
+ },
+ }
+
+ cols := []cmock.ColumnType{
+ {Name: "timestamp", Type: "UInt64"},
+ {Name: "name", Type: "String"},
+ }
+ testName := "name"
+
+ options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+ // iterate over test data, create reader and run test
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Setup mock
+ telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+ // Configure mock responses
+ for _, response := range tc.queryResponses {
+ values := make([][]any, 0, len(response.timestamps))
+ for _, ts := range response.timestamps {
+ values = append(values, []any{&ts, &testName})
+ }
+ telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+ cmock.NewRows(cols, values),
+ )
+ }
+
+ // Create reader and querier
+ reader := clickhouseReader.NewReaderFromClickhouseConnection(
+ options,
+ nil,
+ telemetryStore,
+ prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+ "",
+ time.Duration(time.Second),
+ nil,
+ )
+
+ q := &querier{
+ reader: reader,
+ builder: queryBuilder.NewQueryBuilder(
+ queryBuilder.QueryBuilderOptions{
+ BuildLogQuery: logsV4.PrepareLogsQuery,
+ },
+ ),
+ }
+ // Update query parameters
+ params.Start = 
tc.queryParams.start + params.End = tc.queryParams.end + params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit + params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset + params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize + + // Create a copy of tsRanges before passing to the function + // required because tsRanges is modified in the function + tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges)) + copy(tsRangesCopy, tsRanges) + // Execute query + results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy) + + if tc.expectedError { + require.Error(t, err) + return + } + + // Assertions + require.NoError(t, err, "Query execution failed") + require.Nil(t, errMap, "Unexpected error map in results") + require.Len(t, results, 1, "Expected exactly one result set") + + result := results[0] + require.Equal(t, "A", result.QueryName, "Incorrect query name in results") + require.Len(t, result.List, len(tc.expectedTimestamps), + "Result count mismatch: got %d results, expected %d", + len(result.List), len(tc.expectedTimestamps)) + + for i, expected := range tc.expectedTimestamps { + require.Equal(t, expected, result.List[i].Timestamp.UnixNano(), + "Timestamp mismatch at index %d: got %d, expected %d", + i, result.List[i].Timestamp.UnixNano(), expected) + } + + // Verify mock expectations + err = telemetryStore.Mock().ExpectationsWereMet() + require.NoError(t, err, "Mock expectations were not met") + }) + } +}
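
Reviewer note: the three new suites above (traces asc, logs desc, logs asc) all pin down the same windowed-pagination contract that the expected LIMIT values encode. Windows are consumed newest-first for descending order and oldest-first for ascending; each window is asked for at most the rows still needed for the page plus any offset rows not yet skipped; and fetching stops as soon as the page is full. The sketch below is a minimal, self-contained illustration of that contract under those assumptions only. It is not SigNoz code: paginateWindows, fetchWindow, and tsRange are hypothetical names, and the real querier additionally rejects logs requests whose offset already exceeds the absolute limit, as the "don't allow pagination" case asserts.

package main

import (
	"fmt"
	"sort"
)

// tsRange is a hypothetical stand-in for one pre-split time window.
type tsRange struct{ start, end int64 }

// fetchWindow stands in for a single list query against one window: it
// returns the timestamps inside [r.start, r.end], ordered asc or desc,
// capped at limit, the way each mocked ClickHouse query behaves above.
func fetchWindow(data []int64, r tsRange, limit uint64, asc bool) []int64 {
	rows := []int64{}
	for _, ts := range data {
		if ts >= r.start && ts <= r.end {
			rows = append(rows, ts)
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		if asc {
			return rows[i] < rows[j]
		}
		return rows[i] > rows[j]
	})
	if uint64(len(rows)) > limit {
		rows = rows[:limit]
	}
	return rows
}

// paginateWindows walks the windows in query order and assembles one page
// of pageSize rows, skipping offset rows exactly once across windows.
func paginateWindows(windows []tsRange, data []int64, pageSize, offset uint64, asc bool) []int64 {
	// For ascending order the windows are walked oldest-first, mirroring
	// the reversal the *Asc tests assert via their expected queries.
	if asc {
		for i, j := 0, len(windows)-1; i < j; i, j = i+1, j-1 {
			windows[i], windows[j] = windows[j], windows[i]
		}
	}
	page := []int64{}
	skip := offset
	for _, w := range windows {
		// Ask each window for the rows still needed plus whatever offset
		// remains unskipped; this reproduces the shrinking LIMITs the
		// mocks expect (e.g. 7, 4, 1 for pageSize=4, offset=3).
		want := pageSize + skip - uint64(len(page))
		for _, ts := range fetchWindow(data, w, want, asc) {
			if skip > 0 {
				skip--
				continue
			}
			page = append(page, ts)
		}
		if uint64(len(page)) >= pageSize {
			break
		}
	}
	return page
}

func main() {
	windows := []tsRange{ // newest window first, as the tests lay them out
		{start: 301, end: 400},
		{start: 201, end: 300},
		{start: 101, end: 200},
	}
	data := []int64{110, 120, 210, 220, 310, 320}

	// DESC, pageSize=3, offset=2: skips 320 and 310, prints [220 210 120].
	fmt.Println(paginateWindows(append([]tsRange{}, windows...), data, 3, 2, false))
	// ASC, pageSize=2, offset=1: windows reversed, skips 110, prints [120 210].
	fmt.Println(paginateWindows(append([]tsRange{}, windows...), data, 2, 1, true))
}

The copies passed to paginateWindows in main mirror the tsRangesCopy pattern in the tests: the window slice is mutated in place for ascending order, so callers that reuse it must hand over a copy.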