diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index c415950b8c..94f31cc67d 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -756,33 +756,47 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro return &services, nil } -func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig) (*map[string][]string, *model.ApiError) { +func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError) { + start = start.In(time.UTC) + + // The `top_level_operations` that have `time` >= start operations := map[string][]string{} - query := fmt.Sprintf(`SELECT DISTINCT name, serviceName FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable) + // All top level operations for a service + allOperations := map[string][]string{} + query := fmt.Sprintf(`SELECT DISTINCT name, serviceName, time FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable) rows, err := r.db.Query(ctx, query) if err != nil { zap.S().Error("Error in processing sql query: ", err) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")} + return nil, nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} } defer rows.Close() for rows.Next() { var name, serviceName string - if err := rows.Scan(&name, &serviceName); err != nil { - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("Error in reading data")} + var t time.Time + if err := rows.Scan(&name, &serviceName, &t); err != nil { + return nil, nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")} } if _, ok := operations[serviceName]; !ok { operations[serviceName] = []string{} } + if _, ok := allOperations[serviceName]; !ok { + allOperations[serviceName] = []string{} + } if skipConfig.ShouldSkip(serviceName, name) { continue } - operations[serviceName] = append(operations[serviceName], name) + allOperations[serviceName] = append(allOperations[serviceName], name) + // We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations + // We can only use the `start` time to filter the operations + if t.After(start) { + operations[serviceName] = append(operations[serviceName], name) + } } - return &operations, nil + return &operations, &allOperations, nil } func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) { @@ -791,7 +805,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} } - topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig) + topLevelOps, allTopLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End) if apiErr != nil { return nil, apiErr } @@ -810,6 +824,22 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G defer func() { <-sem }() var serviceItem model.ServiceItem var numErrors uint64 + + // Even if the total number of operations within the time range is less and the all + // the top level operations are high, we want to warn to let user know the issue + // with the instrumentation + serviceItem.DataWarning = model.DataWarning{ + TopLevelOps: (*allTopLevelOps)[svc], + } + + // default max_query_size = 262144 + // Let's assume the average size of the item in `ops` is 50 bytes + // We can have 262144/50 = 5242 items in the `ops` array + // Although we have make it as big as 5k, We cap the number of items + // in the `ops` array to 1500 + + ops = ops[:int(math.Min(1500, float64(len(ops))))] + query := fmt.Sprintf( `SELECT quantile(0.99)(durationNano) as p99, @@ -858,6 +888,10 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G return } subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags) + if errStatus != nil { + zap.S().Error("Error building query with tag params: ", err) + return + } query += subQuery args = append(args, argsSubQuery...) err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) @@ -884,7 +918,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) { - topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig) + topLevelOps, _, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End) if apiErr != nil { return nil, apiErr } @@ -1576,7 +1610,7 @@ func buildQueryWithTagParams(ctx context.Context, tags []model.TagQuery) (string case model.NotExistsOperator: subQuery, argsSubQuery = addExistsOperator(item, tagMapType, true) default: - return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Tag Operator %s not supported", item.GetOperator())} + return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("filter operator %s not supported", item.GetOperator())} } query += subQuery args = append(args, argsSubQuery...) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6ff840bac2..e8200635f7 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1556,7 +1556,9 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) { - result, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig) + var start, end time.Time + + result, _, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig, start, end) if apiErr != nil { RespondError(w, apiErr, nil) return diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 9d0d65c39c..1ca1fd9958 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -23,7 +23,7 @@ type Reader interface { GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) - GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig) (*map[string][]string, *model.ApiError) + GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError) GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 0362ec6a35..05da7f5ab7 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -171,16 +171,21 @@ type AlertingRuleResponse struct { // Value float64 `json:"value"` } +type DataWarning struct { + TopLevelOps []string `json:"topLevelOps"` +} + type ServiceItem struct { - ServiceName string `json:"serviceName" ch:"serviceName"` - Percentile99 float64 `json:"p99" ch:"p99"` - AvgDuration float64 `json:"avgDuration" ch:"avgDuration"` - NumCalls uint64 `json:"numCalls" ch:"numCalls"` - CallRate float64 `json:"callRate" ch:"callRate"` - NumErrors uint64 `json:"numErrors" ch:"numErrors"` - ErrorRate float64 `json:"errorRate" ch:"errorRate"` - Num4XX uint64 `json:"num4XX" ch:"num4xx"` - FourXXRate float64 `json:"fourXXRate" ch:"fourXXRate"` + ServiceName string `json:"serviceName" ch:"serviceName"` + Percentile99 float64 `json:"p99" ch:"p99"` + AvgDuration float64 `json:"avgDuration" ch:"avgDuration"` + NumCalls uint64 `json:"numCalls" ch:"numCalls"` + CallRate float64 `json:"callRate" ch:"callRate"` + NumErrors uint64 `json:"numErrors" ch:"numErrors"` + ErrorRate float64 `json:"errorRate" ch:"errorRate"` + Num4XX uint64 `json:"num4XX" ch:"num4xx"` + FourXXRate float64 `json:"fourXXRate" ch:"fourXXRate"` + DataWarning DataWarning `json:"dataWarning"` } type ServiceErrorItem struct { Time time.Time `json:"time" ch:"time"`