diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index d145d15066..991e6519ff 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -66,8 +66,6 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, LogsParsingPipelineController: opts.LogsParsingPipelineController, Cache: opts.Cache, FluxInterval: opts.FluxInterval, - UseLogsNewSchema: opts.UseLogsNewSchema, - UseTraceNewSchema: opts.UseTraceNewSchema, AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager), FieldsAPI: fields.NewAPI(signoz.TelemetryStore), Signoz: signoz, diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index a77f1a13d2..8ec0eb17ab 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -23,12 +23,10 @@ func NewDataConnector( telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, cluster string, - useLogsNewSchema bool, - useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, cache cache.Cache, ) *ClickhouseReader { - chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) + chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, cluster, fluxIntervalForTraceDetail, cache) return &ClickhouseReader{ conn: telemetryStore.ClickhouseDB(), appdb: sqlDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 0cf47adf39..1c1169edf6 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -62,8 +62,6 @@ type ServerOptions struct { FluxIntervalForTraceDetail string Cluster string GatewayUrl string - UseLogsNewSchema bool - UseTraceNewSchema bool Jwt *authtypes.JWT } @@ -132,8 +130,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.SigNoz.TelemetryStore, serverOptions.SigNoz.Prometheus, serverOptions.Cluster, - serverOptions.UseLogsNewSchema, - serverOptions.UseTraceNewSchema, fluxIntervalForTraceDetail, serverOptions.SigNoz.Cache, ) @@ -151,8 +147,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, - serverOptions.UseLogsNewSchema, - serverOptions.UseTraceNewSchema, serverOptions.SigNoz.Alertmanager, serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.TelemetryStore, @@ -233,8 +227,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { FluxInterval: fluxInterval, Gateway: gatewayProxy, GatewayUrl: serverOptions.GatewayUrl, - UseLogsNewSchema: serverOptions.UseLogsNewSchema, - UseTraceNewSchema: serverOptions.UseTraceNewSchema, JWT: serverOptions.Jwt, } @@ -244,8 +236,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } s := &Server{ - // logger: logger, - // tracer: tracer, ruleManager: rm, serverOptions: serverOptions, unavailableChannel: make(chan healthcheck.Status), @@ -486,8 +476,6 @@ func makeRulesManager( db *sqlx.DB, ch baseint.Reader, cache cache.Cache, - useLogsNewSchema bool, - useTraceNewSchema bool, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, @@ -504,8 +492,6 @@ func makeRulesManager( Cache: cache, EvalDelay: baseconst.GetEvalDelay(), PrepareTaskFunc: rules.PrepareTaskFunc, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, PrepareTestRuleFunc: rules.TestNotification, Alertmanager: alertmanager, SQLStore: sqlstore, diff --git a/ee/query-service/main.go b/ee/query-service/main.go index c4212c457c..811718345a 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap/zapcore" ) +// Deprecated: Please use the logger from pkg/instrumentation. func initZapLog() *zap.Logger { config := zap.NewProductionConfig() config.EncoderConfig.TimeKey = "timestamp" @@ -52,7 +53,9 @@ func main() { var gatewayUrl string var useLicensesV3 bool + // Deprecated flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") + // Deprecated flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces") // Deprecated flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") @@ -140,8 +143,6 @@ func main() { FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, Cluster: cluster, GatewayUrl: gatewayUrl, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, Jwt: jwt, } diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index 8d6d00ddcc..cb67a5ab4b 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -25,8 +25,6 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) ruleId, opts.Rule, opts.Reader, - opts.UseLogsNewSchema, - opts.UseTraceNewSchema, baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), baserules.WithSQLStore(opts.SQLStore), ) @@ -123,8 +121,6 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap alertname, parsedRule, opts.Reader, - opts.UseLogsNewSchema, - opts.UseTraceNewSchema, baserules.WithSendAlways(), baserules.WithSendUnmatched(), baserules.WithSQLStore(opts.SQLStore), diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 4f2451cfd6..bff7b82ada 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -21,7 +21,6 @@ import ( "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/valuer" - "github.com/mailru/easyjson" "github.com/uptrace/bun" "github.com/google/uuid" @@ -145,9 +144,6 @@ type ClickHouseReader struct { liveTailRefreshSeconds int cluster string - useLogsNewSchema bool - useTraceNewSchema bool - logsTableName string logsLocalTableName string @@ -168,13 +164,11 @@ func NewReader( telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, cluster string, - useLogsNewSchema bool, - useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, cache cache.Cache, ) *ClickHouseReader { options := NewOptions(primaryNamespace, archiveNamespace) - return NewReaderFromClickhouseConnection(options, sqlDB, telemetryStore, prometheus, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) + return NewReaderFromClickhouseConnection(options, sqlDB, telemetryStore, prometheus, cluster, fluxIntervalForTraceDetail, cache) } func NewReaderFromClickhouseConnection( @@ -183,65 +177,48 @@ func NewReaderFromClickhouseConnection( telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, cluster string, - useLogsNewSchema bool, - useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, cache cache.Cache, ) *ClickHouseReader { - logsTableName := options.primary.LogsTable - logsLocalTableName := options.primary.LogsLocalTable - if useLogsNewSchema { - logsTableName = options.primary.LogsTableV2 - logsLocalTableName = options.primary.LogsLocalTableV2 - } - - traceTableName := options.primary.IndexTable - traceLocalTableName := options.primary.LocalIndexTable - if useTraceNewSchema { - traceTableName = options.primary.TraceIndexTableV3 - traceLocalTableName = options.primary.TraceLocalTableNameV3 - } + logsTableName := options.primary.LogsTableV2 + logsLocalTableName := options.primary.LogsLocalTableV2 + traceTableName := options.primary.TraceIndexTableV3 + traceLocalTableName := options.primary.TraceLocalTableNameV3 return &ClickHouseReader{ - db: telemetryStore.ClickhouseDB(), - prometheus: prometheus, - sqlDB: sqlDB, - TraceDB: options.primary.TraceDB, - operationsTable: options.primary.OperationsTable, - indexTable: options.primary.IndexTable, - errorTable: options.primary.ErrorTable, - usageExplorerTable: options.primary.UsageExplorerTable, - durationTable: options.primary.DurationTable, - SpansTable: options.primary.SpansTable, - spanAttributeTableV2: options.primary.SpanAttributeTableV2, - spanAttributesKeysTable: options.primary.SpanAttributeKeysTable, - dependencyGraphTable: options.primary.DependencyGraphTable, - topLevelOperationsTable: options.primary.TopLevelOperationsTable, - logsDB: options.primary.LogsDB, - logsTable: options.primary.LogsTable, - logsLocalTable: options.primary.LogsLocalTable, - logsAttributeKeys: options.primary.LogsAttributeKeysTable, - logsResourceKeys: options.primary.LogsResourceKeysTable, - logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2, - liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, - cluster: cluster, - queryProgressTracker: queryprogress.NewQueryProgressTracker(), - - useLogsNewSchema: useLogsNewSchema, - useTraceNewSchema: useTraceNewSchema, - - logsTableV2: options.primary.LogsTableV2, - logsLocalTableV2: options.primary.LogsLocalTableV2, - logsResourceTableV2: options.primary.LogsResourceTableV2, - logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2, - logsTableName: logsTableName, - logsLocalTableName: logsLocalTableName, - - traceLocalTableName: traceLocalTableName, - traceTableName: traceTableName, - traceResourceTableV3: options.primary.TraceResourceTableV3, - traceSummaryTable: options.primary.TraceSummaryTable, - + db: telemetryStore.ClickhouseDB(), + prometheus: prometheus, + sqlDB: sqlDB, + TraceDB: options.primary.TraceDB, + operationsTable: options.primary.OperationsTable, + indexTable: options.primary.IndexTable, + errorTable: options.primary.ErrorTable, + usageExplorerTable: options.primary.UsageExplorerTable, + durationTable: options.primary.DurationTable, + SpansTable: options.primary.SpansTable, + spanAttributeTableV2: options.primary.SpanAttributeTableV2, + spanAttributesKeysTable: options.primary.SpanAttributeKeysTable, + dependencyGraphTable: options.primary.DependencyGraphTable, + topLevelOperationsTable: options.primary.TopLevelOperationsTable, + logsDB: options.primary.LogsDB, + logsTable: options.primary.LogsTable, + logsLocalTable: options.primary.LogsLocalTable, + logsAttributeKeys: options.primary.LogsAttributeKeysTable, + logsResourceKeys: options.primary.LogsResourceKeysTable, + logsTagAttributeTableV2: options.primary.LogsTagAttributeTableV2, + liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, + cluster: cluster, + queryProgressTracker: queryprogress.NewQueryProgressTracker(), + logsTableV2: options.primary.LogsTableV2, + logsLocalTableV2: options.primary.LogsLocalTableV2, + logsResourceTableV2: options.primary.LogsResourceTableV2, + logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2, + logsTableName: logsTableName, + logsLocalTableName: logsLocalTableName, + traceLocalTableName: traceLocalTableName, + traceTableName: traceTableName, + traceResourceTableV3: options.primary.TraceResourceTableV3, + traceSummaryTable: options.primary.TraceSummaryTable, fluxIntervalForTraceDetail: fluxIntervalForTraceDetail, cache: cache, metadataDB: options.primary.MetadataDB, @@ -288,20 +265,9 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model } func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) { - services := []string{} - query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName) - - if r.useTraceNewSchema { - query = fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now() - INTERVAL 1 DAY) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName) - } - - rows, err := r.db.Query(ctx, query) - - zap.L().Info(query) - + rows, err := r.db.Query(ctx, fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now() - INTERVAL 1 DAY) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName)) if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) return nil, fmt.Errorf("error in processing sql query") } @@ -313,6 +279,7 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro } services = append(services, serviceName) } + return &services, nil } @@ -410,7 +377,7 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc return resourceSubQuery, nil } -func (r *ClickHouseReader) GetServicesV2(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { +func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { if r.indexTable == "" { return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} @@ -535,128 +502,7 @@ func (r *ClickHouseReader) GetServicesV2(ctx context.Context, queryParams *model return &serviceItems, nil } -func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { - if r.useTraceNewSchema { - return r.GetServicesV2(ctx, queryParams) - } - - if r.indexTable == "" { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} - } - - topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil) - if apiErr != nil { - return nil, apiErr - } - - serviceItems := []model.ServiceItem{} - var wg sync.WaitGroup - // limit the number of concurrent queries to not overload the clickhouse server - sem := make(chan struct{}, 10) - var mtx sync.RWMutex - - for svc, ops := range *topLevelOps { - sem <- struct{}{} - wg.Add(1) - go func(svc string, ops []string) { - defer wg.Done() - defer func() { <-sem }() - var serviceItem model.ServiceItem - var numErrors uint64 - - // Even if the total number of operations within the time range is less and the all - // the top level operations are high, we want to warn to let user know the issue - // with the instrumentation - serviceItem.DataWarning = model.DataWarning{ - TopLevelOps: (*topLevelOps)[svc], - } - - // default max_query_size = 262144 - // Let's assume the average size of the item in `ops` is 50 bytes - // We can have 262144/50 = 5242 items in the `ops` array - // Although we have make it as big as 5k, We cap the number of items - // in the `ops` array to 1500 - - ops = ops[:int(math.Min(1500, float64(len(ops))))] - - query := fmt.Sprintf( - `SELECT - quantile(0.99)(durationNano) as p99, - avg(durationNano) as avgDuration, - count(*) as numCalls - FROM %s.%s - WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, - ) - errorQuery := fmt.Sprintf( - `SELECT - count(*) as numErrors - FROM %s.%s - WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.indexTable, - ) - - args := []interface{}{} - args = append(args, - clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), - clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), - clickhouse.Named("serviceName", svc), - clickhouse.Named("names", ops), - ) - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - if errStatus != nil { - zap.L().Error("Error in processing sql query", zap.Error(errStatus)) - return - } - err := r.db.QueryRow( - ctx, - query, - args..., - ).ScanStruct(&serviceItem) - - if serviceItem.NumCalls == 0 { - return - } - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags) - if errStatus != nil { - zap.L().Error("Error building query with tag params", zap.Error(errStatus)) - return - } - errorQuery += subQuery - args = append(args, argsSubQuery...) - err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - - serviceItem.ServiceName = svc - serviceItem.NumErrors = numErrors - mtx.Lock() - serviceItems = append(serviceItems, serviceItem) - mtx.Unlock() - }(svc, ops) - } - wg.Wait() - - for idx := range serviceItems { - serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period) - serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls) - } - return &serviceItems, nil -} - func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string { - // status can only be two and if both are selected than they are equivalent to none selected if _, ok := excludeMap["status"]; ok { if len(statusParams) == 1 { @@ -833,7 +679,7 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args } -func (r *ClickHouseReader) GetTopOperationsV2(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { +func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { namedArgs := []interface{}{ clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), @@ -888,60 +734,6 @@ func (r *ClickHouseReader) GetTopOperationsV2(ctx context.Context, queryParams * return &topOperationsItems, nil } -func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { - - if r.useTraceNewSchema { - return r.GetTopOperationsV2(ctx, queryParams) - } - - namedArgs := []interface{}{ - clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), - clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), - clickhouse.Named("serviceName", queryParams.ServiceName), - } - - var topOperationsItems []model.TopOperationsItem - - query := fmt.Sprintf(` - SELECT - quantile(0.5)(durationNano) as p50, - quantile(0.95)(durationNano) as p95, - quantile(0.99)(durationNano) as p99, - COUNT(*) as numCalls, - countIf(statusCode=2) as errorCount, - name - FROM %s.%s - WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, - ) - args := []interface{}{} - args = append(args, namedArgs...) - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - if errStatus != nil { - return nil, errStatus - } - query += " GROUP BY name ORDER BY p99 DESC" - if queryParams.Limit > 0 { - query += " LIMIT @limit" - args = append(args, clickhouse.Named("limit", queryParams.Limit)) - } - err := r.db.Select(ctx, &topOperationsItems, query, args...) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - if topOperationsItems == nil { - topOperationsItems = []model.TopOperationsItem{} - } - - return &topOperationsItems, nil -} func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) { var usageItems []model.UsageItem @@ -1415,7 +1207,7 @@ func getLocalTableName(tableName string) string { } -func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { +func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -1556,7 +1348,7 @@ func (r *ClickHouseReader) SetTTLLogsV2(ctx context.Context, orgID string, param return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } -func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { +func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -1703,8 +1495,7 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, orgID string, par // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. -func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, - params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { +func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { // Keep only latest 100 transactions/requests r.deleteTtlTransactions(ctx, orgID, 100) // uuid is used as transaction id @@ -1718,121 +1509,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, switch params.Type { case constants.TraceTTL: - if r.useTraceNewSchema { - return r.SetTTLTracesV2(ctx, orgID, params) - } - - tableNames := []string{ - signozTraceDBName + "." + signozTraceTableName, - signozTraceDBName + "." + signozDurationMVTable, - signozTraceDBName + "." + signozSpansTable, - signozTraceDBName + "." + signozErrorIndexTable, - signozTraceDBName + "." + signozUsageExplorerTable, - signozTraceDBName + "." + defaultDependencyGraphTable, - } - for _, tableName := range tableNames { - tableName := getLocalTableName(tableName) - statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) - if err != nil { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} - } - if statusItem.Status == constants.StatusPending { - return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} - } - } - for _, tableName := range tableNames { - tableName := getLocalTableName(tableName) - // TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086 - go func(tableName string) { - ttl := types.TTLSetting{ - Identifiable: types.Identifiable{ - ID: valuer.GenerateUUID(), - }, - TimeAuditable: types.TimeAuditable{ - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }, - TransactionID: uuid, - TableName: tableName, - TTL: int(params.DelDuration), - Status: constants.StatusPending, - ColdStorageTTL: coldStorageDuration, - OrgID: orgID, - } - _, dbErr := r. - sqlDB. - BunDB(). - NewInsert(). - Model(&ttl). - Exec(ctx) - if dbErr != nil { - zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) - return - } - req := fmt.Sprintf( - "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE", - tableName, r.cluster, params.DelDuration) - if len(params.ColdStorageVolume) > 0 { - req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'", - params.ToColdStorageDuration, params.ColdStorageVolume) - } - err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) - if err != nil { - zap.L().Error("Error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) - if err == nil { - _, dbErr := r. - sqlDB. - BunDB(). - NewUpdate(). - Model(new(types.TTLSetting)). - Set("updated_at = ?", time.Now()). - Set("status = ?", constants.StatusFailed). - Where("id = ?", statusItem.ID.StringValue()). - Exec(ctx) - if dbErr != nil { - zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) - return - } - } - return - } - req += " SETTINGS materialize_ttl_after_modify=0;" - zap.L().Error("Executing TTL request: ", zap.String("request", req)) - statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) - if err := r.db.Exec(context.Background(), req); err != nil { - zap.L().Error("Error in executing set TTL query", zap.Error(err)) - _, dbErr := r. - sqlDB. - BunDB(). - NewUpdate(). - Model(new(types.TTLSetting)). - Set("updated_at = ?", time.Now()). - Set("status = ?", constants.StatusFailed). - Where("id = ?", statusItem.ID.StringValue()). - Exec(ctx) - if dbErr != nil { - zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) - return - } - return - } - _, dbErr = r. - sqlDB. - BunDB(). - NewUpdate(). - Model(new(types.TTLSetting)). - Set("updated_at = ?", time.Now()). - Set("status = ?", constants.StatusSuccess). - Where("id = ?", statusItem.ID.StringValue()). - Exec(ctx) - if dbErr != nil { - zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) - return - } - }(tableName) - } - + return r.setTTLTraces(ctx, orgID, params) case constants.MetricsTTL: tableNames := []string{ signozMetricDBName + "." + signozSampleLocalTableName, @@ -1951,111 +1628,10 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, orgID string, go metricTTL(tableName) } case constants.LogsTTL: - if r.useLogsNewSchema { - return r.SetTTLLogsV2(ctx, orgID, params) - } - - tableName := r.logsDB + "." + r.logsLocalTable - statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) - if err != nil { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} - } - if statusItem.Status == constants.StatusPending { - return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} - } - go func(tableName string) { - ttl := types.TTLSetting{ - Identifiable: types.Identifiable{ - ID: valuer.GenerateUUID(), - }, - TimeAuditable: types.TimeAuditable{ - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }, - TransactionID: uuid, - TableName: tableName, - TTL: int(params.DelDuration), - Status: constants.StatusPending, - ColdStorageTTL: coldStorageDuration, - OrgID: orgID, - } - _, dbErr := r. - sqlDB. - BunDB(). - NewInsert(). - Model(&ttl). - Exec(ctx) - if dbErr != nil { - zap.L().Error("error in inserting to ttl_status table", zap.Error(dbErr)) - return - } - req := fmt.Sprintf( - "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+ - "INTERVAL %v SECOND DELETE", tableName, r.cluster, params.DelDuration) - if len(params.ColdStorageVolume) > 0 { - req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+ - " + INTERVAL %v SECOND TO VOLUME '%s'", - params.ToColdStorageDuration, params.ColdStorageVolume) - } - err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) - if err != nil { - zap.L().Error("error in setting cold storage", zap.Error(err)) - statusItem, err := r.checkTTLStatusItem(ctx, orgID, tableName) - if err == nil { - _, dbErr := r. - sqlDB. - BunDB(). - NewUpdate(). - Model(new(types.TTLSetting)). - Set("updated_at = ?", time.Now()). - Set("status = ?", constants.StatusFailed). - Where("id = ?", statusItem.ID.StringValue()). - Exec(ctx) - if dbErr != nil { - zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) - return - } - } - return - } - req += " SETTINGS materialize_ttl_after_modify=0" - zap.L().Info("Executing TTL request: ", zap.String("request", req)) - statusItem, _ := r.checkTTLStatusItem(ctx, orgID, tableName) - if err := r.db.Exec(ctx, req); err != nil { - zap.L().Error("error while setting ttl", zap.Error(err)) - _, dbErr := r. - sqlDB. - BunDB(). - NewUpdate(). - Model(new(types.TTLSetting)). - Set("updated_at = ?", time.Now()). - Set("status = ?", constants.StatusFailed). - Where("id = ?", statusItem.ID.StringValue()). - Exec(ctx) - if dbErr != nil { - zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) - return - } - return - } - _, dbErr = r. - sqlDB. - BunDB(). - NewUpdate(). - Model(new(types.TTLSetting)). - Set("updated_at = ?", time.Now()). - Set("status = ?", constants.StatusSuccess). - Where("id = ?", statusItem.ID.StringValue()). - Exec(ctx) - if dbErr != nil { - zap.L().Error("Error in processing ttl_status update sql query", zap.Error(dbErr)) - return - } - }(tableName) + return r.setTTLLogs(ctx, orgID, params) default: - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be , got %v", - params.Type)} + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be , got %v", params.Type)} } return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil @@ -2674,14 +2250,8 @@ func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) { } func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (uint64, error) { - var spansInLastHeartBeatInterval uint64 - - queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, int(interval.Minutes())) - if r.useTraceNewSchema { - queryStr = fmt.Sprintf("SELECT count() from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - toIntervalMinute(%d))) - 1800 and timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, r.traceTableName, int(interval.Minutes()), int(interval.Minutes())) - } - r.db.QueryRow(ctx, queryStr).Scan(&spansInLastHeartBeatInterval) + r.db.QueryRow(ctx, fmt.Sprintf("SELECT count() from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - toIntervalMinute(%d))) - 1800 and timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, r.traceTableName, int(interval.Minutes()), int(interval.Minutes()))).Scan(&spansInLastHeartBeatInterval) return spansInLastHeartBeatInterval, nil } @@ -2813,17 +2383,10 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex } func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) { - queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env, - stringTagMap['telemetry.sdk.language'] as language from %s.%s - where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d)) - group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes())) - - if r.useTraceNewSchema { - queryStr = fmt.Sprintf(`select serviceName, resources_string['deployment.environment'] as env, + queryStr := fmt.Sprintf(`select serviceName, resources_string['deployment.environment'] as env, resources_string['telemetry.sdk.language'] as language from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d)) group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes())) - } tagTelemetryDataList := []model.TagTelemetryData{} err := r.db.Select(ctx, &tagTelemetryDataList, queryStr) @@ -2924,7 +2487,7 @@ func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement st field.Type = overrideFieldType } // all static fields are assumed to be selected as we don't allow changing them - if isColumn(r.useLogsNewSchema, tableStatement, field.Type, field.Name, field.DataType) { + if isColumn(tableStatement, field.Type, field.Name, field.DataType) { response.Selected = append(response.Selected, field) } else { response.Interesting = append(response.Interesting, field) @@ -2932,7 +2495,7 @@ func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement st } } -func (r *ClickHouseReader) UpdateLogFieldV2(ctx context.Context, field *model.UpdateField) *model.ApiError { +func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { if !field.Selected { return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support.")) } @@ -2998,120 +2561,6 @@ func (r *ClickHouseReader) UpdateLogFieldV2(ctx context.Context, field *model.Up return nil } -func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { - // don't allow updating static fields - if field.Type == constants.Static { - err := errors.New("cannot update static fields") - return &model.ApiError{Err: err, Typ: model.ErrorBadData} - } - - if r.useLogsNewSchema { - return r.UpdateLogFieldV2(ctx, field) - } - - // if a field is selected it means that the field needs to be indexed - if field.Selected { - colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name) - - keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType)) - valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType)) - - // create materialized column - - for _, table := range []string{r.logsLocalTable, r.logsTable} { - q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s DEFAULT %s[indexOf(%s, '%s')] CODEC(ZSTD(1))" - query := fmt.Sprintf(q, - r.logsDB, table, - r.cluster, - colname, field.DataType, - valueColName, - keyColName, - field.Name, - ) - err := r.db.Exec(ctx, query) - if err != nil { - return &model.ApiError{Err: err, Typ: model.ErrorInternal} - } - - query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists` bool DEFAULT if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", - r.logsDB, table, - r.cluster, - strings.TrimSuffix(colname, "`"), - keyColName, - field.Name, - ) - err = r.db.Exec(ctx, query) - if err != nil { - return &model.ApiError{Err: err, Typ: model.ErrorInternal} - } - } - - // create the index - if strings.ToLower(field.DataType) == "bool" { - // there is no point in creating index for bool attributes as the cardinality is just 2 - return nil - } - - if field.IndexType == "" { - field.IndexType = constants.DefaultLogSkipIndexType - } - if field.IndexGranularity == 0 { - field.IndexGranularity = constants.DefaultLogSkipIndexGranularity - } - query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx` (%s) TYPE %s GRANULARITY %d", - r.logsDB, r.logsLocalTable, - r.cluster, - strings.TrimSuffix(colname, "`"), - colname, - field.IndexType, - field.IndexGranularity, - ) - err := r.db.Exec(ctx, query) - if err != nil { - return &model.ApiError{Err: err, Typ: model.ErrorInternal} - } - - } else { - // We are not allowing to delete a materialized column - // For more details please check https://github.com/SigNoz/signoz/issues/4566 - return model.ForbiddenError(errors.New("Removing a selected field is not allowed, please reach out to support.")) - - // Delete the index first - // query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx`", r.logsDB, r.logsLocalTable, r.cluster, strings.TrimSuffix(colname, "`")) - // err := r.db.Exec(ctx, query) - // if err != nil { - // return &model.ApiError{Err: err, Typ: model.ErrorInternal} - // } - - // for _, table := range []string{r.logsTable, r.logsLocalTable} { - // // drop materialized column from logs table - // query := "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s " - // err := r.db.Exec(ctx, fmt.Sprintf(query, - // r.logsDB, table, - // r.cluster, - // colname, - // ), - // ) - // if err != nil { - // return &model.ApiError{Err: err, Typ: model.ErrorInternal} - // } - - // // drop exists column on logs table - // query = "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s_exists` " - // err = r.db.Exec(ctx, fmt.Sprintf(query, - // r.logsDB, table, - // r.cluster, - // strings.TrimSuffix(colname, "`"), - // ), - // ) - // if err != nil { - // return &model.ApiError{Err: err, Typ: model.ErrorInternal} - // } - // } - } - return nil -} - func (r *ClickHouseReader) GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) { // response will contain top level fields from the otel trace model response := model.GetFieldsResponse{ @@ -3821,15 +3270,8 @@ func (r *ClickHouseReader) GetLatestReceivedMetric( return result, nil } -func isColumn(useLogsNewSchema bool, tableStatement, attrType, field, datType string) bool { - // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute` - var name string - if useLogsNewSchema { - // adding explict '`' - name = fmt.Sprintf("`%s`", utils.GetClickhouseColumnNameV2(attrType, datType, field)) - } else { - name = utils.GetClickhouseColumnName(attrType, datType, field) - } +func isColumn(tableStatement, attrType, field, datType string) bool { + name := fmt.Sprintf("`%s`", utils.GetClickhouseColumnNameV2(attrType, datType, field)) return strings.Contains(tableStatement, fmt.Sprintf("%s ", name)) } @@ -3902,7 +3344,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(attType), - IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, attType, tagKey, dataType), + IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType), } response.AttributeKeys = append(response.AttributeKeys, key) } @@ -3962,7 +3404,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt Key: attributeKey, DataType: v3.AttributeKeyDataType(attributeDataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, attributeKey, attributeDataType), + IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType), } response.AttributeKeys = append(response.AttributeKeys, key) @@ -4689,7 +4131,7 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType), + IsColumn: isColumn(statements[0].Statement, tagType, tagKey, dataType), } if _, ok := constants.DeprecatedStaticFieldsTraces[tagKey]; !ok { @@ -4698,10 +4140,6 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req } fields := constants.NewStaticFieldsTraces - if !r.useTraceNewSchema { - fields = constants.DeprecatedStaticFieldsTraces - } - // add the new static fields for _, field := range fields { if (!stringAllowed && field.DataType == v3.AttributeKeyDataTypeString) || (v3.AttributeKey{} == field) { @@ -4754,7 +4192,7 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType), + IsColumn: isColumn(statements[0].Statement, tagType, tagKey, dataType), } // don't send deprecated static fields @@ -4767,10 +4205,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi // remove this later just to have NewStaticFieldsTraces in the response fields := constants.NewStaticFieldsTraces - if !r.useTraceNewSchema { - fields = constants.DeprecatedStaticFieldsTraces - } - // add the new static fields only when the tagType is not specified // i.e retrieve all attributes if req.TagType == "" { @@ -4833,10 +4267,7 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. } // TODO(nitya): remove 24 hour limit in future after checking the perf/resource implications - where := "timestamp >= toDateTime64(now() - INTERVAL 48 HOUR, 9)" - if r.useTraceNewSchema { - where += " AND ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR))" - } + where := "timestamp >= toDateTime64(now() - INTERVAL 48 HOUR, 9) AND ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR))" query = fmt.Sprintf("SELECT DISTINCT %s FROM %s.%s WHERE %s AND %s ILIKE $1 LIMIT $2", selectKey, r.TraceDB, r.traceTableName, where, filterValueColumnWhere) rows, err = r.db.Query(ctx, query, searchText, req.Limit) } else { @@ -4883,7 +4314,7 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. return &attributeValues, nil } -func (r *ClickHouseReader) GetSpanAttributeKeysV2(ctx context.Context) (map[string]v3.AttributeKey, error) { +func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) { var query string var err error var rows driver.Rows @@ -4916,7 +4347,7 @@ func (r *ClickHouseReader) GetSpanAttributeKeysV2(ctx context.Context) (map[stri Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType), + IsColumn: isColumn(statements[0].Statement, tagType, tagKey, dataType), } name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType) @@ -4931,50 +4362,6 @@ func (r *ClickHouseReader) GetSpanAttributeKeysV2(ctx context.Context) (map[stri return response, nil } -func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) { - if r.useTraceNewSchema { - return r.GetSpanAttributeKeysV2(ctx) - } - var query string - var err error - var rows driver.Rows - response := map[string]v3.AttributeKey{} - - query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable) - - rows, err = r.db.Query(ctx, query) - - if err != nil { - zap.L().Error("Error while executing query", zap.Error(err)) - return nil, fmt.Errorf("error while executing query: %s", err.Error()) - } - defer rows.Close() - - var tagKey string - var dataType string - var tagType string - var isColumn bool - for rows.Next() { - if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { - return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) - } - key := v3.AttributeKey{ - Key: tagKey, - DataType: v3.AttributeKeyDataType(dataType), - Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn, - } - response[tagKey] = key - } - - // add the deprecated static fields as they are not present in spanAttributeKeysTable - for _, f := range constants.DeprecatedStaticFieldsTraces { - response[f.Key] = f - } - - return response, nil -} - func (r *ClickHouseReader) LiveTailLogsV4(ctx context.Context, query string, timestampStart uint64, idStart string, client *model.LogsLiveTailClientV2) { if timestampStart == 0 { timestampStart = uint64(time.Now().UnixNano()) @@ -6808,7 +6195,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metric return cachedMetadata, nil } -func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { +func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { searchSpansResult := []model.SearchSpansResult{ { Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, @@ -6954,115 +6341,3 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea return &searchSpansResult, nil } - -func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { - - if r.useTraceNewSchema { - return r.SearchTracesV2(ctx, params) - } - - var countSpans uint64 - countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) - err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, fmt.Errorf("error in processing sql query") - } - - if countSpans > uint64(params.MaxSpansInTrace) { - zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace), - zap.Uint64("Count", countSpans)) - claims, errv2 := authtypes.ClaimsFromContext(ctx) - if errv2 == nil { - data := map[string]interface{}{ - "traceSize": countSpans, - "maxSpansInTraceLimit": params.MaxSpansInTrace, - "algo": "smart", - } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, claims.Email, true, false) - } - return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details") - } - - claims, errv2 := authtypes.ClaimsFromContext(ctx) - if errv2 == nil { - data := map[string]interface{}{ - "traceSize": countSpans, - "algo": "smart", - } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, claims.Email, true, false) - } - - var startTime, endTime, durationNano uint64 - var searchScanResponses []model.SearchSpanDBResponseItem - - query := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) - - start := time.Now() - - err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, fmt.Errorf("error in processing sql query") - } - end := time.Now() - zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start))) - searchSpansResult := []model.SearchSpansResult{{ - Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, - Events: make([][]interface{}, len(searchScanResponses)), - IsSubTree: false, - }, - } - - searchSpanResponses := []model.SearchSpanResponseItem{} - start = time.Now() - for _, item := range searchScanResponses { - var jsonItem model.SearchSpanResponseItem - easyjson.Unmarshal([]byte(item.Model), &jsonItem) - jsonItem.TimeUnixNano = uint64(item.Timestamp.UnixNano() / 1000000) - searchSpanResponses = append(searchSpanResponses, jsonItem) - if startTime == 0 || jsonItem.TimeUnixNano < startTime { - startTime = jsonItem.TimeUnixNano - } - if endTime == 0 || jsonItem.TimeUnixNano > endTime { - endTime = jsonItem.TimeUnixNano - } - if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { - durationNano = uint64(jsonItem.DurationNano) - } - } - end = time.Now() - zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start))) - - if len(searchScanResponses) > params.SpansRenderLimit { - start = time.Now() - searchSpansResult, err = smart.SmartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit) - if err != nil { - return nil, err - } - end = time.Now() - zap.L().Debug("smartTraceAlgo took: ", zap.Duration("duration", end.Sub(start))) - claims, errv2 := authtypes.ClaimsFromContext(ctx) - if errv2 == nil { - data := map[string]interface{}{ - "traceSize": len(searchScanResponses), - "spansRenderLimit": params.SpansRenderLimit, - "algo": "smart", - } - telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_LARGE_TRACE_OPENED, data, claims.Email, true, false) - } - } else { - for i, item := range searchSpanResponses { - spanEvents := item.GetValues() - searchSpansResult[0].Events[i] = spanEvents - } - } - - searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000) - searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000) - - return &searchSpansResult, nil -} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 0528278dc4..7c9e696051 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -118,9 +118,6 @@ type APIHandler struct { // Websocket connection upgrader Upgrader *websocket.Upgrader - UseLogsNewSchema bool - UseTraceNewSchema bool - hostsRepo *inframetrics.HostsRepo processesRepo *inframetrics.ProcessesRepo podsRepo *inframetrics.PodsRepo @@ -177,11 +174,6 @@ type APIHandlerOpts struct { // Querier Influx Interval FluxInterval time.Duration - // Use Logs New schema - UseLogsNewSchema bool - - UseTraceNewSchema bool - JWT *authtypes.JWT AlertmanagerAPI *alertmanager.API @@ -194,21 +186,17 @@ type APIHandlerOpts struct { // NewAPIHandler returns an APIHandler func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { querierOpts := querier.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - UseLogsNewSchema: opts.UseLogsNewSchema, - UseTraceNewSchema: opts.UseTraceNewSchema, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - UseLogsNewSchema: opts.UseLogsNewSchema, - UseTraceNewSchema: opts.UseTraceNewSchema, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, } querier := querier.NewQuerier(querierOpts) @@ -239,8 +227,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { LogsParsingPipelineController: opts.LogsParsingPipelineController, querier: querier, querierV2: querierv2, - UseLogsNewSchema: opts.UseLogsNewSchema, - UseTraceNewSchema: opts.UseTraceNewSchema, hostsRepo: hostsRepo, processesRepo: processesRepo, podsRepo: podsRepo, @@ -259,15 +245,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { FieldsAPI: opts.FieldsAPI, } - logsQueryBuilder := logsv3.PrepareLogsQuery - if opts.UseLogsNewSchema { - logsQueryBuilder = logsv4.PrepareLogsQuery - } - - tracesQueryBuilder := tracesV3.PrepareTracesQuery - if opts.UseTraceNewSchema { - tracesQueryBuilder = tracesV4.PrepareTracesQuery - } + logsQueryBuilder := logsv4.PrepareLogsQuery + tracesQueryBuilder := tracesV4.PrepareTracesQuery builderOpts := queryBuilder.QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, @@ -4839,11 +4818,7 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que RespondError(w, apiErrObj, errQuriesByName) return } - if aH.UseTraceNewSchema { - tracesV4.Enrich(queryRangeParams, spanKeys) - } else { - tracesV3.Enrich(queryRangeParams, spanKeys) - } + tracesV4.Enrich(queryRangeParams, spanKeys) } @@ -5202,88 +5177,7 @@ func (aH *APIHandler) liveTailLogsV2(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { - if aH.UseLogsNewSchema { - aH.liveTailLogsV2(w, r) - return - } - - // get the param from url and add it to body - stringReader := strings.NewReader(r.URL.Query().Get("q")) - r.Body = io.NopCloser(stringReader) - - queryRangeParams, apiErrorObj := ParseQueryRangeParams(r) - if apiErrorObj != nil { - zap.L().Error(apiErrorObj.Err.Error()) - RespondError(w, apiErrorObj, nil) - return - } - - var err error - var queryString string - switch queryRangeParams.CompositeQuery.QueryType { - case v3.QueryTypeBuilder: - // check if any enrichment is required for logs if yes then enrich them - if logsv3.EnrichmentRequired(queryRangeParams) { - logsFields, err := aH.reader.GetLogFields(r.Context()) - if err != nil { - apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err} - RespondError(w, apiErrObj, nil) - return - } - // get the fields if any logs query is present - fields := model.GetLogFieldsV3(r.Context(), queryRangeParams, logsFields) - logsv3.Enrich(queryRangeParams, fields) - } - - queryString, err = aH.queryBuilder.PrepareLiveTailQuery(queryRangeParams) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - default: - err = fmt.Errorf("invalid query type") - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - // create the client - client := &model.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error)} - go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", client) - - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Access-Control-Allow-Origin", "*") - w.WriteHeader(200) - - flusher, ok := w.(http.Flusher) - if !ok { - err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil} - RespondError(w, &err, "streaming is not supported") - return - } - // flush the headers - flusher.Flush() - for { - select { - case log := <-client.Logs: - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - enc.Encode(log) - fmt.Fprintf(w, "data: %v\n\n", buf.String()) - flusher.Flush() - case <-client.Done: - zap.L().Debug("done!") - return - case err := <-client.Error: - zap.L().Error("error occurred", zap.Error(err)) - fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) - flusher.Flush() - return - } - } - + aH.liveTailLogsV2(w, r) } func (aH *APIHandler) getMetricMetadata(w http.ResponseWriter, r *http.Request) { @@ -5324,11 +5218,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que RespondError(w, apiErrObj, errQuriesByName) return } - if aH.UseTraceNewSchema { - tracesV4.Enrich(queryRangeParams, spanKeys) - } else { - tracesV3.Enrich(queryRangeParams, spanKeys) - } + tracesV4.Enrich(queryRangeParams, spanKeys) } // WARN: Only works for AND operator in traces query diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index ba98aa32f8..893f97de83 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -6,10 +6,8 @@ import ( "strings" "sync" - logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3" logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3" - tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" "github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/constants" @@ -19,19 +17,15 @@ import ( "go.uber.org/zap" ) -func prepareLogsQuery(_ context.Context, - useLogsNewSchema bool, +func prepareLogsQuery( + _ context.Context, start, end int64, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, ) (string, error) { query := "" - - logsQueryBuilder := logsV3.PrepareLogsQuery - if useLogsNewSchema { - logsQueryBuilder = logsV4.PrepareLogsQuery - } + logsQueryBuilder := logsV4.PrepareLogsQuery if params == nil || builderQuery == nil { return query, fmt.Errorf("params and builderQuery cannot be nil") @@ -102,7 +96,7 @@ func (q *querier) runBuilderQuery( var err error if _, ok := cacheKeys[queryName]; !ok || params.NoCache { zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) - query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params) + query, err = prepareLogsQuery(ctx, start, end, builderQuery, params) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -117,7 +111,7 @@ func (q *querier) runBuilderQuery( missedSeries := make([]querycache.CachedSeriesData, 0) filteredMissedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { - query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params) + query, err = prepareLogsQuery(ctx, miss.Start, miss.End, builderQuery, params) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -169,11 +163,7 @@ func (q *querier) runBuilderQuery( } if builderQuery.DataSource == v3.DataSourceTraces { - - tracesQueryBuilder := tracesV3.PrepareTracesQuery - if q.UseTraceNewSchema { - tracesQueryBuilder = tracesV4.PrepareTracesQuery - } + tracesQueryBuilder := tracesV4.PrepareTracesQuery var query string var err error diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 78d5fae534..7ce9e00037 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -6,11 +6,9 @@ import ( "sync" "time" - logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3" logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" - tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" "github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/constants" @@ -52,9 +50,6 @@ type querier struct { timeRanges [][]int returnedSeries []*v3.Series returnedErr error - - UseLogsNewSchema bool - UseTraceNewSchema bool } type QuerierOptions struct { @@ -64,22 +59,14 @@ type QuerierOptions struct { FluxInterval time.Duration // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error - UseLogsNewSchema bool - UseTraceNewSchema bool + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error } func NewQuerier(opts QuerierOptions) interfaces.Querier { - logsQueryBuilder := logsV3.PrepareLogsQuery - if opts.UseLogsNewSchema { - logsQueryBuilder = logsV4.PrepareLogsQuery - } - tracesQueryBuilder := tracesV3.PrepareTracesQuery - if opts.UseTraceNewSchema { - tracesQueryBuilder = tracesV4.PrepareTracesQuery - } + logsQueryBuilder := logsV4.PrepareLogsQuery + tracesQueryBuilder := tracesV4.PrepareTracesQuery qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) @@ -96,11 +83,9 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { BuildMetricQuery: metricsV3.PrepareMetricQuery, }), - testingMode: opts.TestingMode, - returnedSeries: opts.ReturnedSeries, - returnedErr: opts.ReturnedErr, - UseLogsNewSchema: opts.UseLogsNewSchema, - UseTraceNewSchema: opts.UseTraceNewSchema, + testingMode: opts.TestingMode, + returnedSeries: opts.ReturnedSeries, + returnedErr: opts.ReturnedErr, } } @@ -445,11 +430,6 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan len(params.CompositeQuery.BuilderQueries) == 1 && params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { - if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) || - (v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) { - break - } - // only allow of logs queries with timestamp ordering desc // TODO(nitya): allow for timestamp asc if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 989211f0b6..c13e6ed151 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -1370,8 +1370,6 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", - true, - true, time.Duration(time.Second), nil, ) diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 0ef9c32257..4120c3a095 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -6,11 +6,9 @@ import ( "strings" "sync" - logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3" logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3" metricsV4 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4" - tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" "github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/constants" @@ -19,17 +17,14 @@ import ( "go.uber.org/zap" ) -func prepareLogsQuery(_ context.Context, - useLogsNewSchema bool, +func prepareLogsQuery( + _ context.Context, start, end int64, builderQuery *v3.BuilderQuery, params *v3.QueryRangeParamsV3, ) (string, error) { - logsQueryBuilder := logsV3.PrepareLogsQuery - if useLogsNewSchema { - logsQueryBuilder = logsV4.PrepareLogsQuery - } + logsQueryBuilder := logsV4.PrepareLogsQuery query := "" if params == nil || builderQuery == nil { @@ -102,7 +97,7 @@ func (q *querier) runBuilderQuery( var err error if _, ok := cacheKeys[queryName]; !ok || params.NoCache { zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) - query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params) + query, err = prepareLogsQuery(ctx, start, end, builderQuery, params) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -116,7 +111,7 @@ func (q *querier) runBuilderQuery( missedSeries := make([]querycache.CachedSeriesData, 0) filteredMissedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { - query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params) + query, err = prepareLogsQuery(ctx, miss.Start, miss.End, builderQuery, params) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -169,11 +164,7 @@ func (q *querier) runBuilderQuery( } if builderQuery.DataSource == v3.DataSourceTraces { - - tracesQueryBuilder := tracesV3.PrepareTracesQuery - if q.UseTraceNewSchema { - tracesQueryBuilder = tracesV4.PrepareTracesQuery - } + tracesQueryBuilder := tracesV4.PrepareTracesQuery var query string var err error diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index d5c8e18c75..c3050ad14b 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -6,11 +6,9 @@ import ( "sync" "time" - logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3" logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4" metricsV4 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" - tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" "github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/constants" @@ -49,11 +47,9 @@ type querier struct { testingMode bool queriesExecuted []string // tuple of start and end time in milliseconds - timeRanges [][]int - returnedSeries []*v3.Series - returnedErr error - UseLogsNewSchema bool - UseTraceNewSchema bool + timeRanges [][]int + returnedSeries []*v3.Series + returnedErr error } type QuerierOptions struct { @@ -63,23 +59,14 @@ type QuerierOptions struct { FluxInterval time.Duration // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error - UseLogsNewSchema bool - UseTraceNewSchema bool + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error } func NewQuerier(opts QuerierOptions) interfaces.Querier { - logsQueryBuilder := logsV3.PrepareLogsQuery - if opts.UseLogsNewSchema { - logsQueryBuilder = logsV4.PrepareLogsQuery - } - - tracesQueryBuilder := tracesV3.PrepareTracesQuery - if opts.UseTraceNewSchema { - tracesQueryBuilder = tracesV4.PrepareTracesQuery - } + logsQueryBuilder := logsV4.PrepareLogsQuery + tracesQueryBuilder := tracesV4.PrepareTracesQuery qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) @@ -96,11 +83,9 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { BuildMetricQuery: metricsV4.PrepareMetricQuery, }), - testingMode: opts.TestingMode, - returnedSeries: opts.ReturnedSeries, - returnedErr: opts.ReturnedErr, - UseLogsNewSchema: opts.UseLogsNewSchema, - UseTraceNewSchema: opts.UseTraceNewSchema, + testingMode: opts.TestingMode, + returnedSeries: opts.ReturnedSeries, + returnedErr: opts.ReturnedErr, } } @@ -446,11 +431,6 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan len(params.CompositeQuery.BuilderQueries) == 1 && params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { - if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) || - (v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) { - break - } - // only allow of logs queries with timestamp ordering desc // TODO(nitya): allow for timestamp asc if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 94c70b3c50..8b6d1c3114 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -1424,8 +1424,6 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", - true, - true, time.Duration(time.Second), nil, ) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index f0ee2cc0c1..1b0d544273 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -53,8 +53,6 @@ type ServerOptions struct { FluxInterval string FluxIntervalForTraceDetail string Cluster string - UseLogsNewSchema bool - UseTraceNewSchema bool SigNoz *signoz.SigNoz Jwt *authtypes.JWT } @@ -110,8 +108,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.SigNoz.TelemetryStore, serverOptions.SigNoz.Prometheus, serverOptions.Cluster, - serverOptions.UseLogsNewSchema, - serverOptions.UseTraceNewSchema, fluxIntervalForTraceDetail, serverOptions.SigNoz.Cache, ) @@ -129,8 +125,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, - serverOptions.UseLogsNewSchema, - serverOptions.UseTraceNewSchema, serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.TelemetryStore, serverOptions.SigNoz.Prometheus, @@ -173,8 +167,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { LogsParsingPipelineController: logParsingPipelineController, Cache: c, FluxInterval: fluxInterval, - UseLogsNewSchema: serverOptions.UseLogsNewSchema, - UseTraceNewSchema: serverOptions.UseTraceNewSchema, JWT: serverOptions.Jwt, AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager), FieldsAPI: fields.NewAPI(serverOptions.SigNoz.TelemetryStore), @@ -435,25 +427,21 @@ func makeRulesManager( db *sqlx.DB, ch interfaces.Reader, cache cache.Cache, - useLogsNewSchema bool, - useTraceNewSchema bool, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, ) (*rules.Manager, error) { // create manager opts managerOpts := &rules.ManagerOptions{ - TelemetryStore: telemetryStore, - Prometheus: prometheus, - DBConn: db, - Context: context.Background(), - Logger: zap.L(), - Reader: ch, - Cache: cache, - EvalDelay: constants.GetEvalDelay(), - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, - SQLStore: sqlstore, + TelemetryStore: telemetryStore, + Prometheus: prometheus, + DBConn: db, + Context: context.Background(), + Logger: zap.L(), + Reader: ch, + Cache: cache, + EvalDelay: constants.GetEvalDelay(), + SQLStore: sqlstore, } // create Manager diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 5824ab4266..d376ce45d8 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -47,7 +47,9 @@ func main() { var maxOpenConns int var dialTimeout time.Duration + // Deprecated flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") + // Deprecated flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces") // Deprecated flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") @@ -130,8 +132,6 @@ func main() { FluxInterval: fluxInterval, FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, Cluster: cluster, - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, SigNoz: signoz, Jwt: jwt, } diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 59406137cb..e320e6d878 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -34,33 +34,29 @@ import ( ) type PrepareTaskOptions struct { - Rule *ruletypes.PostableRule - TaskName string - RuleStore ruletypes.RuleStore - MaintenanceStore ruletypes.MaintenanceStore - Logger *zap.Logger - Reader interfaces.Reader - Cache cache.Cache - ManagerOpts *ManagerOptions - NotifyFunc NotifyFunc - SQLStore sqlstore.SQLStore - UseLogsNewSchema bool - UseTraceNewSchema bool - OrgID string + Rule *ruletypes.PostableRule + TaskName string + RuleStore ruletypes.RuleStore + MaintenanceStore ruletypes.MaintenanceStore + Logger *zap.Logger + Reader interfaces.Reader + Cache cache.Cache + ManagerOpts *ManagerOptions + NotifyFunc NotifyFunc + SQLStore sqlstore.SQLStore + OrgID string } type PrepareTestRuleOptions struct { - Rule *ruletypes.PostableRule - RuleStore ruletypes.RuleStore - MaintenanceStore ruletypes.MaintenanceStore - Logger *zap.Logger - Reader interfaces.Reader - Cache cache.Cache - ManagerOpts *ManagerOptions - NotifyFunc NotifyFunc - SQLStore sqlstore.SQLStore - UseLogsNewSchema bool - UseTraceNewSchema bool + Rule *ruletypes.PostableRule + RuleStore ruletypes.RuleStore + MaintenanceStore ruletypes.MaintenanceStore + Logger *zap.Logger + Reader interfaces.Reader + Cache cache.Cache + ManagerOpts *ManagerOptions + NotifyFunc NotifyFunc + SQLStore sqlstore.SQLStore } const taskNamesuffix = "webAppEditor" @@ -95,10 +91,7 @@ type ManagerOptions struct { EvalDelay time.Duration - PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) - - UseLogsNewSchema bool - UseTraceNewSchema bool + PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError) Alertmanager alertmanager.Alertmanager SQLStore sqlstore.SQLStore @@ -121,9 +114,6 @@ type Manager struct { prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError) - UseLogsNewSchema bool - UseTraceNewSchema bool - alertmanager alertmanager.Alertmanager sqlstore sqlstore.SQLStore } @@ -156,8 +146,6 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { ruleId, opts.Rule, opts.Reader, - opts.UseLogsNewSchema, - opts.UseTraceNewSchema, WithEvalDelay(opts.ManagerOpts.EvalDelay), WithSQLStore(opts.SQLStore), ) @@ -407,19 +395,17 @@ func (m *Manager) editTask(_ context.Context, orgID string, rule *ruletypes.Post zap.L().Debug("editing a rule task", zap.String("name", taskName)) newTask, err := m.prepareTaskFunc(PrepareTaskOptions{ - Rule: rule, - TaskName: taskName, - RuleStore: m.ruleStore, - MaintenanceStore: m.maintenanceStore, - Logger: m.logger, - Reader: m.reader, - Cache: m.cache, - ManagerOpts: m.opts, - NotifyFunc: m.prepareNotifyFunc(), - SQLStore: m.sqlstore, - UseLogsNewSchema: m.opts.UseLogsNewSchema, - UseTraceNewSchema: m.opts.UseTraceNewSchema, - OrgID: orgID, + Rule: rule, + TaskName: taskName, + RuleStore: m.ruleStore, + MaintenanceStore: m.maintenanceStore, + Logger: m.logger, + Reader: m.reader, + Cache: m.cache, + ManagerOpts: m.opts, + NotifyFunc: m.prepareNotifyFunc(), + SQLStore: m.sqlstore, + OrgID: orgID, }) if err != nil { @@ -595,19 +581,17 @@ func (m *Manager) addTask(_ context.Context, orgID string, rule *ruletypes.Posta zap.L().Debug("adding a new rule task", zap.String("name", taskName)) newTask, err := m.prepareTaskFunc(PrepareTaskOptions{ - Rule: rule, - TaskName: taskName, - RuleStore: m.ruleStore, - MaintenanceStore: m.maintenanceStore, - Logger: m.logger, - Reader: m.reader, - Cache: m.cache, - ManagerOpts: m.opts, - NotifyFunc: m.prepareNotifyFunc(), - SQLStore: m.sqlstore, - UseLogsNewSchema: m.opts.UseLogsNewSchema, - UseTraceNewSchema: m.opts.UseTraceNewSchema, - OrgID: orgID, + Rule: rule, + TaskName: taskName, + RuleStore: m.ruleStore, + MaintenanceStore: m.maintenanceStore, + Logger: m.logger, + Reader: m.reader, + Cache: m.cache, + ManagerOpts: m.opts, + NotifyFunc: m.prepareNotifyFunc(), + SQLStore: m.sqlstore, + OrgID: orgID, }) if err != nil { @@ -987,17 +971,15 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m } alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{ - Rule: parsedRule, - RuleStore: m.ruleStore, - MaintenanceStore: m.maintenanceStore, - Logger: m.logger, - Reader: m.reader, - Cache: m.cache, - ManagerOpts: m.opts, - NotifyFunc: m.prepareTestNotifyFunc(), - SQLStore: m.sqlstore, - UseLogsNewSchema: m.opts.UseLogsNewSchema, - UseTraceNewSchema: m.opts.UseTraceNewSchema, + Rule: parsedRule, + RuleStore: m.ruleStore, + MaintenanceStore: m.maintenanceStore, + Logger: m.logger, + Reader: m.reader, + Cache: m.cache, + ManagerOpts: m.opts, + NotifyFunc: m.prepareTestNotifyFunc(), + SQLStore: m.sqlstore, }) return alertCount, apiErr diff --git a/pkg/query-service/rules/test_notification.go b/pkg/query-service/rules/test_notification.go index dd3aa13aae..c0349201d1 100644 --- a/pkg/query-service/rules/test_notification.go +++ b/pkg/query-service/rules/test_notification.go @@ -15,7 +15,6 @@ import ( // TestNotification prepares a dummy rule for given rule parameters and // sends a test notification. returns alert count and error (if any) func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError) { - ctx := context.Background() if opts.Rule == nil { @@ -48,8 +47,6 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError) alertname, parsedRule, opts.Reader, - opts.UseLogsNewSchema, - opts.UseTraceNewSchema, WithSendAlways(), WithSendUnmatched(), WithSQLStore(opts.SQLStore), diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index 162631829d..b0739dfd55 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -29,7 +29,6 @@ import ( "github.com/SigNoz/signoz/pkg/query-service/utils/timestamp" logsv3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3" - tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3" tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4" "github.com/SigNoz/signoz/pkg/query-service/formatter" @@ -52,16 +51,12 @@ type ThresholdRule struct { // used for attribute metadata enrichment for logs and traces logsKeys map[string]v3.AttributeKey spansKeys map[string]v3.AttributeKey - - useTraceNewSchema bool } func NewThresholdRule( id string, p *ruletypes.PostableRule, reader interfaces.Reader, - useLogsNewSchema bool, - useTraceNewSchema bool, opts ...RuleOption, ) (*ThresholdRule, error) { @@ -73,25 +68,20 @@ func NewThresholdRule( } t := ThresholdRule{ - BaseRule: baseRule, - version: p.Version, - useTraceNewSchema: useTraceNewSchema, + BaseRule: baseRule, + version: p.Version, } querierOption := querier.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), } querierOptsV2 := querierV2.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - UseLogsNewSchema: useLogsNewSchema, - UseTraceNewSchema: useTraceNewSchema, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), } t.querier = querier.NewQuerier(querierOption) @@ -301,11 +291,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (rul return nil, err } r.spansKeys = spanKeys - if r.useTraceNewSchema { - tracesV4.Enrich(params, spanKeys) - } else { - tracesV3.Enrich(params, spanKeys) - } + tracesV4.Enrich(params, spanKeys) } } diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 97e23aba1f..1e15f9636e 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -801,7 +801,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -889,7 +889,7 @@ func TestPrepareLinksToLogs(t *testing.T) { }, } - rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -930,7 +930,7 @@ func TestPrepareLinksToTraces(t *testing.T) { }, } - rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1005,7 +1005,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1057,7 +1057,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) { } for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, nil, true, true) // no eval delay + rule, err := NewThresholdRule("69", &postableRule, nil) // no eval delay if err != nil { assert.NoError(t, err) } @@ -1105,7 +1105,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { } for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1244,8 +1244,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { options := clickhouseReader.NewOptions("", "", "archiveNamespace") readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}}) require.NoError(t, err) - reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), readerCache) - rule, err := NewThresholdRule("69", &postableRule, reader, true, true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache) + rule, err := NewThresholdRule("69", &postableRule, reader) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) { } readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}}) options := clickhouseReader.NewOptions("", "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), readerCache) + reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache) - rule, err := NewThresholdRule("69", &postableRule, reader, true, true) + rule, err := NewThresholdRule("69", &postableRule, reader) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1444,9 +1444,9 @@ func TestThresholdRuleTracesLink(t *testing.T) { } options := clickhouseReader.NewOptions("", "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), nil) + reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil) - rule, err := NewThresholdRule("69", &postableRule, reader, true, true) + rule, err := NewThresholdRule("69", &postableRule, reader) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1565,9 +1565,9 @@ func TestThresholdRuleLogsLink(t *testing.T) { } options := clickhouseReader.NewOptions("", "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), nil) + reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil) - rule, err := NewThresholdRule("69", &postableRule, reader, true, true) + rule, err := NewThresholdRule("69", &postableRule, reader) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1643,7 +1643,7 @@ func TestThresholdRuleShiftBy(t *testing.T) { }, } - rule, err := NewThresholdRule("69", &postableRule, nil, true, true) + rule, err := NewThresholdRule("69", &postableRule, nil) if err != nil { assert.NoError(t, err) } diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index 2d8f11009b..1a6e4f24c7 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -46,8 +46,6 @@ func NewMockClickhouseReader(t *testing.T, testDB sqlstore.SQLStore) (*clickhous telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", - true, - true, time.Duration(time.Second), nil, )