chore: add k8s {deployment, daemonset, statefulset, job} resources (#6401)

This commit is contained in:
Srikanth Chekuri 2024-11-12 20:53:40 +05:30 committed by GitHub
parent d5523fc092
commit fd9e9f0fb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 2377 additions and 0 deletions

View File

@ -119,6 +119,11 @@ type APIHandler struct {
nodesRepo *inframetrics.NodesRepo nodesRepo *inframetrics.NodesRepo
namespacesRepo *inframetrics.NamespacesRepo namespacesRepo *inframetrics.NamespacesRepo
clustersRepo *inframetrics.ClustersRepo clustersRepo *inframetrics.ClustersRepo
// workloads
deploymentsRepo *inframetrics.DeploymentsRepo
daemonsetsRepo *inframetrics.DaemonSetsRepo
statefulsetsRepo *inframetrics.StatefulSetsRepo
jobsRepo *inframetrics.JobsRepo
} }
type APIHandlerOpts struct { type APIHandlerOpts struct {
@ -197,6 +202,10 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
nodesRepo := inframetrics.NewNodesRepo(opts.Reader, querierv2) nodesRepo := inframetrics.NewNodesRepo(opts.Reader, querierv2)
namespacesRepo := inframetrics.NewNamespacesRepo(opts.Reader, querierv2) namespacesRepo := inframetrics.NewNamespacesRepo(opts.Reader, querierv2)
clustersRepo := inframetrics.NewClustersRepo(opts.Reader, querierv2) clustersRepo := inframetrics.NewClustersRepo(opts.Reader, querierv2)
deploymentsRepo := inframetrics.NewDeploymentsRepo(opts.Reader, querierv2)
daemonsetsRepo := inframetrics.NewDaemonSetsRepo(opts.Reader, querierv2)
statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2)
jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
aH := &APIHandler{ aH := &APIHandler{
reader: opts.Reader, reader: opts.Reader,
@ -222,6 +231,10 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
nodesRepo: nodesRepo, nodesRepo: nodesRepo,
namespacesRepo: namespacesRepo, namespacesRepo: namespacesRepo,
clustersRepo: clustersRepo, clustersRepo: clustersRepo,
deploymentsRepo: deploymentsRepo,
daemonsetsRepo: daemonsetsRepo,
statefulsetsRepo: statefulsetsRepo,
jobsRepo: jobsRepo,
} }
logsQueryBuilder := logsv3.PrepareLogsQuery logsQueryBuilder := logsv3.PrepareLogsQuery
@ -400,6 +413,26 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid
clustersSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getClusterAttributeKeys)).Methods(http.MethodGet) clustersSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getClusterAttributeKeys)).Methods(http.MethodGet)
clustersSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getClusterAttributeValues)).Methods(http.MethodGet) clustersSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getClusterAttributeValues)).Methods(http.MethodGet)
clustersSubRouter.HandleFunc("/list", am.ViewAccess(aH.getClusterList)).Methods(http.MethodPost) clustersSubRouter.HandleFunc("/list", am.ViewAccess(aH.getClusterList)).Methods(http.MethodPost)
deploymentsSubRouter := router.PathPrefix("/api/v1/deployments").Subrouter()
deploymentsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDeploymentAttributeKeys)).Methods(http.MethodGet)
deploymentsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDeploymentAttributeValues)).Methods(http.MethodGet)
deploymentsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDeploymentList)).Methods(http.MethodPost)
daemonsetsSubRouter := router.PathPrefix("/api/v1/daemonsets").Subrouter()
daemonsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDaemonSetAttributeKeys)).Methods(http.MethodGet)
daemonsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDaemonSetAttributeValues)).Methods(http.MethodGet)
daemonsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDaemonSetList)).Methods(http.MethodPost)
statefulsetsSubRouter := router.PathPrefix("/api/v1/statefulsets").Subrouter()
statefulsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getStatefulSetAttributeKeys)).Methods(http.MethodGet)
statefulsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getStatefulSetAttributeValues)).Methods(http.MethodGet)
statefulsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getStatefulSetList)).Methods(http.MethodPost)
jobsSubRouter := router.PathPrefix("/api/v1/jobs").Subrouter()
jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet)
jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet)
jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost)
} }
func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) { func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) {

View File

@ -334,3 +334,213 @@ func (aH *APIHandler) getClusterList(w http.ResponseWriter, r *http.Request) {
aH.Respond(w, clusterList) aH.Respond(w, clusterList)
} }
func (aH *APIHandler) getDeploymentAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.deploymentsRepo.GetDeploymentAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getDeploymentAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.deploymentsRepo.GetDeploymentAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getDeploymentList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.DeploymentListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
deploymentList, err := aH.deploymentsRepo.GetDeploymentList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, deploymentList)
}
func (aH *APIHandler) getDaemonSetAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.daemonsetsRepo.GetDaemonSetAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getDaemonSetAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.daemonsetsRepo.GetDaemonSetAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getDaemonSetList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.DaemonSetListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
daemonSetList, err := aH.daemonsetsRepo.GetDaemonSetList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, daemonSetList)
}
func (aH *APIHandler) getStatefulSetAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.statefulsetsRepo.GetStatefulSetAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getStatefulSetAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.statefulsetsRepo.GetStatefulSetAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getStatefulSetList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.StatefulSetListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
statefulSetList, err := aH.statefulsetsRepo.GetStatefulSetList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, statefulSetList)
}
func (aH *APIHandler) getJobAttributeKeys(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeKeyRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
keys, err := aH.jobsRepo.GetJobAttributeKeys(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, keys)
}
func (aH *APIHandler) getJobAttributeValues(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req, err := parseFilterAttributeValueRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
values, err := aH.jobsRepo.GetJobAttributeValues(ctx, *req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, values)
}
func (aH *APIHandler) getJobList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
req := model.JobListRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
jobList, err := aH.jobsRepo.GetJobList(ctx, req)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, jobList)
}

