mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 05:55:59 +08:00
Add Span attributes APIs (#2414)
This commit is contained in:
parent
1726469aaa
commit
2c206e8bf4
@ -28,6 +28,7 @@ const (
|
|||||||
defaultSpansTable string = "distributed_signoz_spans"
|
defaultSpansTable string = "distributed_signoz_spans"
|
||||||
defaultDependencyGraphTable string = "distributed_dependency_graph_minutes_v2"
|
defaultDependencyGraphTable string = "distributed_dependency_graph_minutes_v2"
|
||||||
defaultTopLevelOperationsTable string = "distributed_top_level_operations"
|
defaultTopLevelOperationsTable string = "distributed_top_level_operations"
|
||||||
|
defaultSpanAttributeTable string = "distributed_span_attributes"
|
||||||
defaultLogsDB string = "signoz_logs"
|
defaultLogsDB string = "signoz_logs"
|
||||||
defaultLogsTable string = "distributed_logs"
|
defaultLogsTable string = "distributed_logs"
|
||||||
defaultLogsLocalTable string = "logs"
|
defaultLogsLocalTable string = "logs"
|
||||||
@ -63,6 +64,7 @@ type namespaceConfig struct {
|
|||||||
UsageExplorerTable string
|
UsageExplorerTable string
|
||||||
SpansTable string
|
SpansTable string
|
||||||
ErrorTable string
|
ErrorTable string
|
||||||
|
SpanAttributeTable string
|
||||||
DependencyGraphTable string
|
DependencyGraphTable string
|
||||||
TopLevelOperationsTable string
|
TopLevelOperationsTable string
|
||||||
LogsDB string
|
LogsDB string
|
||||||
@ -132,6 +134,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
|
|||||||
DurationTable: defaultDurationTable,
|
DurationTable: defaultDurationTable,
|
||||||
UsageExplorerTable: defaultUsageExplorerTable,
|
UsageExplorerTable: defaultUsageExplorerTable,
|
||||||
SpansTable: defaultSpansTable,
|
SpansTable: defaultSpansTable,
|
||||||
|
SpanAttributeTable: defaultSpanAttributeTable,
|
||||||
DependencyGraphTable: defaultDependencyGraphTable,
|
DependencyGraphTable: defaultDependencyGraphTable,
|
||||||
TopLevelOperationsTable: defaultTopLevelOperationsTable,
|
TopLevelOperationsTable: defaultTopLevelOperationsTable,
|
||||||
LogsDB: defaultLogsDB,
|
LogsDB: defaultLogsDB,
|
||||||
|
@ -95,6 +95,7 @@ type ClickHouseReader struct {
|
|||||||
errorTable string
|
errorTable string
|
||||||
usageExplorerTable string
|
usageExplorerTable string
|
||||||
SpansTable string
|
SpansTable string
|
||||||
|
spanAttributeTable string
|
||||||
dependencyGraphTable string
|
dependencyGraphTable string
|
||||||
topLevelOperationsTable string
|
topLevelOperationsTable string
|
||||||
logsDB string
|
logsDB string
|
||||||
@ -145,6 +146,7 @@ func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.Featu
|
|||||||
usageExplorerTable: options.primary.UsageExplorerTable,
|
usageExplorerTable: options.primary.UsageExplorerTable,
|
||||||
durationTable: options.primary.DurationTable,
|
durationTable: options.primary.DurationTable,
|
||||||
SpansTable: options.primary.SpansTable,
|
SpansTable: options.primary.SpansTable,
|
||||||
|
spanAttributeTable: options.primary.SpanAttributeTable,
|
||||||
dependencyGraphTable: options.primary.DependencyGraphTable,
|
dependencyGraphTable: options.primary.DependencyGraphTable,
|
||||||
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
|
topLevelOperationsTable: options.primary.TopLevelOperationsTable,
|
||||||
logsDB: options.primary.LogsDB,
|
logsDB: options.primary.LogsDB,
|
||||||
@ -4218,3 +4220,163 @@ func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
|
||||||
|
var query string
|
||||||
|
var err error
|
||||||
|
var rows driver.Rows
|
||||||
|
var response v3.AggregateAttributeResponse
|
||||||
|
where := ""
|
||||||
|
switch req.Operator {
|
||||||
|
case v3.AggregateOperatorCountDistinct:
|
||||||
|
where = "tagKey ILIKE $1"
|
||||||
|
case
|
||||||
|
v3.AggregateOperatorRateSum,
|
||||||
|
v3.AggregateOperatorRateMax,
|
||||||
|
v3.AggregateOperatorRateAvg,
|
||||||
|
v3.AggregateOperatorRate,
|
||||||
|
v3.AggregateOperatorRateMin,
|
||||||
|
v3.AggregateOperatorP05,
|
||||||
|
v3.AggregateOperatorP10,
|
||||||
|
v3.AggregateOperatorP20,
|
||||||
|
v3.AggregateOperatorP25,
|
||||||
|
v3.AggregateOperatorP50,
|
||||||
|
v3.AggregateOperatorP75,
|
||||||
|
v3.AggregateOperatorP90,
|
||||||
|
v3.AggregateOperatorP95,
|
||||||
|
v3.AggregateOperatorP99,
|
||||||
|
v3.AggregateOperatorAvg,
|
||||||
|
v3.AggregateOperatorSum,
|
||||||
|
v3.AggregateOperatorMin,
|
||||||
|
v3.AggregateOperatorMax:
|
||||||
|
where = "tagKey ILIKE $1 AND dataType='float64'"
|
||||||
|
case
|
||||||
|
v3.AggregateOpeatorCount,
|
||||||
|
v3.AggregateOperatorNoOp:
|
||||||
|
return &v3.AggregateAttributeResponse{}, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported aggregate operator")
|
||||||
|
}
|
||||||
|
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTable, where)
|
||||||
|
if req.Limit != 0 {
|
||||||
|
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||||
|
}
|
||||||
|
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(err)
|
||||||
|
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var tagKey string
|
||||||
|
var dataType string
|
||||||
|
var tagType string
|
||||||
|
var isColumn bool
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
|
}
|
||||||
|
key := v3.AttributeKey{
|
||||||
|
Key: tagKey,
|
||||||
|
DataType: v3.AttributeKeyDataType(dataType),
|
||||||
|
Type: v3.AttributeKeyType(tagType),
|
||||||
|
IsColumn: isColumn,
|
||||||
|
}
|
||||||
|
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
|
||||||
|
|
||||||
|
var query string
|
||||||
|
var err error
|
||||||
|
var rows driver.Rows
|
||||||
|
var response v3.FilterAttributeKeyResponse
|
||||||
|
|
||||||
|
query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s WHERE tagKey ILIKE $1", r.TraceDB, r.spanAttributeTable)
|
||||||
|
|
||||||
|
if req.Limit != 0 {
|
||||||
|
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||||
|
}
|
||||||
|
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(err)
|
||||||
|
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var tagKey string
|
||||||
|
var dataType string
|
||||||
|
var tagType string
|
||||||
|
var isColumn bool
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
|
}
|
||||||
|
key := v3.AttributeKey{
|
||||||
|
Key: tagKey,
|
||||||
|
DataType: v3.AttributeKeyDataType(dataType),
|
||||||
|
Type: v3.AttributeKeyType(tagType),
|
||||||
|
IsColumn: isColumn,
|
||||||
|
}
|
||||||
|
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
|
||||||
|
|
||||||
|
var query string
|
||||||
|
var err error
|
||||||
|
var rows driver.Rows
|
||||||
|
var attributeValues v3.FilterAttributeValueResponse
|
||||||
|
// if dataType or tagType is not present return empty response
|
||||||
|
if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 || req.FilterAttributeKey == "body" {
|
||||||
|
return &v3.FilterAttributeValueResponse{}, nil
|
||||||
|
}
|
||||||
|
switch req.FilterAttributeKeyDataType {
|
||||||
|
case v3.AttributeKeyDataTypeString:
|
||||||
|
query = fmt.Sprintf("SELECT DISTINCT stringTagValue from %s.%s WHERE tagKey = $1 AND stringTagValue ILIKE $2 AND tagType=$3 limit $4", r.TraceDB, r.spanAttributeTable)
|
||||||
|
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText), req.TagType, req.Limit)
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(err)
|
||||||
|
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var strAttributeValue string
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&strAttributeValue); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
|
}
|
||||||
|
attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue)
|
||||||
|
}
|
||||||
|
case v3.AttributeKeyDataTypeFloat64, v3.AttributeKeyDataTypeInt64:
|
||||||
|
query = fmt.Sprintf("SELECT DISTINCT float64TagValue from %s.%s where tagKey = $1 AND toString(float64TagValue) ILIKE $2 AND tagType=$3 limit $4", r.TraceDB, r.spanAttributeTable)
|
||||||
|
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, fmt.Sprintf("%%%s%%", req.SearchText), req.TagType, req.Limit)
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Error(err)
|
||||||
|
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var numberAttributeValue sql.NullFloat64
|
||||||
|
for rows.Next() {
|
||||||
|
if err := rows.Scan(&numberAttributeValue); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
|
}
|
||||||
|
if numberAttributeValue.Valid {
|
||||||
|
attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, numberAttributeValue.Float64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case v3.AttributeKeyDataTypeBool:
|
||||||
|
attributeValues.BoolAttributeValues = []bool{true, false}
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid data type")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &attributeValues, nil
|
||||||
|
}
|
||||||
|
@ -2370,7 +2370,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
|
|||||||
case v3.DataSourceLogs:
|
case v3.DataSourceLogs:
|
||||||
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
|
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
|
||||||
case v3.DataSourceTraces:
|
case v3.DataSourceTraces:
|
||||||
// TODO: implement
|
response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req)
|
||||||
default:
|
default:
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
||||||
return
|
return
|
||||||
@ -2399,7 +2399,7 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R
|
|||||||
case v3.DataSourceLogs:
|
case v3.DataSourceLogs:
|
||||||
response, err = aH.reader.GetLogAttributeKeys(r.Context(), req)
|
response, err = aH.reader.GetLogAttributeKeys(r.Context(), req)
|
||||||
case v3.DataSourceTraces:
|
case v3.DataSourceTraces:
|
||||||
// TODO: implement
|
response, err = aH.reader.GetTraceAttributeKeys(r.Context(), req)
|
||||||
default:
|
default:
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
||||||
return
|
return
|
||||||
@ -2428,7 +2428,7 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http
|
|||||||
case v3.DataSourceLogs:
|
case v3.DataSourceLogs:
|
||||||
response, err = aH.reader.GetLogAttributeValues(r.Context(), req)
|
response, err = aH.reader.GetLogAttributeValues(r.Context(), req)
|
||||||
case v3.DataSourceTraces:
|
case v3.DataSourceTraces:
|
||||||
// TODO: implement
|
response, err = aH.reader.GetTraceAttributeValues(r.Context(), req)
|
||||||
default:
|
default:
|
||||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
||||||
return
|
return
|
||||||
|
@ -35,6 +35,9 @@ type Reader interface {
|
|||||||
// clickhouse only.
|
// clickhouse only.
|
||||||
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
|
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
|
||||||
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
|
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
|
||||||
|
GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
|
||||||
|
GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
|
||||||
|
GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
|
||||||
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError)
|
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError)
|
||||||
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError)
|
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError)
|
||||||
GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError)
|
GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError)
|
||||||
|
@ -221,7 +221,7 @@ func (q AttributeKeyDataType) Validate() error {
|
|||||||
case AttributeKeyDataTypeString, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeBool:
|
case AttributeKeyDataTypeString, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeBool:
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("invalid tag type: %s", q)
|
return fmt.Errorf("invalid tag data type: %s", q)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user