diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 368ecf7944..830db0043e 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2825,15 +2825,13 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda } // create the index - if field.IndexType == nil { - iType := constants.DefaultLogSkipIndexType - field.IndexType = &iType + if field.IndexType == "" { + field.IndexType = constants.DefaultLogSkipIndexType } - if field.IndexGranularity == nil { - granularity := constants.DefaultLogSkipIndexGranularity - field.IndexGranularity = &granularity + if field.IndexGranularity == 0 { + field.IndexGranularity = constants.DefaultLogSkipIndexGranularity } - query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, *field.IndexType, *field.IndexGranularity) + query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, field.IndexType, field.IndexGranularity) err := r.db.Exec(ctx, query) if err != nil { return &model.ApiError{Err: err, Typ: model.ErrorInternal} @@ -2897,13 +2895,13 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable) tsStart := uint64(time.Now().UnixNano()) - if client.Filter.TimestampStart != nil { - tsStart = *client.Filter.TimestampStart + if client.Filter.TimestampStart != 0 { + tsStart = client.Filter.TimestampStart } var idStart string - if client.Filter.IdStart != nil { - idStart = *client.Filter.IdStart + if client.Filter.IdStart != "" { + idStart = client.Filter.IdStart } for { @@ -2954,14 +2952,9 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) { logAggregatesDBResponseItems := []model.LogsAggregatesDBResponseItem{} - groupBy := "" - if params.GroupBy != nil { - groupBy = *params.GroupBy - } - function := "toFloat64(count()) as value" - if params.Function != nil { - function = fmt.Sprintf("toFloat64(%s) as value", *params.Function) + if params.Function != "" { + function = fmt.Sprintf("toFloat64(%s) as value", params.Function) } fields, apiErr := r.GetLogFields(ctx) @@ -2977,22 +2970,22 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs } query := "" - if groupBy != "" { + if params.GroupBy != "" { query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, toString(%s) as groupBy, "+ "%s "+ "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ", - *params.StepSeconds/60, groupBy, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd) + params.StepSeconds/60, params.GroupBy, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd) } else { query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, "+ "%s "+ "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ", - *params.StepSeconds/60, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd) + params.StepSeconds/60, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd) } if filterSql != "" { query += fmt.Sprintf(" AND %s ", filterSql) } - if groupBy != "" { - query += fmt.Sprintf("GROUP BY time, toString(%s) as groupBy ORDER BY time", groupBy) + if params.GroupBy != "" { + query += fmt.Sprintf("GROUP BY time, toString(%s) as groupBy ORDER BY time", params.GroupBy) } else { query += "GROUP BY time ORDER BY time" } @@ -3009,17 +3002,17 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs for i := range logAggregatesDBResponseItems { if elem, ok := aggregateResponse.Items[int64(logAggregatesDBResponseItems[i].Timestamp)]; ok { - if groupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" { + if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" { elem.GroupBy[logAggregatesDBResponseItems[i].GroupBy] = logAggregatesDBResponseItems[i].Value } aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = elem } else { - if groupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" { + if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" { aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{ Timestamp: logAggregatesDBResponseItems[i].Timestamp, GroupBy: map[string]interface{}{logAggregatesDBResponseItems[i].GroupBy: logAggregatesDBResponseItems[i].Value}, } - } else if groupBy == "" { + } else if params.GroupBy == "" { aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{ Timestamp: logAggregatesDBResponseItems[i].Timestamp, Value: logAggregatesDBResponseItems[i].Value, diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index 3a2a7dd1e7..fb819337ce 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -22,7 +22,13 @@ var operatorMapping = map[string]string{ } const ( - AND = "and" + AND = "and" + ORDER = "order" + ORDER_BY = "orderBy" + TIMESTAMP_START = "timestampStart" + TIMESTAMP_END = "timestampEnd" + IDSTART = "idStart" + IDEND = "idEnd" ) var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?)?(([\w.-]+ (in|nin) \([\S ]+\))|([\w.]+ (gt|lt|gte|lte) (')?[\S]+(')?)|([\w.]+ (contains|ncontains)) (')?[\S ]+(')?)`) @@ -42,36 +48,34 @@ func ParseLogFilterParams(r *http.Request) (*model.LogsFilterParams, error) { return nil, err } } - if val, ok := params["orderBy"]; ok { + if val, ok := params[ORDER_BY]; ok { res.OrderBy = val[0] } - if val, ok := params["order"]; ok { + if val, ok := params[ORDER]; ok { res.Order = val[0] } if val, ok := params["q"]; ok { - res.Query = &val[0] + res.Query = val[0] } - if val, ok := params["timestampStart"]; ok { + if val, ok := params[TIMESTAMP_START]; ok { ts, err := strconv.Atoi(val[0]) if err != nil { return nil, err } - ts64 := uint64(ts) - res.TimestampStart = &ts64 + res.TimestampStart = uint64(ts) } - if val, ok := params["timestampEnd"]; ok { + if val, ok := params[TIMESTAMP_END]; ok { ts, err := strconv.Atoi(val[0]) if err != nil { return nil, err } - ts64 := uint64(ts) - res.TimestampEnd = &ts64 + res.TimestampEnd = uint64(ts) } - if val, ok := params["idStart"]; ok { - res.IdStart = &val[0] + if val, ok := params[IDSTART]; ok { + res.IdStart = val[0] } - if val, ok := params["idEnd"]; ok { - res.IdEnd = &val[0] + if val, ok := params[IDEND]; ok { + res.IdEnd = val[0] } return &res, nil } @@ -80,18 +84,17 @@ func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) res := model.LogsFilterParams{} params := r.URL.Query() if val, ok := params["q"]; ok { - res.Query = &val[0] + res.Query = val[0] } - if val, ok := params["timestampStart"]; ok { + if val, ok := params[TIMESTAMP_START]; ok { ts, err := strconv.Atoi(val[0]) if err != nil { return nil, err } - ts64 := uint64(ts) - res.TimestampStart = &ts64 + res.TimestampStart = uint64(ts) } - if val, ok := params["idStart"]; ok { - res.IdStart = &val[0] + if val, ok := params[IDSTART]; ok { + res.IdStart = val[0] } return &res, nil } @@ -99,37 +102,35 @@ func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error) { res := model.LogsAggregateParams{} params := r.URL.Query() - if val, ok := params["timestampStart"]; ok { + if val, ok := params[TIMESTAMP_START]; ok { ts, err := strconv.Atoi(val[0]) if err != nil { return nil, err } - ts64 := uint64(ts) - res.TimestampStart = &ts64 + res.TimestampStart = uint64(ts) } else { return nil, fmt.Errorf("timestampStart is required") } - if val, ok := params["timestampEnd"]; ok { + if val, ok := params[TIMESTAMP_END]; ok { ts, err := strconv.Atoi(val[0]) if err != nil { return nil, err } - ts64 := uint64(ts) - res.TimestampEnd = &ts64 + res.TimestampEnd = uint64(ts) } else { return nil, fmt.Errorf("timestampEnd is required") } if val, ok := params["q"]; ok { - res.Query = &val[0] + res.Query = val[0] } if val, ok := params["groupBy"]; ok { - res.GroupBy = &val[0] + res.GroupBy = val[0] } if val, ok := params["function"]; ok { - res.Function = &val[0] + res.Function = val[0] } if val, ok := params["step"]; ok { @@ -137,7 +138,7 @@ func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error if err != nil { return nil, err } - res.StepSeconds = &step + res.StepSeconds = step } else { return nil, fmt.Errorf("step is required") } @@ -246,8 +247,8 @@ func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilt var tokens []string var err error var sqlWhere string - if params.Query != nil { - tokens, err = parseLogQuery(*params.Query) + if params.Query != "" { + tokens, err = parseLogQuery(params.Query) if err != nil { return sqlWhere, err } @@ -258,29 +259,29 @@ func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilt return sqlWhere, err } - if params.TimestampStart != nil { - filter := fmt.Sprintf("timestamp >= '%d' ", *params.TimestampStart) + if params.TimestampStart != 0 { + filter := fmt.Sprintf("timestamp >= '%d' ", params.TimestampStart) if len(tokens) > 0 { filter = "and " + filter } tokens = append(tokens, filter) } - if params.TimestampEnd != nil { - filter := fmt.Sprintf("timestamp <= '%d' ", *params.TimestampEnd) + if params.TimestampEnd != 0 { + filter := fmt.Sprintf("timestamp <= '%d' ", params.TimestampEnd) if len(tokens) > 0 { filter = "and " + filter } tokens = append(tokens, filter) } - if params.IdStart != nil { - filter := fmt.Sprintf("id > '%v' ", *params.IdStart) + if params.IdStart != "" { + filter := fmt.Sprintf("id > '%v' ", params.IdStart) if len(tokens) > 0 { filter = "and " + filter } tokens = append(tokens, filter) } - if params.IdEnd != nil { - filter := fmt.Sprintf("id < '%v' ", *params.IdEnd) + if params.IdEnd != "" { + filter := fmt.Sprintf("id < '%v' ", params.IdEnd) if len(tokens) > 0 { filter = "and " + filter } diff --git a/pkg/query-service/app/logs/parser_test.go b/pkg/query-service/app/logs/parser_test.go index d2855ef1f8..84984acb3b 100644 --- a/pkg/query-service/app/logs/parser_test.go +++ b/pkg/query-service/app/logs/parser_test.go @@ -213,7 +213,7 @@ func TestGenerateSQLQuery(t *testing.T) { idEnd := "2BsKG6tRpFWjYMcWsAGKfSxoQdU" sqlWhere := "id < 100 and id > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 and timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' " Convey("testInterestingFields", t, func() { - res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: &query, TimestampStart: &tsStart, TimestampEnd: &tsEnd, IdStart: &idStart, IdEnd: &idEnd}) + res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: query, TimestampStart: tsStart, TimestampEnd: tsEnd, IdStart: idStart, IdEnd: idEnd}) So(res, ShouldEqual, sqlWhere) }) } diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go index 9277bda047..0a27a11b15 100644 --- a/pkg/query-service/app/logs/validator.go +++ b/pkg/query-service/app/logs/validator.go @@ -27,13 +27,13 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error { return fmt.Errorf("type %s not supported", field.Type) } - if field.IndexType != nil { - matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, *field.IndexType) + if field.IndexType != "" { + matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, field.IndexType) if err != nil { return err } if !matched { - return fmt.Errorf("index type %s not supported", *field.IndexType) + return fmt.Errorf("index type %s not supported", field.IndexType) } } return nil diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index ce9c4f53c3..f020a429ea 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -322,30 +322,30 @@ type FilterSet struct { } type UpdateField struct { - Name string `json:"name"` - DataType string `json:"dataType"` - Type string `json:"type"` - Selected bool `json:"selected"` - IndexType *string `json:"index"` - IndexGranularity *int `json:"indexGranularity"` + Name string `json:"name"` + DataType string `json:"dataType"` + Type string `json:"type"` + Selected bool `json:"selected"` + IndexType string `json:"index"` + IndexGranularity int `json:"indexGranularity"` } type LogsFilterParams struct { - Limit int `json:"limit"` - OrderBy string `json:"orderBy"` - Order string `json:"order"` - Query *string `json:"q"` - TimestampStart *uint64 `json:"timestampStart"` - TimestampEnd *uint64 `json:"timestampEnd"` - IdStart *string `json:"idStart"` - IdEnd *string `json:"idEnd"` + Limit int `json:"limit"` + OrderBy string `json:"orderBy"` + Order string `json:"order"` + Query string `json:"q"` + TimestampStart uint64 `json:"timestampStart"` + TimestampEnd uint64 `json:"timestampEnd"` + IdStart string `json:"idStart"` + IdEnd string `json:"idEnd"` } type LogsAggregateParams struct { - Query *string `json:"q"` - TimestampStart *uint64 `json:"timestampStart"` - TimestampEnd *uint64 `json:"timestampEnd"` - GroupBy *string `json:"groupBy"` - Function *string `json:"function"` - StepSeconds *int `json:"step"` + Query string `json:"q"` + TimestampStart uint64 `json:"timestampStart"` + TimestampEnd uint64 `json:"timestampEnd"` + GroupBy string `json:"groupBy"` + Function string `json:"function"` + StepSeconds int `json:"step"` }