fix: use structs instead of interface

This commit is contained in:
nityanandagohain 2023-03-16 10:20:57 +05:30
parent 1c867d3b4c
commit bac717e9e6
4 changed files with 103 additions and 58 deletions

View File

@ -212,7 +212,7 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
} }
// UpsertLogParsingProcessors updates the agent with log parsing processors // UpsertLogParsingProcessors updates the agent with log parsing processors
func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []interface{}) error { func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error {
if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
return fmt.Errorf("agent updater is busy") return fmt.Errorf("agent updater is busy")
} }

View File

@ -3,6 +3,7 @@ package opamp
import ( import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
@ -16,7 +17,7 @@ import (
var lockLogsPipelineSpec sync.RWMutex var lockLogsPipelineSpec sync.RWMutex
func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []interface{}, callback func(string, string, error)) (string, error) { func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) {
confHash := "" confHash := ""
if opAmpServer == nil { if opAmpServer == nil {
return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment")
@ -36,15 +37,20 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin
buildLogParsingProcessors(c, parsingProcessors) buildLogParsingProcessors(c, parsingProcessors)
// get the processor list p, err := getOtelPipelinFromConfig(c)
logs := c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] if err != nil {
processors := logs.(map[string]interface{})["processors"].([]interface{}) return confHash, err
}
if p.Pipelines.Logs == nil {
return confHash, fmt.Errorf("logs pipeline doesn't exist")
}
// build the new processor list // build the new processor list
updatedProcessorList, _ := buildLogsProcessors(processors, parsingProcessorsNames) updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames)
p.Pipelines.Logs.Processors = updatedProcessorList
// add the new processor to the data // add the new processor to the data ( no checks required as the keys will exists)
c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = updatedProcessorList c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs
updatedConf, err := yaml.Parser().Marshal(c) updatedConf, err := yaml.Parser().Marshal(c)
if err != nil { if err != nil {
@ -106,19 +112,44 @@ func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface
return nil return nil
} }
func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}) ([]interface{}, error) { type otelPipeline struct {
Pipelines struct {
Logs *struct {
Exporters []string `json:"exporters" yaml:"exporters"`
Processors []string `json:"processors" yaml:"processors"`
Receivers []string `json:"receivers" yaml:"receivers"`
} `json:"logs" yaml:"logs"`
} `json:"pipelines" yaml:"pipelines"`
}
func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) {
if _, ok := config["service"]; !ok {
return nil, fmt.Errorf("service not found in OTEL config")
}
b, err := json.Marshal(config["service"])
if err != nil {
return nil, err
}
p := otelPipeline{}
if err := json.Unmarshal(b, &p); err != nil {
return nil, err
}
return &p, nil
}
func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) {
lockLogsPipelineSpec.Lock() lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock() defer lockLogsPipelineSpec.Unlock()
exists := map[string]struct{}{} exists := map[string]struct{}{}
for _, v := range logsParserPipeline { for _, v := range logsParserPipeline {
exists[v.(string)] = struct{}{} exists[v] = struct{}{}
} }
// removed the old processors which are not used // removed the old processors which are not used
var pipeline []interface{} var pipeline []string
for _, v := range current { for _, v := range current {
k := v.(string) k := v
if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) { if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) {
pipeline = append(pipeline, v) pipeline = append(pipeline, v)
} }
@ -127,7 +158,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}
// create a reverse map of existing config processors and their position // create a reverse map of existing config processors and their position
existing := map[string]int{} existing := map[string]int{}
for i, p := range current { for i, p := range current {
name := p.(string) name := p
existing[name] = i existing[name] = i
} }
@ -137,7 +168,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}
// go through plan and map its elements to current positions in effective config // go through plan and map its elements to current positions in effective config
for i, m := range logsParserPipeline { for i, m := range logsParserPipeline {
if loc, ok := existing[m.(string)]; ok { if loc, ok := existing[m]; ok {
specVsExistingMap[i] = loc specVsExistingMap[i] = loc
} }
} }
@ -153,13 +184,13 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}
} else { } else {
if lastMatched <= 0 { if lastMatched <= 0 {
zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m) zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position 0:", m)
pipeline = append([]interface{}{m}, pipeline[lastMatched:]...) pipeline = append([]string{m}, pipeline[lastMatched:]...)
lastMatched++ lastMatched++
} else { } else {
zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m) zap.S().Debugf("build_pipeline: found a new item to be inserted, inserting at position :", lastMatched, " ", m)
prior := make([]interface{}, len(pipeline[:lastMatched])) prior := make([]string, len(pipeline[:lastMatched]))
next := make([]interface{}, len(pipeline[lastMatched:])) next := make([]string, len(pipeline[lastMatched:]))
copy(prior, pipeline[:lastMatched]) copy(prior, pipeline[:lastMatched])
copy(next, pipeline[lastMatched:]) copy(next, pipeline[lastMatched:])
@ -170,7 +201,7 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}
} }
} }
if checkDuplicates(pipeline) { if checkDuplicateString(pipeline) {
// duplicates are most likely because the processor sequence in effective config conflicts // duplicates are most likely because the processor sequence in effective config conflicts
// with the planned sequence as per planned pipeline // with the planned sequence as per planned pipeline
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline) return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
@ -178,3 +209,17 @@ func buildLogsProcessors(current []interface{}, logsParserPipeline []interface{}
return pipeline, nil return pipeline, nil
} }
func checkDuplicateString(pipeline []string) bool {
exists := make(map[string]bool, len(pipeline))
zap.S().Debugf("checking duplicate processors in the pipeline:", pipeline)
for _, processor := range pipeline {
name := processor
if _, ok := exists[name]; ok {
return true
}
exists[name] = true
}
return false
}

