chore: add feature flag, handle out-of-index error, some house keeping work (#6344)

This commit is contained in:
Srikanth Chekuri 2024-11-02 01:23:43 +05:30 committed by GitHub
parent c7d0598ec0
commit db4338be42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 517 additions and 837 deletions

View File

@ -1,6 +1,7 @@
package model package model
import ( import (
"go.signoz.io/signoz/pkg/query-service/constants"
basemodel "go.signoz.io/signoz/pkg/query-service/model" basemodel "go.signoz.io/signoz/pkg/query-service/model"
) )
@ -134,6 +135,13 @@ var BasicPlan = basemodel.FeatureSet{
UsageLimit: -1, UsageLimit: -1,
Route: "", Route: "",
}, },
basemodel.Feature{
Name: basemodel.HostsInfraMonitoring,
Active: constants.EnableHostsInfraMonitoring(),
Usage: 0,
UsageLimit: -1,
Route: "",
},
} }
var ProPlan = basemodel.FeatureSet{ var ProPlan = basemodel.FeatureSet{
@ -249,6 +257,13 @@ var ProPlan = basemodel.FeatureSet{
UsageLimit: -1, UsageLimit: -1,
Route: "", Route: "",
}, },
basemodel.Feature{
Name: basemodel.HostsInfraMonitoring,
Active: constants.EnableHostsInfraMonitoring(),
Usage: 0,
UsageLimit: -1,
Route: "",
},
} }
var EnterprisePlan = basemodel.FeatureSet{ var EnterprisePlan = basemodel.FeatureSet{
@ -378,4 +393,11 @@ var EnterprisePlan = basemodel.FeatureSet{
UsageLimit: -1, UsageLimit: -1,
Route: "", Route: "",
}, },
basemodel.Feature{
Name: basemodel.HostsInfraMonitoring,
Active: constants.EnableHostsInfraMonitoring(),
Usage: 0,
UsageLimit: -1,
Route: "",
},
} }

View File

@ -53,6 +53,10 @@ func getParamsForTopHosts(req model.HostListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End) return getParamsForTopItems(req.Start, req.End)
} }
// getParamsForTopProcesses derives the top-items query parameters for a
// process list request. Only the request's time range is used: req.Start and
// req.End are forwarded to getParamsForTopItems and its three results are
// returned unchanged, in the same order.
func getParamsForTopProcesses(req model.ProcessListRequest) (int64, string, string) {
	step, timeSeriesTableName, samplesTableName := getParamsForTopItems(req.Start, req.End)
	return step, timeSeriesTableName, samplesTableName
}
func getParamsForTopPods(req model.PodListRequest) (int64, string, string) { func getParamsForTopPods(req model.PodListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End) return getParamsForTopItems(req.Start, req.End)
} }

View File

