chore: add time shift function and some refactoring (#4445)

Srikanth Chekuri 2024-02-11 00:31:47 +05:30 committed by GitHub
parent 6bd2c1ba74
commit 3b98073ad4
19 changed files with 435 additions and 85 deletions

View File

@@ -71,6 +71,7 @@ const (
     signozSampleLocalTableName            = "samples_v2"
     signozSampleTableName                 = "distributed_samples_v2"
     signozTSTableName                     = "distributed_time_series_v2"
+    signozTSTableNameV41Day               = "distributed_time_series_v4_1day"
     minTimespanForProgressiveSearch       = time.Hour
     minTimespanForProgressiveSearchMargin = time.Minute
@@ -3287,7 +3288,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
     metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
-    query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableName)
+    query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)
     rows, err := r.db.Query(ctx, query, metricNames)
     if err != nil {
@@ -3309,15 +3310,6 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
     return metricNameToTemporality, nil
 }
-// func sum(array []tsByMetricName) uint64 {
-//     var result uint64
-//     result = 0
-//     for _, v := range array {
-//         result += v.count
-//     }
-//     return result
-// }
 func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) {
     queryStr := fmt.Sprintf("SELECT count() as count from %s.%s group by metric_name order by count desc;", signozMetricDBName, signozTSTableName)
@@ -3960,7 +3952,7 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req
     var rows driver.Rows
     var response v3.AggregateAttributeResponse
-    query = fmt.Sprintf("SELECT DISTINCT(metric_name) from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableName)
+    query = fmt.Sprintf("SELECT DISTINCT metric_name, type from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableNameV41Day)
     if req.Limit != 0 {
         query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
     }
@ -3972,15 +3964,16 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req
} }
defer rows.Close() defer rows.Close()
var metricName string var metricName, typ string
for rows.Next() { for rows.Next() {
if err := rows.Scan(&metricName); err != nil { if err := rows.Scan(&metricName, &typ); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
} }
// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
key := v3.AttributeKey{ key := v3.AttributeKey{
Key: metricName, Key: metricName,
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified, Type: v3.AttributeKeyType(typ),
IsColumn: true, IsColumn: true,
} }
response.AttributeKeys = append(response.AttributeKeys, key) response.AttributeKeys = append(response.AttributeKeys, key)
@@ -4111,6 +4104,68 @@ func (r *ClickHouseReader) GetLatencyMetricMetadata(ctx context.Context, metricN
     }, nil
 }
+func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
+    // Note: metric metadata should be accessible regardless of the time range selection
+    // our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the
+    // amount of data scanned
+    query := fmt.Sprintf("SELECT DISTINCT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1", signozMetricDBName, signozTSTableNameV41Day)
+    rows, err := r.db.Query(ctx, query, metricName)
+    if err != nil {
+        zap.S().Error(err)
+        return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error())
+    }
+    defer rows.Close()
+
+    var deltaExists, isMonotonic bool
+    var temporality, description, metricType, unit string
+    for rows.Next() {
+        if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil {
+            return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
+        }
+        if temporality == string(v3.Delta) {
+            deltaExists = true
+        }
+    }
+
+    query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, 'le')) as le from %s.%s WHERE metric_name=$1 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $2 ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
+    rows, err = r.db.Query(ctx, query, metricName, serviceName)
+    if err != nil {
+        zap.S().Error(err)
+        return nil, fmt.Errorf("error while executing query: %s", err.Error())
+    }
+    defer rows.Close()
+
+    var leFloat64 []float64
+    for rows.Next() {
+        var leStr string
+        if err := rows.Scan(&leStr); err != nil {
+            return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
+        }
+        le, err := strconv.ParseFloat(leStr, 64)
+        // ignore the error and continue if the value is not a float
+        // ideally this should not happen but we have seen ClickHouse
+        // returning empty string for some values
+        if err != nil {
+            zap.S().Error("error while parsing le value: ", err)
+            continue
+        }
+        if math.IsInf(le, 0) {
+            continue
+        }
+        leFloat64 = append(leFloat64, le)
+    }
+
+    return &v3.MetricMetadataResponse{
+        Delta:       deltaExists,
+        Le:          leFloat64,
+        Description: description,
+        Unit:        unit,
+        Type:        metricType,
+        IsMonotonic: isMonotonic,
+        Temporality: temporality,
+    }, nil
+}
 func isColumn(tableStatement, attrType, field, datType string) bool {
     // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute`
     name := utils.GetClickhouseColumnName(attrType, datType, field)
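For orientation, a minimal sketch of calling the endpoint this reader method backs (registered further down as GET /api/v4/metric/metric_metadata); the host, port, and parameter values are assumptions for illustration:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical local query-service address; metricName and serviceName
	// values are examples only.
	url := "http://localhost:8080/api/v4/metric/metric_metadata" +
		"?metricName=http_requests&serviceName=payment_service"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// The body is the JSON-encoded v3.MetricMetadataResponse built above.
	fmt.Println(string(body))
}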

View File

@@ -154,6 +154,11 @@ func processResults(results []*v3.Result, expression *govaluate.EvaluableExpress
             return nil, err
         }
         if series != nil {
+            labelsArray := make([]map[string]string, 0)
+            for k, v := range series.Labels {
+                labelsArray = append(labelsArray, map[string]string{k: v})
+            }
+            series.LabelsArray = labelsArray
             newSeries = append(newSeries, series)
         }
     }
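A standalone illustration of the conversion added in this hunk: the series label map is mirrored into LabelsArray, a slice of single-entry maps (presumably an array-shaped representation for consumers):

package main

import "fmt"

func main() {
	labels := map[string]string{"service_name": "payment", "env": "prod"}
	// Mirror the map into a slice of single-entry maps, as processResults does.
	labelsArray := make([]map[string]string, 0, len(labels))
	for k, v := range labels {
		labelsArray = append(labelsArray, map[string]string{k: v})
	}
	fmt.Println(labelsArray)
}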

View File

@@ -84,6 +84,12 @@ type APIHandler struct {
     preferDelta       bool
     preferSpanMetrics bool
+    // temporalityMap is a map of metric name to temporality
+    // to avoid fetching temporality for the same metric multiple times
+    // querying the v4 table on low cardinal temporality column
+    // should be fast but we can still avoid the query if we have the data in memory
+    temporalityMap map[string]map[v3.Temporality]bool
     maxIdleConns      int
     maxOpenConns      int
     dialTimeout       time.Duration
@@ -161,6 +167,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
     skipConfig:        opts.SkipConfig,
     preferDelta:       opts.PerferDelta,
     preferSpanMetrics: opts.PreferSpanMetrics,
+    temporalityMap:    make(map[string]map[v3.Temporality]bool),
     maxIdleConns:      opts.MaxIdleConns,
     maxOpenConns:      opts.MaxOpenConns,
     dialTimeout:       opts.DialTimeout,
@@ -335,6 +342,7 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
 func (aH *APIHandler) RegisterQueryRangeV4Routes(router *mux.Router, am *AuthMiddleware) {
     subRouter := router.PathPrefix("/api/v4").Subrouter()
     subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost)
+    subRouter.HandleFunc("/metric/metric_metadata", am.ViewAccess(aH.getMetricMetadata)).Methods(http.MethodGet)
 }
 func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
@@ -537,7 +545,7 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParam
     metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
     if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
         for _, query := range qp.CompositeQuery.BuilderQueries {
-            if query.DataSource == v3.DataSourceMetrics {
+            if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
                 metricNames = append(metricNames, query.AggregateAttribute.Key)
                 if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
                     metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
@@ -782,6 +790,58 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request
     aH.Respond(w, resp)
 }
+// populateTemporality same as addTemporality but for v4 and better
+func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
+    missingTemporality := make([]string, 0)
+    metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
+    if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
+        for _, query := range qp.CompositeQuery.BuilderQueries {
+            // if there is no temporality specified in the query but we have it in the map
+            // then use the value from the map
+            if query.Temporality == "" && aH.temporalityMap[query.AggregateAttribute.Key] != nil {
+                // We prefer delta if it is available
+                if aH.temporalityMap[query.AggregateAttribute.Key][v3.Delta] {
+                    query.Temporality = v3.Delta
+                } else if aH.temporalityMap[query.AggregateAttribute.Key][v3.Cumulative] {
+                    query.Temporality = v3.Cumulative
+                } else {
+                    query.Temporality = v3.Unspecified
+                }
+            }
+            // we don't have temporality for this metric
+            if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
+                missingTemporality = append(missingTemporality, query.AggregateAttribute.Key)
+            }
+            if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
+                metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
+            }
+        }
+    }
+
+    nameToTemporality, err := aH.reader.FetchTemporality(ctx, missingTemporality)
+    if err != nil {
+        return err
+    }
+
+    if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
+        for name := range qp.CompositeQuery.BuilderQueries {
+            query := qp.CompositeQuery.BuilderQueries[name]
+            if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
+                if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
+                    query.Temporality = v3.Delta
+                } else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
+                    query.Temporality = v3.Cumulative
+                } else {
+                    query.Temporality = v3.Unspecified
+                }
+                aH.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
+            }
+        }
+    }
+    return nil
+}
 func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) {
     rules, err := aH.ruleManager.ListRuleStates(r.Context())
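The preference order encoded twice in populateTemporality can be summarized in a small helper; a minimal standalone sketch with a stand-in Temporality type:

package main

import "fmt"

type Temporality string

const (
	Delta       Temporality = "Delta"
	Cumulative  Temporality = "Cumulative"
	Unspecified Temporality = "Unspecified"
)

// pickTemporality mirrors the preference order used in populateTemporality:
// Delta wins over Cumulative, which wins over Unspecified.
func pickTemporality(known map[Temporality]bool) Temporality {
	switch {
	case known[Delta]:
		return Delta
	case known[Cumulative]:
		return Cumulative
	default:
		return Unspecified
	}
}

func main() {
	fmt.Println(pickTemporality(map[Temporality]bool{Cumulative: true})) // Cumulative
}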
@@ -3179,7 +3239,7 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
             zap.S().Debug("done!")
             return
         case err := <-client.Error:
-            zap.S().Error("error occured!", err)
+            zap.S().Error("error occurred!", err)
             fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error())
             flusher.Flush()
             return
@@ -3187,6 +3247,18 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
     }
 }
+func (aH *APIHandler) getMetricMetadata(w http.ResponseWriter, r *http.Request) {
+    metricName := r.URL.Query().Get("metricName")
+    serviceName := r.URL.Query().Get("serviceName")
+    metricMetadata, err := aH.reader.GetMetricMetadata(r.Context(), metricName, serviceName)
+    if err != nil {
+        RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil)
+        return
+    }
+
+    aH.WriteJSON(w, r, metricMetadata)
+}
 func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) {
     var result []*v3.Result
@@ -3250,8 +3322,7 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
     }
     // add temporality for each metric
-    temporalityErr := aH.addTemporality(r.Context(), queryRangeParams)
+    temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams)
     if temporalityErr != nil {
         zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr)
         RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
@@ -3305,9 +3376,24 @@ func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
                 zap.S().Errorf("error in expression: %s", err.Error())
                 return nil, err
             }
+            formulaResult.QueryName = query.QueryName
             result = append(result, formulaResult)
         }
     }
+    // we are done with the formula calculations, only send the results for enabled queries
+    removeDisabledQueries := func(result []*v3.Result) []*v3.Result {
+        var newResult []*v3.Result
+        for _, res := range result {
+            if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled {
+                continue
+            }
+            newResult = append(newResult, res)
+        }
+        return newResult
+    }
+    if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
+        result = removeDisabledQueries(result)
+    }
     return result, nil
 }
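A standalone sketch of the disabled-query filtering added above, with a stand-in Result type and a map standing in for the Disabled flags on builder queries:

package main

import "fmt"

type Result struct {
	QueryName string
}

func main() {
	// B is a hidden helper query that feeds a formula but should not be returned.
	disabled := map[string]bool{"A": false, "B": true}
	results := []*Result{{QueryName: "A"}, {QueryName: "B"}}
	var kept []*Result
	// Drop results whose originating builder query is marked Disabled.
	for _, res := range results {
		if disabled[res.QueryName] {
			continue
		}
		kept = append(kept, res)
	}
	fmt.Println(len(kept), kept[0].QueryName) // 1 A
}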

View File

@@ -51,7 +51,7 @@ func TestPrepareTableQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
+    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
 },
 {
     name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative",
@@ -93,7 +93,7 @@ func TestPrepareTableQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+    expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 },
 }

View File

@@ -107,19 +107,19 @@ const (
 func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
     var subQuery string
-    timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
+    timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq)
     if err != nil {
         return "", err
     }
-    samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
+    samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
     // Select the aggregate value for interval
     queryTmpl :=
         "SELECT fingerprint, %s" +
-            " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
+            " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
             " %s as per_series_value" +
-            " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
+            " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
             " INNER JOIN" +
             " (%s) as filtered_time_series" +
             " USING fingerprint" +

View File

@@ -66,7 +66,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
+    expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
 },
 {
     name: "test time aggregation = rate, temporality = cumulative",
@@ -107,7 +107,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)",
+    expectedQueryContains: "SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)",
 },
 }
@@ -168,7 +168,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
+    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
 },
 {
     name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative",
@@ -210,7 +210,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+    expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 },
 }

View File

@@ -53,7 +53,7 @@ func TestPrepareTableQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
+    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
 },
 {
     name: "test time aggregation = rate, space aggregation = sum, temporality = delta",
@@ -95,7 +95,7 @@ func TestPrepareTableQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+    expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 },
 }

View File

@@ -66,7 +66,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
+    expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
 },
 {
     name: "test time aggregation = rate, temporality = delta",
@@ -107,7 +107,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
+    expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts",
 },
 }
@@ -168,7 +168,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
+    expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
 },
 {
     name: "test time aggregation = rate, space aggregation = sum, temporality = delta",
@@ -210,7 +210,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
     },
     start: 1701794980000,
     end:   1701796780000,
-    expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+    expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 },
 }

View File

@@ -14,19 +14,19 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery)
     var subQuery string
-    timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
+    timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq)
     if err != nil {
         return "", err
     }
-    samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
+    samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
     // Select the aggregate value for interval
     queryTmpl :=
         "SELECT fingerprint, %s" +
-            " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
+            " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
            " %s as per_series_value" +
-            " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
+            " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
             " INNER JOIN" +
             " (%s) as filtered_time_series" +
             " USING fingerprint" +
@@ -77,19 +77,19 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
     var query string
-    timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq)
+    timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq)
     if err != nil {
         return "", err
     }
-    samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
+    samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)
     // Select the aggregate value for interval
     queryTmpl :=
         "SELECT %s" +
-            " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
+            " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
            " %s as value" +
-            " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
+            " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
             " INNER JOIN" +
             " (%s) as filtered_time_series" +
             " USING fingerprint" +

View File

@@ -37,6 +37,17 @@ func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string {
     return strings.Join(groupTags, ", ")
 }
+func GroupByAttributeKeyTagsWithoutLe(tags ...v3.AttributeKey) string {
+    groupTags := []string{}
+    for _, tag := range tags {
+        if tag.Key != "le" {
+            groupTags = append(groupTags, tag.Key)
+        }
+    }
+    groupTags = append(groupTags, "ts")
+    return strings.Join(groupTags, ", ")
+}
 // OrderByAttributeKeyTags returns a string of comma separated tags for order by clause
 // if the order is not specified, it defaults to ASC
 func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
@@ -60,6 +71,29 @@ func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string
     return strings.Join(orderBy, ", ")
 }
+func OrderByAttributeKeyTagsWithoutLe(items []v3.OrderBy, tags []v3.AttributeKey) string {
+    var orderBy []string
+    for _, tag := range tags {
+        if tag.Key != "le" {
+            found := false
+            for _, item := range items {
+                if item.ColumnName == tag.Key {
+                    found = true
+                    orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
+                    break
+                }
+            }
+            if !found {
+                orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
+            }
+        }
+    }
+    orderBy = append(orderBy, "ts ASC")
+    return strings.Join(orderBy, ", ")
+}
 func SelectLabelsAny(tags []v3.AttributeKey) string {
     var selectLabelsAny []string
     for _, tag := range tags {
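A minimal standalone sketch of what the new WithoutLe helper produces, with plain strings standing in for v3.AttributeKey:

package main

import (
	"fmt"
	"strings"
)

// groupByWithoutLe mirrors GroupByAttributeKeyTagsWithoutLe: drop the
// histogram bucket tag "le" and keep ts as the final group-by column.
func groupByWithoutLe(tags ...string) string {
	groupTags := []string{}
	for _, tag := range tags {
		if tag != "le" {
			groupTags = append(groupTags, tag)
		}
	}
	groupTags = append(groupTags, "ts")
	return strings.Join(groupTags, ", ")
}

func main() {
	fmt.Println(groupByWithoutLe("service_name", "le")) // service_name, ts
}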

View File

@@ -3,14 +3,43 @@ package helpers
 import (
     "fmt"
     "strings"
+    "time"
     "go.signoz.io/signoz/pkg/query-service/constants"
     v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
     "go.signoz.io/signoz/pkg/query-service/utils"
 )
+var (
+    sixHoursInMilliseconds = time.Hour.Milliseconds() * 6
+    oneDayInMilliseconds   = time.Hour.Milliseconds() * 24
+)
+
+// start and end are in milliseconds
+func which(start, end int64) (int64, int64, string) {
+    // If time range is less than 6 hours, we need to use the `time_series_v4` table
+    // else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table
+    // else we need to use the `time_series_v4_1day` table
+    var tableName string
+    if end-start <= sixHoursInMilliseconds {
+        // adjust the start time to nearest 1 hour
+        start = start - (start % (time.Hour.Milliseconds() * 1))
+        tableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME
+    } else if end-start <= oneDayInMilliseconds {
+        // adjust the start time to nearest 6 hours
+        start = start - (start % (time.Hour.Milliseconds() * 6))
+        tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME
+    } else {
+        // adjust the start time to nearest 1 day
+        start = start - (start % (time.Hour.Milliseconds() * 24))
+        tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
+    }
+
+    return start, end, tableName
+}
 // PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria
-func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
+func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string, error) {
     var conditions []string
     var fs *v3.FilterSet = mq.Filters
     var groupTags []v3.AttributeKey = mq.GroupBy
@@ -18,6 +47,10 @@ func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
     conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
     conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
+
+    start, end, tableName := which(start, end)
+
+    conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
     if fs != nil && len(fs.Items) != 0 {
         for _, item := range fs.Items {
             toFormat := item.Value
@@ -78,7 +111,7 @@ func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) {
     "SELECT DISTINCT %s FROM %s.%s WHERE %s",
     selectLabels,
     constants.SIGNOZ_METRIC_DBNAME,
-    constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME,
+    tableName,
     whereClause,
 )
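A standalone sketch of the table-selection logic introduced here: pick the narrowest time_series_v4 variant covering the queried range, and round the start down to that table's bucket size. The literal table names mirror the ones in the updated test expectations:

package main

import (
	"fmt"
	"time"
)

func which(start, end int64) (int64, int64, string) {
	sixHours := 6 * time.Hour.Milliseconds()
	oneDay := 24 * time.Hour.Milliseconds()
	switch {
	case end-start <= sixHours:
		// short range: hourly-bucketed base table, start snapped to the hour
		return start - start%time.Hour.Milliseconds(), end, "time_series_v4"
	case end-start <= oneDay:
		// up to a day: 6-hour buckets
		return start - start%sixHours, end, "time_series_v4_6hrs"
	default:
		// anything longer: 1-day buckets
		return start - start%oneDay, end, "time_series_v4_1day"
	}
}

func main() {
	// A 30-minute window stays on time_series_v4; start snaps to the hour,
	// matching the 1706428800000 bound in the tests below.
	fmt.Println(which(1706432226000, 1706434026000))
	// Output: 1706428800000 1706434026000 time_series_v4
}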

View File

@@ -21,8 +21,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
     start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation)
-    groupBy := helpers.GroupByAttributeKeyTags(mq.GroupBy...)
-    orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
+    if mq.ShiftBy != 0 {
+        start = start - mq.ShiftBy*1000
+        end = end - mq.ShiftBy*1000
+    }
     var quantile float64
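ShiftBy is interpreted in seconds: it is multiplied by 1000 before being subtracted from the millisecond range. A minimal sketch of the shift applied to a window:

package main

import "fmt"

func main() {
	// A one-hour time shift is expressed as ShiftBy = 3600 seconds.
	shiftBy := int64(3600)
	start, end := int64(1706432226000), int64(1706434026000)
	start -= shiftBy * 1000
	end -= shiftBy * 1000
	fmt.Println(start, end) // the same window, shifted one hour back
}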
@@ -33,11 +35,21 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
         // and time aggregation to rate
         mq.TimeAggregation = v3.TimeAggregationRate
         mq.SpaceAggregation = v3.SpaceAggregationSum
-        mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{
-            Key:      "le",
-            Type:     v3.AttributeKeyTypeTag,
-            DataType: v3.AttributeKeyDataTypeString,
-        })
+        // If le is not present in group by for quantile, add it
+        leFound := false
+        for _, groupBy := range mq.GroupBy {
+            if groupBy.Key == "le" {
+                leFound = true
+                break
+            }
+        }
+        if !leFound {
+            mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{
+                Key:      "le",
+                Type:     v3.AttributeKeyTypeTag,
+                DataType: v3.AttributeKeyDataTypeString,
+            })
+        }
     }
     var query string
@@ -60,6 +72,15 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
         return "", err
     }
+    groupByWithoutLe := []v3.AttributeKey{}
+    for _, groupBy := range mq.GroupBy {
+        if groupBy.Key != "le" {
+            groupByWithoutLe = append(groupByWithoutLe, groupBy)
+        }
+    }
+    groupBy := helpers.GroupByAttributeKeyTags(groupByWithoutLe...)
+    orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, groupByWithoutLe)
+
     if quantile != 0 {
         query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, quantile, query, groupBy, orderBy)
     }
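A minimal sketch of the outer query emitted when quantile != 0, mirroring the fmt.Sprintf above: the per-le series from the inner query are folded into histogramQuantile, and le is excluded from the surviving group-by and order-by columns. The inner query here is a placeholder:

package main

import "fmt"

func main() {
	groupBy, orderBy := "service_name, ts", "service_name ASC, ts ASC"
	inner := "SELECT service_name, le, ts, ... /* prepared sub-query */"
	quantile := 0.95
	// Buckets are gathered per group and fed to ClickHouse's histogramQuantile.
	query := fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`,
		groupBy, quantile, inner, groupBy, orderBy)
	fmt.Println(query)
}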

View File

@@ -33,7 +33,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
     Disabled: false,
     // remaining struct fields are not needed here
     },
-    expectedQueryContains: "SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta'",
+    expectedQueryContains: "SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000",
 },
 {
     name: "test prepare time series with no filters and group by",
@@ -58,7 +58,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
     Disabled: false,
     // remaining struct fields are not needed here
     },
-    expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative'",
+    expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000",
 },
 {
     name: "test prepare time series with no filters and multiple group by",
@@ -90,7 +90,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
     Disabled: false,
     // remaining struct fields are not needed here
     },
-    expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative'",
+    expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000",
 },
 {
     name: "test prepare time series with filters and multiple group by",
@@ -138,13 +138,15 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) {
     Disabled: false,
     // remaining struct fields are not needed here
     },
-    expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']",
+    expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']",
     },
 }

 for _, testCase := range testCases {
     t.Run(testCase.name, func(t *testing.T) {
-        query, err := helpers.PrepareTimeseriesFilterQuery(testCase.builderQuery)
+        // 1706432226000 - 2:27:06 PM (IST)
+        // 1706434026000 - 2:57:06 PM (IST)
+        query, err := helpers.PrepareTimeseriesFilterQuery(1706432226000, 1706434026000, testCase.builderQuery)
         assert.Nil(t, err)
         assert.Contains(t, query, testCase.expectedQueryContains)
     })
@@ -191,7 +193,7 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
 				TimeAggregation: v3.TimeAggregationRate,
 				SpaceAggregation: v3.SpaceAggregationSum,
 			},
-			expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+			expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 		},
 		{
 			name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by",
@@ -224,12 +226,14 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) {
 				TimeAggregation: v3.TimeAggregationRate,
 				SpaceAggregation: v3.SpaceAggregationSum,
 			},
-			expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC",
+			expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC",
 		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.name, func(t *testing.T) {
+			// 1650991982000 - April 26, 2022 10:23:02 PM
+			// 1651078382000 - April 27, 2022 10:23:02 PM
 			query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{})
 			assert.Nil(t, err)
 			assert.Contains(t, query, testCase.expectedQueryContains)
@@ -262,7 +266,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
 				TimeAggregation: v3.TimeAggregationRate,
 				SpaceAggregation: v3.SpaceAggregationSum,
 			},
-			expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts ASC",
+			expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC",
 		},
 		{
 			name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name",
@@ -288,7 +292,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) {
 				TimeAggregation: v3.TimeAggregationRate,
 				SpaceAggregation: v3.SpaceAggregationSum,
 			},
-			expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
+			expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
 		},
 	}
@@ -340,7 +344,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
 				Disabled: false,
 				SpaceAggregation: v3.SpaceAggregationPercentile99,
 			},
-			expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
+			expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
 		},
 		{
 			name: "test temporality = cumulative, quantile = 0.99 without group by",
@@ -370,7 +374,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) {
 				Disabled: false,
 				SpaceAggregation: v3.SpaceAggregationPercentile99,
 			},
-			expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
+			expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
 		},
 	}
@@ -422,7 +426,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
 				Disabled: false,
 				SpaceAggregation: v3.SpaceAggregationPercentile99,
 			},
-			expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
+			expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC",
 		},
 		{
 			name: "test temporality = delta, quantile = 0.99 no group by",
@@ -452,7 +456,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) {
 				Disabled: false,
 				SpaceAggregation: v3.SpaceAggregationPercentile99,
 			},
-			expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
+			expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC",
 		},
 	}
@@ -490,7 +494,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
 				SpaceAggregation: v3.SpaceAggregationSum,
 				Disabled: false,
 			},
-			expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
+			expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
 		},
 		{
 			name: "test gauge query with group by host_name",
@@ -516,7 +520,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) {
 				Expression: "A",
 				Disabled: false,
 			},
-			expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC",
+			expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC",
 		},
 	}

View File

@@ -935,11 +935,10 @@ func validateExpressions(expressions []string, funcs map[string]govaluate.Expres
 	for _, exp := range expressions {
 		evalExp, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs)
 		if err != nil {
-			errs = append(errs, err)
+			errs = append(errs, fmt.Errorf("invalid expression %s: %v", exp, err))
 			continue
 		}
-		variables := evalExp.Vars()
-		for _, v := range variables {
+		for _, v := range evalExp.Vars() {
 			var hasVariable bool
 			for _, q := range cq.BuilderQueries {
 				if q.Expression == v {
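For reference, a standalone sketch of what govaluate's Vars() returns for a builder expression; it yields the variable names that the loop above cross-checks against cq.BuilderQueries (the "exp" function stub is illustrative):

package main

import (
	"fmt"

	"github.com/Knetic/govaluate"
)

func main() {
	// Stand-in for the funcs map passed into validateExpressions.
	exprFuncs := map[string]govaluate.ExpressionFunction{
		"exp": func(args ...interface{}) (interface{}, error) { return nil, nil },
	}
	evalExp, err := govaluate.NewEvaluableExpressionWithFunctions("A/B + exp(C)", exprFuncs)
	if err != nil {
		panic(err)
	}
	// Vars() reports the referenced query names: A, B and C (function names excluded).
	fmt.Println(evalExp.Vars())
}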
@@ -961,7 +960,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
 	// parse the request body
 	if err := json.NewDecoder(r.Body).Decode(&queryRangeParams); err != nil {
-		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
 	}
 	// validate the request body
@@ -969,7 +968,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
 		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
 	}
-	// prepare the variables for the corrspnding query type
+	// prepare the variables for the corresponding query type
 	formattedVars := make(map[string]interface{})
 	for name, value := range queryRangeParams.Variables {
 		if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL {
@@ -1046,6 +1045,25 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
 			}
 		}
 	}
+			var timeShiftBy int64
+			if len(query.Functions) > 0 {
+				for idx := range query.Functions {
+					function := &query.Functions[idx]
+					if function.Name == v3.FunctionNameTimeShift {
+						// move the function to the beginning of the list
+						// so any other function can use the shifted time
+						var fns []v3.Function
+						fns = append(fns, *function)
+						fns = append(fns, query.Functions[:idx]...)
+						fns = append(fns, query.Functions[idx+1:]...)
+						query.Functions = fns
+						// guard the assertion: JSON numbers decode to float64,
+						// but a non-numeric arg would otherwise panic here
+						if shift, ok := function.Args[0].(float64); ok {
+							timeShiftBy = int64(shift)
+						}
+						break
+					}
+				}
+			}
+			query.ShiftBy = timeShiftBy
 		}
 	}
 	queryRangeParams.Variables = formattedVars
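A minimal standalone sketch of the hoisting step above, with a simplified stand-in for v3.Function so it runs on its own (the type and the "timeShift" literal mirror, but are not, the real SigNoz definitions):

package main

import "fmt"

// function is a simplified stand-in for v3.Function.
type function struct{ Name string }

// moveTimeShiftFirst hoists timeShift to the front so that every
// later function operates on already-shifted timestamps.
func moveTimeShiftFirst(fns []function) []function {
	for idx, fn := range fns {
		if fn.Name == "timeShift" {
			out := []function{fn}
			out = append(out, fns[:idx]...)
			out = append(out, fns[idx+1:]...)
			return out
		}
	}
	return fns
}

func main() {
	fns := []function{{"cutOffMin"}, {"timeShift"}, {"ewma3"}}
	fmt.Println(moveTimeShiftFirst(fns)) // [{timeShift} {cutOffMin} {ewma3}]
}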

View File

@@ -276,14 +276,9 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
 	var wg sync.WaitGroup
 	for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
-		if builderQuery.Disabled {
-			continue
-		}
-		wg.Add(1)
 		if queryName == builderQuery.Expression {
+			wg.Add(1)
 			go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
-		} else {
-			go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
 		}
 	}

View File

@@ -281,6 +281,21 @@ func ApplyFunction(fn v3.Function, result *v3.Result) *v3.Result {
 		return funcMedian5(result)
 	case v3.FunctionNameMedian7:
 		return funcMedian7(result)
+	case v3.FunctionNameTimeShift:
+		shift, ok := fn.Args[0].(float64)
+		if !ok {
+			return result
+		}
+		return funcTimeShift(result, shift)
+	}
+	return result
+}
+
+func funcTimeShift(result *v3.Result, shift float64) *v3.Result {
+	for _, series := range result.Series {
+		for idx, point := range series.Points {
+			series.Points[idx].Timestamp = point.Timestamp + int64(shift)*1000
+		}
 	}
 	return result
 }
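funcTimeShift simply adds shift seconds (converted to milliseconds) to every point, so a shifted series lines up against the current window. A quick standalone check with simplified stand-ins for v3.Result, v3.Series and v3.Point (the real types carry more fields):

package main

import "fmt"

type point struct{ Timestamp int64 } // unix milliseconds
type series struct{ Points []point }
type result struct{ Series []*series }

func funcTimeShift(r *result, shift float64) *result {
	for _, s := range r.Series {
		for idx, p := range s.Points {
			// shift is in seconds; timestamps are in milliseconds
			s.Points[idx].Timestamp = p.Timestamp + int64(shift)*1000
		}
	}
	return r
}

func main() {
	r := &result{Series: []*series{{Points: []point{{Timestamp: 1706432226000}}}}}
	funcTimeShift(r, 3600) // one hour forward = +3,600,000 ms
	fmt.Println(r.Series[0].Points[0].Timestamp) // 1706435826000
}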

View File

@@ -203,12 +203,16 @@ var GroupByColMap = map[string]struct{}{
 }
 const (
 	SIGNOZ_METRIC_DBNAME                      = "signoz_metrics"
 	SIGNOZ_SAMPLES_TABLENAME                  = "distributed_samples_v2"
+	SIGNOZ_SAMPLES_V4_TABLENAME               = "distributed_samples_v4"
 	SIGNOZ_TIMESERIES_TABLENAME               = "distributed_time_series_v2"
 	SIGNOZ_TRACE_DBNAME                       = "signoz_traces"
 	SIGNOZ_SPAN_INDEX_TABLENAME               = "distributed_signoz_index_v2"
 	SIGNOZ_TIMESERIES_LOCAL_TABLENAME         = "time_series_v2"
+	SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME      = "time_series_v4"
+	SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
+	SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
 )
 var TimeoutExcludedRoutes = map[string]bool{

View File

@@ -99,6 +99,7 @@ type Reader interface {
 	CheckClickHouse(ctx context.Context) error
 	GetLatencyMetricMetadata(context.Context, string, string, bool) (*v3.LatencyMetricMetadataResponse, error)
+	GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)
 }
 type Querier interface {

View File

@@ -463,6 +463,23 @@ const (
 	TimeAggregationIncrease TimeAggregation = "increase"
 )
+func (t TimeAggregation) Validate() error {
+	switch t {
+	case TimeAggregationAnyLast,
+		TimeAggregationSum,
+		TimeAggregationAvg,
+		TimeAggregationMin,
+		TimeAggregationMax,
+		TimeAggregationCount,
+		TimeAggregationCountDistinct,
+		TimeAggregationRate,
+		TimeAggregationIncrease:
+		return nil
+	default:
+		return fmt.Errorf("invalid time aggregation: %s", t)
+	}
+}
 func (t TimeAggregation) IsRateOperator() bool {
 	switch t {
 	case TimeAggregationRate, TimeAggregationIncrease:
@@ -488,6 +505,24 @@ const (
 	SpaceAggregationPercentile99 SpaceAggregation = "percentile_99"
 )
+func (s SpaceAggregation) Validate() error {
+	switch s {
+	case SpaceAggregationSum,
+		SpaceAggregationAvg,
+		SpaceAggregationMin,
+		SpaceAggregationMax,
+		SpaceAggregationCount,
+		SpaceAggregationPercentile50,
+		SpaceAggregationPercentile75,
+		SpaceAggregationPercentile90,
+		SpaceAggregationPercentile95,
+		SpaceAggregationPercentile99:
+		return nil
+	default:
+		return fmt.Errorf("invalid space aggregation: %s", s)
+	}
+}
 func IsPercentileOperator(operator SpaceAggregation) bool {
 	switch operator {
 	case SpaceAggregationPercentile50,
@@ -536,6 +571,7 @@ const (
 	FunctionNameMedian3   FunctionName = "median3"
 	FunctionNameMedian5   FunctionName = "median5"
 	FunctionNameMedian7   FunctionName = "median7"
+	FunctionNameTimeShift FunctionName = "timeShift"
 )
 func (f FunctionName) Validate() error {
@@ -553,7 +589,8 @@ func (f FunctionName) Validate() error {
 		FunctionNameEWMA7,
 		FunctionNameMedian3,
 		FunctionNameMedian5,
-		FunctionNameMedian7:
+		FunctionNameMedian7,
+		FunctionNameTimeShift:
 		return nil
 	default:
 		return fmt.Errorf("invalid function name: %s", f)
@@ -587,6 +624,7 @@ type BuilderQuery struct {
 	TimeAggregation  TimeAggregation  `json:"timeAggregation,omitempty"`
 	SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
 	Functions        []Function       `json:"functions,omitempty"`
+	ShiftBy          int64
 }
 func (b *BuilderQuery) Validate() error {
@@ -604,10 +642,19 @@ func (b *BuilderQuery) Validate() error {
 		return fmt.Errorf("data source is invalid: %w", err)
 	}
 	if b.DataSource == DataSourceMetrics {
-		if b.TimeAggregation == TimeAggregationUnspecified {
+		// if AggregateOperator is specified, then the request is using v3 payload
+		if b.AggregateOperator != "" {
 			if err := b.AggregateOperator.Validate(); err != nil {
 				return fmt.Errorf("aggregate operator is invalid: %w", err)
 			}
+		} else {
+			if err := b.TimeAggregation.Validate(); err != nil {
+				return fmt.Errorf("time aggregation is invalid: %w", err)
+			}
+			if err := b.SpaceAggregation.Validate(); err != nil {
+				return fmt.Errorf("space aggregation is invalid: %w", err)
+			}
 		}
 	} else {
 		if err := b.AggregateOperator.Validate(); err != nil {
@@ -661,6 +708,28 @@ func (b *BuilderQuery) Validate() error {
 			if err := function.Name.Validate(); err != nil {
 				return fmt.Errorf("function name is invalid: %w", err)
 			}
+			if function.Name == FunctionNameTimeShift {
+				if len(function.Args) == 0 {
+					return fmt.Errorf("timeShiftBy param missing in query")
+				}
+			} else if function.Name == FunctionNameEWMA3 ||
+				function.Name == FunctionNameEWMA5 ||
+				function.Name == FunctionNameEWMA7 {
+				if len(function.Args) == 0 {
+					return fmt.Errorf("alpha param missing in query")
+				}
+				// guard the type assertion: a non-numeric arg would otherwise panic
+				alpha, ok := function.Args[0].(float64)
+				if !ok {
+					return fmt.Errorf("alpha param should be a number between 0 and 1")
+				}
+				if alpha < 0 || alpha > 1 {
+					return fmt.Errorf("alpha param should be between 0 and 1")
+				}
+			} else if function.Name == FunctionNameCutOffMax ||
+				function.Name == FunctionNameCutOffMin ||
+				function.Name == FunctionNameClampMax ||
+				function.Name == FunctionNameClampMin {
+				if len(function.Args) == 0 {
+					return fmt.Errorf("threshold param missing in query")
+				}
+			}
 		}
 	}
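A standalone sketch of the argument rules enforced above, with a local type standing in for v3.Function; the string names and values here are illustrative, not the canonical constants:

package main

import "fmt"

type fn struct {
	Name string
	Args []interface{}
}

// checkArgs mirrors the per-function argument checks in Validate above.
func checkArgs(f fn) error {
	switch f.Name {
	case "timeShift":
		if len(f.Args) == 0 {
			return fmt.Errorf("timeShiftBy param missing in query")
		}
	case "ewma3", "ewma5", "ewma7":
		if len(f.Args) == 0 {
			return fmt.Errorf("alpha param missing in query")
		}
		alpha, ok := f.Args[0].(float64)
		if !ok || alpha < 0 || alpha > 1 {
			return fmt.Errorf("alpha param should be between 0 and 1")
		}
	case "cutOffMax", "cutOffMin", "clampMax", "clampMin":
		if len(f.Args) == 0 {
			return fmt.Errorf("threshold param missing in query")
		}
	}
	return nil
}

func main() {
	fmt.Println(checkArgs(fn{Name: "ewma3", Args: []interface{}{0.3}})) // <nil>
	fmt.Println(checkArgs(fn{Name: "timeShift"}))                       // param missing error
}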
@@ -919,3 +988,13 @@ type LatencyMetricMetadataResponse struct {
 	Delta bool      `json:"delta"`
 	Le    []float64 `json:"le"`
 }
+type MetricMetadataResponse struct {
+	Delta       bool      `json:"delta"`
+	Le          []float64 `json:"le"`
+	Description string    `json:"description"`
+	Unit        string    `json:"unit"`
+	Type        string    `json:"type"`
+	IsMonotonic bool      `json:"isMonotonic"`
+	Temporality string    `json:"temporality"`
+}