fix: logs window based pagination to pageSize offset instead of using id filter (#6830)

* fix: logs window based pagination to pageSize offset instead of using id filter
* fix: add support for asc
* fix: remove unwanted code

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

parent 824302be38
commit aadf2a3ac7
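
Before the diff, a quick orientation: this commit replaces the old logs continuation strategy (an `id < <last seen id>` filter appended after every window) with plain pageSize/offset pagination applied over pre-split time windows. A minimal, self-contained Go sketch of the window-splitting idea behind `utils.GetListTsRanges` follows; the doubling schedule here is an assumption for illustration, not the exact implementation:

package main

import (
	"fmt"
	"time"
)

// splitIntoWindows mimics the idea behind utils.GetListTsRanges (assumed
// behaviour, not the exact implementation): carve [start, end] into windows
// that grow as they move away from `end`, so the newest data - which a DESC
// list query reads first - sits in the smallest window.
func splitIntoWindows(start, end int64) [][2]int64 {
	windows := [][2]int64{}
	size := time.Hour.Nanoseconds()
	for cur := end; cur > start; {
		lo := cur - size
		if lo < start {
			lo = start
		}
		windows = append(windows, [2]int64{lo, cur})
		cur = lo
		size *= 2 // double the window each step
	}
	return windows
}

func main() {
	end := time.Now().UnixNano()
	start := end - 10*time.Hour.Nanoseconds()
	for _, w := range splitIntoWindows(start, end) {
		fmt.Printf("window %d -> %d\n", w[0], w[1])
	}
}

A DESC list query walks these windows newest-first, so most requests are satisfied from the small recent windows without scanning the full range.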
@@ -284,7 +284,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []
 	if len(orderByArray) == 0 {
 		if panelType == v3.PanelTypeList {
-			orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC")
+			orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC", "id DESC")
 		} else {
 			orderByArray = append(orderByArray, "value DESC")
 		}
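
For context on the `id DESC` addition above: `id` is a unique column, so appending it as a tiebreaker makes the ordering total and page boundaries stable when many rows share a timestamp. A small standalone illustration (hypothetical data, not SigNoz code):

package main

import (
	"fmt"
	"sort"
)

type row struct {
	ts int64  // timestamp, may collide across rows
	id string // unique and monotonic (e.g. a KSUID), never collides
}

func main() {
	rows := []row{{100, "c"}, {100, "a"}, {90, "b"}}
	// Sort by timestamp DESC, then id DESC: the unique id breaks ties, so a
	// page boundary falling between the two ts=100 rows is deterministic.
	sort.Slice(rows, func(i, j int) bool {
		if rows[i].ts != rows[j].ts {
			return rows[i].ts > rows[j].ts
		}
		return rows[i].id > rows[j].id
	})
	fmt.Println(rows) // [{100 c} {100 a} {90 b}]
}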
@@ -541,6 +541,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
 			return "", fmt.Errorf("max limit exceeded")
 		}
 
+		// when pageSize is provided, we need to fetch the logs in chunks
 		if mq.PageSize > 0 {
 			if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit {
 				query = logsV3.AddLimitToQuery(query, mq.Limit-mq.Offset)
@@ -548,12 +549,9 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
 				query = logsV3.AddLimitToQuery(query, mq.PageSize)
 			}
 
-			// add offset to the query only if it is not orderd by timestamp.
-			if !logsV3.IsOrderByTs(mq.OrderBy) {
-				query = logsV3.AddOffsetToQuery(query, mq.Offset)
-			}
+			query = logsV3.AddOffsetToQuery(query, mq.Offset)
 
 		} else {
+			// when pageSize is not provided, we fetch all the logs in the limit
 			query = logsV3.AddLimitToQuery(query, mq.Limit)
 		}
 	} else if panelType == v3.PanelTypeTable {
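
The net effect of the two hunks above: the generated SQL now always carries a LIMIT/OFFSET pair derived from pageSize and offset, with the last page clamped to the absolute limit. A sketch of that arithmetic (hypothetical helper, not the real `AddLimitToQuery`/`AddOffsetToQuery`):

package main

import "fmt"

// limitAndOffset derives the effective LIMIT/OFFSET for one page, assuming
// the pageSize/offset/limit semantics described in the hunks above.
func limitAndOffset(limit, offset, pageSize uint64) (uint64, uint64) {
	effLimit := pageSize
	if limit > 0 && offset+pageSize > limit {
		effLimit = limit - offset // clamp the final page to the absolute limit
	}
	return effLimit, offset
}

func main() {
	l, o := limitAndOffset(100, 90, 20)
	fmt.Printf("LIMIT %d OFFSET %d\n", l, o) // LIMIT 10 OFFSET 90
}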
@@ -1097,7 +1097,7 @@ func TestPrepareLogsQuery(t *testing.T) {
 			},
 			want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string from " +
 				"signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " +
-				"order by timestamp desc LIMIT 1",
+				"order by timestamp desc LIMIT 1 OFFSET 0",
 		},
 		{
 			name: "Test limit greater than pageSize - order by ts",
@@ -1122,7 +1122,7 @@ func TestPrepareLogsQuery(t *testing.T) {
 			},
 			want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string from " +
 				"signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " +
-				"AND id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by timestamp desc LIMIT 10",
+				"AND id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by timestamp desc LIMIT 10 OFFSET 10",
 		},
 		{
 			name: "Test limit less than pageSize - order by custom",
@@ -3,6 +3,7 @@ package querier
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
@@ -307,114 +308,108 @@ func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryR
 	limit := uint64(0)
 	offset := uint64(0)
 
-	// se we are considering only one query
+	// Get query details and check order direction
 	for name, v := range params.CompositeQuery.BuilderQueries {
 		qName = name
 		pageSize = v.PageSize
 
-		// for traces specifically
 		limit = v.Limit
 		offset = v.Offset
 	}
 
+	// Check if order is ascending
+	if strings.ToLower(string(params.CompositeQuery.BuilderQueries[qName].OrderBy[0].Order)) == "asc" {
+		// Reverse the time ranges for ascending order
+		for i, j := 0, len(tsRanges)-1; i < j; i, j = i+1, j-1 {
+			tsRanges[i], tsRanges[j] = tsRanges[j], tsRanges[i]
+		}
+	}
+
+	// check if it is a logs query
+	isLogs := false
+	if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
+		isLogs = true
+	}
+
 	data := []*v3.Row{}
 
-	tracesLimit := limit + offset
+	limitWithOffset := limit + offset
+	if isLogs {
+		// for logs we use pageSize to define the current limit and limit to define the absolute limit
+		limitWithOffset = pageSize + offset
+		if limit > 0 && offset >= limit {
+			return nil, nil, fmt.Errorf("max limit exceeded")
+		}
+	}
 
 	for _, v := range tsRanges {
 		params.Start = v.Start
 		params.End = v.End
 
 		length := uint64(0)
-		// this will to run only once
 
-		// appending the filter to get the next set of data
-		if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
-			params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
-			queries, err := q.builder.PrepareQueries(params)
-			if err != nil {
-				return nil, nil, err
-			}
-			for name, query := range queries {
-				rowList, err := q.reader.GetListResultV3(ctx, query)
-				if err != nil {
-					errs := []error{err}
-					errQueriesByName := map[string]error{
-						name: err,
-					}
-					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
-				}
-				length += uint64(len(rowList))
-				data = append(data, rowList...)
-			}
-
-			if length > 0 {
-				params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
-					Key: v3.AttributeKey{
-						Key:      "id",
-						IsColumn: true,
-						DataType: "string",
-					},
-					Operator: v3.FilterOperatorLessThan,
-					Value:    data[len(data)-1].Data["id"],
-				})
-			}
-
-			if uint64(len(data)) >= pageSize {
-				break
-			}
+		// max limit + offset is 10k for pagination for traces/logs
+		// TODO(nitya): define something for logs
+		if !isLogs && limitWithOffset > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
+			return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
+		}
+
+		// we are updating the offset and limit based on the number of traces/logs we have found in the current timerange
+		// eg -
+		// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
+		//
+		// if 100 traces/logs are there in [t1, t10] then 100 will return immediately.
+		// if 10 traces/logs are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
+		// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
+		//
+		// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
+		//
+		// If we find 150 traces/logs with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces/logs
+		// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
+		// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
+
+		params.CompositeQuery.BuilderQueries[qName].Offset = 0
+		// if datasource is logs
+		if isLogs {
+			// for logs we use limit to define the absolute limit and pagesize to define the current limit
+			params.CompositeQuery.BuilderQueries[qName].PageSize = limitWithOffset
 		} else {
-			// TRACE
-			// we are updating the offset and limit based on the number of traces we have found in the current timerange
-			// eg -
-			// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
-			//
-			// if 100 traces are there in [t1, t10] then 100 will return immediately.
-			// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
-			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
-			//
-			// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
-			//
-			// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
-			// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
-			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
-
-			// max limit + offset is 10k for pagination
-			if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
-				return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
-			}
-
-			params.CompositeQuery.BuilderQueries[qName].Offset = 0
-			params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
-			queries, err := q.builder.PrepareQueries(params)
-			if err != nil {
-				return nil, nil, err
-			}
-			for name, query := range queries {
-				rowList, err := q.reader.GetListResultV3(ctx, query)
-				if err != nil {
-					errs := []error{err}
-					errQueriesByName := map[string]error{
-						name: err,
-					}
-					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
-				}
-				length += uint64(len(rowList))
-
-				// skip the traces unless offset is 0
-				for _, row := range rowList {
-					if offset == 0 {
-						data = append(data, row)
-					} else {
-						offset--
-					}
-				}
-			}
-			tracesLimit = tracesLimit - length
-
-			if uint64(len(data)) >= limit {
-				break
-			}
+			params.CompositeQuery.BuilderQueries[qName].Limit = limitWithOffset
+		}
+
+		queries, err := q.builder.PrepareQueries(params)
+		if err != nil {
+			return nil, nil, err
+		}
+		for name, query := range queries {
+			rowList, err := q.reader.GetListResultV3(ctx, query)
+			if err != nil {
+				errs := []error{err}
+				errQueriesByName := map[string]error{
+					name: err,
+				}
+				return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
+			}
+			length += uint64(len(rowList))
+
+			// skip the traces unless offset is 0
+			for _, row := range rowList {
+				if offset == 0 {
+					data = append(data, row)
+				} else {
+					offset--
+				}
+			}
+		}
+		limitWithOffset = limitWithOffset - length
+
+		if isLogs && uint64(len(data)) >= pageSize {
+			// for logs
+			break
+		} else if !isLogs && uint64(len(data)) >= limit {
+			// for traces
+			break
 		}
 	}
 	res = append(res, &v3.Result{
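
To make the new loop above easier to follow, here is a compressed, runnable walkthrough of the same control flow; the `fetch` helper is a stand-in for the real `PrepareQueries` + `GetListResultV3` round trip:

package main

import "fmt"

// fetch stands in for running the prepared ClickHouse query against one
// time window with the remaining limitWithOffset as its LIMIT.
func fetch(window []string, limit uint64) []string {
	if uint64(len(window)) > limit {
		window = window[:limit]
	}
	return window
}

func main() {
	windows := [][]string{{"r1", "r2", "r3"}, {"r4", "r5"}, {"r6", "r7", "r8", "r9"}}
	pageSize, offset := uint64(4), uint64(3)
	limitWithOffset := pageSize + offset // logs: pageSize + offset

	data := []string{}
	for _, w := range windows {
		rowList := fetch(w, limitWithOffset)
		// drop rows until the client-requested offset is consumed
		for _, row := range rowList {
			if offset == 0 {
				data = append(data, row)
			} else {
				offset--
			}
		}
		limitWithOffset = limitWithOffset - uint64(len(rowList))
		if uint64(len(data)) >= pageSize {
			break // page is full, later (older) windows are never queried
		}
	}
	fmt.Println(data) // [r4 r5 r6 r7]
}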
@@ -431,12 +426,17 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
 		len(params.CompositeQuery.BuilderQueries) == 1 &&
 		params.CompositeQuery.PanelType != v3.PanelTypeTrace {
 		for _, v := range params.CompositeQuery.BuilderQueries {
-			// only allow of logs queries with timestamp ordering desc
-			// TODO(nitya): allow for timestamp asc
-			if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
+			// for logs: allow only when order of timestamp and id is same
+			if v.DataSource == v3.DataSourceTraces &&
 				len(v.OrderBy) == 1 &&
+				v.OrderBy[0].ColumnName == "timestamp" {
+				startEndArr := utils.GetListTsRanges(params.Start, params.End)
+				return q.runWindowBasedListQuery(ctx, params, startEndArr)
+			} else if v.DataSource == v3.DataSourceLogs &&
+				len(v.OrderBy) == 2 &&
 				v.OrderBy[0].ColumnName == "timestamp" &&
-				v.OrderBy[0].Order == "desc" {
+				v.OrderBy[1].ColumnName == "id" &&
+				v.OrderBy[1].Order == v.OrderBy[0].Order {
 				startEndArr := utils.GetListTsRanges(params.Start, params.End)
 				return q.runWindowBasedListQuery(ctx, params, startEndArr)
 			}
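
The gate above can be restated as a small predicate; this standalone sketch uses plain illustrative types, not the actual v3 model: traces qualify for the window-based path with a single "timestamp" ordering, logs only when "timestamp" and the unique "id" tiebreaker share the same direction.

package main

import "fmt"

type orderBy struct{ column, order string }

// windowable mirrors the new eligibility rule for the window-based list path.
func windowable(dataSource string, ob []orderBy) bool {
	switch dataSource {
	case "traces":
		return len(ob) == 1 && ob[0].column == "timestamp"
	case "logs":
		return len(ob) == 2 && ob[0].column == "timestamp" &&
			ob[1].column == "id" && ob[1].order == ob[0].order
	}
	return false
}

func main() {
	fmt.Println(windowable("logs", []orderBy{{"timestamp", "asc"}, {"id", "asc"}}))  // true
	fmt.Println(windowable("logs", []orderBy{{"timestamp", "desc"}, {"id", "asc"}})) // false
}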
@@ -15,8 +15,10 @@ import (
 	"github.com/SigNoz/signoz/pkg/prometheus"
 	"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
 	"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
+	logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
 	"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
 	tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
+	tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
 	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
 	"github.com/SigNoz/signoz/pkg/query-service/querycache"
 	"github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -1164,7 +1166,7 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
 	}
 }
 
-func Test_querier_runWindowBasedListQuery(t *testing.T) {
+func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) {
 	params := &v3.QueryRangeParamsV3{
 		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
 		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
@@ -1184,6 +1186,13 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 					Operator: "AND",
 					Items:    []v3.FilterItem{},
 				},
+				OrderBy: []v3.OrderBy{
+					{
+						ColumnName: "timestamp",
+						IsColumn:   true,
+						Order:      "DESC",
+					},
+				},
 			},
 		},
 	},
@@ -1237,7 +1246,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
-					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
 				},
 			},
 			queryParams: queryParams{
@@ -1246,14 +1255,14 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 				limit:  2,
 				offset: 0,
 			},
-			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000},
 		},
 		{
 			name: "all data not in first windows",
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
-					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
@@ -1266,14 +1275,14 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 				limit:  3,
 				offset: 0,
 			},
-			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000},
 		},
 		{
 			name: "data in multiple windows",
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
-					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
@@ -1298,18 +1307,18 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 				limit:  5,
 				offset: 0,
 			},
-			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000},
 		},
 		{
 			name: "query with offset",
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
-					timestamps:    []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
+					timestamps:    []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
-					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
@@ -1322,7 +1331,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 				limit:  4,
 				offset: 3,
 			},
-			expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
+			expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000},
 		},
 		{
 			name: "query with offset and limit- data spread across multiple windows",
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
|
expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
|
||||||
timestamps: []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
|
timestamps: []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
|
expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
|
||||||
timestamps: []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
|
timestamps: []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
|
expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
|
||||||
timestamps: []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
|
timestamps: []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
queryParams: queryParams{
|
queryParams: queryParams{
|
||||||
@ -1350,7 +1359,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
|
|||||||
limit: 5,
|
limit: 5,
|
||||||
offset: 6,
|
offset: 6,
|
||||||
},
|
},
|
||||||
expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
|
expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "don't allow pagination to get more than 10k spans",
|
name: "don't allow pagination to get more than 10k spans",
|
||||||
@@ -1385,11 +1394,11 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 				for _, ts := range response.timestamps {
 					values = append(values, []any{&ts, &testName})
 				}
+				// mock.ExpectQuery(response.expectedQuery).WillReturnRows(
 				// if len(values) > 0 {
 				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
 					cmock.NewRows(cols, values),
 				)
-				// }
 			}
 
 			// Create reader and querier
@@ -1407,7 +1416,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 				reader: reader,
 				builder: queryBuilder.NewQueryBuilder(
 					queryBuilder.QueryBuilderOptions{
-						BuildTraceQuery: tracesV3.PrepareTracesQuery,
+						BuildTraceQuery: tracesV4.PrepareTracesQuery,
 					},
 				),
 			}
@@ -1448,3 +1457,757 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 		})
 	}
 }
+
+func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) {
+	params := &v3.QueryRangeParamsV3{
+		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		CompositeQuery: &v3.CompositeQuery{
+			PanelType: v3.PanelTypeList,
+			BuilderQueries: map[string]*v3.BuilderQuery{
+				"A": {
+					QueryName:         "A",
+					Expression:        "A",
+					DataSource:        v3.DataSourceTraces,
+					PageSize:          10,
+					Limit:             100,
+					StepInterval:      60,
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					SelectColumns:     []v3.AttributeKey{{Key: "serviceName"}},
+					Filters: &v3.FilterSet{
+						Operator: "AND",
+						Items:    []v3.FilterItem{},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "timestamp",
+							IsColumn:   true,
+							Key:        "timestamp",
+							Order:      "asc",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tsRanges := []utils.LogsListTsRange{
+		{
+			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		},
+		{
+			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
+		},
+		{
+			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
+		},
+		{
+			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
+		},
+		{
+			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
+		},
+	}
+
+	type queryParams struct {
+		start  int64
+		end    int64
+		limit  uint64
+		offset uint64
+	}
+
+	type queryResponse struct {
+		expectedQuery string
+		timestamps    []uint64
+	}
+
+	// create test struct with moc data i.e array of timestamps, limit, offset and expected results
+	testCases := []struct {
+		name               string
+		queryResponses     []queryResponse
+		queryParams        queryParams
+		expectedTimestamps []int64
+		expectedError      bool
+	}{
+		{
+			name: "should return correct timestamps when querying within time window",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 2",
+					timestamps:    []uint64{1722171576000000000, 1722171577000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:  1722171576000000000,
+				end:    1722262800000000000,
+				limit:  2,
+				offset: 0,
+			},
+			expectedTimestamps: []int64{1722171576000000000, 1722171577000000000},
+		},
+		{
+			name: "all data not in first windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 3",
+					timestamps:    []uint64{1722259200000000000, 1722259201000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 1",
+					timestamps:    []uint64{1722208800100000000},
+				},
+			},
+			queryParams: queryParams{
+				start:  1722171576000000000,
+				end:    1722262800000000000,
+				limit:  3,
+				offset: 0,
+			},
+			expectedTimestamps: []int64{1722259200000000000, 1722259201000000000, 1722208800100000000},
+		},
+		{
+			name: "query with offset and limit- data spread across multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 11",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 11",
+					timestamps:    []uint64{1722208801000000000, 1722208802000000000, 1722208803000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* asc LIMIT 8",
+					timestamps:    []uint64{1722237600010000000, 1722237600020000000, 1722237600030000000, 1722237600040000000, 1722237600050000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* asc LIMIT 3",
+					timestamps:    []uint64{1722252000100000000, 1722252000200000000, 1722252000300000000},
+				},
+			},
+			queryParams: queryParams{
+				start:  1722171576000000000,
+				end:    1722262800000000000,
+				limit:  5,
+				offset: 6,
+			},
+			expectedTimestamps: []int64{1722237600040000000, 1722237600050000000, 1722252000100000000, 1722252000200000000, 1722252000300000000},
+		},
+	}
+
+	cols := []cmock.ColumnType{
+		{Name: "timestamp", Type: "UInt64"},
+		{Name: "name", Type: "String"},
+	}
+	testName := "name"
+
+	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+	// iterate over test data, create reader and run test
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup mock
+			telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+			// Configure mock responses
+			for _, response := range tc.queryResponses {
+				values := make([][]any, 0, len(response.timestamps))
+				for _, ts := range response.timestamps {
+					values = append(values, []any{&ts, &testName})
+				}
+				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+					cmock.NewRows(cols, values),
+				)
+			}
+
+			// Create reader and querier
+			reader := clickhouseReader.NewReaderFromClickhouseConnection(
+				options,
+				nil,
+				telemetryStore,
+				prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+				"",
+				time.Duration(time.Second),
+				nil,
+			)
+
+			q := &querier{
+				reader: reader,
+				builder: queryBuilder.NewQueryBuilder(
+					queryBuilder.QueryBuilderOptions{
+						BuildTraceQuery: tracesV3.PrepareTracesQuery,
+					},
+				),
+			}
+
+			// Update query parameters
+			params.Start = tc.queryParams.start
+			params.End = tc.queryParams.end
+			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+
+			// Create a copy of tsRanges before passing to the function
+			// required because tsRanges is modified in the function
+			tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges))
+			copy(tsRangesCopy, tsRanges)
+
+			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy)
+
+			if tc.expectedError {
+				require.Error(t, err)
+				return
+			}
+
+			// Assertions
+			require.NoError(t, err, "Query execution failed")
+			require.Nil(t, errMap, "Unexpected error map in results")
+			require.Len(t, results, 1, "Expected exactly one result set")
+
+			result := results[0]
+			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+			require.Len(t, result.List, len(tc.expectedTimestamps),
+				"Result count mismatch: got %d results, expected %d",
+				len(result.List), len(tc.expectedTimestamps))
+
+			for i, expected := range tc.expectedTimestamps {
+				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+					"Timestamp mismatch at index %d: got %d, expected %d",
+					i, result.List[i].Timestamp.UnixNano(), expected)
+			}
+
+			// Verify mock expectations
+			err = telemetryStore.Mock().ExpectationsWereMet()
+			require.NoError(t, err, "Mock expectations were not met")
+		})
+	}
+}
+
+func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) {
+	params := &v3.QueryRangeParamsV3{
+		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		CompositeQuery: &v3.CompositeQuery{
+			PanelType: v3.PanelTypeList,
+			BuilderQueries: map[string]*v3.BuilderQuery{
+				"A": {
+					QueryName:         "A",
+					Expression:        "A",
+					DataSource:        v3.DataSourceLogs,
+					PageSize:          10,
+					Limit:             100,
+					StepInterval:      60,
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					Filters: &v3.FilterSet{
+						Operator: "AND",
+						Items:    []v3.FilterItem{},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "timestamp",
+							Order:      "DESC",
+						},
+						{
+							ColumnName: "id",
+							Order:      "DESC",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tsRanges := []utils.LogsListTsRange{
+		{
+			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		},
+		{
+			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
+		},
+		{
+			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
+		},
+		{
+			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
+		},
+		{
+			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
+		},
+	}
+
+	type queryParams struct {
+		start    int64
+		end      int64
+		limit    uint64
+		offset   uint64
+		pageSize uint64
+	}
+
+	type queryResponse struct {
+		expectedQuery string
+		timestamps    []uint64
+	}
+
+	// create test struct with moc data i.e array of timestamps, limit, offset and expected results
+	testCases := []struct {
+		name               string
+		queryResponses     []queryResponse
+		queryParams        queryParams
+		expectedTimestamps []int64
+		expectedError      bool
+	}{
+		{
+			name: "should return correct timestamps when querying within time window",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 2",
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 2,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000},
+		},
+		{
+			name: "all data not in first windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 3",
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{1722253000000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 3,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000},
+		},
+		{
+			name: "data in multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 5",
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 3",
+					timestamps:    []uint64{1722253000000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 2",
+					timestamps:    []uint64{1722237700000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 5,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000},
+		},
+		{
+			name: "query with offset",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 7",
+					timestamps:    []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 4",
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{1722237700000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 4,
+				offset:   3,
+			},
+			expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000},
+		},
+		{
+			name: "query with offset and limit- data spread across multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 11",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 11",
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 8",
+					timestamps:    []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 3",
+					timestamps:    []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 5,
+				offset:   6,
+			},
+			expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000},
+		},
+		{
+			name:           "dont allow pagination to get more than speficied limit",
+			queryResponses: []queryResponse{},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				limit:    200,
+				offset:   210,
+				pageSize: 30,
+			},
+			expectedError: true,
+		},
+	}
+
+	cols := []cmock.ColumnType{
+		{Name: "timestamp", Type: "UInt64"},
+		{Name: "name", Type: "String"},
+	}
+	testName := "name"
+
+	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+	// iterate over test data, create reader and run test
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup mock
+			telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+			// Configure mock responses
+			for _, response := range tc.queryResponses {
+				values := make([][]any, 0, len(response.timestamps))
+				for _, ts := range response.timestamps {
+					values = append(values, []any{&ts, &testName})
+				}
+				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+					cmock.NewRows(cols, values),
+				)
+			}
+
+			// Create reader and querier
+			reader := clickhouseReader.NewReaderFromClickhouseConnection(
+				options,
+				nil,
+				telemetryStore,
+				prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+				"",
+				time.Duration(time.Second),
+				nil,
+			)
+
+			q := &querier{
+				reader: reader,
+				builder: queryBuilder.NewQueryBuilder(
+					queryBuilder.QueryBuilderOptions{
+						BuildLogQuery: logsV4.PrepareLogsQuery,
+					},
+				),
+			}
+			// Update query parameters
+			params.Start = tc.queryParams.start
+			params.End = tc.queryParams.end
+			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+			params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize
+			// Execute query
+			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
+
+			if tc.expectedError {
+				require.Error(t, err)
+				return
+			}
+
+			// Assertions
+			require.NoError(t, err, "Query execution failed")
+			require.Nil(t, errMap, "Unexpected error map in results")
+			require.Len(t, results, 1, "Expected exactly one result set")
+
+			result := results[0]
+			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+			require.Len(t, result.List, len(tc.expectedTimestamps),
+				"Result count mismatch: got %d results, expected %d",
+				len(result.List), len(tc.expectedTimestamps))
+
+			for i, expected := range tc.expectedTimestamps {
+				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+					"Timestamp mismatch at index %d: got %d, expected %d",
+					i, result.List[i].Timestamp.UnixNano(), expected)
+			}
+
+			// Verify mock expectations
+			err = telemetryStore.Mock().ExpectationsWereMet()
+			require.NoError(t, err, "Mock expectations were not met")
+		})
+	}
+}
+
+func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
+	params := &v3.QueryRangeParamsV3{
+		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		CompositeQuery: &v3.CompositeQuery{
+			PanelType: v3.PanelTypeList,
+			BuilderQueries: map[string]*v3.BuilderQuery{
+				"A": {
+					QueryName:         "A",
+					Expression:        "A",
+					DataSource:        v3.DataSourceLogs,
+					PageSize:          10,
+					Limit:             100,
+					StepInterval:      60,
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					Filters: &v3.FilterSet{
+						Operator: "AND",
+						Items:    []v3.FilterItem{},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "timestamp",
+							Order:      "asc",
+						},
+						{
+							ColumnName: "id",
+							Order:      "asc",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tsRanges := []utils.LogsListTsRange{
+		{
+			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		},
+		{
+			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
+		},
+		{
+			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
+		},
+		{
+			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
+		},
+		{
+			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
+		},
+	}
+
+	type queryParams struct {
+		start    int64
+		end      int64
+		limit    uint64
+		offset   uint64
+		pageSize uint64
+	}
+
+	type queryResponse struct {
+		expectedQuery string
+		timestamps    []uint64
+	}
+
+	// create test struct with moc data i.e array of timestamps, limit, offset and expected results
+	testCases := []struct {
+		name               string
+		queryResponses     []queryResponse
+		queryParams        queryParams
+		expectedTimestamps []int64
+		expectedError      bool
+	}{
+		{
+			name: "should return correct timestamps when querying within time window",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 2",
+					timestamps:    []uint64{1722171576010000000, 1722171576020000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 2,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722171576010000000, 1722171576020000000},
+		},
+		{
+			name: "all data not in first windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 3",
+					timestamps:    []uint64{1722171576001000000, 1722171576002000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 1",
+					timestamps:    []uint64{1722208800100000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 3,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722171576001000000, 1722171576002000000, 1722208800100000000},
+		},
+		{
+			name: "query with offset and limit- data spread across multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 11",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 11",
+					timestamps:    []uint64{1722208800100000000, 1722208800200000000, 1722208800300000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* asc LIMIT 8",
+					timestamps:    []uint64{1722237600100000000, 1722237600200000000, 1722237600300000000, 1722237600400000000, 1722237600500000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* asc LIMIT 3",
+					timestamps:    []uint64{1722252000000100000, 1722252000000200000, 1722252000000300000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 5,
+				offset:   6,
+			},
+			expectedTimestamps: []int64{1722237600400000000, 1722237600500000000, 1722252000000100000, 1722252000000200000, 1722252000000300000},
+		},
+	}
+
+	cols := []cmock.ColumnType{
+		{Name: "timestamp", Type: "UInt64"},
+		{Name: "name", Type: "String"},
+	}
+	testName := "name"
+
+	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+	// iterate over test data, create reader and run test
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup mock
+			telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+			// Configure mock responses
+			for _, response := range tc.queryResponses {
+				values := make([][]any, 0, len(response.timestamps))
+				for _, ts := range response.timestamps {
+					values = append(values, []any{&ts, &testName})
+				}
+				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+					cmock.NewRows(cols, values),
+				)
+			}
+
+			// Create reader and querier
+			reader := clickhouseReader.NewReaderFromClickhouseConnection(
+				options,
+				nil,
+				telemetryStore,
+				prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+				"",
+				time.Duration(time.Second),
+				nil,
+			)
+
+			q := &querier{
+				reader: reader,
+				builder: queryBuilder.NewQueryBuilder(
+					queryBuilder.QueryBuilderOptions{
+						BuildLogQuery: logsV4.PrepareLogsQuery,
+					},
+				),
+			}
+			// Update query parameters
+			params.Start = tc.queryParams.start
+			params.End = tc.queryParams.end
+			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+			params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize
+
+			// Create a copy of tsRanges before passing to the function
+			// required because tsRanges is modified in the function
+			tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges))
+			copy(tsRangesCopy, tsRanges)
+			// Execute query
+			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy)
+
+			if tc.expectedError {
+				require.Error(t, err)
+				return
+			}
+
+			// Assertions
+			require.NoError(t, err, "Query execution failed")
+			require.Nil(t, errMap, "Unexpected error map in results")
+			require.Len(t, results, 1, "Expected exactly one result set")
+
+			result := results[0]
+			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+			require.Len(t, result.List, len(tc.expectedTimestamps),
+				"Result count mismatch: got %d results, expected %d",
+				len(result.List), len(tc.expectedTimestamps))
+
+			for i, expected := range tc.expectedTimestamps {
+				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+					"Timestamp mismatch at index %d: got %d, expected %d",
+					i, result.List[i].Timestamp.UnixNano(), expected)
+			}
+
+			// Verify mock expectations
+			err = telemetryStore.Mock().ExpectationsWereMet()
+			require.NoError(t, err, "Mock expectations were not met")
+		})
+	}
+}
@@ -3,6 +3,7 @@ package v2
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
@@ -308,114 +309,108 @@ func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryR
 	limit := uint64(0)
 	offset := uint64(0)

-	// se we are considering only one query
+	// Get query details and check order direction
 	for name, v := range params.CompositeQuery.BuilderQueries {
 		qName = name
 		pageSize = v.PageSize

-		// for traces specifically
 		limit = v.Limit
 		offset = v.Offset
 	}

+	// Check if order is ascending
+	if strings.ToLower(string(params.CompositeQuery.BuilderQueries[qName].OrderBy[0].Order)) == "asc" {
+		// Reverse the time ranges for ascending order
+		for i, j := 0, len(tsRanges)-1; i < j; i, j = i+1, j-1 {
+			tsRanges[i], tsRanges[j] = tsRanges[j], tsRanges[i]
+		}
+	}
+
+	// check if it is a logs query
+	isLogs := false
+	if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
+		isLogs = true
+	}
+
 	data := []*v3.Row{}

-	tracesLimit := limit + offset
+	limitWithOffset := limit + offset
+	if isLogs {
+		// for logs we use pageSize to define the current limit and limit to define the absolute limit
+		limitWithOffset = pageSize + offset
+		if limit > 0 && offset >= limit {
+			return nil, nil, fmt.Errorf("max limit exceeded")
+		}
+	}

 	for _, v := range tsRanges {
 		params.Start = v.Start
 		params.End = v.End

 		length := uint64(0)
-		// this will to run only once

-		// appending the filter to get the next set of data
-		if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
-			params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
-			queries, err := q.builder.PrepareQueries(params)
-			if err != nil {
-				return nil, nil, err
-			}
-			for name, query := range queries {
-				rowList, err := q.reader.GetListResultV3(ctx, query)
-				if err != nil {
-					errs := []error{err}
-					errQueriesByName := map[string]error{
-						name: err,
-					}
-					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
-				}
-				length += uint64(len(rowList))
-				data = append(data, rowList...)
-			}
-
-			if length > 0 {
-				params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
-					Key: v3.AttributeKey{
-						Key:      "id",
-						IsColumn: true,
-						DataType: "string",
-					},
-					Operator: v3.FilterOperatorLessThan,
-					Value:    data[len(data)-1].Data["id"],
-				})
-			}
-
-			if uint64(len(data)) >= pageSize {
-				break
-			}
-		} else {
-			// TRACE
-			// we are updating the offset and limit based on the number of traces we have found in the current timerange
-			// eg -
-			// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
-			//
-			// if 100 traces are there in [t1, t10] then 100 will return immediately.
-			// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
-			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
-			//
-			// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
-			//
-			// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
-			// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
-			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
-
-			// max limit + offset is 10k for pagination
-			if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
-				return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
-			}
-
-			params.CompositeQuery.BuilderQueries[qName].Offset = 0
-			params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
-			queries, err := q.builder.PrepareQueries(params)
-			if err != nil {
-				return nil, nil, err
-			}
-			for name, query := range queries {
-				rowList, err := q.reader.GetListResultV3(ctx, query)
-				if err != nil {
-					errs := []error{err}
-					errQueriesByName := map[string]error{
-						name: err,
-					}
-					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
-				}
-				length += uint64(len(rowList))
-
-				// skip the traces unless offset is 0
-				for _, row := range rowList {
-					if offset == 0 {
-						data = append(data, row)
-					} else {
-						offset--
-					}
-				}
-			}
-			tracesLimit = tracesLimit - length
-
-			if uint64(len(data)) >= limit {
-				break
-			}
-		}
+		// max limit + offset is 10k for pagination for traces/logs
+		// TODO(nitya): define something for logs
+		if !isLogs && limitWithOffset > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
+			return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
+		}
+
+		// we are updating the offset and limit based on the number of traces/logs we have found in the current timerange
+		// eg -
+		// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
+		//
+		// if 100 traces/logs are there in [t1, t10] then 100 will return immediately.
+		// if 10 traces/logs are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
+		// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100
+		//
+		// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
+		//
+		// If we find 150 traces/logs with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces/logs
+		// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
+		// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0
+
+		params.CompositeQuery.BuilderQueries[qName].Offset = 0
+		// if datasource is logs
+		if isLogs {
+			// for logs we use limit to define the absolute limit and pagesize to define the current limit
+			params.CompositeQuery.BuilderQueries[qName].PageSize = limitWithOffset
+		} else {
+			params.CompositeQuery.BuilderQueries[qName].Limit = limitWithOffset
+		}
+
+		queries, err := q.builder.PrepareQueries(params)
+		if err != nil {
+			return nil, nil, err
+		}
+		for name, query := range queries {
+			rowList, err := q.reader.GetListResultV3(ctx, query)
+			if err != nil {
+				errs := []error{err}
+				errQueriesByName := map[string]error{
+					name: err,
+				}
+				return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
+			}
+			length += uint64(len(rowList))

 			// skip the traces unless offset is 0
 			for _, row := range rowList {
 				if offset == 0 {
 					data = append(data, row)
 				} else {
 					offset--
 				}
 			}
 		}
+		limitWithOffset = limitWithOffset - length
+
+		if isLogs && uint64(len(data)) >= pageSize {
+			// for logs
+			break
+		} else if !isLogs && uint64(len(data)) >= limit {
+			// for traces
+			break
+		}
 	}
 	res = append(res, &v3.Result{
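To make the offset/limit bookkeeping in the rewritten loop concrete, here is a minimal runnable sketch of example 2 from the comments. It is an illustration only, not the SigNoz code: fetch is a hypothetical stand-in for the ClickHouse read, and the per-window row counts (50, 80, 40) are invented to exercise the arithmetic.

package main

import "fmt"

// fetch stands in for GetListResultV3: it returns however many rows a time
// window happens to contain, capped by the current fetch budget.
func fetch(available, budget uint64) uint64 {
	if available < budget {
		return available
	}
	return budget
}

func main() {
	// Example 2 from the comments: offset = 50, limit = 100.
	offset, limit := uint64(50), uint64(100)
	limitWithOffset := limit + offset
	windows := []uint64{50, 80, 40} // rows available in each time window

	var data uint64 // rows actually kept for the caller
	for _, available := range windows {
		got := fetch(available, limitWithOffset)
		// skip rows until the offset is consumed, keep the rest
		for i := uint64(0); i < got; i++ {
			if offset == 0 {
				data++
			} else {
				offset--
			}
		}
		limitWithOffset -= got
		if data >= limit {
			break
		}
	}
	fmt.Println("kept rows:", data) // 100: the first 50 rows are skipped, the next 100 kept
}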
@@ -432,12 +427,18 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
 		len(params.CompositeQuery.BuilderQueries) == 1 &&
 		params.CompositeQuery.PanelType != v3.PanelTypeTrace {
 		for _, v := range params.CompositeQuery.BuilderQueries {
-			// only allow of logs queries with timestamp ordering desc
-			// TODO(nitya): allow for timestamp asc
-			if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
+			// for logs: allow only when order of timestamp and id is same
+			if v.DataSource == v3.DataSourceTraces &&
 				len(v.OrderBy) == 1 &&
-				v.OrderBy[0].ColumnName == "timestamp" &&
-				v.OrderBy[0].Order == "desc" {
+				v.OrderBy[0].ColumnName == "timestamp" {
+				startEndArr := utils.GetListTsRanges(params.Start, params.End)
+				return q.runWindowBasedListQuery(ctx, params, startEndArr)
+			} else if v.DataSource == v3.DataSourceLogs &&
+				len(v.OrderBy) == 2 &&
+				v.OrderBy[0].ColumnName == "timestamp" &&
+				v.OrderBy[1].ColumnName == "id" &&
+				v.OrderBy[1].Order == v.OrderBy[0].Order {
 				startEndArr := utils.GetListTsRanges(params.Start, params.End)
 				return q.runWindowBasedListQuery(ctx, params, startEndArr)
 			}
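As a quick illustration of the new gate, the sketch below reproduces just the logs-branch predicate with a local OrderBy type (a stand-in for the example; the real type is v3.OrderBy). Only timestamp+id orderings with matching directions take the window-based path, which is what lets pageSize/offset pagination replace the old id-filter approach.

package main

import "fmt"

type OrderBy struct {
	ColumnName string
	Order      string
}

// windowEligible mirrors the logs condition in runBuilderListQueries above:
// exactly two order-by keys, timestamp then id, with the same direction.
func windowEligible(orderBy []OrderBy) bool {
	return len(orderBy) == 2 &&
		orderBy[0].ColumnName == "timestamp" &&
		orderBy[1].ColumnName == "id" &&
		orderBy[1].Order == orderBy[0].Order
}

func main() {
	fmt.Println(windowEligible([]OrderBy{{"timestamp", "desc"}, {"id", "desc"}})) // true
	fmt.Println(windowEligible([]OrderBy{{"timestamp", "asc"}, {"id", "asc"}}))   // true
	fmt.Println(windowEligible([]OrderBy{{"timestamp", "desc"}, {"id", "asc"}}))  // false: mixed directions
	fmt.Println(windowEligible([]OrderBy{{"timestamp", "desc"}}))                 // false: id ordering missing
}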
@@ -15,8 +15,10 @@ import (
 	"github.com/SigNoz/signoz/pkg/prometheus"
 	"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
 	"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
+	logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
 	"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
 	tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
+	tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
 	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
 	"github.com/SigNoz/signoz/pkg/query-service/querycache"
 	"github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -1217,7 +1219,7 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
 	}
 }

-func Test_querier_runWindowBasedListQuery(t *testing.T) {
+func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) {
 	params := &v3.QueryRangeParamsV3{
 		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
 		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
@@ -1237,6 +1239,13 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 					Operator: "AND",
 					Items:    []v3.FilterItem{},
 				},
+				OrderBy: []v3.OrderBy{
+					{
+						ColumnName: "timestamp",
+						IsColumn:   true,
+						Order:      "DESC",
+					},
+				},
 			},
 		},
 	},
@@ -1290,7 +1299,7 @@
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
-					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
 				},
 			},
 			queryParams: queryParams{
@@ -1299,14 +1308,14 @@
 				limit:  2,
 				offset: 0,
 			},
-			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000},
 		},
 		{
 			name: "all data not in first windows",
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
-					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
@@ -1319,14 +1328,14 @@
 				limit:  3,
 				offset: 0,
 			},
-			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000},
 		},
 		{
 			name: "data in multiple windows",
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
-					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
@@ -1351,18 +1360,18 @@
 				limit:  5,
 				offset: 0,
 			},
-			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000},
 		},
 		{
 			name: "query with offset",
 			queryResponses: []queryResponse{
 				{
 					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
-					timestamps:    []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
+					timestamps:    []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
-					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
@@ -1375,7 +1384,7 @@
 				limit:  4,
 				offset: 3,
 			},
-			expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
+			expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000},
 		},
 		{
 			name: "query with offset and limit- data spread across multiple windows",
@@ -1386,15 +1395,15 @@
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
-					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
-					timestamps:    []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
+					timestamps:    []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000},
 				},
 				{
 					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
-					timestamps:    []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
+					timestamps:    []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000},
 				},
 			},
 			queryParams: queryParams{
@@ -1403,7 +1412,7 @@
 				limit:  5,
 				offset: 6,
 			},
-			expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
+			expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000},
 		},
 		{
 			name: "don't allow pagination to get more than 10k spans",
@@ -1442,7 +1451,6 @@
 			telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
 				cmock.NewRows(cols, values),
 			)
-			// }
 		}

 		// Create reader and querier
@@ -1460,7 +1468,7 @@
 			reader: reader,
 			builder: queryBuilder.NewQueryBuilder(
 				queryBuilder.QueryBuilderOptions{
-					BuildTraceQuery: tracesV3.PrepareTracesQuery,
+					BuildTraceQuery: tracesV4.PrepareTracesQuery,
 				},
 			),
 		}
@@ -1501,3 +1509,756 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
 		})
 	}
 }
+
+func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) {
+	params := &v3.QueryRangeParamsV3{
+		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		CompositeQuery: &v3.CompositeQuery{
+			PanelType: v3.PanelTypeList,
+			BuilderQueries: map[string]*v3.BuilderQuery{
+				"A": {
+					QueryName:         "A",
+					Expression:        "A",
+					DataSource:        v3.DataSourceTraces,
+					PageSize:          10,
+					Limit:             100,
+					StepInterval:      60,
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					SelectColumns:     []v3.AttributeKey{{Key: "serviceName"}},
+					Filters: &v3.FilterSet{
+						Operator: "AND",
+						Items:    []v3.FilterItem{},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "timestamp",
+							IsColumn:   true,
+							Key:        "timestamp",
+							Order:      "asc",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tsRanges := []utils.LogsListTsRange{
+		{
+			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		},
+		{
+			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
+		},
+		{
+			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
+		},
+		{
+			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
+		},
+		{
+			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
+		},
+	}
+
+	type queryParams struct {
+		start  int64
+		end    int64
+		limit  uint64
+		offset uint64
+	}
+
+	type queryResponse struct {
+		expectedQuery string
+		timestamps    []uint64
+	}
+
+	// create test struct with moc data i.e array of timestamps, limit, offset and expected results
+	testCases := []struct {
+		name               string
+		queryResponses     []queryResponse
+		queryParams        queryParams
+		expectedTimestamps []int64
+		expectedError      bool
+	}{
+		{
+			name: "should return correct timestamps when querying within time window",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 2",
+					timestamps:    []uint64{1722171576000000000, 1722171577000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:  1722171576000000000,
+				end:    1722262800000000000,
+				limit:  2,
+				offset: 0,
+			},
+			expectedTimestamps: []int64{1722171576000000000, 1722171577000000000},
+		},
+		{
+			name: "all data not in first windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 3",
+					timestamps:    []uint64{1722259200000000000, 1722259201000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 1",
+					timestamps:    []uint64{1722208800100000000},
+				},
+			},
+			queryParams: queryParams{
+				start:  1722171576000000000,
+				end:    1722262800000000000,
+				limit:  3,
+				offset: 0,
+			},
+			expectedTimestamps: []int64{1722259200000000000, 1722259201000000000, 1722208800100000000},
+		},
+		{
+			name: "query with offset and limit- data spread across multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* asc LIMIT 11",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* asc LIMIT 11",
+					timestamps:    []uint64{1722208801000000000, 1722208802000000000, 1722208803000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* asc LIMIT 8",
+					timestamps:    []uint64{1722237600010000000, 1722237600020000000, 1722237600030000000, 1722237600040000000, 1722237600050000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* asc LIMIT 3",
+					timestamps:    []uint64{1722252000100000000, 1722252000200000000, 1722252000300000000},
+				},
+			},
+			queryParams: queryParams{
+				start:  1722171576000000000,
+				end:    1722262800000000000,
+				limit:  5,
+				offset: 6,
+			},
+			expectedTimestamps: []int64{1722237600040000000, 1722237600050000000, 1722252000100000000, 1722252000200000000, 1722252000300000000},
+		},
+	}
+
+	cols := []cmock.ColumnType{
+		{Name: "timestamp", Type: "UInt64"},
+		{Name: "name", Type: "String"},
+	}
+	testName := "name"
+
+	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+	// iterate over test data, create reader and run test
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup mock
+			telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+			// Configure mock responses
+			for _, response := range tc.queryResponses {
+				values := make([][]any, 0, len(response.timestamps))
+				for _, ts := range response.timestamps {
+					values = append(values, []any{&ts, &testName})
+				}
+				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+					cmock.NewRows(cols, values),
+				)
+			}
+
+			// Create reader and querier
+			reader := clickhouseReader.NewReaderFromClickhouseConnection(
+				options,
+				nil,
+				telemetryStore,
+				prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+				"",
+				time.Duration(time.Second),
+				nil,
+			)
+
+			q := &querier{
+				reader: reader,
+				builder: queryBuilder.NewQueryBuilder(
+					queryBuilder.QueryBuilderOptions{
+						BuildTraceQuery: tracesV4.PrepareTracesQuery,
+					},
+				),
+			}
+			// Update query parameters
+			params.Start = tc.queryParams.start
+			params.End = tc.queryParams.end
+			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+
+			// Create a copy of tsRanges before passing to the function
+			// required because tsRanges is modified in the function
+			tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges))
+			copy(tsRangesCopy, tsRanges)
+
+			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy)
+
+			if tc.expectedError {
+				require.Error(t, err)
+				return
+			}
+
+			// Assertions
+			require.NoError(t, err, "Query execution failed")
+			require.Nil(t, errMap, "Unexpected error map in results")
+			require.Len(t, results, 1, "Expected exactly one result set")
+
+			result := results[0]
+			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+			require.Len(t, result.List, len(tc.expectedTimestamps),
+				"Result count mismatch: got %d results, expected %d",
+				len(result.List), len(tc.expectedTimestamps))
+
+			for i, expected := range tc.expectedTimestamps {
+				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+					"Timestamp mismatch at index %d: got %d, expected %d",
+					i, result.List[i].Timestamp.UnixNano(), expected)
+			}
+
+			// Verify mock expectations
+			err = telemetryStore.Mock().ExpectationsWereMet()
+			require.NoError(t, err, "Mock expectations were not met")
+		})
+	}
+}
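A side note on the tsRangesCopy dance in this test: runWindowBasedListQuery reverses tsRanges in place when the order is ascending, so a caller that reuses the slice across subtests would see it mutated. A minimal sketch of that aliasing behavior in plain Go (local types for illustration, not the real utils.LogsListTsRange):

package main

import "fmt"

type tsRange struct{ Start, End int64 }

// reverse flips a slice of windows in place, as the querier does for asc order.
func reverse(r []tsRange) {
	for i, j := 0, len(r)-1; i < j; i, j = i+1, j-1 {
		r[i], r[j] = r[j], r[i]
	}
}

func main() {
	ranges := []tsRange{{1, 2}, {2, 3}, {3, 4}}

	shared := ranges // same backing array: mutation is visible to the caller
	reverse(shared)
	fmt.Println(ranges[0]) // {3 4}: the original slice changed too

	copied := make([]tsRange, len(ranges))
	copy(copied, ranges) // independent backing array, as the test does
	reverse(copied)
	fmt.Println(ranges[0]) // still {3 4}: untouched by the second reversal
}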
+
+func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) {
+	params := &v3.QueryRangeParamsV3{
+		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		CompositeQuery: &v3.CompositeQuery{
+			PanelType: v3.PanelTypeList,
+			BuilderQueries: map[string]*v3.BuilderQuery{
+				"A": {
+					QueryName:         "A",
+					Expression:        "A",
+					DataSource:        v3.DataSourceLogs,
+					PageSize:          10,
+					Limit:             100,
+					StepInterval:      60,
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					Filters: &v3.FilterSet{
+						Operator: "AND",
+						Items:    []v3.FilterItem{},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "timestamp",
+							Order:      "DESC",
+						},
+						{
+							ColumnName: "id",
+							Order:      "DESC",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tsRanges := []utils.LogsListTsRange{
+		{
+			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		},
+		{
+			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
+		},
+		{
+			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
+		},
+		{
+			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
+		},
+		{
+			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
+		},
+	}
+
+	type queryParams struct {
+		start    int64
+		end      int64
+		limit    uint64
+		offset   uint64
+		pageSize uint64
+	}
+
+	type queryResponse struct {
+		expectedQuery string
+		timestamps    []uint64
+	}
+
+	// create test struct with moc data i.e array of timestamps, limit, offset and expected results
+	testCases := []struct {
+		name               string
+		queryResponses     []queryResponse
+		queryParams        queryParams
+		expectedTimestamps []int64
+		expectedError      bool
+	}{
+		{
+			name: "should return correct timestamps when querying within time window",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 2",
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 2,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000},
+		},
+		{
+			name: "all data not in first windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 3",
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{1722253000000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 3,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000},
+		},
+		{
+			name: "data in multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 5",
+					timestamps:    []uint64{1722259400000000000, 1722259300000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 3",
+					timestamps:    []uint64{1722253000000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 2",
+					timestamps:    []uint64{1722237700000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 5,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722259400000000000, 1722259300000000000, 1722253000000000000, 1722237700000000000},
+		},
+		{
+			name: "query with offset",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 7",
+					timestamps:    []uint64{1722259230000000000, 1722259220000000000, 1722259210000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 4",
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 1",
+					timestamps:    []uint64{1722237700000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 4,
+				offset:   3,
+			},
+			expectedTimestamps: []int64{1722255000000000000, 1722254000000000000, 1722253000000000000, 1722237700000000000},
+		},
+		{
+			name: "query with offset and limit- data spread across multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722259200000000000 AND timestamp <= 1722262800000000000).* DESC LIMIT 11",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* DESC LIMIT 11",
+					timestamps:    []uint64{1722255000000000000, 1722254000000000000, 1722253000000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* DESC LIMIT 8",
+					timestamps:    []uint64{1722237920000000000, 1722237910000000000, 1722237900000000000, 1722237800000000000, 1722237700000000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* DESC LIMIT 3",
+					timestamps:    []uint64{1722208830000000000, 1722208820000000000, 1722208810000000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 5,
+				offset:   6,
+			},
+			expectedTimestamps: []int64{1722237800000000000, 1722237700000000000, 1722208830000000000, 1722208820000000000, 1722208810000000000},
+		},
+		{
+			name:           "dont allow pagination to get more than speficied limit",
+			queryResponses: []queryResponse{},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				limit:    200,
+				offset:   210,
+				pageSize: 30,
+			},
+			expectedError: true,
+		},
+	}
+
+	cols := []cmock.ColumnType{
+		{Name: "timestamp", Type: "UInt64"},
+		{Name: "name", Type: "String"},
+	}
+	testName := "name"
+
+	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+	// iterate over test data, create reader and run test
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup mock
+			telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+			// Configure mock responses
+			for _, response := range tc.queryResponses {
+				values := make([][]any, 0, len(response.timestamps))
+				for _, ts := range response.timestamps {
+					values = append(values, []any{&ts, &testName})
+				}
+				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+					cmock.NewRows(cols, values),
+				)
+			}
+
+			// Create reader and querier
+			reader := clickhouseReader.NewReaderFromClickhouseConnection(
+				options,
+				nil,
+				telemetryStore,
+				prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+				"",
+				time.Duration(time.Second),
+				nil,
+			)
+
+			q := &querier{
+				reader: reader,
+				builder: queryBuilder.NewQueryBuilder(
+					queryBuilder.QueryBuilderOptions{
+						BuildLogQuery: logsV4.PrepareLogsQuery,
+					},
+				),
+			}
+			// Update query parameters
+			params.Start = tc.queryParams.start
+			params.End = tc.queryParams.end
+			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+			params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize
+			// Execute query
+			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)
+
+			if tc.expectedError {
+				require.Error(t, err)
+				return
+			}
+
+			// Assertions
+			require.NoError(t, err, "Query execution failed")
+			require.Nil(t, errMap, "Unexpected error map in results")
+			require.Len(t, results, 1, "Expected exactly one result set")
+
+			result := results[0]
+			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+			require.Len(t, result.List, len(tc.expectedTimestamps),
+				"Result count mismatch: got %d results, expected %d",
+				len(result.List), len(tc.expectedTimestamps))
+
+			for i, expected := range tc.expectedTimestamps {
+				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+					"Timestamp mismatch at index %d: got %d, expected %d",
+					i, result.List[i].Timestamp.UnixNano(), expected)
+			}
+
+			// Verify mock expectations
+			err = telemetryStore.Mock().ExpectationsWereMet()
+			require.NoError(t, err, "Mock expectations were not met")
+		})
+	}
+}
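Worth calling out from the logs test cases above: for logs the effective fetch budget in the first window is pageSize + offset (hence LIMIT 7 for pageSize 4, offset 3), and the guard rejects requests once offset reaches limit. A small standalone sketch of that arithmetic, mirroring the checks in runWindowBasedListQuery:

package main

import (
	"errors"
	"fmt"
)

// logsFetchBudget mirrors the logs-specific setup in runWindowBasedListQuery:
// the querier fetches up to pageSize+offset rows, then discards offset of them.
func logsFetchBudget(limit, offset, pageSize uint64) (uint64, error) {
	if limit > 0 && offset >= limit {
		return 0, errors.New("max limit exceeded")
	}
	return pageSize + offset, nil
}

func main() {
	// "query with offset": pageSize 4, offset 3 -> first window asks for 7 rows.
	budget, err := logsFetchBudget(100, 3, 4)
	fmt.Println(budget, err) // 7 <nil>

	// the error case above: offset 210 >= limit 200.
	_, err = logsFetchBudget(200, 210, 30)
	fmt.Println(err) // max limit exceeded
}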
+
+func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
+	params := &v3.QueryRangeParamsV3{
+		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		CompositeQuery: &v3.CompositeQuery{
+			PanelType: v3.PanelTypeList,
+			BuilderQueries: map[string]*v3.BuilderQuery{
+				"A": {
+					QueryName:         "A",
+					Expression:        "A",
+					DataSource:        v3.DataSourceLogs,
+					PageSize:          10,
+					Limit:             100,
+					StepInterval:      60,
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					Filters: &v3.FilterSet{
+						Operator: "AND",
+						Items:    []v3.FilterItem{},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "timestamp",
+							Order:      "asc",
+						},
+						{
+							ColumnName: "id",
+							Order:      "asc",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	tsRanges := []utils.LogsListTsRange{
+		{
+			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
+			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
+		},
+		{
+			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
+			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
+		},
+		{
+			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
+			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
+		},
+		{
+			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
+			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
+		},
+		{
+			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
+			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
+		},
+	}
+
+	type queryParams struct {
+		start    int64
+		end      int64
+		limit    uint64
+		offset   uint64
+		pageSize uint64
+	}
+
+	type queryResponse struct {
+		expectedQuery string
+		timestamps    []uint64
+	}
+
+	// create test struct with moc data i.e array of timestamps, limit, offset and expected results
+	testCases := []struct {
+		name               string
+		queryResponses     []queryResponse
+		queryParams        queryParams
+		expectedTimestamps []int64
+		expectedError      bool
+	}{
+		{
+			name: "should return correct timestamps when querying within time window",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 2",
+					timestamps:    []uint64{1722171576010000000, 1722171576020000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 2,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722171576010000000, 1722171576020000000},
+		},
+		{
+			name: "all data not in first windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 3",
+					timestamps:    []uint64{1722171576001000000, 1722171576002000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 1",
+					timestamps:    []uint64{1722208800100000000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 3,
+				offset:   0,
+			},
+			expectedTimestamps: []int64{1722171576001000000, 1722171576002000000, 1722208800100000000},
+		},
+		{
+			name: "query with offset and limit- data spread across multiple windows",
+			queryResponses: []queryResponse{
+				{
+					expectedQuery: ".*(timestamp >= 1722171576000000000 AND timestamp <= 1722208800000000000).* asc LIMIT 11",
+					timestamps:    []uint64{},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722208800000000000 AND timestamp <= 1722237600000000000).* asc LIMIT 11",
+					timestamps:    []uint64{1722208800100000000, 1722208800200000000, 1722208800300000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722237600000000000 AND timestamp <= 1722252000000000000).* asc LIMIT 8",
+					timestamps:    []uint64{1722237600100000000, 1722237600200000000, 1722237600300000000, 1722237600400000000, 1722237600500000000},
+				},
+				{
+					expectedQuery: ".*(timestamp >= 1722252000000000000 AND timestamp <= 1722259200000000000).* asc LIMIT 3",
+					timestamps:    []uint64{1722252000000100000, 1722252000000200000, 1722252000000300000},
+				},
+			},
+			queryParams: queryParams{
+				start:    1722171576000000000,
+				end:      1722262800000000000,
+				pageSize: 5,
+				offset:   6,
+			},
+			expectedTimestamps: []int64{1722237600400000000, 1722237600500000000, 1722252000000100000, 1722252000000200000, 1722252000000300000},
+		},
+	}
+
+	cols := []cmock.ColumnType{
+		{Name: "timestamp", Type: "UInt64"},
+		{Name: "name", Type: "String"},
+	}
+	testName := "name"
+
+	options := clickhouseReader.NewOptions("", "", "archiveNamespace")
+
+	// iterate over test data, create reader and run test
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup mock
+			telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp)
+
+			// Configure mock responses
+			for _, response := range tc.queryResponses {
+				values := make([][]any, 0, len(response.timestamps))
+				for _, ts := range response.timestamps {
+					values = append(values, []any{&ts, &testName})
+				}
+				telemetryStore.Mock().ExpectQuery(response.expectedQuery).WillReturnRows(
+					cmock.NewRows(cols, values),
+				)
+			}
+
+			// Create reader and querier
+			reader := clickhouseReader.NewReaderFromClickhouseConnection(
+				options,
+				nil,
+				telemetryStore,
+				prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
+				"",
+				time.Duration(time.Second),
+				nil,
+			)
+
+			q := &querier{
+				reader: reader,
+				builder: queryBuilder.NewQueryBuilder(
+					queryBuilder.QueryBuilderOptions{
+						BuildLogQuery: logsV4.PrepareLogsQuery,
+					},
+				),
+			}
+			// Update query parameters
+			params.Start = tc.queryParams.start
+			params.End = tc.queryParams.end
+			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
+			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset
+			params.CompositeQuery.BuilderQueries["A"].PageSize = tc.queryParams.pageSize
+
+			// Create a copy of tsRanges before passing to the function
+			// required because tsRanges is modified in the function
+			tsRangesCopy := make([]utils.LogsListTsRange, len(tsRanges))
+			copy(tsRangesCopy, tsRanges)
+			// Execute query
+			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRangesCopy)
+
+			if tc.expectedError {
+				require.Error(t, err)
+				return
+			}
+
+			// Assertions
+			require.NoError(t, err, "Query execution failed")
+			require.Nil(t, errMap, "Unexpected error map in results")
+			require.Len(t, results, 1, "Expected exactly one result set")
+
+			result := results[0]
+			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
+			require.Len(t, result.List, len(tc.expectedTimestamps),
+				"Result count mismatch: got %d results, expected %d",
+				len(result.List), len(tc.expectedTimestamps))
+
+			for i, expected := range tc.expectedTimestamps {
+				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
+					"Timestamp mismatch at index %d: got %d, expected %d",
+					i, result.List[i].Timestamp.UnixNano(), expected)
+			}
+
+			// Verify mock expectations
+			err = telemetryStore.Mock().ExpectationsWereMet()
+			require.NoError(t, err, "Mock expectations were not met")
+		})
+	}
+}
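For reference, the tsRanges fixtures used throughout these tests follow a doubling-window shape: walking back from the query end, windows of 1h, 2h, 4h, 8h, then the remainder down to the start. The sketch below reproduces that shape; it is inferred from the fixture data only and is not the actual utils.GetListTsRanges implementation.

package main

import "fmt"

const hour int64 = 3_600_000_000_000 // one hour in nanoseconds

// listTsRanges emits windows walking back from end, doubling the window size
// each step and clamping the last window to start.
func listTsRanges(start, end int64) [][2]int64 {
	ranges := [][2]int64{}
	win := hour
	for cur := end; cur > start; {
		lo := cur - win
		if lo < start {
			lo = start
		}
		ranges = append(ranges, [2]int64{lo, cur})
		cur = lo
		win *= 2
	}
	return ranges
}

func main() {
	// Prints the same five windows used in the fixtures, newest first.
	for _, r := range listTsRanges(1722171576000000000, 1722262800000000000) {
		fmt.Println(r[0], r[1])
	}
}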