diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index bb36fdf479..2e2eb8ded5 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -38,8 +38,7 @@ type APIHandlerOptions struct { Cache cache.Cache Gateway *httputil.ReverseProxy // Querier Influx Interval - FluxInterval time.Duration - + FluxInterval time.Duration UseLogsNewSchema bool } diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 538cef33e5..695bef8570 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -40,6 +40,11 @@ const ( defaultWriteBatchDelay time.Duration = 5 * time.Second defaultWriteBatchSize int = 10000 defaultEncoding Encoding = EncodingJSON + + defaultLogsLocalTableV2 string = "logs_v2" + defaultLogsTableV2 string = "distributed_logs_v2" + defaultLogsResourceLocalTableV2 string = "logs_v2_resource" + defaultLogsResourceTableV2 string = "distributed_logs_v2_resource" ) // NamespaceConfig is Clickhouse's internal configuration data @@ -72,6 +77,11 @@ type namespaceConfig struct { WriteBatchSize int Encoding Encoding Connector Connector + + LogsLocalTableV2 string + LogsTableV2 string + LogsResourceLocalTableV2 string + LogsResourceTableV2 string } // Connecto defines how to connect to the database @@ -159,6 +169,11 @@ func NewOptions( WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding, Connector: defaultConnector, + + LogsTableV2: defaultLogsTableV2, + LogsLocalTableV2: defaultLogsLocalTableV2, + LogsResourceTableV2: defaultLogsResourceTableV2, + LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index b3ef773da0..a3d2536aa8 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -89,6 +89,7 @@ const ( maxProgressiveSteps = 4 charset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + NANOSECOND = 1000000000 ) var ( @@ -125,6 +126,11 @@ type ClickHouseReader struct { fanoutStorage *storage.Storage queryProgressTracker queryprogress.QueryProgressTracker + logsTableV2 string + logsLocalTableV2 string + logsResourceTableV2 string + logsResourceLocalTableV2 string + promConfigFile string promConfig *config.Config alertManager am.Manager @@ -134,6 +140,7 @@ type ClickHouseReader struct { cluster string useLogsNewSchema bool + logsTableName string } // NewTraceReader returns a TraceReader for the database @@ -197,6 +204,11 @@ func NewReaderFromClickhouseConnection( }, } + logsTableName := options.primary.LogsTable + if useLogsNewSchema { + logsTableName = options.primary.LogsTableV2 + } + return &ClickHouseReader{ db: wrap, localDB: localDB, @@ -223,7 +235,14 @@ func NewReaderFromClickhouseConnection( featureFlags: featureFlag, cluster: cluster, queryProgressTracker: queryprogress.NewQueryProgressTracker(), - useLogsNewSchema: useLogsNewSchema, + + useLogsNewSchema: useLogsNewSchema, + + logsTableV2: options.primary.LogsTableV2, + logsLocalTableV2: options.primary.LogsLocalTableV2, + logsResourceTableV2: options.primary.LogsResourceTableV2, + logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2, + logsTableName: logsTableName, } } @@ -3518,7 +3537,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe resources = removeUnderscoreDuplicateFields(resources) statements := []model.ShowCreateTableStatement{} - query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable) + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable) err = r.db.Select(ctx, &statements, query) if err != nil { return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} @@ -3549,6 +3568,72 @@ func isSelectedField(tableStatement string, field model.LogField) bool { return strings.Contains(tableStatement, name) } +func (r *ClickHouseReader) UpdateLogFieldV2(ctx context.Context, field *model.UpdateField) *model.ApiError { + if !field.Selected { + return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support.")) + } + + colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name) + + dataType := strings.ToLower(field.DataType) + if dataType == "int64" || dataType == "float64" { + dataType = "number" + } + attrColName := fmt.Sprintf("%s_%s", field.Type, dataType) + for _, table := range []string{r.logsLocalTableV2, r.logsTableV2} { + q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))" + query := fmt.Sprintf(q, + r.logsDB, table, + r.cluster, + colname, field.DataType, + attrColName, + field.Name, + ) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", + r.logsDB, table, + r.cluster, + colname, + attrColName, + field.Name, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } + + // create the index + if strings.ToLower(field.DataType) == "bool" { + // there is no point in creating index for bool attributes as the cardinality is just 2 + return nil + } + + if field.IndexType == "" { + field.IndexType = constants.DefaultLogSkipIndexType + } + if field.IndexGranularity == 0 { + field.IndexGranularity = constants.DefaultLogSkipIndexGranularity + } + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s GRANULARITY %d", + r.logsDB, r.logsLocalTableV2, + r.cluster, + colname, + colname, + field.IndexType, + field.IndexGranularity, + ) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + return nil +} + func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { // don't allow updating static fields if field.Type == constants.Static { @@ -3556,10 +3641,14 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda return &model.ApiError{Err: err, Typ: model.ErrorBadData} } - colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name) + if r.useLogsNewSchema { + return r.UpdateLogFieldV2(ctx, field) + } // if a field is selected it means that the field needs to be indexed if field.Selected { + colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name) + keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType)) valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType)) @@ -4150,10 +4239,14 @@ func (r *ClickHouseReader) GetLatestReceivedMetric( return result, nil } -func isColumn(tableStatement, attrType, field, datType string) bool { +func isColumn(useLogsNewSchema bool, tableStatement, attrType, field, datType string) bool { // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute` - name := utils.GetClickhouseColumnName(attrType, datType, field) - + var name string + if useLogsNewSchema { + name = utils.GetClickhouseColumnNameV2(attrType, datType, field) + } else { + name = utils.GetClickhouseColumnName(attrType, datType, field) + } return strings.Contains(tableStatement, fmt.Sprintf("%s ", name)) } @@ -4209,7 +4302,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v defer rows.Close() statements := []model.ShowCreateTableStatement{} - query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable) + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable) err = r.db.Select(ctx, &statements, query) if err != nil { return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error()) @@ -4226,7 +4319,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(attType), - IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType), + IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, attType, tagKey, dataType), } response.AttributeKeys = append(response.AttributeKeys, key) } @@ -4263,7 +4356,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt defer rows.Close() statements := []model.ShowCreateTableStatement{} - query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable) + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable) err = r.db.Select(ctx, &statements, query) if err != nil { return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error()) @@ -4281,7 +4374,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt Key: attributeKey, DataType: v3.AttributeKeyDataType(attributeDataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType), + IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, attributeKey, attributeDataType), } response.AttributeKeys = append(response.AttributeKeys, key) @@ -4315,7 +4408,7 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi } // ignore autocomplete request for body - if req.FilterAttributeKey == "body" { + if req.FilterAttributeKey == "body" || req.FilterAttributeKey == "__attrs" { return &v3.FilterAttributeValueResponse{}, nil } @@ -5262,6 +5355,59 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string return response, nil } +func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClientV2) { + if timestampStart == 0 { + timestampStart = uint64(time.Now().UnixNano()) + } else { + timestampStart = uint64(utils.GetEpochNanoSecs(int64(timestampStart))) + } + + ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + done := true + client.Done <- &done + zap.L().Debug("closing go routine : " + client.Name) + return + case <-ticker.C: + // get the new 100 logs as anything more older won't make sense + var tmpQuery string + bucketStart := (timestampStart / NANOSECOND) - 1800 + + // we have to form the query differently if the resource filters are used + if strings.Contains(query, r.logsResourceTableV2) { + tmpQuery = fmt.Sprintf("seen_at_ts_bucket_start >=%d)) AND ts_bucket_start >=%d AND timestamp >=%d", bucketStart, bucketStart, timestampStart) + } else { + tmpQuery = fmt.Sprintf("ts_bucket_start >=%d AND timestamp >=%d", bucketStart, timestampStart) + } + if idStart != "" { + tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart) + } + + // the reason we are doing desc is that we need the latest logs first + tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100" + + // using the old structure since we can directly read it to the struct as use it. + response := []model.SignozLogV2{} + err := r.db.Select(ctx, &response, tmpQuery) + if err != nil { + zap.L().Error("Error while getting logs", zap.Error(err)) + client.Error <- err + return + } + for i := len(response) - 1; i >= 0; i-- { + client.Logs <- &response[i] + if i == 0 { + timestampStart = response[i].Timestamp + idStart = response[i].ID + } + } + } + } +} + func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) { if timestampStart == 0 { timestampStart = uint64(time.Now().UnixNano()) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 681a472b3a..6f9987a518 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -29,6 +29,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/integrations" "go.signoz.io/signoz/pkg/query-service/app/logs" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" "go.signoz.io/signoz/pkg/query-service/app/metrics" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/preferences" @@ -143,7 +144,7 @@ type APIHandlerOpts struct { // Querier Influx Interval FluxInterval time.Duration - // Use new schema + // Use Logs New schema UseLogsNewSchema bool } @@ -195,10 +196,15 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { UseLogsNewSchema: opts.UseLogsNewSchema, } + logsQueryBuilder := logsv3.PrepareLogsQuery + if opts.UseLogsNewSchema { + logsQueryBuilder = logsv4.PrepareLogsQuery + } + builderOpts := queryBuilder.QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, BuildTraceQuery: tracesV3.PrepareTracesQuery, - BuildLogQuery: logsv3.PrepareLogsQuery, + BuildLogQuery: logsQueryBuilder, } aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags) @@ -3927,7 +3933,93 @@ func (aH *APIHandler) GetQueryProgressUpdates(w http.ResponseWriter, r *http.Req } } +func (aH *APIHandler) liveTailLogsV2(w http.ResponseWriter, r *http.Request) { + + // get the param from url and add it to body + stringReader := strings.NewReader(r.URL.Query().Get("q")) + r.Body = io.NopCloser(stringReader) + + queryRangeParams, apiErrorObj := ParseQueryRangeParams(r) + if apiErrorObj != nil { + zap.L().Error(apiErrorObj.Err.Error()) + RespondError(w, apiErrorObj, nil) + return + } + + var err error + var queryString string + switch queryRangeParams.CompositeQuery.QueryType { + case v3.QueryTypeBuilder: + // check if any enrichment is required for logs if yes then enrich them + if logsv3.EnrichmentRequired(queryRangeParams) { + // get the fields if any logs query is present + var fields map[string]v3.AttributeKey + fields, err = aH.getLogFieldsV3(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err} + RespondError(w, apiErrObj, nil) + return + } + logsv3.Enrich(queryRangeParams, fields) + } + + queryString, err = aH.queryBuilder.PrepareLiveTailQuery(queryRangeParams) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + default: + err = fmt.Errorf("invalid query type") + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) + return + } + + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(200) + + flusher, ok := w.(http.Flusher) + if !ok { + err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil} + RespondError(w, &err, "streaming is not supported") + return + } + + // flush the headers + flusher.Flush() + + // create the client + client := &v3.LogsLiveTailClientV2{Name: r.RemoteAddr, Logs: make(chan *model.SignozLogV2, 1000), Done: make(chan *bool), Error: make(chan error)} + go aH.reader.LiveTailLogsV4(r.Context(), queryString, uint64(queryRangeParams.Start), "", client) + for { + select { + case log := <-client.Logs: + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.Encode(log) + fmt.Fprintf(w, "data: %v\n\n", buf.String()) + flusher.Flush() + case <-client.Done: + zap.L().Debug("done!") + return + case err := <-client.Error: + zap.L().Error("error occurred", zap.Error(err)) + fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) + flusher.Flush() + return + } + } + +} + func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { + if aH.UseLogsNewSchema { + aH.liveTailLogsV2(w, r) + return + } // get the param from url and add it to body stringReader := strings.NewReader(r.URL.Query().Get("q")) @@ -4005,6 +4097,7 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { return } } + } func (aH *APIHandler) getMetricMetadata(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go index bd64b4d0e6..05bd799712 100644 --- a/pkg/query-service/app/logs/v3/query_builder.go +++ b/pkg/query-service/app/logs/v3/query_builder.go @@ -486,12 +486,6 @@ func AddOffsetToQuery(query string, offset uint64) string { return fmt.Sprintf("%s OFFSET %d", query, offset) } -type Options struct { - GraphLimitQtype string - IsLivetailQuery bool - PreferRPM bool -} - func IsOrderByTs(orderBy []v3.OrderBy) bool { if len(orderBy) == 1 && (orderBy[0].Key == constants.TIMESTAMP || orderBy[0].ColumnName == constants.TIMESTAMP) { return true @@ -502,7 +496,7 @@ func IsOrderByTs(orderBy []v3.OrderBy) bool { // PrepareLogsQuery prepares the query for logs // start and end are in epoch millisecond // step is in seconds -func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) { +func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options v3.LogQBOptions) (string, error) { // adjust the start and end time to the step interval // NOTE: Disabling this as it's creating confusion between charts and actual data diff --git a/pkg/query-service/app/logs/v3/query_builder_test.go b/pkg/query-service/app/logs/v3/query_builder_test.go index db57cb2549..0eb0c202e5 100644 --- a/pkg/query-service/app/logs/v3/query_builder_test.go +++ b/pkg/query-service/app/logs/v3/query_builder_test.go @@ -1201,7 +1201,7 @@ var testPrepLogsQueryData = []struct { TableName string AggregateOperator v3.AggregateOperator ExpectedQuery string - Options Options + Options v3.LogQBOptions }{ { Name: "Test TS with limit- first", @@ -1223,7 +1223,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT `method` from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as `method`, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND has(attributes_string_key, 'method') AND has(attributes_string_key, 'name') group by `method` order by value DESC) LIMIT 10", - Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, + Options: v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, }, { Name: "Test TS with limit- first - with order by value", @@ -1246,7 +1246,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT `method` from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as `method`, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND has(attributes_string_key, 'method') AND has(attributes_string_key, 'name') group by `method` order by value ASC) LIMIT 10", - Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, + Options: v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, }, { Name: "Test TS with limit- first - with order by attribute", @@ -1269,7 +1269,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT `method` from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as `method`, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND has(attributes_string_key, 'method') AND has(attributes_string_key, 'name') group by `method` order by `method` ASC) LIMIT 10", - Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, + Options: v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, }, { Name: "Test TS with limit- second", @@ -1291,7 +1291,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as `method`, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND has(attributes_string_key, 'method') AND has(attributes_string_key, 'name') AND (`method`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `method`,ts order by value DESC", - Options: Options{GraphLimitQtype: constants.SecondQueryGraphLimit}, + Options: v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit}, }, { Name: "Test TS with limit- second - with order by", @@ -1314,7 +1314,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as `method`, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND has(attributes_string_key, 'method') AND has(attributes_string_key, 'name') AND (`method`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `method`,ts order by `method` ASC", - Options: Options{GraphLimitQtype: constants.SecondQueryGraphLimit}, + Options: v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit}, }, // Live tail { @@ -1334,7 +1334,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND ", - Options: Options{IsLivetailQuery: true}, + Options: v3.LogQBOptions{IsLivetailQuery: true}, }, { Name: "Live Tail Query with contains", @@ -1353,7 +1353,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where attributes_string_value[indexOf(attributes_string_key, 'method')] ILIKE '%GET%' AND ", - Options: Options{IsLivetailQuery: true}, + Options: v3.LogQBOptions{IsLivetailQuery: true}, }, { Name: "Live Tail Query W/O filter", @@ -1369,7 +1369,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where ", - Options: Options{IsLivetailQuery: true}, + Options: v3.LogQBOptions{IsLivetailQuery: true}, }, { Name: "Table query w/o limit", @@ -1385,7 +1385,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT now() as ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) order by value DESC", - Options: Options{}, + Options: v3.LogQBOptions{}, }, { Name: "Table query with limit", @@ -1402,7 +1402,7 @@ var testPrepLogsQueryData = []struct { }, TableName: "logs", ExpectedQuery: "SELECT now() as ts, toFloat64(count(*)) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) order by value DESC LIMIT 10", - Options: Options{}, + Options: v3.LogQBOptions{}, }, { Name: "Ignore offset if order by is timestamp in list queries", @@ -1488,7 +1488,7 @@ var testPrepLogsQueryLimitOffsetData = []struct { TableName string AggregateOperator v3.AggregateOperator ExpectedQuery string - Options Options + Options v3.LogQBOptions }{ { Name: "Test limit less than pageSize - order by ts", diff --git a/pkg/query-service/app/logs/v4/query_builder.go b/pkg/query-service/app/logs/v4/query_builder.go index b96c5b9113..421b36aa62 100644 --- a/pkg/query-service/app/logs/v4/query_builder.go +++ b/pkg/query-service/app/logs/v4/query_builder.go @@ -451,7 +451,11 @@ func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) { } // join both the filter clauses if resourceSubQuery != "" { - filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + if filterSubQuery != "" { + filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + } else { + filterSubQuery = "(resource_fingerprint GLOBAL IN " + resourceSubQuery + } } // the reader will add the timestamp and id filters diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index cf42828d05..a4ccfd047a 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -9,6 +9,7 @@ import ( "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/status" @@ -19,6 +20,7 @@ import ( ) func prepareLogsQuery(_ context.Context, + useLogsNewSchema bool, start, end int64, builderQuery *v3.BuilderQuery, @@ -27,30 +29,35 @@ func prepareLogsQuery(_ context.Context, ) (string, error) { query := "" + logsQueryBuilder := logsV3.PrepareLogsQuery + if useLogsNewSchema { + logsQueryBuilder = logsV4.PrepareLogsQuery + } + if params == nil || builderQuery == nil { return query, fmt.Errorf("params and builderQuery cannot be nil") } // for ts query with limit replace it as it is already formed if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { - limitQuery, err := logsV3.PrepareLogsQuery( + limitQuery, err := logsQueryBuilder( start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, - logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, ) if err != nil { return query, err } - placeholderQuery, err := logsV3.PrepareLogsQuery( + placeholderQuery, err := logsQueryBuilder( start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, - logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, ) if err != nil { return query, err @@ -59,13 +66,13 @@ func prepareLogsQuery(_ context.Context, return query, err } - query, err := logsV3.PrepareLogsQuery( + query, err := logsQueryBuilder( start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, - logsV3.Options{PreferRPM: preferRPM}, + v3.LogQBOptions{PreferRPM: preferRPM}, ) if err != nil { return query, err @@ -101,7 +108,7 @@ func (q *querier) runBuilderQuery( var query string var err error if _, ok := cacheKeys[queryName]; !ok { - query, err = prepareLogsQuery(ctx, start, end, builderQuery, params, preferRPM) + query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -125,7 +132,7 @@ func (q *querier) runBuilderQuery( missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { - query, err = prepareLogsQuery(ctx, miss.start, miss.end, builderQuery, params, preferRPM) + query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 15b587c8f5..9adea09d47 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -11,6 +11,7 @@ import ( "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" @@ -74,6 +75,11 @@ type QuerierOptions struct { } func NewQuerier(opts QuerierOptions) interfaces.Querier { + logsQueryBuilder := logsV3.PrepareLogsQuery + if opts.UseLogsNewSchema { + logsQueryBuilder = logsV4.PrepareLogsQuery + } + return &querier{ cache: opts.Cache, reader: opts.Reader, @@ -82,14 +88,15 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ BuildTraceQuery: tracesV3.PrepareTracesQuery, - BuildLogQuery: logsV3.PrepareLogsQuery, + BuildLogQuery: logsQueryBuilder, BuildMetricQuery: metricsV3.PrepareMetricQuery, }, opts.FeatureLookup), featureLookUp: opts.FeatureLookup, - testingMode: opts.TestingMode, - returnedSeries: opts.ReturnedSeries, - returnedErr: opts.ReturnedErr, + testingMode: opts.TestingMode, + returnedSeries: opts.ReturnedSeries, + returnedErr: opts.ReturnedErr, + UseLogsNewSchema: opts.UseLogsNewSchema, } } diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 126b532af6..f1dd33c4e6 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -9,6 +9,7 @@ import ( "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" @@ -19,12 +20,17 @@ import ( ) func prepareLogsQuery(_ context.Context, + useLogsNewSchema bool, start, end int64, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, preferRPM bool, ) (string, error) { + logsQueryBuilder := logsV3.PrepareLogsQuery + if useLogsNewSchema { + logsQueryBuilder = logsV4.PrepareLogsQuery + } query := "" if params == nil || builderQuery == nil { @@ -33,24 +39,24 @@ func prepareLogsQuery(_ context.Context, // for ts query with limit replace it as it is already formed if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { - limitQuery, err := logsV3.PrepareLogsQuery( + limitQuery, err := logsQueryBuilder( start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, - logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, ) if err != nil { return query, err } - placeholderQuery, err := logsV3.PrepareLogsQuery( + placeholderQuery, err := logsQueryBuilder( start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, - logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, ) if err != nil { return query, err @@ -59,13 +65,13 @@ func prepareLogsQuery(_ context.Context, return query, err } - query, err := logsV3.PrepareLogsQuery( + query, err := logsQueryBuilder( start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, - logsV3.Options{PreferRPM: preferRPM}, + v3.LogQBOptions{PreferRPM: preferRPM}, ) if err != nil { return query, err @@ -103,7 +109,7 @@ func (q *querier) runBuilderQuery( var query string var err error if _, ok := cacheKeys[queryName]; !ok { - query, err = prepareLogsQuery(ctx, start, end, builderQuery, params, preferRPM) + query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -126,7 +132,7 @@ func (q *querier) runBuilderQuery( missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { - query, err = prepareLogsQuery(ctx, miss.start, miss.end, builderQuery, params, preferRPM) + query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index 7dae7f659f..19538fa9a5 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -11,6 +11,7 @@ import ( "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" @@ -52,10 +53,9 @@ type querier struct { testingMode bool queriesExecuted []string // tuple of start and end time in milliseconds - timeRanges [][]int - returnedSeries []*v3.Series - returnedErr error - + timeRanges [][]int + returnedSeries []*v3.Series + returnedErr error UseLogsNewSchema bool } @@ -74,6 +74,11 @@ type QuerierOptions struct { } func NewQuerier(opts QuerierOptions) interfaces.Querier { + logsQueryBuilder := logsV3.PrepareLogsQuery + if opts.UseLogsNewSchema { + logsQueryBuilder = logsV4.PrepareLogsQuery + } + return &querier{ cache: opts.Cache, reader: opts.Reader, @@ -82,14 +87,15 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ BuildTraceQuery: tracesV3.PrepareTracesQuery, - BuildLogQuery: logsV3.PrepareLogsQuery, + BuildLogQuery: logsQueryBuilder, BuildMetricQuery: metricsV4.PrepareMetricQuery, }, opts.FeatureLookup), featureLookUp: opts.FeatureLookup, - testingMode: opts.TestingMode, - returnedSeries: opts.ReturnedSeries, - returnedErr: opts.ReturnedErr, + testingMode: opts.TestingMode, + returnedSeries: opts.ReturnedSeries, + returnedErr: opts.ReturnedErr, + UseLogsNewSchema: opts.UseLogsNewSchema, } } diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go index 8e8b79572f..879c2d5153 100644 --- a/pkg/query-service/app/queryBuilder/query_builder.go +++ b/pkg/query-service/app/queryBuilder/query_builder.go @@ -5,7 +5,6 @@ import ( "strings" "github.com/SigNoz/govaluate" - logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache" @@ -44,7 +43,7 @@ var SupportedFunctions = []string{ var EvalFuncs = map[string]govaluate.ExpressionFunction{} type prepareTracesQueryFunc func(start, end int64, panelType v3.PanelType, bq *v3.BuilderQuery, options tracesV3.Options) (string, error) -type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options logsV3.Options) (string, error) +type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options v3.LogQBOptions) (string, error) type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options metricsV3.Options) (string, error) type QueryBuilder struct { @@ -162,7 +161,7 @@ func (qb *QueryBuilder) PrepareLiveTailQuery(params *v3.QueryRangeParamsV3) (str } for queryName, query := range compositeQuery.BuilderQueries { if query.Expression == queryName { - queryStr, err = qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{IsLivetailQuery: true}) + queryStr, err = qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.LogQBOptions{IsLivetailQuery: true}) if err != nil { return "", err } @@ -218,18 +217,18 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin case v3.DataSourceLogs: // for ts query with limit replace it as it is already formed if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 { - limitQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) + limitQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } - placeholderQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) + placeholderQuery, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: PreferRPMFeatureEnabled}) if err != nil { return nil, err } query := fmt.Sprintf(placeholderQuery, limitQuery) queries[queryName] = query } else { - queryString, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""}) + queryString, err := qb.options.BuildLogQuery(start, end, compositeQuery.QueryType, compositeQuery.PanelType, query, v3.LogQBOptions{PreferRPM: PreferRPMFeatureEnabled, GraphLimitQtype: ""}) if err != nil { return nil, err } diff --git a/pkg/query-service/app/queryBuilder/query_builder_test.go b/pkg/query-service/app/queryBuilder/query_builder_test.go index cca8e4a028..52af7af780 100644 --- a/pkg/query-service/app/queryBuilder/query_builder_test.go +++ b/pkg/query-service/app/queryBuilder/query_builder_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/featureManager" @@ -585,6 +586,217 @@ func TestLogsQueryWithFormula(t *testing.T) { } +var testLogsWithFormulaV2 = []struct { + Name string + Query *v3.QueryRangeParamsV3 + ExpectedQuery string +}{ + { + Name: "test formula without dot in filter and group by attribute", + Query: &v3.QueryRangeParamsV3{ + Start: 1702979275000000000, + End: 1702981075000000000, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "key_1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Value: true, Operator: v3.FilterOperatorEqual}, + }}, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "key_1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, + }, + }, + "B": { + QueryName: "B", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "key_2", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Value: true, Operator: v3.FilterOperatorEqual}, + }}, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "B", + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "key_1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, + }, + }, + "C": { + QueryName: "C", + Expression: "A + B", + }, + }, + }, + }, + ExpectedQuery: "SELECT A.`key_1` as `key_1`, A.`ts` as `ts`, A.value + B.value as value FROM " + + "(SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool['key_1'] as `key_1`, toFloat64(count(*)) as value from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND (ts_bucket_start >= 1702977475 AND ts_bucket_start <= 1702981075) " + + "AND attributes_bool['key_1'] = true AND mapContains(attributes_bool, 'key_1') AND mapContains(attributes_bool, 'key_1') group by `key_1`,ts order by value DESC) as A INNER JOIN (SELECT " + + "toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_bool['key_1'] as `key_1`, toFloat64(count(*)) as value " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979275000000000 AND timestamp <= 1702981075000000000) AND (ts_bucket_start >= 1702977475 AND ts_bucket_start <= 1702981075) " + + "AND attributes_bool['key_2'] = true AND mapContains(attributes_bool, 'key_2') AND mapContains(attributes_bool, 'key_1') group by `key_1`,ts order by value DESC) as B ON A.`key_1` = B.`key_1` AND A.`ts` = B.`ts`", + }, + { + Name: "test formula with dot in filter and group by attribute", + Query: &v3.QueryRangeParamsV3{ + Start: 1702979056000000000, + End: 1702982656000000000, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "key1.1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Value: true, Operator: v3.FilterOperatorEqual}, + }}, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "key1.1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, + }, + }, + "B": { + QueryName: "B", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "key1.2", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Value: true, Operator: v3.FilterOperatorEqual}, + }}, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "B", + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "key1.1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, + }, + }, + "C": { + QueryName: "C", + Expression: "A + B", + }, + }, + }, + }, + ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value + B.value as value FROM (SELECT attributes_bool['key1.1'] as `key1.1`, " + + "toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) " + + "AND attributes_bool['key1.1'] = true AND mapContains(attributes_bool, 'key1.1') AND mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as A INNER JOIN (SELECT " + + "attributes_bool['key1.1'] as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702979056000000000 AND timestamp <= 1702982656000000000) " + + "AND (ts_bucket_start >= 1702977256 AND ts_bucket_start <= 1702982656) AND attributes_bool['key1.2'] = true AND mapContains(attributes_bool, 'key1.2') AND " + + "mapContains(attributes_bool, 'key1.1') group by `key1.1` order by value DESC) as B ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`", + }, + { + Name: "test formula with dot in filter and group by materialized attribute", + Query: &v3.QueryRangeParamsV3{ + Start: 1702980884000000000, + End: 1702984484000000000, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "key_2", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: true, Operator: v3.FilterOperatorEqual}, + }}, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "key1.1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + }, + "B": { + QueryName: "B", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "key_1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Value: true, Operator: v3.FilterOperatorEqual}, + }}, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "B", + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "key1.1", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + }, + "C": { + QueryName: "C", + Expression: "A - B", + }, + }, + }, + }, + ExpectedQuery: "SELECT A.`key1.1` as `key1.1`, A.`ts` as `ts`, A.value - B.value as value FROM (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, " + + "`attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND " + + "(ts_bucket_start >= 1702979084 AND ts_bucket_start <= 1702984484) AND `attribute_bool_key_2` = true AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as " + + "A INNER JOIN (SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, `attribute_bool_key1$$1` as `key1.1`, toFloat64(count(*)) as value from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1702980884000000000 AND timestamp <= 1702984484000000000) AND (ts_bucket_start >= 1702979084 AND ts_bucket_start <= 1702984484) AND " + + "attributes_bool['key_1'] = true AND mapContains(attributes_bool, 'key_1') AND `attribute_bool_key1$$1_exists`=true group by `key1.1`,ts order by value DESC) as B " + + "ON A.`key1.1` = B.`key1.1` AND A.`ts` = B.`ts`", + }, +} + +func TestLogsQueryWithFormulaV2(t *testing.T) { + t.Parallel() + + qbOptions := QueryBuilderOptions{ + BuildLogQuery: logsV4.PrepareLogsQuery, + } + fm := featureManager.StartManager() + qb := NewQueryBuilder(qbOptions, fm) + + for _, test := range testLogsWithFormulaV2 { + t.Run(test.Name, func(t *testing.T) { + queries, err := qb.PrepareQueries(test.Query) + require.NoError(t, err) + require.Equal(t, test.ExpectedQuery, queries["C"]) + }) + } + +} + func TestGenerateCacheKeysMetricsBuilder(t *testing.T) { testCases := []struct { name string diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index e9e46254d2..7a4252293f 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -71,6 +71,7 @@ type Reader interface { GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) + LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClientV2) GetDashboardsInfo(ctx context.Context) (*model.DashboardsInfo, error) GetSavedViewsInfo(ctx context.Context) (*model.SavedViewsInfo, error) diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 83df872175..5144dade47 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -572,6 +572,21 @@ type SignozLog struct { Attributes_bool map[string]bool `json:"attributes_bool" ch:"attributes_bool"` } +type SignozLogV2 struct { + Timestamp uint64 `json:"timestamp" ch:"timestamp"` + ID string `json:"id" ch:"id"` + TraceID string `json:"trace_id" ch:"trace_id"` + SpanID string `json:"span_id" ch:"span_id"` + TraceFlags uint32 `json:"trace_flags" ch:"trace_flags"` + SeverityText string `json:"severity_text" ch:"severity_text"` + SeverityNumber uint8 `json:"severity_number" ch:"severity_number"` + Body string `json:"body" ch:"body"` + Resources_string map[string]string `json:"resources_string" ch:"resources_string"` + Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"` + Attributes_number map[string]float64 `json:"attributes_float" ch:"attributes_number"` + Attributes_bool map[string]bool `json:"attributes_bool" ch:"attributes_bool"` +} + type LogsTailClient struct { Name string Logs chan *SignozLog diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index c21d47229c..4af5d36ae4 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -1035,6 +1035,13 @@ type Result struct { Table *Table `json:"table,omitempty"` } +type LogsLiveTailClientV2 struct { + Name string + Logs chan *model.SignozLogV2 + Done chan *bool + Error chan error +} + type LogsLiveTailClient struct { Name string Logs chan *model.SignozLog diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index e9b7a0b7e3..f09f4dffd6 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -14,6 +14,8 @@ import ( // ValidateAndCastValue validates and casts the value of a key to the corresponding data type of the key func ValidateAndCastValue(v interface{}, dataType v3.AttributeKeyDataType) (interface{}, error) { + // get the actual value if it's a pointer + v = getPointerValue(v) switch dataType { case v3.AttributeKeyDataTypeString: switch x := v.(type) {