mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 20:29:04 +08:00
chore: add related values (#6619)
This commit is contained in:
parent
2b32ce190f
commit
c5219ac157
@ -48,6 +48,9 @@ const (
|
|||||||
defaultTraceLocalTableName string = "signoz_index_v3"
|
defaultTraceLocalTableName string = "signoz_index_v3"
|
||||||
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
|
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
|
||||||
defaultTraceSummaryTable string = "distributed_trace_summary"
|
defaultTraceSummaryTable string = "distributed_trace_summary"
|
||||||
|
|
||||||
|
defaultMetadataDB string = "signoz_metadata"
|
||||||
|
defaultMetadataTable string = "distributed_attributes_metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NamespaceConfig is Clickhouse's internal configuration data
|
// NamespaceConfig is Clickhouse's internal configuration data
|
||||||
@ -88,6 +91,9 @@ type namespaceConfig struct {
|
|||||||
TraceLocalTableNameV3 string
|
TraceLocalTableNameV3 string
|
||||||
TraceResourceTableV3 string
|
TraceResourceTableV3 string
|
||||||
TraceSummaryTable string
|
TraceSummaryTable string
|
||||||
|
|
||||||
|
MetadataDB string
|
||||||
|
MetadataTable string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connecto defines how to connect to the database
|
// Connecto defines how to connect to the database
|
||||||
@ -141,6 +147,9 @@ func NewOptions(
|
|||||||
TraceLocalTableNameV3: defaultTraceLocalTableName,
|
TraceLocalTableNameV3: defaultTraceLocalTableName,
|
||||||
TraceResourceTableV3: defaultTraceResourceTableV3,
|
TraceResourceTableV3: defaultTraceResourceTableV3,
|
||||||
TraceSummaryTable: defaultTraceSummaryTable,
|
TraceSummaryTable: defaultTraceSummaryTable,
|
||||||
|
|
||||||
|
MetadataDB: defaultMetadataDB,
|
||||||
|
MetadataTable: defaultMetadataTable,
|
||||||
},
|
},
|
||||||
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
|
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
|
||||||
}
|
}
|
||||||
|
@ -161,6 +161,8 @@ type ClickHouseReader struct {
|
|||||||
|
|
||||||
fluxIntervalForTraceDetail time.Duration
|
fluxIntervalForTraceDetail time.Duration
|
||||||
cache cache.Cache
|
cache cache.Cache
|
||||||
|
metadataDB string
|
||||||
|
metadataTable string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTraceReader returns a TraceReader for the database
|
// NewTraceReader returns a TraceReader for the database
|
||||||
@ -256,6 +258,8 @@ func NewReaderFromClickhouseConnection(
|
|||||||
|
|
||||||
fluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
fluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
|
metadataDB: options.primary.MetadataDB,
|
||||||
|
metadataTable: options.primary.MetadataTable,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1454,7 +1458,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
|
|||||||
var serviceNameIntervalMap = map[string][]tracedetail.Interval{}
|
var serviceNameIntervalMap = map[string][]tracedetail.Interval{}
|
||||||
var hasMissingSpans bool
|
var hasMissingSpans bool
|
||||||
|
|
||||||
userEmail , emailErr := auth.GetEmailFromJwt(ctx)
|
userEmail, emailErr := auth.GetEmailFromJwt(ctx)
|
||||||
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID)
|
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
startTime = cachedTraceData.StartTime
|
startTime = cachedTraceData.StartTime
|
||||||
@ -1530,8 +1534,8 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
|
|||||||
if startTime == 0 || startTimeUnixNano < startTime {
|
if startTime == 0 || startTimeUnixNano < startTime {
|
||||||
startTime = startTimeUnixNano
|
startTime = startTimeUnixNano
|
||||||
}
|
}
|
||||||
if endTime == 0 || (startTimeUnixNano + jsonItem.DurationNano ) > endTime {
|
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
||||||
endTime = (startTimeUnixNano + jsonItem.DurationNano )
|
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
||||||
}
|
}
|
||||||
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
||||||
durationNano = jsonItem.DurationNano
|
durationNano = jsonItem.DurationNano
|
||||||
@ -1712,8 +1716,8 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace
|
|||||||
if startTime == 0 || startTimeUnixNano < startTime {
|
if startTime == 0 || startTimeUnixNano < startTime {
|
||||||
startTime = startTimeUnixNano
|
startTime = startTimeUnixNano
|
||||||
}
|
}
|
||||||
if endTime == 0 || ( startTimeUnixNano + jsonItem.DurationNano ) > endTime {
|
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
||||||
endTime = (startTimeUnixNano + jsonItem.DurationNano )
|
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
||||||
}
|
}
|
||||||
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
||||||
durationNano = jsonItem.DurationNano
|
durationNano = jsonItem.DurationNano
|
||||||
@ -4111,6 +4115,97 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
|
|||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) FetchRelatedValues(ctx context.Context, req *v3.FilterAttributeValueRequest) ([]string, error) {
|
||||||
|
var andConditions []string
|
||||||
|
|
||||||
|
andConditions = append(andConditions, fmt.Sprintf("unix_milli >= %d", req.StartTimeMillis))
|
||||||
|
andConditions = append(andConditions, fmt.Sprintf("unix_milli <= %d", req.EndTimeMillis))
|
||||||
|
|
||||||
|
if len(req.ExistingFilterItems) != 0 {
|
||||||
|
for _, item := range req.ExistingFilterItems {
|
||||||
|
// we only support string for related values
|
||||||
|
if item.Key.DataType != v3.AttributeKeyDataTypeString {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var colName string
|
||||||
|
switch item.Key.Type {
|
||||||
|
case v3.AttributeKeyTypeResource:
|
||||||
|
colName = "resource_attributes"
|
||||||
|
case v3.AttributeKeyTypeTag:
|
||||||
|
colName = "attributes"
|
||||||
|
default:
|
||||||
|
// we only support resource and tag for related values as of now
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// IN doesn't make use of map value index, we convert it to = or !=
|
||||||
|
operator := item.Operator
|
||||||
|
if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorIn {
|
||||||
|
operator = "="
|
||||||
|
} else if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorNotIn {
|
||||||
|
operator = "!="
|
||||||
|
}
|
||||||
|
addCondition := func(val string) {
|
||||||
|
andConditions = append(andConditions, fmt.Sprintf("mapContains(%s, '%s') AND %s['%s'] %s %s", colName, item.Key.Key, colName, item.Key.Key, operator, val))
|
||||||
|
}
|
||||||
|
switch v := item.Value.(type) {
|
||||||
|
case string:
|
||||||
|
fmtVal := utils.ClickHouseFormattedValue(v)
|
||||||
|
addCondition(fmtVal)
|
||||||
|
case []string:
|
||||||
|
for _, val := range v {
|
||||||
|
fmtVal := utils.ClickHouseFormattedValue(val)
|
||||||
|
addCondition(fmtVal)
|
||||||
|
}
|
||||||
|
case []interface{}:
|
||||||
|
for _, val := range v {
|
||||||
|
fmtVal := utils.ClickHouseFormattedValue(val)
|
||||||
|
addCondition(fmtVal)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
whereClause := strings.Join(andConditions, " AND ")
|
||||||
|
|
||||||
|
var selectColumn string
|
||||||
|
switch req.TagType {
|
||||||
|
case v3.TagTypeResource:
|
||||||
|
selectColumn = "resource_attributes" + "['" + req.FilterAttributeKey + "']"
|
||||||
|
case v3.TagTypeTag:
|
||||||
|
selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']"
|
||||||
|
default:
|
||||||
|
selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']"
|
||||||
|
}
|
||||||
|
|
||||||
|
filterSubQuery := fmt.Sprintf(
|
||||||
|
"SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT 100",
|
||||||
|
selectColumn,
|
||||||
|
r.metadataDB,
|
||||||
|
r.metadataTable,
|
||||||
|
whereClause,
|
||||||
|
)
|
||||||
|
zap.L().Debug("filterSubQuery for related values", zap.String("query", filterSubQuery))
|
||||||
|
|
||||||
|
rows, err := r.db.Query(ctx, filterSubQuery)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var attributeValues []string
|
||||||
|
for rows.Next() {
|
||||||
|
var value string
|
||||||
|
if err := rows.Scan(&value); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||||
|
}
|
||||||
|
if value != "" {
|
||||||
|
attributeValues = append(attributeValues, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return attributeValues, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
|
func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
|
||||||
var err error
|
var err error
|
||||||
var filterValueColumn string
|
var filterValueColumn string
|
||||||
@ -4212,6 +4307,11 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
relatedValues, _ := r.FetchRelatedValues(ctx, req)
|
||||||
|
attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{
|
||||||
|
StringAttributeValues: relatedValues,
|
||||||
|
}
|
||||||
|
|
||||||
return &attributeValues, nil
|
return &attributeValues, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -4892,6 +4992,11 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
relatedValues, _ := r.FetchRelatedValues(ctx, req)
|
||||||
|
attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{
|
||||||
|
StringAttributeValues: relatedValues,
|
||||||
|
}
|
||||||
|
|
||||||
return &attributeValues, nil
|
return &attributeValues, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,6 +380,7 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
|
|||||||
withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeKeys))).Methods(http.MethodGet)
|
withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeKeys))).Methods(http.MethodGet)
|
||||||
subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(
|
subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(
|
||||||
withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeValues))).Methods(http.MethodGet)
|
withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeValues))).Methods(http.MethodGet)
|
||||||
|
subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(aH.autoCompleteAttributeValuesPost)).Methods(http.MethodPost)
|
||||||
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost)
|
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost)
|
||||||
subRouter.HandleFunc("/query_range/format", am.ViewAccess(aH.QueryRangeV3Format)).Methods(http.MethodPost)
|
subRouter.HandleFunc("/query_range/format", am.ViewAccess(aH.QueryRangeV3Format)).Methods(http.MethodPost)
|
||||||
|
|
||||||
@ -4600,6 +4601,35 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http
|
|||||||
aH.Respond(w, response)
|
aH.Respond(w, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) autoCompleteAttributeValuesPost(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var response *v3.FilterAttributeValueResponse
|
||||||
|
req, err := parseFilterAttributeValueRequestBody(r)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch req.DataSource {
|
||||||
|
case v3.DataSourceMetrics:
|
||||||
|
response, err = aH.reader.GetMetricAttributeValues(r.Context(), req)
|
||||||
|
case v3.DataSourceLogs:
|
||||||
|
response, err = aH.reader.GetLogAttributeValues(r.Context(), req)
|
||||||
|
case v3.DataSourceTraces:
|
||||||
|
response, err = aH.reader.GetTraceAttributeValues(r.Context(), req)
|
||||||
|
default:
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.Respond(w, response)
|
||||||
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getSpanKeysV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3) (map[string]v3.AttributeKey, error) {
|
func (aH *APIHandler) getSpanKeysV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3) (map[string]v3.AttributeKey, error) {
|
||||||
data := map[string]v3.AttributeKey{}
|
data := map[string]v3.AttributeKey{}
|
||||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||||
|
@ -769,6 +769,25 @@ func parseFilterAttributeValueRequest(r *http.Request) (*v3.FilterAttributeValue
|
|||||||
return &req, nil
|
return &req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseFilterAttributeValueRequestBody(r *http.Request) (*v3.FilterAttributeValueRequest, error) {
|
||||||
|
|
||||||
|
var req v3.FilterAttributeValueRequest
|
||||||
|
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := req.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// offset by two windows periods for start for better results
|
||||||
|
req.StartTimeMillis = req.StartTimeMillis - time.Hour.Milliseconds()*6*2
|
||||||
|
req.EndTimeMillis = req.EndTimeMillis + time.Hour.Milliseconds()*6
|
||||||
|
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
func validateQueryRangeParamsV3(qp *v3.QueryRangeParamsV3) error {
|
func validateQueryRangeParamsV3(qp *v3.QueryRangeParamsV3) error {
|
||||||
err := qp.CompositeQuery.Validate()
|
err := qp.CompositeQuery.Validate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -297,6 +297,8 @@ func (q AttributeKeyDataType) String() string {
|
|||||||
// for a selected aggregate operator, aggregate attribute, filter attribute key
|
// for a selected aggregate operator, aggregate attribute, filter attribute key
|
||||||
// and search text.
|
// and search text.
|
||||||
type FilterAttributeValueRequest struct {
|
type FilterAttributeValueRequest struct {
|
||||||
|
StartTimeMillis int64 `json:"startTimeMillis"`
|
||||||
|
EndTimeMillis int64 `json:"endTimeMillis"`
|
||||||
DataSource DataSource `json:"dataSource"`
|
DataSource DataSource `json:"dataSource"`
|
||||||
AggregateOperator AggregateOperator `json:"aggregateOperator"`
|
AggregateOperator AggregateOperator `json:"aggregateOperator"`
|
||||||
AggregateAttribute string `json:"aggregateAttribute"`
|
AggregateAttribute string `json:"aggregateAttribute"`
|
||||||
@ -305,6 +307,50 @@ type FilterAttributeValueRequest struct {
|
|||||||
TagType TagType `json:"tagType"`
|
TagType TagType `json:"tagType"`
|
||||||
SearchText string `json:"searchText"`
|
SearchText string `json:"searchText"`
|
||||||
Limit int `json:"limit"`
|
Limit int `json:"limit"`
|
||||||
|
ExistingFilterItems []FilterItem `json:"existingFilterItems"`
|
||||||
|
MetricNames []string `json:"metricNames"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FilterAttributeValueRequest) Validate() error {
|
||||||
|
if f.FilterAttributeKey == "" {
|
||||||
|
return fmt.Errorf("filterAttributeKey is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.StartTimeMillis == 0 {
|
||||||
|
return fmt.Errorf("startTimeMillis is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.EndTimeMillis == 0 {
|
||||||
|
return fmt.Errorf("endTimeMillis is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Limit == 0 {
|
||||||
|
f.Limit = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.Limit > 1000 {
|
||||||
|
return fmt.Errorf("limit must be less than 1000")
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.ExistingFilterItems != nil {
|
||||||
|
for _, value := range f.ExistingFilterItems {
|
||||||
|
if value.Key.Key == "" {
|
||||||
|
return fmt.Errorf("existingFilterItems must contain a valid key")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := f.DataSource.Validate(); err != nil {
|
||||||
|
return fmt.Errorf("invalid data source: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.DataSource != DataSourceMetrics {
|
||||||
|
if err := f.AggregateOperator.Validate(); err != nil {
|
||||||
|
return fmt.Errorf("invalid aggregate operator: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type AggregateAttributeResponse struct {
|
type AggregateAttributeResponse struct {
|
||||||
@ -369,6 +415,8 @@ type FilterAttributeValueResponse struct {
|
|||||||
StringAttributeValues []string `json:"stringAttributeValues"`
|
StringAttributeValues []string `json:"stringAttributeValues"`
|
||||||
NumberAttributeValues []interface{} `json:"numberAttributeValues"`
|
NumberAttributeValues []interface{} `json:"numberAttributeValues"`
|
||||||
BoolAttributeValues []bool `json:"boolAttributeValues"`
|
BoolAttributeValues []bool `json:"boolAttributeValues"`
|
||||||
|
|
||||||
|
RelatedValues *FilterAttributeValueResponse `json:"relatedValues"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryRangeParamsV3 struct {
|
type QueryRangeParamsV3 struct {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user