mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 18:49:06 +08:00
fix: remove unwanted trace API's (#6464)
This commit is contained in:
parent
22e61e1605
commit
91bbeaf175
@ -766,307 +766,6 @@ func buildFilterArrayQuery(_ context.Context, excludeMap map[string]struct{}, pa
|
|||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) {
|
|
||||||
|
|
||||||
var query string
|
|
||||||
excludeMap := make(map[string]struct{})
|
|
||||||
for _, e := range queryParams.Exclude {
|
|
||||||
if e == constants.OperationRequest {
|
|
||||||
excludeMap[constants.OperationDB] = struct{}{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludeMap[e] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
|
||||||
if len(queryParams.TraceID) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ServiceName) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpRoute) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpHost) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpUrl) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.Operation) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.RPCMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ResponseStatusCode) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(queryParams.MinDuration) != 0 {
|
|
||||||
query = query + " AND durationNano >= @durationNanoMin"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.MaxDuration) != 0 {
|
|
||||||
query = query + " AND durationNano <= @durationNanoMax"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(queryParams.SpanKind) != 0 {
|
|
||||||
query = query + " AND kind = @kind"
|
|
||||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
|
||||||
}
|
|
||||||
|
|
||||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
|
||||||
|
|
||||||
traceFilterReponse := model.SpanFiltersResponse{
|
|
||||||
Status: map[string]uint64{},
|
|
||||||
Duration: map[string]uint64{},
|
|
||||||
ServiceName: map[string]uint64{},
|
|
||||||
Operation: map[string]uint64{},
|
|
||||||
ResponseStatusCode: map[string]uint64{},
|
|
||||||
RPCMethod: map[string]uint64{},
|
|
||||||
HttpMethod: map[string]uint64{},
|
|
||||||
HttpUrl: map[string]uint64{},
|
|
||||||
HttpRoute: map[string]uint64{},
|
|
||||||
HttpHost: map[string]uint64{},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, e := range queryParams.GetFilters {
|
|
||||||
switch e {
|
|
||||||
case constants.TraceID:
|
|
||||||
continue
|
|
||||||
case constants.ServiceName:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY serviceName"
|
|
||||||
var dBResponse []model.DBResponseServiceName
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.ServiceName != "" {
|
|
||||||
traceFilterReponse.ServiceName[service.ServiceName] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.HttpRoute:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY httpRoute"
|
|
||||||
var dBResponse []model.DBResponseHttpRoute
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.HttpRoute != "" {
|
|
||||||
traceFilterReponse.HttpRoute[service.HttpRoute] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.HttpUrl:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY httpUrl"
|
|
||||||
var dBResponse []model.DBResponseHttpUrl
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.HttpUrl != "" {
|
|
||||||
traceFilterReponse.HttpUrl[service.HttpUrl] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.HttpMethod:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY httpMethod"
|
|
||||||
var dBResponse []model.DBResponseHttpMethod
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.HttpMethod != "" {
|
|
||||||
traceFilterReponse.HttpMethod[service.HttpMethod] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.HttpHost:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY httpHost"
|
|
||||||
var dBResponse []model.DBResponseHttpHost
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.HttpHost != "" {
|
|
||||||
traceFilterReponse.HttpHost[service.HttpHost] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.OperationRequest:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY name"
|
|
||||||
var dBResponse []model.DBResponseOperation
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.Operation != "" {
|
|
||||||
traceFilterReponse.Operation[service.Operation] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.Status:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
var dBResponse []model.DBResponseTotal
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
|
|
||||||
finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery2 += query
|
|
||||||
var dBResponse2 []model.DBResponseTotal
|
|
||||||
err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...)
|
|
||||||
zap.L().Info(finalQuery2)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
if len(dBResponse) > 0 && len(dBResponse2) > 0 {
|
|
||||||
traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": dBResponse[0].NumTotal}
|
|
||||||
} else if len(dBResponse) > 0 {
|
|
||||||
traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": dBResponse[0].NumTotal}
|
|
||||||
} else if len(dBResponse2) > 0 {
|
|
||||||
traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": 0}
|
|
||||||
} else {
|
|
||||||
traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": 0}
|
|
||||||
}
|
|
||||||
case constants.Duration:
|
|
||||||
err := r.featureFlags.CheckFeature(constants.DurationSort)
|
|
||||||
durationSortEnabled := err == nil
|
|
||||||
finalQuery := ""
|
|
||||||
if !durationSortEnabled {
|
|
||||||
// if duration sort is not enabled, we need to get the min and max duration from the index table
|
|
||||||
finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
var dBResponse []model.DBResponseMinMax
|
|
||||||
err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
if len(dBResponse) > 0 {
|
|
||||||
traceFilterReponse.Duration = map[string]uint64{"minDuration": dBResponse[0].Min, "maxDuration": dBResponse[0].Max}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// when duration sort is enabled, we need to get the min and max duration from the duration table
|
|
||||||
finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " ORDER BY durationNano LIMIT 1"
|
|
||||||
var dBResponse []model.DBResponseTotal
|
|
||||||
err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
|
|
||||||
finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " ORDER BY durationNano DESC LIMIT 1"
|
|
||||||
var dBResponse2 []model.DBResponseTotal
|
|
||||||
err = r.db.Select(ctx, &dBResponse2, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
if len(dBResponse) > 0 {
|
|
||||||
traceFilterReponse.Duration["minDuration"] = dBResponse[0].NumTotal
|
|
||||||
}
|
|
||||||
if len(dBResponse2) > 0 {
|
|
||||||
traceFilterReponse.Duration["maxDuration"] = dBResponse2[0].NumTotal
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case constants.RPCMethod:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY rpcMethod"
|
|
||||||
var dBResponse []model.DBResponseRPCMethod
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.RPCMethod != "" {
|
|
||||||
traceFilterReponse.RPCMethod[service.RPCMethod] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case constants.ResponseStatusCode:
|
|
||||||
finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " GROUP BY responseStatusCode"
|
|
||||||
var dBResponse []model.DBResponseStatusCodeMethod
|
|
||||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
|
||||||
zap.L().Info(finalQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
|
||||||
}
|
|
||||||
for _, service := range dBResponse {
|
|
||||||
if service.ResponseStatusCode != "" {
|
|
||||||
traceFilterReponse.ResponseStatusCode[service.ResponseStatusCode] = service.Count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("filter type: %s not supported", e)}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &traceFilterReponse, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
|
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
|
||||||
|
|
||||||
// status can only be two and if both are selected than they are equivalent to none selected
|
// status can only be two and if both are selected than they are equivalent to none selected
|
||||||
@ -1088,140 +787,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
|
|||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {
|
|
||||||
|
|
||||||
queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable)
|
|
||||||
|
|
||||||
excludeMap := make(map[string]struct{})
|
|
||||||
for _, e := range queryParams.Exclude {
|
|
||||||
if e == constants.OperationRequest {
|
|
||||||
excludeMap[constants.OperationDB] = struct{}{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludeMap[e] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
var query string
|
|
||||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
|
||||||
if len(queryParams.TraceID) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ServiceName) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpRoute) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpHost) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpUrl) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.Operation) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.RPCMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(queryParams.ResponseStatusCode) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(queryParams.MinDuration) != 0 {
|
|
||||||
query = query + " AND durationNano >= @durationNanoMin"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.MaxDuration) != 0 {
|
|
||||||
query = query + " AND durationNano <= @durationNanoMax"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
|
||||||
}
|
|
||||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
|
||||||
|
|
||||||
if len(queryParams.SpanKind) != 0 {
|
|
||||||
query = query + " AND kind = @kind"
|
|
||||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
|
||||||
}
|
|
||||||
|
|
||||||
// create TagQuery from TagQueryParams
|
|
||||||
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
|
|
||||||
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
|
|
||||||
query += subQuery
|
|
||||||
args = append(args, argsSubQuery...)
|
|
||||||
if errStatus != nil {
|
|
||||||
return nil, errStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(queryParams.OrderParam) != 0 {
|
|
||||||
if queryParams.OrderParam == constants.Duration {
|
|
||||||
queryTable = fmt.Sprintf("%s.%s", r.TraceDB, r.durationTable)
|
|
||||||
if queryParams.Order == constants.Descending {
|
|
||||||
query = query + " ORDER BY durationNano DESC"
|
|
||||||
}
|
|
||||||
if queryParams.Order == constants.Ascending {
|
|
||||||
query = query + " ORDER BY durationNano ASC"
|
|
||||||
}
|
|
||||||
} else if queryParams.OrderParam == constants.Timestamp {
|
|
||||||
projectionOptQuery := "SET allow_experimental_projection_optimization = 1"
|
|
||||||
err := r.db.Exec(ctx, projectionOptQuery)
|
|
||||||
|
|
||||||
zap.L().Info(projectionOptQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
|
||||||
}
|
|
||||||
if queryParams.Order == constants.Descending {
|
|
||||||
query = query + " ORDER BY timestamp DESC"
|
|
||||||
}
|
|
||||||
if queryParams.Order == constants.Ascending {
|
|
||||||
query = query + " ORDER BY timestamp ASC"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if queryParams.Limit > 0 {
|
|
||||||
query = query + " LIMIT @limit"
|
|
||||||
args = append(args, clickhouse.Named("limit", queryParams.Limit))
|
|
||||||
}
|
|
||||||
|
|
||||||
if queryParams.Offset > 0 {
|
|
||||||
query = query + " OFFSET @offset"
|
|
||||||
args = append(args, clickhouse.Named("offset", queryParams.Offset))
|
|
||||||
}
|
|
||||||
|
|
||||||
var getFilterSpansResponseItems []model.GetFilterSpansResponseItem
|
|
||||||
|
|
||||||
baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpMethod, rpcMethod, responseStatusCode FROM %s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryTable)
|
|
||||||
baseQuery += query
|
|
||||||
err := r.db.Select(ctx, &getFilterSpansResponseItems, baseQuery, args...)
|
|
||||||
// Fill status and method
|
|
||||||
for i, e := range getFilterSpansResponseItems {
|
|
||||||
if e.RPCMethod != "" {
|
|
||||||
getFilterSpansResponseItems[i].Method = e.RPCMethod
|
|
||||||
} else {
|
|
||||||
getFilterSpansResponseItems[i].Method = e.HttpMethod
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
zap.L().Info(baseQuery)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
|
||||||
}
|
|
||||||
|
|
||||||
getFilterSpansResponse := model.GetFilterSpansResponse{
|
|
||||||
Spans: getFilterSpansResponseItems,
|
|
||||||
TotalSpans: 1000,
|
|
||||||
}
|
|
||||||
|
|
||||||
return &getFilterSpansResponse, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
|
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
|
||||||
tags := []model.TagQuery{}
|
tags := []model.TagQuery{}
|
||||||
for _, tag := range queryParams {
|
for _, tag := range queryParams {
|
||||||
@ -1379,87 +944,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
|
|||||||
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
|
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) {
|
|
||||||
|
|
||||||
excludeMap := make(map[string]struct{})
|
|
||||||
for _, e := range queryParams.Exclude {
|
|
||||||
if e == constants.OperationRequest {
|
|
||||||
excludeMap[constants.OperationDB] = struct{}{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludeMap[e] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
var query string
|
|
||||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
|
||||||
if len(queryParams.TraceID) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ServiceName) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpRoute) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpHost) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpUrl) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.Operation) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.RPCMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ResponseStatusCode) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.MinDuration) != 0 {
|
|
||||||
query = query + " AND durationNano >= @durationNanoMin"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.MaxDuration) != 0 {
|
|
||||||
query = query + " AND durationNano <= @durationNanoMax"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.SpanKind) != 0 {
|
|
||||||
query = query + " AND kind = @kind"
|
|
||||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
|
||||||
}
|
|
||||||
|
|
||||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
|
||||||
|
|
||||||
tagFilters := []model.TagFilters{}
|
|
||||||
|
|
||||||
// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
|
||||||
finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
|
|
||||||
|
|
||||||
zap.L().Info(query)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
|
||||||
}
|
|
||||||
tagFiltersResult := model.TagFilters{
|
|
||||||
StringTagKeys: make([]string, 0),
|
|
||||||
NumberTagKeys: make([]string, 0),
|
|
||||||
BoolTagKeys: make([]string, 0),
|
|
||||||
}
|
|
||||||
if len(tagFilters) != 0 {
|
|
||||||
tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys)
|
|
||||||
tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys)
|
|
||||||
tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys)
|
|
||||||
}
|
|
||||||
return &tagFiltersResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func excludeTags(_ context.Context, tags []string) []string {
|
func excludeTags(_ context.Context, tags []string) []string {
|
||||||
excludedTagsMap := map[string]bool{
|
excludedTagsMap := map[string]bool{
|
||||||
"http.code": true,
|
"http.code": true,
|
||||||
@ -1483,102 +967,6 @@ func excludeTags(_ context.Context, tags []string) []string {
|
|||||||
return newTags
|
return newTags
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) {
|
|
||||||
|
|
||||||
if queryParams.TagKey.Type == model.TagTypeNumber {
|
|
||||||
return &model.TagValues{
|
|
||||||
NumberTagValues: make([]float64, 0),
|
|
||||||
StringTagValues: make([]string, 0),
|
|
||||||
BoolTagValues: make([]bool, 0),
|
|
||||||
}, nil
|
|
||||||
} else if queryParams.TagKey.Type == model.TagTypeBool {
|
|
||||||
return &model.TagValues{
|
|
||||||
NumberTagValues: make([]float64, 0),
|
|
||||||
StringTagValues: make([]string, 0),
|
|
||||||
BoolTagValues: []bool{true, false},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
excludeMap := make(map[string]struct{})
|
|
||||||
for _, e := range queryParams.Exclude {
|
|
||||||
if e == constants.OperationRequest {
|
|
||||||
excludeMap[constants.OperationDB] = struct{}{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludeMap[e] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
var query string
|
|
||||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
|
||||||
if len(queryParams.TraceID) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ServiceName) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpRoute) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpHost) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpUrl) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.Operation) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.MinDuration) != 0 {
|
|
||||||
query = query + " AND durationNano >= @durationNanoMin"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.MaxDuration) != 0 {
|
|
||||||
query = query + " AND durationNano <= @durationNanoMax"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.SpanKind) != 0 {
|
|
||||||
query = query + " AND kind = @kind"
|
|
||||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
|
||||||
}
|
|
||||||
|
|
||||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
|
||||||
|
|
||||||
tagValues := []model.TagValues{}
|
|
||||||
|
|
||||||
finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
|
||||||
finalQuery += query
|
|
||||||
finalQuery += " LIMIT @limit"
|
|
||||||
|
|
||||||
args = append(args, clickhouse.Named("key", queryParams.TagKey.Key))
|
|
||||||
args = append(args, clickhouse.Named("limit", queryParams.Limit))
|
|
||||||
err := r.db.Select(ctx, &tagValues, finalQuery, args...)
|
|
||||||
|
|
||||||
zap.L().Info(query)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanedTagValues := model.TagValues{
|
|
||||||
StringTagValues: []string{},
|
|
||||||
NumberTagValues: []float64{},
|
|
||||||
BoolTagValues: []bool{},
|
|
||||||
}
|
|
||||||
if len(tagValues) == 0 {
|
|
||||||
return &cleanedTagValues, nil
|
|
||||||
}
|
|
||||||
for _, e := range tagValues[0].StringTagValues {
|
|
||||||
if e != "" {
|
|
||||||
cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &cleanedTagValues, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {
|
func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {
|
||||||
|
|
||||||
namedArgs := []interface{}{
|
namedArgs := []interface{}{
|
||||||
@ -1823,185 +1211,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
|
|||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
|
|
||||||
|
|
||||||
excludeMap := make(map[string]struct{})
|
|
||||||
for _, e := range queryParams.Exclude {
|
|
||||||
if e == constants.OperationRequest {
|
|
||||||
excludeMap[constants.OperationDB] = struct{}{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
excludeMap[e] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
SpanAggregatesDBResponseItems := []model.SpanAggregatesDBResponseItem{}
|
|
||||||
|
|
||||||
aggregation_query := ""
|
|
||||||
if queryParams.Dimension == "duration" {
|
|
||||||
switch queryParams.AggregationOption {
|
|
||||||
case "p50":
|
|
||||||
aggregation_query = " quantile(0.50)(durationNano) as float64Value "
|
|
||||||
case "p95":
|
|
||||||
aggregation_query = " quantile(0.95)(durationNano) as float64Value "
|
|
||||||
case "p90":
|
|
||||||
aggregation_query = " quantile(0.90)(durationNano) as float64Value "
|
|
||||||
case "p99":
|
|
||||||
aggregation_query = " quantile(0.99)(durationNano) as float64Value "
|
|
||||||
case "max":
|
|
||||||
aggregation_query = " max(durationNano) as value "
|
|
||||||
case "min":
|
|
||||||
aggregation_query = " min(durationNano) as value "
|
|
||||||
case "avg":
|
|
||||||
aggregation_query = " avg(durationNano) as float64Value "
|
|
||||||
case "sum":
|
|
||||||
aggregation_query = " sum(durationNano) as value "
|
|
||||||
default:
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("aggregate type: %s not supported", queryParams.AggregationOption)}
|
|
||||||
}
|
|
||||||
} else if queryParams.Dimension == "calls" {
|
|
||||||
aggregation_query = " count(*) as value "
|
|
||||||
}
|
|
||||||
|
|
||||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
|
||||||
|
|
||||||
var query string
|
|
||||||
var customStr []string
|
|
||||||
_, columnExists := constants.GroupByColMap[queryParams.GroupBy]
|
|
||||||
// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
|
|
||||||
// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
|
|
||||||
if queryParams.GroupBy != "" && columnExists {
|
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable)
|
|
||||||
args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
|
|
||||||
} else if queryParams.GroupBy != "" {
|
|
||||||
customStr = strings.Split(queryParams.GroupBy, ".(")
|
|
||||||
if len(customStr) < 2 {
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
|
||||||
}
|
|
||||||
if customStr[1] == string(model.TagTypeString)+")" {
|
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
|
||||||
} else if customStr[1] == string(model.TagTypeNumber)+")" {
|
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
|
||||||
} else if customStr[1] == string(model.TagTypeBool)+")" {
|
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
|
||||||
} else {
|
|
||||||
// return error for unsupported group by
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(queryParams.TraceID) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ServiceName) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpRoute) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpHost) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.HttpUrl) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.Operation) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.RPCMethod) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.ResponseStatusCode) > 0 {
|
|
||||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
|
||||||
}
|
|
||||||
if len(queryParams.MinDuration) != 0 {
|
|
||||||
query = query + " AND durationNano >= @durationNanoMin"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
|
||||||
}
|
|
||||||
if len(queryParams.MaxDuration) != 0 {
|
|
||||||
query = query + " AND durationNano <= @durationNanoMax"
|
|
||||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
|
||||||
}
|
|
||||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
|
||||||
|
|
||||||
if len(queryParams.SpanKind) != 0 {
|
|
||||||
query = query + " AND kind = @kind"
|
|
||||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
|
||||||
}
|
|
||||||
// create TagQuery from TagQueryParams
|
|
||||||
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
|
|
||||||
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
|
|
||||||
query += subQuery
|
|
||||||
args = append(args, argsSubQuery...)
|
|
||||||
|
|
||||||
if errStatus != nil {
|
|
||||||
return nil, errStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
if queryParams.GroupBy != "" && columnExists {
|
|
||||||
query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy)
|
|
||||||
} else if queryParams.GroupBy != "" {
|
|
||||||
if customStr[1] == string(model.TagTypeString)+")" {
|
|
||||||
query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0])
|
|
||||||
} else if customStr[1] == string(model.TagTypeNumber)+")" {
|
|
||||||
query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0])
|
|
||||||
} else if customStr[1] == string(model.TagTypeBool)+")" {
|
|
||||||
query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0])
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
query = query + " GROUP BY time ORDER BY time"
|
|
||||||
}
|
|
||||||
|
|
||||||
err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...)
|
|
||||||
|
|
||||||
zap.L().Info(query)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
|
||||||
}
|
|
||||||
|
|
||||||
GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{
|
|
||||||
Items: map[int64]model.SpanAggregatesResponseItem{},
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range SpanAggregatesDBResponseItems {
|
|
||||||
if SpanAggregatesDBResponseItems[i].Value == 0 {
|
|
||||||
SpanAggregatesDBResponseItems[i].Value = uint64(SpanAggregatesDBResponseItems[i].Float64Value)
|
|
||||||
}
|
|
||||||
SpanAggregatesDBResponseItems[i].Timestamp = int64(SpanAggregatesDBResponseItems[i].Time.UnixNano())
|
|
||||||
SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value)
|
|
||||||
if queryParams.AggregationOption == "rate_per_sec" {
|
|
||||||
SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) / float32(queryParams.StepSeconds)
|
|
||||||
}
|
|
||||||
if responseElement, ok := GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp]; !ok {
|
|
||||||
if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" {
|
|
||||||
GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{
|
|
||||||
Timestamp: SpanAggregatesDBResponseItems[i].Timestamp,
|
|
||||||
GroupBy: map[string]float32{SpanAggregatesDBResponseItems[i].GroupBy: SpanAggregatesDBResponseItems[i].FloatValue},
|
|
||||||
}
|
|
||||||
} else if queryParams.GroupBy == "" {
|
|
||||||
GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{
|
|
||||||
Timestamp: SpanAggregatesDBResponseItems[i].Timestamp,
|
|
||||||
Value: SpanAggregatesDBResponseItems[i].FloatValue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" {
|
|
||||||
responseElement.GroupBy[SpanAggregatesDBResponseItems[i].GroupBy] = SpanAggregatesDBResponseItems[i].FloatValue
|
|
||||||
}
|
|
||||||
GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = responseElement
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &GetFilteredSpansAggregatesResponse, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLocalTableName(tableName string) string {
|
func getLocalTableName(tableName string) string {
|
||||||
|
|
||||||
tableNameSplit := strings.Split(tableName, ".")
|
tableNameSplit := strings.Split(tableName, ".")
|
||||||
|
@ -526,12 +526,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
|
|||||||
router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet)
|
||||||
|
|
||||||
router.HandleFunc("/api/v1/getSpanFilters", am.ViewAccess(aH.getSpanFilters)).Methods(http.MethodPost)
|
|
||||||
router.HandleFunc("/api/v1/getTagFilters", am.ViewAccess(aH.getTagFilters)).Methods(http.MethodPost)
|
|
||||||
router.HandleFunc("/api/v1/getFilteredSpans", am.ViewAccess(aH.getFilteredSpans)).Methods(http.MethodPost)
|
|
||||||
router.HandleFunc("/api/v1/getFilteredSpans/aggregates", am.ViewAccess(aH.getFilteredSpanAggregates)).Methods(http.MethodPost)
|
|
||||||
router.HandleFunc("/api/v1/getTagValues", am.ViewAccess(aH.getTagValues)).Methods(http.MethodPost)
|
|
||||||
|
|
||||||
router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost)
|
router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost)
|
||||||
router.HandleFunc("/api/v1/countErrors", am.ViewAccess(aH.countErrors)).Methods(http.MethodPost)
|
router.HandleFunc("/api/v1/countErrors", am.ViewAccess(aH.countErrors)).Methods(http.MethodPost)
|
||||||
router.HandleFunc("/api/v1/errorFromErrorID", am.ViewAccess(aH.getErrorFromErrorID)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/errorFromErrorID", am.ViewAccess(aH.getErrorFromErrorID)).Methods(http.MethodGet)
|
||||||
@ -1847,86 +1841,6 @@ func (aH *APIHandler) getErrorFromGroupID(w http.ResponseWriter, r *http.Request
|
|||||||
aH.WriteJSON(w, r, result)
|
aH.WriteJSON(w, r, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getSpanFilters(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
query, err := parseSpanFilterRequestBody(r)
|
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result, apiErr := aH.reader.GetSpanFilters(r.Context(), query)
|
|
||||||
|
|
||||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
aH.WriteJSON(w, r, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (aH *APIHandler) getFilteredSpans(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
query, err := parseFilteredSpansRequest(r, aH)
|
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result, apiErr := aH.reader.GetFilteredSpans(r.Context(), query)
|
|
||||||
|
|
||||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
aH.WriteJSON(w, r, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (aH *APIHandler) getFilteredSpanAggregates(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
query, err := parseFilteredSpanAggregatesRequest(r)
|
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result, apiErr := aH.reader.GetFilteredSpansAggregates(r.Context(), query)
|
|
||||||
|
|
||||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
aH.WriteJSON(w, r, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (aH *APIHandler) getTagFilters(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
query, err := parseTagFilterRequest(r)
|
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result, apiErr := aH.reader.GetTagFilters(r.Context(), query)
|
|
||||||
|
|
||||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
aH.WriteJSON(w, r, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (aH *APIHandler) getTagValues(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
query, err := parseTagValueRequest(r)
|
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result, apiErr := aH.reader.GetTagValues(r.Context(), query)
|
|
||||||
|
|
||||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
aH.WriteJSON(w, r, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
||||||
ttlParams, err := parseTTLParams(r)
|
ttlParams, err := parseTTLParams(r)
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||||
|
@ -29,15 +29,10 @@ type Reader interface {
|
|||||||
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
||||||
// clickhouse only.
|
// clickhouse only.
|
||||||
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
|
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
|
||||||
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
|
|
||||||
GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
|
GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
|
||||||
GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
|
GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
|
||||||
GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
|
GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
|
||||||
GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error)
|
GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error)
|
||||||
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError)
|
|
||||||
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError)
|
|
||||||
GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError)
|
|
||||||
GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError)
|
|
||||||
|
|
||||||
ListErrors(ctx context.Context, params *model.ListErrorsParams) (*[]model.Error, *model.ApiError)
|
ListErrors(ctx context.Context, params *model.ListErrorsParams) (*[]model.Error, *model.ApiError)
|
||||||
CountErrors(ctx context.Context, params *model.CountErrorsParams) (uint64, *model.ApiError)
|
CountErrors(ctx context.Context, params *model.CountErrorsParams) (uint64, *model.ApiError)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user