diff --git a/ee/query-service/main.go b/ee/query-service/main.go
index 3323e5bdbd..4fad91008f 100644
--- a/ee/query-service/main.go
+++ b/ee/query-service/main.go
@@ -14,7 +14,9 @@ import (
 	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
 	"go.signoz.io/signoz/ee/query-service/app"
 	"go.signoz.io/signoz/pkg/query-service/auth"
+	"go.signoz.io/signoz/pkg/query-service/constants"
 	baseconst "go.signoz.io/signoz/pkg/query-service/constants"
+	"go.signoz.io/signoz/pkg/query-service/migrate"
 	"go.signoz.io/signoz/pkg/query-service/version"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -143,6 +145,12 @@ func main() {
 		zap.L().Info("JWT secret key set successfully.")
 	}
 
+	if err := migrate.Migrate(constants.RELATIONAL_DATASOURCE_PATH); err != nil {
+		zap.L().Error("Failed to migrate", zap.Error(err))
+	} else {
+		zap.L().Info("Migration successful")
+	}
+
 	server, err := app.NewServer(serverOptions)
 	if err != nil {
 		zap.L().Fatal("Failed to create server", zap.Error(err))
diff --git a/pkg/query-service/app/formula.go b/pkg/query-service/app/formula.go
index 8fa6010dfc..f1f10e4499 100644
--- a/pkg/query-service/app/formula.go
+++ b/pkg/query-service/app/formula.go
@@ -90,6 +90,7 @@ func joinAndCalculate(results []*v3.Result, uniqueLabelSet map[string]string, ex
 
 	resultSeries := &v3.Series{
 		Labels: uniqueLabelSet,
+		Points: make([]v3.Point, 0),
 	}
 	timestamps := make([]int64, 0)
 	for timestamp := range uniqueTimestamps {
diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go
index 2a70f96250..773156cc0d 100644
--- a/pkg/query-service/app/parser.go
+++ b/pkg/query-service/app/parser.go
@@ -1065,43 +1065,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
 		}
 		query.ShiftBy = timeShiftBy
 
-		if query.Filters == nil || len(query.Filters.Items) == 0 {
-			continue
-		}
-
-		for idx := range query.Filters.Items {
-			item := &query.Filters.Items[idx]
-			value := item.Value
-			if value != nil {
-				switch x := value.(type) {
-				case string:
-					variableName := strings.Trim(x, "{[.$]}")
-					if _, ok := queryRangeParams.Variables[variableName]; ok {
-						item.Value = queryRangeParams.Variables[variableName]
-					}
-				case []interface{}:
-					if len(x) > 0 {
-						switch x[0].(type) {
-						case string:
-							variableName := strings.Trim(x[0].(string), "{[.$]}")
-							if _, ok := queryRangeParams.Variables[variableName]; ok {
-								item.Value = queryRangeParams.Variables[variableName]
-							}
-						}
-					}
-				}
-			}
-
-			if item.Operator != v3.FilterOperatorIn && item.Operator != v3.FilterOperatorNotIn {
-				// the value type should not be multiple values
-				if _, ok := item.Value.([]interface{}); ok {
-					return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("multiple values %s are not allowed for operator `%s` for key `%s`", item.Value, item.Operator, item.Key.Key)}
-				}
-			}
-		}
-
 		// for metrics v3
 		// if the aggregate operator is a histogram quantile, and user has not forgotten
 		// the le tag in the group by then add the le tag to the group by
 		if query.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
@@ -1129,28 +1093,38 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
 			}
 		}
 
-		// for metrics v4
-		if v3.IsPercentileOperator(query.SpaceAggregation) &&
-			query.AggregateAttribute.Type != v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
-			// If quantile is set, we need to group by le
-			// and set the space aggregation to sum
-			// and time aggregation to rate
-			query.TimeAggregation = v3.TimeAggregationRate
-			query.SpaceAggregation = v3.SpaceAggregationSum
-			// If le is not present in group by for quantile, add it
-			leFound := false
-			for _, groupBy := range query.GroupBy {
-				if groupBy.Key == "le" {
-					leFound = true
-					break
+		if query.Filters == nil || len(query.Filters.Items) == 0 {
+			continue
+		}
+
+		for idx := range query.Filters.Items {
+			item := &query.Filters.Items[idx]
+			value := item.Value
+			if value != nil {
+				switch x := value.(type) {
+				case string:
+					variableName := strings.Trim(x, "{[.$]}")
+					if _, ok := queryRangeParams.Variables[variableName]; ok {
+						item.Value = queryRangeParams.Variables[variableName]
+					}
+				case []interface{}:
+					if len(x) > 0 {
+						switch x[0].(type) {
+						case string:
+							variableName := strings.Trim(x[0].(string), "{[.$]}")
+							if _, ok := queryRangeParams.Variables[variableName]; ok {
+								item.Value = queryRangeParams.Variables[variableName]
+							}
+						}
+					}
 				}
 			}
-			if !leFound {
-				query.GroupBy = append(query.GroupBy, v3.AttributeKey{
-					Key:      "le",
-					Type:     v3.AttributeKeyTypeTag,
-					DataType: v3.AttributeKeyDataTypeString,
-				})
+
+			if v3.FilterOperator(strings.ToLower((string(item.Operator)))) != v3.FilterOperatorIn && v3.FilterOperator(strings.ToLower((string(item.Operator)))) != v3.FilterOperatorNotIn {
+				// the value type should not be multiple values
+				if _, ok := item.Value.([]interface{}); ok {
+					return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("multiple values %s are not allowed for operator `%s` for key `%s`", item.Value, item.Operator, item.Key.Key)}
+				}
 			}
 		}
 	}
diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go
index b6f5e0281d..72962cfeef 100644
--- a/pkg/query-service/main.go
+++ b/pkg/query-service/main.go
@@ -11,6 +11,7 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/app"
 	"go.signoz.io/signoz/pkg/query-service/auth"
 	"go.signoz.io/signoz/pkg/query-service/constants"
+	"go.signoz.io/signoz/pkg/query-service/migrate"
 	"go.signoz.io/signoz/pkg/query-service/version"
 
 	"go.uber.org/zap"
@@ -90,6 +91,12 @@ func main() {
 		zap.L().Info("JWT secret key set successfully.")
 	}
 
+	if err := migrate.Migrate(constants.RELATIONAL_DATASOURCE_PATH); err != nil {
+		zap.L().Error("Failed to migrate", zap.Error(err))
+	} else {
+		zap.L().Info("Migration successful")
+	}
+
 	server, err := app.NewServer(serverOptions)
 	if err != nil {
 		logger.Fatal("Failed to create server", zap.Error(err))
diff --git a/pkg/query-service/migrate/0_45_alerts_to_v4/run.go b/pkg/query-service/migrate/0_45_alerts_to_v4/run.go
new file mode 100644
index 0000000000..f68f4ca43b
--- /dev/null
+++ b/pkg/query-service/migrate/0_45_alerts_to_v4/run.go
@@ -0,0 +1,153 @@
+package alertstov4
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/jmoiron/sqlx"
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	"go.signoz.io/signoz/pkg/query-service/rules"
+	"go.uber.org/multierr"
+	"go.uber.org/zap"
+)
+
+var Version = "0.45-alerts-to-v4"
+
+var mapTimeAggregation = map[v3.AggregateOperator]v3.TimeAggregation{
+	v3.AggregateOperatorSum:         v3.TimeAggregationSum,
+	v3.AggregateOperatorMin:         v3.TimeAggregationMin,
+	v3.AggregateOperatorMax:         v3.TimeAggregationMax,
+	v3.AggregateOperatorSumRate:     v3.TimeAggregationRate,
+	v3.AggregateOperatorAvgRate:     v3.TimeAggregationRate,
+	v3.AggregateOperatorMinRate:     v3.TimeAggregationRate,
+	v3.AggregateOperatorMaxRate:     v3.TimeAggregationRate,
+	v3.AggregateOperatorHistQuant50: v3.TimeAggregationUnspecified,
+	v3.AggregateOperatorHistQuant75: v3.TimeAggregationUnspecified,
+	v3.AggregateOperatorHistQuant90: v3.TimeAggregationUnspecified,
+	v3.AggregateOperatorHistQuant95: v3.TimeAggregationUnspecified,
+	v3.AggregateOperatorHistQuant99: v3.TimeAggregationUnspecified,
+}
+
+var mapSpaceAggregation = map[v3.AggregateOperator]v3.SpaceAggregation{
+	v3.AggregateOperatorSum:         v3.SpaceAggregationSum,
+	v3.AggregateOperatorMin:         v3.SpaceAggregationMin,
+	v3.AggregateOperatorMax:         v3.SpaceAggregationMax,
+	v3.AggregateOperatorSumRate:     v3.SpaceAggregationSum,
+	v3.AggregateOperatorAvgRate:     v3.SpaceAggregationAvg,
+	v3.AggregateOperatorMinRate:     v3.SpaceAggregationMin,
+	v3.AggregateOperatorMaxRate:     v3.SpaceAggregationMax,
+	v3.AggregateOperatorHistQuant50: v3.SpaceAggregationPercentile50,
+	v3.AggregateOperatorHistQuant75: v3.SpaceAggregationPercentile75,
+	v3.AggregateOperatorHistQuant90: v3.SpaceAggregationPercentile90,
+	v3.AggregateOperatorHistQuant95: v3.SpaceAggregationPercentile95,
+	v3.AggregateOperatorHistQuant99: v3.SpaceAggregationPercentile99,
+}
+
+func canMigrateOperator(operator v3.AggregateOperator) bool {
+	switch operator {
+	case v3.AggregateOperatorSum,
+		v3.AggregateOperatorMin,
+		v3.AggregateOperatorMax,
+		v3.AggregateOperatorSumRate,
+		v3.AggregateOperatorAvgRate,
+		v3.AggregateOperatorMinRate,
+		v3.AggregateOperatorMaxRate,
+		v3.AggregateOperatorHistQuant50,
+		v3.AggregateOperatorHistQuant75,
+		v3.AggregateOperatorHistQuant90,
+		v3.AggregateOperatorHistQuant95,
+		v3.AggregateOperatorHistQuant99:
+		return true
+	}
+	return false
+}
+
+func Migrate(conn *sqlx.DB) error {
+	ruleDB := rules.NewRuleDB(conn)
+	storedRules, err := ruleDB.GetStoredRules(context.Background())
+	if err != nil {
+		return err
+	}
+
+	for _, storedRule := range storedRules {
+		parsedRule, errs := rules.ParsePostableRule([]byte(storedRule.Data))
+		if len(errs) > 0 {
+			// this should not happen but if it does, we should not stop the migration
+			zap.L().Error("Error parsing rule", zap.Error(multierr.Combine(errs...)), zap.Int("rule", storedRule.Id))
+			continue
+		}
+		zap.L().Info("Rule parsed", zap.Int("rule", storedRule.Id))
+		updated := false
+		if parsedRule.RuleCondition != nil && parsedRule.Version == "" {
+			if parsedRule.RuleCondition.QueryType() == v3.QueryTypeBuilder {
+				// check if all the queries can be converted to v4
+				canMigrate := true
+				for _, query := range parsedRule.RuleCondition.CompositeQuery.BuilderQueries {
+					if query.DataSource == v3.DataSourceMetrics && query.Expression == query.QueryName {
+						if !canMigrateOperator(query.AggregateOperator) {
+							canMigrate = false
+							break
+						}
+					}
+				}
+
+				if canMigrate {
+					parsedRule.Version = "v4"
+					for _, query := range parsedRule.RuleCondition.CompositeQuery.BuilderQueries {
+						if query.DataSource == v3.DataSourceMetrics && query.Expression == query.QueryName {
+							// update aggregate attribute
+							if query.AggregateOperator == v3.AggregateOperatorSum ||
+								query.AggregateOperator == v3.AggregateOperatorMin ||
+								query.AggregateOperator == v3.AggregateOperatorMax {
+								query.AggregateAttribute.Type = "Gauge"
+							}
+							if query.AggregateOperator == v3.AggregateOperatorSumRate ||
+								query.AggregateOperator == v3.AggregateOperatorAvgRate ||
+								query.AggregateOperator == v3.AggregateOperatorMinRate ||
+								query.AggregateOperator == v3.AggregateOperatorMaxRate {
+								query.AggregateAttribute.Type = "Sum"
+							}
+
+							if query.AggregateOperator == v3.AggregateOperatorHistQuant50 ||
+								query.AggregateOperator == v3.AggregateOperatorHistQuant75 ||
+								query.AggregateOperator == v3.AggregateOperatorHistQuant90 ||
+								query.AggregateOperator == v3.AggregateOperatorHistQuant95 ||
+								query.AggregateOperator == v3.AggregateOperatorHistQuant99 {
+								query.AggregateAttribute.Type = "Histogram"
+							}
+							query.AggregateAttribute.DataType = v3.AttributeKeyDataTypeFloat64
+							query.AggregateAttribute.IsColumn = true
+							query.TimeAggregation = mapTimeAggregation[query.AggregateOperator]
+							query.SpaceAggregation = mapSpaceAggregation[query.AggregateOperator]
+							query.AggregateOperator = v3.AggregateOperator(query.TimeAggregation)
+							updated = true
+						}
+					}
+				}
+			}
+		}
+
+		if !updated {
+			zap.L().Info("Rule not updated", zap.Int("rule", storedRule.Id))
+			continue
+		}
+
+		ruleJSON, jsonErr := json.Marshal(parsedRule)
+		if jsonErr != nil {
+			zap.L().Error("Error marshalling rule; skipping rule migration", zap.Error(jsonErr), zap.Int("rule", storedRule.Id))
+			continue
+		}
+
+		stmt, prepareError := conn.PrepareContext(context.Background(), `UPDATE rules SET data=$3 WHERE id=$4;`)
+		if prepareError != nil {
+			zap.L().Error("Error in preparing statement for UPDATE to rules", zap.Error(prepareError))
+			continue
+		}
+		defer stmt.Close()
+
+		if _, err := stmt.Exec(ruleJSON, storedRule.Id); err != nil {
+			zap.L().Error("Error in Executing prepared statement for UPDATE to rules", zap.Error(err))
+		}
+	}
+	return nil
+}
diff --git a/pkg/query-service/migrate/migate.go b/pkg/query-service/migrate/migate.go
new file mode 100644
index 0000000000..f9d15a1567
--- /dev/null
+++ b/pkg/query-service/migrate/migate.go
@@ -0,0 +1,67 @@
+package migrate
+
+import (
+	"database/sql"
+
+	"github.com/jmoiron/sqlx"
+	alertstov4 "go.signoz.io/signoz/pkg/query-service/migrate/0_45_alerts_to_v4"
+	"go.uber.org/zap"
+)
+
+type DataMigration struct {
+	ID        int    `db:"id"`
+	Version   string `db:"version"`
+	CreatedAt string `db:"created_at"`
+	Succeeded bool   `db:"succeeded"`
+}
+
+func initSchema(conn *sqlx.DB) error {
+	tableSchema := `
+		CREATE TABLE IF NOT EXISTS data_migrations (
+			id SERIAL PRIMARY KEY,
+			version VARCHAR(255) NOT NULL UNIQUE,
+			created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+			succeeded BOOLEAN NOT NULL DEFAULT FALSE
+		);
+	`
+	_, err := conn.Exec(tableSchema)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func getMigrationVersion(conn *sqlx.DB, version string) (*DataMigration, error) {
+	var migration DataMigration
+	err := conn.Get(&migration, "SELECT * FROM data_migrations WHERE version = $1", version)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return &migration, nil
+}
+
+func Migrate(dsn string) error {
+	conn, err := sqlx.Connect("sqlite3", dsn)
+	if err != nil {
+		return err
+	}
+	if err := initSchema(conn); err != nil {
+		return err
+	}
+
+	if m, err := getMigrationVersion(conn, "0.45_alerts_to_v4"); err == nil && m == nil {
+		if err := alertstov4.Migrate(conn); err != nil {
+			zap.L().Error("failed to migrate 0.45_alerts_to_v4", zap.Error(err))
+		} else {
+			_, err := conn.Exec("INSERT INTO data_migrations (version, succeeded) VALUES ('0.45_alerts_to_v4', true)")
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/query-service/rules/db.go b/pkg/query-service/rules/db.go
index cf903884fd..23372ce911 100644
--- a/pkg/query-service/rules/db.go
+++ b/pkg/query-service/rules/db.go
@@ -49,7 +49,7 @@ type ruleDB struct {
 
 // todo: move init methods for creating tables
 
-func newRuleDB(db *sqlx.DB) RuleDB {
+func NewRuleDB(db *sqlx.DB) RuleDB {
 	return &ruleDB{
 		db,
 	}
diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go
index cad02523d7..d649b565fd 100644
--- a/pkg/query-service/rules/manager.go
+++ b/pkg/query-service/rules/manager.go
@@ -108,7 +108,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
 		return nil, err
 	}
 
-	db := newRuleDB(o.DBConn)
+	db := NewRuleDB(o.DBConn)
 
 	m := &Manager{
 		tasks: map[string]Task{},
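
The two new files above implement a small guarded data-migration runner: Migrate(dsn) opens the SQLite relational store, creates a data_migrations bookkeeping table, runs a versioned step only when no row for that version exists yet, and records success afterwards. The sketch below is illustrative only (the runOnce helper and its wiring are not part of this diff); it shows how the same check-then-record flow could be factored out for future migration versions, assuming it lives in the same migrate package so DataMigration, sqlx, and database/sql are already in scope.

	// runOnce is a hypothetical helper (not in this change) wrapping the guard
	// used by Migrate: skip the step when data_migrations already has a row for
	// this version, otherwise run the step and record success.
	func runOnce(conn *sqlx.DB, version string, step func(*sqlx.DB) error) error {
		var existing DataMigration
		err := conn.Get(&existing, "SELECT * FROM data_migrations WHERE version = $1", version)
		if err == nil {
			// A row exists, so this migration was already attempted; nothing to do.
			return nil
		}
		if err != sql.ErrNoRows {
			return err
		}
		if err := step(conn); err != nil {
			return err
		}
		_, err = conn.Exec("INSERT INTO data_migrations (version, succeeded) VALUES ($1, true)", version)
		return err
	}

With such a helper, the version-specific block inside Migrate above would reduce to something like runOnce(conn, "0.45_alerts_to_v4", alertstov4.Migrate).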