diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 56dc36ff98..ebbee2a7b8 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -33,6 +33,7 @@ const ( defaultLogsLocalTable string = "logs" defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys" defaultLogResourceKeysTable string = "distributed_logs_resource_keys" + defaultLogTagAttributeTable string = "distributed_tag_attributes" defaultLiveTailRefreshSeconds int = 10 defaultWriteBatchDelay time.Duration = 5 * time.Second defaultWriteBatchSize int = 10000 @@ -69,6 +70,7 @@ type namespaceConfig struct { LogsLocalTable string LogsAttributeKeysTable string LogsResourceKeysTable string + LogsTagAttributeTable string LiveTailRefreshSeconds int WriteBatchDelay time.Duration WriteBatchSize int @@ -137,6 +139,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s LogsLocalTable: defaultLogsLocalTable, LogsAttributeKeysTable: defaultLogAttributeKeysTable, LogsResourceKeysTable: defaultLogResourceKeysTable, + LogsTagAttributeTable: defaultLogTagAttributeTable, LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index abfa0c8dfa..50fc30283f 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3,6 +3,7 @@ package clickhouseReader import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" @@ -101,6 +102,7 @@ type ClickHouseReader struct { logsLocalTable string logsAttributeKeys string logsResourceKeys string + logsTagAttributeTable string queryEngine *promql.Engine remoteStorage *remote.Storage fanoutStorage *storage.Storage @@ -150,6 +152,7 @@ func NewReader(localDB *sqlx.DB, 
configFile string, featureFlag interfaces.Featu logsLocalTable: options.primary.LogsLocalTable, logsAttributeKeys: options.primary.LogsAttributeKeysTable, logsResourceKeys: options.primary.LogsResourceKeysTable, + logsTagAttributeTable: options.primary.LogsTagAttributeTable, liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, promConfigFile: configFile, featureFlags: featureFlag, @@ -3385,7 +3388,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) { for _, field := range *fields { field.Type = fieldType - if strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field.Name)) { + if isSelectedField(tableStatement, field.Name) { response.Selected = append(response.Selected, field) } else { response.Interesting = append(response.Interesting, field) @@ -3393,6 +3396,10 @@ func extractSelectedAndInterestingFields(tableStatement string, fieldType string } } +func isSelectedField(tableStatement, field string) bool { + return strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field)) +} + func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { // if a field is selected it means that the field needs to be indexed if field.Selected { @@ -3706,7 +3713,7 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req } key := v3.AttributeKey{ Key: metricName, - DataType: v3.AttributeKeyDataTypeNumber, + DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, } response.AttributeKeys = append(response.AttributeKeys, key) @@ -3782,6 +3789,243 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3 return &attributeValues, nil } +func isColumn(tableStatement, field string) bool { + return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", field)) +} + +func (r 
*ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { + + var query string + var err error + var rows driver.Rows + var response v3.AggregateAttributeResponse + + where := "" + switch req.Operator { + case v3.AggregateOperatorCountDistinct: + where = "tagKey ILIKE $1" + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRate, + v3.AggregateOperatorRateMin, + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99, + v3.AggregateOperatorAvg, + v3.AggregateOperatorSum, + v3.AggregateOperatorMin, + v3.AggregateOperatorMax: + where = "tagKey ILIKE $1 AND (tagDataType='int64' or tagDataType='float64')" + case + v3.AggregateOpeatorCount, + v3.AggregateOperatorNoOp: + return &v3.AggregateAttributeResponse{}, nil + default: + return nil, fmt.Errorf("unsupported aggregate operator") + } + + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, tagDataType from %s.%s WHERE %s limit $2", r.logsDB, r.logsTagAttributeTable, where) + rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error()) + } + + var tagKey string + var dataType string + var attType string + for rows.Next() { + if err := rows.Scan(&tagKey, &attType, &dataType); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } 
+ key := v3.AttributeKey{ + Key: tagKey, + DataType: v3.AttributeKeyDataType(dataType), + Type: v3.AttributeKeyType(attType), + IsColumn: isColumn(statements[0].Statement, tagKey), + } + response.AttributeKeys = append(response.AttributeKeys, key) + } + // add other attributes + for _, f := range constants.StaticFieldsLogsV3 { + if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) { + f.IsColumn = isColumn(statements[0].Statement, f.Key) + response.AttributeKeys = append(response.AttributeKeys, f) + } + } + + return &response, nil +} + +func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + var query string + var err error + var rows driver.Rows + var response v3.FilterAttributeKeyResponse + + if len(req.SearchText) != 0 { + query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s where tagKey ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTable) + rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit) + } else { + query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s limit $1", r.logsDB, r.logsTagAttributeTable) + rows, err = r.db.Query(ctx, query, req.Limit) + } + + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error()) + } + + var attributeKey string + var attributeDataType string + var tagType string + for rows.Next() { + if err := rows.Scan(&attributeKey, &tagType, &attributeDataType); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + + key := v3.AttributeKey{ + Key: attributeKey, + DataType: 
v3.AttributeKeyDataType(attributeDataType), + Type: v3.AttributeKeyType(tagType), + IsColumn: isColumn(statements[0].Statement, attributeKey), + } + + response.AttributeKeys = append(response.AttributeKeys, key) + } + + // add other attributes + for _, f := range constants.StaticFieldsLogsV3 { + if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) { + f.IsColumn = isColumn(statements[0].Statement, f.Key) + response.AttributeKeys = append(response.AttributeKeys, f) + } + } + + return &response, nil +} + +func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + var err error + var filterValueColumn string + var rows driver.Rows + var attributeValues v3.FilterAttributeValueResponse + + // if dataType or tagType is not present return empty response + if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 || req.FilterAttributeKey == "body" { + return &v3.FilterAttributeValueResponse{}, nil + } + + // if data type is bool, return true and false + if req.FilterAttributeKeyDataType == v3.AttributeKeyDataTypeBool { + return &v3.FilterAttributeValueResponse{ + BoolAttributeValues: []bool{true, false}, + }, nil + } + + query := "select distinct" + switch req.FilterAttributeKeyDataType { + case v3.AttributeKeyDataTypeInt64: + filterValueColumn = "int64TagValue" + case v3.AttributeKeyDataTypeFloat64: + filterValueColumn = "float64TagValue" + case v3.AttributeKeyDataTypeString: + filterValueColumn = "stringTagValue" + } + + searchText := fmt.Sprintf("%%%s%%", req.SearchText) + + // check if the tagKey is a topLevelColumn + if _, ok := constants.LogsTopLevelColumnsV3[req.FilterAttributeKey]; ok { + // query the column for the last 48 hours + filterValueColumnWhere := req.FilterAttributeKey + selectKey := req.FilterAttributeKey + if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString { + filterValueColumnWhere = fmt.Sprintf("toString(%s)", 
req.FilterAttributeKey) + selectKey = fmt.Sprintf("toInt64(%s)", req.FilterAttributeKey) + } + + // prepare the query and run + if len(req.SearchText) != 0 { + query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsTable, filterValueColumnWhere) + rows, err = r.db.Query(ctx, query, searchText, req.Limit) + } else { + query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsTable) + rows, err = r.db.Query(ctx, query, req.Limit) + } + } else if len(req.SearchText) != 0 { + filterValueColumnWhere := filterValueColumn + if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString { + filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn) + } + query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.logsDB, r.logsTagAttributeTable, filterValueColumnWhere) + rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit) + } else { + query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and tagType=$2 limit $3", filterValueColumn, r.logsDB, r.logsTagAttributeTable) + rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.TagType, req.Limit) + } + + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var strAttributeValue string + var float64AttributeValue sql.NullFloat64 + var int64AttributeValue sql.NullInt64 + for rows.Next() { + switch req.FilterAttributeKeyDataType { + case v3.AttributeKeyDataTypeInt64: + if err := rows.Scan(&int64AttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if int64AttributeValue.Valid { + 
attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, int64AttributeValue.Int64) + } + case v3.AttributeKeyDataTypeFloat64: + if err := rows.Scan(&float64AttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if float64AttributeValue.Valid { + attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, float64AttributeValue.Float64) + } + case v3.AttributeKeyDataTypeString: + if err := rows.Scan(&strAttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue) + } + } + + return &attributeValues, nil + +} + func readRow(vars []interface{}, columnNames []string) ([]string, map[string]string, v3.Point) { // Each row will have a value and a timestamp, and an optional list of label values // example: {Timestamp: ..., Value: ...} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 4778afa723..841986c9f2 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2369,7 +2369,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * case v3.DataSourceMetrics: response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req) case v3.DataSourceLogs: - // TODO: implement + response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req) case v3.DataSourceTraces: // TODO: implement default: @@ -2398,7 +2398,7 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R case v3.DataSourceMetrics: response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req) case v3.DataSourceLogs: - // TODO: implement + response, err = aH.reader.GetLogAttributeKeys(r.Context(), req) case v3.DataSourceTraces: // TODO: implement default: @@ -2427,7 +2427,7 @@ func (aH *APIHandler) 
autoCompleteAttributeValues(w http.ResponseWriter, r *http case v3.DataSourceMetrics: response, err = aH.reader.GetMetricAttributeValues(r.Context(), req) case v3.DataSourceLogs: - // TODO: implement + response, err = aH.reader.GetLogAttributeValues(r.Context(), req) case v3.DataSourceTraces: // TODO: implement default: diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 5c4393ae7a..024ae85e6b 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -833,6 +833,8 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ dataSource := v3.DataSource(r.URL.Query().Get("dataSource")) aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator")) aggregateAttribute := r.URL.Query().Get("aggregateAttribute") + tagType := v3.TagType(r.URL.Query().Get("tagType")) + limit, err := strconv.Atoi(r.URL.Query().Get("limit")) if err != nil { limit = 50 @@ -846,10 +848,15 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ return nil, err } + if err := tagType.Validate(); err != nil && tagType != v3.TagType("") { + return nil, err + } + req = v3.FilterAttributeKeyRequest{ DataSource: dataSource, AggregateOperator: aggregateOperator, AggregateAttribute: aggregateAttribute, + TagType: tagType, Limit: limit, SearchText: r.URL.Query().Get("searchText"), } @@ -862,7 +869,9 @@ func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValue dataSource := v3.DataSource(r.URL.Query().Get("dataSource")) aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator")) + filterAttributeKeyDataType := v3.AttributeKeyDataType(r.URL.Query().Get("filterAttributeKeyDataType")) // can be empty aggregateAttribute := r.URL.Query().Get("aggregateAttribute") + tagType := v3.TagType(r.URL.Query().Get("tagType")) // can be empty limit, err := strconv.Atoi(r.URL.Query().Get("limit")) if err != nil { @@ -878,12 +887,14 @@ func 
parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValue } req = v3.FilterAttributeValueRequest{ - DataSource: dataSource, - AggregateOperator: aggregateOperator, - AggregateAttribute: aggregateAttribute, - Limit: limit, - SearchText: r.URL.Query().Get("searchText"), - FilterAttributeKey: r.URL.Query().Get("attributeKey"), + DataSource: dataSource, + AggregateOperator: aggregateOperator, + AggregateAttribute: aggregateAttribute, + TagType: tagType, + Limit: limit, + SearchText: r.URL.Query().Get("searchText"), + FilterAttributeKey: r.URL.Query().Get("attributeKey"), + FilterAttributeKeyDataType: filterAttributeKeyDataType, } return &req, nil } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index ce03e364e1..574e35b82d 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -5,6 +5,7 @@ import ( "strconv" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) const ( @@ -221,3 +222,47 @@ var ReservedColumnTargetAliases = map[string]struct{}{ // logsPPLPfx is a short constant for logsPipelinePrefix const LogsPPLPfx = "logstransform/pipeline_" + +// The datatype present here doesn't represent the actual datatype of column in the logs table. 
+var StaticFieldsLogsV3 = []v3.AttributeKey{ + { + Key: "trace_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "span_id", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "trace_flags", + DataType: v3.AttributeKeyDataTypeInt64, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "severity_text", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "severity_number", + DataType: v3.AttributeKeyDataTypeInt64, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "body", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, +} + +var LogsTopLevelColumnsV3 = map[string]struct{}{ + "trace_id": {}, + "span_id": {}, + "trace_flags": {}, + "severity_text": {}, + "severity_number": {}, + "timestamp": {}, + "id": {}, +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 089003fc8c..59037fba3d 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -75,6 +75,9 @@ type Reader interface { GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) TailLogs(ctx context.Context, client *model.LogsTailClient) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) + GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) + GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) + GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) // Connection needed for rules, not ideal but required GetConn() clickhouse.Conn diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 8a55fb02a8..46fc69f45a 100644 --- 
a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -182,11 +182,19 @@ type AggregateAttributeRequest struct { type TagType string const ( - TagTypeColumn TagType = "column" TagTypeTag TagType = "tag" TagTypeResource TagType = "resource" ) +func (q TagType) Validate() error { + switch q { + case TagTypeTag, TagTypeResource: + return nil + default: + return fmt.Errorf("invalid tag type: %s", q) + } +} + // FilterAttributeKeyRequest is a request to fetch possible attribute keys // for a selected aggregate operator and aggregate attribute and search text. type FilterAttributeKeyRequest struct { @@ -201,11 +209,21 @@ type FilterAttributeKeyRequest struct { type AttributeKeyDataType string const ( - AttributeKeyDataTypeString AttributeKeyDataType = "string" - AttributeKeyDataTypeNumber AttributeKeyDataType = "number" - AttributeKeyDataTypeBool AttributeKeyDataType = "bool" + AttributeKeyDataTypeString AttributeKeyDataType = "string" + AttributeKeyDataTypeInt64 AttributeKeyDataType = "int64" + AttributeKeyDataTypeFloat64 AttributeKeyDataType = "float64" + AttributeKeyDataTypeBool AttributeKeyDataType = "bool" ) +func (q AttributeKeyDataType) Validate() error { + switch q { + case AttributeKeyDataTypeString, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeBool: + return nil + default: + return fmt.Errorf("invalid attribute key data type: %s", q) + } +} + // FilterAttributeValueRequest is a request to fetch possible attribute values // for a selected aggregate operator, aggregate attribute, filter attribute key // and search text. @@ -244,7 +262,7 @@ type AttributeKey struct { func (a AttributeKey) Validate() error { switch a.DataType { - case AttributeKeyDataTypeBool, AttributeKeyDataTypeNumber, AttributeKeyDataTypeString: + case AttributeKeyDataTypeBool, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeString: break default: return fmt.Errorf("invalid attribute dataType: %s", a.DataType)