feat: update clickhouse reader to support new trace schema (#6479)

* feat: update clickhouse reader to support new trace schema

* fix: minor fixes

* fix: address comments

* fix: add changes to overview function

* fix: add changes to overview function

* fix: use hardcoded true

* fix: address comments
Nityananda Gohain 2024-11-20 23:35:44 +05:30 committed by GitHub
parent 5044861773
commit d43adc24ef
4 changed files with 524 additions and 23 deletions


@@ -22,6 +22,7 @@ const (
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
defaultLocalIndexTable string = "signoz_index_v2"
defaultErrorTable string = "distributed_signoz_error_index_v2"
defaultDurationTable string = "distributed_durationSort"
defaultUsageExplorerTable string = "distributed_usage_explorer"
@@ -45,6 +46,11 @@ const (
defaultLogsTableV2 string = "distributed_logs_v2"
defaultLogsResourceLocalTableV2 string = "logs_v2_resource"
defaultLogsResourceTableV2 string = "distributed_logs_v2_resource"
defaultTraceIndexTableV3 string = "distributed_signoz_index_v3"
defaultTraceLocalTableName string = "signoz_index_v3"
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
defaultTraceSummaryTable string = "distributed_trace_summary"
)
// NamespaceConfig is Clickhouse's internal configuration data
@@ -58,6 +64,7 @@ type namespaceConfig struct {
TraceDB string
OperationsTable string
IndexTable string
LocalIndexTable string
DurationTable string
UsageExplorerTable string
SpansTable string
@@ -82,6 +89,11 @@ type namespaceConfig struct {
LogsTableV2 string
LogsResourceLocalTableV2 string
LogsResourceTableV2 string
TraceIndexTableV3 string
TraceLocalTableNameV3 string
TraceResourceTableV3 string
TraceSummaryTable string
}
// Connector defines how to connect to the database
@@ -150,6 +162,7 @@ func NewOptions(
TraceDB: defaultTraceDB,
OperationsTable: defaultOperationsTable,
IndexTable: defaultIndexTable,
LocalIndexTable: defaultLocalIndexTable,
ErrorTable: defaultErrorTable,
DurationTable: defaultDurationTable,
UsageExplorerTable: defaultUsageExplorerTable,
@@ -174,6 +187,11 @@ func NewOptions(
LogsLocalTableV2: defaultLogsLocalTableV2,
LogsResourceTableV2: defaultLogsResourceTableV2,
LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2,
TraceIndexTableV3: defaultTraceIndexTableV3,
TraceLocalTableNameV3: defaultTraceLocalTableName,
TraceResourceTableV3: defaultTraceResourceTableV3,
TraceSummaryTable: defaultTraceSummaryTable,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}


@@ -145,9 +145,16 @@ type ClickHouseReader struct {
liveTailRefreshSeconds int
cluster string
useLogsNewSchema bool
useTraceNewSchema bool
logsTableName string
logsLocalTableName string
traceTableName string
traceLocalTableName string
traceResourceTableV3 string
traceSummaryTable string
}
// NewReader returns a ClickHouseReader for the database
@@ -160,6 +167,7 @@ func NewReader(
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
// useTraceNewSchema bool, // TODO: uncomment this in integration PR
) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
@@ -181,6 +189,7 @@ func NewReaderFromClickhouseConnection(
featureFlag interfaces.FeatureLookup,
cluster string,
useLogsNewSchema bool,
// useTraceNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New()
if err != nil {
@@ -218,6 +227,14 @@ func NewReaderFromClickhouseConnection(
logsLocalTableName = options.primary.LogsLocalTableV2
}
traceTableName := options.primary.IndexTable
traceLocalTableName := options.primary.LocalIndexTable
// TODO: uncomment this in integration PR
// if useTraceNewSchema {
// traceTableName = options.primary.TraceIndexTableV3
// traceLocalTableName = options.primary.TraceLocalTableNameV3
// }
return &ClickHouseReader{
db: wrap,
localDB: localDB,
@@ -253,6 +270,12 @@ func NewReaderFromClickhouseConnection(
logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2,
logsTableName: logsTableName,
logsLocalTableName: logsLocalTableName,
// useTraceNewSchema: useTraceNewSchema,
traceLocalTableName: traceLocalTableName,
traceTableName: traceTableName,
traceResourceTableV3: options.primary.TraceResourceTableV3,
traceSummaryTable: options.primary.TraceSummaryTable,
}
}
@@ -465,7 +488,11 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model
func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
services := []string{}
query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName)
if r.useTraceNewSchema {
query = fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now() - INTERVAL 1 DAY) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName)
}
rows, err := r.db.Query(ctx, query)
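A note on the ts_bucket_start predicates that recur throughout this PR: the v3 schema appears to group spans into 30-minute (1800 s) ingestion buckets keyed by ts_bucket_start, so every time-bounded query widens its lower bound by one bucket to avoid missing spans whose bucket opened just before the window. A minimal runnable sketch of that arithmetic (bucketBounds is a hypothetical helper, not part of this PR):

package main

import (
	"fmt"
	"time"
)

// bucketBounds mirrors the "start.Unix()-1800" arithmetic used in this diff:
// the lower bound steps back one full bucket; the upper bound is the raw end time.
func bucketBounds(start, end time.Time) (lo, hi int64) {
	const bucketSeconds = 1800 // one 30-minute ingestion bucket
	return start.Unix() - bucketSeconds, end.Unix()
}

func main() {
	end := time.Now()
	start := end.Add(-24 * time.Hour)
	lo, hi := bucketBounds(start, end)
	fmt.Printf("AND ts_bucket_start >= %d AND ts_bucket_start <= %d\n", lo, hi)
}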
@@ -574,14 +601,14 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
count(*) as numCalls
FROM %s.%s
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
r.TraceDB, r.traceTableName,
)
errorQuery := fmt.Sprintf(
`SELECT
count(*) as numErrors
FROM %s.%s
WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
r.TraceDB, r.traceTableName,
)
args := []interface{}{}
@@ -591,6 +618,18 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
clickhouse.Named("serviceName", svc),
clickhouse.Named("names", ops),
)
if r.useTraceNewSchema {
resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3)
query += resourceBucketFilter
errorQuery += resourceBucketFilter
args = append(args,
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(svc, true))+"%"),
)
}
// create TagQuery from TagQueryParams
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
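The filter appended above introduces three named parameters on top of @serviceName, bound here: @start_bucket (query start minus one 1800 s bucket), @end_bucket, and @labelFilter, a substring pattern for a cheap pre-screen over the serialized resource labels before the exact service.name match; see the TraceResourceBucketFilterWithServiceName constant near the end of this diff. A hedged example of the pattern built above:

// For svc = "Frontend" the value bound to @labelFilter would be
//   "%service.name%frontend%"
// i.e. the service name is lowercased and quote-escaped by
// utils.QuoteEscapedStringForContains, then wrapped in wildcards for the
// labels LIKE pass.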
@@ -642,6 +681,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
}
return &serviceItems, nil
}
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
// there can only be two statuses; if both are selected, that is equivalent to selecting none
@@ -863,8 +903,19 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo
name
FROM %s.%s
WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`,
r.TraceDB, r.traceTableName,
)
if r.useTraceNewSchema {
resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3)
query += resourceBucketFilter
namedArgs = append(namedArgs,
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(queryParams.ServiceName, true))+"%"),
)
}
args := []interface{}{}
args = append(args, namedArgs...)
// create TagQuery from TagQueryParams
@@ -930,10 +981,140 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
return &usageItems, nil
}
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) {
searchSpansResult := []model.SearchSpansResult{
{
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
IsSubTree: false,
Events: make([][]interface{}, 0),
},
}
var traceSummary model.TraceSummary
summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable)
err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans)
if err != nil {
if err == sql.ErrNoRows {
return &searchSpansResult, nil
}
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) {
zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace),
zap.Uint64("Count", traceSummary.NumSpans))
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": traceSummary.NumSpans,
"maxSpansInTraceLimit": params.MaxSpansInTrace,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false)
}
return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
}
userEmail, err := auth.GetEmailFromJwt(ctx)
if err == nil {
data := map[string]interface{}{
"traceSize": traceSummary.NumSpans,
}
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false)
}
var startTime, endTime, durationNano uint64
var searchScanResponses []model.SpanItemV2
query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName)
start := time.Now()
err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10))
zap.L().Info(query)
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, fmt.Errorf("error in processing sql query")
}
end := time.Now()
zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start)))
searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses))
searchSpanResponses := []model.SearchSpanResponseItem{}
start = time.Now()
for _, item := range searchScanResponses {
ref := []model.OtelSpanRef{}
err := json.Unmarshal([]byte(item.References), &ref)
if err != nil {
zap.L().Error("Error unmarshalling references", zap.Error(err))
return nil, err
}
// merge attributes_number, attributes_bool and resources_string into attributes_string so TagMap stays a single string-valued map
for k, v := range item.Attributes_bool {
item.Attributes_string[k] = fmt.Sprintf("%v", v)
}
for k, v := range item.Attributes_number {
item.Attributes_string[k] = fmt.Sprintf("%v", v)
}
for k, v := range item.Resources_string {
item.Attributes_string[k] = v
}
jsonItem := model.SearchSpanResponseItem{
SpanID: item.SpanID,
TraceID: item.TraceID,
ServiceName: item.ServiceName,
Name: item.Name,
Kind: int32(item.Kind),
DurationNano: int64(item.DurationNano),
HasError: item.HasError,
StatusMessage: item.StatusMessage,
StatusCodeString: item.StatusCodeString,
SpanKind: item.SpanKind,
References: ref,
Events: item.Events,
TagMap: item.Attributes_string,
}
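// Note: despite its name, TimeUnixNano is assigned milliseconds here
// (UnixNano / 1e6); the StartTimestampMillis/EndTimestampMillis math
// further down depends on that unit.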
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
searchSpanResponses = append(searchSpanResponses, jsonItem)
if startTime == 0 || jsonItem.TimeUnixNano < startTime {
startTime = jsonItem.TimeUnixNano
}
if endTime == 0 || jsonItem.TimeUnixNano > endTime {
endTime = jsonItem.TimeUnixNano
}
if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano {
durationNano = uint64(jsonItem.DurationNano)
}
}
end = time.Now()
zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start)))
for i, item := range searchSpanResponses {
spanEvents := item.GetValues()
searchSpansResult[0].Events[i] = spanEvents
}
searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000)
searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000)
return &searchSpansResult, nil
}
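The V2 path above splits trace fetch into two steps: a point lookup in the trace summary table for the trace's time range and pre-aggregated span count (a cheap guard against oversized traces before any span is read), then a single span query bounded by trace_id plus the summary-derived bucket window. A hedged sketch of the summary row it consumes, with field names and types inferred from the Scan call and the Unix() arithmetic rather than copied from the actual model package:

// TraceSummary as this diff appears to consume it (sketch only).
type TraceSummary struct {
	TraceID  string    // trace_id
	Start    time.Time // earliest span; Start.Unix()-1800 seeds the bucket lower bound
	End      time.Time // latest span; End.Unix() is the bucket upper bound
	NumSpans uint64    // pre-aggregated span count, checked against params.MaxSpansInTrace
}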
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
if r.useTraceNewSchema {
return r.SearchTracesV2(ctx, params)
}
var countSpans uint64
countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans)
@@ -2011,7 +2192,7 @@ func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) {
var totalSpans uint64
queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, signozTraceTableName)
queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, r.traceTableName)
r.db.QueryRow(ctx, queryStr).Scan(&totalSpans)
return totalSpans, nil
@@ -2022,7 +2203,9 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context,
var spansInLastHeartBeatInterval uint64
queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, int(interval.Minutes()))
if r.useTraceNewSchema {
queryStr = fmt.Sprintf("SELECT count() from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - toIntervalMinute(%d))) - 1800 and timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, r.traceTableName, int(interval.Minutes()), int(interval.Minutes()))
}
r.db.QueryRow(ctx, queryStr).Scan(&spansInLastHeartBeatInterval)
return spansInLastHeartBeatInterval, nil
@@ -2141,11 +2324,17 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex
}
func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) {
queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env,
stringTagMap['telemetry.sdk.language'] as language from %s.%s
where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes()))
if r.useTraceNewSchema {
queryStr = fmt.Sprintf(`select serviceName, resources_string['deployment.environment'] as env,
resources_string['telemetry.sdk.language'] as language from %s.%s
where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d))
group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes()))
}
tagTelemetryDataList := []model.TagTelemetryData{}
err := r.db.Select(ctx, &tagTelemetryDataList, queryStr)
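The only schema-facing change in this hunk is where resource attributes live: v2 kept every tag in a single stringTagMap, while v3 splits tags into typed maps, so deployment.environment and telemetry.sdk.language move from stringTagMap[...] to resources_string[...], the same map the SpanItemV2 struct at the end of this diff scans into.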
@@ -3603,7 +3792,102 @@ func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error {
return nil
}
func (r *ClickHouseReader) GetTraceAggregateAttributesV2(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.AggregateAttributeResponse
var stringAllowed bool
where := ""
switch req.Operator {
case
v3.AggregateOperatorCountDistinct,
v3.AggregateOperatorCount:
where = "tagKey ILIKE $1"
stringAllowed = true
case
v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRate,
v3.AggregateOperatorRateMin,
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99,
v3.AggregateOperatorAvg,
v3.AggregateOperatorSum,
v3.AggregateOperatorMin,
v3.AggregateOperatorMax:
where = "tagKey ILIKE $1 AND dataType='float64'"
stringAllowed = false
case
v3.AggregateOperatorNoOp:
return &v3.AggregateAttributeResponse{}, nil
default:
return nil, fmt.Errorf("unsupported aggregate operator")
}
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTable, where)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error())
}
var tagKey string
var dataType string
var tagType string
for rows.Next() {
if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType),
}
if _, ok := constants.DeprecatedStaticFieldsTraces[tagKey]; !ok {
response.AttributeKeys = append(response.AttributeKeys, key)
}
}
// add the new static fields
for _, field := range constants.NewStaticFieldsTraces {
if (!stringAllowed && field.DataType == v3.AttributeKeyDataTypeString) || (v3.AttributeKey{} == field) {
continue
} else if len(req.SearchText) == 0 || strings.Contains(field.Key, req.SearchText) {
response.AttributeKeys = append(response.AttributeKeys, field)
}
}
return &response, nil
}
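The stringAllowed flag computed in the operator switch does double duty: numeric aggregations (rates, percentiles, avg/sum/min/max) append dataType='float64' to the tagKey scan, and the same flag later drops string-typed entries from NewStaticFieldsTraces, so an operator like p99 is never offered a string attribute. A compressed restatement of the gate (sketch, using the v3 operator constants from above):

// stringAllowed is true only for count/count_distinct; every other supported
// operator both narrows the attribute scan to float64 keys and filters the
// static fields the same way.
stringAllowed := req.Operator == v3.AggregateOperatorCount ||
	req.Operator == v3.AggregateOperatorCountDistinct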
func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
if r.useTraceNewSchema {
return r.GetTraceAggregateAttributesV2(ctx, req)
}
var query string
var err error
var rows driver.Rows
@@ -3660,8 +3944,6 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
@@ -3673,7 +3955,69 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req
return &response, nil
}
func (r *ClickHouseReader) GetTraceAttributeKeysV2(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.FilterAttributeKeyResponse
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE tagKey ILIKE $1 LIMIT $2", r.TraceDB, r.spanAttributeTable)
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error())
}
var tagKey string
var dataType string
var tagType string
for rows.Next() {
if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType),
}
// don't send deprecated static fields
// this is added so that once the old tenants are moved to the new schema,
// the old attributes are not sent to the frontend autocomplete
if _, ok := constants.DeprecatedStaticFieldsTraces[tagKey]; !ok {
response.AttributeKeys = append(response.AttributeKeys, key)
}
}
// add the new static fields
for _, f := range constants.NewStaticFieldsTraces {
if (v3.AttributeKey{} == f) {
continue
}
if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) {
response.AttributeKeys = append(response.AttributeKeys, f)
}
}
return &response, nil
}
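Both V2 key-listing functions assemble the response from three sources: observed keys from the span-attributes table, a SHOW CREATE TABLE against the shard-local index table so isColumn can flag keys that exist as dedicated columns, and the static-field constants, with DeprecatedStaticFieldsTraces filtered out so tenants migrated to the new schema stop seeing v2-era names in autocomplete.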
func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
if r.useTraceNewSchema {
return r.GetTraceAttributeKeysV2(ctx, req)
}
var query string
var err error
@@ -3701,8 +4045,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
@@ -3714,21 +4056,96 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi
return &response, nil
}
func (r *ClickHouseReader) GetTraceAttributeValuesV2(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
var query string
var filterValueColumn string
var err error
var rows driver.Rows
var attributeValues v3.FilterAttributeValueResponse
// if dataType or tagType is not present return empty response
if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 {
// add data type if it's a top level key
if k, ok := constants.StaticFieldsTraces[req.FilterAttributeKey]; ok {
req.FilterAttributeKeyDataType = k.DataType
} else {
return &v3.FilterAttributeValueResponse{}, nil
}
}
// if data type is bool, return true and false
if req.FilterAttributeKeyDataType == v3.AttributeKeyDataTypeBool {
return &v3.FilterAttributeValueResponse{
BoolAttributeValues: []bool{true, false},
}, nil
}
query = "select distinct"
switch req.FilterAttributeKeyDataType {
case v3.AttributeKeyDataTypeFloat64:
filterValueColumn = "float64TagValue"
case v3.AttributeKeyDataTypeString:
filterValueColumn = "stringTagValue"
}
searchText := fmt.Sprintf("%%%s%%", req.SearchText)
// check if the tagKey is a topLevelColumn
// here we are using StaticFieldsTraces instead of NewStaticFieldsTraces as we want to consider old columns as well.
if _, ok := constants.StaticFieldsTraces[req.FilterAttributeKey]; ok {
// query the column for the last 48 hours
filterValueColumnWhere := req.FilterAttributeKey
selectKey := req.FilterAttributeKey
if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
filterValueColumnWhere = fmt.Sprintf("toString(%s)", req.FilterAttributeKey)
selectKey = fmt.Sprintf("toInt64(%s)", req.FilterAttributeKey)
}
// TODO(nitya): remove the 48 hour limit in future after checking the perf/resource implications
query = fmt.Sprintf("select distinct %s from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)) AND timestamp >= toDateTime64(now() - INTERVAL 48 HOUR, 9) and %s ILIKE $1 limit $2", selectKey, r.TraceDB, r.traceTableName, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, searchText, req.Limit)
} else {
filterValueColumnWhere := filterValueColumn
if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn)
}
query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.TraceDB, r.spanAttributeTable, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit)
}
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var strAttributeValue string
var float64AttributeValue sql.NullFloat64
for rows.Next() {
switch req.FilterAttributeKeyDataType {
case v3.AttributeKeyDataTypeFloat64:
if err := rows.Scan(&float64AttributeValue); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if float64AttributeValue.Valid {
attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, float64AttributeValue.Float64)
}
case v3.AttributeKeyDataTypeString:
if err := rows.Scan(&strAttributeValue); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue)
}
}
return &attributeValues, nil
}
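Value autocomplete therefore has two paths: top-level columns are read straight off the index table, bounded to the last 48 hours via both the bucket column and the timestamp, while every other key goes to the pre-extracted span-attributes table. A hedged rendering of the two query shapes, assuming the defaults signoz_traces and distributed_signoz_index_v3 from above, and distributed_span_attributes as the table behind r.spanAttributeTable (an assumption):

// Top-level (static) key such as name, string-typed, so no toString/toInt64 wrapping:
//   select distinct name from signoz_traces.distributed_signoz_index_v3
//   where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR))
//     AND timestamp >= toDateTime64(now() - INTERVAL 48 HOUR, 9)
//     and name ILIKE $1 limit $2
//
// Any other key, via the attributes table:
//   select distinct stringTagValue from signoz_traces.distributed_span_attributes
//   where tagKey=$1 and stringTagValue ILIKE $2 and tagType=$3 limit $4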
func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
if r.useTraceNewSchema {
return r.GetTraceAttributeValuesV2(ctx, req)
}
var query string
var err error
var rows driver.Rows
@@ -3781,7 +4198,58 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.
return &attributeValues, nil
}
func (r *ClickHouseReader) GetSpanAttributeKeysV2(ctx context.Context) (map[string]v3.AttributeKey, error) {
var query string
var err error
var rows driver.Rows
response := map[string]v3.AttributeKey{}
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable)
rows, err = r.db.Query(ctx, query)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceTableName)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error())
}
var tagKey string
var dataType string
var tagType string
for rows.Next() {
if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType),
}
name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType)
response[name] = key
}
for _, key := range constants.StaticFieldsTraces {
name := key.Key + "##" + key.Type.String() + "##" + strings.ToLower(key.DataType.String())
response[name] = key
}
return response, nil
}
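Keys here are deduplicated under a key##tagType##dataType composite, so the same attribute name can coexist as, say, a string tag and a float64 tag. A hedged illustration of the shape (the exact tagType strings are whatever the attributes table stores):

// e.g. an observed string span attribute keys as
//   "http.method##tag##string"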
func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) {
if r.useTraceNewSchema {
return r.GetSpanAttributeKeysV2(ctx)
}
var query string
var err error
var rows driver.Rows


@@ -718,3 +718,17 @@ func init() {
}
const TRACE_V4_MAX_PAGINATION_LIMIT = 10000
const TraceResourceBucketFilterWithServiceName = `
AND (
resource_fingerprint GLOBAL IN
(
SELECT fingerprint FROM %s.%s
WHERE
seen_at_ts_bucket_start >= @start_bucket AND seen_at_ts_bucket_start <= @end_bucket AND
simpleJSONExtractString(labels, 'service.name') = @serviceName AND
labels like @labelFilter
)
)
AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket
`
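Read as a template, the two %s slots take the trace database and the resource table (signoz_traces.distributed_traces_v3_resource with the defaults above). GLOBAL IN is deliberate for the distributed tables in play: ClickHouse evaluates the fingerprint subquery once on the initiating node and ships the result set to every shard, instead of re-running it per shard. Inside the subquery, labels LIKE @labelFilter is the cheap substring pre-screen and simpleJSONExtractString(...) = @serviceName the exact match; the trailing ts_bucket_start bounds then keep the outer span scan inside the same widened bucket window. A hedged sketch of how callers instantiate it, mirroring GetServices above:

filter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3)
query += filter // query now expects @serviceName, @labelFilter, @start_bucket and @end_bucket to be bound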


@@ -15,6 +15,7 @@ type SpanItemV2 struct {
Attributes_string map[string]string `ch:"attributes_string"`
Attributes_number map[string]float64 `ch:"attributes_number"`
Attributes_bool map[string]bool `ch:"attributes_bool"`
Resources_string map[string]string `ch:"resources_string"`
Events []string `ch:"events"`
StatusMessage string `ch:"status_message"`
StatusCodeString string `ch:"status_code_string"`
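The ch struct tags are what clickhouse-go uses to bind result columns to fields, which is how db.Select in SearchTracesV2 scans its wide SELECT directly into []model.SpanItemV2; the added resources_string map is then folded into TagMap alongside the numeric and bool attribute maps.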