diff --git a/go.mod b/go.mod index b25925512c..4faca164ba 100644 --- a/go.mod +++ b/go.mod @@ -149,4 +149,4 @@ require ( k8s.io/client-go v8.0.0+incompatible // indirect ) -replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.76 +replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.77 diff --git a/go.sum b/go.sum index 0782dacd7d..6d566af573 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8 github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb h1:bneLSKPf9YUSFmafKx32bynV6QrzViL/s+ZDvQxH1E4= github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb/go.mod h1:JznGDNg9x1cujDKa22RaQOimOvvEfy3nxzDGd8XDgmA= -github.com/SigNoz/prometheus v1.9.76 h1:YQOHezj4Yyu6PHV7/bVR297FQgUMQAAJtCVZ+NslwYk= -github.com/SigNoz/prometheus v1.9.76/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM= +github.com/SigNoz/prometheus v1.9.77 h1:We3pxbCnCRWQE7tSELBkcmneiXRtcpOlXrGtLPVQ1v0= +github.com/SigNoz/prometheus v1.9.77/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 12f277e4f3..5836070410 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -20,18 +20,19 @@ const ( const ( defaultDatasource string = "tcp://localhost:9000" defaultTraceDB string = "signoz_traces" - defaultOperationsTable string = "signoz_operations" - defaultIndexTable string = "signoz_index_v2" - defaultErrorTable string = "signoz_error_index_v2" - defaultDurationTable string = "durationSortMV" - defaultUsageExplorerTable string = "usage_explorer" - defaultSpansTable string = "signoz_spans" - defaultDependencyGraphTable string = "dependency_graph_minutes" - defaultTopLevelOperationsTable string = "top_level_operations" + defaultOperationsTable string = "distributed_signoz_operations" + defaultIndexTable string = "distributed_signoz_index_v2" + defaultErrorTable string = "distributed_signoz_error_index_v2" + defaultDurationTable string = "distributed_durationSort" + defaultUsageExplorerTable string = "distributed_usage_explorer" + defaultSpansTable string = "distributed_signoz_spans" + defaultDependencyGraphTable string = "distributed_dependency_graph_minutes" + defaultTopLevelOperationsTable string = "distributed_top_level_operations" defaultLogsDB string = "signoz_logs" - defaultLogsTable string = "logs" - defaultLogAttributeKeysTable string = "logs_atrribute_keys" - defaultLogResourceKeysTable string = "logs_resource_keys" + defaultLogsTable string = "distributed_logs" + defaultLogsLocalTable string = "logs" + defaultLogAttributeKeysTable string = "distributed_logs_atrribute_keys" + defaultLogResourceKeysTable string = "distributed_logs_resource_keys" defaultLiveTailRefreshSeconds int = 10 defaultWriteBatchDelay time.Duration = 5 * time.Second defaultWriteBatchSize int = 10000 @@ -65,6 +66,7 @@ type namespaceConfig struct { TopLevelOperationsTable string LogsDB string LogsTable string + LogsLocalTable string LogsAttributeKeysTable string LogsResourceKeysTable string LiveTailRefreshSeconds int @@ -132,6 +134,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s TopLevelOperationsTable: defaultTopLevelOperationsTable, LogsDB: defaultLogsDB, LogsTable: defaultLogsTable, + LogsLocalTable: defaultLogsLocalTable, LogsAttributeKeysTable: defaultLogAttributeKeysTable, LogsResourceKeysTable: defaultLogResourceKeysTable, LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, @@ -154,6 +157,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s ErrorTable: "", LogsDB: "", LogsTable: "", + LogsLocalTable: "", LogsAttributeKeysTable: "", LogsResourceKeysTable: "", LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index fc6df97b2d..bc8da8c615 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -50,17 +50,20 @@ import ( ) const ( - primaryNamespace = "clickhouse" - archiveNamespace = "clickhouse-archive" - signozTraceDBName = "signoz_traces" - signozDurationMVTable = "durationSort" - signozUsageExplorerTable = "usage_explorer" - signozSpansTable = "signoz_spans" - signozErrorIndexTable = "signoz_error_index_v2" - signozTraceTableName = "signoz_index_v2" - signozMetricDBName = "signoz_metrics" - signozSampleTableName = "samples_v2" - signozTSTableName = "time_series_v2" + cluster = "cluster" + primaryNamespace = "clickhouse" + archiveNamespace = "clickhouse-archive" + signozTraceDBName = "signoz_traces" + signozDurationMVTable = "distributed_durationSort" + signozUsageExplorerTable = "distributed_usage_explorer" + signozSpansTable = "distributed_signoz_spans" + signozErrorIndexTable = "distributed_signoz_error_index_v2" + signozTraceTableName = "distributed_signoz_index_v2" + signozTraceLocalTableName = "signoz_index_v2" + signozMetricDBName = "signoz_metrics" + signozSampleLocalTableName = "samples_v2" + signozSampleTableName = "distributed_samples_v2" + signozTSTableName = "distributed_time_series_v2" minTimespanForProgressiveSearch = time.Hour minTimespanForProgressiveSearchMargin = time.Minute @@ -92,6 +95,7 @@ type ClickHouseReader struct { topLevelOperationsTable string logsDB string logsTable string + logsLocalTable string logsAttributeKeys string logsResourceKeys string queryEngine *promql.Engine @@ -140,6 +144,7 @@ func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.Featu topLevelOperationsTable: options.primary.TopLevelOperationsTable, logsDB: options.primary.LogsDB, logsTable: options.primary.LogsTable, + logsLocalTable: options.primary.LogsLocalTable, logsAttributeKeys: options.primary.LogsAttributeKeysTable, logsResourceKeys: options.primary.LogsResourceKeysTable, liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, @@ -2036,6 +2041,13 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query return &GetFilteredSpansAggregatesResponse, nil } +func getLocalTableName(tableName string) string { + + tableNameSplit := strings.Split(tableName, ".") + return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1] + +} + // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. @@ -2057,6 +2069,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, case constants.TraceTTL: tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable} for _, tableName = range tableNameArray { + tableName = getLocalTableName(tableName) statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} @@ -2066,6 +2079,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } } for _, tableName := range tableNameArray { + tableName = getLocalTableName(tableName) // TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086 go func(tableName string) { _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) @@ -2074,8 +2088,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return } req = fmt.Sprintf( - "ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE", - tableName, params.DelDuration) + "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE", + tableName, cluster, params.DelDuration) if len(params.ColdStorageVolume) > 0 { req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'", params.ToColdStorageDuration, params.ColdStorageVolume) @@ -2113,7 +2127,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } case constants.MetricsTTL: - tableName = signozMetricDBName + "." + signozSampleTableName + tableName = signozMetricDBName + "." + signozSampleLocalTableName statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")} @@ -2128,8 +2142,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return } req = fmt.Sprintf( - "ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+ - "INTERVAL %v SECOND DELETE", tableName, params.DelDuration) + "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+ + "INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration) if len(params.ColdStorageVolume) > 0 { req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+ " + INTERVAL %v SECOND TO VOLUME '%s'", @@ -2166,7 +2180,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, } }(tableName) case constants.LogsTTL: - tableName = r.logsDB + "." + r.logsTable + tableName = r.logsDB + "." + r.logsLocalTable statusItem, err := r.checkTTLStatusItem(ctx, tableName) if err != nil { return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} @@ -2181,8 +2195,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return } req = fmt.Sprintf( - "ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+ - "INTERVAL %v SECOND DELETE", tableName, params.DelDuration) + "ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+ + "INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration) if len(params.ColdStorageVolume) > 0 { req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+ " + INTERVAL %v SECOND TO VOLUME '%s'", @@ -2287,7 +2301,7 @@ func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string, // Set the storage policy for the required table. If it is already set, then setting it again // will not a problem. if len(coldStorageVolume) > 0 { - policyReq := fmt.Sprintf("ALTER TABLE %s MODIFY SETTING storage_policy='tiered'", tableName) + policyReq := fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY SETTING storage_policy='tiered'", tableName, cluster) zap.S().Debugf("Executing Storage policy request: %s\n", policyReq) if err := r.db.Exec(ctx, policyReq); err != nil { @@ -2313,6 +2327,15 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo return &diskItems, nil } +func getLocalTableNameArray(tableNames []string) []string { + var localTableNames []string + for _, name := range tableNames { + tableNameSplit := strings.Split(name, ".") + localTableNames = append(localTableNames, tableNameSplit[0]+"."+strings.Split(tableNameSplit[1], "distributed_")[1]) + } + return localTableNames +} + // GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces. func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) { @@ -2348,7 +2371,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) { var dbResp []model.DBResponseTTL - query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleTableName) + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleLocalTableName) err := r.db.Select(ctx, &dbResp, query) @@ -2366,7 +2389,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) { var dbResp []model.DBResponseTTL - query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceTableName, signozTraceDBName) + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceLocalTableName, signozTraceDBName) err := r.db.Select(ctx, &dbResp, query) @@ -2384,7 +2407,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) { var dbResp []model.DBResponseTTL - query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsTable, r.logsDB) + query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTable, r.logsDB) err := r.db.Select(ctx, &dbResp, query) @@ -2402,6 +2425,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa switch ttlParams.Type { case constants.TraceTTL: tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable} + + tableNameArray = getLocalTableNameArray(tableNameArray) status, err := r.setTTLQueryStatus(ctx, tableNameArray) if err != nil { return nil, err @@ -2424,6 +2449,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa case constants.MetricsTTL: tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName} + tableNameArray = getLocalTableNameArray(tableNameArray) status, err := r.setTTLQueryStatus(ctx, tableNameArray) if err != nil { return nil, err @@ -2446,6 +2472,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa case constants.LogsTTL: tableNameArray := []string{r.logsDB + "." + r.logsTable} + tableNameArray = getLocalTableNameArray(tableNameArray) status, err := r.setTTLQueryStatus(ctx, tableNameArray) if err != nil { return nil, err @@ -3107,7 +3134,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe } statements := []model.ShowCreateTableStatement{} - query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable) + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable) err = r.db.Select(ctx, &statements, query) if err != nil { return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} @@ -3137,11 +3164,18 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda // if the type is attribute or resource, create the materialized column first if field.Type == constants.Attributes || field.Type == constants.Resources { // create materialized - query := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')] CODEC(LZ4)", r.logsDB, r.logsTable, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name) + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')] CODEC(LZ4)", r.logsDB, r.logsLocalTable, cluster, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name) + err := r.db.Exec(ctx, query) if err != nil { return &model.ApiError{Err: err, Typ: model.ErrorInternal} } + + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s", r.logsDB, r.logsTable, cluster, field.Name, field.DataType) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } } // create the index @@ -3151,14 +3185,15 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda if field.IndexGranularity == 0 { field.IndexGranularity = constants.DefaultLogSkipIndexGranularity } - query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, field.IndexType, field.IndexGranularity) + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsLocalTable, cluster, field.Name, field.Name, field.IndexType, field.IndexGranularity) err := r.db.Exec(ctx, query) if err != nil { return &model.ApiError{Err: err, Typ: model.ErrorInternal} } + } else { // remove index - query := fmt.Sprintf("ALTER TABLE %s.%s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsTable, field.Name) + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, field.Name) err := r.db.Exec(ctx, query) if err != nil { return &model.ApiError{Err: err, Typ: model.ErrorInternal} diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go index 071a32baa3..454482a60b 100644 --- a/pkg/query-service/app/metrics/query_builder.go +++ b/pkg/query-service/app/metrics/query_builder.go @@ -166,7 +166,7 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + " %s as value" + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + - " INNER JOIN" + + " GLOBAL INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + " WHERE " + samplesTableTimeFilter + @@ -228,7 +228,7 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + " any(value) as value" + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + - " INNER JOIN" + + " GLOBAL INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + " WHERE " + samplesTableTimeFilter + @@ -371,7 +371,7 @@ func expressionToQuery(qp *model.QueryRangeParamsV2, varToQuery map[string]strin joinUsing = strings.Join(groupTags, ",") formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, var_) if idx < len(vars)-1 { - formulaSubQuery += "INNER JOIN" + formulaSubQuery += "GLOBAL INNER JOIN" } else if len(vars) > 1 { formulaSubQuery += fmt.Sprintf("USING (%s)", joinUsing) } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 24690e542a..7a266cb2fc 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -111,8 +111,8 @@ const ( ) const ( SIGNOZ_METRIC_DBNAME = "signoz_metrics" - SIGNOZ_SAMPLES_TABLENAME = "samples_v2" - SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2" + SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2" + SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2" ) var TimeoutExcludedRoutes = map[string]bool{