View File

@ -22,11 +22,11 @@ var buildProcessorTestData = []struct {
}, },
}, },
pipelineProcessor: map[string]interface{}{ pipelineProcessor: map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
}, },
outputConf: map[string]interface{}{ outputConf: map[string]interface{}{
"processors": map[string]interface{}{ "processors": map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
"batch": struct{}{}, "batch": struct{}{},
}, },
}, },
@ -35,7 +35,7 @@ var buildProcessorTestData = []struct {
Name: "Remove", Name: "Remove",
agentConf: map[string]interface{}{ agentConf: map[string]interface{}{
"processors": map[string]interface{}{ "processors": map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
"batch": struct{}{}, "batch": struct{}{},
}, },
}, },
@ -50,17 +50,17 @@ var buildProcessorTestData = []struct {
Name: "remove and upsert 1", Name: "remove and upsert 1",
agentConf: map[string]interface{}{ agentConf: map[string]interface{}{
"processors": map[string]interface{}{ "processors": map[string]interface{}{
constants.LogsPPLPfx + "_a": struct{}{}, constants.LogsPPLPfx + "a": struct{}{},
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
"batch": struct{}{}, "batch": struct{}{},
}, },
}, },
pipelineProcessor: map[string]interface{}{ pipelineProcessor: map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
}, },
outputConf: map[string]interface{}{ outputConf: map[string]interface{}{
"processors": map[string]interface{}{ "processors": map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
"batch": struct{}{}, "batch": struct{}{},
}, },
}, },
@ -69,19 +69,19 @@ var buildProcessorTestData = []struct {
Name: "remove and upsert 2", Name: "remove and upsert 2",
agentConf: map[string]interface{}{ agentConf: map[string]interface{}{
"processors": map[string]interface{}{ "processors": map[string]interface{}{
"memory_limiter": struct{}{}, "memorylimiter": struct{}{},
constants.LogsPPLPfx + "_a": struct{}{}, constants.LogsPPLPfx + "a": struct{}{},
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
"batch": struct{}{}, "batch": struct{}{},
}, },
}, },
pipelineProcessor: map[string]interface{}{ pipelineProcessor: map[string]interface{}{
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
}, },
outputConf: map[string]interface{}{ outputConf: map[string]interface{}{
"processors": map[string]interface{}{ "processors": map[string]interface{}{
"memory_limiter": struct{}{}, "memorylimiter": struct{}{},
constants.LogsPPLPfx + "_b": struct{}{}, constants.LogsPPLPfx + "b": struct{}{},
"batch": struct{}{}, "batch": struct{}{},
}, },
}, },
@ -89,7 +89,7 @@ var buildProcessorTestData = []struct {
} }
func TestBuildLogParsingProcessors(t *testing.T) { func TestBuildLogParsingProcessors(t *testing.T) {
func TestBuildLogParsingProcessors(t *testing.T) { func TestBuildLogParsingProcessors(t *testing.T) {
for _, test := range buildProcessorTestData { for _, test := range buildProcessorTestData {
Convey(test.Name, t, func() { Convey(test.Name, t, func() {
err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor) err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -101,56 +101,56 @@ func TestBuildLogParsingProcessors(t *testing.T) {
var BuildLogsPipelineTestData = []struct { var BuildLogsPipelineTestData = []struct {
Name string Name string
currentPipeline []interface{} currentPipeline []string
logsPipeline []interface{} logsPipeline []string
expectedPipeline []interface{} expectedPipeline []string
}{ }{
{ {
Name: "Add new pipelines", Name: "Add new pipelines",
currentPipeline: []interface{}{"processor1", "processor2"}, currentPipeline: []string{"processor1", "processor2"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"}, logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
}, },
{ {
Name: "Add new pipeline and respect custom processors", Name: "Add new pipeline and respect custom processors",
currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", "processor2"}, currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "processor2"}, expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
}, },
{ {
Name: "Add new pipeline and respect custom processors in the beginning and middle", Name: "Add new pipeline and respect custom processors in the beginning and middle",
currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "batch"}, currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c"}, logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"}, expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
}, },
{ {
Name: "Remove old pipeline add add new", Name: "Remove old pipeline add add new",
currentPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b", "processor1", "processor2"}, currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []interface{}{constants.LogsPPLPfx + "_a", "processor1", "processor2"}, expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
}, },
{ {
Name: "Remove old pipeline from middle", Name: "Remove old pipeline from middle",
currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a"}, logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "batch"}, expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
}, },
{ {
Name: "Remove old pipeline from middle and add new pipeline", Name: "Remove old pipeline from middle and add new pipeline",
currentPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"}, currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"}, logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
expectedPipeline: []interface{}{"processor1", "processor2", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "processor3", "batch"}, expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
}, },
{ {
Name: "Remove multiple old pipelines from middle and add multiple new ones", Name: "Remove multiple old pipelines from middle and add multiple new ones",
currentPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_c", "processor4", constants.LogsPPLPfx + "_d", "processor5", "batch"}, currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
logsPipeline: []interface{}{constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1"}, logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
expectedPipeline: []interface{}{"processor1", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_a1", "processor2", "processor3", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_c1", "processor4", "processor5", "batch"}, expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
}, },
} }
func TestBuildLogsPipeline(t *testing.T) { func TestBuildLogsPipeline(t *testing.T) {
for _, test := range BuildLogsPipelineTestData { for _, test := range BuildLogsPipelineTestData {
Convey(test.Name, t, func() { Convey(test.Name, t, func() {
v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline) v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline)
So(err, ShouldBeNil) So(err, ShouldBeNil)

View File

@ -222,4 +222,4 @@ const (
) )
// logsPPLPfx is a short constant for logsPipelinePrefix // logsPPLPfx is a short constant for logsPipelinePrefix
const LogsPPLPfx = "logstransform/pipeline" const LogsPPLPfx = "logstransform/pipeline_"