chore: update hosts list to use pre-aggregated data table dynamically (#6227)

parent f2e33d7ca9
commit 6448fb17e7

pkg/query-service/app/inframetrics/common.go (new file, 65 lines)
@@ -0,0 +1,65 @@
package inframetrics

import (
	"strings"
	"time"

	"go.signoz.io/signoz/pkg/query-service/constants"
	"go.signoz.io/signoz/pkg/query-service/model"
)

// getParamsForTopItems returns the step, time series table name and samples table name
// for the top items query. what are we doing here?
// we want to identify the top hosts/pods/nodes quickly, so we use pre-aggregated data
// for samples and time series tables to speed up the query
// the speed of the query depends on the number of values in group by clause, the higher
// the step interval, the faster the query will be as number of rows to group by is reduced
// here we are using the averaged value of the time series data to get the top items
func getParamsForTopItems(start, end int64) (int64, string, string) {
	var step int64
	var timeSeriesTableName string
	var samplesTableName string

	if end-start < time.Hour.Milliseconds() {
		// 5 minute aggregation for any query less than 1 hour
		step = 5 * 60
		timeSeriesTableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME
		samplesTableName = constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME
	} else if end-start < time.Hour.Milliseconds()*6 {
		// 15 minute aggregation for any query less than 6 hours
		step = 15 * 60
		timeSeriesTableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME
		samplesTableName = constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME
	} else if end-start < time.Hour.Milliseconds()*24 {
		// 1 hour aggregation for any query less than 1 day
		step = 60 * 60
		timeSeriesTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
		samplesTableName = constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME
	} else if end-start < time.Hour.Milliseconds()*24*7 {
		// 6 hours aggregation for any query less than 1 week
		step = 6 * 60 * 60
		timeSeriesTableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME
		samplesTableName = constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME
	} else {
		// 12 hours aggregation for any query greater than 1 week
		step = 12 * 60 * 60
		timeSeriesTableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME
		samplesTableName = constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME
	}
	return step, timeSeriesTableName, samplesTableName
}

func getParamsForTopHosts(req model.HostListRequest) (int64, string, string) {
	return getParamsForTopItems(req.Start, req.End)
}

// TODO(srikanthccv): remove this
// What is happening here?
// The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint
// goes to same shard.
// However, in this case, we are interested in the attributes values across all the shards.
// So, we replace the local time series table with the distributed time series table.
// See `PrepareTimeseriesFilterQuery` for more details.
func localQueryToDistributedQuery(query string) string {
	return strings.Replace(query, ".time_series_v4", ".distributed_time_series_v4", 1)
}
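As a point of reference (not part of the commit), the table selection above can be exercised with a small test in the same package; the assertions follow directly from the branches of getParamsForTopItems. Likewise, localQueryToDistributedQuery only rewrites the first occurrence of the table name, e.g. `signoz_metrics.time_series_v4` becomes `signoz_metrics.distributed_time_series_v4` in the generated sub-query.

// Hypothetical _test.go sketch in the inframetrics package, not part of the commit.
package inframetrics

import (
	"testing"
	"time"
)

func TestGetParamsForTopItemsSketch(t *testing.T) {
	end := time.Now().UnixMilli()
	start := end - 3*time.Hour.Milliseconds() // a 3-hour window
	step, tsTable, samplesTable := getParamsForTopItems(start, end)
	if step != 15*60 {
		t.Fatalf("expected a 15 minute step for a 3 hour window, got %d", step)
	}
	// for this window the 6-hour time series table and the 5-minute
	// pre-aggregated samples table are expected
	t.Logf("tsTable=%s samplesTable=%s", tsTable, samplesTable)
}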
@@ -2,12 +2,10 @@ package inframetrics
 
 import (
 	"context"
-	"fmt"
-	"math"
+	"sort"
 	"strings"
 	"time"
 
-	"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
 	"go.signoz.io/signoz/pkg/query-service/common"
 	"go.signoz.io/signoz/pkg/query-service/interfaces"
 	"go.signoz.io/signoz/pkg/query-service/model"
@@ -21,23 +19,45 @@ type HostsRepo struct {
 	querierV2 interfaces.Querier
 }
 
-var pointAttrsToIgnore = []string{
-	"state",
-	"cpu",
-	"device",
-	"direction",
-	"mode",
-	"mountpoint",
-	"type",
-	"process.cgroup",
-	"process.command",
-	"process.command_line",
-	"process.executable.name",
-	"process.executable.path",
-	"process.owner",
-	"process.parent_pid",
-	"process.pid",
-}
+var (
+	// we don't have a way to get the resource attributes from the current time series table
+	// but we only want to suggest resource attributes for system metrics,
+	// this is a list of attributes that we skip from all labels as they are data point attributes
+	// TODO(srikanthccv): remove this once we have a way to get resource attributes
+
+	pointAttrsToIgnore = []string{
+		"state",
+		"cpu",
+		"device",
+		"direction",
+		"mode",
+		"mountpoint",
+		"type",
+		"os_type",
+		"process_cgroup",
+		"process_command",
+		"process_command_line",
+		"process_executable_name",
+		"process_executable_path",
+		"process_owner",
+		"process_parent_pid",
+		"process_pid",
+	}
+
+	queryNamesForTopHosts = map[string][]string{
+		"cpu":    {"A", "B", "F1"},
+		"memory": {"C", "D", "F2"},
+		"wait":   {"E", "F", "F3"},
+		"load15": {"G"},
+	}
+
+	// TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric
+	metricToUseForHostAttributes = "system_cpu_load_average_15m"
+	hostNameAttrKey              = "host_name"
+	// TODO(srikanthccv): remove k8s hacky logic from hosts repo after charts users are migrated
+	k8sNodeNameAttrKey = "k8s_node_name"
+	agentNameToIgnore  = "k8s-infra-otel-agent"
+)
 
 func NewHostsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *HostsRepo {
 	return &HostsRepo{reader: reader, querierV2: querierV2}
@@ -46,7 +66,7 @@ func NewHostsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *Hosts
 func (h *HostsRepo) GetHostAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
 	// TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric
 	req.DataSource = v3.DataSourceMetrics
-	req.AggregateAttribute = "system_cpu_load_average_15m"
+	req.AggregateAttribute = metricToUseForHostAttributes
 	if req.Limit == 0 {
 		req.Limit = 50
 	}
@@ -71,7 +91,7 @@ func (h *HostsRepo) GetHostAttributeKeys(ctx context.Context, req v3.FilterAttri
 
 func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
 	req.DataSource = v3.DataSourceMetrics
-	req.AggregateAttribute = "system_cpu_load_average_15m"
+	req.AggregateAttribute = metricToUseForHostAttributes
 	if req.Limit == 0 {
 		req.Limit = 50
 	}
@@ -80,21 +100,21 @@ func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAtt
 	if err != nil {
 		return nil, err
 	}
-	if req.FilterAttributeKey != "host_name" {
+	if req.FilterAttributeKey != hostNameAttrKey {
 		return attributeValuesResponse, nil
 	}
 	hostNames := []string{}
 
 	for _, attributeValue := range attributeValuesResponse.StringAttributeValues {
-		if strings.Contains(attributeValue, "k8s-infra-otel-agent") {
+		if strings.Contains(attributeValue, agentNameToIgnore) {
 			continue
 		}
 		hostNames = append(hostNames, attributeValue)
 	}
 
-	req.FilterAttributeKey = "k8s_node_name"
+	req.FilterAttributeKey = k8sNodeNameAttrKey
 	req.DataSource = v3.DataSourceMetrics
-	req.AggregateAttribute = "system_cpu_load_average_15m"
+	req.AggregateAttribute = metricToUseForHostAttributes
 	if req.Limit == 0 {
 		req.Limit = 50
 	}
@@ -104,7 +124,7 @@ func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAtt
 		return nil, err
 	}
 	for _, attributeValue := range attributeValuesResponse.StringAttributeValues {
-		if strings.Contains(attributeValue, "k8s-infra-otel-agent") {
+		if strings.Contains(attributeValue, agentNameToIgnore) {
 			continue
 		}
 		hostNames = append(hostNames, attributeValue)
@@ -113,78 +133,6 @@ func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAtt
 	return &v3.FilterAttributeValueResponse{StringAttributeValues: hostNames}, nil
 }
 
-func getGroupKey(record model.HostListRecord, groupBy []v3.AttributeKey) string {
-	groupKey := ""
-	for _, key := range groupBy {
-		groupKey += fmt.Sprintf("%s=%s,", key.Key, record.Meta[key.Key])
-	}
-	return groupKey
-}
-
-func (h *HostsRepo) getMetadataAttributes(ctx context.Context,
-	req model.HostListRequest, hostNameAttrKey string) (map[string]map[string]string, error) {
-	hostAttrs := map[string]map[string]string{}
-
-	hasHostName := false
-	for _, key := range req.GroupBy {
-		if key.Key == hostNameAttrKey {
-			hasHostName = true
-		}
-	}
-
-	if !hasHostName {
-		req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: hostNameAttrKey})
-	}
-
-	mq := v3.BuilderQuery{
-		AggregateAttribute: v3.AttributeKey{
-			Key:      "system_cpu_load_average_15m",
-			DataType: v3.AttributeKeyDataTypeFloat64,
-		},
-		Temporality: v3.Unspecified,
-		GroupBy:     req.GroupBy,
-	}
-	query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
-	if err != nil {
-		return nil, err
-	}
-
-	// TODO(srikanthccv): remove this
-	// What is happening here?
-	// The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint
-	// goes to same shard.
-	// However, in this case, we are interested in the attributes values across all the shards.
-	// So, we replace the local time series table with the distributed time series table.
-	// See `PrepareTimeseriesFilterQuery` for more details.
-	query = strings.Replace(query, ".time_series_v4", ".distributed_time_series_v4", 1)
-
-	attrsListResponse, err := h.reader.GetListResultV3(ctx, query)
-	if err != nil {
-		return nil, err
-	}
-
-	for _, row := range attrsListResponse {
-		stringData := map[string]string{}
-		for key, value := range row.Data {
-			if str, ok := value.(string); ok {
-				stringData[key] = str
-			} else if strPtr, ok := value.(*string); ok {
-				stringData[key] = *strPtr
-			}
-		}
-
-		hostName := stringData[hostNameAttrKey]
-		if _, ok := hostAttrs[hostName]; !ok {
-			hostAttrs[hostName] = map[string]string{}
-		}
-		for _, key := range req.GroupBy {
-			hostAttrs[hostName][key.Key] = stringData[key.Key]
-		}
-	}
-
-	return hostAttrs, nil
-}
-
 func (h *HostsRepo) getActiveHosts(ctx context.Context,
 	req model.HostListRequest, hostNameAttrKey string) (map[string]bool, error) {
 	activeStatus := map[string]bool{}
@@ -202,7 +150,7 @@ func (h *HostsRepo) getActiveHosts(ctx context.Context,
 	}
 
 	params := v3.QueryRangeParamsV3{
-		Start: time.Now().Add(-time.Hour).UTC().UnixMilli(),
+		Start: time.Now().Add(-time.Minute * 10).UTC().UnixMilli(),
 		End:   time.Now().UTC().UnixMilli(),
 		Step:  step,
 		CompositeQuery: &v3.CompositeQuery{
@@ -212,7 +160,7 @@ func (h *HostsRepo) getActiveHosts(ctx context.Context,
 				StepInterval: step,
 				DataSource:   v3.DataSourceMetrics,
 				AggregateAttribute: v3.AttributeKey{
-					Key:      "system_cpu_load_average_15m",
+					Key:      metricToUseForHostAttributes,
 					DataType: v3.AttributeKeyDataTypeFloat64,
 				},
 				Temporality: v3.Unspecified,
@@ -244,25 +192,103 @@ func (h *HostsRepo) getActiveHosts(ctx context.Context,
 	return activeStatus, nil
 }
 
+// getTopHosts returns the top hosts for the given order by column name
+func (h *HostsRepo) getTopHosts(ctx context.Context, req model.HostListRequest, q *v3.QueryRangeParamsV3, hostNameAttrKey string) ([]string, []string, error) {
+	step, timeSeriesTableName, samplesTableName := getParamsForTopHosts(req)
+
+	queryNames := queryNamesForTopHosts[req.OrderBy.ColumnName]
+	topHostsQueryRangeParams := &v3.QueryRangeParamsV3{
+		Start: req.Start,
+		End:   req.End,
+		Step:  step,
+		CompositeQuery: &v3.CompositeQuery{
+			BuilderQueries: map[string]*v3.BuilderQuery{},
+			QueryType:      v3.QueryTypeBuilder,
+			PanelType:      v3.PanelTypeTable,
+		},
+	}
+
+	for _, queryName := range queryNames {
+		query := q.CompositeQuery.BuilderQueries[queryName].Clone()
+		query.StepInterval = step
+		query.MetricTableHints = &v3.MetricTableHints{
+			TimeSeriesTableName: timeSeriesTableName,
+			SamplesTableName:    samplesTableName,
+		}
+		if req.Filters != nil && len(req.Filters.Items) > 0 {
+			if query.Filters == nil {
+				query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
+			}
+			query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
+		}
+		topHostsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
+	}
+
+	queryResponse, _, err := h.querierV2.QueryRange(ctx, topHostsQueryRangeParams)
+	if err != nil {
+		return nil, nil, err
+	}
+	formattedResponse, err := postprocess.PostProcessResult(queryResponse, topHostsQueryRangeParams)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
+		return nil, nil, nil
+	}
+
+	if req.OrderBy.Order == v3.DirectionDesc {
+		sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
+			return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
+		})
+	} else {
+		sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
+			return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
+		})
+	}
+
+	paginatedTopHostsSeries := formattedResponse[0].Series[req.Offset : req.Offset+req.Limit]
+
+	topHosts := []string{}
+	for _, series := range paginatedTopHostsSeries {
+		topHosts = append(topHosts, series.Labels[hostNameAttrKey])
+	}
+	allHosts := []string{}
+	for _, series := range formattedResponse[0].Series {
+		allHosts = append(allHosts, series.Labels[hostNameAttrKey])
+	}
+
+	return topHosts, allHosts, nil
+}
+
 func (h *HostsRepo) getHostsForQuery(ctx context.Context,
-	req model.HostListRequest, q *v3.QueryRangeParamsV3, hostNameAttrKey string) ([]model.HostListRecord, error) {
+	req model.HostListRequest, q *v3.QueryRangeParamsV3, hostNameAttrKey string) ([]model.HostListRecord, []string, error) {
 
 	step := common.MinAllowedStepInterval(req.Start, req.End)
 
 	query := q.Clone()
-	if req.OrderBy != nil {
-		for _, q := range query.CompositeQuery.BuilderQueries {
-			q.OrderBy = []v3.OrderBy{*req.OrderBy}
-		}
-	}
 
 	query.Start = req.Start
 	query.End = req.End
 	query.Step = step
 
+	topHosts, allHosts, err := h.getTopHosts(ctx, req, q, hostNameAttrKey)
+	if err != nil {
+		return nil, nil, err
+	}
+
 	for _, query := range query.CompositeQuery.BuilderQueries {
 		query.StepInterval = step
+		// check if the filter has host_name and is either IN or EQUAL operator
+		// if so, we don't need to add the topHosts filter again
+		hasHostNameInOrEqual := false
+
 		if req.Filters != nil && len(req.Filters.Items) > 0 {
+			for _, item := range req.Filters.Items {
+				if item.Key.Key == hostNameAttrKey && (item.Operator == v3.FilterOperatorIn || item.Operator == v3.FilterOperatorEqual) {
+					hasHostNameInOrEqual = true
+				}
+			}
 			if query.Filters == nil {
 				query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
 			}
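The hunk above follows a sort-then-slice pattern: run only the order-by column's queries against the pre-aggregated tables, sort the resulting series by their single aggregated point, then page into that ordering. A toy illustration of just that pattern follows (illustrative names, not part of the commit); note that the slice expression, like the one in getTopHosts, assumes Offset+Limit does not exceed the number of series.

package main

import (
	"fmt"
	"sort"
)

type series struct {
	host  string
	value float64
}

func main() {
	all := []series{{"h1", 0.9}, {"h2", 0.4}, {"h3", 0.7}}
	// descending order, mirroring the v3.DirectionDesc branch above
	sort.Slice(all, func(i, j int) bool { return all[i].value > all[j].value })
	offset, limit := 0, 2
	top := all[offset : offset+limit]
	fmt.Println(top) // [{h1 0.9} {h3 0.7}]
}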
@@ -270,29 +296,36 @@ func (h *HostsRepo) getHostsForQuery(ctx context.Context,
 			// what is happening here?
 			// if the filter has host_name and we are querying for k8s host metrics,
 			// we need to replace the host_name with k8s_node_name
-			if hostNameAttrKey == "k8s_node_name" {
+			if hostNameAttrKey == k8sNodeNameAttrKey {
 				for idx, item := range query.Filters.Items {
-					if item.Key.Key == "host_name" {
-						query.Filters.Items[idx].Key.Key = "k8s_node_name"
+					if item.Key.Key == hostNameAttrKey {
+						query.Filters.Items[idx].Key.Key = k8sNodeNameAttrKey
 					}
 				}
 			}
 		}
-	}
-
-	hostAttrs, err := h.getMetadataAttributes(ctx, req, hostNameAttrKey)
-	if err != nil {
-		return nil, err
+		if !hasHostNameInOrEqual {
+			if query.Filters == nil {
+				query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
+			}
+			query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
+				Key: v3.AttributeKey{
+					Key: hostNameAttrKey,
+				},
+				Value:    topHosts,
+				Operator: v3.FilterOperatorIn,
+			})
+		}
 	}
 
 	activeHosts, err := h.getActiveHosts(ctx, req, hostNameAttrKey)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	queryResponse, _, err := h.querierV2.QueryRange(ctx, query)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	type hostTSInfo struct {
@@ -321,7 +354,7 @@ func (h *HostsRepo) getHostsForQuery(ctx context.Context,
 
 	formulaResult, err := postprocess.PostProcessResult(queryResponse, query)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	for _, result := range formulaResult {
@@ -383,7 +416,6 @@ func (h *HostsRepo) getHostsForQuery(ctx context.Context,
 		if ok {
 			record.Load15 = load15
 		}
-		record.Meta = hostAttrs[record.HostName]
 		record.Active = activeHosts[record.HostName]
 		if hostTSInfoMap[record.HostName] != nil {
 			record.CPUTimeSeries = hostTSInfoMap[record.HostName].cpuTimeSeries
@@ -394,7 +426,7 @@ func (h *HostsRepo) getHostsForQuery(ctx context.Context,
 		records = append(records, record)
 	}
 
-	return records, nil
+	return records, allHosts, nil
 }
 
 func dedupRecords(records []model.HostListRecord) []model.HostListRecord {
@@ -414,104 +446,40 @@ func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest)
 		req.Limit = 10
 	}
 
+	if req.OrderBy == nil {
+		req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
+	}
+
 	resp := model.HostListResponse{
 		Type: "list",
 	}
 
-	vmRecords, err := h.getHostsForQuery(ctx, req, &NonK8STableListQuery, "host_name")
+	vmRecords, vmAllHosts, err := h.getHostsForQuery(ctx, req, &NonK8STableListQuery, hostNameAttrKey)
 	if err != nil {
 		return resp, err
 	}
-	k8sRecords, err := h.getHostsForQuery(ctx, req, &K8STableListQuery, "k8s_node_name")
+	k8sRecords, k8sAllHosts, err := h.getHostsForQuery(ctx, req, &K8STableListQuery, k8sNodeNameAttrKey)
 	if err != nil {
 		return resp, err
 	}
 
+	uniqueHosts := map[string]bool{}
+	for _, host := range vmAllHosts {
+		uniqueHosts[host] = true
+	}
+	for _, host := range k8sAllHosts {
+		uniqueHosts[host] = true
+	}
+
 	records := append(vmRecords, k8sRecords...)
 
 	// since we added the fix for incorrect host name, it is possible that both host_name and k8s_node_name
 	// are present in the response. we need to dedup the results.
 	records = dedupRecords(records)
 
-	resp.Total = len(records)
-
-	if req.Offset > 0 {
-		records = records[req.Offset:]
-	}
-	if req.Limit > 0 && len(records) > req.Limit {
-		records = records[:req.Limit]
-	}
+	resp.Total = len(uniqueHosts)
 	resp.Records = records
 
-	if len(req.GroupBy) > 0 {
-		groups := []model.HostListGroup{}
-
-		groupMap := make(map[string][]model.HostListRecord)
-		for _, record := range records {
-			groupKey := getGroupKey(record, req.GroupBy)
-			if _, ok := groupMap[groupKey]; !ok {
-				groupMap[groupKey] = []model.HostListRecord{record}
-			} else {
-				groupMap[groupKey] = append(groupMap[groupKey], record)
-			}
-		}
-
-		// calculate the group stats, active hosts, etc.
-		for _, records := range groupMap {
-			var avgCPU, avgMemory, avgWait, avgLoad15 float64
-			var validCPU, validMemory, validWait, validLoad15, activeHosts int
-			for _, record := range records {
-				if !math.IsNaN(record.CPU) {
-					avgCPU += record.CPU
-					validCPU++
-				}
-				if !math.IsNaN(record.Memory) {
-					avgMemory += record.Memory
-					validMemory++
-				}
-				if !math.IsNaN(record.Wait) {
-					avgWait += record.Wait
-					validWait++
-				}
-				if !math.IsNaN(record.Load15) {
-					avgLoad15 += record.Load15
-					validLoad15++
-				}
-				if record.Active {
-					activeHosts++
-				}
-			}
-			avgCPU /= float64(validCPU)
-			avgMemory /= float64(validMemory)
-			avgWait /= float64(validWait)
-			avgLoad15 /= float64(validLoad15)
-			inactiveHosts := len(records) - activeHosts
-
-			// take any record and make it as the group meta
-			firstRecord := records[0]
-			var groupValues []string
-			for _, key := range req.GroupBy {
-				groupValues = append(groupValues, firstRecord.Meta[key.Key])
-			}
-			hostNames := []string{}
-			for _, record := range records {
-				hostNames = append(hostNames, record.HostName)
-			}
-
-			groups = append(groups, model.HostListGroup{
-				GroupValues:    groupValues,
-				Active:         activeHosts,
-				Inactive:       inactiveHosts,
-				GroupCPUAvg:    avgCPU,
-				GroupMemoryAvg: avgMemory,
-				GroupWaitAvg:   avgWait,
-				GroupLoad15Avg: avgLoad15,
-				HostNames:      hostNames,
-			})
-		}
-		resp.Groups = groups
-		resp.Type = "grouped_list"
-	}
-
 	return resp, nil
 }
@@ -16,8 +16,30 @@ var (
 	oneWeekInMilliseconds = oneDayInMilliseconds * 7
 )
 
-// start and end are in milliseconds
-func whichTSTableToUse(start, end int64) (int64, int64, string) {
+func whichTSTableToUse(start, end int64, mq *v3.BuilderQuery) (int64, int64, string) {
+
+	// if we have a hint for the table, we need to use it
+	// the hint will be used to override the default table selection logic
+	if mq.MetricTableHints != nil {
+		if mq.MetricTableHints.TimeSeriesTableName != "" {
+			switch mq.MetricTableHints.TimeSeriesTableName {
+			case constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME:
+				// adjust the start time to nearest 1 hour
+				start = start - (start % (time.Hour.Milliseconds() * 1))
+			case constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME:
+				// adjust the start time to nearest 6 hours
+				start = start - (start % (time.Hour.Milliseconds() * 6))
+			case constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME:
+				// adjust the start time to nearest 1 day
+				start = start - (start % (time.Hour.Milliseconds() * 24))
+			case constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME:
+				// adjust the start time to nearest 1 week
+				start = start - (start % (time.Hour.Milliseconds() * 24 * 7))
+			}
+			return start, end, mq.MetricTableHints.TimeSeriesTableName
+		}
+	}
+
 	// If time range is less than 6 hours, we need to use the `time_series_v4` table
 	// else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table
 	// else if time range is less than 1 week and greater than 1 day, we need to use the `time_series_v4_1day` table
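The start-time adjustment in the hint branch snaps the window down to the granularity of the chosen table so that the first, coarser bucket is fully covered. A worked example with illustrative numbers, not taken from the commit:

package main

import "fmt"

func main() {
	// snap a start timestamp (epoch milliseconds) down to the previous
	// 6-hour boundary, mirroring the 6-hour table case above
	start := int64(1700000123456)
	sixHours := int64(6 * 60 * 60 * 1000)   // 21,600,000 ms
	fmt.Println(start - (start % sixHours)) // prints 1699984800000
}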
@@ -58,6 +80,14 @@ func whichTSTableToUse(start, end int64) (int64, int64, string) {
 // if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
 func WhichSamplesTableToUse(start, end int64, mq *v3.BuilderQuery) string {
 
+	// if we have a hint for the table, we need to use it
+	// the hint will be used to override the default table selection logic
+	if mq.MetricTableHints != nil {
+		if mq.MetricTableHints.SamplesTableName != "" {
+			return mq.MetricTableHints.SamplesTableName
+		}
+	}
+
 	// we don't have any aggregated table for sketches (yet)
 	if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
 		return constants.SIGNOZ_EXP_HISTOGRAM_TABLENAME
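Putting the two overrides together: a caller that already knows which tables it wants (as getTopHosts does) attaches MetricTableHints to the builder query, and both helpers short-circuit their range-based selection. A minimal sketch, assuming it lives in the same helpers package whose import path appears earlier in this diff (go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers) and that the v3 model import path is go.signoz.io/signoz/pkg/query-service/model/v3; this is an illustration, not part of the commit:

// Hypothetical _test.go sketch in the helpers package, not part of the commit:
// a hint on the builder query short-circuits both table-selection helpers.
package helpers

import (
	"testing"
	"time"

	"go.signoz.io/signoz/pkg/query-service/constants"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestMetricTableHintsSketch(t *testing.T) {
	end := time.Now().UnixMilli()
	start := end - 3*time.Hour.Milliseconds()
	mq := &v3.BuilderQuery{
		MetricTableHints: &v3.MetricTableHints{
			TimeSeriesTableName: constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME,
			SamplesTableName:    constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME,
		},
	}
	_, _, tsTable := whichTSTableToUse(start, end, mq)
	if tsTable != constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME {
		t.Fatalf("time series hint not honoured: %s", tsTable)
	}
	if got := WhichSamplesTableToUse(start, end, mq); got != constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME {
		t.Fatalf("samples hint not honoured: %s", got)
	}
}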
@@ -234,7 +264,7 @@ func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string
 	conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
 	conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
 
-	start, end, tableName := whichTSTableToUse(start, end)
+	start, end, tableName := whichTSTableToUse(start, end, mq)
 
 	conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
 
|
|||||||
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
|
conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key)))
|
||||||
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
|
conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality))
|
||||||
|
|
||||||
start, end, tableName := whichTSTableToUse(start, end)
|
start, end, tableName := whichTSTableToUse(start, end, mq)
|
||||||
|
|
||||||
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
|
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
|
||||||
|
|
||||||
|
@@ -763,6 +763,11 @@ type Function struct {
 	NamedArgs map[string]interface{} `json:"namedArgs,omitempty"`
 }
 
+type MetricTableHints struct {
+	TimeSeriesTableName string
+	SamplesTableName    string
+}
+
 type BuilderQuery struct {
 	QueryName    string `json:"queryName"`
 	StepInterval int64  `json:"stepInterval"`
@@ -788,6 +793,7 @@ type BuilderQuery struct {
 	ShiftBy              int64
 	IsAnomaly            bool
 	QueriesUsedInFormula []string
+	MetricTableHints     *MetricTableHints `json:"-"`
 }
 
 func (b *BuilderQuery) SetShiftByFromFunc() {
@@ -1108,9 +1114,16 @@ func (f *FilterItem) CacheKey() string {
 	return fmt.Sprintf("key:%s,op:%s,value:%v", f.Key.CacheKey(), f.Operator, f.Value)
 }
 
+type Direction string
+
+const (
+	DirectionAsc  Direction = "asc"
+	DirectionDesc Direction = "desc"
+)
+
 type OrderBy struct {
 	ColumnName string               `json:"columnName"`
-	Order      string               `json:"order"`
+	Order      Direction            `json:"order"`
 	Key        string               `json:"-"`
 	DataType   AttributeKeyDataType `json:"-"`
 	Type       AttributeKeyType     `json:"-"`
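Because Direction is a plain string type, the JSON wire format of OrderBy does not change with this commit; existing payloads that send "order": "desc" keep working. A hedged sketch of such a check inside the v3 model package (not part of the commit):

// Hypothetical _test.go sketch in the v3 model package, not part of the commit.
package v3

import (
	"encoding/json"
	"testing"
)

func TestOrderByDirectionSketch(t *testing.T) {
	var ob OrderBy
	if err := json.Unmarshal([]byte(`{"columnName":"cpu","order":"desc"}`), &ob); err != nil {
		t.Fatal(err)
	}
	if ob.Order != DirectionDesc {
		t.Fatalf("expected %q, got %q", DirectionDesc, ob.Order)
	}
}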