diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index f4c6b53269..2b54bd7cce 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -112,7 +112,8 @@ type APIHandler struct { UseLogsNewSchema bool - hostsRepo *inframetrics.HostsRepo + hostsRepo *inframetrics.HostsRepo + processesRepo *inframetrics.ProcessesRepo } type APIHandlerOpts struct { @@ -183,6 +184,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { querierv2 := querierV2.NewQuerier(querierOptsV2) hostsRepo := inframetrics.NewHostsRepo(opts.Reader, querierv2) + processesRepo := inframetrics.NewProcessesRepo(opts.Reader, querierv2) aH := &APIHandler{ reader: opts.Reader, @@ -202,6 +204,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { querierV2: querierv2, UseLogsNewSchema: opts.UseLogsNewSchema, hostsRepo: hostsRepo, + processesRepo: processesRepo, } logsQueryBuilder := logsv3.PrepareLogsQuery @@ -351,10 +354,15 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid } func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMiddleware) { - subRouter := router.PathPrefix("/api/v1/hosts").Subrouter() - subRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getHostAttributeKeys)).Methods(http.MethodGet) - subRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getHostAttributeValues)).Methods(http.MethodGet) - subRouter.HandleFunc("/list", am.ViewAccess(aH.getHostList)).Methods(http.MethodPost) + hostsSubRouter := router.PathPrefix("/api/v1/hosts").Subrouter() + hostsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getHostAttributeKeys)).Methods(http.MethodGet) + hostsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getHostAttributeValues)).Methods(http.MethodGet) + hostsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getHostList)).Methods(http.MethodPost) + + processesSubRouter := 
router.PathPrefix("/api/v1/processes").Subrouter() + processesSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getProcessAttributeKeys)).Methods(http.MethodGet) + processesSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getProcessAttributeValues)).Methods(http.MethodGet) + processesSubRouter.HandleFunc("/list", am.ViewAccess(aH.getProcessList)).Methods(http.MethodPost) } func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) { diff --git a/pkg/query-service/app/infra.go b/pkg/query-service/app/infra.go index bc0557543c..fdc1510f15 100644 --- a/pkg/query-service/app/infra.go +++ b/pkg/query-service/app/infra.go @@ -69,3 +69,56 @@ func (aH *APIHandler) getHostList(w http.ResponseWriter, r *http.Request) { // write response aH.Respond(w, hostList) } + +func (aH *APIHandler) getProcessAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.processesRepo.GetProcessAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, keys) +} + +func (aH *APIHandler) getProcessAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.processesRepo.GetProcessAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, values) +} + +func (aH *APIHandler) getProcessList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.ProcessListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: 
model.ErrorInternal, Err: err}, nil) + return + } + + hostList, err := aH.processesRepo.GetProcessList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, hostList) +} diff --git a/pkg/query-service/app/inframetrics/hosts.go b/pkg/query-service/app/inframetrics/hosts.go index 1e99454244..5b2d2d2112 100644 --- a/pkg/query-service/app/inframetrics/hosts.go +++ b/pkg/query-service/app/inframetrics/hosts.go @@ -394,13 +394,6 @@ func (h *HostsRepo) getHostsForQuery(ctx context.Context, records = append(records, record) } - if req.Offset > 0 { - records = records[req.Offset:] - } - if req.Limit > 0 && len(records) > req.Limit { - records = records[:req.Limit] - } - return records, nil } @@ -417,6 +410,10 @@ func dedupRecords(records []model.HostListRecord) []model.HostListRecord { } func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest) (model.HostListResponse, error) { + if req.Limit == 0 { + req.Limit = 10 + } + resp := model.HostListResponse{ Type: "list", } @@ -436,6 +433,16 @@ func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest) // are present in the response. we need to dedup the results. 
records = dedupRecords(records) + resp.Total = len(records) + + if req.Offset > 0 && req.Offset <= len(records) { + records = records[req.Offset:] + } + if req.Limit > 0 && len(records) > req.Limit { + records = records[:req.Limit] + } + resp.Records = records + if len(req.GroupBy) > 0 { groups := []model.HostListGroup{} @@ -505,8 +512,6 @@ func (h *HostsRepo) GetHostList(ctx context.Context, req model.HostListRequest) resp.Groups = groups resp.Type = "grouped_list" } - resp.Records = records - resp.Total = len(records) return resp, nil } diff --git a/pkg/query-service/app/inframetrics/processes.go b/pkg/query-service/app/inframetrics/processes.go new file mode 100644 index 0000000000..5ca3c74c21 --- /dev/null +++ b/pkg/query-service/app/inframetrics/processes.go @@ -0,0 +1,334 @@ +package inframetrics + +import ( + "context" + "fmt" + "math" + "strings" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +type ProcessesRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewProcessesRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *ProcessesRepo { + return &ProcessesRepo{reader: reader, querierV2: querierV2} +} + +func (p *ProcessesRepo) GetProcessAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any system metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = "process_memory_usage" + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := p.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes 
when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (p *ProcessesRepo) GetProcessAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = "process_memory_usage" + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := p.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + return attributeValuesResponse, nil +} + +func getGroupKeyForProcesses(record model.ProcessListRecord, groupBy []v3.AttributeKey) string { + groupKey := "" + for _, key := range groupBy { + groupKey += fmt.Sprintf("%s=%s,", key.Key, record.Meta[key.Key]) + } + return groupKey +} + +func (p *ProcessesRepo) getMetadataAttributes(ctx context.Context, + req model.ProcessListRequest) (map[string]map[string]string, error) { + processAttrs := map[string]map[string]string{} + + keysToAdd := []string{"process_pid", "process_executable_name", "process_command", "process_command_line"} + for _, key := range keysToAdd { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + AggregateAttribute: v3.AttributeKey{ + Key: "process_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): remove this + // What is happening 
here? + // The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint + // goes to same shard. + // However, in this case, we are interested in the attributes values across all the shards. + // So, we replace the local time series table with the distributed time series table. + // See `PrepareTimeseriesFilterQuery` for more details. + query = strings.Replace(query, ".time_series_v4", ".distributed_time_series_v4", 1) + + attrsListResponse, err := p.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok && strPtr != nil { + stringData[key] = *strPtr + } + } + + pid := stringData["process_pid"] + if _, ok := processAttrs[pid]; !ok { + processAttrs[pid] = map[string]string{} + } + + for _, key := range req.GroupBy { + processAttrs[pid][key.Key] = stringData[key.Key] + } + } + + return processAttrs, nil +} + +func (p *ProcessesRepo) GetProcessList(ctx context.Context, req model.ProcessListRequest) (model.ProcessListResponse, error) { + if req.Limit == 0 { + req.Limit = 10 + } + + resp := model.ProcessListResponse{ + Type: "list", + } + + step := common.MinAllowedStepInterval(req.Start, req.End) + + query := ProcessesTableListQuery.Clone() + if req.OrderBy != nil { + for _, q := range query.CompositeQuery.BuilderQueries { + q.OrderBy = []v3.OrderBy{*req.OrderBy} + } + } + + query.Start = req.Start + query.End = req.End + query.Step = step + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) 
+ } + } + + processAttrs, err := p.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + queryResponse, _, err := p.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + type processTSInfo struct { + CpuTimeSeries *v3.Series `json:"cpu_time_series"` + MemoryTimeSeries *v3.Series `json:"memory_time_series"` + } + processTSInfoMap := map[string]*processTSInfo{} + + for _, result := range queryResponse { + for _, series := range result.Series { + pid := series.Labels["process_pid"] + if _, ok := processTSInfoMap[pid]; !ok { + processTSInfoMap[pid] = &processTSInfo{} + } + } + } + + query.FormatForWeb = false + query.CompositeQuery.PanelType = v3.PanelTypeGraph + + formulaResult, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + for _, result := range formulaResult { + for _, series := range result.Series { + pid := series.Labels["process_pid"] + if _, ok := processTSInfoMap[pid]; !ok { + processTSInfoMap[pid] = &processTSInfo{} + } + loadSeries := *series + if result.QueryName == "F1" { + processTSInfoMap[pid].CpuTimeSeries = &loadSeries + } else if result.QueryName == "C" { + processTSInfoMap[pid].MemoryTimeSeries = &loadSeries + } + } + } + + query.FormatForWeb = true + query.CompositeQuery.PanelType = v3.PanelTypeTable + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + if len(formattedResponse) == 0 { + return resp, nil + } + + records := []model.ProcessListRecord{} + + // there should be only one result in the response + processInfo := formattedResponse[0] + + for _, row := range processInfo.Table.Rows { + record := model.ProcessListRecord{ + ProcessCPU: -1, + ProcessMemory: -1, + } + + pid, ok := row.Data["process_pid"].(string) + if ok { + record.ProcessID = pid + } + + processCPU, ok := row.Data["F1"].(float64) + if ok { + record.ProcessCPU = processCPU + } + + processMemory, ok := 
row.Data["C"].(float64) + if ok { + record.ProcessMemory = processMemory + } + record.Meta = processAttrs[record.ProcessID] + if processTSInfoMap[record.ProcessID] != nil { + record.ProcessCPUTimeSeries = processTSInfoMap[record.ProcessID].CpuTimeSeries + record.ProcessMemoryTimeSeries = processTSInfoMap[record.ProcessID].MemoryTimeSeries + } + record.ProcessName = record.Meta["process_executable_name"] + record.ProcessCMD = record.Meta["process_command"] + record.ProcessCMDLine = record.Meta["process_command_line"] + records = append(records, record) + } + + resp.Total = len(records) + + if req.Offset > 0 && req.Offset <= len(records) { + records = records[req.Offset:] + } + if req.Limit > 0 && len(records) > req.Limit { + records = records[:req.Limit] + } + resp.Records = records + + if len(req.GroupBy) > 0 { + groups := []model.ProcessListGroup{} + + groupMap := make(map[string][]model.ProcessListRecord) + for _, record := range records { + groupKey := getGroupKeyForProcesses(record, req.GroupBy) + if _, ok := groupMap[groupKey]; !ok { + groupMap[groupKey] = []model.ProcessListRecord{record} + } else { + groupMap[groupKey] = append(groupMap[groupKey], record) + } + } + + for _, records := range groupMap { + var avgCPU, avgMemory float64 + var validCPU, validMemory int + for _, record := range records { + if !math.IsNaN(record.ProcessCPU) { + avgCPU += record.ProcessCPU + validCPU++ + } + if !math.IsNaN(record.ProcessMemory) { + avgMemory += record.ProcessMemory + validMemory++ + } + } + if validCPU > 0 { avgCPU /= float64(validCPU) } + if validMemory > 0 { avgMemory /= float64(validMemory) } + + // take any record and make it as the group meta + firstRecord := records[0] + var groupValues []string + for _, key := range req.GroupBy { + groupValues = append(groupValues, firstRecord.Meta[key.Key]) + } + processNames := []string{} + for _, record := range records { + processNames = append(processNames, record.ProcessName) + } + + groups = append(groups, model.ProcessListGroup{ + GroupValues: groupValues, + GroupCPUAvg: avgCPU, + 
GroupMemoryAvg: avgMemory, + ProcessNames: processNames, + }) + } + resp.Groups = groups + resp.Type = "grouped_list" + } + + return resp, nil +} diff --git a/pkg/query-service/app/inframetrics/table_list_query.go b/pkg/query-service/app/inframetrics/table_list_query.go index dd44511348..3ea748a354 100644 --- a/pkg/query-service/app/inframetrics/table_list_query.go +++ b/pkg/query-service/app/inframetrics/table_list_query.go @@ -335,3 +335,69 @@ var NonK8STableListQuery = v3.QueryRangeParamsV3{ Version: "v4", FormatForWeb: true, } + +var ProcessesTableListQuery = v3.QueryRangeParamsV3{ + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "process_cpu_time", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "process_pid", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "A", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: true, + }, + "F1": { + QueryName: "F1", + Expression: "A", + Legend: "Process CPU Usage (%)", + }, + "C": { + QueryName: "C", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: "process_memory_usage", + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Cumulative, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "process_pid", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + Expression: "C", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + }, + PanelType: 
v3.PanelTypeTable, + QueryType: v3.QueryTypeBuilder, + }, + Version: "v4", + FormatForWeb: true, +} diff --git a/pkg/query-service/model/infra.go b/pkg/query-service/model/infra.go index ade805315d..8807e7e046 100644 --- a/pkg/query-service/model/infra.go +++ b/pkg/query-service/model/infra.go @@ -44,3 +44,39 @@ type HostListResponse struct { Groups []HostListGroup `json:"groups"` Total int `json:"total"` } + +type ProcessListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type ProcessListResponse struct { + Type string `json:"type"` + Records []ProcessListRecord `json:"records"` + Groups []ProcessListGroup `json:"groups"` + Total int `json:"total"` +} + +type ProcessListRecord struct { + ProcessName string `json:"processName"` + ProcessID string `json:"processID"` + ProcessCMD string `json:"processCMD"` + ProcessCMDLine string `json:"processCMDLine"` + ProcessCPU float64 `json:"processCPU"` + ProcessCPUTimeSeries *v3.Series `json:"processCPUTimeSeries"` + ProcessMemory float64 `json:"processMemory"` + ProcessMemoryTimeSeries *v3.Series `json:"processMemoryTimeSeries"` + Meta map[string]string `json:"-"` +} + +type ProcessListGroup struct { + GroupValues []string `json:"groupValues"` + GroupCPUAvg float64 `json:"groupCPUAvg"` + GroupMemoryAvg float64 `json:"groupMemoryAvg"` + ProcessNames []string `json:"processNames"` +}