From ef141d2cee37f25148557b420a2f31e272a1daf6 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Tue, 12 Jul 2022 16:38:26 +0530 Subject: [PATCH] API for fields added --- .../app/clickhouseReader/options.go | 108 ++++++++------ .../app/clickhouseReader/reader.go | 138 +++++++++++++++--- pkg/query-service/app/http_handler.go | 41 ++++++ pkg/query-service/app/logs/validator.go | 40 +++++ pkg/query-service/app/server.go | 1 + pkg/query-service/constants/constants.go | 94 +++++++++--- pkg/query-service/interfaces/interface.go | 4 + pkg/query-service/model/queryParams.go | 9 ++ pkg/query-service/model/response.go | 15 ++ 9 files changed, 359 insertions(+), 91 deletions(-) create mode 100644 pkg/query-service/app/logs/validator.go diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 30f23b5cf3..fc25fb43ce 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -18,16 +18,20 @@ const ( ) const ( - defaultDatasource string = "tcp://localhost:9000" - defaultTraceDB string = "signoz_traces" - defaultOperationsTable string = "signoz_operations" - defaultIndexTable string = "signoz_index_v2" - defaultErrorTable string = "signoz_error_index" - defaulDurationTable string = "durationSortMV" - defaultSpansTable string = "signoz_spans" - defaultWriteBatchDelay time.Duration = 5 * time.Second - defaultWriteBatchSize int = 10000 - defaultEncoding Encoding = EncodingJSON + defaultDatasource string = "tcp://localhost:9000" + defaultTraceDB string = "signoz_traces" + defaultOperationsTable string = "signoz_operations" + defaultIndexTable string = "signoz_index_v2" + defaultErrorTable string = "signoz_error_index" + defaulDurationTable string = "durationSortMV" + defaultSpansTable string = "signoz_spans" + defaultLogsDB string = "signoz_logs" + defaultLogsTable string = "logs" + defaultLogAttributeKeysTable string = "logs_atrribute_keys" + defaultLogResourceKeysTable string = "logs_resource_keys" + defaultWriteBatchDelay time.Duration = 5 * time.Second + defaultWriteBatchSize int = 10000 + defaultEncoding Encoding = EncodingJSON ) const ( @@ -43,19 +47,23 @@ const ( // NamespaceConfig is Clickhouse's internal configuration data type namespaceConfig struct { - namespace string - Enabled bool - Datasource string - TraceDB string - OperationsTable string - IndexTable string - DurationTable string - SpansTable string - ErrorTable string - WriteBatchDelay time.Duration - WriteBatchSize int - Encoding Encoding - Connector Connector + namespace string + Enabled bool + Datasource string + TraceDB string + OperationsTable string + IndexTable string + DurationTable string + SpansTable string + ErrorTable string + LogsDB string + LogsTable string + LogsAttributeKeysTable string + LogsResourceKeysTable string + WriteBatchDelay time.Duration + WriteBatchSize int + Encoding Encoding + Connector Connector } // Connecto defines how to connect to the database @@ -102,19 +110,23 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s options := &Options{ primary: &namespaceConfig{ - namespace: primaryNamespace, - Enabled: true, - Datasource: datasource, - TraceDB: defaultTraceDB, - OperationsTable: defaultOperationsTable, - IndexTable: defaultIndexTable, - ErrorTable: defaultErrorTable, - DurationTable: defaulDurationTable, - SpansTable: defaultSpansTable, - WriteBatchDelay: defaultWriteBatchDelay, - WriteBatchSize: defaultWriteBatchSize, - Encoding: defaultEncoding, - Connector: defaultConnector, + namespace: primaryNamespace, + Enabled: true, + Datasource: datasource, + TraceDB: defaultTraceDB, + OperationsTable: defaultOperationsTable, + IndexTable: defaultIndexTable, + ErrorTable: defaultErrorTable, + DurationTable: defaulDurationTable, + SpansTable: defaultSpansTable, + LogsDB: defaultLogsDB, + LogsTable: defaultLogsTable, + LogsAttributeKeysTable: defaultLogAttributeKeysTable, + LogsResourceKeysTable: defaultLogResourceKeysTable, + WriteBatchDelay: defaultWriteBatchDelay, + WriteBatchSize: defaultWriteBatchSize, + Encoding: defaultEncoding, + Connector: defaultConnector, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } @@ -122,16 +134,20 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s for _, namespace := range otherNamespaces { if namespace == archiveNamespace { options.others[namespace] = &namespaceConfig{ - namespace: namespace, - Datasource: datasource, - TraceDB: "", - OperationsTable: "", - IndexTable: "", - ErrorTable: "", - WriteBatchDelay: defaultWriteBatchDelay, - WriteBatchSize: defaultWriteBatchSize, - Encoding: defaultEncoding, - Connector: defaultConnector, + namespace: namespace, + Datasource: datasource, + TraceDB: "", + OperationsTable: "", + IndexTable: "", + ErrorTable: "", + LogsDB: "", + LogsTable: "", + LogsAttributeKeysTable: "", + LogsResourceKeysTable: "", + WriteBatchDelay: defaultWriteBatchDelay, + WriteBatchSize: defaultWriteBatchSize, + Encoding: defaultEncoding, + Connector: defaultConnector, } } else { options.others[namespace] = &namespaceConfig{namespace: namespace} diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 596354433e..bfcdbe4368 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -83,19 +83,23 @@ var ( // SpanWriter for reading spans from ClickHouse type ClickHouseReader struct { - db clickhouse.Conn - localDB *sqlx.DB - traceDB string - operationsTable string - durationTable string - indexTable string - errorTable string - spansTable string - queryEngine *promql.Engine - remoteStorage *remote.Storage - ruleManager *rules.Manager - promConfig *config.Config - alertManager am.Manager + db clickhouse.Conn + localDB *sqlx.DB + traceDB string + operationsTable string + durationTable string + indexTable string + errorTable string + spansTable string + logsDB string + logsTable string + logsAttributeKeys string + logsResourceKeys string + queryEngine *promql.Engine + remoteStorage *remote.Storage + ruleManager *rules.Manager + promConfig *config.Config + alertManager am.Manager } // NewTraceReader returns a TraceReader for the database @@ -113,15 +117,19 @@ func NewReader(localDB *sqlx.DB) *ClickHouseReader { alertManager := am.New("") return &ClickHouseReader{ - db: db, - localDB: localDB, - traceDB: options.primary.TraceDB, - alertManager: alertManager, - operationsTable: options.primary.OperationsTable, - indexTable: options.primary.IndexTable, - errorTable: options.primary.ErrorTable, - durationTable: options.primary.DurationTable, - spansTable: options.primary.SpansTable, + db: db, + localDB: localDB, + traceDB: options.primary.TraceDB, + alertManager: alertManager, + operationsTable: options.primary.OperationsTable, + indexTable: options.primary.IndexTable, + errorTable: options.primary.ErrorTable, + durationTable: options.primary.DurationTable, + spansTable: options.primary.SpansTable, + logsDB: options.primary.LogsDB, + logsTable: options.primary.LogsTable, + logsAttributeKeys: options.primary.LogsAttributeKeysTable, + logsResourceKeys: options.primary.LogsResourceKeysTable, } } @@ -2985,3 +2993,89 @@ func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Con return totalSamples, nil } + +func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) { + // response will contain top level fields from the otel log model + response := model.GetFieldsResponse{ + Selected: constants.StaticSelectedLogFields, + Interesting: []model.LogField{}, + } + + // get attribute keys + attributes := &[]model.LogField{} + query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys) + err := r.db.Select(ctx, attributes, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + // get resource keys + resources := &[]model.LogField{} + query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys) + err = r.db.Select(ctx, resources, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + statements := []model.CreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, attributes, &response) + extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, resources, &response) + extractSelectedAndInterestingFields(statements[0].Statement, constants.Static, &constants.StaticInterestingLogFields, &response) + + return &response, nil +} + +func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) { + for _, field := range *fields { + field.Type = fieldType + if strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field.Name)) { + response.Selected = append(response.Selected, field) + } else { + response.Interesting = append(response.Interesting, field) + } + } +} + +func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { + // if a field is selected it means that the field is indexed + if field.Selected { + // if the type is attribute or resource, create the materialized column first + if field.Type == constants.Attributes || field.Type == constants.Resources { + // create materialized + query := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')]", r.logsDB, r.logsTable, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } + + // create the index + if field.IndexType == nil { + iType := constants.DefaultLogSkipIndexType + field.IndexType = &iType + } + if field.IndexGranularity == nil { + granularity := constants.DefaultLogSkipIndexGranularity + field.IndexGranularity = &granularity + } + query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, *field.IndexType, *field.IndexGranularity) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } else { + // remove index + query := fmt.Sprintf("ALTER TABLE %s.%s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsTable, field.Name) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } + return nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 4e923af79c..dd752fa376 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -16,6 +16,7 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/prometheus/prometheus/promql" "go.signoz.io/query-service/app/dashboards" + "go.signoz.io/query-service/app/logs" "go.signoz.io/query-service/app/metrics" "go.signoz.io/query-service/app/parser" "go.signoz.io/query-service/auth" @@ -1816,3 +1817,43 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response w.Header().Set("Content-Type", "application/json") w.Write(resp) } + +// logs +func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) { + subRouter := router.PathPrefix("/api/v1/logs").Subrouter() + subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet) + subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost) +} + +func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) { + + fields, apiErr := (*aH.reader).GetLogFields(r.Context()) + if apiErr != nil { + respondError(w, apiErr, "Failed to fetch org from the DB") + return + } + aH.writeJSON(w, r, fields) +} + +func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) { + field := model.UpdateField{} + if err := json.NewDecoder(r.Body).Decode(&field); err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErr, "Failed to decode payload") + return + } + + err := logs.ValidateUpdateFieldPayload(&field) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErr, "Incorrect payload") + return + } + + apiErr := (*aH.reader).UpdateLogField(r.Context(), &field) + if apiErr != nil { + respondError(w, apiErr, "Failed to fetch org from the DB") + return + } + aH.writeJSON(w, r, field) +} diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go new file mode 100644 index 0000000000..9277bda047 --- /dev/null +++ b/pkg/query-service/app/logs/validator.go @@ -0,0 +1,40 @@ +package logs + +import ( + "fmt" + "regexp" + + "go.signoz.io/query-service/constants" + "go.signoz.io/query-service/model" +) + +func ValidateUpdateFieldPayload(field *model.UpdateField) error { + if field.Name == "" { + return fmt.Errorf("name cannot be empty") + } + if field.Type == "" { + return fmt.Errorf("type cannot be empty") + } + if field.DataType == "" { + return fmt.Errorf("dataType cannot be empty") + } + + matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s|%s)$", constants.Static, constants.Attributes, constants.Resources), field.Type) + if err != nil { + return err + } + if !matched { + return fmt.Errorf("type %s not supported", field.Type) + } + + if field.IndexType != nil { + matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, *field.IndexType) + if err != nil { + return err + } + if !matched { + return fmt.Errorf("index type %s not supported", *field.IndexType) + } + } + return nil +} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 5bccea66e2..2d23531c41 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -145,6 +145,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { api.RegisterRoutes(r) api.RegisterMetricsRoutes(r) + api.RegisterLogsRoutes(r) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index b4bc4b08ef..3de0f6ae7a 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -3,6 +3,8 @@ package constants import ( "os" "strconv" + + "go.signoz.io/query-service/model" ) const ( @@ -38,29 +40,34 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db") const ( - ServiceName = "serviceName" - HttpRoute = "httpRoute" - HttpCode = "httpCode" - HttpHost = "httpHost" - HttpUrl = "httpUrl" - HttpMethod = "httpMethod" - Component = "component" - OperationDB = "name" - OperationRequest = "operation" - Status = "status" - Duration = "duration" - DBName = "dbName" - DBOperation = "dbOperation" - DBSystem = "dbSystem" - MsgSystem = "msgSystem" - MsgOperation = "msgOperation" - Timestamp = "timestamp" - Descending = "descending" - Ascending = "ascending" - ContextTimeout = 60 // seconds - StatusPending = "pending" - StatusFailed = "failed" - StatusSuccess = "success" + ServiceName = "serviceName" + HttpRoute = "httpRoute" + HttpCode = "httpCode" + HttpHost = "httpHost" + HttpUrl = "httpUrl" + HttpMethod = "httpMethod" + Component = "component" + OperationDB = "name" + OperationRequest = "operation" + Status = "status" + Duration = "duration" + DBName = "dbName" + DBOperation = "dbOperation" + DBSystem = "dbSystem" + MsgSystem = "msgSystem" + MsgOperation = "msgOperation" + Timestamp = "timestamp" + Descending = "descending" + Ascending = "ascending" + ContextTimeout = 60 // seconds + StatusPending = "pending" + StatusFailed = "failed" + StatusSuccess = "success" + Attributes = "attributes" + Resources = "resources" + Static = "static" + DefaultLogSkipIndexType = "bloom_filter(0.01)" + DefaultLogSkipIndexGranularity = 64 ) const ( SIGNOZ_METRIC_DBNAME = "signoz_metrics" @@ -75,3 +82,44 @@ func GetOrDefaultEnv(key string, fallback string) string { } return v } + +var StaticInterestingLogFields = []model.LogField{ + { + Name: "trace_id", + DataType: "String", + Type: Static, + }, + { + Name: "span_id", + DataType: "String", + Type: Static, + }, + { + Name: "trace_flags", + DataType: "UInt32", + Type: Static, + }, + { + Name: "severity_text", + DataType: "LowCardinality(String)", + Type: Static, + }, + { + Name: "severity_number", + DataType: "Int32", + Type: Static, + }, +} + +var StaticSelectedLogFields = []model.LogField{ + { + Name: "timestamp", + DataType: "UInt64", + Type: Static, + }, + { + Name: "id", + DataType: "String", + Type: Static, + }, +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 9c52a4497d..0bf1811b0c 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -59,4 +59,8 @@ type Reader interface { GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) + + // Logs + GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) + UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError } diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 69509849d2..ec2032a06e 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -303,3 +303,12 @@ type FilterSet struct { Operator string `json:"op,omitempty"` Items []FilterItem `json:"items"` } + +type UpdateField struct { + Name string `json:"name"` + DataType string `json:"dataType"` + Type string `json:"type"` + Selected bool `json:"selected"` + IndexType *string `json:"index"` + IndexGranularity *int `json:"indexGranularity"` +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 523ad7e96e..f61132bc94 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -373,3 +373,18 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) { v := strconv.FormatFloat(p.Value, 'f', -1, 64) return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v}) } + +type CreateTableStatement struct { + Statement string `json:"statement" ch:"statement"` +} + +type LogField struct { + Name string `json:"name" ch:"name"` + DataType string `json:"dataType" ch:"datatype"` + Type string `json:"type"` +} + +type GetFieldsResponse struct { + Selected []LogField `json:"selected"` + Interesting []LogField `json:"interesting"` +}