From 38e694cd36a3f3272a4bb6c3e3cc917fe50231f0 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 30 Jul 2024 02:02:50 +0530 Subject: [PATCH] chore: only fetch top level operation from the selected time window (#5404) --- .../app/clickhouseReader/reader.go | 38 +++++++++---------- pkg/query-service/app/http_handler.go | 38 ++++++++++++++++++- pkg/query-service/interfaces/interface.go | 2 +- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index ccdffd88bd..632e6522a6 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -706,21 +706,25 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro return &services, nil } -func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError) { +func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time, services []string) (*map[string][]string, *model.ApiError) { start = start.In(time.UTC) // The `top_level_operations` that have `time` >= start operations := map[string][]string{} - // All top level operations for a service - allOperations := map[string][]string{} - query := fmt.Sprintf(`SELECT DISTINCT name, serviceName, time FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable) + // We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations + // We can only use the `start` time to filter the operations + query := fmt.Sprintf(`SELECT name, serviceName, max(time) as ts FROM %s.%s WHERE time >= @start`, r.TraceDB, r.topLevelOperationsTable) + if len(services) > 0 { + query += ` AND serviceName IN @services` + } + query += ` GROUP BY name, serviceName ORDER BY ts DESC LIMIT 5000` - rows, err := r.db.Query(ctx, query) + rows, err := r.db.Query(ctx, query, clickhouse.Named("start", start), clickhouse.Named("services", services)) if err != nil { zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} } defer rows.Close() @@ -728,25 +732,17 @@ func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig var name, serviceName string var t time.Time if err := rows.Scan(&name, &serviceName, &t); err != nil { - return nil, nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")} + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")} } if _, ok := operations[serviceName]; !ok { - operations[serviceName] = []string{} - } - if _, ok := allOperations[serviceName]; !ok { - allOperations[serviceName] = []string{} + operations[serviceName] = []string{"overflow_operation"} } if skipConfig.ShouldSkip(serviceName, name) { continue } - allOperations[serviceName] = append(allOperations[serviceName], name) - // We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations - // We can only use the `start` time to filter the operations - if t.After(start) { - operations[serviceName] = append(operations[serviceName], name) - } + operations[serviceName] = append(operations[serviceName], name) } - return &operations, &allOperations, nil + return &operations, nil } func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) { @@ -755,7 +751,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} } - topLevelOps, allTopLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End) + topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil) if apiErr != nil { return nil, apiErr } @@ -779,7 +775,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G // the top level operations are high, we want to warn to let user know the issue // with the instrumentation serviceItem.DataWarning = model.DataWarning{ - TopLevelOps: (*allTopLevelOps)[svc], + TopLevelOps: (*topLevelOps)[svc], } // default max_query_size = 262144 @@ -868,7 +864,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) { - topLevelOps, _, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End) + topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil) if apiErr != nil { return nil, apiErr } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 3e4ddfbfcf..f7482f41f2 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1347,8 +1347,44 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) { var start, end time.Time + var services []string - result, _, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig, start, end) + type topLevelOpsParams struct { + Service string `json:"service"` + Start string `json:"start"` + End string `json:"end"` + } + + var params topLevelOpsParams + err := json.NewDecoder(r.Body).Decode(¶ms) + if err != nil { + zap.L().Error("Error in getting req body for get top operations API", zap.Error(err)) + } + + if params.Service != "" { + services = []string{params.Service} + } + + startEpoch := params.Start + if startEpoch != "" { + startEpochInt, err := strconv.ParseInt(startEpoch, 10, 64) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading start time") + return + } + start = time.Unix(0, startEpochInt) + } + endEpoch := params.End + if endEpoch != "" { + endEpochInt, err := strconv.ParseInt(endEpoch, 10, 64) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading end time") + return + } + end = time.Unix(0, endEpochInt) + } + + result, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig, start, end, services) if apiErr != nil { RespondError(w, apiErr, nil) return diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 385d48173b..239ecf02bb 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -23,7 +23,7 @@ type Reader interface { GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) - GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError) + GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time, services []string) (*map[string][]string, *model.ApiError) GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)