package inframetrics import ( "context" "errors" "fmt" "math" "sort" "strings" "time" "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4/helpers" "github.com/SigNoz/signoz/pkg/query-service/common" "github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/postprocess" "github.com/SigNoz/signoz/pkg/valuer" "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) type HostsRepo struct { reader interfaces.Reader querierV2 interfaces.Querier } var ( // we don't have a way to get the resource attributes from the current time series table // but we only want to suggest resource attributes for system metrics, // this is a list of attributes that we skip from all labels as they are data point attributes // TODO(srikanthccv): remove this once we have a way to get resource attributes pointAttrsToIgnore = []string{ "state", "cpu", "device", "direction", "mode", "mountpoint", "type", GetDotMetrics("os_type"), GetDotMetrics("process_cgroup"), GetDotMetrics("process_command"), GetDotMetrics("process_command_line"), GetDotMetrics("process_executable_name"), GetDotMetrics("process_executable_path"), GetDotMetrics("process_owner"), GetDotMetrics("process_parent_pid"), GetDotMetrics("process_pid"), } queryNamesForTopHosts = map[string][]string{ "cpu": {"A", "B", "F1"}, "memory": {"C", "D", "F2"}, "wait": {"E", "F", "F3"}, "load15": {"G"}, } // TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric metricToUseForHostAttributes = GetDotMetrics("system_cpu_load_average_15m") hostNameAttrKey = GetDotMetrics("host_name") agentNameToIgnore = "k8s-infra-otel-agent" hostAttrsToEnrich = []string{ GetDotMetrics("os_type"), } metricNamesForHosts = map[string]string{ "cpu": GetDotMetrics("system_cpu_time"), "memory": GetDotMetrics("system_memory_usage"), "load15": GetDotMetrics("system_cpu_load_average_15m"), "wait": GetDotMetrics("system_cpu_time"), } ) func NewHostsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *HostsRepo { return &HostsRepo{reader: reader, querierV2: querierV2} } func (h *HostsRepo) GetHostAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { // TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric req.DataSource = v3.DataSourceMetrics req.AggregateAttribute = metricToUseForHostAttributes if req.Limit == 0 { req.Limit = 50 } attributeKeysResponse, err := h.reader.GetMetricAttributeKeys(ctx, &req) if err != nil { return nil, err } // TODO(srikanthccv): only return resource attributes when we have a way to // distinguish between resource attributes and other attributes. filteredKeys := []v3.AttributeKey{} for _, key := range attributeKeysResponse.AttributeKeys { if slices.Contains(pointAttrsToIgnore, key.Key) { continue } filteredKeys = append(filteredKeys, key) } return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil } func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { req.DataSource = v3.DataSourceMetrics req.AggregateAttribute = metricToUseForHostAttributes if req.Limit == 0 { req.Limit = 50 } attributeValuesResponse, err := h.reader.GetMetricAttributeValues(ctx, &req) if err != nil { return nil, err } if req.FilterAttributeKey != hostNameAttrKey { return attributeValuesResponse, nil } hostNames := []string{} for _, attributeValue := range attributeValuesResponse.StringAttributeValues { if strings.Contains(attributeValue, agentNameToIgnore) { continue } hostNames = append(hostNames, attributeValue) } return &v3.FilterAttributeValueResponse{StringAttributeValues: hostNames}, nil } func (h *HostsRepo) getActiveHosts(ctx context.Context, orgID valuer.UUID, req model.HostListRequest) (map[string]bool, error) { activeStatus := map[string]bool{} step := common.MinAllowedStepInterval(req.Start, req.End) hasHostName := false for _, key := range req.GroupBy { if key.Key == hostNameAttrKey { hasHostName = true } } if !hasHostName { req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: hostNameAttrKey}) } params := v3.QueryRangeParamsV3{ Start: time.Now().Add(-time.Minute * 10).UTC().UnixMilli(), End: time.Now().UTC().UnixMilli(), Step: step, CompositeQuery: &v3.CompositeQuery{ BuilderQueries: map[string]*v3.BuilderQuery{ "A": { QueryName: "A", StepInterval: step, DataSource: v3.DataSourceMetrics, AggregateAttribute: v3.AttributeKey{ Key: metricToUseForHostAttributes, DataType: v3.AttributeKeyDataTypeFloat64, }, Temporality: v3.Unspecified, Filters: req.Filters, GroupBy: req.GroupBy, Expression: "A", TimeAggregation: v3.TimeAggregationAvg, SpaceAggregation: v3.SpaceAggregationAvg, Disabled: false, }, }, QueryType: v3.QueryTypeBuilder, PanelType: v3.PanelTypeGraph, }, } queryResponse, _, err := h.querierV2.QueryRange(ctx, orgID, ¶ms) if err != nil { return nil, err } for _, result := range queryResponse { for _, series := range result.Series { name := series.Labels[hostNameAttrKey] activeStatus[name] = true } } return activeStatus, nil } func (h *HostsRepo) getMetadataAttributes(ctx context.Context, req model.HostListRequest) (map[string]map[string]string, error) { hostAttrs := map[string]map[string]string{} for _, key := range hostAttrsToEnrich { hasKey := false for _, groupByKey := range req.GroupBy { if groupByKey.Key == key { hasKey = true break } } if !hasKey { req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) } } mq := v3.BuilderQuery{ DataSource: v3.DataSourceMetrics, AggregateAttribute: v3.AttributeKey{ Key: metricToUseForHostAttributes, DataType: v3.AttributeKeyDataTypeFloat64, }, Temporality: v3.Unspecified, GroupBy: req.GroupBy, } query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) if err != nil { return nil, err } query = localQueryToDistributedQuery(query) attrsListResponse, err := h.reader.GetListResultV3(ctx, query) if err != nil { return nil, err } for _, row := range attrsListResponse { stringData := map[string]string{} for key, value := range row.Data { if str, ok := value.(string); ok { stringData[key] = str } else if strPtr, ok := value.(*string); ok { stringData[key] = *strPtr } } hostName := stringData[hostNameAttrKey] if _, ok := hostAttrs[hostName]; !ok { hostAttrs[hostName] = map[string]string{} } for _, key := range req.GroupBy { hostAttrs[hostName][key.Key] = stringData[key.Key] } } return hostAttrs, nil } func (h *HostsRepo) getTopHostGroups(ctx context.Context, orgID valuer.UUID, req model.HostListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { step, timeSeriesTableName, samplesTableName := getParamsForTopHosts(req) queryNames := queryNamesForTopHosts[req.OrderBy.ColumnName] topHostGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ Start: req.Start, End: req.End, Step: step, CompositeQuery: &v3.CompositeQuery{ BuilderQueries: map[string]*v3.BuilderQuery{}, QueryType: v3.QueryTypeBuilder, PanelType: v3.PanelTypeTable, }, } for _, queryName := range queryNames { query := q.CompositeQuery.BuilderQueries[queryName].Clone() query.StepInterval = step query.MetricTableHints = &v3.MetricTableHints{ TimeSeriesTableName: timeSeriesTableName, SamplesTableName: samplesTableName, } if req.Filters != nil && len(req.Filters.Items) > 0 { query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) } topHostGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query } queryResponse, _, err := h.querierV2.QueryRange(ctx, orgID, topHostGroupsQueryRangeParams) if err != nil { return nil, nil, err } formattedResponse, err := postprocess.PostProcessResult(queryResponse, topHostGroupsQueryRangeParams) if err != nil { return nil, nil, err } if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { return nil, nil, nil } if req.OrderBy.Order == v3.DirectionDesc { sort.Slice(formattedResponse[0].Series, func(i, j int) bool { return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value }) } else { sort.Slice(formattedResponse[0].Series, func(i, j int) bool { return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value }) } limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) paginatedTopHostGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] topHostGroups := []map[string]string{} for _, series := range paginatedTopHostGroupsSeries { topHostGroups = append(topHostGroups, series.Labels) } allHostGroups := []map[string]string{} for _, series := range formattedResponse[0].Series { allHostGroups = append(allHostGroups, series.Labels) } return topHostGroups, allHostGroups, nil } func (h *HostsRepo) DidSendHostMetricsData(ctx context.Context, req model.HostListRequest) (bool, error) { names := []string{} for _, metricName := range metricNamesForHosts { names = append(names, metricName) } namesStr := "'" + strings.Join(names, "','") + "'" query := fmt.Sprintf("SELECT count() FROM %s.%s WHERE metric_name IN (%s)", constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr) count, err := h.reader.GetCountOfThings(ctx, query) if err != nil { return false, err } return count > 0, nil } func (h *HostsRepo) IsSendingK8SAgentMetrics(ctx context.Context, req model.HostListRequest) ([]string, []string, error) { names := []string{} for _, metricName := range metricNamesForHosts { names = append(names, metricName) } namesStr := "'" + strings.Join(names, "','") + "'" queryForRecentFingerprints := fmt.Sprintf(` SELECT DISTINCT fingerprint FROM %s.%s WHERE metric_name IN (%s) AND unix_milli >= toUnixTimestamp(now() - INTERVAL 5 MINUTE) * 1000`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_SAMPLES_V4_TABLENAME, namesStr) query := fmt.Sprintf(` SELECT DISTINCT JSONExtractString(labels, '%s') as k8s_cluster_name, JSONExtractString(labels, '%s') as k8s_node_name FROM %s.%s WHERE metric_name IN (%s) AND unix_milli >= toUnixTimestamp(now() - INTERVAL 60 MINUTE) * 1000 AND JSONExtractString(labels, '%s') LIKE '%%-otel-agent%%' AND fingerprint GLOBAL IN (%s)`, GetDotMetrics("k8s_cluster_name"), GetDotMetrics("k8s_node_name"), constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_V4_TABLENAME, namesStr, GetDotMetrics("host_name"), queryForRecentFingerprints) result, err := h.reader.GetListResultV3(ctx, query) if err != nil { return nil, nil, err } clusterNames := make(map[string]struct{}) nodeNames := make(map[string]struct{}) for _, row := range result { switch v := row.Data[GetDotMetrics("k8s_cluster_name")].(type) { case string: clusterNames[v] = struct{}{} case *string: clusterNames[*v] = struct{}{} } switch v := row.Data[GetDotMetrics("k8s_node_name")].(type) { case string: nodeNames[v] = struct{}{} case *string: nodeNames[*v] = struct{}{} } } return maps.Keys(clusterNames), maps.Keys(nodeNames), nil } func (h *HostsRepo) GetHostList(ctx context.Context, orgID valuer.UUID, req model.HostListRequest) (model.HostListResponse, error) { resp := model.HostListResponse{} if req.Limit == 0 { req.Limit = 10 } // default to cpu order by if req.OrderBy == nil { req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc} } // default to host name group by if len(req.GroupBy) == 0 { req.GroupBy = []v3.AttributeKey{{Key: hostNameAttrKey}} resp.Type = model.ResponseTypeList } else { resp.Type = model.ResponseTypeGroupedList } // don't fail the request if we can't get these values if clusterNames, nodeNames, err := h.IsSendingK8SAgentMetrics(ctx, req); err == nil { resp.IsSendingK8SAgentMetrics = len(clusterNames) > 0 || len(nodeNames) > 0 resp.ClusterNames = clusterNames resp.NodeNames = nodeNames } if sentAnyHostMetricsData, err := h.DidSendHostMetricsData(ctx, req); err == nil { resp.SentAnyHostMetricsData = sentAnyHostMetricsData } step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) if step <= 0 { zap.L().Error("step is less than or equal to 0", zap.Int64("step", step)) return resp, errors.New("step is less than or equal to 0") } query := HostsTableListQuery.Clone() query.Start = req.Start query.End = req.End query.Step = step for _, query := range query.CompositeQuery.BuilderQueries { query.StepInterval = step if req.Filters != nil && len(req.Filters.Items) > 0 { query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) } query.GroupBy = req.GroupBy } hostAttrs, err := h.getMetadataAttributes(ctx, req) if err != nil { return resp, err } activeHosts, err := h.getActiveHosts(ctx, orgID, req) if err != nil { return resp, err } topHostGroups, allHostGroups, err := h.getTopHostGroups(ctx, orgID, req, query) if err != nil { return resp, err } groupFilters := map[string][]string{} for _, topHostGroup := range topHostGroups { for k, v := range topHostGroup { groupFilters[k] = append(groupFilters[k], v) } } for groupKey, groupValues := range groupFilters { hasGroupFilter := false if req.Filters != nil && len(req.Filters.Items) > 0 { for _, filter := range req.Filters.Items { if filter.Key.Key == groupKey { hasGroupFilter = true break } } } if !hasGroupFilter { for _, query := range query.CompositeQuery.BuilderQueries { query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ Key: v3.AttributeKey{Key: groupKey}, Value: groupValues, Operator: v3.FilterOperatorIn, }) } } } queryResponse, _, err := h.querierV2.QueryRange(ctx, orgID, query) if err != nil { return resp, err } formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) if err != nil { return resp, err } records := []model.HostListRecord{} for _, result := range formattedResponse { for _, row := range result.Table.Rows { record := model.HostListRecord{ CPU: -1, Memory: -1, Wait: -1, Load15: -1, } if hostName, ok := row.Data[hostNameAttrKey].(string); ok { record.HostName = hostName } if cpu, ok := row.Data["F1"].(float64); ok { record.CPU = cpu } if memory, ok := row.Data["F2"].(float64); ok { record.Memory = memory } if wait, ok := row.Data["F3"].(float64); ok { record.Wait = wait } if load15, ok := row.Data["G"].(float64); ok { record.Load15 = load15 } record.Meta = map[string]string{} if _, ok := hostAttrs[record.HostName]; ok { record.Meta = hostAttrs[record.HostName] } if osType, ok := record.Meta["os_type"]; ok { record.OS = osType } record.Active = activeHosts[record.HostName] records = append(records, record) } } resp.Total = len(allHostGroups) resp.Records = records resp.SortBy(req.OrderBy) return resp, nil }