diff --git a/deploy/docker/clickhouse-setup/docker-compose.yaml b/deploy/docker/clickhouse-setup/docker-compose.yaml index 2a9d3db67a..16851112da 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.yaml @@ -11,7 +11,6 @@ services: - 8123:8123 volumes: - ./clickhouse-config.xml:/etc/clickhouse-server/config.xml - - ./docker-entrypoint-initdb.d/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql - ./data/clickhouse/:/var/lib/clickhouse/ healthcheck: diff --git a/deploy/docker/clickhouse-setup/docker-entrypoint-initdb.d/init-db.sql b/deploy/docker/clickhouse-setup/docker-entrypoint-initdb.d/init-db.sql deleted file mode 100644 index f71983c083..0000000000 --- a/deploy/docker/clickhouse-setup/docker-entrypoint-initdb.d/init-db.sql +++ /dev/null @@ -1,31 +0,0 @@ -CREATE TABLE IF NOT EXISTS signoz_index ( - timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), - traceID String CODEC(ZSTD(1)), - spanID String CODEC(ZSTD(1)), - parentSpanID String CODEC(ZSTD(1)), - serviceName LowCardinality(String) CODEC(ZSTD(1)), - name LowCardinality(String) CODEC(ZSTD(1)), - kind Int32 CODEC(ZSTD(1)), - durationNano UInt64 CODEC(ZSTD(1)), - tags Array(String) CODEC(ZSTD(1)), - tagsKeys Array(String) CODEC(ZSTD(1)), - tagsValues Array(String) CODEC(ZSTD(1)), - statusCode Int64 CODEC(ZSTD(1)), - references String CODEC(ZSTD(1)), - externalHttpMethod Nullable(String) CODEC(ZSTD(1)), - externalHttpUrl Nullable(String) CODEC(ZSTD(1)), - component Nullable(String) CODEC(ZSTD(1)), - dbSystem Nullable(String) CODEC(ZSTD(1)), - dbName Nullable(String) CODEC(ZSTD(1)), - dbOperation Nullable(String) CODEC(ZSTD(1)), - peerService Nullable(String) CODEC(ZSTD(1)), - INDEX idx_traceID traceID TYPE bloom_filter GRANULARITY 4, - INDEX idx_service serviceName TYPE bloom_filter GRANULARITY 4, - INDEX idx_name name TYPE bloom_filter GRANULARITY 4, - INDEX idx_kind kind TYPE minmax GRANULARITY 4, - INDEX idx_tagsKeys tagsKeys TYPE bloom_filter(0.01) 
GRANULARITY 64, - INDEX idx_tagsValues tagsValues TYPE bloom_filter(0.01) GRANULARITY 64, - INDEX idx_duration durationNano TYPE minmax GRANULARITY 1 -) ENGINE MergeTree() -PARTITION BY toDate(timestamp) -ORDER BY (serviceName, -toUnixTimestamp(timestamp)) \ No newline at end of file diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 2ea149a563..4d418ce0e4 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1329,6 +1329,730 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S return &searchSpansResult, nil } +func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) { + + var query string + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + if len(queryParams.ServiceName) > 0 { + for i, e := range queryParams.ServiceName { + if i == 0 && i == len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?)" + } else if i == 0 && i != len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?" + } else if i != 0 && i == len(queryParams.ServiceName)-1 { + query += " OR serviceName=?)" + } else { + query += " OR serviceName=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpRoute) > 0 { + for i, e := range queryParams.HttpRoute { + if i == 0 && i == len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?)" + } else if i == 0 && i != len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?" + } else if i != 0 && i == len(queryParams.HttpRoute)-1 { + query += " OR httpRoute=?)" + } else { + query += " OR httpRoute=?" 
+ } + args = append(args, e) + } + } + if len(queryParams.HttpCode) > 0 { + for i, e := range queryParams.HttpCode { + if i == 0 && i == len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?)" + } else if i == 0 && i != len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?" + } else if i != 0 && i == len(queryParams.HttpCode)-1 { + query += " OR httpCode=?)" + } else { + query += " OR httpCode=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpHost) > 0 { + for i, e := range queryParams.HttpHost { + if i == 0 && i == len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?)" + } else if i == 0 && i != len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?" + } else if i != 0 && i == len(queryParams.HttpHost)-1 { + query += " OR httpHost=?)" + } else { + query += " OR httpHost=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpMethod) > 0 { + for i, e := range queryParams.HttpMethod { + if i == 0 && i == len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?)" + } else if i == 0 && i != len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?" + } else if i != 0 && i == len(queryParams.HttpMethod)-1 { + query += " OR httpMethod=?)" + } else { + query += " OR httpMethod=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpUrl) > 0 { + for i, e := range queryParams.HttpUrl { + if i == 0 && i == len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?)" + } else if i == 0 && i != len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?" + } else if i != 0 && i == len(queryParams.HttpUrl)-1 { + query += " OR httpUrl=?)" + } else { + query += " OR httpUrl=?" + } + args = append(args, e) + } + } + if len(queryParams.Component) > 0 { + for i, e := range queryParams.Component { + if i == 0 && i == len(queryParams.Component)-1 { + query += " AND (component=?)" + } else if i == 0 && i != len(queryParams.Component)-1 { + query += " AND (component=?" 
+ } else if i != 0 && i == len(queryParams.Component)-1 { + query += " OR component=?)" + } else { + query += " OR component=?" + } + args = append(args, e) + } + } + if len(queryParams.Operation) > 0 { + for i, e := range queryParams.Operation { + if i == 0 && i == len(queryParams.Operation)-1 { + query += " AND (name=?)" + } else if i == 0 && i != len(queryParams.Operation)-1 { + query += " AND (name=?" + } else if i != 0 && i == len(queryParams.Operation)-1 { + query += " OR name=?)" + } else { + query += " OR name=?" + } + args = append(args, e) + } + } + + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + + // status can only be two and if both are selected than they are equivalent to none selected + if len(queryParams.Status) == 1 { + if queryParams.Status[0] == "error" { + query += " AND ( ( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2))" + } else if queryParams.Status[0] == "ok" { + query += " AND ((NOT ( has(tags, 'error:true')) AND statusCode<500 AND statusCode!=2))" + } + } + + traceFilterReponse := model.SpanFiltersResponse{ + Status: map[string]int{}, + Duration: map[string]int{}, + ServiceName: map[string]int{}, + Operation: map[string]int{}, + HttpCode: map[string]int{}, + HttpMethod: map[string]int{}, + HttpUrl: map[string]int{}, + HttpRoute: map[string]int{}, + HttpHost: map[string]int{}, + Component: map[string]int{}, + } + + for _, e := range queryParams.GetFilters { + switch e { + case "serviceName": + finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY serviceName" + var dBResponse []model.DBResponseServiceName + err := r.db.Select(&dBResponse, finalQuery, args...) 
+ if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.ServiceName != "" { + traceFilterReponse.ServiceName[service.ServiceName] = service.Count + } + } + case "httpCode": + finalQuery := fmt.Sprintf("SELECT httpCode, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY httpCode" + var dBResponse []model.DBResponseHttpCode + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.HttpCode != "" { + traceFilterReponse.HttpCode[service.HttpCode] = service.Count + } + } + case "httpRoute": + finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY httpRoute" + var dBResponse []model.DBResponseHttpRoute + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.HttpRoute != "" { + traceFilterReponse.HttpRoute[service.HttpRoute] = service.Count + } + } + case "httpUrl": + finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY httpUrl" + var dBResponse []model.DBResponseHttpUrl + err := r.db.Select(&dBResponse, finalQuery, args...) 
+ if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.HttpUrl != "" { + traceFilterReponse.HttpUrl[service.HttpUrl] = service.Count + } + } + case "httpMethod": + finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY httpMethod" + var dBResponse []model.DBResponseHttpMethod + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.HttpMethod != "" { + traceFilterReponse.HttpMethod[service.HttpMethod] = service.Count + } + } + case "httpHost": + finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY httpHost" + var dBResponse []model.DBResponseHttpHost + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.HttpHost != "" { + traceFilterReponse.HttpHost[service.HttpHost] = service.Count + } + } + case "operation": + finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY name" + var dBResponse []model.DBResponseOperation + err := r.db.Select(&dBResponse, finalQuery, args...) 
+ if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.Operation != "" { + traceFilterReponse.Operation[service.Operation] = service.Count + } + } + case "component": + finalQuery := fmt.Sprintf("SELECT component, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + finalQuery += " GROUP BY component" + var dBResponse []model.DBResponseComponent + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + if service.Component.String != "" { + traceFilterReponse.Component[service.Component.String] = service.Count + } + } + case "status": + finalQuery := fmt.Sprintf("SELECT COUNT(*) as numErrors FROM %s WHERE timestamp >= ? AND timestamp <= ? AND ( ( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2))", r.indexTable) + finalQuery += query + var dBResponse []model.DBResponseErrors + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + + finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery2 += query + var dBResponse2 []model.DBResponseTotal + err = r.db.Select(&dBResponse2, finalQuery2, args...) 
+ if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + traceFilterReponse.Status = map[string]int{"ok": dBResponse2[0].NumTotal - dBResponse[0].NumErrors, "error": dBResponse[0].NumErrors} + case "duration": + finalQuery := fmt.Sprintf("SELECT min(durationNano), max(durationNano) FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + finalQuery += query + var dBResponse []model.DBResponseMinMaxDuration + err := r.db.Select(&dBResponse, finalQuery, args...) + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query", err)} + } + for _, service := range dBResponse { + traceFilterReponse.Duration["minDuration"] = service.MinDuration + traceFilterReponse.Duration["maxDuration"] = service.MaxDuration + } + default: + return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("filter type: %s not supported", e)} + } + } + + return &traceFilterReponse, nil +} + +func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) { + + baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpCode, httpMethod FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + + var query string + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + if len(queryParams.ServiceName) > 0 { + for i, e := range queryParams.ServiceName { + if i == 0 && i == len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?)" + } else if i == 0 && i != len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?" + } else if i != 0 && i == len(queryParams.ServiceName)-1 { + query += " OR serviceName=?)" + } else { + query += " OR serviceName=?" 
+ } + args = append(args, e) + } + } + if len(queryParams.HttpRoute) > 0 { + for i, e := range queryParams.HttpRoute { + if i == 0 && i == len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?)" + } else if i == 0 && i != len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?" + } else if i != 0 && i == len(queryParams.HttpRoute)-1 { + query += " OR httpRoute=?)" + } else { + query += " OR httpRoute=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpCode) > 0 { + for i, e := range queryParams.HttpCode { + if i == 0 && i == len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?)" + } else if i == 0 && i != len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?" + } else if i != 0 && i == len(queryParams.HttpCode)-1 { + query += " OR httpCode=?)" + } else { + query += " OR httpCode=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpHost) > 0 { + for i, e := range queryParams.HttpHost { + if i == 0 && i == len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?)" + } else if i == 0 && i != len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?" + } else if i != 0 && i == len(queryParams.HttpHost)-1 { + query += " OR httpHost=?)" + } else { + query += " OR httpHost=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpMethod) > 0 { + for i, e := range queryParams.HttpMethod { + if i == 0 && i == len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?)" + } else if i == 0 && i != len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?" + } else if i != 0 && i == len(queryParams.HttpMethod)-1 { + query += " OR httpMethod=?)" + } else { + query += " OR httpMethod=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpUrl) > 0 { + for i, e := range queryParams.HttpUrl { + if i == 0 && i == len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?)" + } else if i == 0 && i != len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?" 
+ } else if i != 0 && i == len(queryParams.HttpUrl)-1 { + query += " OR httpUrl=?)" + } else { + query += " OR httpUrl=?" + } + args = append(args, e) + } + } + if len(queryParams.Component) > 0 { + for i, e := range queryParams.Component { + if i == 0 && i == len(queryParams.Component)-1 { + query += " AND (component=?)" + } else if i == 0 && i != len(queryParams.Component)-1 { + query += " AND (component=?" + } else if i != 0 && i == len(queryParams.Component)-1 { + query += " OR component=?)" + } else { + query += " OR component=?" + } + args = append(args, e) + } + } + if len(queryParams.Operation) > 0 { + for i, e := range queryParams.Operation { + if i == 0 && i == len(queryParams.Operation)-1 { + query += " AND (name=?)" + } else if i == 0 && i != len(queryParams.Operation)-1 { + query += " AND (name=?" + } else if i != 0 && i == len(queryParams.Operation)-1 { + query += " OR name=?)" + } else { + query += " OR name=?" + } + args = append(args, e) + } + } + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + if len(queryParams.Status) != 0 { + for _, e := range queryParams.Status { + if e == "error" { + query += " AND ( ( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2))" + } else if e == "ok" { + query += " AND (NOT ( has(tags, 'error:true') AND statusCode<500 AND statusCode!=2))" + } + } + } + if len(queryParams.Kind) != 0 { + query = query + " AND kind = ?" 
+ args = append(args, queryParams.Kind) + } + + for _, item := range queryParams.Tags { + + if item.Operator == "in" { + for i, value := range item.Values { + if i == 0 && i == len(item.Values)-1 { + query += " AND has(tags, ?)" + } else if i == 0 && i != len(item.Values)-1 { + query += " AND (has(tags, ?)" + } else if i != 0 && i == len(item.Values)-1 { + query += " OR has(tags, ?))" + } else { + query += " OR has(tags, ?)" + } + args = append(args, fmt.Sprintf("%s:%s", item.Key, value)) + } + } else if item.Operator == "not in" { + for i, value := range item.Values { + if i == 0 && i == len(item.Values)-1 { + query += " AND NOT has(tags, ?)" + } else if i == 0 && i != len(item.Values)-1 { + query += " AND NOT (has(tags, ?)" + } else if i != 0 && i == len(item.Values)-1 { + query += " OR has(tags, ?))" + } else { + query += " OR has(tags, ?)" + } + args = append(args, fmt.Sprintf("%s:%s", item.Key, value)) + } + } else if item.Operator == "regex" { + if len(item.Values) != 1 { + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Regex tag operator should only have one value")} + } + query = query + " AND match(tagsValues[indexOf(tagsKeys, ?)], ?)" + args = append(args, item.Key) + args = append(args, item.Values[0]) + } else if item.Operator == "isnotnull" { + for range item.Values { + query = query + " AND has(tagsKeys, ?)" + args = append(args, item.Key) + } + } else { + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Tag Operator %s not supported", item.Operator)} + } + + } + + var totalSpans []model.DBResponseTotal + + totalSpansQuery := fmt.Sprintf(`SELECT count() as numTotal FROM %s WHERE timestamp >= ? AND timestamp <= ?`, r.indexTable) + + totalSpansQuery += query + err := r.db.Select(&totalSpans, totalSpansQuery, args...) 
+ + zap.S().Info(totalSpansQuery) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")} + } + + if len(queryParams.Order) != 0 { + if queryParams.Order == "descending" { + query = query + " ORDER BY timestamp DESC" + } + if queryParams.Order == "ascending" { + query = query + " ORDER BY timestamp ASC" + } + } + if queryParams.Limit > 0 { + query = query + " LIMIT ?" + args = append(args, queryParams.Limit) + } + + if queryParams.Offset > 0 { + // due to bug in SQLx driver, using %d temporarily + query = query + fmt.Sprintf(" OFFSET %d", queryParams.Offset) + // args = append(args, queryParams.Offset) + } + + var getFilterSpansResponseItems []model.GetFilterSpansResponseItem + + baseQuery += query + err = r.db.Select(&getFilterSpansResponseItems, baseQuery, args...) + + zap.S().Info(baseQuery) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")} + } + + getFilterSpansResponse := model.GetFilterSpansResponse{ + Spans: getFilterSpansResponseItems, + TotalSpans: totalSpans[0].NumTotal, + } + + return &getFilterSpansResponse, nil +} + +func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) { + + var query string + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + if len(queryParams.ServiceName) > 0 { + for i, e := range queryParams.ServiceName { + if i == 0 && i == len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?)" + } else if i == 0 && i != len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?" + } else if i != 0 && i == len(queryParams.ServiceName)-1 { + query += " OR serviceName=?)" + } else { + query += " OR serviceName=?" 
+ } + args = append(args, e) + } + } + if len(queryParams.HttpRoute) > 0 { + for i, e := range queryParams.HttpRoute { + if i == 0 && i == len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?)" + } else if i == 0 && i != len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?" + } else if i != 0 && i == len(queryParams.HttpRoute)-1 { + query += " OR httpRoute=?)" + } else { + query += " OR httpRoute=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpCode) > 0 { + for i, e := range queryParams.HttpCode { + if i == 0 && i == len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?)" + } else if i == 0 && i != len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?" + } else if i != 0 && i == len(queryParams.HttpCode)-1 { + query += " OR httpCode=?)" + } else { + query += " OR httpCode=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpHost) > 0 { + for i, e := range queryParams.HttpHost { + if i == 0 && i == len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?)" + } else if i == 0 && i != len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?" + } else if i != 0 && i == len(queryParams.HttpHost)-1 { + query += " OR httpHost=?)" + } else { + query += " OR httpHost=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpMethod) > 0 { + for i, e := range queryParams.HttpMethod { + if i == 0 && i == len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?)" + } else if i == 0 && i != len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?" + } else if i != 0 && i == len(queryParams.HttpMethod)-1 { + query += " OR httpMethod=?)" + } else { + query += " OR httpMethod=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpUrl) > 0 { + for i, e := range queryParams.HttpUrl { + if i == 0 && i == len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?)" + } else if i == 0 && i != len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?" 
+ } else if i != 0 && i == len(queryParams.HttpUrl)-1 { + query += " OR httpUrl=?)" + } else { + query += " OR httpUrl=?" + } + args = append(args, e) + } + } + if len(queryParams.Component) > 0 { + for i, e := range queryParams.Component { + if i == 0 && i == len(queryParams.Component)-1 { + query += " AND (component=?)" + } else if i == 0 && i != len(queryParams.Component)-1 { + query += " AND (component=?" + } else if i != 0 && i == len(queryParams.Component)-1 { + query += " OR component=?)" + } else { + query += " OR component=?" + } + args = append(args, e) + } + } + if len(queryParams.Operation) > 0 { + for i, e := range queryParams.Operation { + if i == 0 && i == len(queryParams.Operation)-1 { + query += " AND (name=?)" + } else if i == 0 && i != len(queryParams.Operation)-1 { + query += " AND (name=?" + } else if i != 0 && i == len(queryParams.Operation)-1 { + query += " OR name=?)" + } else { + query += " OR name=?" + } + args = append(args, e) + } + } + + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + if len(queryParams.Status) != 0 { + for _, e := range queryParams.Status { + if e == "error" { + query += " AND ( ( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2))" + } else if e == "ok" { + query += " AND (NOT ( has(tags, 'error:true') AND statusCode<500 AND statusCode!=2))" + } + } + } + tagFilters := []model.TagFilters{} + + finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE timestamp >= ? AND timestamp <= ?`, r.indexTable) + finalQuery += query + fmt.Println(finalQuery) + err := r.db.Select(&tagFilters, finalQuery, args...) 
+ + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")} + } + tagFilters = excludeTags(ctx, tagFilters) + + return &tagFilters, nil +} + +func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilters { + excludedTagsMap := map[string]bool{ + "http.code": true, + "http.route": true, + "http.method": true, + "http.url": true, + "http.status_code": true, + "http.host": true, + "messaging.system": true, + "messaging.operation": true, + "component": true, + "error": true, + } + var newTags []model.TagFilters + for _, tag := range tags { + _, ok := excludedTagsMap[tag.TagKeys] + if !ok { + newTags = append(newTags, tag) + } + } + return newTags +} + func (r *ClickHouseReader) GetServiceDBOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) { var serviceDBOverviewItems []model.ServiceDBOverviewItem @@ -1583,7 +2307,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[ var searchScanReponses []model.SearchSpanReponseItem - query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references, events FROM %s WHERE traceID='%s'", r.indexTable, traceId) + query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references, events FROM %s WHERE traceID=?", r.indexTable) err := r.db.Select(&searchScanReponses, query, traceId) @@ -1759,6 +2483,334 @@ func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams } +func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) { + + SpanAggregatesDBResponseItems := []model.SpanAggregatesDBResponseItem{} + + aggregation_query 
:= "" + if queryParams.Dimension == "duration" { + switch queryParams.AggregationOption { + case "p50": + aggregation_query = " quantile(0.50)(durationNano) as value " + case "p95": + aggregation_query = " quantile(0.95)(durationNano) as value " + case "p90": + aggregation_query = " quantile(0.90)(durationNano) as value " + case "p99": + aggregation_query = " quantile(0.99)(durationNano) as value " + case "max": + aggregation_query = " max(durationNano) as value " + case "min": + aggregation_query = " min(durationNano) as value " + case "avg": + aggregation_query = " avg(durationNano) as value " + case "sum": + aggregation_query = " sum(durationNano) as value " + default: + return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("Aggregate type: %s not supported", queryParams.AggregationOption)} + } + } else if queryParams.Dimension == "calls" { + aggregation_query = " count(*) as value " + } + + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + + var query string + if queryParams.GroupBy != "" { + switch queryParams.GroupBy { + case "serviceName": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, serviceName as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "httpCode": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpCode as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "httpMethod": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpMethod as groupBy, %s FROM %s WHERE timestamp >= ? 
AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "httpUrl": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpUrl as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "httpRoute": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpRoute as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "httpHost": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, httpHost as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "dbName": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbName as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "dbOperation": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbOperation as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "operation": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, name as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "msgSystem": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgSystem as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "msgOperation": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, msgOperation as groupBy, %s FROM %s WHERE timestamp >= ? 
AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "dbSystem": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, dbSystem as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + case "component": + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, component as groupBy, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + default: + return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)} + } + } else { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable) + } + + if len(queryParams.ServiceName) > 0 { + for i, e := range queryParams.ServiceName { + if i == 0 && i == len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?)" + } else if i == 0 && i != len(queryParams.ServiceName)-1 { + query += " AND (serviceName=?" + } else if i != 0 && i == len(queryParams.ServiceName)-1 { + query += " OR serviceName=?)" + } else { + query += " OR serviceName=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpRoute) > 0 { + for i, e := range queryParams.HttpRoute { + if i == 0 && i == len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?)" + } else if i == 0 && i != len(queryParams.HttpRoute)-1 { + query += " AND (httpRoute=?" + } else if i != 0 && i == len(queryParams.HttpRoute)-1 { + query += " OR httpRoute=?)" + } else { + query += " OR httpRoute=?" 
+ } + args = append(args, e) + } + } + if len(queryParams.HttpCode) > 0 { + for i, e := range queryParams.HttpCode { + if i == 0 && i == len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?)" + } else if i == 0 && i != len(queryParams.HttpCode)-1 { + query += " AND (httpCode=?" + } else if i != 0 && i == len(queryParams.HttpCode)-1 { + query += " OR httpCode=?)" + } else { + query += " OR httpCode=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpHost) > 0 { + for i, e := range queryParams.HttpHost { + if i == 0 && i == len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?)" + } else if i == 0 && i != len(queryParams.HttpHost)-1 { + query += " AND (httpHost=?" + } else if i != 0 && i == len(queryParams.HttpHost)-1 { + query += " OR httpHost=?)" + } else { + query += " OR httpHost=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpMethod) > 0 { + for i, e := range queryParams.HttpMethod { + if i == 0 && i == len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?)" + } else if i == 0 && i != len(queryParams.HttpMethod)-1 { + query += " AND (httpMethod=?" + } else if i != 0 && i == len(queryParams.HttpMethod)-1 { + query += " OR httpMethod=?)" + } else { + query += " OR httpMethod=?" + } + args = append(args, e) + } + } + if len(queryParams.HttpUrl) > 0 { + for i, e := range queryParams.HttpUrl { + if i == 0 && i == len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?)" + } else if i == 0 && i != len(queryParams.HttpUrl)-1 { + query += " AND (httpUrl=?" + } else if i != 0 && i == len(queryParams.HttpUrl)-1 { + query += " OR httpUrl=?)" + } else { + query += " OR httpUrl=?" + } + args = append(args, e) + } + } + if len(queryParams.Component) > 0 { + for i, e := range queryParams.Component { + if i == 0 && i == len(queryParams.Component)-1 { + query += " AND (component=?)" + } else if i == 0 && i != len(queryParams.Component)-1 { + query += " AND (component=?" 
+ } else if i != 0 && i == len(queryParams.Component)-1 { + query += " OR component=?)" + } else { + query += " OR component=?" + } + args = append(args, e) + } + } + if len(queryParams.Operation) > 0 { + for i, e := range queryParams.Operation { + if i == 0 && i == len(queryParams.Operation)-1 { + query += " AND (name=?)" + } else if i == 0 && i != len(queryParams.Operation)-1 { + query += " AND (name=?" + } else if i != 0 && i == len(queryParams.Operation)-1 { + query += " OR name=?)" + } else { + query += " OR name=?" + } + args = append(args, e) + } + } + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + if len(queryParams.Status) != 0 { + for _, e := range queryParams.Status { + if e == "error" { + query += " AND ( ( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2))" + } else if e == "ok" { + query += " AND (NOT ( has(tags, 'error:true') AND statusCode<500 AND statusCode!=2))" + } + } + } + if len(queryParams.Kind) != 0 { + query = query + " AND kind = ?" 
+ args = append(args, queryParams.Kind) + } + + for _, item := range queryParams.Tags { + + if item.Operator == "in" { + for i, value := range item.Values { + if i == 0 && i == len(item.Values)-1 { + query += " AND has(tags, ?)" + } else if i == 0 && i != len(item.Values)-1 { + query += " AND (has(tags, ?)" + } else if i != 0 && i == len(item.Values)-1 { + query += " OR has(tags, ?))" + } else { + query += " OR has(tags, ?)" + } + args = append(args, fmt.Sprintf("%s:%s", item.Key, value)) + } + } else if item.Operator == "not in" { + for i, value := range item.Values { + if i == 0 && i == len(item.Values)-1 { + query += " AND NOT has(tags, ?)" + } else if i == 0 && i != len(item.Values)-1 { + query += " AND NOT (has(tags, ?)" + } else if i != 0 && i == len(item.Values)-1 { + query += " OR has(tags, ?))" + } else { + query += " OR has(tags, ?)" + } + args = append(args, fmt.Sprintf("%s:%s", item.Key, value)) + } + } else if item.Operator == "regex" { + if len(item.Values) != 1 { + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Regex tag operator should only have one value")} + } + query = query + " AND match(tagsValues[indexOf(tagsKeys, ?)], ?)" + args = append(args, item.Key) + args = append(args, item.Values[0]) + } else if item.Operator == "isnotnull" { + for range item.Values { + query = query + " AND has(tagsKeys, ?)" + args = append(args, item.Key) + } + } else { + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Tag Operator %s not supported", item.Operator)} + } + + } + + if queryParams.GroupBy != "" { + switch queryParams.GroupBy { + case "serviceName": + query = query + " GROUP BY time, serviceName as groupBy ORDER BY time" + case "httpCode": + query = query + " GROUP BY time, httpCode as groupBy ORDER BY time" + case "httpMethod": + query = query + " GROUP BY time, httpMethod as groupBy ORDER BY time" + case "httpUrl": + query = query + " GROUP BY time, httpUrl as groupBy ORDER BY time" + case "httpRoute": + query = query + " GROUP BY 
time, httpRoute as groupBy ORDER BY time" + case "httpHost": + query = query + " GROUP BY time, httpHost as groupBy ORDER BY time" + case "dbName": + query = query + " GROUP BY time, dbName as groupBy ORDER BY time" + case "dbOperation": + query = query + " GROUP BY time, dbOperation as groupBy ORDER BY time" + case "operation": + query = query + " GROUP BY time, name as groupBy ORDER BY time" + case "msgSystem": + query = query + " GROUP BY time, msgSystem as groupBy ORDER BY time" + case "msgOperation": + query = query + " GROUP BY time, msgOperation as groupBy ORDER BY time" + case "dbSystem": + query = query + " GROUP BY time, dbSystem as groupBy ORDER BY time" + case "component": + query = query + " GROUP BY time, component as groupBy ORDER BY time" + default: + return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)} + } + } else { + query = query + " GROUP BY time ORDER BY time" + } + + err := r.db.Select(&SpanAggregatesDBResponseItems, query, args...) 
+ + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")} + } + + GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{ + Items: map[int64]model.SpanAggregatesResponseItem{}, + } + + for i, _ := range SpanAggregatesDBResponseItems { + + timeObj, _ := time.Parse(time.RFC3339Nano, SpanAggregatesDBResponseItems[i].Time) + SpanAggregatesDBResponseItems[i].Timestamp = int64(timeObj.UnixNano()) + SpanAggregatesDBResponseItems[i].Time = "" + if queryParams.AggregationOption == "rate_per_sec" { + SpanAggregatesDBResponseItems[i].Value = float32(SpanAggregatesDBResponseItems[i].Value) / float32(queryParams.StepSeconds) + } + if responseElement, ok := GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp]; !ok { + if queryParams.GroupBy != "" { + GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{ + Timestamp: SpanAggregatesDBResponseItems[i].Timestamp, + GroupBy: map[string]float32{SpanAggregatesDBResponseItems[i].GroupBy.String: SpanAggregatesDBResponseItems[i].Value}, + } + } else { + GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{ + Timestamp: SpanAggregatesDBResponseItems[i].Timestamp, + Value: SpanAggregatesDBResponseItems[i].Value, + } + } + + } else { + if queryParams.GroupBy != "" { + responseElement.GroupBy[SpanAggregatesDBResponseItems[i].GroupBy.String] = SpanAggregatesDBResponseItems[i].Value + } + GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = responseElement + } + } + + return &GetFilteredSpansAggregatesResponse, nil +} + func (r *ClickHouseReader) SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { switch ttlParams.Type { diff --git 
a/pkg/query-service/app/druidReader/reader.go b/pkg/query-service/app/druidReader/reader.go index db6bac273d..bd44ad2670 100644 --- a/pkg/query-service/app/druidReader/reader.go +++ b/pkg/query-service/app/druidReader/reader.go @@ -165,6 +165,22 @@ func (druid *DruidReader) GetTTL(_ context.Context, _ *model.GetTTLParams) (*mod return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support setting ttl configuration")} } +func (druid *DruidReader) GetSpanFilters(_ context.Context, _ *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) { + return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support getting spanfilters")} +} + +func (druid *DruidReader) GetTagFilters(_ context.Context, _ *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) { + return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support getting tagFilters")} +} + +func (druid *DruidReader) GetFilteredSpans(_ context.Context, _ *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) { + return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support getting FilteredSpans")} +} + +func (druid *DruidReader) GetFilteredSpansAggregates(_ context.Context, _ *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) { + return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support getting FilteredSpanAggregates")} +} + +func (druid *DruidReader) GetErrors(_ context.Context, _ *model.GetErrorsParams) (*[]model.Error, *model.ApiError) { return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("druid does not support get error API")} } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index f190def948..e64a9ac1d6 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -203,6 +203,10 @@ func (aH 
*APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet) router.HandleFunc("/api/v1/settings/ttl", aH.setTTL).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", aH.getTTL).Methods(http.MethodGet) + router.HandleFunc("/api/v1/getSpanFilters", aH.getSpanFilters).Methods(http.MethodGet) + router.HandleFunc("/api/v1/getTagFilters", aH.getTagFilters).Methods(http.MethodGet) + router.HandleFunc("/api/v1/getFilteredSpans", aH.getFilteredSpans).Methods(http.MethodGet) + router.HandleFunc("/api/v1/getFilteredSpans/aggregates", aH.getFilteredSpanAggregates).Methods(http.MethodGet) router.HandleFunc("/api/v1/errors", aH.getErrors).Methods(http.MethodGet) router.HandleFunc("/api/v1/errorWithId", aH.getErrorForId).Methods(http.MethodGet) router.HandleFunc("/api/v1/errorWithType", aH.getErrorForType).Methods(http.MethodGet) @@ -965,6 +969,70 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, result) } +func (aH *APIHandler) getSpanFilters(w http.ResponseWriter, r *http.Request) { + + query, err := parseSpanFilterRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, apiErr := (*aH.reader).GetSpanFilters(context.Background(), query) + + if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + return + } + + aH.writeJSON(w, r, result) +} + +func (aH *APIHandler) getFilteredSpans(w http.ResponseWriter, r *http.Request) { + + query, err := parseFilteredSpansRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, apiErr := (*aH.reader).GetFilteredSpans(context.Background(), query) + + if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + return + } + + aH.writeJSON(w, r, result) +} + +func (aH *APIHandler) getFilteredSpanAggregates(w http.ResponseWriter, r *http.Request) { + + query, err := 
parseFilteredSpanAggregatesRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, apiErr := (*aH.reader).GetFilteredSpansAggregates(context.Background(), query) + + if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + return + } + + aH.writeJSON(w, r, result) +} + +func (aH *APIHandler) getTagFilters(w http.ResponseWriter, r *http.Request) { + + query, err := parseTagFilterRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, apiErr := (*aH.reader).GetTagFilters(context.Background(), query) + + if apiErr != nil && aH.handleError(w, apiErr.Err, http.StatusInternalServerError) { + return + } + + aH.writeJSON(w, r, result) +} + func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { ttlParams, err := parseDuration(r) if aH.handleError(w, err, http.StatusBadRequest) { diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go index 01cebe5173..de5bdec35b 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/app/interface.go @@ -37,6 +37,11 @@ type Reader interface { GetServicesList(ctx context.Context) (*[]string, error) GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) + GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) + GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) + GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) + GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) + GetErrors(ctx context.Context, params *model.GetErrorsParams) 
(*[]model.Error, *model.ApiError) GetErrorForId(ctx context.Context, params *model.GetErrorParams) (*model.ErrorWithSpan, *model.ApiError) GetErrorForType(ctx context.Context, params *model.GetErrorParams) (*model.ErrorWithSpan, *model.ApiError) diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 864b5e9b29..39113823b9 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -19,9 +19,11 @@ import ( var allowedDimesions = []string{"calls", "duration"} +var allowedFunctions = []string{"count", "ratePerSec", "sum", "avg", "min", "max", "p50", "p90", "p95", "p99"} + var allowedAggregations = map[string][]string{ "calls": {"count", "rate_per_sec"}, - "duration": {"avg", "p50", "p95", "p99"}, + "duration": {"avg", "p50", "p95", "p90", "p99", "min", "max", "sum"}, } func parseUser(r *http.Request) (*model.User, error) { @@ -480,6 +482,284 @@ func parseSpanSearchRequest(r *http.Request) (*model.SpanSearchParams, error) { return params, nil } +func parseSpanFilterRequest(r *http.Request) (*model.SpanFilterParams, error) { + + startTime, err := parseTime("start", r) + if err != nil { + return nil, err + } + endTime, err := parseTimeMinusBuffer("end", r) + if err != nil { + return nil, err + } + + params := &model.SpanFilterParams{ + Start: startTime, + End: endTime, + ServiceName: []string{}, + HttpRoute: []string{}, + HttpCode: []string{}, + HttpUrl: []string{}, + HttpHost: []string{}, + HttpMethod: []string{}, + Component: []string{}, + Status: []string{}, + Operation: []string{}, + GetFilters: []string{}, + } + + params.ServiceName = fetchArrayValues("serviceName", r) + + params.Status = fetchArrayValues("status", r) + + params.Operation = fetchArrayValues("operation", r) + + params.HttpCode = fetchArrayValues("httpCode", r) + + params.HttpUrl = fetchArrayValues("httpUrl", r) + + params.HttpHost = fetchArrayValues("httpHost", r) + + params.HttpRoute = fetchArrayValues("httpRoute", r) + + params.HttpMethod = 
fetchArrayValues("httpMethod", r) + + params.Component = fetchArrayValues("component", r) + + params.GetFilters = fetchArrayValues("getFilters", r) + + minDuration, err := parseTimestamp("minDuration", r) + if err == nil { + params.MinDuration = *minDuration + } + maxDuration, err := parseTimestamp("maxDuration", r) + if err == nil { + params.MaxDuration = *maxDuration + } + + return params, nil +} + +func parseFilteredSpansRequest(r *http.Request) (*model.GetFilteredSpansParams, error) { + + startTime, err := parseTime("start", r) + if err != nil { + return nil, err + } + endTime, err := parseTimeMinusBuffer("end", r) + if err != nil { + return nil, err + } + + params := &model.GetFilteredSpansParams{ + Start: startTime, + End: endTime, + ServiceName: []string{}, + HttpRoute: []string{}, + HttpCode: []string{}, + HttpUrl: []string{}, + HttpHost: []string{}, + HttpMethod: []string{}, + Component: []string{}, + Status: []string{}, + Operation: []string{}, + Limit: 100, + Order: "descending", + } + + params.ServiceName = fetchArrayValues("serviceName", r) + + params.Status = fetchArrayValues("status", r) + + params.Operation = fetchArrayValues("operation", r) + + params.HttpCode = fetchArrayValues("httpCode", r) + + params.HttpUrl = fetchArrayValues("httpUrl", r) + + params.HttpHost = fetchArrayValues("httpHost", r) + + params.HttpRoute = fetchArrayValues("httpRoute", r) + + params.HttpMethod = fetchArrayValues("httpMethod", r) + + params.Component = fetchArrayValues("component", r) + + limitStr := r.URL.Query().Get("limit") + if len(limitStr) != 0 { + limit, err := strconv.ParseInt(limitStr, 10, 64) + if err != nil { + return nil, errors.New("Limit param is not in correct format") + } + params.Limit = limit + } else { + params.Limit = 100 + } + + offsetStr := r.URL.Query().Get("offset") + if len(offsetStr) != 0 { + offset, err := strconv.ParseInt(offsetStr, 10, 64) + if err != nil { + return nil, errors.New("Offset param is not in correct format") + } + 
params.Offset = offset + } + + tags, err := parseTagsV2("tags", r) + if err != nil { + return nil, err + } + if len(*tags) != 0 { + params.Tags = *tags + } + + minDuration, err := parseTimestamp("minDuration", r) + if err == nil { + params.MinDuration = *minDuration + } + maxDuration, err := parseTimestamp("maxDuration", r) + if err == nil { + params.MaxDuration = *maxDuration + } + + kind := r.URL.Query().Get("kind") + if len(kind) != 0 { + params.Kind = kind + } + + return params, nil +} + +func parseFilteredSpanAggregatesRequest(r *http.Request) (*model.GetFilteredSpanAggregatesParams, error) { + + startTime, err := parseTime("start", r) + if err != nil { + return nil, err + } + + endTime, err := parseTimeMinusBuffer("end", r) + if err != nil { + return nil, err + } + + stepStr := r.URL.Query().Get("step") + if len(stepStr) == 0 { + return nil, errors.New("step param missing in query") + } + + stepInt, err := strconv.Atoi(stepStr) + if err != nil { + return nil, errors.New("step param is not in correct format") + } + + function := r.URL.Query().Get("function") + if len(function) == 0 { + return nil, errors.New("function param missing in query") + } else { + if !DoesExistInSlice(function, allowedFunctions) { + return nil, errors.New(fmt.Sprintf("given function: %s is not allowed in query", function)) + } + } + + var dimension, aggregationOption string + + switch function { + case "count": + dimension = "calls" + aggregationOption = "count" + case "ratePerSec": + dimension = "calls" + aggregationOption = "rate_per_sec" + case "avg": + dimension = "duration" + aggregationOption = "avg" + case "sum": + dimension = "duration" + aggregationOption = "sum" + case "p50": + dimension = "duration" + aggregationOption = "p50" + case "p90": + dimension = "duration" + aggregationOption = "p90" + case "p95": + dimension = "duration" + aggregationOption = "p95" + case "p99": + dimension = "duration" + aggregationOption = "p99" + case "min": + dimension = "duration" + 
aggregationOption = "min" + case "max": + dimension = "duration" + aggregationOption = "max" + } + + params := &model.GetFilteredSpanAggregatesParams{ + Start: startTime, + End: endTime, + ServiceName: []string{}, + HttpRoute: []string{}, + HttpCode: []string{}, + HttpUrl: []string{}, + HttpHost: []string{}, + HttpMethod: []string{}, + Component: []string{}, + Status: []string{}, + Operation: []string{}, + StepSeconds: stepInt, + Dimension: dimension, + AggregationOption: aggregationOption, + } + + params.ServiceName = fetchArrayValues("serviceName", r) + + params.Status = fetchArrayValues("status", r) + + params.Operation = fetchArrayValues("operation", r) + + params.HttpCode = fetchArrayValues("httpCode", r) + + params.HttpUrl = fetchArrayValues("httpUrl", r) + + params.HttpHost = fetchArrayValues("httpHost", r) + + params.HttpRoute = fetchArrayValues("httpRoute", r) + + params.HttpMethod = fetchArrayValues("httpMethod", r) + + params.Component = fetchArrayValues("component", r) + + tags, err := parseTagsV2("tags", r) + if err != nil { + return nil, err + } + if len(*tags) != 0 { + params.Tags = *tags + } + + minDuration, err := parseTimestamp("minDuration", r) + if err == nil { + params.MinDuration = *minDuration + } + maxDuration, err := parseTimestamp("maxDuration", r) + if err == nil { + params.MaxDuration = *maxDuration + } + + kind := r.URL.Query().Get("kind") + if len(kind) != 0 { + params.Kind = kind + } + groupBy := r.URL.Query().Get("groupBy") + if len(groupBy) != 0 { + params.GroupBy = groupBy + } + + return params, nil +} + func parseErrorRequest(r *http.Request) (*model.GetErrorParams, error) { params := &model.GetErrorParams{} @@ -502,6 +782,60 @@ func parseErrorRequest(r *http.Request) (*model.GetErrorParams, error) { return params, nil } +func parseTagFilterRequest(r *http.Request) (*model.TagFilterParams, error) { + + startTime, err := parseTime("start", r) + if err != nil { + return nil, err + } + endTime, err := parseTimeMinusBuffer("end", r) + 
if err != nil { + return nil, err + } + + params := &model.TagFilterParams{ + Start: startTime, + End: endTime, + ServiceName: []string{}, + HttpRoute: []string{}, + HttpCode: []string{}, + HttpUrl: []string{}, + HttpHost: []string{}, + HttpMethod: []string{}, + Component: []string{}, + Status: []string{}, + Operation: []string{}, + } + + params.ServiceName = fetchArrayValues("serviceName", r) + + params.Status = fetchArrayValues("status", r) + + params.Operation = fetchArrayValues("operation", r) + + params.HttpCode = fetchArrayValues("httpCode", r) + + params.HttpUrl = fetchArrayValues("httpUrl", r) + + params.HttpHost = fetchArrayValues("httpHost", r) + + params.HttpRoute = fetchArrayValues("httpRoute", r) + + params.HttpMethod = fetchArrayValues("httpMethod", r) + + params.Component = fetchArrayValues("component", r) + + minDuration, err := parseTimestamp("minDuration", r) + if err == nil { + params.MinDuration = *minDuration + } + maxDuration, err := parseTimestamp("maxDuration", r) + if err == nil { + params.MaxDuration = *maxDuration + } + + return params, nil +} func parseErrorsRequest(r *http.Request) (*model.GetErrorsParams, error) { startTime, err := parseTime("start", r) @@ -521,6 +855,19 @@ func parseErrorsRequest(r *http.Request) (*model.GetErrorsParams, error) { return params, nil } +func fetchArrayValues(param string, r *http.Request) []string { + valueStr := r.URL.Query().Get(param) + var values []string + if len(valueStr) == 0 { + return values + } + err := json.Unmarshal([]byte(valueStr), &values) + if err != nil { + zap.S().Error("Error in parsing service params", zap.Error(err)) + } + return values +} + func parseTags(param string, r *http.Request) (*[]model.TagQuery, error) { tags := new([]model.TagQuery) @@ -539,6 +886,24 @@ func parseTags(param string, r *http.Request) (*[]model.TagQuery, error) { return tags, nil } +func parseTagsV2(param string, r *http.Request) (*[]model.TagQueryV2, error) { + + tags := new([]model.TagQueryV2) + tagsStr 
:= r.URL.Query().Get(param) + + if len(tagsStr) == 0 { + return tags, nil + } + err := json.Unmarshal([]byte(tagsStr), tags) + if err != nil { + zap.S().Error("Error in parsing tags", zap.Error(err)) + return nil, fmt.Errorf("error in parsing %s ", param) + } + // zap.S().Info("Tags: ", *tags) + + return tags, nil +} + func parseApplicationPercentileRequest(r *http.Request) (*model.ApplicationPercentileParams, error) { startTime, err := parseTime("start", r) diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 9b22a1a0fc..4732bbbb6b 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -78,6 +78,11 @@ type TagQuery struct { Operator string } +type TagQueryV2 struct { + Key string + Values []string + Operator string +} type SpanSearchAggregatesParams struct { ServiceName string OperationName string @@ -111,6 +116,82 @@ type SpanSearchParams struct { Tags []TagQuery } +type GetFilteredSpansParams struct { + ServiceName []string + Operation []string + Kind string + Status []string + HttpRoute []string + HttpCode []string + HttpUrl []string + HttpHost []string + HttpMethod []string + Component []string + Start *time.Time + End *time.Time + MinDuration string + MaxDuration string + Limit int64 + Order string + Offset int64 + Tags []TagQueryV2 +} + +type GetFilteredSpanAggregatesParams struct { + ServiceName []string + Operation []string + Kind string + Status []string + HttpRoute []string + HttpCode []string + HttpUrl []string + HttpHost []string + HttpMethod []string + Component []string + MinDuration string + MaxDuration string + Tags []TagQueryV2 + Start *time.Time + End *time.Time + StepSeconds int + Dimension string + AggregationOption string + GroupBy string + Function string +} + +type SpanFilterParams struct { + Status []string + ServiceName []string + HttpRoute []string + HttpCode []string + HttpUrl []string + HttpHost []string + HttpMethod []string + Component []string + 
Operation []string + GetFilters []string + MinDuration string + MaxDuration string + Start *time.Time + End *time.Time +} + +type TagFilterParams struct { + Status []string + ServiceName []string + HttpRoute []string + HttpCode []string + HttpUrl []string + HttpHost []string + HttpMethod []string + Component []string + Operation []string + MinDuration string + MaxDuration string + Start *time.Time + End *time.Time +} type TTLParams struct { Type string Duration string diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index d6f6e1fcc7..d3e15490f1 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -1,6 +1,7 @@ package model import ( + "database/sql" "encoding/json" "fmt" "strconv" @@ -128,6 +129,22 @@ type SearchSpansResult struct { Events [][]interface{} `json:"events"` } +type GetFilterSpansResponseItem struct { + Timestamp string `db:"timestamp" json:"timestamp"` + SpanID string `db:"spanID" json:"spanID"` + TraceID string `db:"traceID" json:"traceID"` + ServiceName string `db:"serviceName" json:"serviceName"` + Operation string `db:"name" json:"operation"` + DurationNano int64 `db:"durationNano" json:"durationNano"` + HttpCode string `db:"httpCode" json:"httpCode"` + HttpMethod string `db:"httpMethod" json:"httpMethod"` +} + +type GetFilterSpansResponse struct { + Spans []GetFilterSpansResponseItem `json:"spans"` + TotalSpans int `json:"totalSpans"` +} + type TraceResult struct { Data []interface{} `json:"data" db:"data"` Total int `json:"total" db:"total"` @@ -247,6 +264,9 @@ type TagItem struct { TagCount int `json:"tagCount" db:"tagCount"` } +type TagFilters struct { + TagKeys string `json:"tagKeys" db:"tagKeys"` +} type ServiceMapDependencyResponseItem struct { Parent string `json:"parent,omitempty" db:"parent,omitempty"` Child string `json:"child,omitempty" db:"child,omitempty"` @@ -259,6 +279,21 @@ type SpanSearchAggregatesResponseItem struct { Value float32 
`json:"value,omitempty" db:"value"` } +type GetFilteredSpansAggregatesResponse struct { + Items map[int64]SpanAggregatesResponseItem `json:"items"` +} +type SpanAggregatesResponseItem struct { + Timestamp int64 `json:"timestamp,omitempty" ` + Value float32 `json:"value,omitempty"` + GroupBy map[string]float32 `json:"groupBy,omitempty"` +} +type SpanAggregatesDBResponseItem struct { + Timestamp int64 `json:"timestamp,omitempty" db:"timestamp" ` + Time string `json:"time,omitempty" db:"time"` + Value float32 `json:"value,omitempty" db:"value"` + GroupBy sql.NullString `json:"groupBy,omitempty" db:"groupBy"` +} + type SetTTLResponseItem struct { Message string `json:"message"` } @@ -272,6 +307,71 @@ type GetTTLResponseItem struct { TracesTime int `json:"traces_ttl_duration_hrs"` } +type DBResponseMinMaxDuration struct { + MinDuration int `db:"min(durationNano)"` + MaxDuration int `db:"max(durationNano)"` +} + +type DBResponseServiceName struct { + ServiceName string `db:"serviceName"` + Count int `db:"count"` +} + +type DBResponseHttpCode struct { + HttpCode string `db:"httpCode"` + Count int `db:"count"` +} + +type DBResponseHttpRoute struct { + HttpRoute string `db:"httpRoute"` + Count int `db:"count"` +} + +type DBResponseHttpUrl struct { + HttpUrl string `db:"httpUrl"` + Count int `db:"count"` +} + +type DBResponseHttpMethod struct { + HttpMethod string `db:"httpMethod"` + Count int `db:"count"` +} + +type DBResponseHttpHost struct { + HttpHost string `db:"httpHost"` + Count int `db:"count"` +} + +type DBResponseOperation struct { + Operation string `db:"name"` + Count int `db:"count"` +} + +type DBResponseComponent struct { + Component sql.NullString `db:"component"` + Count int `db:"count"` +} + +type DBResponseErrors struct { + NumErrors int `db:"numErrors"` +} + +type DBResponseTotal struct { + NumTotal int `db:"numTotal"` +} + +type SpanFiltersResponse struct { + ServiceName map[string]int `json:"serviceName"` + Status map[string]int `json:"status"` + 
Duration map[string]int `json:"duration"` + Operation map[string]int `json:"operation"` + HttpCode map[string]int `json:"httpCode"` + HttpUrl map[string]int `json:"httpUrl"` + HttpMethod map[string]int `json:"httpMethod"` + HttpRoute map[string]int `json:"httpRoute"` + HttpHost map[string]int `json:"httpHost"` + Component map[string]int `json:"component"` +} type Error struct { ExceptionType string `json:"exceptionType" db:"exceptionType"` ExceptionMsg string `json:"exceptionMessage" db:"exceptionMessage"`