View File

@ -73,6 +73,22 @@ func getParamsForTopClusters(req model.ClusterListRequest) (int64, string, strin
return getParamsForTopItems(req.Start, req.End) return getParamsForTopItems(req.Start, req.End)
} }
func getParamsForTopDeployments(req model.DeploymentListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopDaemonSets(req model.DaemonSetListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopStatefulSets(req model.StatefulSetListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
func getParamsForTopJobs(req model.JobListRequest) (int64, string, string) {
return getParamsForTopItems(req.Start, req.End)
}
// TODO(srikanthccv): remove this // TODO(srikanthccv): remove this
// What is happening here? // What is happening here?
// The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint // The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint

View File

@ -0,0 +1,444 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForDaemonSets = "k8s_pod_cpu_utilization"
k8sDaemonSetNameAttrKey = "k8s_daemonset_name"
metricNamesForDaemonSets = map[string]string{
"desired_nodes": "k8s_daemonset_desired_scheduled_nodes",
"available_nodes": "k8s_daemonset_current_scheduled_nodes",
}
daemonSetAttrsToEnrich = []string{
"k8s_daemonset_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForDaemonSets = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_nodes": {"H"},
"available_nodes": {"I"},
}
builderQueriesForDaemonSets = map[string]*v3.BuilderQuery{
// desired nodes
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDaemonSets["desired_nodes"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available nodes
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDaemonSets["available_nodes"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
daemonSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
)
type DaemonSetsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewDaemonSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *DaemonSetsRepo {
return &DaemonSetsRepo{reader: reader, querierV2: querierV2}
}
func (d *DaemonSetsRepo) GetDaemonSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDaemonSets
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *DaemonSetsRepo) GetDaemonSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDaemonSets
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *DaemonSetsRepo) getMetadataAttributes(ctx context.Context, req model.DaemonSetListRequest) (map[string]map[string]string, error) {
daemonSetAttrs := map[string]map[string]string{}
for _, key := range daemonSetAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForDaemonSets,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
daemonSetName := stringData[k8sDaemonSetNameAttrKey]
if _, ok := daemonSetAttrs[daemonSetName]; !ok {
daemonSetAttrs[daemonSetName] = map[string]string{}
}
for _, key := range req.GroupBy {
daemonSetAttrs[daemonSetName][key.Key] = stringData[key.Key]
}
}
return daemonSetAttrs, nil
}
func (d *DaemonSetsRepo) getTopDaemonSetGroups(ctx context.Context, req model.DaemonSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopDaemonSets(req)
queryNames := queryNamesForDaemonSets[req.OrderBy.ColumnName]
topDaemonSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topDaemonSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topDaemonSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topDaemonSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopDaemonSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topDaemonSetGroups := []map[string]string{}
for _, series := range paginatedTopDaemonSetGroupsSeries {
topDaemonSetGroups = append(topDaemonSetGroups, series.Labels)
}
allDaemonSetGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allDaemonSetGroups = append(allDaemonSetGroups, series.Labels)
}
return topDaemonSetGroups, allDaemonSetGroups, nil
}
func (d *DaemonSetsRepo) GetDaemonSetList(ctx context.Context, req model.DaemonSetListRequest) (model.DaemonSetListResponse, error) {
resp := model.DaemonSetListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sDaemonSetNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for daemon sets
for _, daemonSetQuery := range builderQueriesForDaemonSets {
query.CompositeQuery.BuilderQueries[daemonSetQuery.QueryName] = daemonSetQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for daemon sets
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sDaemonSetNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
daemonSetAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topDaemonSetGroups, allDaemonSetGroups, err := d.getTopDaemonSetGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topDaemonSetGroup := range topDaemonSetGroups {
for k, v := range topDaemonSetGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.DaemonSetListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.DaemonSetListRecord{
DaemonSetName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredNodes: -1,
AvailableNodes: -1,
}
if daemonSetName, ok := row.Data[k8sDaemonSetNameAttrKey].(string); ok {
record.DaemonSetName = daemonSetName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredNodes, ok := row.Data["H"].(float64); ok {
record.DesiredNodes = int(desiredNodes)
}
if availableNodes, ok := row.Data["I"].(float64); ok {
record.AvailableNodes = int(availableNodes)
}
record.Meta = map[string]string{}
if _, ok := daemonSetAttrs[record.DaemonSetName]; ok {
record.Meta = daemonSetAttrs[record.DaemonSetName]
}
for k, v := range row.Data {
if slices.Contains(daemonSetQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allDaemonSetGroups)
resp.Records = records
return resp, nil
}

View File

@ -0,0 +1,444 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForDeployments = "k8s_pod_cpu_utilization"
k8sDeploymentNameAttrKey = "k8s_deployment_name"
metricNamesForDeployments = map[string]string{
"desired_pods": "k8s_deployment_desired",
"available_pods": "k8s_deployment_available",
}
deploymentAttrsToEnrich = []string{
"k8s_deployment_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForDeployments = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_pods": {"H"},
"available_pods": {"I"},
}
builderQueriesForDeployments = map[string]*v3.BuilderQuery{
// desired pods
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDeployments["desired_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available pods
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForDeployments["available_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
deploymentQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
)
type DeploymentsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewDeploymentsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *DeploymentsRepo {
return &DeploymentsRepo{reader: reader, querierV2: querierV2}
}
func (d *DeploymentsRepo) GetDeploymentAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDeployments
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *DeploymentsRepo) GetDeploymentAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForDeployments
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *DeploymentsRepo) getMetadataAttributes(ctx context.Context, req model.DeploymentListRequest) (map[string]map[string]string, error) {
deploymentAttrs := map[string]map[string]string{}
for _, key := range deploymentAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForDeployments,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
deploymentName := stringData[k8sDeploymentNameAttrKey]
if _, ok := deploymentAttrs[deploymentName]; !ok {
deploymentAttrs[deploymentName] = map[string]string{}
}
for _, key := range req.GroupBy {
deploymentAttrs[deploymentName][key.Key] = stringData[key.Key]
}
}
return deploymentAttrs, nil
}
func (d *DeploymentsRepo) getTopDeploymentGroups(ctx context.Context, req model.DeploymentListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopDeployments(req)
queryNames := queryNamesForDeployments[req.OrderBy.ColumnName]
topDeploymentGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topDeploymentGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topDeploymentGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topDeploymentGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopDeploymentGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topDeploymentGroups := []map[string]string{}
for _, series := range paginatedTopDeploymentGroupsSeries {
topDeploymentGroups = append(topDeploymentGroups, series.Labels)
}
allDeploymentGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allDeploymentGroups = append(allDeploymentGroups, series.Labels)
}
return topDeploymentGroups, allDeploymentGroups, nil
}
func (d *DeploymentsRepo) GetDeploymentList(ctx context.Context, req model.DeploymentListRequest) (model.DeploymentListResponse, error) {
resp := model.DeploymentListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sDeploymentNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for deployments
for _, deploymentQuery := range builderQueriesForDeployments {
query.CompositeQuery.BuilderQueries[deploymentQuery.QueryName] = deploymentQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for deployments
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sDeploymentNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
deploymentAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topDeploymentGroups, allDeploymentGroups, err := d.getTopDeploymentGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topDeploymentGroup := range topDeploymentGroups {
for k, v := range topDeploymentGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.DeploymentListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.DeploymentListRecord{
DeploymentName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredPods: -1,
AvailablePods: -1,
}
if deploymentName, ok := row.Data[k8sDeploymentNameAttrKey].(string); ok {
record.DeploymentName = deploymentName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredPods, ok := row.Data["H"].(float64); ok {
record.DesiredPods = int(desiredPods)
}
if availablePods, ok := row.Data["I"].(float64); ok {
record.AvailablePods = int(availablePods)
}
record.Meta = map[string]string{}
if _, ok := deploymentAttrs[record.DeploymentName]; ok {
record.Meta = deploymentAttrs[record.DeploymentName]
}
for k, v := range row.Data {
if slices.Contains(deploymentQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allDeploymentGroups)
resp.Records = records
return resp, nil
}

View File

@ -0,0 +1,498 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForJobs = "k8s_pod_cpu_utilization"
k8sJobNameAttrKey = "k8s_job_name"
metricNamesForJobs = map[string]string{
"desired_successful_pods": "k8s_job_desired_successful_pods",
"active_pods": "k8s_job_active_pods",
"failed_pods": "k8s_job_failed_pods",
"successful_pods": "k8s_job_successful_pods",
}
jobAttrsToEnrich = []string{
"k8s_job_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForJobs = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_pods": {"H"},
"active_pods": {"I"},
"failed_pods": {"J"},
"successful_pods": {"K"},
}
builderQueriesForJobs = map[string]*v3.BuilderQuery{
// desired nodes
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["desired_successful_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available nodes
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["active_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// failed pods
"J": {
QueryName: "J",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["failed_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "J",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// successful pods
"K": {
QueryName: "K",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForJobs["successful_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "K",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
jobQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"}
)
type JobsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewJobsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *JobsRepo {
return &JobsRepo{reader: reader, querierV2: querierV2}
}
func (d *JobsRepo) GetJobAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForJobs
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *JobsRepo) GetJobAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForJobs
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *JobsRepo) getMetadataAttributes(ctx context.Context, req model.JobListRequest) (map[string]map[string]string, error) {
jobAttrs := map[string]map[string]string{}
for _, key := range jobAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForJobs,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
jobName := stringData[k8sJobNameAttrKey]
if _, ok := jobAttrs[jobName]; !ok {
jobAttrs[jobName] = map[string]string{}
}
for _, key := range req.GroupBy {
jobAttrs[jobName][key.Key] = stringData[key.Key]
}
}
return jobAttrs, nil
}
func (d *JobsRepo) getTopJobGroups(ctx context.Context, req model.JobListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopJobs(req)
queryNames := queryNamesForJobs[req.OrderBy.ColumnName]
topJobGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topJobGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topJobGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topJobGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopJobGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topJobGroups := []map[string]string{}
for _, series := range paginatedTopJobGroupsSeries {
topJobGroups = append(topJobGroups, series.Labels)
}
allJobGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allJobGroups = append(allJobGroups, series.Labels)
}
return topJobGroups, allJobGroups, nil
}
func (d *JobsRepo) GetJobList(ctx context.Context, req model.JobListRequest) (model.JobListResponse, error) {
resp := model.JobListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "desired_pods", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sJobNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for jobs
for _, jobQuery := range builderQueriesForJobs {
query.CompositeQuery.BuilderQueries[jobQuery.QueryName] = jobQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for jobs
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sJobNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
jobAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topJobGroups, allJobGroups, err := d.getTopJobGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topJobGroup := range topJobGroups {
for k, v := range topJobGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.JobListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.JobListRecord{
JobName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredSuccessfulPods: -1,
ActivePods: -1,
FailedPods: -1,
SuccessfulPods: -1,
}
if jobName, ok := row.Data[k8sJobNameAttrKey].(string); ok {
record.JobName = jobName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredSuccessfulPods, ok := row.Data["H"].(float64); ok {
record.DesiredSuccessfulPods = int(desiredSuccessfulPods)
}
if activePods, ok := row.Data["I"].(float64); ok {
record.ActivePods = int(activePods)
}
if failedPods, ok := row.Data["J"].(float64); ok {
record.FailedPods = int(failedPods)
}
if successfulPods, ok := row.Data["K"].(float64); ok {
record.SuccessfulPods = int(successfulPods)
}
record.Meta = map[string]string{}
if _, ok := jobAttrs[record.JobName]; ok {
record.Meta = jobAttrs[record.JobName]
}
for k, v := range row.Data {
if slices.Contains(jobQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allJobGroups)
resp.Records = records
return resp, nil
}

View File

@ -0,0 +1,444 @@
package inframetrics
import (
"context"
"math"
"sort"
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"golang.org/x/exp/slices"
)
var (
metricToUseForStatefulSets = "k8s_pod_cpu_utilization"
k8sStatefulSetNameAttrKey = "k8s_statefulset_name"
metricNamesForStatefulSets = map[string]string{
"desired_pods": "k8s_statefulset_desired_pods",
"available_pods": "k8s_statefulset_current_pods",
}
statefulSetAttrsToEnrich = []string{
"k8s_statefulset_name",
"k8s_namespace_name",
"k8s_cluster_name",
}
queryNamesForStatefulSets = map[string][]string{
"cpu": {"A"},
"cpu_request": {"B", "A"},
"cpu_limit": {"C", "A"},
"memory": {"D"},
"memory_request": {"E", "D"},
"memory_limit": {"F", "D"},
"restarts": {"G", "A"},
"desired_pods": {"H"},
"available_pods": {"I"},
}
builderQueriesForStatefulSets = map[string]*v3.BuilderQuery{
// desired pods
"H": {
QueryName: "H",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForStatefulSets["desired_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "H",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// available pods
"I": {
QueryName: "I",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForStatefulSets["available_pods"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "I",
ReduceTo: v3.ReduceToOperatorLast,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
}
statefulSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
)
type StatefulSetsRepo struct {
reader interfaces.Reader
querierV2 interfaces.Querier
}
func NewStatefulSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *StatefulSetsRepo {
return &StatefulSetsRepo{reader: reader, querierV2: querierV2}
}
func (d *StatefulSetsRepo) GetStatefulSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
// TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForStatefulSets
if req.Limit == 0 {
req.Limit = 50
}
attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req)
if err != nil {
return nil, err
}
// TODO(srikanthccv): only return resource attributes when we have a way to
// distinguish between resource attributes and other attributes.
filteredKeys := []v3.AttributeKey{}
for _, key := range attributeKeysResponse.AttributeKeys {
if slices.Contains(pointAttrsToIgnore, key.Key) {
continue
}
filteredKeys = append(filteredKeys, key)
}
return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil
}
func (d *StatefulSetsRepo) GetStatefulSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForStatefulSets
if req.Limit == 0 {
req.Limit = 50
}
attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req)
if err != nil {
return nil, err
}
return attributeValuesResponse, nil
}
func (d *StatefulSetsRepo) getMetadataAttributes(ctx context.Context, req model.StatefulSetListRequest) (map[string]map[string]string, error) {
statefulSetAttrs := map[string]map[string]string{}
for _, key := range statefulSetAttrsToEnrich {
hasKey := false
for _, groupByKey := range req.GroupBy {
if groupByKey.Key == key {
hasKey = true
break
}
}
if !hasKey {
req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key})
}
}
mq := v3.BuilderQuery{
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricToUseForStatefulSets,
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
GroupBy: req.GroupBy,
}
query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq)
if err != nil {
return nil, err
}
query = localQueryToDistributedQuery(query)
attrsListResponse, err := d.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}
for _, row := range attrsListResponse {
stringData := map[string]string{}
for key, value := range row.Data {
if str, ok := value.(string); ok {
stringData[key] = str
} else if strPtr, ok := value.(*string); ok {
stringData[key] = *strPtr
}
}
statefulSetName := stringData[k8sStatefulSetNameAttrKey]
if _, ok := statefulSetAttrs[statefulSetName]; !ok {
statefulSetAttrs[statefulSetName] = map[string]string{}
}
for _, key := range req.GroupBy {
statefulSetAttrs[statefulSetName][key.Key] = stringData[key.Key]
}
}
return statefulSetAttrs, nil
}
func (d *StatefulSetsRepo) getTopStatefulSetGroups(ctx context.Context, req model.StatefulSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
step, timeSeriesTableName, samplesTableName := getParamsForTopStatefulSets(req)
queryNames := queryNamesForStatefulSets[req.OrderBy.ColumnName]
topStatefulSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
Start: req.Start,
End: req.End,
Step: step,
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
},
}
for _, queryName := range queryNames {
query := q.CompositeQuery.BuilderQueries[queryName].Clone()
query.StepInterval = step
query.MetricTableHints = &v3.MetricTableHints{
TimeSeriesTableName: timeSeriesTableName,
SamplesTableName: samplesTableName,
}
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
topStatefulSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, topStatefulSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, topStatefulSetGroupsQueryRangeParams)
if err != nil {
return nil, nil, err
}
if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
return nil, nil, nil
}
if req.OrderBy.Order == v3.DirectionDesc {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value
})
} else {
sort.Slice(formattedResponse[0].Series, func(i, j int) bool {
return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value
})
}
limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series)))
paginatedTopStatefulSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)]
topStatefulSetGroups := []map[string]string{}
for _, series := range paginatedTopStatefulSetGroupsSeries {
topStatefulSetGroups = append(topStatefulSetGroups, series.Labels)
}
allStatefulSetGroups := []map[string]string{}
for _, series := range formattedResponse[0].Series {
allStatefulSetGroups = append(allStatefulSetGroups, series.Labels)
}
return topStatefulSetGroups, allStatefulSetGroups, nil
}
func (d *StatefulSetsRepo) GetStatefulSetList(ctx context.Context, req model.StatefulSetListRequest) (model.StatefulSetListResponse, error) {
resp := model.StatefulSetListResponse{}
if req.Limit == 0 {
req.Limit = 10
}
if req.OrderBy == nil {
req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc}
}
if req.GroupBy == nil {
req.GroupBy = []v3.AttributeKey{{Key: k8sStatefulSetNameAttrKey}}
resp.Type = model.ResponseTypeList
} else {
resp.Type = model.ResponseTypeGroupedList
}
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
query := WorkloadTableListQuery.Clone()
query.Start = req.Start
query.End = req.End
query.Step = step
// add additional queries for stateful sets
for _, statefulSetQuery := range builderQueriesForStatefulSets {
query.CompositeQuery.BuilderQueries[statefulSetQuery.QueryName] = statefulSetQuery
}
for _, query := range query.CompositeQuery.BuilderQueries {
query.StepInterval = step
if req.Filters != nil && len(req.Filters.Items) > 0 {
if query.Filters == nil {
query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
}
query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
}
query.GroupBy = req.GroupBy
// make sure we only get records for daemon sets
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: k8sStatefulSetNameAttrKey},
Operator: v3.FilterOperatorExists,
})
}
statefulSetAttrs, err := d.getMetadataAttributes(ctx, req)
if err != nil {
return resp, err
}
topStatefulSetGroups, allStatefulSetGroups, err := d.getTopStatefulSetGroups(ctx, req, query)
if err != nil {
return resp, err
}
groupFilters := map[string][]string{}
for _, topStatefulSetGroup := range topStatefulSetGroups {
for k, v := range topStatefulSetGroup {
groupFilters[k] = append(groupFilters[k], v)
}
}
for groupKey, groupValues := range groupFilters {
hasGroupFilter := false
if req.Filters != nil && len(req.Filters.Items) > 0 {
for _, filter := range req.Filters.Items {
if filter.Key.Key == groupKey {
hasGroupFilter = true
break
}
}
}
if !hasGroupFilter {
for _, query := range query.CompositeQuery.BuilderQueries {
query.Filters.Items = append(query.Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{Key: groupKey},
Value: groupValues,
Operator: v3.FilterOperatorIn,
})
}
}
}
queryResponse, _, err := d.querierV2.QueryRange(ctx, query)
if err != nil {
return resp, err
}
formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
if err != nil {
return resp, err
}
records := []model.StatefulSetListRecord{}
for _, result := range formattedResponse {
for _, row := range result.Table.Rows {
record := model.StatefulSetListRecord{
StatefulSetName: "",
CPUUsage: -1,
CPURequest: -1,
CPULimit: -1,
MemoryUsage: -1,
MemoryRequest: -1,
MemoryLimit: -1,
DesiredPods: -1,
AvailablePods: -1,
}
if statefulSetName, ok := row.Data[k8sStatefulSetNameAttrKey].(string); ok {
record.StatefulSetName = statefulSetName
}
if cpu, ok := row.Data["A"].(float64); ok {
record.CPUUsage = cpu
}
if cpuRequest, ok := row.Data["B"].(float64); ok {
record.CPURequest = cpuRequest
}
if cpuLimit, ok := row.Data["C"].(float64); ok {
record.CPULimit = cpuLimit
}
if memory, ok := row.Data["D"].(float64); ok {
record.MemoryUsage = memory
}
if memoryRequest, ok := row.Data["E"].(float64); ok {
record.MemoryRequest = memoryRequest
}
if memoryLimit, ok := row.Data["F"].(float64); ok {
record.MemoryLimit = memoryLimit
}
if restarts, ok := row.Data["G"].(float64); ok {
record.Restarts = int(restarts)
}
if desiredPods, ok := row.Data["H"].(float64); ok {
record.DesiredPods = int(desiredPods)
}
if availablePods, ok := row.Data["I"].(float64); ok {
record.AvailablePods = int(availablePods)
}
record.Meta = map[string]string{}
if _, ok := statefulSetAttrs[record.StatefulSetName]; ok {
record.Meta = statefulSetAttrs[record.StatefulSetName]
}
for k, v := range row.Data {
if slices.Contains(statefulSetQueryNames, k) {
continue
}
if labelValue, ok := v.(string); ok {
record.Meta[k] = labelValue
}
}
records = append(records, record)
}
}
resp.Total = len(allStatefulSetGroups)
resp.Records = records
return resp, nil
}

