feat: support for timeshift in logs (#4607)

* feat: support for timeshift in logs

* fix: post process the timeshift function result

* fix: start and end times adjusted

* fix: only apply functions instead of entire post process

* fix: unnecessary error handling removed

* fix: apply functions for all sources

* feat: test added for timeshift

* fix: comments corrected

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Nityananda Gohain 2024-03-08 21:12:53 +05:30 committed by GitHub
parent e519539468
commit d0d10daa44
6 changed files with 312 additions and 39 deletions
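
At its core, the change subtracts ShiftBy from the query window before any query is built. ShiftBy is specified in seconds on the builder query, while Start/End are epoch milliseconds, hence the repeated *1000 in the hunks below. A minimal, illustrative sketch of that adjustment (not the exact code from the diff):

package main

import "fmt"

func main() {
	var shiftBySeconds int64 = 86400 // ShiftBy: one day, as used in the tests below
	startMs := int64(1675115596722)  // window start, epoch milliseconds
	endMs := startMs + 120*60*1000   // window end, two hours later

	if shiftBySeconds != 0 {
		startMs -= shiftBySeconds * 1000 // ShiftBy is seconds; the window is milliseconds
		endMs -= shiftBySeconds * 1000
	}
	fmt.Println(startMs, endMs) // the same two-hour window, moved back exactly one day
}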

View File

@@ -3331,6 +3331,12 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
applyMetricLimit(result, queryRangeParams)
// only adding applyFunctions instead of postProcess since expressions
// are executed in clickhouse directly and we wanted to add support for timeshift
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
applyFunctions(result, queryRangeParams)
}
resp := v3.QueryRangeResponse{
Result: result,
}
@@ -3605,7 +3611,7 @@ func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV
for idx, result := range results {
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
if builderQueries != nil && (builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics) {
if builderQueries != nil {
functions := builderQueries[result.QueryName].Functions
for _, function := range functions {
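
The hunk above drops the metrics-only check, so configured functions now run for results from every builder data source; together with the queryRangeV3 change in the first hunk, this is what lets a timeShift function on a logs query be post-processed. A condensed, runnable sketch of the loosened loop, using stand-in types rather than the real v3 package:

package main

import "fmt"

// Minimal stand-in types; illustrative only, not the real v3 package.
type result struct{ QueryName string }
type builderQuery struct {
	Functions []string // e.g. "timeShift"
}

// applyFunctions mirrors the loosened condition in the hunk above: every
// query's configured functions are applied to its result, regardless of
// data source (previously this was gated on DataSource == metrics).
func applyFunctions(results []*result, queries map[string]*builderQuery) {
	if queries == nil {
		return
	}
	for _, r := range results {
		q, ok := queries[r.QueryName]
		if !ok {
			continue
		}
		for _, fn := range q.Functions {
			fmt.Println("applying", fn, "to", r.QueryName) // the real code transforms the series here
		}
	}
}

func main() {
	applyFunctions(
		[]*result{{QueryName: "A"}},
		map[string]*builderQuery{"A": {Functions: []string{"timeShift"}}},
	)
}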

View File

@@ -21,11 +21,6 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation)
if mq.ShiftBy != 0 {
start = start - mq.ShiftBy*1000
end = end - mq.ShiftBy*1000
}
var quantile float64
if v3.IsPercentileOperator(mq.SpaceAggregation) &&

View File

@@ -90,11 +90,18 @@ func (q *querier) runBuilderQuery(
preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil
}
start := params.Start
end := params.End
if builderQuery.ShiftBy != 0 {
start = start - builderQuery.ShiftBy*1000
end = end - builderQuery.ShiftBy*1000
}
if builderQuery.DataSource == v3.DataSourceLogs {
var query string
var err error
if _, ok := cacheKeys[queryName]; !ok {
query, err = prepareLogsQuery(ctx, params.Start, params.End, builderQuery, params, preferRPM)
query, err = prepareLogsQuery(ctx, start, end, builderQuery, params, preferRPM)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -114,7 +121,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
@@ -152,7 +159,7 @@ func (q *querier) runBuilderQuery(
}
// response doesn't need everything
filterCachedPoints(mergedSeries, params.Start, params.End)
filterCachedPoints(mergedSeries, start, end)
ch <- channelResult{
Err: nil,
@@ -181,8 +188,8 @@ func (q *querier) runBuilderQuery(
// for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
@@ -193,8 +200,8 @@ func (q *querier) runBuilderQuery(
return
}
placeholderQuery, err := tracesV3.PrepareTracesQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
@@ -207,8 +214,8 @@ func (q *querier) runBuilderQuery(
query = fmt.Sprintf(placeholderQuery, limitQuery)
} else {
query, err = tracesV3.PrepareTracesQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
@@ -229,7 +236,7 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok {
query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
query, err := metricsV3.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -249,7 +256,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
@@ -298,7 +305,7 @@ func (q *querier) runBuilderQuery(
}
// response doesn't need everything
filterCachedPoints(mergedSeries, params.Start, params.End)
filterCachedPoints(mergedSeries, start, end)
ch <- channelResult{
Err: nil,
Name: queryName,
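
With the shifted start/end now feeding both the cache lookups and findMissingTimeRanges, a shifted query can reuse cached data that covers part of its (shifted) window and only fetch the remainder; the cache tests further down encode exactly this. A simplified, illustrative stand-in for the missing-range computation (not the real findMissingTimeRanges):

package main

import "fmt"

type timeRange struct{ start, end int64 } // epoch milliseconds

// missingRanges is a simplified stand-in for q.findMissingTimeRanges: given the
// already-shifted query window and a single cached interval, return the
// sub-windows that still have to be queried.
func missingRanges(start, end int64, cached timeRange) []timeRange {
	var misses []timeRange
	if start < cached.start {
		misses = append(misses, timeRange{start, cached.start - 1})
	}
	if end > cached.end {
		misses = append(misses, timeRange{cached.end + 1, end})
	}
	return misses
}

func main() {
	// Numbers from TestQueryRangeTimeShiftWithCache: the first (unshifted) query
	// caches the previous day's 4:23-5:23; the shifted query needs 3:23-5:23.
	shifted := timeRange{1675115596722 - 86400*1000, 1675115596722 + 120*60*1000 - 86400*1000}
	cached := timeRange{1675115596722 + 60*60*1000 - 86400*1000, 1675115596722 + 120*60*1000 - 86400*1000}
	fmt.Println(missingRanges(shifted.start, shifted.end, cached))
	// -> a single miss ending at cached.start-1, matching the "-1" in the expected query string
}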

View File

@@ -701,3 +701,253 @@ func TestQueryRangeValueType(t *testing.T) {
}
}
}
// test timeshift
func TestQueryRangeTimeShift(t *testing.T) {
params := []*v3.QueryRangeParamsV3{
{
Start: 1675115596722, //31, 3:23
End: 1675115596722 + 120*60*1000, //31, 5:23
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceLogs,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
AggregateOperator: v3.AggregateOperatorCount,
Expression: "A",
ShiftBy: 86400,
},
},
},
},
}
opts := QuerierOptions{
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
TestingMode: true,
}
q := NewQuerier(opts)
// logs queries are generated in ns
expectedTimeRangeInQueryString := fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+120*60*1000)-86400*1000)*1000000)
for i, param := range params {
_, err, errByName := q.QueryRange(context.Background(), param, nil)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
if len(errByName) > 0 {
t.Errorf("expected no error, got %v", errByName)
}
if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString) {
t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString, q.QueriesExecuted()[i])
}
}
}
// timeshift works with caching
func TestQueryRangeTimeShiftWithCache(t *testing.T) {
params := []*v3.QueryRangeParamsV3{
{
Start: 1675115596722 + 60*60*1000 - 86400*1000, //30, 4:23
End: 1675115596722 + 120*60*1000 - 86400*1000, //30, 5:23
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceLogs,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
AggregateOperator: v3.AggregateOperatorCount,
Expression: "A",
GroupBy: []v3.AttributeKey{
{Key: "service_name", IsColumn: false},
{Key: "method", IsColumn: false},
},
},
},
},
},
{
Start: 1675115596722, //31, 3:23
End: 1675115596722 + 120*60*1000, //31, 5:23
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceLogs,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
AggregateOperator: v3.AggregateOperatorCount,
Expression: "A",
ShiftBy: 86400,
GroupBy: []v3.AttributeKey{
{Key: "service_name", IsColumn: false},
{Key: "method", IsColumn: false},
},
},
},
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
opts := QuerierOptions{
Cache: cache,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
TestingMode: true,
ReturnedSeries: []*v3.Series{
{
Labels: map[string]string{},
Points: []v3.Point{
{Timestamp: 1675115596722 + 60*60*1000 - 86400*1000, Value: 1},
{Timestamp: 1675115596722 + 120*60*1000 - 86400*1000 + 60*60*1000, Value: 2},
},
},
},
}
q := NewQuerier(opts)
// logs queries are generated in ns
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722+60*60*1000-86400*1000)*1000000, (1675115596722+120*60*1000-86400*1000)*1000000),
fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+60*60*1000)-86400*1000-1)*1000000),
}
for i, param := range params {
_, err, errByName := q.QueryRange(context.Background(), param, nil)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
if len(errByName) > 0 {
t.Errorf("expected no error, got %v", errByName)
}
if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
}
}
}
// timeshift with limit queries
func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
params := []*v3.QueryRangeParamsV3{
{
Start: 1675115596722 + 60*60*1000 - 86400*1000, //30, 4:23
End: 1675115596722 + 120*60*1000 - 86400*1000, //30, 5:23
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceLogs,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
AggregateOperator: v3.AggregateOperatorCount,
Expression: "A",
GroupBy: []v3.AttributeKey{
{Key: "service_name", IsColumn: false},
{Key: "method", IsColumn: false},
},
Limit: 5,
},
},
},
},
{
Start: 1675115596722, //31, 3:23
End: 1675115596722 + 120*60*1000, //31, 5:23
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceLogs,
AggregateAttribute: v3.AttributeKey{},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
AggregateOperator: v3.AggregateOperatorCount,
Expression: "A",
ShiftBy: 86400,
GroupBy: []v3.AttributeKey{
{Key: "service_name", IsColumn: false},
{Key: "method", IsColumn: false},
},
Limit: 5,
},
},
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
opts := QuerierOptions{
Cache: cache,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
TestingMode: true,
ReturnedSeries: []*v3.Series{
{
Labels: map[string]string{},
Points: []v3.Point{
{Timestamp: 1675115596722 + 60*60*1000 - 86400*1000, Value: 1},
{Timestamp: 1675115596722 + 120*60*1000 - 86400*1000 + 60*60*1000, Value: 2},
},
},
},
}
q := NewQuerier(opts)
// logs queries are generated in ns
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722+60*60*1000-86400*1000)*1000000, (1675115596722+120*60*1000-86400*1000)*1000000),
fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+60*60*1000)-86400*1000-1)*1000000),
}
for i, param := range params {
_, err, errByName := q.QueryRange(context.Background(), param, nil)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
if len(errByName) > 0 {
t.Errorf("expected no error, got %v", errByName)
}
if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
}
}
}
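
For reference, the expected strings in these tests follow from the unit conventions: Start/End are epoch milliseconds, ShiftBy is seconds (hence *1000), and the logs query builder emits nanosecond timestamps (hence *1000000). A tiny illustrative check of the first test's expectation:

package main

import "fmt"

func main() {
	const (
		startMs = int64(1675115596722)  // Jan 31, 3:23, epoch milliseconds
		endMs   = startMs + 120*60*1000 // Jan 31, 5:23
		shiftMs = int64(86400) * 1000   // ShiftBy is 86400 seconds, converted to ms
		msToNs  = int64(1000000)        // logs timestamps are stored in nanoseconds
	)
	// Reproduces expectedTimeRangeInQueryString from TestQueryRangeTimeShift.
	fmt.Printf("timestamp >= %d AND timestamp <= %d\n",
		(startMs-shiftMs)*msToNs, (endMs-shiftMs)*msToNs)
}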

View File

@@ -36,6 +36,14 @@ func (q *querier) runBuilderQuery(
preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil
}
// making a local clone since we should not update the global params if there is shift by
start := params.Start
end := params.End
if builderQuery.ShiftBy != 0 {
start = start - builderQuery.ShiftBy*1000
end = end - builderQuery.ShiftBy*1000
}
// TODO: handle other data sources
if builderQuery.DataSource == v3.DataSourceLogs {
var query string
@@ -43,8 +51,8 @@ func (q *querier) runBuilderQuery(
// for ts query with limit replace it as it is already formed
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := logsV3.PrepareLogsQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
@@ -55,8 +63,8 @@ func (q *querier) runBuilderQuery(
return
}
placeholderQuery, err := logsV3.PrepareLogsQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
@@ -69,8 +77,8 @@ func (q *querier) runBuilderQuery(
query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
} else {
query, err = logsV3.PrepareLogsQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
@@ -98,8 +106,8 @@ func (q *querier) runBuilderQuery(
// for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
@@ -110,8 +118,8 @@ func (q *querier) runBuilderQuery(
return
}
placeholderQuery, err := tracesV3.PrepareTracesQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
@@ -124,8 +132,8 @@ func (q *querier) runBuilderQuery(
query = fmt.Sprintf(placeholderQuery, limitQuery)
} else {
query, err = tracesV3.PrepareTracesQuery(
params.Start,
params.End,
start,
end,
params.CompositeQuery.PanelType,
builderQuery,
keys,
@@ -146,7 +154,7 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok {
query, err := metricsV4.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
query, err := metricsV4.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -166,7 +174,7 @@ func (q *querier) runBuilderQuery(
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {

View File

@@ -183,6 +183,13 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
PreferRPMFeatureEnabled := err == nil
// Build queries for each builder query
for queryName, query := range compositeQuery.BuilderQueries {
// making a local clone since we should not update the global params if there is shift by
start := params.Start
end := params.End
if query.ShiftBy != 0 {
start = start - query.ShiftBy*1000
end = end - query.ShiftBy*1000
}
if query.Expression == queryName {
switch query.DataSource {
case v3.DataSourceTraces:
@@ -192,12 +199,12 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
}
// for ts query with group by and limit form two queries
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
limitQuery, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType, query,
limitQuery, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType, query,
keys, tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
if err != nil {
return nil, err
}
placeholderQuery, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType,
placeholderQuery, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType,
query, keys, tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
if err != nil {
return nil, err
@@ -205,7 +212,7 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
query := fmt.Sprintf(placeholderQuery, limitQuery)
queries[queryName] = query
} else {
queryString, err := qb.options.BuildTraceQuery(params.Start, params.End, compositeQuery.PanelType,
queryString, err := qb.options.BuildTraceQuery(start, end, compositeQuery.PanelType,
query, keys, tracesV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""})
if err != nil {
return nil, err
@@ -215,25 +222,25 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
case v3.DataSourceLogs:
// for ts query with limit replace it as it is already formed
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
limitQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
if err != nil {
return nil, err
}
placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
placeholderQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled})
if err != nil {
return nil, err
}
query := fmt.Sprintf(placeholderQuery, limitQuery)
queries[queryName] = query
} else {
queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""})
queryString, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""})
if err != nil {
return nil, err
}
queries[queryName] = queryString
}
case v3.DataSourceMetrics:
queryString, err := qb.options.BuildMetricQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled})
queryString, err := qb.options.BuildMetricQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, metricsV3.Options{PreferRPM: PreferRPMFeatureEnabled})
if err != nil {
return nil, err
}
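
The local start/end copies introduced at the top of this loop matter because a single params object is shared by every builder query in the composite query: mutating params.Start/params.End for a shifted query would also shift every other query in the same request. A minimal, hypothetical illustration of the pattern (not the real PrepareQueries code):

package main

import "fmt"

type query struct {
	Name    string
	ShiftBy int64 // seconds, as on v3.BuilderQuery
}

// prepare mimics the per-query window handling: shift on local copies so the
// shared start/end stay untouched for the remaining queries.
func prepare(startMs, endMs int64, queries []query) {
	for _, q := range queries {
		s, e := startMs, endMs
		if q.ShiftBy != 0 {
			s -= q.ShiftBy * 1000
			e -= q.ShiftBy * 1000
		}
		fmt.Printf("%s: %d - %d\n", q.Name, s, e)
	}
}

func main() {
	prepare(1675115596722, 1675115596722+120*60*1000, []query{
		{Name: "A", ShiftBy: 86400}, // shifted one day back
		{Name: "B"},                 // unshifted; must keep the original window
	})
}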