From 1720d616f602a0aa17979f8c563b941f7ee9523b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 10 Oct 2024 17:02:46 +0530 Subject: [PATCH] chore: add hosts list support (#6123) --- ee/query-service/app/server.go | 1 + pkg/query-service/app/http_handler.go | 13 + pkg/query-service/app/infra.go | 71 +++ pkg/query-service/app/inframetrics/hosts.go | 512 ++++++++++++++++++ .../app/inframetrics/table_list_query.go | 337 ++++++++++++ .../app/inframetrics/table_list_query_k8s.go | 269 +++++++++ pkg/query-service/app/querier/v2/querier.go | 3 + pkg/query-service/app/server.go | 1 + pkg/query-service/model/infra.go | 46 ++ 9 files changed, 1253 insertions(+) create mode 100644 pkg/query-service/app/infra.go create mode 100644 pkg/query-service/app/inframetrics/hosts.go create mode 100644 pkg/query-service/app/inframetrics/table_list_query.go create mode 100644 pkg/query-service/app/inframetrics/table_list_query_k8s.go create mode 100644 pkg/query-service/model/infra.go diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 7468be4698..cf54693ba8 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -364,6 +364,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e apiHandler.RegisterLogsRoutes(r, am) apiHandler.RegisterIntegrationRoutes(r, am) apiHandler.RegisterQueryRangeV3Routes(r, am) + apiHandler.RegisterInfraMetricsRoutes(r, am) apiHandler.RegisterQueryRangeV4Routes(r, am) apiHandler.RegisterWebSocketPaths(r, am) apiHandler.RegisterMessagingQueuesRoutes(r, am) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 546235437f..f4c6b53269 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -27,6 +27,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/explorer" + "go.signoz.io/signoz/pkg/query-service/app/inframetrics" "go.signoz.io/signoz/pkg/query-service/app/integrations" "go.signoz.io/signoz/pkg/query-service/app/logs" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" @@ -110,6 +111,8 @@ type APIHandler struct { Upgrader *websocket.Upgrader UseLogsNewSchema bool + + hostsRepo *inframetrics.HostsRepo } type APIHandlerOpts struct { @@ -179,6 +182,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { querier := querier.NewQuerier(querierOpts) querierv2 := querierV2.NewQuerier(querierOptsV2) + hostsRepo := inframetrics.NewHostsRepo(opts.Reader, querierv2) + aH := &APIHandler{ reader: opts.Reader, appDao: opts.AppDao, @@ -196,6 +201,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { querier: querier, querierV2: querierv2, UseLogsNewSchema: opts.UseLogsNewSchema, + hostsRepo: hostsRepo, } logsQueryBuilder := logsv3.PrepareLogsQuery @@ -344,6 +350,13 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodGet) } +func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMiddleware) { + subRouter := router.PathPrefix("/api/v1/hosts").Subrouter() + subRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getHostAttributeKeys)).Methods(http.MethodGet) + subRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getHostAttributeValues)).Methods(http.MethodGet) + subRouter.HandleFunc("/list", am.ViewAccess(aH.getHostList)).Methods(http.MethodPost) +} + func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) { subRouter := router.PathPrefix("/ws").Subrouter() subRouter.HandleFunc("/query_progress", am.ViewAccess(aH.GetQueryProgressUpdates)).Methods(http.MethodGet) diff --git a/pkg/query-service/app/infra.go b/pkg/query-service/app/infra.go new file mode 100644 index 0000000000..bc0557543c --- /dev/null +++ b/pkg/query-service/app/infra.go @@ -0,0 +1,71 @@ +package app + +import ( + "encoding/json" + "net/http" + + "go.signoz.io/signoz/pkg/query-service/model" +) + +func (aH *APIHandler) getHostAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // get attribute keys + keys, err := aH.hostsRepo.GetHostAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // write response + aH.Respond(w, keys) +} + +func (aH *APIHandler) getHostAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // parse request + req, err := parseFilterAttributeValueRequest(r) + + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // get attribute values + values, err := aH.hostsRepo.GetHostAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // write response + aH.Respond(w, values) +} + +func (aH *APIHandler) getHostList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.HostListRequest{} + + // parse request + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // get host list + hostList, err := aH.hostsRepo.GetHostList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + // write response + aH.Respond(w, hostList) +} diff --git a/pkg/query-service/app/inframetrics/hosts.go b/pkg/query-service/app/inframetrics/hosts.go new file mode 100644 index 0000000000..1e99454244 --- /dev/null +++ b/pkg/query-service/app/inframetrics/hosts.go @@ -0,0 +1,512 @@ +package inframetrics + +import ( + "context" + "fmt" + "math" + "strings" + "time" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +type HostsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +var pointAttrsToIgnore = []string{ + "state", + "cpu", + "device", + "direction", + "mode", + "mountpoint", + "type", + "process.cgroup", + "process.command", + "process.command_line", + "process.executable.name", + "process.executable.path", + "process.owner", + "process.parent_pid", + "process.pid", +} + +func NewHostsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *HostsRepo { + return &HostsRepo{reader: reader, querierV2: querierV2} +} + +func (h *HostsRepo) GetHostAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = "system_cpu_load_average_15m" + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := h.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (h *HostsRepo) GetHostAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = "system_cpu_load_average_15m" + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := h.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + if req.FilterAttributeKey != "host_name" { + return attributeValuesResponse, nil + } + hostNames := []string{} + + for _, attributeValue := range attributeValuesResponse.StringAttributeValues { + if strings.Contains(attributeValue, "k8s-infra-otel-agent") { + continue + } + hostNames = append(hostNames, attributeValue) + } + + req.FilterAttributeKey = "k8s_node_name" + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = "system_cpu_load_average_15m" + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err = h.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + for _, attributeValue := range attributeValuesResponse.StringAttributeValues { + if strings.Contains(attributeValue, "k8s-infra-otel-agent") { + continue + } + hostNames = append(hostNames, attributeValue) + } + + return &v3.FilterAttributeValueResponse{StringAttributeValues: hostNames}, nil +} + +func getGroupKey(record model.HostListRecord, groupBy []v3.AttributeKey) string { + groupKey := "" + for _, key := range groupBy { + groupKey += fmt.Sprintf("%s=%s,", key.Key, record.Meta[key.Key]) + } + return groupKey +} + +func (h *HostsRepo) getMetadataAttributes(ctx context.Context, + req model.HostListRequest, hostNameAttrKey string) (map[string]map[string]string, error) { + hostAttrs := map[string]map[string]string{} + + hasHostName := false + for _, key := range req.GroupBy { + if key.Key == hostNameAttrKey { + hasHostName = true + } + } + + if !hasHostName { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: hostNameAttrKey}) + } + + mq := v3.BuilderQuery{ + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_load_average_15m", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): remove this + // What is happening here? + // The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint + // goes to same shard. + // However, in this case, we are interested in the attributes values across all the shards. + // So, we replace the local time series table with the distributed time series table. + // See `PrepareTimeseriesFilterQuery` for more details. + query = strings.Replace(query, ".time_series_v4", ".distributed_time_series_v4", 1) + + attrsListResponse, err := h.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + hostName := stringData[hostNameAttrKey] + if _, ok := hostAttrs[hostName]; !ok { + hostAttrs[hostName] = map[string]string{} + } + for _, key := range req.GroupBy { + hostAttrs[hostName][key.Key] = stringData[key.Key] + } + } + + return hostAttrs, nil +} + +func (h *HostsRepo) getActiveHosts(ctx context.Context, + req model.HostListRequest, hostNameAttrKey string) (map[string]bool, error) { + activeStatus := map[string]bool{} + step := common.MinAllowedStepInterval(req.Start, req.End) + + hasHostName := false + for _, key := range req.GroupBy { + if key.Key == hostNameAttrKey { + hasHostName = true + } + } + + if !hasHostName { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: hostNameAttrKey}) + } + + params := v3.QueryRangeParamsV3{ + Start: time.Now().Add(-time.Hour).UTC().UnixMilli(), + End: time.Now().UTC().UnixMilli(), + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: step, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_load_average_15m", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: req.Filters, + GroupBy: req.GroupBy, + Expression: "A", + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationAvg, + Disabled: false, + }, + }, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + }, + } + + queryResponse, _, err := h.querierV2.QueryRange(ctx, ¶ms) + if err != nil { + return nil, err + } + + for _, result := range queryResponse { + for _, series := range result.Series { + name := series.Labels[hostNameAttrKey] + activeStatus[name] = true + } + } + + return activeStatus, nil +} + +func (h *HostsRepo) getHostsForQuery(ctx context.Context, + req model.HostListRequest, q *v3.QueryRangeParamsV3, hostNameAttrKey string) ([]model.HostListRecord, error) { + + step := common.MinAllowedStepInterval(req.Start, req.End) + + query := q.Clone() + if req.OrderBy != nil { + for _, q := range query.CompositeQuery.BuilderQueries { + q.OrderBy = []v3.OrderBy{*req.OrderBy} + } + } + + query.Start = req.Start + query.End = req.End + query.Step = step + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + // what is happening here? + // if the filter has host_name and we are querying for k8s host metrics, + // we need to replace the host_name with k8s_node_name + if hostNameAttrKey == "k8s_node_name" { + for idx, item := range query.Filters.Items { + if item.Key.Key == "host_name" { + query.Filters.Items[idx].Key.Key = "k8s_node_name" + } + } + } + } + } + + hostAttrs, err := h.getMetadataAttributes(ctx, req, hostNameAttrKey) + if err != nil { + return nil, err + } + + activeHosts, err := h.getActiveHosts(ctx, req, hostNameAttrKey) + if err != nil { + return nil, err + } + + queryResponse, _, err := h.querierV2.QueryRange(ctx, query) + if err != nil { + return nil, err + } + + type hostTSInfo struct { + cpuTimeSeries *v3.Series + memoryTimeSeries *v3.Series + waitTimeSeries *v3.Series + load15TimeSeries *v3.Series + } + hostTSInfoMap := map[string]*hostTSInfo{} + + for _, result := range queryResponse { + for _, series := range result.Series { + hostName := series.Labels[hostNameAttrKey] + if _, ok := hostTSInfoMap[hostName]; !ok { + hostTSInfoMap[hostName] = &hostTSInfo{} + } + if result.QueryName == "G" { + loadSeries := *series + hostTSInfoMap[hostName].load15TimeSeries = &loadSeries + } + } + } + + query.FormatForWeb = false + query.CompositeQuery.PanelType = v3.PanelTypeGraph + + formulaResult, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return nil, err + } + + for _, result := range formulaResult { + for _, series := range result.Series { + hostName := series.Labels[hostNameAttrKey] + if _, ok := hostTSInfoMap[hostName]; !ok { + hostTSInfoMap[hostName] = &hostTSInfo{} + } + if result.QueryName == "F1" { + hostTSInfoMap[hostName].cpuTimeSeries = series + } else if result.QueryName == "F2" { + hostTSInfoMap[hostName].memoryTimeSeries = series + } else if result.QueryName == "F3" { + hostTSInfoMap[hostName].waitTimeSeries = series + } + } + } + + query.FormatForWeb = true + query.CompositeQuery.PanelType = v3.PanelTypeTable + formattedResponse, _ := postprocess.PostProcessResult(queryResponse, query) + + records := []model.HostListRecord{} + + // there should be only one result in the response + hostsInfo := formattedResponse[0] + // each row represents a host + for _, row := range hostsInfo.Table.Rows { + record := model.HostListRecord{ + CPU: -1, + Memory: -1, + Wait: -1, + Load15: -1, + } + + hostName, ok := row.Data[hostNameAttrKey].(string) + if ok { + record.HostName = hostName + } + + osType, ok := row.Data["os_type"].(string) + if ok { + record.OS = osType + } + + cpu, ok := row.Data["F1"].(float64) + if ok { + record.CPU = cpu + } + memory, ok := row.Data["F2"].(float64) + if ok { + record.Memory = memory + } + wait, ok := row.Data["F3"].(float64) + if ok { + record.Wait = wait + } + load15, ok := row.Data["G"].(float64) + if ok { + record.Load15 = load15 + } + record.Meta = hostAttrs[record.HostName] + record.Active = activeHosts[record.HostName] + if hostTSInfoMap[record.HostName] != nil { + record.CPUTimeSeries = hostTSInfoMap[record.HostName].cpuTimeSeries + record.MemoryTimeSeries = hostTSInfoMap[record.HostName].memoryTimeSeries + record.WaitTimeSeries = hostTSInfoMap[record.HostName].waitTimeSeries + record.Load15TimeSeries = hostTSInfoMap[record.HostName].load15TimeSeries + } + records = append(records, record) + } + + if req.Offset > 0 { + records = records[req.Offset:] + } + if req.Limit > 0 && len(records) > req.Limit { + records = records[:req.Limit] + } + + return records, nil +} + +func dedupRecords(records []model.HostListRecord) []model.HostListRecord { + seen := map[string]bool{} + deduped := []model.HostListRecord{} + for _, record := range records { + if !seen[record.HostName] { + seen[record.HostName] = true + deduped = append(deduped, record) + } + } + return deduped +} + +func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest) (model.HostListResponse, error) { + resp := model.HostListResponse{ + Type: "list", + } + + vmRecords, err := h.getHostsForQuery(ctx, req, &NonK8STableListQuery, "host_name") + if err != nil { + return resp, err + } + k8sRecords, err := h.getHostsForQuery(ctx, req, &K8STableListQuery, "k8s_node_name") + if err != nil { + return resp, err + } + + records := append(vmRecords, k8sRecords...) + + // since we added the fix for incorrect host name, it is possible that both host_name and k8s_node_name + // are present in the response. we need to dedup the results. + records = dedupRecords(records) + + if len(req.GroupBy) > 0 { + groups := []model.HostListGroup{} + + groupMap := make(map[string][]model.HostListRecord) + for _, record := range records { + groupKey := getGroupKey(record, req.GroupBy) + if _, ok := groupMap[groupKey]; !ok { + groupMap[groupKey] = []model.HostListRecord{record} + } else { + groupMap[groupKey] = append(groupMap[groupKey], record) + } + } + + // calculate the group stats, active hosts, etc. + for _, records := range groupMap { + var avgCPU, avgMemory, avgWait, avgLoad15 float64 + var validCPU, validMemory, validWait, validLoad15, activeHosts int + for _, record := range records { + if !math.IsNaN(record.CPU) { + avgCPU += record.CPU + validCPU++ + } + if !math.IsNaN(record.Memory) { + avgMemory += record.Memory + validMemory++ + } + if !math.IsNaN(record.Wait) { + avgWait += record.Wait + validWait++ + } + if !math.IsNaN(record.Load15) { + avgLoad15 += record.Load15 + validLoad15++ + } + if record.Active { + activeHosts++ + } + } + avgCPU /= float64(validCPU) + avgMemory /= float64(validMemory) + avgWait /= float64(validWait) + avgLoad15 /= float64(validLoad15) + inactiveHosts := len(records) - activeHosts + + // take any record and make it as the group meta + firstRecord := records[0] + var groupValues []string + for _, key := range req.GroupBy { + groupValues = append(groupValues, firstRecord.Meta[key.Key]) + } + hostNames := []string{} + for _, record := range records { + hostNames = append(hostNames, record.HostName) + } + + groups = append(groups, model.HostListGroup{ + GroupValues: groupValues, + Active: activeHosts, + Inactive: inactiveHosts, + GroupCPUAvg: avgCPU, + GroupMemoryAvg: avgMemory, + GroupWaitAvg: avgWait, + GroupLoad15Avg: avgLoad15, + HostNames: hostNames, + }) + } + resp.Groups = groups + resp.Type = "grouped_list" + } + resp.Records = records + resp.Total = len(records) + + return resp, nil +} diff --git a/pkg/query-service/app/inframetrics/table_list_query.go b/pkg/query-service/app/inframetrics/table_list_query.go new file mode 100644 index 0000000000..dd44511348 --- /dev/null +++ b/pkg/query-service/app/inframetrics/table_list_query.go @@ -0,0 +1,337 @@ +package inframetrics + +import v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + +var NonK8STableListQuery = v3.QueryRangeParamsV3{ + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorNotEqual, + Value: "idle", + }, + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "A", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "B": { + QueryName: "B", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "B", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F1": { + QueryName: "F1", + Expression: "A/B", + Legend: "CPU Usage (%)", + }, + "C": { + QueryName: "C", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorIn, + Value: []string{"used", "cached"}, + }, + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "C", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "D": { + QueryName: "D", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "D", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F2": { + QueryName: "F2", + Expression: "C/D", + Legend: "Memory Usage (%)", + }, + "E": { + QueryName: "E", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "wait", + }, + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "E", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F": { + QueryName: "F", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "F", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F3": { + QueryName: "F3", + Expression: "E/F", + Legend: "CPU Wait Time (%)", + }, + "G": { + QueryName: "G", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_load_average_15m", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorNotContains, + Value: "k8s-infra-otel-agent", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "G", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + Legend: "CPU Load Average (15m)", + }, + }, + PanelType: v3.PanelTypeTable, + QueryType: v3.QueryTypeBuilder, + }, + Version: "v4", + FormatForWeb: true, +} diff --git a/pkg/query-service/app/inframetrics/table_list_query_k8s.go b/pkg/query-service/app/inframetrics/table_list_query_k8s.go new file mode 100644 index 0000000000..68cc9b92d6 --- /dev/null +++ b/pkg/query-service/app/inframetrics/table_list_query_k8s.go @@ -0,0 +1,269 @@ +package inframetrics + +import v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + +var K8STableListQuery = v3.QueryRangeParamsV3{ + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorNotEqual, + Value: "idle", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "A", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "B": { + QueryName: "B", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "B", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F1": { + QueryName: "F1", + Expression: "A/B", + Legend: "CPU Usage (%)", + }, + "C": { + QueryName: "C", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorIn, + Value: []string{"used", "cached"}, + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "C", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "D": { + QueryName: "D", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "D", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F2": { + QueryName: "F2", + Expression: "C/D", + Legend: "Memory Usage (%)", + }, + "E": { + QueryName: "E", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "state", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "wait", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "E", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F": { + QueryName: "F", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "F", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F3": { + QueryName: "F3", + Expression: "E/F", + Legend: "CPU Wait Time (%)", + }, + "G": { + QueryName: "G", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "system_cpu_load_average_15m", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "k8s_node_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "os_type", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "G", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Legend: "CPU Load Average (15m)", + }, + }, + PanelType: v3.PanelTypeTable, + QueryType: v3.QueryTypeBuilder, + }, + Version: "v4", + FormatForWeb: true, +} diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index f8316d6f6c..f41fd761dd 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -159,6 +159,8 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa cacheKeys := q.keyGenerator.GenerateKeys(params) + now := time.Now() + ch := make(chan channelResult, len(params.CompositeQuery.BuilderQueries)) var wg sync.WaitGroup @@ -171,6 +173,7 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa wg.Wait() close(ch) + zap.L().Info("time taken to run builder queries", zap.Duration("multiQueryDuration", time.Since(now)), zap.Int("num_queries", len(params.CompositeQuery.BuilderQueries))) results := make([]*v3.Result, 0) errQueriesByName := make(map[string]error) diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index f16597aa31..dc6ac21e15 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -308,6 +308,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) { api.RegisterLogsRoutes(r, am) api.RegisterIntegrationRoutes(r, am) api.RegisterQueryRangeV3Routes(r, am) + api.RegisterInfraMetricsRoutes(r, am) api.RegisterWebSocketPaths(r, am) api.RegisterQueryRangeV4Routes(r, am) api.RegisterMessagingQueuesRoutes(r, am) diff --git a/pkg/query-service/model/infra.go b/pkg/query-service/model/infra.go new file mode 100644 index 0000000000..ade805315d --- /dev/null +++ b/pkg/query-service/model/infra.go @@ -0,0 +1,46 @@ +package model + +import v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + +type HostListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type HostListRecord struct { + HostName string `json:"hostName"` + Active bool `json:"active"` + OS string `json:"os"` + CPU float64 `json:"cpu"` + CPUTimeSeries *v3.Series `json:"cpuTimeSeries"` + Memory float64 `json:"memory"` + MemoryTimeSeries *v3.Series `json:"memoryTimeSeries"` + Wait float64 `json:"wait"` + WaitTimeSeries *v3.Series `json:"waitTimeSeries"` + Load15 float64 `json:"load15"` + Load15TimeSeries *v3.Series `json:"load15TimeSeries"` + Meta map[string]string `json:"-"` +} + +type HostListGroup struct { + GroupValues []string `json:"groupValues"` + Active int `json:"active"` + Inactive int `json:"inactive"` + GroupCPUAvg float64 `json:"groupCPUAvg"` + GroupMemoryAvg float64 `json:"groupMemoryAvg"` + GroupWaitAvg float64 `json:"groupWaitAvg"` + GroupLoad15Avg float64 `json:"groupLoad15Avg"` + HostNames []string `json:"hostNames"` +} + +type HostListResponse struct { + Type string `json:"type"` + Records []HostListRecord `json:"records"` + Groups []HostListGroup `json:"groups"` + Total int `json:"total"` +}