From 91bbeaf17594bdc87da7867ebb152fdb171e463d Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Mon, 18 Nov 2024 10:27:08 +0530 Subject: [PATCH] fix: remove unwanted trace API's (#6464) --- .../app/clickhouseReader/reader.go | 791 ------------------ pkg/query-service/app/http_handler.go | 86 -- pkg/query-service/interfaces/interface.go | 5 - 3 files changed, 882 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index a6811dc2eb..7e57e5ccaf 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -766,307 +766,6 @@ func buildFilterArrayQuery(_ context.Context, excludeMap map[string]struct{}, pa return args } -func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) { - - var query string - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - query = getStatusFilters(query, queryParams.Status, excludeMap) - - traceFilterReponse := model.SpanFiltersResponse{ - Status: map[string]uint64{}, - Duration: map[string]uint64{}, - ServiceName: map[string]uint64{}, - Operation: map[string]uint64{}, - ResponseStatusCode: map[string]uint64{}, - RPCMethod: map[string]uint64{}, - HttpMethod: map[string]uint64{}, - HttpUrl: map[string]uint64{}, - HttpRoute: map[string]uint64{}, - HttpHost: map[string]uint64{}, - } - - for _, e := range queryParams.GetFilters { - switch e { - case constants.TraceID: - continue - case constants.ServiceName: - finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY serviceName" - var dBResponse []model.DBResponseServiceName - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.ServiceName != "" { - traceFilterReponse.ServiceName[service.ServiceName] = service.Count - } - } - case constants.HttpRoute: - finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpRoute" - var dBResponse []model.DBResponseHttpRoute - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpRoute != "" { - traceFilterReponse.HttpRoute[service.HttpRoute] = service.Count - } - } - case constants.HttpUrl: - finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpUrl" - var dBResponse []model.DBResponseHttpUrl - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpUrl != "" { - traceFilterReponse.HttpUrl[service.HttpUrl] = service.Count - } - } - case constants.HttpMethod: - finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpMethod" - var dBResponse []model.DBResponseHttpMethod - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpMethod != "" { - traceFilterReponse.HttpMethod[service.HttpMethod] = service.Count - } - } - case constants.HttpHost: - finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY httpHost" - var dBResponse []model.DBResponseHttpHost - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.HttpHost != "" { - traceFilterReponse.HttpHost[service.HttpHost] = service.Count - } - } - case constants.OperationRequest: - finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY name" - var dBResponse []model.DBResponseOperation - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.Operation != "" { - traceFilterReponse.Operation[service.Operation] = service.Count - } - } - case constants.Status: - finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable) - finalQuery += query - var dBResponse []model.DBResponseTotal - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - - finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable) - finalQuery2 += query - var dBResponse2 []model.DBResponseTotal - err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...) - zap.L().Info(finalQuery2) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - if len(dBResponse) > 0 && len(dBResponse2) > 0 { - traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": dBResponse[0].NumTotal} - } else if len(dBResponse) > 0 { - traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": dBResponse[0].NumTotal} - } else if len(dBResponse2) > 0 { - traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": 0} - } else { - traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": 0} - } - case constants.Duration: - err := r.featureFlags.CheckFeature(constants.DurationSort) - durationSortEnabled := err == nil - finalQuery := "" - if !durationSortEnabled { - // if duration sort is not enabled, we need to get the min and max duration from the index table - finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - var dBResponse []model.DBResponseMinMax - err = r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - if len(dBResponse) > 0 { - traceFilterReponse.Duration = map[string]uint64{"minDuration": dBResponse[0].Min, "maxDuration": dBResponse[0].Max} - } - } else { - // when duration sort is enabled, we need to get the min and max duration from the duration table - finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable) - finalQuery += query - finalQuery += " ORDER BY durationNano LIMIT 1" - var dBResponse []model.DBResponseTotal - err = r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - - finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable) - finalQuery += query - finalQuery += " ORDER BY durationNano DESC LIMIT 1" - var dBResponse2 []model.DBResponseTotal - err = r.db.Select(ctx, &dBResponse2, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - if len(dBResponse) > 0 { - traceFilterReponse.Duration["minDuration"] = dBResponse[0].NumTotal - } - if len(dBResponse2) > 0 { - traceFilterReponse.Duration["maxDuration"] = dBResponse2[0].NumTotal - } - } - case constants.RPCMethod: - finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY rpcMethod" - var dBResponse []model.DBResponseRPCMethod - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.RPCMethod != "" { - traceFilterReponse.RPCMethod[service.RPCMethod] = service.Count - } - } - - case constants.ResponseStatusCode: - finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " GROUP BY responseStatusCode" - var dBResponse []model.DBResponseStatusCodeMethod - err := r.db.Select(ctx, &dBResponse, finalQuery, args...) - zap.L().Info(finalQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} - } - for _, service := range dBResponse { - if service.ResponseStatusCode != "" { - traceFilterReponse.ResponseStatusCode[service.ResponseStatusCode] = service.Count - } - } - - default: - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("filter type: %s not supported", e)} - } - } - - return &traceFilterReponse, nil -} - func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string { // status can only be two and if both are selected than they are equivalent to none selected @@ -1088,140 +787,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string return query } -func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) { - - queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable) - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - var query string - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - query = getStatusFilters(query, queryParams.Status, excludeMap) - - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - if errStatus != nil { - return nil, errStatus - } - - if len(queryParams.OrderParam) != 0 { - if queryParams.OrderParam == constants.Duration { - queryTable = fmt.Sprintf("%s.%s", r.TraceDB, r.durationTable) - if queryParams.Order == constants.Descending { - query = query + " ORDER BY durationNano DESC" - } - if queryParams.Order == constants.Ascending { - query = query + " ORDER BY durationNano ASC" - } - } else if queryParams.OrderParam == constants.Timestamp { - projectionOptQuery := "SET allow_experimental_projection_optimization = 1" - err := r.db.Exec(ctx, projectionOptQuery) - - zap.L().Info(projectionOptQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - if queryParams.Order == constants.Descending { - query = query + " ORDER BY timestamp DESC" - } - if queryParams.Order == constants.Ascending { - query = query + " ORDER BY timestamp ASC" - } - } - } - if queryParams.Limit > 0 { - query = query + " LIMIT @limit" - args = append(args, clickhouse.Named("limit", queryParams.Limit)) - } - - if queryParams.Offset > 0 { - query = query + " OFFSET @offset" - args = append(args, clickhouse.Named("offset", queryParams.Offset)) - } - - var getFilterSpansResponseItems []model.GetFilterSpansResponseItem - - baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpMethod, rpcMethod, responseStatusCode FROM %s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryTable) - baseQuery += query - err := r.db.Select(ctx, &getFilterSpansResponseItems, baseQuery, args...) - // Fill status and method - for i, e := range getFilterSpansResponseItems { - if e.RPCMethod != "" { - getFilterSpansResponseItems[i].Method = e.RPCMethod - } else { - getFilterSpansResponseItems[i].Method = e.HttpMethod - } - } - - zap.L().Info(baseQuery) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - getFilterSpansResponse := model.GetFilterSpansResponse{ - Spans: getFilterSpansResponseItems, - TotalSpans: 1000, - } - - return &getFilterSpansResponse, nil -} - func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery { tags := []model.TagQuery{} for _, tag := range queryParams { @@ -1379,87 +944,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args } -func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) { - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - var query string - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - query = getStatusFilters(query, queryParams.Status, excludeMap) - - tagFilters := []model.TagFilters{} - - // Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery += query - err := r.db.Select(ctx, &tagFilters, finalQuery, args...) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - tagFiltersResult := model.TagFilters{ - StringTagKeys: make([]string, 0), - NumberTagKeys: make([]string, 0), - BoolTagKeys: make([]string, 0), - } - if len(tagFilters) != 0 { - tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys) - tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys) - tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys) - } - return &tagFiltersResult, nil -} - func excludeTags(_ context.Context, tags []string) []string { excludedTagsMap := map[string]bool{ "http.code": true, @@ -1483,102 +967,6 @@ func excludeTags(_ context.Context, tags []string) []string { return newTags } -func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) { - - if queryParams.TagKey.Type == model.TagTypeNumber { - return &model.TagValues{ - NumberTagValues: make([]float64, 0), - StringTagValues: make([]string, 0), - BoolTagValues: make([]bool, 0), - }, nil - } else if queryParams.TagKey.Type == model.TagTypeBool { - return &model.TagValues{ - NumberTagValues: make([]float64, 0), - StringTagValues: make([]string, 0), - BoolTagValues: []bool{true, false}, - }, nil - } - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - var query string - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - - query = getStatusFilters(query, queryParams.Status, excludeMap) - - tagValues := []model.TagValues{} - - finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery += query - finalQuery += " LIMIT @limit" - - args = append(args, clickhouse.Named("key", queryParams.TagKey.Key)) - args = append(args, clickhouse.Named("limit", queryParams.Limit)) - err := r.db.Select(ctx, &tagValues, finalQuery, args...) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - cleanedTagValues := model.TagValues{ - StringTagValues: []string{}, - NumberTagValues: []float64{}, - BoolTagValues: []bool{}, - } - if len(tagValues) == 0 { - return &cleanedTagValues, nil - } - for _, e := range tagValues[0].StringTagValues { - if e != "" { - cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e) - } - } - return &cleanedTagValues, nil -} - func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) { namedArgs := []interface{}{ @@ -1823,185 +1211,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams * return &response, nil } -func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) { - - excludeMap := make(map[string]struct{}) - for _, e := range queryParams.Exclude { - if e == constants.OperationRequest { - excludeMap[constants.OperationDB] = struct{}{} - continue - } - excludeMap[e] = struct{}{} - } - - SpanAggregatesDBResponseItems := []model.SpanAggregatesDBResponseItem{} - - aggregation_query := "" - if queryParams.Dimension == "duration" { - switch queryParams.AggregationOption { - case "p50": - aggregation_query = " quantile(0.50)(durationNano) as float64Value " - case "p95": - aggregation_query = " quantile(0.95)(durationNano) as float64Value " - case "p90": - aggregation_query = " quantile(0.90)(durationNano) as float64Value " - case "p99": - aggregation_query = " quantile(0.99)(durationNano) as float64Value " - case "max": - aggregation_query = " max(durationNano) as value " - case "min": - aggregation_query = " min(durationNano) as value " - case "avg": - aggregation_query = " avg(durationNano) as float64Value " - case "sum": - aggregation_query = " sum(durationNano) as value " - default: - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("aggregate type: %s not supported", queryParams.AggregationOption)} - } - } else if queryParams.Dimension == "calls" { - aggregation_query = " count(*) as value " - } - - args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))} - - var query string - var customStr []string - _, columnExists := constants.GroupByColMap[queryParams.GroupBy] - // Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet: - // issue link: https://github.com/ClickHouse/clickhouse-go/issues/870 - if queryParams.GroupBy != "" && columnExists { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable) - args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy)) - } else if queryParams.GroupBy != "" { - customStr = strings.Split(queryParams.GroupBy, ".(") - if len(customStr) < 2 { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} - } - if customStr[1] == string(model.TagTypeString)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) - } else if customStr[1] == string(model.TagTypeNumber)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) - } else if customStr[1] == string(model.TagTypeBool)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) - } else { - // return error for unsupported group by - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} - } - } else { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) - } - - if len(queryParams.TraceID) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args) - } - if len(queryParams.ServiceName) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args) - } - if len(queryParams.HttpRoute) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args) - } - if len(queryParams.HttpHost) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args) - } - if len(queryParams.HttpMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args) - } - if len(queryParams.HttpUrl) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args) - } - if len(queryParams.Operation) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args) - } - if len(queryParams.RPCMethod) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args) - } - if len(queryParams.ResponseStatusCode) > 0 { - args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args) - } - if len(queryParams.MinDuration) != 0 { - query = query + " AND durationNano >= @durationNanoMin" - args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration)) - } - if len(queryParams.MaxDuration) != 0 { - query = query + " AND durationNano <= @durationNanoMax" - args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration)) - } - query = getStatusFilters(query, queryParams.Status, excludeMap) - - if len(queryParams.SpanKind) != 0 { - query = query + " AND kind = @kind" - args = append(args, clickhouse.Named("kind", queryParams.SpanKind)) - } - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - - if errStatus != nil { - return nil, errStatus - } - - if queryParams.GroupBy != "" && columnExists { - query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy) - } else if queryParams.GroupBy != "" { - if customStr[1] == string(model.TagTypeString)+")" { - query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0]) - } else if customStr[1] == string(model.TagTypeNumber)+")" { - query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0]) - } else if customStr[1] == string(model.TagTypeBool)+")" { - query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0]) - } - } else { - query = query + " GROUP BY time ORDER BY time" - } - - err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{ - Items: map[int64]model.SpanAggregatesResponseItem{}, - } - - for i := range SpanAggregatesDBResponseItems { - if SpanAggregatesDBResponseItems[i].Value == 0 { - SpanAggregatesDBResponseItems[i].Value = uint64(SpanAggregatesDBResponseItems[i].Float64Value) - } - SpanAggregatesDBResponseItems[i].Timestamp = int64(SpanAggregatesDBResponseItems[i].Time.UnixNano()) - SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) - if queryParams.AggregationOption == "rate_per_sec" { - SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) / float32(queryParams.StepSeconds) - } - if responseElement, ok := GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp]; !ok { - if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" { - GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{ - Timestamp: SpanAggregatesDBResponseItems[i].Timestamp, - GroupBy: map[string]float32{SpanAggregatesDBResponseItems[i].GroupBy: SpanAggregatesDBResponseItems[i].FloatValue}, - } - } else if queryParams.GroupBy == "" { - GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{ - Timestamp: SpanAggregatesDBResponseItems[i].Timestamp, - Value: SpanAggregatesDBResponseItems[i].FloatValue, - } - } - - } else { - if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" { - responseElement.GroupBy[SpanAggregatesDBResponseItems[i].GroupBy] = SpanAggregatesDBResponseItems[i].FloatValue - } - GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = responseElement - } - } - - return &GetFilteredSpansAggregatesResponse, nil -} - func getLocalTableName(tableName string) string { tableNameSplit := strings.Split(tableName, ".") diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6586e21d98..2be68ede43 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -526,12 +526,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet) router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/getSpanFilters", am.ViewAccess(aH.getSpanFilters)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getTagFilters", am.ViewAccess(aH.getTagFilters)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getFilteredSpans", am.ViewAccess(aH.getFilteredSpans)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getFilteredSpans/aggregates", am.ViewAccess(aH.getFilteredSpanAggregates)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/getTagValues", am.ViewAccess(aH.getTagValues)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost) router.HandleFunc("/api/v1/countErrors", am.ViewAccess(aH.countErrors)).Methods(http.MethodPost) router.HandleFunc("/api/v1/errorFromErrorID", am.ViewAccess(aH.getErrorFromErrorID)).Methods(http.MethodGet) @@ -1847,86 +1841,6 @@ func (aH *APIHandler) getErrorFromGroupID(w http.ResponseWriter, r *http.Request aH.WriteJSON(w, r, result) } -func (aH *APIHandler) getSpanFilters(w http.ResponseWriter, r *http.Request) { - - query, err := parseSpanFilterRequestBody(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetSpanFilters(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getFilteredSpans(w http.ResponseWriter, r *http.Request) { - - query, err := parseFilteredSpansRequest(r, aH) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetFilteredSpans(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getFilteredSpanAggregates(w http.ResponseWriter, r *http.Request) { - - query, err := parseFilteredSpanAggregatesRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetFilteredSpansAggregates(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getTagFilters(w http.ResponseWriter, r *http.Request) { - - query, err := parseTagFilterRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetTagFilters(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - -func (aH *APIHandler) getTagValues(w http.ResponseWriter, r *http.Request) { - - query, err := parseTagValueRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetTagValues(r.Context(), query) - - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) -} - func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { ttlParams, err := parseTTLParams(r) if aH.HandleError(w, err, http.StatusBadRequest) { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index baad5f0a22..10c718aa28 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -29,15 +29,10 @@ type Reader interface { // GetDisks returns a list of disks configured in the underlying DB. It is supported by // clickhouse only. GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) - GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) - GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError) - GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError) - GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) - GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) ListErrors(ctx context.Context, params *model.ListErrorsParams) (*[]model.Error, *model.ApiError) CountErrors(ctx context.Context, params *model.CountErrorsParams) (uint64, *model.ApiError)