View File

@ -0,0 +1,166 @@
package inframetrics
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
var (
metricNamesForWorkloads = map[string]string{
"cpu": "k8s_pod_cpu_utilization",
"cpu_req": "k8s_pod_cpu_request_utilization",
"cpu_limit": "k8s_pod_cpu_limit_utilization",
"memory": "k8s_pod_memory_usage",
"memory_req": "k8s_pod_memory_request_utilization",
"memory_limit": "k8s_pod_memory_limit_utilization",
"restarts": "k8s_container_restarts",
}
)
var WorkloadTableListQuery = v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{
// pod cpu utilization
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["cpu"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "A",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod cpu request utilization
"B": {
QueryName: "B",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["cpu_request"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "B",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod cpu limit utilization
"C": {
QueryName: "C",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["cpu_limit"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "C",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod memory utilization
"D": {
QueryName: "D",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["memory"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "D",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod memory request utilization
"E": {
QueryName: "E",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["memory_request"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "E",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
// pod memory limit utilization
"F": {
QueryName: "F",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["memory_limit"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "F",
ReduceTo: v3.ReduceToOperatorAvg,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
Disabled: false,
},
"G": {
QueryName: "G",
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: metricNamesForWorkloads["restarts"],
DataType: v3.AttributeKeyDataTypeFloat64,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{},
Expression: "G",
ReduceTo: v3.ReduceToOperatorSum,
TimeAggregation: v3.TimeAggregationAnyLast,
SpaceAggregation: v3.SpaceAggregationMax,
Functions: []v3.Function{{Name: v3.FunctionNameRunningDiff}},
Disabled: false,
},
},
PanelType: v3.PanelTypeTable,
QueryType: v3.QueryTypeBuilder,
},
Version: "v4",
FormatForWeb: true,
}

View File

@ -173,3 +173,125 @@ type ClusterListRecord struct {
MemoryAllocatable float64 `json:"memoryAllocatable"` MemoryAllocatable float64 `json:"memoryAllocatable"`
Meta map[string]string `json:"meta"` Meta map[string]string `json:"meta"`
} }
type DeploymentListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type DeploymentListResponse struct {
Type ResponseType `json:"type"`
Records []DeploymentListRecord `json:"records"`
Total int `json:"total"`
}
type DeploymentListRecord struct {
DeploymentName string `json:"deploymentName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
DesiredPods int `json:"desiredPods"`
AvailablePods int `json:"availablePods"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
Meta map[string]string `json:"meta"`
}
type DaemonSetListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type DaemonSetListResponse struct {
Type ResponseType `json:"type"`
Records []DaemonSetListRecord `json:"records"`
Total int `json:"total"`
}
type DaemonSetListRecord struct {
DaemonSetName string `json:"daemonSetName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
DesiredNodes int `json:"desiredNodes"`
AvailableNodes int `json:"availableNodes"`
Meta map[string]string `json:"meta"`
}
type StatefulSetListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type StatefulSetListResponse struct {
Type ResponseType `json:"type"`
Records []StatefulSetListRecord `json:"records"`
Total int `json:"total"`
}
type StatefulSetListRecord struct {
StatefulSetName string `json:"statefulSetName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
DesiredPods int `json:"desiredPods"`
AvailablePods int `json:"availablePods"`
Meta map[string]string `json:"meta"`
}
type JobListRequest struct {
Start int64 `json:"start"` // epoch time in ms
End int64 `json:"end"` // epoch time in ms
Filters *v3.FilterSet `json:"filters"`
GroupBy []v3.AttributeKey `json:"groupBy"`
OrderBy *v3.OrderBy `json:"orderBy"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
type JobListResponse struct {
Type ResponseType `json:"type"`
Records []JobListRecord `json:"records"`
Total int `json:"total"`
}
type JobListRecord struct {
JobName string `json:"jobName"`
CPUUsage float64 `json:"cpuUsage"`
MemoryUsage float64 `json:"memoryUsage"`
CPURequest float64 `json:"cpuRequest"`
MemoryRequest float64 `json:"memoryRequest"`
CPULimit float64 `json:"cpuLimit"`
MemoryLimit float64 `json:"memoryLimit"`
Restarts int `json:"restarts"`
DesiredSuccessfulPods int `json:"desiredSuccessfulPods"`
ActivePods int `json:"activePods"`
FailedPods int `json:"failedPods"`
SuccessfulPods int `json:"successfulPods"`
Meta map[string]string `json:"meta"`
}