diff --git a/pkg/telemetrymetadata/stmt_parse.go b/pkg/telemetrymetadata/stmt_parse.go
new file mode 100644
index 0000000000..f453665fed
--- /dev/null
+++ b/pkg/telemetrymetadata/stmt_parse.go
@@ -0,0 +1,132 @@
+package telemetrymetadata
+
+import (
+	"strings"
+
+	"github.com/AfterShip/clickhouse-sql-parser/parser"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
+// TelemetryFieldVisitor is an AST visitor for extracting telemetry fields
+type TelemetryFieldVisitor struct {
+	parser.DefaultASTVisitor
+	Fields []*telemetrytypes.TelemetryFieldKey
+}
+
+func NewTelemetryFieldVisitor() *TelemetryFieldVisitor {
+	return &TelemetryFieldVisitor{
+		Fields: make([]*telemetrytypes.TelemetryFieldKey, 0),
+	}
+}
+
+// VisitColumnDef is called when visiting a column definition
+func (v *TelemetryFieldVisitor) VisitColumnDef(expr *parser.ColumnDef) error {
+	// Check if this is a materialized column with DEFAULT expression
+	if expr.DefaultExpr == nil {
+		return nil
+	}
+
+	// Parse column name to extract context and data type
+	columnName := expr.Name.String()
+
+	// Remove backticks if present
+	columnName = strings.TrimPrefix(columnName, "`")
+	columnName = strings.TrimSuffix(columnName, "`")
+
+	// Parse the column name to extract components
+	parts := strings.Split(columnName, "_")
+	if len(parts) < 2 {
+		return nil
+	}
+
+	context := parts[0]
+	dataType := parts[1]
+
+	// Check if this is a valid telemetry column
+	var fieldContext telemetrytypes.FieldContext
+	switch context {
+	case "resource":
+		fieldContext = telemetrytypes.FieldContextResource
+	case "scope":
+		fieldContext = telemetrytypes.FieldContextScope
+	case "attribute":
+		fieldContext = telemetrytypes.FieldContextAttribute
+	default:
+		return nil // Not a telemetry column
+	}
+
+	// Check and convert data type
+	var fieldDataType telemetrytypes.FieldDataType
+	switch dataType {
+	case "string":
+		fieldDataType = telemetrytypes.FieldDataTypeString
+	case "bool":
+		fieldDataType = telemetrytypes.FieldDataTypeBool
+	case "int", "int64":
+		fieldDataType = telemetrytypes.FieldDataTypeFloat64
+	case "float", "float64":
+		fieldDataType = telemetrytypes.FieldDataTypeFloat64
+	case "number":
+		fieldDataType = telemetrytypes.FieldDataTypeFloat64
+	default:
+		return nil // Unknown data type
+	}
+
+	// Extract field name from the DEFAULT expression
+	// The DEFAULT expression should be something like: resources_string['k8s.cluster.name']
+	// We need to extract the key inside the square brackets
+	defaultExprStr := expr.DefaultExpr.String()
+
+	// Look for the pattern: map['key']
+	startIdx := strings.Index(defaultExprStr, "['")
+	endIdx := strings.Index(defaultExprStr, "']")
+
+	if startIdx == -1 || endIdx == -1 || startIdx+2 >= endIdx {
+		return nil // Invalid DEFAULT expression format
+	}
+
+	fieldName := defaultExprStr[startIdx+2 : endIdx]
+
+	// Create and store the TelemetryFieldKey
+	field := telemetrytypes.TelemetryFieldKey{
+		Name:          fieldName,
+		FieldContext:  fieldContext,
+		FieldDataType: fieldDataType,
+		Materialized:  true,
+	}
+
+	v.Fields = append(v.Fields, &field)
+	return nil
+}
+
+func ExtractFieldKeysFromTblStatement(statement string) ([]*telemetrytypes.TelemetryFieldKey, error) {
+	// Parse the CREATE TABLE statement using the ClickHouse parser
+	p := parser.NewParser(statement)
+	stmts, err := p.ParseStmts()
+	if err != nil {
+		return nil, err
+	}
+
+	// Create a visitor to collect telemetry fields
+	visitor := NewTelemetryFieldVisitor()
+
+	// Visit each statement
+	for _, stmt := range stmts {
+		// We're looking for CreateTable statements
+		createTable, ok := stmt.(*parser.CreateTable)
+		if !ok {
+			continue
+		}
+
+		// Visit the table schema to extract column definitions
+		if createTable.TableSchema != nil {
+			for _, column := range createTable.TableSchema.Columns {
+				if err := column.Accept(visitor); err != nil {
+					return nil, err
+				}
+			}
+		}
+	}
+
+	return visitor.Fields, nil
+}
diff --git a/pkg/telemetrymetadata/stmt_parse_test.go b/pkg/telemetrymetadata/stmt_parse_test.go
new file mode 100644
index 0000000000..73f386e691
--- /dev/null
+++ b/pkg/telemetrymetadata/stmt_parse_test.go
@@ -0,0 +1,148 @@
+package telemetrymetadata
+
+import (
+	"slices"
+	"testing"
+
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
+func TestExtractFieldKeysFromTblStatement(t *testing.T) {
+
+	var statement = `CREATE TABLE signoz_logs.logs_v2
+	(
+		` + "`ts_bucket_start`" + ` UInt64 CODEC(DoubleDelta, LZ4),
+		` + "`resource_fingerprint`" + ` String CODEC(ZSTD(1)),
+		` + "`timestamp`" + ` UInt64 CODEC(DoubleDelta, LZ4),
+		` + "`observed_timestamp`" + ` UInt64 CODEC(DoubleDelta, LZ4),
+		` + "`id`" + ` String CODEC(ZSTD(1)),
+		` + "`trace_id`" + ` String CODEC(ZSTD(1)),
+		` + "`span_id`" + ` String CODEC(ZSTD(1)),
+		` + "`trace_flags`" + ` UInt32,
+		` + "`severity_text`" + ` LowCardinality(String) CODEC(ZSTD(1)),
+		` + "`severity_number`" + ` UInt8,
+		` + "`body`" + ` String CODEC(ZSTD(2)),
+		` + "`attributes_string`" + ` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+		` + "`attributes_number`" + ` Map(LowCardinality(String), Float64) CODEC(ZSTD(1)),
+		` + "`attributes_bool`" + ` Map(LowCardinality(String), Bool) CODEC(ZSTD(1)),
+		` + "`resources_string`" + ` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+		` + "`scope_name`" + ` String CODEC(ZSTD(1)),
+		` + "`scope_version`" + ` String CODEC(ZSTD(1)),
+		` + "`scope_string`" + ` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
+		` + "`attribute_number_input_size`" + ` Int64 DEFAULT attributes_number['input_size'] CODEC(ZSTD(1)),
+		` + "`attribute_number_input_size_exists`" + ` Bool DEFAULT if(mapContains(attributes_number, 'input_size') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`attribute_string_log$$iostream`" + ` String DEFAULT attributes_string['log.iostream'] CODEC(ZSTD(1)),
+		` + "`attribute_string_log$$iostream_exists`" + ` Bool DEFAULT if(mapContains(attributes_string, 'log.iostream') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`attribute_string_log$$file$$path`" + ` String DEFAULT attributes_string['log.file.path'] CODEC(ZSTD(1)),
+		` + "`attribute_string_log$$file$$path_exists`" + ` Bool DEFAULT if(mapContains(attributes_string, 'log.file.path') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$cluster$$name`" + ` String DEFAULT resources_string['k8s.cluster.name'] CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$cluster$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.cluster.name') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$namespace$$name`" + ` String DEFAULT resources_string['k8s.namespace.name'] CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$namespace$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.namespace.name') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$pod$$name`" + ` String DEFAULT resources_string['k8s.pod.name'] CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$pod$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.pod.name') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$node$$name`" + ` String DEFAULT resources_string['k8s.node.name'] CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$node$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.node.name') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$container$$name`" + ` String DEFAULT resources_string['k8s.container.name'] CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$container$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.container.name') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$deployment$$name`" + ` String DEFAULT resources_string['k8s.deployment.name'] CODEC(ZSTD(1)),
+		` + "`resource_string_k8s$$deployment$$name_exists`" + ` Bool DEFAULT if(mapContains(resources_string, 'k8s.deployment.name') != 0, true, false) CODEC(ZSTD(1)),
+		` + "`attribute_string_processor`" + ` String DEFAULT attributes_string['processor'] CODEC(ZSTD(1)),
+		` + "`attribute_string_processor_exists`" + ` Bool DEFAULT if(mapContains(attributes_string, 'processor') != 0, true, false) CODEC(ZSTD(1)),
+		INDEX body_idx lower(body) TYPE ngrambf_v1(4, 60000, 5, 0) GRANULARITY 1,
+		INDEX id_minmax id TYPE minmax GRANULARITY 1,
+		INDEX severity_number_idx severity_number TYPE set(25) GRANULARITY 4,
+		INDEX severity_text_idx severity_text TYPE set(25) GRANULARITY 4,
+		INDEX trace_flags_idx trace_flags TYPE bloom_filter GRANULARITY 4,
+		INDEX scope_name_idx scope_name TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 4,
+		INDEX ` + "`resource_string_k8s$$cluster$$name_idx`" + ` ` + "`resource_string_k8s$$cluster$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
+		INDEX ` + "`resource_string_k8s$$namespace$$name_idx`" + ` ` + "`resource_string_k8s$$namespace$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
+		INDEX ` + "`resource_string_k8s$$pod$$name_idx`" + ` ` + "`resource_string_k8s$$pod$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
+		INDEX ` + "`resource_string_k8s$$node$$name_idx`" + ` ` + "`resource_string_k8s$$node$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
+		INDEX ` + "`resource_string_k8s$$container$$name_idx`" + ` ` + "`resource_string_k8s$$container$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
+		INDEX ` + "`resource_string_k8s$$deployment$$name_idx`" + ` ` + "`resource_string_k8s$$deployment$$name`" + ` TYPE bloom_filter(0.01) GRANULARITY 64,
+		INDEX attribute_string_processor_idx attribute_string_processor TYPE bloom_filter(0.01) GRANULARITY 64
+	)
+	ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
+	PARTITION BY toDate(timestamp / 1000000000)
+	ORDER BY (ts_bucket_start, resource_fingerprint, severity_text, timestamp, id)
+	TTL toDateTime(timestamp / 1000000000) + toIntervalSecond(2592000)
+	SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
+
+	keys, err := ExtractFieldKeysFromTblStatement(statement)
+	if err != nil {
+		t.Fatalf("failed to extract field keys from tbl statement: %v", err)
+	}
+
+	// some expected keys
+	expectedKeys := []*telemetrytypes.TelemetryFieldKey{
+		{
+			Name:          "k8s.pod.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "k8s.cluster.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "k8s.namespace.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "k8s.deployment.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "k8s.node.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "k8s.container.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "processor",
+			FieldContext:  telemetrytypes.FieldContextAttribute,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "input_size",
+			FieldContext:  telemetrytypes.FieldContextAttribute,
+			FieldDataType: telemetrytypes.FieldDataTypeFloat64,
+			Materialized:  true,
+		},
+		{
+			Name:          "log.iostream",
+			FieldContext:  telemetrytypes.FieldContextAttribute,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:          "log.file.path",
+			FieldContext:  telemetrytypes.FieldContextAttribute,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+	}
+
+	for _, key := range expectedKeys {
+		if !slices.ContainsFunc(keys, func(k *telemetrytypes.TelemetryFieldKey) bool {
+			return k.Name == key.Name && k.FieldContext == key.FieldContext && k.FieldDataType == key.FieldDataType && k.Materialized == key.Materialized
+		}) {
+			t.Errorf("expected key %v not found", key)
+		}
+	}
+}
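
For context, a minimal sketch of how ExtractFieldKeysFromTblStatement might be wired up against a running ClickHouse instance, assuming the clickhouse-go v2 client; the address, database/table name, and main-package framing are illustrative assumptions rather than anything defined above:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"

	"github.com/SigNoz/signoz/pkg/telemetrymetadata"
)

func main() {
	// Open a ClickHouse connection; the address is an assumption for illustration.
	conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"localhost:9000"}})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Fetch the CREATE TABLE statement for the logs table, then extract the
	// materialized resource/scope/attribute columns from it.
	var stmt string
	if err := conn.QueryRow(context.Background(), "SHOW CREATE TABLE signoz_logs.logs_v2").Scan(&stmt); err != nil {
		log.Fatal(err)
	}

	keys, err := telemetrymetadata.ExtractFieldKeysFromTblStatement(stmt)
	if err != nil {
		log.Fatal(err)
	}
	for _, key := range keys {
		fmt.Printf("%s %v %v\n", key.Name, key.FieldContext, key.FieldDataType)
	}
}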