@ -2,10 +2,12 @@ package inframetrics
import ( import (
"context" "context"
"math"
"sort" "sort"
"strings" "strings"
"time" "time"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
@ -54,9 +56,16 @@ var (
// TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric // TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric
metricToUseForHostAttributes = "system_cpu_load_average_15m" metricToUseForHostAttributes = "system_cpu_load_average_15m"
hostNameAttrKey = "host_name" hostNameAttrKey = "host_name"
// TODO(srikanthccv): remove k8s hacky logic from hosts repo after charts users are migrated agentNameToIgnore = "k8s-infra-otel-agent"
k8sNodeNameAttrKey = "k8s_node_name" hostAttrsToEnrich = []string{
agentNameToIgnore = "k8s-infra-otel-agent" "os_type",
}
metricNamesForHosts = map[string]string{
"cpu": "system_cpu_time",
"memory": "system_memory_usage",
"load15": "system_cpu_load_average_15m",
"wait": "system_cpu_time",
}
) )
func NewHostsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *HostsRepo { func NewHostsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *HostsRepo {
@ -112,29 +121,10 @@ func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAtt
hostNames = append(hostNames, attributeValue) hostNames = append(hostNames, attributeValue)
} }
req.FilterAttributeKey = k8sNodeNameAttrKey
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForHostAttributes
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err = h.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
for _, attributeValue := range attributeValuesResponse.StringAttributeValues {
if strings.Contains(attributeValue, agentNameToIgnore) {
continue
}
hostNames = append(hostNames, attributeValue)
}
return &v3.FilterAttributeValueResponse{StringAttributeValues: hostNames}, nil return &v3.FilterAttributeValueResponse{StringAttributeValues: hostNames}, nil
} }
func (h *HostsRepo) getActiveHosts(ctx context.Context, func (h *HostsRepo) getActiveHosts(ctx context.Context, req model.HostListRequest) (map[string]bool, error) {
req model.HostListRequest, hostNameAttrKey string) (map[string]bool, error) {
activeStatus := map[string]bool{} activeStatus := map[string]bool{}
step := common.MinAllowedStepInterval(req.Start, req.End) step := common.MinAllowedStepInterval(req.Start, req.End)
@ -192,12 +182,72 @@ func (h *HostsRepo) getActiveHosts(ctx context.Context,
return activeStatus, nil return activeStatus, nil
} }
// getTopHosts returns the top hosts for the given order by column name func (h *HostsRepo) getMetadataAttributes(ctx context.Context, req model.HostListRequest) (map[string]map[string]string, error) {
func (h *HostsRepo) getTopHosts(ctx context.Context, req model.HostListRequest, q *v3.QueryRangeParamsV3, hostNameAttrKey string) ([]string, []string, error) { hostAttrs := map[string]map[string]string{}
for _, key := range hostAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForHostAttributes,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := h.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
hostName := stringData[hostNameAttrKey]
if _, ok := hostAttrs[hostName]; !ok {
hostAttrs[hostName] = map[string]string{}
}
for _, key := range req.GroupBy {
hostAttrs[hostName][key.Key] = stringData[key.Key]
}
}
return hostAttrs, nil
}
func (h *HostsRepo) getTopHostGroups(ctx context.Context, req model.HostListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopHosts(req) step, timeSeriesTableName, samplesTableName := getParamsForTopHosts(req)
queryNames := queryNamesForTopHosts[req.OrderBy.ColumnName] queryNames := queryNamesForTopHosts[req.OrderBy.ColumnName]
topHostsQueryRangeParams := &v3.QueryRangeParamsV3{ topHostGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start, Start: req.Start,
End: req.End, End: req.End,
Step: step, Step: step,
@ -216,19 +266,16 @@ func (h *HostsRepo) getTopHosts(ctx context.Context, req model.HostListRequest,
SamplesTableName: samplesTableName, SamplesTableName: samplesTableName,
} }
if req.Filters != nil && len(req.Filters.Items) > 0 { if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
} }
topHostsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query topHostGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
} }
queryResponse, _, err := h.querierV2.QueryRange(ctx, topHostsQueryRangeParams) queryResponse, _, err := h.querierV2.QueryRange(ctx, topHostGroupsQueryRangeParams)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topHostsQueryRangeParams) formattedResponse, err := postprocess.PostProcessResult(queryResponse, topHostGroupsQueryRangeParams)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -247,238 +294,150 @@ func (h *HostsRepo) getTopHosts(ctx context.Context, req model.HostListRequest,
}) })
} }
paginatedTopHostsSeries := formattedResponse[0].Series[req.Offset : req.Offset+req.Limit] limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
topHosts := []string{} paginatedTopHostGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
for _, series := range paginatedTopHostsSeries {
topHosts = append(topHosts, series.Labels[hostNameAttrKey]) topHostGroups := []map[string]string{}
for _, series := range paginatedTopHostGroupsSeries {
topHostGroups = append(topHostGroups, series.Labels)
} }
allHosts := []string{} allHostGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series { for _, series := range formattedResponse[0].Series {
allHosts = append(allHosts, series.Labels[hostNameAttrKey]) allHostGroups = append(allHostGroups, series.Labels)
} }
return topHosts, allHosts, nil return topHostGroups, allHostGroups, nil
} }
func (h *HostsRepo) getHostsForQuery(ctx context.Context, func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest) (model.HostListResponse, error) {
req model.HostListRequest, q *v3.QueryRangeParamsV3, hostNameAttrKey string) ([]model.HostListRecord, []string, error) { resp := model.HostListResponse{}
step := common.MinAllowedStepInterval(req.Start, req.End) if req.Limit == 0 {
req.Limit = 10
}
query := q.Clone() // default to cpu order by
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
// default to host name group by
if len(req.GroupBy) == 0 {
req.GroupBy = []v3.AttributeKey{{Key: hostNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := HostsTableListQuery.Clone()
query.Start = req.Start query.Start = req.Start
query.End = req.End query.End = req.End
query.Step = step query.Step = step
topHosts, allHosts, err := h.getTopHosts(ctx, req, q, hostNameAttrKey)
if err != nil {
return nil, nil, err
}
for _, query := range query.CompositeQuery.BuilderQueries { for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step query.StepInterval = step
// check if the filter has host_name and is either IN or EQUAL operator
// if so, we don't need to add the topHosts filter again
hasHostNameInOrEqual := false
if req.Filters != nil && len(req.Filters.Items) > 0 { if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, item := range req.Filters.Items {
if item.Key.Key == hostNameAttrKey && (item.Operator == v3.FilterOperatorIn || item.Operator == v3.FilterOperatorEqual) {
hasHostNameInOrEqual = true
}
}
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
// what is happening here?
// if the filter has host_name and we are querying for k8s host metrics,
// we need to replace the host_name with k8s_node_name
if hostNameAttrKey == k8sNodeNameAttrKey {
for idx, item := range query.Filters.Items {
if item.Key.Key == hostNameAttrKey {
query.Filters.Items[idx].Key.Key = k8sNodeNameAttrKey
}
}
}
} }
if !hasHostNameInOrEqual { query.GroupBy = req.GroupBy
if query.Filters == nil { }
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
} hostAttrs, err := h.getMetadataAttributes(ctx, req)
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ if err != nil {
Key: v3.AttributeKey{ return resp, err
Key: hostNameAttrKey, }
},
Value: topHosts, activeHosts, err := h.getActiveHosts(ctx, req)
Operator: v3.FilterOperatorIn, if err != nil {
}) return resp, err
}
topHostGroups, allHostGroups, err := h.getTopHostGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topHostGroup := range topHostGroups {
for k, v := range topHostGroup {
groupFilters[k] = append(groupFilters[k], v)
} }
} }
activeHosts, err := h.getActiveHosts(ctx, req, hostNameAttrKey) for groupKey, groupValues := range groupFilters {
if err != nil { hasGroupFilter := false
return nil, nil, err if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
} }
queryResponse, _, err := h.querierV2.QueryRange(ctx, query) queryResponse, _, err := h.querierV2.QueryRange(ctx, query)
if err != nil { if err != nil {
return nil, nil, err return resp, err
} }
type hostTSInfo struct { formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
cpuTimeSeries *v3.Series
memoryTimeSeries *v3.Series
waitTimeSeries *v3.Series
load15TimeSeries *v3.Series
}
hostTSInfoMap := map[string]*hostTSInfo{}
for _, result := range queryResponse {
for _, series := range result.Series {
hostName := series.Labels[hostNameAttrKey]
if _, ok := hostTSInfoMap[hostName]; !ok {
hostTSInfoMap[hostName] = &hostTSInfo{}
}
if result.QueryName == "G" {
loadSeries := *series
hostTSInfoMap[hostName].load15TimeSeries = &loadSeries
}
}
}
query.FormatForWeb = false
query.CompositeQuery.PanelType = v3.PanelTypeGraph
formulaResult, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil { if err != nil {
return nil, nil, err return resp, err
} }
for _, result := range formulaResult {
for _, series := range result.Series {
hostName := series.Labels[hostNameAttrKey]
if _, ok := hostTSInfoMap[hostName]; !ok {
hostTSInfoMap[hostName] = &hostTSInfo{}
}
if result.QueryName == "F1" {
hostTSInfoMap[hostName].cpuTimeSeries = series
} else if result.QueryName == "F2" {
hostTSInfoMap[hostName].memoryTimeSeries = series
} else if result.QueryName == "F3" {
hostTSInfoMap[hostName].waitTimeSeries = series
}
}
}
query.FormatForWeb = true
query.CompositeQuery.PanelType = v3.PanelTypeTable
formattedResponse, _ := postprocess.PostProcessResult(queryResponse, query)
records := []model.HostListRecord{} records := []model.HostListRecord{}
// there should be only one result in the response for _, result := range formattedResponse {
hostsInfo := formattedResponse[0] for _, row := range result.Table.Rows {
// each row represents a host record := model.HostListRecord{
for _, row := range hostsInfo.Table.Rows { CPU: -1,
record := model.HostListRecord{ Memory: -1,
CPU: -1, Wait: -1,
Memory: -1, Load15: -1,
Wait: -1, }
Load15: -1,
}
hostName, ok := row.Data[hostNameAttrKey].(string) if hostName, ok := row.Data[hostNameAttrKey].(string); ok {
if ok { record.HostName = hostName
record.HostName = hostName }
}
osType, ok := row.Data["os_type"].(string) if cpu, ok := row.Data["F1"].(float64); ok {
if ok { record.CPU = cpu
record.OS = osType }
} if memory, ok := row.Data["F2"].(float64); ok {
record.Memory = memory
cpu, ok := row.Data["F1"].(float64) }
if ok { if wait, ok := row.Data["F3"].(float64); ok {
record.CPU = cpu record.Wait = wait
} }
memory, ok := row.Data["F2"].(float64) if load15, ok := row.Data["G"].(float64); ok {
if ok { record.Load15 = load15
record.Memory = memory }
} record.Meta = map[string]string{}
wait, ok := row.Data["F3"].(float64) if _, ok := hostAttrs[record.HostName]; ok {
if ok { record.Meta = hostAttrs[record.HostName]
record.Wait = wait }
} if osType, ok := record.Meta["os_type"]; ok {
load15, ok := row.Data["G"].(float64) record.OS = osType
if ok { }
record.Load15 = load15 record.Active = activeHosts[record.HostName]
} records = append(records, record)
record.Active = activeHosts[record.HostName]
if hostTSInfoMap[record.HostName] != nil {
record.CPUTimeSeries = hostTSInfoMap[record.HostName].cpuTimeSeries
record.MemoryTimeSeries = hostTSInfoMap[record.HostName].memoryTimeSeries
record.WaitTimeSeries = hostTSInfoMap[record.HostName].waitTimeSeries
record.Load15TimeSeries = hostTSInfoMap[record.HostName].load15TimeSeries
}
records = append(records, record)
}
return records, allHosts, nil
}
func dedupRecords(records []model.HostListRecord) []model.HostListRecord {
seen := map[string]bool{}
deduped := []model.HostListRecord{}
for _, record := range records {
if !seen[record.HostName] {
seen[record.HostName] = true
deduped = append(deduped, record)
} }
} }
return deduped resp.Total = len(allHostGroups)
}
func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest) (model.HostListResponse, error) {
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
resp := model.HostListResponse{
Type: "list",
}
vmRecords, vmAllHosts, err := h.getHostsForQuery(ctx, req, &NonK8STableListQuery, hostNameAttrKey)
if err != nil {
return resp, err
}
k8sRecords, k8sAllHosts, err := h.getHostsForQuery(ctx, req, &K8STableListQuery, k8sNodeNameAttrKey)
if err != nil {
return resp, err
}
uniqueHosts := map[string]bool{}
for _, host := range vmAllHosts {
uniqueHosts[host] = true
}
for _, host := range k8sAllHosts {
uniqueHosts[host] = true
}
records := append(vmRecords, k8sRecords...)
// since we added the fix for incorrect host name, it is possible that both host_name and k8s_node_name
// are present in the response. we need to dedup the results.
records = dedupRecords(records)
resp.Total = len(uniqueHosts)
resp.Records = records resp.Records = records
return resp, nil return resp, nil

View File

@ -2,14 +2,14 @@ package inframetrics
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3" import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
var NonK8STableListQuery = v3.QueryRangeParamsV3{ var HostsTableListQuery = v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{ BuilderQueries: map[string]*v3.BuilderQuery{
"A": { "A": {
QueryName: "A", QueryName: "A",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_time", Key: metricNamesForHosts["cpu"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -27,23 +27,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
}, },
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -58,7 +53,7 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
QueryName: "B", QueryName: "B",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_time", Key: metricNamesForHosts["cpu"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -67,23 +62,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
Items: []v3.FilterItem{ Items: []v3.FilterItem{
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -98,12 +88,16 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
QueryName: "F1", QueryName: "F1",
Expression: "A/B", Expression: "A/B",
Legend: "CPU Usage (%)", Legend: "CPU Usage (%)",
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
}, },
"C": { "C": {
QueryName: "C", QueryName: "C",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage", Key: metricNamesForHosts["memory"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -121,23 +115,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
}, },
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -152,7 +141,7 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
QueryName: "D", QueryName: "D",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage", Key: metricNamesForHosts["memory"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -161,23 +150,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
Items: []v3.FilterItem{ Items: []v3.FilterItem{
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -192,12 +176,16 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
QueryName: "F2", QueryName: "F2",
Expression: "C/D", Expression: "C/D",
Legend: "Memory Usage (%)", Legend: "Memory Usage (%)",
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
}, },
"E": { "E": {
QueryName: "E", QueryName: "E",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_time", Key: metricNamesForHosts["wait"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -215,23 +203,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
}, },
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -246,7 +229,7 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
QueryName: "F", QueryName: "F",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_time", Key: metricNamesForHosts["wait"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -255,23 +238,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
Items: []v3.FilterItem{ Items: []v3.FilterItem{
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -286,12 +264,16 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
QueryName: "F3", QueryName: "F3",
Expression: "E/F", Expression: "E/F",
Legend: "CPU Wait Time (%)", Legend: "CPU Wait Time (%)",
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
}, },
"G": { "G": {
QueryName: "G", QueryName: "G",
DataSource: v3.DataSourceMetrics, DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "system_cpu_load_average_15m", Key: metricNamesForHosts["load15"],
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Unspecified, Temporality: v3.Unspecified,
@ -300,23 +282,18 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
Items: []v3.FilterItem{ Items: []v3.FilterItem{
{ {
Key: v3.AttributeKey{ Key: v3.AttributeKey{
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
Operator: v3.FilterOperatorNotContains, Operator: v3.FilterOperatorNotContains,
Value: "k8s-infra-otel-agent", Value: agentNameToIgnore,
}, },
}, },
}, },
GroupBy: []v3.AttributeKey{ GroupBy: []v3.AttributeKey{
{ {
Key: "host_name", Key: hostNameAttrKey,
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
{
Key: "os_type",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource, Type: v3.AttributeKeyTypeResource,
}, },
@ -335,69 +312,3 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{
Version: "v4", Version: "v4",
FormatForWeb: true, FormatForWeb: true,
} }
// ProcessesTableListQuery is the query-range template for the processes table.
// It holds two metric builder queries, "A" (process_cpu_time) and "C"
// (process_memory_usage), both grouped by the process_pid resource attribute,
// plus the formula "F1" that re-exposes "A" under a display legend. The
// result is requested as a table panel (builder query type, version "v4",
// FormatForWeb enabled).
//
// NOTE(review): this is shared package-level state holding pointers to the
// builder queries; presumably callers should work on a copy (Clone) rather
// than mutate it in place — confirm against the call sites.
var ProcessesTableListQuery = v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
// A: rate of process_cpu_time over time, summed across series per PID.
// Marked Disabled; presumably it is only surfaced through F1 — confirm.
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "process_cpu_time",
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Cumulative,
// Starts with no filter items (non-nil, empty FilterSet).
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{
{
Key: "process_pid",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
},
Expression: "A",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: true,
},
// F1: formula that evaluates to A and carries the CPU legend.
// Note that no Filters field is set here, so F1.Filters is nil.
"F1": {
QueryName: "F1",
Expression: "A",
Legend: "Process CPU Usage (%)",
},
// C: time-averaged process_memory_usage, summed across series per PID.
// This query is enabled (Disabled: false).
"C": {
QueryName: "C",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "process_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Cumulative,
// Starts with no filter items (non-nil, empty FilterSet).
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{
{
Key: "process_pid",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeResource,
},
},
Expression: "C",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
},
PanelType: v3.PanelTypeTable,
QueryType: v3.QueryTypeBuilder,
},
Version: "v4",
FormatForWeb: true,
}

View File

@ -178,7 +178,9 @@ func (p *NamespacesRepo) getTopNamespaceGroups(ctx context.Context, req model.Na
}) })
} }
paginatedTopNamespaceGroupsSeries := formattedResponse[0].Series[req.Offset : req.Offset+req.Limit] limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopNamespaceGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topNamespaceGroups := []map[string]string{} topNamespaceGroups := []map[string]string{}
for _, series := range paginatedTopNamespaceGroupsSeries { for _, series := range paginatedTopNamespaceGroupsSeries {

View File

@ -217,7 +217,9 @@ func (p *PodsRepo) getTopPodGroups(ctx context.Context, req model.PodListRequest
}) })
} }
paginatedTopPodGroupsSeries := formattedResponse[0].Series[req.Offset : req.Offset+req.Limit] limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopPodGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topPodGroups := []map[string]string{} topPodGroups := []map[string]string{}
for _, series := range paginatedTopPodGroupsSeries { for _, series := range paginatedTopPodGroupsSeries {

View File

@ -0,0 +1,73 @@
package inframetrics
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
// ProcessesTableListQuery is the template for the process list table: a
// builder-type composite query rendered as a table panel.
//
// Builder queries, by name:
//   - "A":  rate of the CPU metric (metricNamesForProcesses["cpu"]), summed
//     per process PID. It is marked Disabled and is referenced by "F1".
//   - "F1": formula whose expression is "A", with the legend
//     "Process CPU Usage (%)".
//   - "C":  average of the memory metric (metricNamesForProcesses["memory"]),
//     summed per process PID.
//
// Start, End and Step are not set here.
var ProcessesTableListQuery = v3.QueryRangeParamsV3{
	Version:      "v4",
	FormatForWeb: true,
	CompositeQuery: &v3.CompositeQuery{
		QueryType: v3.QueryTypeBuilder,
		PanelType: v3.PanelTypeTable,
		BuilderQueries: map[string]*v3.BuilderQuery{
			// Memory usage per process.
			"C": {
				QueryName:  "C",
				Expression: "C",
				Disabled:   false,
				DataSource: v3.DataSourceMetrics,
				AggregateAttribute: v3.AttributeKey{
					Key:      metricNamesForProcesses["memory"],
					DataType: v3.AttributeKeyDataTypeFloat64,
				},
				Temporality:      v3.Cumulative,
				TimeAggregation:  v3.TimeAggregationAvg,
				SpaceAggregation: v3.SpaceAggregationSum,
				ReduceTo:         v3.ReduceToOperatorAvg,
				GroupBy: []v3.AttributeKey{
					{
						Key:      processPIDAttrKey,
						DataType: v3.AttributeKeyDataTypeString,
						Type:     v3.AttributeKeyTypeResource,
					},
				},
				Filters: &v3.FilterSet{
					Operator: "AND",
					Items:    []v3.FilterItem{},
				},
			},
			// CPU usage column; carries an empty, non-nil filter set.
			"F1": {
				QueryName:  "F1",
				Expression: "A",
				Legend:     "Process CPU Usage (%)",
				Filters: &v3.FilterSet{
					Operator: "AND",
					Items:    []v3.FilterItem{},
				},
			},
			// CPU time rate per process; disabled, used through "F1".
			"A": {
				QueryName:  "A",
				Expression: "A",
				Disabled:   true,
				DataSource: v3.DataSourceMetrics,
				AggregateAttribute: v3.AttributeKey{
					Key:      metricNamesForProcesses["cpu"],
					DataType: v3.AttributeKeyDataTypeFloat64,
				},
				Temporality:      v3.Cumulative,
				TimeAggregation:  v3.TimeAggregationRate,
				SpaceAggregation: v3.SpaceAggregationSum,
				ReduceTo:         v3.ReduceToOperatorAvg,
				GroupBy: []v3.AttributeKey{
					{
						Key:      processPIDAttrKey,
						DataType: v3.AttributeKeyDataTypeString,
						Type:     v3.AttributeKeyTypeResource,
					},
				},
				Filters: &v3.FilterSet{
					Operator: "AND",
					Items:    []v3.FilterItem{},
				},
			},
		},
	},
}

View File

@ -2,9 +2,8 @@ package inframetrics
import ( import (
"context" "context"
"fmt"
"math" "math"
"strings" "sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
@ -15,6 +14,23 @@ import (
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
) )
var (
	// Label keys used to identify a process and describe it.
	processPIDAttrKey     = "process_pid"
	processNameAttrKey    = "process_executable_name"
	processCMDAttrKey     = "process_command"
	processCMDLineAttrKey = "process_command_line"

	// metricNamesForProcesses maps a column name to the metric that backs it.
	metricNamesForProcesses = map[string]string{
		"memory": "process_memory_usage",
		"cpu":    "process_cpu_time",
	}

	// queryNamesForTopProcesses maps an order-by column name to the builder
	// query names that are run to rank processes by that column.
	queryNamesForTopProcesses = map[string][]string{
		"memory": {"C"},
		"cpu":    {"A"},
	}

	// metricToUseForProcessAttributes is the metric queried when looking up
	// process metadata attributes.
	metricToUseForProcessAttributes = "process_memory_usage"
)
type ProcessesRepo struct { type ProcessesRepo struct {
reader interfaces.Reader reader interfaces.Reader
querierV2 interfaces.Querier querierV2 interfaces.Querier
@ -64,14 +80,6 @@ func (p *ProcessesRepo) GetProcessAttributeValues(ctx context.Context, req v3.Fi
return attributeValuesResponse, nil return attributeValuesResponse, nil
} }
// getGroupKeyForProcesses builds a composite grouping key for record by
// writing "<key>=<value>," for each attribute in groupBy, in order, taking the
// value from record.Meta. The trailing comma is kept, and an empty groupBy
// yields the empty string.
func getGroupKeyForProcesses(record model.ProcessListRecord, groupBy []v3.AttributeKey) string {
	var b strings.Builder
	for i := range groupBy {
		name := groupBy[i].Key
		fmt.Fprintf(&b, "%s=%s,", name, record.Meta[name])
	}
	return b.String()
}
func (p *ProcessesRepo) getMetadataAttributes(ctx context.Context, func (p *ProcessesRepo) getMetadataAttributes(ctx context.Context,
req model.ProcessListRequest) (map[string]map[string]string, error) { req model.ProcessListRequest) (map[string]map[string]string, error) {
processAttrs := map[string]map[string]string{} processAttrs := map[string]map[string]string{}
@ -92,7 +100,7 @@ func (p *ProcessesRepo) getMetadataAttributes(ctx context.Context,
mq := v3.BuilderQuery{ mq := v3.BuilderQuery{
AggregateAttribute: v3.AttributeKey{ AggregateAttribute: v3.AttributeKey{
Key: "process_memory_usage", Key: metricToUseForProcessAttributes,
DataType: v3.AttributeKeyDataTypeFloat64, DataType: v3.AttributeKeyDataTypeFloat64,
}, },
Temporality: v3.Cumulative, Temporality: v3.Cumulative,
@ -104,14 +112,7 @@ func (p *ProcessesRepo) getMetadataAttributes(ctx context.Context,
return nil, err return nil, err
} }
// TODO(srikanthccv): remove this query = localQueryToDistributedQuery(query)
// What is happening here?
// The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint
// goes to same shard.
// However, in this case, we are interested in the attributes values across all the shards.
// So, we replace the local time series table with the distributed time series table.
// See `PrepareTimeseriesFilterQuery` for more details.
query = strings.Replace(query, ".time_series_v4", ".distributed_time_series_v4", 1)
attrsListResponse, err := p.reader.GetListResultV3(ctx, query) attrsListResponse, err := p.reader.GetListResultV3(ctx, query)
if err != nil { if err != nil {
@ -128,36 +129,108 @@ func (p *ProcessesRepo) getMetadataAttributes(ctx context.Context,
} }
} }
pid := stringData["process_pid"] processID := stringData[processPIDAttrKey]
if _, ok := processAttrs[pid]; !ok { if _, ok := processAttrs[processID]; !ok {
processAttrs[pid] = map[string]string{} processAttrs[processID] = map[string]string{}
} }
for _, key := range req.GroupBy { for _, key := range req.GroupBy {
processAttrs[pid][key.Key] = stringData[key.Key] processAttrs[processID][key.Key] = stringData[key.Key]
} }
} }
return processAttrs, nil return processAttrs, nil
} }
// getTopProcessGroups runs the ranking query for the request window and
// returns two label sets: the groups on the requested page (Offset/Limit
// applied after sorting) and every group the query produced.
//
// The builder queries named in queryNamesForTopProcesses for
// req.OrderBy.ColumnName are cloned from q, given the step and table hints
// from getParamsForTopProcesses, and have req.Filters appended. Series are
// ordered by the value of their first point: descending when req.OrderBy.Order
// is v3.DirectionDesc, ascending otherwise.
//
// Both results are nil when the query yields no series. The page window is
// clamped to the available series, so an offset past the end (or a negative
// offset/limit) produces an empty page instead of a slice-bounds panic.
//
// NOTE(review): callers that turn the first result into filters should treat
// "empty page, non-empty total" as an empty page rather than querying without
// a group filter — confirm GetProcessList handles this.
func (p *ProcessesRepo) getTopProcessGroups(ctx context.Context, req model.ProcessListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
	step, timeSeriesTableName, samplesTableName := getParamsForTopProcesses(req)

	queryNames := queryNamesForTopProcesses[req.OrderBy.ColumnName]
	topProcessGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
		Start: req.Start,
		End:   req.End,
		Step:  step,
		CompositeQuery: &v3.CompositeQuery{
			BuilderQueries: map[string]*v3.BuilderQuery{},
			QueryType:      v3.QueryTypeBuilder,
			PanelType:      v3.PanelTypeTable,
		},
	}

	for _, queryName := range queryNames {
		// Work on a clone so the caller's query is left untouched.
		query := q.CompositeQuery.BuilderQueries[queryName].Clone()
		query.StepInterval = step
		query.MetricTableHints = &v3.MetricTableHints{
			TimeSeriesTableName: timeSeriesTableName,
			SamplesTableName:    samplesTableName,
		}
		if req.Filters != nil && len(req.Filters.Items) > 0 {
			query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
		}
		topProcessGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
	}

	queryResponse, _, err := p.querierV2.QueryRange(ctx, topProcessGroupsQueryRangeParams)
	if err != nil {
		return nil, nil, err
	}
	formattedResponse, err := postprocess.PostProcessResult(queryResponse, topProcessGroupsQueryRangeParams)
	if err != nil {
		return nil, nil, err
	}

	if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
		return nil, nil, nil
	}

	// series shares its backing array with formattedResponse[0].Series, so
	// sorting it sorts the response in place as before.
	series := formattedResponse[0].Series
	descending := req.OrderBy.Order == v3.DirectionDesc
	sort.Slice(series, func(i, j int) bool {
		if descending {
			return series[i].Points[0].Value > series[j].Points[0].Value
		}
		return series[i].Points[0].Value < series[j].Points[0].Value
	})

	// Clamp both ends of the page to [0, len(series)] with start <= end.
	// Clamping only the upper bound still panics when Offset exceeds the
	// number of series.
	total := len(series)
	start := req.Offset
	if start < 0 {
		start = 0
	}
	if start > total {
		start = total
	}
	end := req.Offset + req.Limit
	if end > total {
		end = total
	}
	if end < start {
		end = start
	}

	topProcessGroups := make([]map[string]string, 0, end-start)
	for _, s := range series[start:end] {
		topProcessGroups = append(topProcessGroups, s.Labels)
	}

	allProcessGroups := make([]map[string]string, 0, total)
	for _, s := range series {
		allProcessGroups = append(allProcessGroups, s.Labels)
	}

	return topProcessGroups, allProcessGroups, nil
}
func (p *ProcessesRepo) GetProcessList(ctx context.Context, req model.ProcessListRequest) (model.ProcessListResponse, error) { func (p *ProcessesRepo) GetProcessList(ctx context.Context, req model.ProcessListRequest) (model.ProcessListResponse, error) {
resp := model.ProcessListResponse{}
if req.Limit == 0 { if req.Limit == 0 {
req.Limit = 10 req.Limit = 10
} }
resp := model.ProcessListResponse{ // default to cpu order by
Type: "list", if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
} }
step := common.MinAllowedStepInterval(req.Start, req.End) // default to process pid group by
if len(req.GroupBy) == 0 {
req.GroupBy = []v3.AttributeKey{{Key: processPIDAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := ProcessesTableListQuery.Clone() query := ProcessesTableListQuery.Clone()
if req.OrderBy != nil {
for _, q := range query.CompositeQuery.BuilderQueries {
q.OrderBy = []v3.OrderBy{*req.OrderBy}
}
}
query.Start = req.Start query.Start = req.Start
query.End = req.End query.End = req.End
@ -166,11 +239,9 @@ func (p *ProcessesRepo) GetProcessList(ctx context.Context, req model.ProcessLis
for _, query := range query.CompositeQuery.BuilderQueries { for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 { if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
} }
query.GroupBy = req.GroupBy
} }
processAttrs, err := p.getMetadataAttributes(ctx, req) processAttrs, err := p.getMetadataAttributes(ctx, req)
@ -178,157 +249,83 @@ func (p *ProcessesRepo) GetProcessList(ctx context.Context, req model.ProcessLis
return resp, err return resp, err
} }
topProcessGroups, allProcessGroups, err := p.getTopProcessGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topProcessGroup := range topProcessGroups {
for k, v := range topProcessGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := p.querierV2.QueryRange(ctx, query) queryResponse, _, err := p.querierV2.QueryRange(ctx, query)
if err != nil { if err != nil {
return resp, err return resp, err
} }
type processTSInfo struct {
CpuTimeSeries *v3.Series `json:"cpu_time_series"`
MemoryTimeSeries *v3.Series `json:"memory_time_series"`
}
processTSInfoMap := map[string]*processTSInfo{}
for _, result := range queryResponse {
for _, series := range result.Series {
pid := series.Labels["process_pid"]
if _, ok := processTSInfoMap[pid]; !ok {
processTSInfoMap[pid] = &processTSInfo{}
}
}
}
query.FormatForWeb = false
query.CompositeQuery.PanelType = v3.PanelTypeGraph
formulaResult, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
for _, result := range formulaResult {
for _, series := range result.Series {
pid := series.Labels["process_pid"]
if _, ok := processTSInfoMap[pid]; !ok {
processTSInfoMap[pid] = &processTSInfo{}
}
loadSeries := *series
if result.QueryName == "F1" {
processTSInfoMap[pid].CpuTimeSeries = &loadSeries
} else if result.QueryName == "C" {
processTSInfoMap[pid].MemoryTimeSeries = &loadSeries
}
}
}
query.FormatForWeb = true
query.CompositeQuery.PanelType = v3.PanelTypeTable
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil { if err != nil {
return resp, err return resp, err
} }
if len(formattedResponse) == 0 {
return resp, nil
}
records := []model.ProcessListRecord{} records := []model.ProcessListRecord{}
// there should be only one result in the response for _, result := range formattedResponse {
processInfo := formattedResponse[0] for _, row := range result.Table.Rows {
record := model.ProcessListRecord{
ProcessCPU: -1,
ProcessMemory: -1,
}
for _, row := range processInfo.Table.Rows { pid, ok := row.Data[processPIDAttrKey].(string)
record := model.ProcessListRecord{ if ok {
ProcessCPU: -1, record.ProcessID = pid
ProcessMemory: -1, }
}
pid, ok := row.Data["process_pid"].(string) processCPU, ok := row.Data["F1"].(float64)
if ok { if ok {
record.ProcessID = pid record.ProcessCPU = processCPU
} }
processCPU, ok := row.Data["F1"].(float64) processMemory, ok := row.Data["C"].(float64)
if ok { if ok {
record.ProcessCPU = processCPU record.ProcessMemory = processMemory
}
record.Meta = processAttrs[record.ProcessID]
record.ProcessName = record.Meta[processNameAttrKey]
record.ProcessCMD = record.Meta[processCMDAttrKey]
record.ProcessCMDLine = record.Meta[processCMDLineAttrKey]
records = append(records, record)
} }
processMemory, ok := row.Data["C"].(float64)
if ok {
record.ProcessMemory = processMemory
}
record.Meta = processAttrs[record.ProcessID]
if processTSInfoMap[record.ProcessID] != nil {
record.ProcessCPUTimeSeries = processTSInfoMap[record.ProcessID].CpuTimeSeries
record.ProcessMemoryTimeSeries = processTSInfoMap[record.ProcessID].MemoryTimeSeries
}
record.ProcessName = record.Meta["process_executable_name"]
record.ProcessCMD = record.Meta["process_command"]
record.ProcessCMDLine = record.Meta["process_command_line"]
records = append(records, record)
} }
resp.Total = len(records) resp.Total = len(allProcessGroups)
if req.Offset > 0 {
records = records[req.Offset:]
}
if req.Limit > 0 && len(records) > req.Limit {
records = records[:req.Limit]
}
resp.Records = records resp.Records = records
if len(req.GroupBy) > 0 {
groups := []model.ProcessListGroup{}
groupMap := make(map[string][]model.ProcessListRecord)
for _, record := range records {
groupKey := getGroupKeyForProcesses(record, req.GroupBy)
if _, ok := groupMap[groupKey]; !ok {
groupMap[groupKey] = []model.ProcessListRecord{record}
} else {
groupMap[groupKey] = append(groupMap[groupKey], record)
}
}
for _, records := range groupMap {
var avgCPU, avgMemory float64
var validCPU, validMemory int
for _, record := range records {
if !math.IsNaN(record.ProcessCPU) {
avgCPU += record.ProcessCPU
validCPU++
}
if !math.IsNaN(record.ProcessMemory) {
avgMemory += record.ProcessMemory
validMemory++
}
}
avgCPU /= float64(validCPU)
avgMemory /= float64(validMemory)
// take any record and make it as the group meta
firstRecord := records[0]
var groupValues []string
for _, key := range req.GroupBy {
groupValues = append(groupValues, firstRecord.Meta[key.Key])
}
processNames := []string{}
for _, record := range records {
processNames = append(processNames, record.ProcessName)
}
groups = append(groups, model.ProcessListGroup{
GroupValues: groupValues,
GroupCPUAvg: avgCPU,
GroupMemoryAvg: avgMemory,
ProcessNames: processNames,
})
}
resp.Groups = groups
resp.Type = "grouped_list"
}
return resp, nil return resp, nil
} }

View File

@ -1,269 +0,0 @@
package inframetrics
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
// K8STableListQuery is a builder-type composite query rendered as a table
// panel. Every metric query is grouped by the k8s_node_name and os_type
// resource attributes, uses the query name as its expression, reduces with
// avg and aggregates across series with sum.
//
// Builder queries, by name:
//   - "A"/"B":  rate of system_cpu_time, with state != "idle" / unfiltered;
//     "F1" = "A/B" ("CPU Usage (%)").
//   - "C"/"D":  avg of system_memory_usage, with state IN (used, cached) /
//     unfiltered; "F2" = "C/D" ("Memory Usage (%)").
//   - "E"/"F":  rate of system_cpu_time, with state = "wait" / unfiltered;
//     "F3" = "E/F" ("CPU Wait Time (%)").
//   - "G":      avg of system_cpu_load_average_15m ("CPU Load Average (15m)").
//
// Queries "A" through "F" are marked Disabled; "G" is not.
var K8STableListQuery = func() v3.QueryRangeParamsV3 {
	// withStateKey points a filter item at the "state" tag attribute.
	withStateKey := func(item v3.FilterItem) v3.FilterItem {
		item.Key = v3.AttributeKey{
			Key:      "state",
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeTag,
		}
		return item
	}

	// metricQuery completes q with the settings shared by all metric
	// queries. Each call builds fresh filter and group-by slices so no two
	// queries share backing storage.
	metricQuery := func(q v3.BuilderQuery, metricName string, items ...v3.FilterItem) *v3.BuilderQuery {
		q.DataSource = v3.DataSourceMetrics
		q.AggregateAttribute = v3.AttributeKey{
			Key:      metricName,
			DataType: v3.AttributeKeyDataTypeFloat64,
		}
		q.Filters = &v3.FilterSet{
			Operator: "AND",
			// Appending to an empty literal keeps Items non-nil even
			// when no filter items are passed.
			Items: append([]v3.FilterItem{}, items...),
		}
		q.GroupBy = []v3.AttributeKey{
			{
				Key:      "k8s_node_name",
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeResource,
			},
			{
				Key:      "os_type",
				DataType: v3.AttributeKeyDataTypeString,
				Type:     v3.AttributeKeyTypeResource,
			},
		}
		q.Expression = q.QueryName
		q.ReduceTo = v3.ReduceToOperatorAvg
		q.SpaceAggregation = v3.SpaceAggregationSum
		return &q
	}

	// formula builds a formula entry: name, expression and legend only.
	formula := func(name, expression, legend string) *v3.BuilderQuery {
		return &v3.BuilderQuery{
			QueryName:  name,
			Expression: expression,
			Legend:     legend,
		}
	}

	queries := map[string]*v3.BuilderQuery{}

	// CPU usage: non-idle CPU time over total CPU time.
	queries["A"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "A",
			Temporality:     v3.Cumulative,
			TimeAggregation: v3.TimeAggregationRate,
			Disabled:        true,
		},
		"system_cpu_time",
		withStateKey(v3.FilterItem{Operator: v3.FilterOperatorNotEqual, Value: "idle"}),
	)
	queries["B"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "B",
			Temporality:     v3.Cumulative,
			TimeAggregation: v3.TimeAggregationRate,
			Disabled:        true,
		},
		"system_cpu_time",
	)
	queries["F1"] = formula("F1", "A/B", "CPU Usage (%)")

	// Memory usage: used + cached memory over total memory.
	queries["C"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "C",
			Temporality:     v3.Cumulative,
			TimeAggregation: v3.TimeAggregationAvg,
			Disabled:        true,
		},
		"system_memory_usage",
		withStateKey(v3.FilterItem{Operator: v3.FilterOperatorIn, Value: []string{"used", "cached"}}),
	)
	queries["D"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "D",
			Temporality:     v3.Cumulative,
			TimeAggregation: v3.TimeAggregationAvg,
			Disabled:        true,
		},
		"system_memory_usage",
	)
	queries["F2"] = formula("F2", "C/D", "Memory Usage (%)")

	// CPU wait: wait-state CPU time over total CPU time.
	queries["E"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "E",
			Temporality:     v3.Cumulative,
			TimeAggregation: v3.TimeAggregationRate,
			Disabled:        true,
		},
		"system_cpu_time",
		withStateKey(v3.FilterItem{Operator: v3.FilterOperatorEqual, Value: "wait"}),
	)
	queries["F"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "F",
			Temporality:     v3.Cumulative,
			TimeAggregation: v3.TimeAggregationRate,
			Disabled:        true,
		},
		"system_cpu_time",
	)
	queries["F3"] = formula("F3", "E/F", "CPU Wait Time (%)")

	// 15-minute load average; the only metric query left enabled.
	queries["G"] = metricQuery(
		v3.BuilderQuery{
			QueryName:       "G",
			Temporality:     v3.Unspecified,
			TimeAggregation: v3.TimeAggregationAvg,
			Legend:          "CPU Load Average (15m)",
		},
		"system_cpu_load_average_15m",
	)

	return v3.QueryRangeParamsV3{
		CompositeQuery: &v3.CompositeQuery{
			BuilderQueries: queries,
			PanelType:      v3.PanelTypeTable,
			QueryType:      v3.QueryTypeBuilder,
		},
		Version:      "v4",
		FormatForWeb: true,
	}
}()

