diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 29816c2a08..12f277e4f3 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -28,6 +28,11 @@ const ( defaultSpansTable string = "signoz_spans" defaultDependencyGraphTable string = "dependency_graph_minutes" defaultTopLevelOperationsTable string = "top_level_operations" + defaultLogsDB string = "signoz_logs" + defaultLogsTable string = "logs" + defaultLogAttributeKeysTable string = "logs_atrribute_keys" + defaultLogResourceKeysTable string = "logs_resource_keys" + defaultLiveTailRefreshSeconds int = 10 defaultWriteBatchDelay time.Duration = 5 * time.Second defaultWriteBatchSize int = 10000 defaultEncoding Encoding = EncodingJSON @@ -58,6 +63,11 @@ type namespaceConfig struct { ErrorTable string DependencyGraphTable string TopLevelOperationsTable string + LogsDB string + LogsTable string + LogsAttributeKeysTable string + LogsResourceKeysTable string + LiveTailRefreshSeconds int WriteBatchDelay time.Duration WriteBatchSize int Encoding Encoding @@ -120,6 +130,11 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s SpansTable: defaultSpansTable, DependencyGraphTable: defaultDependencyGraphTable, TopLevelOperationsTable: defaultTopLevelOperationsTable, + LogsDB: defaultLogsDB, + LogsTable: defaultLogsTable, + LogsAttributeKeysTable: defaultLogAttributeKeysTable, + LogsResourceKeysTable: defaultLogResourceKeysTable, + LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding, @@ -131,16 +146,21 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s for _, namespace := range otherNamespaces { if namespace == archiveNamespace { options.others[namespace] = &namespaceConfig{ - namespace: namespace, - Datasource: datasource, - TraceDB: "", - OperationsTable: "", - IndexTable: "", - ErrorTable: "", - WriteBatchDelay: defaultWriteBatchDelay, - WriteBatchSize: defaultWriteBatchSize, - Encoding: defaultEncoding, - Connector: defaultConnector, + namespace: namespace, + Datasource: datasource, + TraceDB: "", + OperationsTable: "", + IndexTable: "", + ErrorTable: "", + LogsDB: "", + LogsTable: "", + LogsAttributeKeysTable: "", + LogsResourceKeysTable: "", + LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, + WriteBatchDelay: defaultWriteBatchDelay, + WriteBatchSize: defaultWriteBatchSize, + Encoding: defaultEncoding, + Connector: defaultConnector, } } else { options.others[namespace] = &namespaceConfig{namespace: namespace} diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 65758f00e2..1367c51a27 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -39,6 +39,7 @@ import ( "github.com/jmoiron/sqlx" promModel "github.com/prometheus/common/model" + "go.signoz.io/query-service/app/logs" "go.signoz.io/query-service/constants" am "go.signoz.io/query-service/integrations/alertManager" "go.signoz.io/query-service/model" @@ -81,18 +82,24 @@ type ClickHouseReader struct { traceDB string operationsTable string durationTable string - usageExplorerTable string indexTable string errorTable string + usageExplorerTable string spansTable string dependencyGraphTable string topLevelOperationsTable string + logsDB string + logsTable string + 
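An aside on the options above: NewOptions seeds the primary namespace with the new logs defaults, and NewReader then copies them field by field. A minimal in-package sketch (not part of this diff; it assumes the package's existing primaryNamespace and archiveNamespace constants, and the datasource URL is illustrative) of what those defaults look like to a caller:

```go
package clickhouseReader

import "testing"

// Sketch only: exercises the logs defaults wired up in NewOptions above.
func TestLogsDefaults(t *testing.T) {
	opts := NewOptions("tcp://localhost:9000", primaryNamespace, archiveNamespace)
	if opts.primary.LogsDB != "signoz_logs" || opts.primary.LogsTable != "logs" {
		t.Fatalf("unexpected logs defaults: %s.%s", opts.primary.LogsDB, opts.primary.LogsTable)
	}
	if opts.primary.LiveTailRefreshSeconds != defaultLiveTailRefreshSeconds {
		t.Fatalf("unexpected live tail refresh: %d", opts.primary.LiveTailRefreshSeconds)
	}
}
```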
logsAttributeKeys string + logsResourceKeys string queryEngine *promql.Engine remoteStorage *remote.Storage promConfigFile string promConfig *config.Config alertManager am.Manager + + liveTailRefreshSeconds int } // NewTraceReader returns a TraceReader for the database @@ -127,6 +134,11 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader { spansTable: options.primary.SpansTable, dependencyGraphTable: options.primary.DependencyGraphTable, topLevelOperationsTable: options.primary.TopLevelOperationsTable, + logsDB: options.primary.LogsDB, + logsTable: options.primary.LogsTable, + logsAttributeKeys: options.primary.LogsAttributeKeysTable, + logsResourceKeys: options.primary.LogsResourceKeysTable, + liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, promConfigFile: configFile, } } @@ -1972,7 +1984,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query return &GetFilteredSpansAggregatesResponse, nil } -// SetTTL sets the TTL for traces or metrics tables. +// SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. func (r *ClickHouseReader) SetTTL(ctx context.Context, @@ -2101,6 +2113,59 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, return } }(tableName) + case constants.LogsTTL: + tableName = r.logsDB + "." + r.logsTable + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")} + } + if statusItem.Status == constants.StatusPending { + return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")} + } + go func(tableName string) { + _, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration) + if dbErr != nil { + zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error())) + return + } + req = fmt.Sprintf( + "ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+ + "INTERVAL %v SECOND DELETE", tableName, params.DelDuration) + if len(params.ColdStorageVolume) > 0 { + req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+ + " + INTERVAL %v SECOND TO VOLUME '%s'", + params.ToColdStorageDuration, params.ColdStorageVolume) + } + err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume) + if err != nil { + zap.S().Error(fmt.Errorf("error in setting cold storage: %s", err.Err.Error())) + statusItem, err := r.checkTTLStatusItem(ctx, tableName) + if err == nil { + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id) + if dbErr != nil { + zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr) + return + } + } + return + } + zap.S().Debugf("Executing TTL request: %s\n", req) + statusItem, _ := r.checkTTLStatusItem(ctx, tableName) + if err := r.db.Exec(ctx, req); err != nil { + zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err)) + _, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? 
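For orientation, the LogsTTL branch above renders plain ClickHouse DDL before executing it. A standalone sketch (not part of this diff; the table name, delete window, and volume name are illustrative) of the statement it produces:

```go
package main

import "fmt"

func main() {
	tableName := "signoz_logs.logs" // default logs table from options.go
	delDuration := 2592000          // params.DelDuration: 30 days in seconds
	coldDuration := 604800          // params.ToColdStorageDuration: 7 days in seconds

	req := fmt.Sprintf(
		"ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+
			"INTERVAL %v SECOND DELETE", tableName, delDuration)
	// Appended only when params.ColdStorageVolume is set.
	req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
		" + INTERVAL %v SECOND TO VOLUME '%s'", coldDuration, "warm")

	fmt.Println(req)
	// ALTER TABLE signoz_logs.logs MODIFY TTL toDateTime(timestamp / 1000000000)
	//   + INTERVAL 2592000 SECOND DELETE, toDateTime(timestamp / 1000000000)
	//   + INTERVAL 604800 SECOND TO VOLUME 'warm'
}
```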
WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
+					if dbErr != nil {
+						zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
+						return
+					}
+					return
+				}
+				_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
+				if dbErr != nil {
+					zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
+					return
+				}
+			}(tableName)
 	default:
 		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces|logs>, got %v",
@@ -2264,6 +2329,24 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
 		}
 	}
+	getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
+		var dbResp []model.DBResponseTTL
+
+		query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsTable, r.logsDB)
+
+		err := r.db.Select(ctx, &dbResp, query)
+
+		if err != nil {
+			zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err))
+			return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)}
+		}
+		if len(dbResp) == 0 {
+			return nil, nil
+		} else {
+			return &dbResp[0], nil
+		}
+	}
+
 	switch ttlParams.Type {
 	case constants.TraceTTL:
 		tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
@@ -2308,6 +2391,29 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
 
 		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
 		return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+
+	case constants.LogsTTL:
+		tableNameArray := []string{r.logsDB + "." + r.logsTable}
+		status, err := r.setTTLQueryStatus(ctx, tableNameArray)
+		if err != nil {
+			return nil, err
+		}
+		dbResp, err := getLogsTTL()
+		if err != nil {
+			return nil, err
+		}
+		ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
+		if err != nil {
+			return nil, err
+		}
+		ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
+		if ttlQuery.ColdStorageTtl != -1 {
+			ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
+		}
+
+		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
+		return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
+
 	default:
 		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces|logs, got %v", ttlParams.Type)}
@@ -2849,3 +2955,272 @@ func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Con
 
 	return totalSamples, nil
 }
+
+func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) {
+	// response will contain top level fields from the otel log model
+	response := model.GetFieldsResponse{
+		Selected:    constants.StaticSelectedLogFields,
+		Interesting: []model.LogField{},
+	}
+
+	// get attribute keys
+	attributes := []model.LogField{}
+	query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys)
+	err := r.db.Select(ctx, &attributes, query)
+	if err != nil {
+		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+	}
+
+	// get resource keys
+	resources := []model.LogField{}
+	query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys)
+	err = r.db.Select(ctx, &resources, query)
+	if err != nil {
+		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+	}
+
+	statements := []model.ShowCreateTableStatement{}
+	query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
+	err = r.db.Select(ctx, &statements, query)
+	if err != nil {
+		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+	}
+
+	extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response)
+	extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response)
+	extractSelectedAndInterestingFields(statements[0].Statement, constants.Static, &constants.StaticInterestingLogFields, &response)
+
+	return &response, nil
+}
+
+func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
+	for _, field := range *fields {
+		field.Type = fieldType
+		if strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field.Name)) {
+			response.Selected = append(response.Selected, field)
+		} else {
+			response.Interesting = append(response.Interesting, field)
+		}
+	}
+}
+
+func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
+	// if a field is selected it means that the field needs to be indexed
+	if field.Selected {
+		// if the type is attribute or resource, create the materialized column first
+		if field.Type == constants.Attributes || field.Type == constants.Resources {
+			// create the materialized column
+			query := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')]", r.logsDB, r.logsTable, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
+			err := r.db.Exec(ctx, query)
+			if err != nil {
+				return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+			}
+		}
+
+		// create the index
+		if field.IndexType == "" {
+			field.IndexType = constants.DefaultLogSkipIndexType
+		}
+		if field.IndexGranularity == 0 {
+			field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
+		}
+		query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, field.IndexType, field.IndexGranularity)
+		err := r.db.Exec(ctx, query)
+		if err != nil {
+			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+		}
+	} else {
+		// remove the index
+		query := fmt.Sprintf("ALTER TABLE %s.%s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsTable, field.Name)
+		err := r.db.Exec(ctx, query)
+		if err != nil {
+			return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+		}
+	}
+	return nil
+}
+
+func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) {
+	response := []model.GetLogsResponse{}
+	fields, apiErr := r.GetLogFields(ctx)
+	if apiErr != nil {
+		return nil, apiErr
+	}
+
+	isPaginatePrev := logs.CheckIfPrevousPaginateAndModifyOrder(params)
+	filterSql, err := logs.GenerateSQLWhere(fields, params)
+	if err != nil {
+		return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
+	}
+
+	query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)
+
+	if filterSql != "" {
+		query = fmt.Sprintf("%s where %s", query, filterSql)
+	}
+
+	query = fmt.Sprintf("%s order by %s %s limit %d", query, params.OrderBy, params.Order, params.Limit)
+	zap.S().Debug(query)
+	err = r.db.Select(ctx, &response, query)
+	if err != nil {
+		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+	}
+	if isPaginatePrev {
+		// reverse the results from the db
+		for i, j := 0, len(response)-1; i < j; i, j = i+1, j-1 {
+			response[i], response[j] = response[j], response[i]
+		}
+	}
+	return &response, nil
+}
+
+func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) {
+
+	fields, apiErr := r.GetLogFields(ctx)
+	if apiErr != nil {
+		client.Error <- apiErr.Err
+		return
+	}
+
+	filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
+		Query: client.Filter.Query,
+	})
+
+	if err != nil {
+		client.Error <- err
+		return
+	}
+
+	query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)
+
+	tsStart := uint64(time.Now().UnixNano())
+	if client.Filter.TimestampStart != 0 {
+		tsStart = client.Filter.TimestampStart
+	}
+
+	var idStart string
+	if client.Filter.IdGt != "" {
+		idStart = client.Filter.IdGt
+	}
+
+	ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			done := true
+			client.Done <- &done
+			zap.S().Debug("closing go routine : " + client.Name)
+			return
+		case <-ticker.C:
+			// fetch only the newest 100 logs, since anything older is of no use for live tail
			tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, tsStart)
+			if filterSql != "" {
+				tmpQuery = fmt.Sprintf("%s and %s", tmpQuery, filterSql)
+			}
+			if idStart != "" {
+				tmpQuery = fmt.Sprintf("%s and id > '%s'", tmpQuery, idStart)
+			}
+			tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
+			zap.S().Debug(tmpQuery)
+			response := []model.GetLogsResponse{}
+			err := r.db.Select(ctx, &response, tmpQuery)
+			if err != nil {
+				zap.S().Error(err)
+				client.Error <- err
+				return
+			}
+			for i := len(response) - 1; i >= 0; i-- {
+				select {
+				case <-ctx.Done():
+					done := true
+					client.Done <- &done
+					zap.S().Debug("closing go routine while sending logs : " + client.Name)
+					return
+				default:
+					client.Logs <- &response[i]
+					if i == 0 {
+						tsStart = response[i].Timestamp
+						idStart = response[i].ID
+					}
+				}
+			}
+		}
+	}
+}
+
+func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) {
+	logAggregatesDBResponseItems := []model.LogsAggregatesDBResponseItem{}
+
+	function := "toFloat64(count()) as value"
+	if params.Function != "" {
+		function = fmt.Sprintf("toFloat64(%s) as value",
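The TailLogs loop above is channel-driven: rows go out on client.Logs, cancellation is signalled on client.Done, and query failures arrive on client.Error. A sketch of a hypothetical consumer (not part of this diff; it mirrors what the tailLogs HTTP handler further down does, and reader/ctx are assumed to come from the surrounding server wiring):

```go
package tailexample

import (
	"context"
	"fmt"

	"go.signoz.io/query-service/interfaces"
	"go.signoz.io/query-service/model"
)

// TailExample consumes a live tail the same way the tailLogs handler does.
func TailExample(ctx context.Context, reader interfaces.Reader) {
	client := &model.LogsTailClient{
		Name:   "example-client",
		Logs:   make(chan *model.GetLogsResponse, 1000),
		Done:   make(chan *bool),
		Error:  make(chan error),
		Filter: model.LogsFilterParams{Query: "fulltext contains 'error'"},
	}
	go reader.TailLogs(ctx, client)

	for {
		select {
		case l := <-client.Logs: // one streamed row per receive
			fmt.Println(l.Timestamp, l.Body)
		case <-client.Done: // the reader observed ctx cancellation
			return
		case err := <-client.Error: // a failed query ends the tail
			fmt.Println("tail error:", err)
			return
		}
	}
}
```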
params.Function) + } + + fields, apiErr := r.GetLogFields(ctx) + if apiErr != nil { + return nil, apiErr + } + + filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{ + Query: params.Query, + }) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData} + } + + query := "" + if params.GroupBy != "" { + query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, toString(%s) as groupBy, "+ + "%s "+ + "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ", + params.StepSeconds/60, params.GroupBy, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd) + } else { + query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, "+ + "%s "+ + "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ", + params.StepSeconds/60, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd) + } + if filterSql != "" { + query = fmt.Sprintf("%s AND %s ", query, filterSql) + } + if params.GroupBy != "" { + query = fmt.Sprintf("%s GROUP BY time, toString(%s) as groupBy ORDER BY time", query, params.GroupBy) + } else { + query = fmt.Sprintf("%s GROUP BY time ORDER BY time", query) + } + + zap.S().Debug(query) + err = r.db.Select(ctx, &logAggregatesDBResponseItems, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + aggregateResponse := model.GetLogsAggregatesResponse{ + Items: make(map[int64]model.LogsAggregatesResponseItem), + } + + for i := range logAggregatesDBResponseItems { + if elem, ok := aggregateResponse.Items[int64(logAggregatesDBResponseItems[i].Timestamp)]; ok { + if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" { + elem.GroupBy[logAggregatesDBResponseItems[i].GroupBy] = logAggregatesDBResponseItems[i].Value + } + aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = elem + } else { + if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" { + aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{ + Timestamp: logAggregatesDBResponseItems[i].Timestamp, + GroupBy: map[string]interface{}{logAggregatesDBResponseItems[i].GroupBy: logAggregatesDBResponseItems[i].Value}, + } + } else if params.GroupBy == "" { + aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{ + Timestamp: logAggregatesDBResponseItems[i].Timestamp, + Value: logAggregatesDBResponseItems[i].Value, + } + } + } + + } + + return &aggregateResponse, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index a7a0a40c70..0a63e38c6b 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "context" "encoding/json" "errors" @@ -16,6 +17,7 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/prometheus/prometheus/promql" "go.signoz.io/query-service/app/dashboards" + "go.signoz.io/query-service/app/logs" "go.signoz.io/query-service/app/metrics" "go.signoz.io/query-service/app/parser" "go.signoz.io/query-service/auth" @@ -1922,3 +1924,120 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response w.Header().Set("Content-Type", "application/json") w.Write(resp) } + +// logs +func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) { + 
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()
+	subRouter.HandleFunc("", ViewAccess(aH.getLogs)).Methods(http.MethodGet)
+	subRouter.HandleFunc("/tail", ViewAccess(aH.tailLogs)).Methods(http.MethodGet)
+	subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet)
+	subRouter.HandleFunc("/fields", EditAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
+	subRouter.HandleFunc("/aggregate", ViewAccess(aH.logAggregate)).Methods(http.MethodGet)
+}
+
+func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) {
+	fields, apiErr := (*aH.reader).GetLogFields(r.Context())
+	if apiErr != nil {
+		respondError(w, apiErr, "Failed to fetch fields from the DB")
+		return
+	}
+	aH.writeJSON(w, r, fields)
+}
+
+func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) {
+	field := model.UpdateField{}
+	if err := json.NewDecoder(r.Body).Decode(&field); err != nil {
+		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		respondError(w, apiErr, "Failed to decode payload")
+		return
+	}
+
+	err := logs.ValidateUpdateFieldPayload(&field)
+	if err != nil {
+		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		respondError(w, apiErr, "Incorrect payload")
+		return
+	}
+
+	apiErr := (*aH.reader).UpdateLogField(r.Context(), &field)
+	if apiErr != nil {
+		respondError(w, apiErr, "Failed to update field in the DB")
+		return
+	}
+	aH.writeJSON(w, r, field)
+}
+
+func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
+	params, err := logs.ParseLogFilterParams(r)
+	if err != nil {
+		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		respondError(w, apiErr, "Incorrect params")
+		return
+	}
+
+	res, apiErr := (*aH.reader).GetLogs(r.Context(), params)
+	if apiErr != nil {
+		respondError(w, apiErr, "Failed to fetch logs from the DB")
+		return
+	}
+	aH.writeJSON(w, r, map[string]interface{}{"results": res})
+}
+
+func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
+	params, err := logs.ParseLogFilterParams(r)
+	if err != nil {
+		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		respondError(w, apiErr, "Incorrect params")
+		return
+	}
+
+	// create the client
+	client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
+	go (*aH.reader).TailLogs(r.Context(), client)
+
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.WriteHeader(200)
+
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
+		respondError(w, &err, "streaming is not supported")
+		return
+	}
+
+	for {
+		select {
+		case log := <-client.Logs:
+			var buf bytes.Buffer
+			enc := json.NewEncoder(&buf)
+			enc.Encode(log)
+			fmt.Fprintf(w, "data: %v\n\n", buf.String())
+			flusher.Flush()
+		case <-client.Done:
+			zap.S().Debug("done!")
+			return
+		case err := <-client.Error:
+			zap.S().Error("error occurred!", err)
+			return
+		}
+	}
+}
+
+func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
+	params, err := logs.ParseLogAggregateParams(r)
+	if err != nil {
+		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+		respondError(w, apiErr, "Incorrect params")
+		return
+	}
+
+	res, apiErr := (*aH.reader).AggregateLogs(r.Context(), params)
+	if apiErr != nil {
+		respondError(w, apiErr, "Failed
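Seen from the other side, the tailLogs handler above emits standard server-sent events, one `data:` frame per log line. A sketch of a Go client for the new endpoint (host, port, and the filter value are illustrative; any auth in front of the API is omitted):

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	q := url.Values{"q": {"fulltext contains 'error'"}}
	resp, err := http.Get("http://localhost:8080/api/v1/logs/tail?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Each payload arrives as an SSE frame: "data: {...}\n\n".
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if line := scanner.Text(); line != "" {
			fmt.Println(line)
		}
	}
}
```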
to fetch logs aggregate from the DB") + return + } + aH.writeJSON(w, r, res) +} diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go new file mode 100644 index 0000000000..53b168b17f --- /dev/null +++ b/pkg/query-service/app/logs/parser.go @@ -0,0 +1,320 @@ +package logs + +import ( + "fmt" + "net/http" + "regexp" + "strconv" + "strings" + + "go.signoz.io/query-service/model" +) + +var operatorMapping = map[string]string{ + "lt": "<", + "gt": ">", + "lte": "<=", + "gte": ">=", + "in": "IN", + "nin": "NOT IN", + "contains": "ILIKE", + "ncontains": "NOT ILIKE", +} + +const ( + AND = "and" + OR = "or" + ORDER = "order" + ORDER_BY = "orderBy" + TIMESTAMP_START = "timestampStart" + TIMESTAMP_END = "timestampEnd" + IdGt = "idGt" + IdLT = "idLt" + TIMESTAMP = "timestamp" + ASC = "asc" + DESC = "desc" +) + +var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?|or( )*?)?(([\w.-]+ (in|nin) \([^(]+\))|([\w.]+ (gt|lt|gte|lte) (')?[\S]+(')?)|([\w.]+ (contains|ncontains)) '[^']+')`) +var operatorRegex, _ = regexp.Compile(`(?i)(?: )(in|nin|gt|lt|gte|lte|contains|ncontains)(?: )`) + +func ParseLogFilterParams(r *http.Request) (*model.LogsFilterParams, error) { + res := model.LogsFilterParams{ + Limit: 30, + OrderBy: "timestamp", + Order: "desc", + } + var err error + params := r.URL.Query() + if val, ok := params["limit"]; ok { + res.Limit, err = strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + } + if val, ok := params[ORDER_BY]; ok { + res.OrderBy = val[0] + } + if val, ok := params[ORDER]; ok { + res.Order = val[0] + } + if val, ok := params["q"]; ok { + res.Query = val[0] + } + if val, ok := params[TIMESTAMP_START]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.TimestampStart = uint64(ts) + } + if val, ok := params[TIMESTAMP_END]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.TimestampEnd = uint64(ts) + } + if val, ok := params[IdGt]; ok { + res.IdGt = val[0] + } + if val, ok := params[IdLT]; ok { + res.IdLT = val[0] + } + return &res, nil +} + +func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) { + res := model.LogsFilterParams{} + params := r.URL.Query() + if val, ok := params["q"]; ok { + res.Query = val[0] + } + if val, ok := params[TIMESTAMP_START]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.TimestampStart = uint64(ts) + } + if val, ok := params[IdGt]; ok { + res.IdGt = val[0] + } + return &res, nil +} + +func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error) { + res := model.LogsAggregateParams{} + params := r.URL.Query() + if val, ok := params[TIMESTAMP_START]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.TimestampStart = uint64(ts) + } else { + return nil, fmt.Errorf("timestampStart is required") + } + if val, ok := params[TIMESTAMP_END]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.TimestampEnd = uint64(ts) + } else { + return nil, fmt.Errorf("timestampEnd is required") + } + + if val, ok := params["q"]; ok { + res.Query = val[0] + } + + if val, ok := params["groupBy"]; ok { + res.GroupBy = val[0] + } + + if val, ok := params["function"]; ok { + res.Function = val[0] + } + + if val, ok := params["step"]; ok { + step, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.StepSeconds = step + } else { + return nil, fmt.Errorf("step is required") + } + return &res, nil +} + +func 
parseLogQuery(query string) ([]string, error) {
+	sqlQueryTokens := []string{}
+	filterTokens := tokenRegex.FindAllString(query, -1)
+
+	if len(filterTokens) == 0 {
+		sqlQueryTokens = append(sqlQueryTokens, fmt.Sprintf("body ILIKE '%%%s%%' ", query))
+		return sqlQueryTokens, nil
+	}
+
+	// strip all matched tokens and check whether anything unparsed is left over
+	if len(strings.TrimSpace(tokenRegex.ReplaceAllString(query, ""))) > 0 {
+		return nil, fmt.Errorf("failed to parse query, contains unknown tokens")
+	}
+
+	for _, v := range filterTokens {
+		op := strings.TrimSpace(operatorRegex.FindString(v))
+		opLower := strings.ToLower(op)
+
+		if opLower == "contains" || opLower == "ncontains" {
+			searchString := strings.TrimSpace(strings.Split(v, op)[1])
+
+			operatorRemovedTokens := strings.Split(operatorRegex.ReplaceAllString(v, " "), " ")
+			searchCol := strings.ToLower(operatorRemovedTokens[0])
+			if searchCol == AND || searchCol == OR {
+				searchCol = strings.ToLower(operatorRemovedTokens[1])
+			}
+			col := searchCol
+			if strings.ToLower(searchCol) == "fulltext" {
+				col = "body"
+			}
+
+			f := fmt.Sprintf(`%s %s '%%%s%%' `, col, operatorMapping[opLower], searchString[1:len(searchString)-1])
+			if strings.HasPrefix(strings.ToLower(v), AND) {
+				f = "AND " + f
+			} else if strings.HasPrefix(strings.ToLower(v), OR) {
+				f = "OR " + f
+			}
+			sqlQueryTokens = append(sqlQueryTokens, f)
+		} else {
+			symbol := operatorMapping[strings.ToLower(op)]
+			sqlQueryTokens = append(sqlQueryTokens, strings.Replace(v, " "+op+" ", " "+symbol+" ", 1)+" ")
+		}
+	}
+
+	return sqlQueryTokens, nil
+}
+
+func parseColumn(s string) (*string, error) {
+	s = strings.ToLower(s)
+
+	colName := ""
+
+	// check whether the filter has an and/or prefix
+	filter := strings.Split(s, " ")
+	if len(filter) < 3 {
+		return nil, fmt.Errorf("incorrect filter")
+	}
+
+	if strings.HasPrefix(s, AND) || strings.HasPrefix(s, OR) {
+		colName = filter[1]
+	} else {
+		colName = filter[0]
+	}
+
+	return &colName, nil
+}
+
+func arrayToMap(fields []model.LogField) map[string]model.LogField {
+	res := map[string]model.LogField{}
+	for _, field := range fields {
+		res[field.Name] = field
+	}
+	return res
+}
+
+func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []string) ([]string, error) {
+	// build lookups for selected and interesting columns
+	selectedFieldsLookup := arrayToMap(allFields.Selected)
+	interestingFieldLookup := arrayToMap(allFields.Interesting)
+
+	for index := 0; index < len(queryTokens); index++ {
+		queryToken := queryTokens[index]
+		col, err := parseColumn(queryToken)
+		if err != nil {
+			return nil, err
+		}
+
+		sqlColName := *col
+		if _, ok := selectedFieldsLookup[*col]; !ok && *col != "body" {
+			if field, ok := interestingFieldLookup[*col]; ok {
+				sqlColName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), *col)
+			} else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 {
+				return nil, fmt.Errorf("field not found for filtering")
+			}
+		}
+		queryTokens[index] = strings.Replace(queryToken, *col, sqlColName, 1)
+	}
+	return queryTokens, nil
+}
+
+func CheckIfPrevousPaginateAndModifyOrder(params *model.LogsFilterParams) (isPaginatePrevious bool) {
+	if params.IdGt != "" && params.OrderBy == TIMESTAMP && params.Order == DESC {
+		isPaginatePrevious = true
+		params.Order = ASC
+	} else if params.IdLT != "" && params.OrderBy == TIMESTAMP && params.Order == ASC {
+		isPaginatePrevious = true
+		params.Order = DESC
+	}
+	return
+}
+
+func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilterParams) (string, error) {
+	var tokens []string
+	var err error
+	var sqlWhere string
+	if params.Query != "" {
+		tokens, err = parseLogQuery(params.Query)
+		if err != nil {
+			return sqlWhere, err
+		}
+	}
+
+	tokens, err = replaceInterestingFields(allFields, tokens)
+	if err != nil {
+		return sqlWhere, err
+	}
+
+	filterTokens := []string{}
+	if params.TimestampStart != 0 {
+		filter := fmt.Sprintf("timestamp >= '%d' ", params.TimestampStart)
+		if len(filterTokens) > 0 {
+			filter = "and " + filter
+		}
+		filterTokens = append(filterTokens, filter)
+	}
+	if params.TimestampEnd != 0 {
+		filter := fmt.Sprintf("timestamp <= '%d' ", params.TimestampEnd)
+		if len(filterTokens) > 0 {
+			filter = "and " + filter
+		}
+		filterTokens = append(filterTokens, filter)
+	}
+	if params.IdGt != "" {
+		filter := fmt.Sprintf("id > '%v' ", params.IdGt)
+		if len(filterTokens) > 0 {
+			filter = "and " + filter
+		}
+		filterTokens = append(filterTokens, filter)
+	}
+	if params.IdLT != "" {
+		filter := fmt.Sprintf("id < '%v' ", params.IdLT)
+		if len(filterTokens) > 0 {
+			filter = "and " + filter
+		}
+		filterTokens = append(filterTokens, filter)
+	}
+
+	if len(filterTokens) > 0 {
+		if len(tokens) > 0 {
+			tokens[0] = fmt.Sprintf("and %s", tokens[0])
+		}
+		filterTokens = append(filterTokens, tokens...)
+		tokens = filterTokens
+	}
+
+	sqlWhere = strings.Join(tokens, "")
+
+	return sqlWhere, nil
+}
diff --git a/pkg/query-service/app/logs/parser_test.go b/pkg/query-service/app/logs/parser_test.go
new file mode 100644
index 0000000000..ff47632a4b
--- /dev/null
+++ b/pkg/query-service/app/logs/parser_test.go
@@ -0,0 +1,302 @@
+package logs
+
+import (
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+	"go.signoz.io/query-service/model"
+)
+
+var correctQueriesTest = []struct {
+	Name          string
+	InputQuery    string
+	WantSqlTokens []string
+}{
+	{
+		`filter with fulltext`,
+		`OPERATION in ('bcd') AND FULLTEXT contains 'searchstring'`,
+		[]string{`OPERATION IN ('bcd') `, `AND body ILIKE '%searchstring%' `},
+	},
+	{
+		`fulltext`,
+		`searchstring`,
+		[]string{`body ILIKE '%searchstring%' `},
+	},
+	{
+		`fulltext with quotes and space`,
+		`FULLTEXT contains 'Hello, "World"'`,
+		[]string{`body ILIKE '%Hello, "World"%' `},
+	},
+	{
+		`contains search with a different attribute`,
+		`resource contains 'Hello, "World"'`,
+		[]string{`resource ILIKE '%Hello, "World"%' `},
+	},
+	{
+		`more than one contains`,
+		`resource contains 'Hello, "World"' and myresource contains 'abcd'`,
+		[]string{`resource ILIKE '%Hello, "World"%' `, `AND myresource ILIKE '%abcd%' `},
+	},
+	{
+		"contains with or",
+		`id in ('2CkBCauK8m3nkyKR19YhCw6WbdY') or fulltext contains 'OPTIONS /api/v1/logs'`,
+		[]string{`id IN ('2CkBCauK8m3nkyKR19YhCw6WbdY') `, `OR body ILIKE '%OPTIONS /api/v1/logs%' `},
+	},
+	{
+		"mixing and or",
+		`id in ('2CkBCauK8m3nkyKR19YhCw6WbdY') and id in ('2CkBCauK8m3nkyKR19YhCw6WbdY','2CkBCauK8m3nkyKR19YhCw6WbdY') or fulltext contains 'OPTIONS /api/v1/logs'`,
+		[]string{`id IN ('2CkBCauK8m3nkyKR19YhCw6WbdY') `, `and id IN ('2CkBCauK8m3nkyKR19YhCw6WbdY','2CkBCauK8m3nkyKR19YhCw6WbdY') `, `OR body ILIKE '%OPTIONS /api/v1/logs%' `},
+	},
+	{
+		`filters with lt,gt,lte,gte operators`,
+		`id lt 100 and id gt 50 and code lte 500 and code gte 400`,
+		[]string{`id < 100 `, `and id > 50 `, `and code <= 500 `, `and code >= 400 `},
+	},
+	{
+		`filters with lt,gt,lte,gte operators separated by OR`,
+		`id lt 100 or id gt 50 or code lte 500 or code gte 400`,
+		[]string{`id < 100 `, `or id > 50 `, `or code <= 500 `, `or code >= 400 `},
+	},
+	{
+		`filter with number`,
+		`status gte 200 AND FULLTEXT ncontains '"key"'`,
+		[]string{`status >= 200 `, `AND body NOT ILIKE '%"key"%' `},
+	},
+	{
+		`characters inside string`,
+		`service NIN ('name > 100') AND length gt 100`,
+		[]string{`service NOT IN ('name > 100') `, `AND length > 100 `},
+	},
+	{
+		`fulltext with in`,
+		`key in 2`,
+		[]string{`body ILIKE '%key in 2%' `},
+	},
+	{
+		`not valid fulltext but a filter`,
+		`key in (2,3)`,
+		[]string{`key IN (2,3) `},
+	},
+	{
+		`filters with extra spaces`,
+		`service IN ('name > 100') AND length gt 100`,
+		[]string{`service IN ('name > 100') `, `AND length > 100 `},
+	},
+	{
+		`filters with special characters in key name`,
+		`id.userid in (100) and id_userid gt 50`,
+		[]string{`id.userid IN (100) `, `and id_userid > 50 `},
+	},
+}
+
+func TestParseLogQueryCorrect(t *testing.T) {
+	for _, test := range correctQueriesTest {
+		Convey(test.Name, t, func() {
+			query, _ := parseLogQuery(test.InputQuery)
+
+			So(query, ShouldResemble, test.WantSqlTokens)
+		})
+	}
+}
+
+var incorrectQueries = []struct {
+	Name  string
+	Query string
+}{
+	{
+		"filter without a key",
+		"OPERATION in ('bcd') AND 'abcd' FULLTEXT contains 'helloxyz'",
+	},
+	{
+		"fulltext without fulltext keyword",
+		"OPERATION in ('bcd') AND 'searchstring'",
+	},
+	{
+		"fulltext in the beginning without keyword",
+		"'searchstring and OPERATION in ('bcd')",
+	},
+}
+
+func TestParseLogQueryInCorrect(t *testing.T) {
+	for _, test := range incorrectQueries {
+		Convey(test.Name, t, func() {
+			_, err := parseLogQuery(test.Query)
+			So(err, ShouldBeError)
+		})
+	}
+}
+
+var parseCorrectColumns = []struct {
+	Name   string
+	Filter string
+	Column string
+}{
+	{
+		"column with IN operator",
+		"id.userid IN (100) ",
+		"id.userid",
+	},
+	{
+		"column with NOT IN operator",
+		"service NOT IN ('name > 100') ",
+		"service",
+	},
+	{
+		"column with > operator",
+		"and id_userid > 50 ",
+		"id_userid",
+	},
+	{
+		"column with < operator",
+		"and id_userid < 50 ",
+		"id_userid",
+	},
+	{
+		"column with <= operator",
+		"and id_userid <= 50 ",
+		"id_userid",
+	},
+	{
+		"column with >= operator",
+		"and id_userid >= 50 ",
+		"id_userid",
+	},
+	{
+		"column with ilike",
+		`AND body ILIKE '%searchstring%' `,
+		"body",
+	},
+	{
+		"column with not ilike",
+		`AND body NOT ILIKE '%searchstring%' `,
+		"body",
+	},
+}
+
+func TestParseColumn(t *testing.T) {
+	for _, test := range parseCorrectColumns {
+		Convey(test.Name, t, func() {
+			column, _ := parseColumn(test.Filter)
+			So(*column, ShouldEqual, test.Column)
+		})
+	}
+}
+
+func TestReplaceInterestingFields(t *testing.T) {
+	queryTokens := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
+	allFields := model.GetFieldsResponse{
+		Selected: []model.LogField{
+			{
+				Name:     "id_key",
+				DataType: "int64",
+				Type:     "attributes",
+			},
+		},
+		Interesting: []model.LogField{
+			{
+				Name:     "id.userid",
+				DataType: "int64",
+				Type:     "attributes",
+			},
+		},
+	}
+
+	expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
+	Convey("testInterestingFields", t, func() {
+		tokens, _ := replaceInterestingFields(&allFields, queryTokens)
+		So(tokens, ShouldResemble, expectedTokens)
+	})
+}
+
+var previousPaginateTestCases = []struct {
+	Name           string
+	Filter         model.LogsFilterParams
+	IsPaginatePrev bool
+	Order          string
+}{
+	{
+		Name:           "empty",
+		Filter:         model.LogsFilterParams{},
+		IsPaginatePrev: false,
+	},
+	{
+		Name: "next order by asc",
+		Filter: model.LogsFilterParams{
+			OrderBy: TIMESTAMP,
+			Order:   ASC,
+			IdGt:    "myid",
+		},
+		IsPaginatePrev: false,
+		Order:          ASC,
+	},
+	{
+		Name: "next order by desc",
+		Filter: model.LogsFilterParams{
+			OrderBy: TIMESTAMP,
+			Order:   DESC,
+			IdLT:    "myid",
+		},
+		IsPaginatePrev: false,
+		Order:          DESC,
+	},
+	{
+		Name: "prev order by desc",
+		Filter: model.LogsFilterParams{
+			OrderBy: TIMESTAMP,
+			Order:   DESC,
+			IdGt:    "myid",
+		},
+		IsPaginatePrev: true,
+		Order:          ASC,
+	},
+	{
+		Name: "prev order by asc",
+		Filter: model.LogsFilterParams{
+			OrderBy: TIMESTAMP,
+			Order:   ASC,
+			IdLT:    "myid",
+		},
+		IsPaginatePrev: true,
+		Order:          DESC,
+	},
+}
+
+func TestCheckIfPrevousPaginateAndModifyOrder(t *testing.T) {
+	for _, test := range previousPaginateTestCases {
+		Convey(test.Name, t, func() {
+			isPrevPaginate := CheckIfPrevousPaginateAndModifyOrder(&test.Filter)
+			So(isPrevPaginate, ShouldEqual, test.IsPaginatePrev)
+			So(test.Order, ShouldEqual, test.Filter.Order)
+		})
+	}
+}
+
+func TestGenerateSQLQuery(t *testing.T) {
+	allFields := model.GetFieldsResponse{
+		Selected: []model.LogField{
+			{
+				Name:     "id",
+				DataType: "int64",
+				Type:     "attributes",
+			},
+		},
+		Interesting: []model.LogField{
+			{
+				Name:     "code",
+				DataType: "int64",
+				Type:     "attributes",
+			},
+		},
+	}
+
+	query := "id lt 100 and id gt 50 and code lte 500 and code gte 400"
+	tsStart := uint64(1657689292000)
+	tsEnd := uint64(1657689294000)
+	idStart := "2BsKLKv8cZrLCn6rkOcRGkdjBdM"
+	idEnd := "2BsKG6tRpFWjYMcWsAGKfSxoQdU"
+	sqlWhere := "timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' and id < 100 and id > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 "
+	Convey("testGenerateSQL", t, func() {
+		res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: query, TimestampStart: tsStart, TimestampEnd: tsEnd, IdGt: idStart, IdLT: idEnd})
+		So(res, ShouldEqual, sqlWhere)
+	})
+}
diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go
new file mode 100644
index 0000000000..0a27a11b15
--- /dev/null
+++ b/pkg/query-service/app/logs/validator.go
@@ -0,0 +1,40 @@
+package logs
+
+import (
+	"fmt"
+	"regexp"
+
+	"go.signoz.io/query-service/constants"
+	"go.signoz.io/query-service/model"
+)
+
+func ValidateUpdateFieldPayload(field *model.UpdateField) error {
+	if field.Name == "" {
+		return fmt.Errorf("name cannot be empty")
+	}
+	if field.Type == "" {
+		return fmt.Errorf("type cannot be empty")
+	}
+	if field.DataType == "" {
+		return fmt.Errorf("dataType cannot be empty")
+	}
+
+	matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s|%s)$", constants.Static, constants.Attributes, constants.Resources), field.Type)
+	if err != nil {
+		return err
+	}
+	if !matched {
+		return fmt.Errorf("type %s not supported", field.Type)
+	}
+
+	if field.IndexType != "" {
+		matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, field.IndexType)
+		if err != nil {
+			return err
+		}
+		if !matched {
+			return fmt.Errorf("index type %s not supported", field.IndexType)
+		}
+	}
+	return nil
+}
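The pagination tests above encode the convention the UI relies on: with the default `timestamp desc` ordering, `idGt` marks a "previous page" request, so the helper flips the DB ordering to `asc` and GetLogs reverses the rows before returning them. A small sketch (not part of this diff) of how a caller inside this package would request the page above a known log id:

```go
package logs

import "go.signoz.io/query-service/model"

// examplePrevPage is illustrative only; lastSeenID would be the id of the
// topmost row currently displayed.
func examplePrevPage(lastSeenID string) *model.LogsFilterParams {
	params := &model.LogsFilterParams{
		Limit:   30,
		OrderBy: TIMESTAMP,
		Order:   DESC,
		IdGt:    lastSeenID,
	}
	// Flips params.Order to ASC and reports that the caller asked for the
	// previous page, so the result set must be reversed after the query.
	_ = CheckIfPrevousPaginateAndModifyOrder(params)
	return params
}
```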
diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go
index 6991a03156..e7291e67a9 100644
--- a/pkg/query-service/app/parser.go
+++ b/pkg/query-service/app/parser.go
@@ -590,8 +590,8 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
 	}
 
 	// Validate the type parameter
-	if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
-		return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL)
+	if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
+		return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
 	}
 
 	// Validate the TTL duration.
@@ -629,8 +629,8 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
 		return nil, fmt.Errorf("type param cannot be empty from the query")
 	} else {
 		// Validate the type parameter
-		if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
-			return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL)
+		if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
+			return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
 		}
 	}
 
diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go
index 845b75e9c4..dd00aea804 100644
--- a/pkg/query-service/app/server.go
+++ b/pkg/query-service/app/server.go
@@ -162,11 +162,12 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
 
 	api.RegisterRoutes(r)
 	api.RegisterMetricsRoutes(r)
+	api.RegisterLogsRoutes(r)
 
 	c := cors.New(cors.Options{
 		AllowedOrigins: []string{"*"},
-		AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"},
-		AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
+		AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH", "OPTIONS"},
+		AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "cache-control"},
 	})
 
 	handler := c.Handler(r)
@@ -217,6 +218,11 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
 	lrw.ResponseWriter.WriteHeader(code)
 }
 
+// Flush implements the http.Flusher interface.
+func (lrw *loggingResponseWriter) Flush() { + lrw.ResponseWriter.(http.Flusher).Flush() +} + func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { route := mux.CurrentRoute(r) @@ -236,8 +242,14 @@ func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { func setTimeoutMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), constants.ContextTimeout*time.Second) - defer cancel() + ctx := r.Context() + var cancel context.CancelFunc + // check if route is not excluded + url := r.URL.Path + if _, ok := constants.TimeoutExcludedRoutes[url]; !ok { + ctx, cancel = context.WithTimeout(r.Context(), constants.ContextTimeout*time.Second) + defer cancel() + } r = r.WithContext(ctx) next.ServeHTTP(w, r) diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 69ed855262..2e01c976cb 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -3,6 +3,8 @@ package constants import ( "os" "strconv" + + "go.signoz.io/query-service/model" ) const ( @@ -24,6 +26,7 @@ func IsTelemetryEnabled() bool { const TraceTTL = "traces" const MetricsTTL = "metrics" +const LogsTTL = "logs" func GetAlertManagerApiPrefix() string { if os.Getenv("ALERTMANAGER_API_PREFIX") != "" { @@ -38,35 +41,40 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db") const ( - ServiceName = "serviceName" - HttpRoute = "httpRoute" - HttpCode = "httpCode" - HttpHost = "httpHost" - HttpUrl = "httpUrl" - HttpMethod = "httpMethod" - Component = "component" - OperationDB = "name" - OperationRequest = "operation" - Status = "status" - Duration = "duration" - DBName = "dbName" - DBOperation = "dbOperation" - DBSystem = "dbSystem" - MsgSystem = "msgSystem" - MsgOperation = "msgOperation" - Timestamp = "timestamp" - RPCMethod = "rpcMethod" - ResponseStatusCode = "responseStatusCode" - Descending = "descending" - Ascending = "ascending" - ContextTimeout = 60 // seconds - StatusPending = "pending" - StatusFailed = "failed" - StatusSuccess = "success" - ExceptionType = "exceptionType" - ExceptionCount = "exceptionCount" - LastSeen = "lastSeen" - FirstSeen = "firstSeen" + ServiceName = "serviceName" + HttpRoute = "httpRoute" + HttpCode = "httpCode" + HttpHost = "httpHost" + HttpUrl = "httpUrl" + HttpMethod = "httpMethod" + Component = "component" + OperationDB = "name" + OperationRequest = "operation" + Status = "status" + Duration = "duration" + DBName = "dbName" + DBOperation = "dbOperation" + DBSystem = "dbSystem" + MsgSystem = "msgSystem" + MsgOperation = "msgOperation" + Timestamp = "timestamp" + RPCMethod = "rpcMethod" + ResponseStatusCode = "responseStatusCode" + Descending = "descending" + Ascending = "ascending" + ContextTimeout = 60 // seconds + StatusPending = "pending" + StatusFailed = "failed" + StatusSuccess = "success" + ExceptionType = "exceptionType" + ExceptionCount = "exceptionCount" + LastSeen = "lastSeen" + FirstSeen = "firstSeen" + Attributes = "attributes" + Resources = "resources" + Static = "static" + DefaultLogSkipIndexType = "bloom_filter(0.01)" + DefaultLogSkipIndexGranularity = 64 ) const ( SIGNOZ_METRIC_DBNAME = "signoz_metrics" @@ -74,6 +82,10 @@ const ( SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2" 
)
+var TimeoutExcludedRoutes = map[string]bool{
+	"/api/v1/logs/tail": true,
+}
+
 // alert related constants
 const (
 	// AlertHelpPage is used in case default alert repo url is not set
@@ -87,3 +99,61 @@ func GetOrDefaultEnv(key string, fallback string) string {
 	}
 	return v
 }
+
+const (
+	STRING                = "String"
+	UINT32                = "UInt32"
+	LOWCARDINALITY_STRING = "LowCardinality(String)"
+	INT32                 = "Int32"
+	UINT8                 = "UInt8"
+)
+
+var StaticInterestingLogFields = []model.LogField{
+	{
+		Name:     "trace_id",
+		DataType: STRING,
+		Type:     Static,
+	},
+	{
+		Name:     "span_id",
+		DataType: STRING,
+		Type:     Static,
+	},
+	{
+		Name:     "trace_flags",
+		DataType: UINT32,
+		Type:     Static,
+	},
+	{
+		Name:     "severity_text",
+		DataType: LOWCARDINALITY_STRING,
+		Type:     Static,
+	},
+	{
+		Name:     "severity_number",
+		DataType: UINT8,
+		Type:     Static,
+	},
+}
+
+var StaticSelectedLogFields = []model.LogField{
+	{
+		Name:     "timestamp",
+		DataType: UINT32,
+		Type:     Static,
+	},
+	{
+		Name:     "id",
+		DataType: STRING,
+		Type:     Static,
+	},
+}
+
+const (
+	LogsSQLSelect = "SELECT " +
+		"timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body," +
+		"CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string," +
+		"CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64," +
+		"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," +
+		"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "
+)
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index 14bc4b5d63..a2dae6df01 100644
--- a/pkg/query-service/interfaces/interface.go
+++ b/pkg/query-service/interfaces/interface.go
@@ -60,6 +60,13 @@ type Reader interface {
 	GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error)
 	GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
 
+	// Logs
+	GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
+	UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
+	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
+	TailLogs(ctx context.Context, client *model.LogsTailClient)
+	AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError)
+
 	// Connection needed for rules, not ideal but required
 	GetConn() clickhouse.Conn
 }
diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go
index a215fb8d9b..33827d63c1 100644
--- a/pkg/query-service/model/queryParams.go
+++ b/pkg/query-service/model/queryParams.go
@@ -320,3 +320,32 @@ type FilterSet struct {
 	Operator string       `json:"op,omitempty"`
 	Items    []FilterItem `json:"items"`
 }
+
+type UpdateField struct {
+	Name             string `json:"name"`
+	DataType         string `json:"dataType"`
+	Type             string `json:"type"`
+	Selected         bool   `json:"selected"`
+	IndexType        string `json:"index"`
+	IndexGranularity int    `json:"indexGranularity"`
+}
+
+type LogsFilterParams struct {
+	Limit          int    `json:"limit"`
+	OrderBy        string `json:"orderBy"`
+	Order          string `json:"order"`
+	Query          string `json:"q"`
+	TimestampStart uint64 `json:"timestampStart"`
+	TimestampEnd   uint64 `json:"timestampEnd"`
+	IdGt           string `json:"idGt"`
+	IdLT           string `json:"idLt"`
+}
+
+type LogsAggregateParams struct {
+	Query          string `json:"q"`
+	TimestampStart uint64 `json:"timestampStart"`
+	TimestampEnd   uint64
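Putting the LogsSQLSelect constant above together with the reader: GetLogs prefixes it to the generated WHERE clause and ordering. A sketch of the final query shape (the database, table, and filter values are illustrative; the id is borrowed from the parser tests above):

```go
package main

import (
	"fmt"

	"go.signoz.io/query-service/constants"
)

func main() {
	where := "timestamp >= '1657689292000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' "
	query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, "signoz_logs", "logs")
	query = fmt.Sprintf("%s where %s", query, where)
	query = fmt.Sprintf("%s order by %s %s limit %d", query, "timestamp", "desc", 30)
	fmt.Println(query)
}
```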
`json:"timestampEnd"` + GroupBy string `json:"groupBy"` + Function string `json:"function"` + StepSeconds int `json:"step"` +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 06ee6e6e2f..54ddbde620 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -19,18 +19,19 @@ type ApiError struct { type ErrorType string const ( - ErrorNone ErrorType = "" - ErrorTimeout ErrorType = "timeout" - ErrorCanceled ErrorType = "canceled" - ErrorExec ErrorType = "execution" - ErrorBadData ErrorType = "bad_data" - ErrorInternal ErrorType = "internal" - ErrorUnavailable ErrorType = "unavailable" - ErrorNotFound ErrorType = "not_found" - ErrorNotImplemented ErrorType = "not_implemented" - ErrorUnauthorized ErrorType = "unauthorized" - ErrorForbidden ErrorType = "forbidden" - ErrorConflict ErrorType = "conflict" + ErrorNone ErrorType = "" + ErrorTimeout ErrorType = "timeout" + ErrorCanceled ErrorType = "canceled" + ErrorExec ErrorType = "execution" + ErrorBadData ErrorType = "bad_data" + ErrorInternal ErrorType = "internal" + ErrorUnavailable ErrorType = "unavailable" + ErrorNotFound ErrorType = "not_found" + ErrorNotImplemented ErrorType = "not_implemented" + ErrorUnauthorized ErrorType = "unauthorized" + ErrorForbidden ErrorType = "forbidden" + ErrorConflict ErrorType = "conflict" + ErrorStreamingNotSupported ErrorType = "streaming is not supported" ) type QueryDataV2 struct { @@ -276,10 +277,14 @@ type GetTTLResponseItem struct { MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"` TracesTime int `json:"traces_ttl_duration_hrs,omitempty"` TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"` + LogsTime int `json:"logs_ttl_duration_hrs,omitempty"` + LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"` ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"` ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"` ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"` ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"` + ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"` + ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"` Status string `json:"status"` } @@ -407,6 +412,60 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) { return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v}) } +type ShowCreateTableStatement struct { + Statement string `json:"statement" ch:"statement"` +} + +type LogField struct { + Name string `json:"name" ch:"name"` + DataType string `json:"dataType" ch:"datatype"` + Type string `json:"type"` +} + +type GetFieldsResponse struct { + Selected []LogField `json:"selected"` + Interesting []LogField `json:"interesting"` +} + +type GetLogsResponse struct { + Timestamp uint64 `json:"timestamp" ch:"timestamp"` + ID string `json:"id" ch:"id"` + TraceID string `json:"traceId" ch:"trace_id"` + SpanID string `json:"spanId" ch:"span_id"` + TraceFlags uint32 `json:"traceFlags" ch:"trace_flags"` + SeverityText string `json:"severityText" ch:"severity_text"` + SeverityNumber uint8 `json:"severityNumber" ch:"severity_number"` + Body string `json:"body" ch:"body"` + Resources_string map[string]string `json:"resourcesString" ch:"resources_string"` + Attributes_string map[string]string `json:"attributesString" ch:"attributes_string"` + Attributes_int64 map[string]int64 `json:"attributesInt" ch:"attributes_int64"` + 
Attributes_float64 map[string]float64 `json:"attributesFloat" ch:"attributes_float64"`
+}
+
+type LogsTailClient struct {
+	Name   string
+	Logs   chan *GetLogsResponse
+	Done   chan *bool
+	Error  chan error
+	Filter LogsFilterParams
+}
+
+type GetLogsAggregatesResponse struct {
+	Items map[int64]LogsAggregatesResponseItem `json:"items"`
+}
+
+type LogsAggregatesResponseItem struct {
+	Timestamp int64                  `json:"timestamp,omitempty"`
+	Value     interface{}            `json:"value,omitempty"`
+	GroupBy   map[string]interface{} `json:"groupBy,omitempty"`
+}
+
+type LogsAggregatesDBResponseItem struct {
+	Timestamp int64   `ch:"time"`
+	Value     float64 `ch:"value"`
+	GroupBy   string  `ch:"groupBy"`
+}
+
 // MarshalJSON implements json.Marshaler.
 func (s *ServiceItem) MarshalJSON() ([]byte, error) {
 	// If a service didn't send any data in the last interval duration
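Finally, for the aggregation path: AggregateLogs buckets rows into step-sized intervals and optionally groups them by one attribute. A standalone sketch (values illustrative) of the grouped query it builds:

```go
package main

import "fmt"

func main() {
	stepSeconds := 60
	groupBy := "severity_text"
	function := "toFloat64(count()) as value" // the default when params.Function is empty
	tsStart, tsEnd := uint64(1657689292000000000), uint64(1657689294000000000)

	query := fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, toString(%s) as groupBy, "+
		"%s "+
		"FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
		stepSeconds/60, groupBy, function, "signoz_logs", "logs", tsStart, tsEnd)
	query = fmt.Sprintf("%s GROUP BY time, toString(%s) as groupBy ORDER BY time", query, groupBy)
	fmt.Println(query)
}
```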