Feat/distributed ch (#1701)

* feat: support for distributed table

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Ankit Nayan 2022-12-02 12:30:28 +05:30 committed by GitHub
parent ab5311caac
commit 7f77bcca2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 46 deletions

2
go.mod
View File

@ -149,4 +149,4 @@ require (
k8s.io/client-go v8.0.0+incompatible // indirect
)
replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.76
replace github.com/prometheus/prometheus => github.com/SigNoz/prometheus v1.9.77

4
go.sum
View File

@ -57,8 +57,8 @@ github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb h1:bneLSKPf9YUSFmafKx32bynV6QrzViL/s+ZDvQxH1E4=
github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb/go.mod h1:JznGDNg9x1cujDKa22RaQOimOvvEfy3nxzDGd8XDgmA=
github.com/SigNoz/prometheus v1.9.76 h1:YQOHezj4Yyu6PHV7/bVR297FQgUMQAAJtCVZ+NslwYk=
github.com/SigNoz/prometheus v1.9.76/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM=
github.com/SigNoz/prometheus v1.9.77 h1:We3pxbCnCRWQE7tSELBkcmneiXRtcpOlXrGtLPVQ1v0=
github.com/SigNoz/prometheus v1.9.77/go.mod h1:Y4J9tGDmacMC+EcOTp+EIAn2C1sN+9kE+idyVKadiVM=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=

View File

@ -20,18 +20,19 @@ const (
const (
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "signoz_operations"
defaultIndexTable string = "signoz_index_v2"
defaultErrorTable string = "signoz_error_index_v2"
defaultDurationTable string = "durationSortMV"
defaultUsageExplorerTable string = "usage_explorer"
defaultSpansTable string = "signoz_spans"
defaultDependencyGraphTable string = "dependency_graph_minutes"
defaultTopLevelOperationsTable string = "top_level_operations"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
defaultErrorTable string = "distributed_signoz_error_index_v2"
defaultDurationTable string = "distributed_durationSort"
defaultUsageExplorerTable string = "distributed_usage_explorer"
defaultSpansTable string = "distributed_signoz_spans"
defaultDependencyGraphTable string = "distributed_dependency_graph_minutes"
defaultTopLevelOperationsTable string = "distributed_top_level_operations"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "logs"
defaultLogAttributeKeysTable string = "logs_atrribute_keys"
defaultLogResourceKeysTable string = "logs_resource_keys"
defaultLogsTable string = "distributed_logs"
defaultLogsLocalTable string = "logs"
defaultLogAttributeKeysTable string = "distributed_logs_atrribute_keys"
defaultLogResourceKeysTable string = "distributed_logs_resource_keys"
defaultLiveTailRefreshSeconds int = 10
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
@ -65,6 +66,7 @@ type namespaceConfig struct {
TopLevelOperationsTable string
LogsDB string
LogsTable string
LogsLocalTable string
LogsAttributeKeysTable string
LogsResourceKeysTable string
LiveTailRefreshSeconds int
@ -132,6 +134,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
TopLevelOperationsTable: defaultTopLevelOperationsTable,
LogsDB: defaultLogsDB,
LogsTable: defaultLogsTable,
LogsLocalTable: defaultLogsLocalTable,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
@ -154,6 +157,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
ErrorTable: "",
LogsDB: "",
LogsTable: "",
LogsLocalTable: "",
LogsAttributeKeysTable: "",
LogsResourceKeysTable: "",
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,

View File

@ -50,17 +50,20 @@ import (
)
const (
primaryNamespace = "clickhouse"
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
signozDurationMVTable = "durationSort"
signozUsageExplorerTable = "usage_explorer"
signozSpansTable = "signoz_spans"
signozErrorIndexTable = "signoz_error_index_v2"
signozTraceTableName = "signoz_index_v2"
signozMetricDBName = "signoz_metrics"
signozSampleTableName = "samples_v2"
signozTSTableName = "time_series_v2"
cluster = "cluster"
primaryNamespace = "clickhouse"
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
signozDurationMVTable = "distributed_durationSort"
signozUsageExplorerTable = "distributed_usage_explorer"
signozSpansTable = "distributed_signoz_spans"
signozErrorIndexTable = "distributed_signoz_error_index_v2"
signozTraceTableName = "distributed_signoz_index_v2"
signozTraceLocalTableName = "signoz_index_v2"
signozMetricDBName = "signoz_metrics"
signozSampleLocalTableName = "samples_v2"
signozSampleTableName = "distributed_samples_v2"
signozTSTableName = "distributed_time_series_v2"
minTimespanForProgressiveSearch = time.Hour
minTimespanForProgressiveSearchMargin = time.Minute
@ -92,6 +95,7 @@ type ClickHouseReader struct {
topLevelOperationsTable string
logsDB string
logsTable string
logsLocalTable string
logsAttributeKeys string
logsResourceKeys string
queryEngine *promql.Engine
@ -140,6 +144,7 @@ func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.Featu
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsLocalTable: options.primary.LogsLocalTable,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
@ -2036,6 +2041,13 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
return &GetFilteredSpansAggregatesResponse, nil
}
func getLocalTableName(tableName string) string {
tableNameSplit := strings.Split(tableName, ".")
return tableNameSplit[0] + "." + strings.Split(tableNameSplit[1], "distributed_")[1]
}
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
@ -2057,6 +2069,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
case constants.TraceTTL:
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
for _, tableName = range tableNameArray {
tableName = getLocalTableName(tableName)
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
@ -2066,6 +2079,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}
}
for _, tableName := range tableNameArray {
tableName = getLocalTableName(tableName)
// TODO: DB queries should be implemented with transactional statements but currently clickhouse doesn't support them. Issue: https://github.com/ClickHouse/ClickHouse/issues/22086
go func(tableName string) {
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
@ -2074,8 +2088,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
tableName, params.DelDuration)
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
tableName, cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
@ -2113,7 +2127,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}
case constants.MetricsTTL:
tableName = signozMetricDBName + "." + signozSampleTableName
tableName = signozMetricDBName + "." + signozSampleLocalTableName
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
@ -2128,8 +2142,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
@ -2166,7 +2180,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}
}(tableName)
case constants.LogsTTL:
tableName = r.logsDB + "." + r.logsTable
tableName = r.logsDB + "." + r.logsLocalTable
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
@ -2181,8 +2195,8 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
@ -2287,7 +2301,7 @@ func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string,
// Set the storage policy for the required table. If it is already set, then setting it again
// will not a problem.
if len(coldStorageVolume) > 0 {
policyReq := fmt.Sprintf("ALTER TABLE %s MODIFY SETTING storage_policy='tiered'", tableName)
policyReq := fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY SETTING storage_policy='tiered'", tableName, cluster)
zap.S().Debugf("Executing Storage policy request: %s\n", policyReq)
if err := r.db.Exec(ctx, policyReq); err != nil {
@ -2313,6 +2327,15 @@ func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *mo
return &diskItems, nil
}
func getLocalTableNameArray(tableNames []string) []string {
var localTableNames []string
for _, name := range tableNames {
tableNameSplit := strings.Split(name, ".")
localTableNames = append(localTableNames, tableNameSplit[0]+"."+strings.Split(tableNameSplit[1], "distributed_")[1])
}
return localTableNames
}
// GetTTL returns current ttl, expected ttl and past setTTL status for metrics/traces.
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
@ -2348,7 +2371,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleTableName)
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v'", signozSampleLocalTableName)
err := r.db.Select(ctx, &dbResp, query)
@ -2366,7 +2389,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceTableName, signozTraceDBName)
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", signozTraceLocalTableName, signozTraceDBName)
err := r.db.Select(ctx, &dbResp, query)
@ -2384,7 +2407,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsTable, r.logsDB)
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsLocalTable, r.logsDB)
err := r.db.Select(ctx, &dbResp, query)
@ -2402,6 +2425,8 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
switch ttlParams.Type {
case constants.TraceTTL:
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
@ -2424,6 +2449,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
case constants.MetricsTTL:
tableNameArray := []string{signozMetricDBName + "." + signozSampleTableName}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
@ -2446,6 +2472,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
case constants.LogsTTL:
tableNameArray := []string{r.logsDB + "." + r.logsTable}
tableNameArray = getLocalTableNameArray(tableNameArray)
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
@ -3107,7 +3134,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
}
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
@ -3137,11 +3164,18 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
// if the type is attribute or resource, create the materialized column first
if field.Type == constants.Attributes || field.Type == constants.Resources {
// create materialized
query := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')] CODEC(LZ4)", r.logsDB, r.logsTable, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')] CODEC(LZ4)", r.logsDB, r.logsLocalTable, cluster, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s", r.logsDB, r.logsTable, cluster, field.Name, field.DataType)
err = r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}
// create the index
@ -3151,14 +3185,15 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
if field.IndexGranularity == 0 {
field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
}
query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, field.IndexType, field.IndexGranularity)
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsLocalTable, cluster, field.Name, field.Name, field.IndexType, field.IndexGranularity)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
} else {
// remove index
query := fmt.Sprintf("ALTER TABLE %s.%s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsTable, field.Name)
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, field.Name)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}

View File

@ -166,7 +166,7 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" %s as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" INNER JOIN" +
" GLOBAL INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
@ -228,7 +228,7 @@ func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, table
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" any(value) as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" INNER JOIN" +
" GLOBAL INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
@ -371,7 +371,7 @@ func expressionToQuery(qp *model.QueryRangeParamsV2, varToQuery map[string]strin
joinUsing = strings.Join(groupTags, ",")
formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, var_)
if idx < len(vars)-1 {
formulaSubQuery += "INNER JOIN"
formulaSubQuery += "GLOBAL INNER JOIN"
} else if len(vars) > 1 {
formulaSubQuery += fmt.Sprintf("USING (%s)", joinUsing)
}

View File

@ -111,8 +111,8 @@ const (
)
const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
SIGNOZ_SAMPLES_TABLENAME = "samples_v2"
SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2"
SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2"
SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2"
)
var TimeoutExcludedRoutes = map[string]bool{