From c24bdfc8cf70d33d8dee71dc65451cc35396c893 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Wed, 20 Jul 2022 12:11:03 +0530 Subject: [PATCH] aggregate function added --- .../app/clickhouseReader/reader.go | 81 +++++++++++++++++++ pkg/query-service/app/http_handler.go | 17 ++++ pkg/query-service/app/logs/parser.go | 48 +++++++++++ pkg/query-service/interfaces/interface.go | 1 + pkg/query-service/model/queryParams.go | 9 +++ pkg/query-service/model/response.go | 16 ++++ 6 files changed, 172 insertions(+) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 4a57376b26..9cd521e3dc 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2967,3 +2967,84 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC } } } + +func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) { + logAggregatesDBResponseItems := &[]model.LogsAggregatesDBResponseItem{} + + groupBy := "" + if params.GroupBy != nil { + groupBy = *params.GroupBy + } + + function := "toFloat64(count()) as value" + if params.Function != nil { + function = fmt.Sprintf("toFloat64(%s) as value", *params.Function) + } + + fields, apiErr := r.GetLogFields(ctx) + if apiErr != nil { + return nil, apiErr + } + + filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{ + Query: params.Query, + }) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData} + } + + query := "" + if groupBy != "" { + query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000), INTERVAL %d minute))*1000) as time, toString(%s) as groupBy, "+ + "%s "+ + "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ", + *params.StepSeconds/60, groupBy, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd) + } else { + query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000), INTERVAL %d minute))*1000) as time, "+ + "%s "+ + "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ", + *params.StepSeconds/60, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd) + } + if filterSql != nil && *filterSql != "" { + query += fmt.Sprintf(" AND %s ", *filterSql) + } + if groupBy != "" { + query += fmt.Sprintf("GROUP BY time, toString(%s) as groupBy ORDER BY time", groupBy) + } else { + query += "GROUP BY time ORDER BY time" + } + + zap.S().Debug(query) + err = r.db.Select(ctx, logAggregatesDBResponseItems, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + aggregateResponse := model.GetLogsAggregatesResponse{ + Items: make(map[int64]model.LogsAggregatesResponseItem), + } + + for i := range *logAggregatesDBResponseItems { + if elem, ok := aggregateResponse.Items[int64((*logAggregatesDBResponseItems)[i].Timestamp)]; ok { + if groupBy != "" && (*logAggregatesDBResponseItems)[i].GroupBy != "" { + elem.GroupBy[(*logAggregatesDBResponseItems)[i].GroupBy] = (*logAggregatesDBResponseItems)[i].Value + } + aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp] = elem + } else { + if groupBy != "" && (*logAggregatesDBResponseItems)[i].GroupBy != "" { + aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp] = model.LogsAggregatesResponseItem{ + Timestamp: (*logAggregatesDBResponseItems)[i].Timestamp, + GroupBy: map[string]interface{}{(*logAggregatesDBResponseItems)[i].GroupBy: (*logAggregatesDBResponseItems)[i].Value}, + } + } else if groupBy == "" { + aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp] = model.LogsAggregatesResponseItem{ + Timestamp: (*logAggregatesDBResponseItems)[i].Timestamp, + Value: (*logAggregatesDBResponseItems)[i].Value, + } + } + } + + } + + return &aggregateResponse, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index a2f55fec60..c97ffd932f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1868,6 +1868,7 @@ func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) { subRouter.HandleFunc("/tail", ViewAccess(aH.tailLogs)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost) + subRouter.HandleFunc("/aggregate", ViewAccess(aH.logAggregate)).Methods(http.MethodGet) } func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) { @@ -1960,3 +1961,19 @@ func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) { } } } + +func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) { + params, err := logs.ParseLogAggregateParams(r) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErr, "Incorrect params") + return + } + + res, apiErr := (*aH.reader).AggregateLogs(r.Context(), params) + if apiErr != nil { + respondError(w, apiErr, "Failed to fetch logs aggregate from the DB") + return + } + aH.writeJSON(w, r, res) +} diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index 4c8649b5fd..7d9438976b 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -92,6 +92,54 @@ func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) return &res, nil } +func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error) { + res := model.LogsAggregateParams{} + params := r.URL.Query() + if val, ok := params["timestampStart"]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + ts64 := uint64(ts) + res.TimestampStart = &ts64 + } else { + return nil, fmt.Errorf("timestampStart is required") + } + if val, ok := params["timestampEnd"]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + ts64 := uint64(ts) + res.TimestampEnd = &ts64 + } else { + return nil, fmt.Errorf("timestampEnd is required") + } + + if val, ok := params["q"]; ok { + res.Query = &val[0] + } + + if val, ok := params["groupBy"]; ok { + res.GroupBy = &val[0] + } + + if val, ok := params["function"]; ok { + res.Function = &val[0] + } + + if val, ok := params["step"]; ok { + step, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + res.StepSeconds = &step + } else { + return nil, fmt.Errorf("step is required") + } + return &res, nil +} + func parseLogQuery(query string) ([]string, error) { sqlQueryTokens := []string{} filterTokens := tokenRegex.FindAllString(query, -1) diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index b0429db9bb..c96a1af470 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -63,6 +63,7 @@ type Reader interface { UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) TailLogs(ctx context.Context, client *model.LogsTailClient) + AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) // Connection needed for rules, not ideal but required GetConn() clickhouse.Conn diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index b6059107d9..ce9c4f53c3 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -340,3 +340,12 @@ type LogsFilterParams struct { IdStart *string `json:"idStart"` IdEnd *string `json:"idEnd"` } + +type LogsAggregateParams struct { + Query *string `json:"q"` + TimestampStart *uint64 `json:"timestampStart"` + TimestampEnd *uint64 `json:"timestampEnd"` + GroupBy *string `json:"groupBy"` + Function *string `json:"function"` + StepSeconds *int `json:"step"` +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index b85955da74..b802e3c4f7 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -443,3 +443,19 @@ type LogsTailClient struct { Error chan error Filter LogsFilterParams } + +type GetLogsAggregatesResponse struct { + Items map[int64]LogsAggregatesResponseItem `json:"items"` +} + +type LogsAggregatesResponseItem struct { + Timestamp int64 `json:"timestamp,omitempty" ` + Value interface{} `json:"value,omitempty"` + GroupBy map[string]interface{} `json:"groupBy,omitempty"` +} + +type LogsAggregatesDBResponseItem struct { + Timestamp int64 `ch:"time"` + Value float64 `ch:"value"` + GroupBy string `ch:"groupBy"` +}