mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-08 17:08:58 +08:00
chore: add k8s metrics receiving status (#6977)
This commit is contained in:
parent
536656281d
commit
c8032f771e
@ -448,6 +448,9 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid
|
||||
jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet)
|
||||
jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet)
|
||||
jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost)
|
||||
|
||||
infraOnboardingSubRouter := router.PathPrefix("/api/v1/infra_onboarding").Subrouter()
|
||||
infraOnboardingSubRouter.HandleFunc("/k8s/status", am.ViewAccess(aH.getK8sInfraOnboardingStatus)).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) {
|
||||
|
@ -597,3 +597,52 @@ func (aH *APIHandler) getPvcAttributeValues(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
aH.Respond(w, values)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getK8sInfraOnboardingStatus(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
status := model.OnboardingStatus{}
|
||||
|
||||
didSendPodMetrics, err := aH.podsRepo.DidSendPodMetrics(ctx)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
if !didSendPodMetrics {
|
||||
aH.Respond(w, status)
|
||||
return
|
||||
}
|
||||
|
||||
didSendClusterMetrics, err := aH.podsRepo.DidSendClusterMetrics(ctx)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
didSendNodeMetrics, err := aH.nodesRepo.DidSendNodeMetrics(ctx)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
didSendOptionalPodMetrics, err := aH.podsRepo.IsSendingOptionalPodMetrics(ctx)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
requiredMetadata, err := aH.podsRepo.SendingRequiredMetadata(ctx)
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
status.DidSendPodMetrics = didSendPodMetrics
|
||||
status.DidSendClusterMetrics = didSendClusterMetrics
|
||||
status.DidSendNodeMetrics = didSendNodeMetrics
|
||||
status.IsSendingOptionalPodMetrics = didSendOptionalPodMetrics
|
||||
status.IsSendingRequiredMetadata = requiredMetadata
|
||||
|
||||
aH.Respond(w, status)
|
||||
}
|
||||
|
@ -8,6 +8,84 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO(srikanthccv): import metadata yaml from receivers and use generated files to check the metrics
|
||||
podMetricNamesToCheck = []string{
|
||||
"k8s_pod_cpu_utilization",
|
||||
"k8s_pod_memory_usage",
|
||||
"k8s_pod_cpu_request_utilization",
|
||||
"k8s_pod_memory_request_utilization",
|
||||
"k8s_pod_cpu_limit_utilization",
|
||||
"k8s_pod_memory_limit_utilization",
|
||||
"k8s_container_restarts",
|
||||
"k8s_pod_phase",
|
||||
}
|
||||
nodeMetricNamesToCheck = []string{
|
||||
"k8s_node_cpu_utilization",
|
||||
"k8s_node_allocatable_cpu",
|
||||
"k8s_node_memory_usage",
|
||||
"k8s_node_allocatable_memory",
|
||||
"k8s_node_condition_ready",
|
||||
}
|
||||
clusterMetricNamesToCheck = []string{
|
||||
"k8s_daemonset_desired_scheduled_nodes",
|
||||
"k8s_daemonset_current_scheduled_nodes",
|
||||
"k8s_deployment_desired",
|
||||
"k8s_deployment_available",
|
||||
"k8s_job_desired_successful_pods",
|
||||
"k8s_job_active_pods",
|
||||
"k8s_job_failed_pods",
|
||||
"k8s_job_successful_pods",
|
||||
"k8s_statefulset_desired_pods",
|
||||
"k8s_statefulset_current_pods",
|
||||
}
|
||||
optionalPodMetricNamesToCheck = []string{
|
||||
"k8s_pod_cpu_request_utilization",
|
||||
"k8s_pod_memory_request_utilization",
|
||||
"k8s_pod_cpu_limit_utilization",
|
||||
"k8s_pod_memory_limit_utilization",
|
||||
}
|
||||
|
||||
// did they ever send _any_ pod metrics?
|
||||
didSendPodMetricsQuery = `
|
||||
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
|
||||
`
|
||||
|
||||
// did they ever send any node metrics?
|
||||
didSendNodeMetricsQuery = `
|
||||
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
|
||||
`
|
||||
|
||||
// did they ever send any cluster metrics?
|
||||
didSendClusterMetricsQuery = `
|
||||
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
|
||||
`
|
||||
|
||||
// if they ever sent _any_ pod metrics, we assume they know how to send pod metrics
|
||||
// now, are they sending optional pod metrics such request/limit metrics?
|
||||
isSendingOptionalPodMetricsQuery = `
|
||||
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
|
||||
`
|
||||
|
||||
// there should be [cluster, node, namespace, one of (deployment, statefulset, daemonset, cronjob, job)] for each pod
|
||||
isSendingRequiredMetadataQuery = `
|
||||
SELECT any(JSONExtractString(labels, 'k8s_cluster_name')) as k8s_cluster_name,
|
||||
any(JSONExtractString(labels, 'k8s_node_name')) as k8s_node_name,
|
||||
any(JSONExtractString(labels, 'k8s_namespace_name')) as k8s_namespace_name,
|
||||
any(JSONExtractString(labels, 'k8s_deployment_name')) as k8s_deployment_name,
|
||||
any(JSONExtractString(labels, 'k8s_statefulset_name')) as k8s_statefulset_name,
|
||||
any(JSONExtractString(labels, 'k8s_daemonset_name')) as k8s_daemonset_name,
|
||||
any(JSONExtractString(labels, 'k8s_cronjob_name')) as k8s_cronjob_name,
|
||||
any(JSONExtractString(labels, 'k8s_job_name')) as k8s_job_name,
|
||||
JSONExtractString(labels, 'k8s_pod_name') as k8s_pod_name
|
||||
FROM %s.%s WHERE metric_name IN (%s)
|
||||
AND (unix_milli >= (toUnixTimestamp(now() - toIntervalMinute(60)) * 1000))
|
||||
AND JSONExtractString(labels, 'k8s_namespace_name') NOT IN ('kube-system', 'kube-public', 'kube-node-lease', 'metallb-system')
|
||||
GROUP BY k8s_pod_name
|
||||
LIMIT 1 BY k8s_cluster_name, k8s_node_name, k8s_namespace_name
|
||||
`
|
||||
)
|
||||
|
||||
// getParamsForTopItems returns the step, time series table name and samples table name
|
||||
// for the top items query. what are we doing here?
|
||||
// we want to identify the top hosts/pods/nodes quickly, so we use pre-aggregated data
|
||||
|
@ -2,11 +2,14 @@ package inframetrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
@ -62,6 +65,20 @@ func (n *NodesRepo) GetNodeAttributeKeys(ctx context.Context, req v3.FilterAttri
|
||||
return attributeKeysResponse, nil
|
||||
}
|
||||
|
||||
func (n *NodesRepo) DidSendNodeMetrics(ctx context.Context) (bool, error) {
|
||||
namesStr := "'" + strings.Join(nodeMetricNamesToCheck, "','") + "'"
|
||||
|
||||
query := fmt.Sprintf(didSendNodeMetricsQuery,
|
||||
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
|
||||
|
||||
count, err := n.reader.GetCountOfThings(ctx, query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (n *NodesRepo) GetNodeAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
|
||||
req.DataSource = v3.DataSourceMetrics
|
||||
req.AggregateAttribute = metricToUseForNodes
|
||||
|
@ -2,11 +2,14 @@ package inframetrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
@ -105,6 +108,137 @@ func (p *PodsRepo) GetPodAttributeValues(ctx context.Context, req v3.FilterAttri
|
||||
return attributeValuesResponse, nil
|
||||
}
|
||||
|
||||
func (p *PodsRepo) DidSendPodMetrics(ctx context.Context) (bool, error) {
|
||||
namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"
|
||||
|
||||
query := fmt.Sprintf(didSendPodMetricsQuery,
|
||||
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
|
||||
|
||||
count, err := p.reader.GetCountOfThings(ctx, query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (p *PodsRepo) DidSendClusterMetrics(ctx context.Context) (bool, error) {
|
||||
namesStr := "'" + strings.Join(clusterMetricNamesToCheck, "','") + "'"
|
||||
|
||||
query := fmt.Sprintf(didSendClusterMetricsQuery,
|
||||
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
|
||||
|
||||
count, err := p.reader.GetCountOfThings(ctx, query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (p *PodsRepo) IsSendingOptionalPodMetrics(ctx context.Context) (bool, error) {
|
||||
namesStr := "'" + strings.Join(optionalPodMetricNamesToCheck, "','") + "'"
|
||||
|
||||
query := fmt.Sprintf(isSendingOptionalPodMetricsQuery,
|
||||
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)
|
||||
|
||||
count, err := p.reader.GetCountOfThings(ctx, query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (p *PodsRepo) SendingRequiredMetadata(ctx context.Context) ([]model.PodOnboardingStatus, error) {
|
||||
namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"
|
||||
|
||||
query := fmt.Sprintf(isSendingRequiredMetadataQuery,
|
||||
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_V4_TABLENAME, namesStr)
|
||||
|
||||
result, err := p.reader.GetListResultV3(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statuses := []model.PodOnboardingStatus{}
|
||||
|
||||
// for each pod, check if we have all the required metadata
|
||||
for _, row := range result {
|
||||
status := model.PodOnboardingStatus{}
|
||||
switch v := row.Data["k8s_cluster_name"].(type) {
|
||||
case string:
|
||||
status.HasClusterName = true
|
||||
status.ClusterName = v
|
||||
case *string:
|
||||
status.HasClusterName = *v != ""
|
||||
status.ClusterName = *v
|
||||
}
|
||||
switch v := row.Data["k8s_node_name"].(type) {
|
||||
case string:
|
||||
status.HasNodeName = true
|
||||
status.NodeName = v
|
||||
case *string:
|
||||
status.HasNodeName = *v != ""
|
||||
status.NodeName = *v
|
||||
}
|
||||
switch v := row.Data["k8s_namespace_name"].(type) {
|
||||
case string:
|
||||
status.HasNamespaceName = true
|
||||
status.NamespaceName = v
|
||||
case *string:
|
||||
status.HasNamespaceName = *v != ""
|
||||
status.NamespaceName = *v
|
||||
}
|
||||
switch v := row.Data["k8s_deployment_name"].(type) {
|
||||
case string:
|
||||
status.HasDeploymentName = true
|
||||
case *string:
|
||||
status.HasDeploymentName = *v != ""
|
||||
}
|
||||
switch v := row.Data["k8s_statefulset_name"].(type) {
|
||||
case string:
|
||||
status.HasStatefulsetName = true
|
||||
case *string:
|
||||
status.HasStatefulsetName = *v != ""
|
||||
}
|
||||
switch v := row.Data["k8s_daemonset_name"].(type) {
|
||||
case string:
|
||||
status.HasDaemonsetName = true
|
||||
case *string:
|
||||
status.HasDaemonsetName = *v != ""
|
||||
}
|
||||
switch v := row.Data["k8s_cronjob_name"].(type) {
|
||||
case string:
|
||||
status.HasCronjobName = true
|
||||
case *string:
|
||||
status.HasCronjobName = *v != ""
|
||||
}
|
||||
switch v := row.Data["k8s_job_name"].(type) {
|
||||
case string:
|
||||
status.HasJobName = true
|
||||
case *string:
|
||||
status.HasJobName = *v != ""
|
||||
}
|
||||
|
||||
switch v := row.Data["k8s_pod_name"].(type) {
|
||||
case string:
|
||||
status.PodName = v
|
||||
case *string:
|
||||
status.PodName = *v
|
||||
}
|
||||
|
||||
if !status.HasClusterName ||
|
||||
!status.HasNodeName ||
|
||||
!status.HasNamespaceName ||
|
||||
(!status.HasDeploymentName && !status.HasStatefulsetName && !status.HasDaemonsetName && !status.HasCronjobName && !status.HasJobName) {
|
||||
statuses = append(statuses, status)
|
||||
}
|
||||
}
|
||||
|
||||
return statuses, nil
|
||||
}
|
||||
|
||||
func (p *PodsRepo) getMetadataAttributes(ctx context.Context, req model.PodListRequest) (map[string]map[string]string, error) {
|
||||
podAttrs := map[string]map[string]string{}
|
||||
|
||||
|
@ -731,3 +731,26 @@ type VolumeListRecord struct {
|
||||
VolumeUsage float64 `json:"volumeUsage"`
|
||||
Meta map[string]string `json:"meta"`
|
||||
}
|
||||
|
||||
type PodOnboardingStatus struct {
|
||||
ClusterName string `json:"clusterName"`
|
||||
NodeName string `json:"nodeName"`
|
||||
NamespaceName string `json:"namespaceName"`
|
||||
PodName string `json:"podName"`
|
||||
HasClusterName bool `json:"hasClusterName"`
|
||||
HasNodeName bool `json:"hasNodeName"`
|
||||
HasNamespaceName bool `json:"hasNamespaceName"`
|
||||
HasDeploymentName bool `json:"hasDeploymentName"`
|
||||
HasStatefulsetName bool `json:"hasStatefulsetName"`
|
||||
HasDaemonsetName bool `json:"hasDaemonsetName"`
|
||||
HasCronjobName bool `json:"hasCronjobName"`
|
||||
HasJobName bool `json:"hasJobName"`
|
||||
}
|
||||
|
||||
type OnboardingStatus struct {
|
||||
DidSendPodMetrics bool `json:"didSendPodMetrics"`
|
||||
DidSendNodeMetrics bool `json:"didSendNodeMetrics"`
|
||||
DidSendClusterMetrics bool `json:"didSendClusterMetrics"`
|
||||
IsSendingOptionalPodMetrics bool `json:"isSendingOptionalPodMetrics"`
|
||||
IsSendingRequiredMetadata []PodOnboardingStatus `json:"isSendingRequiredMetadata"`
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user