View File

@ -85,6 +85,10 @@ func UseMetricsPreAggregation() bool {
return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true" return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true"
} }
// EnableHostsInfraMonitoring reports whether the ENABLE_INFRA_METRICS
// environment setting resolves to "true". GetOrDefaultEnv is called with
// "true" as the default value.
func EnableHostsInfraMonitoring() bool {
	setting := GetOrDefaultEnv("ENABLE_INFRA_METRICS", "true")
	return setting == "true"
}
var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false") var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false")
func IsDurationSortFeatureEnabled() bool { func IsDurationSortFeatureEnabled() bool {

View File

@ -23,6 +23,7 @@ const AlertChannelMsTeams = "ALERT_CHANNEL_MSTEAMS"
const AlertChannelOpsgenie = "ALERT_CHANNEL_OPSGENIE" const AlertChannelOpsgenie = "ALERT_CHANNEL_OPSGENIE"
const AlertChannelEmail = "ALERT_CHANNEL_EMAIL" const AlertChannelEmail = "ALERT_CHANNEL_EMAIL"
const AnomalyDetection = "ANOMALY_DETECTION" const AnomalyDetection = "ANOMALY_DETECTION"
// HostsInfraMonitoring is the feature name used for the hosts infrastructure
// monitoring feature flag.
const HostsInfraMonitoring = "HOSTS_INFRA_MONITORING"
var BasicPlan = FeatureSet{ var BasicPlan = FeatureSet{
Feature{ Feature{

View File

@ -22,35 +22,19 @@ type HostListRequest struct {
} }
type HostListRecord struct { type HostListRecord struct {
HostName string `json:"hostName"` HostName string `json:"hostName"`
Active bool `json:"active"` Active bool `json:"active"`
OS string `json:"os"` OS string `json:"os"`
CPU float64 `json:"cpu"` CPU float64 `json:"cpu"`
CPUTimeSeries *v3.Series `json:"cpuTimeSeries"` Memory float64 `json:"memory"`
Memory float64 `json:"memory"` Wait float64 `json:"wait"`
MemoryTimeSeries *v3.Series `json:"memoryTimeSeries"` Load15 float64 `json:"load15"`
Wait float64 `json:"wait"` Meta map[string]string `json:"meta"`
WaitTimeSeries *v3.Series `json:"waitTimeSeries"`
Load15 float64 `json:"load15"`
Load15TimeSeries *v3.Series `json:"load15TimeSeries"`
Meta map[string]string `json:"-"`
}
type HostListGroup struct {
GroupValues []string `json:"groupValues"`
Active int `json:"active"`
Inactive int `json:"inactive"`
GroupCPUAvg float64 `json:"groupCPUAvg"`
GroupMemoryAvg float64 `json:"groupMemoryAvg"`
GroupWaitAvg float64 `json:"groupWaitAvg"`
GroupLoad15Avg float64 `json:"groupLoad15Avg"`
HostNames []string `json:"hostNames"`
} }
type HostListResponse struct { type HostListResponse struct {
Type string `json:"type"` Type ResponseType `json:"type"`
Records []HostListRecord `json:"records"` Records []HostListRecord `json:"records"`
Groups []HostListGroup `json:"groups"`
Total int `json:"total"` Total int `json:"total"`
} }
@ -65,29 +49,19 @@ type ProcessListRequest struct {
} }
type ProcessListResponse struct { type ProcessListResponse struct {
Type string `json:"type"` Type ResponseType `json:"type"`
Records []ProcessListRecord `json:"records"` Records []ProcessListRecord `json:"records"`
Groups []ProcessListGroup `json:"groups"`
Total int `json:"total"` Total int `json:"total"`
} }
type ProcessListRecord struct { type ProcessListRecord struct {
ProcessName string `json:"processName"` ProcessName string `json:"processName"`
ProcessID string `json:"processID"` ProcessID string `json:"processID"`
ProcessCMD string `json:"processCMD"` ProcessCMD string `json:"processCMD"`
ProcessCMDLine string `json:"processCMDLine"` ProcessCMDLine string `json:"processCMDLine"`
ProcessCPU float64 `json:"processCPU"` ProcessCPU float64 `json:"processCPU"`
ProcessCPUTimeSeries *v3.Series `json:"processCPUTimeSeries"` ProcessMemory float64 `json:"processMemory"`
ProcessMemory float64 `json:"processMemory"` Meta map[string]string `json:"meta"`
ProcessMemoryTimeSeries *v3.Series `json:"processMemoryTimeSeries"`
Meta map[string]string `json:"-"`
}
type ProcessListGroup struct {
GroupValues []string `json:"groupValues"`
GroupCPUAvg float64 `json:"groupCPUAvg"`
GroupMemoryAvg float64 `json:"groupMemoryAvg"`
ProcessNames []string `json:"processNames"`
} }
type PodListRequest struct { type PodListRequest struct {