diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go index 548c1ee2f5..af713b75a2 100644 --- a/pkg/query-service/app/logparsingpipeline/preview.go +++ b/pkg/query-service/app/logparsingpipeline/preview.go @@ -6,13 +6,13 @@ import ( "strings" "time" + "github.com/SigNoz/signoz-otel-collector/pkg/collectorsimulator" _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor" "github.com/pkg/errors" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor" - "go.signoz.io/signoz/pkg/query-service/collectorsimulator" "go.signoz.io/signoz/pkg/query-service/model" ) @@ -66,14 +66,20 @@ func SimulatePipelinesProcessing( return updatedConf, nil } - outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing( + outputPLogs, collectorErrs, simulationErr := collectorsimulator.SimulateLogsProcessing( ctx, processorFactories, configGenerator, simulatorInputPLogs, timeout, ) - if apiErr != nil { + if simulationErr != nil { + if errors.Is(simulationErr, collectorsimulator.ErrInvalidConfig) { + apiErr = model.BadRequest(simulationErr) + } else { + apiErr = model.InternalError(simulationErr) + } + return nil, collectorErrs, model.WrapApiError(apiErr, "could not simulate log pipelines processing.\nCollector errors", ) diff --git a/pkg/query-service/collectorsimulator/collectorsimulator.go b/pkg/query-service/collectorsimulator/collectorsimulator.go deleted file mode 100644 index daae85a797..0000000000 --- a/pkg/query-service/collectorsimulator/collectorsimulator.go +++ /dev/null @@ -1,265 +0,0 @@ -package collectorsimulator - -import ( - "context" - "fmt" - "os" - "strings" - - "github.com/google/uuid" - "github.com/pkg/errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/confmap/converter/expandconverter" - "go.opentelemetry.io/collector/confmap/provider/fileprovider" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/otelcol" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/service" - - "go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryexporter" - "go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryreceiver" - "go.signoz.io/signoz/pkg/query-service/model" -) - -// Puts together a collector service with inmemory receiver and exporter -// for simulating processing of signal data through an otel collector -type CollectorSimulator struct { - // collector service to be used for the simulation - collectorSvc *service.Service - - // tmp file where collectorSvc will log errors. - collectorLogsOutputFilePath string - - // error channel where collector components will report fatal errors - // Gets passed in as AsyncErrorChannel in service.Settings when creating a collector service. - collectorErrorChannel chan error - - // Unique ids of inmemory receiver and exporter instances that - // will be created by collectorSvc - inMemoryReceiverId string - inMemoryExporterId string -} - -type ConfigGenerator func(baseConfYaml []byte) ([]byte, error) - -func NewCollectorSimulator( - ctx context.Context, - processorFactories map[component.Type]processor.Factory, - configGenerator ConfigGenerator, -) (simulator *CollectorSimulator, cleanupFn func(), apiErr *model.ApiError) { - // Put together collector component factories for use in the simulation - receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory()) - if err != nil { - return nil, nil, model.InternalError(errors.Wrap(err, "could not create receiver factories.")) - } - exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory()) - if err != nil { - return nil, nil, model.InternalError(errors.Wrap(err, "could not create processor factories.")) - } - factories := otelcol.Factories{ - Receivers: receiverFactories, - Processors: processorFactories, - Exporters: exporterFactories, - } - - // Prepare collector config yaml for simulation - inMemoryReceiverId := uuid.NewString() - inMemoryExporterId := uuid.NewString() - - logsOutputFile, err := os.CreateTemp("", "collector-simulator-logs-*") - if err != nil { - return nil, nil, model.InternalError(errors.Wrap( - err, "could not create tmp file for capturing collector logs", - )) - } - collectorLogsOutputFilePath := logsOutputFile.Name() - cleanupFn = func() { - os.Remove(collectorLogsOutputFilePath) - } - err = logsOutputFile.Close() - if err != nil { - return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp collector log file")) - } - - collectorConfYaml, err := generateSimulationConfig( - inMemoryReceiverId, - configGenerator, - inMemoryExporterId, - collectorLogsOutputFilePath, - ) - if err != nil { - return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not generate collector config")) - } - - // Read collector config using the same file provider we use in the actual collector. - // This ensures env variable substitution if any is taken into account. - simulationConfigFile, err := os.CreateTemp("", "collector-simulator-config-*") - if err != nil { - return nil, nil, model.InternalError(errors.Wrap( - err, "could not create tmp file for capturing collector logs", - )) - } - simulationConfigPath := simulationConfigFile.Name() - cleanupFn = func() { - os.Remove(collectorLogsOutputFilePath) - os.Remove(simulationConfigPath) - } - - _, err = simulationConfigFile.Write(collectorConfYaml) - - if err != nil { - return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not write simulation config to tmp file")) - } - err = simulationConfigFile.Close() - if err != nil { - return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp simulation config file")) - } - - fp := fileprovider.NewFactory() - confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ - ResolverSettings: confmap.ResolverSettings{ - URIs: []string{simulationConfigPath}, - ProviderFactories: []confmap.ProviderFactory{fp}, - ConverterFactories: []confmap.ConverterFactory{expandconverter.NewFactory()}, - }, - }) - if err != nil { - return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not create config provider.")) - } - - collectorCfg, err := confProvider.Get(ctx, factories) - if err != nil { - return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "failed to parse collector config")) - } - - if err = collectorCfg.Validate(); err != nil { - return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "invalid collector config")) - } - - // Build and start collector service. - collectorErrChan := make(chan error) - svcSettings := service.Settings{ - ReceiversConfigs: collectorCfg.Receivers, - ReceiversFactories: factories.Receivers, - - ProcessorsConfigs: collectorCfg.Processors, - ProcessorsFactories: factories.Processors, - - ExportersConfigs: collectorCfg.Exporters, - ExportersFactories: factories.Exporters, - - ConnectorsConfigs: collectorCfg.Connectors, - ConnectorsFactories: factories.Connectors, - - ExtensionsConfigs: collectorCfg.Extensions, - ExtensionsFactories: factories.Extensions, - - AsyncErrorChannel: collectorErrChan, - } - - collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service) - if err != nil { - return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not instantiate collector service")) - } - - return &CollectorSimulator{ - inMemoryReceiverId: inMemoryReceiverId, - inMemoryExporterId: inMemoryExporterId, - collectorSvc: collectorSvc, - collectorErrorChannel: collectorErrChan, - collectorLogsOutputFilePath: collectorLogsOutputFilePath, - }, cleanupFn, nil -} - -func (l *CollectorSimulator) Start(ctx context.Context) ( - func(), *model.ApiError, -) { - // Calling collectorSvc.Start below will in turn call Start on - // inmemory receiver and exporter instances created by collectorSvc - // - // inmemory components are indexed in a global map after Start is called - // on them and will have to be cleaned up to ensure there is no memory leak - cleanupFn := func() { - inmemoryreceiver.CleanupInstance(l.inMemoryReceiverId) - inmemoryexporter.CleanupInstance(l.inMemoryExporterId) - } - - err := l.collectorSvc.Start(ctx) - if err != nil { - return cleanupFn, model.InternalError(errors.Wrap(err, "could not start collector service for simulation")) - } - - return cleanupFn, nil -} - -func (l *CollectorSimulator) GetReceiver() *inmemoryreceiver.InMemoryReceiver { - return inmemoryreceiver.GetReceiverInstance(l.inMemoryReceiverId) -} - -func (l *CollectorSimulator) GetExporter() *inmemoryexporter.InMemoryExporter { - return inmemoryexporter.GetExporterInstance(l.inMemoryExporterId) -} - -func (l *CollectorSimulator) Shutdown(ctx context.Context) ( - simulationErrs []string, apiErr *model.ApiError, -) { - shutdownErr := l.collectorSvc.Shutdown(ctx) - - // Collect all errors logged or reported by collectorSvc - simulationErrs = []string{} - close(l.collectorErrorChannel) - for reportedErr := range l.collectorErrorChannel { - simulationErrs = append(simulationErrs, reportedErr.Error()) - } - - collectorWarnAndErrorLogs, err := os.ReadFile(l.collectorLogsOutputFilePath) - if err != nil { - return nil, model.InternalError(fmt.Errorf( - "could not read collector logs from tmp file: %w", err, - )) - } - if len(collectorWarnAndErrorLogs) > 0 { - errorLines := strings.Split(string(collectorWarnAndErrorLogs), "\n") - simulationErrs = append(simulationErrs, errorLines...) - } - - if shutdownErr != nil { - return simulationErrs, model.InternalError(errors.Wrap( - shutdownErr, "could not shutdown the collector service", - )) - } - return simulationErrs, nil -} - -func generateSimulationConfig( - receiverId string, - configGenerator ConfigGenerator, - exporterId string, - collectorLogsOutputPath string, -) ([]byte, error) { - baseConf := fmt.Sprintf(` - receivers: - memory: - id: %s - exporters: - memory: - id: %s - service: - pipelines: - logs: - receivers: - - memory - exporters: - - memory - telemetry: - metrics: - level: none - logs: - level: warn - output_paths: ["%s"] - `, receiverId, exporterId, collectorLogsOutputPath) - - return configGenerator([]byte(baseConf)) -} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/config.go b/pkg/query-service/collectorsimulator/inmemoryexporter/config.go deleted file mode 100644 index 5b23b041ce..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryexporter/config.go +++ /dev/null @@ -1,16 +0,0 @@ -package inmemoryexporter - -import "fmt" - -type Config struct { - // Unique id for the exporter. - // Useful for getting a hold of the exporter in code that doesn't control its instantiation. - Id string `mapstructure:"id"` -} - -func (c *Config) Validate() error { - if len(c.Id) < 1 { - return fmt.Errorf("inmemory exporter: id is required") - } - return nil -} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go b/pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go deleted file mode 100644 index c9628451d4..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package inmemoryexporter - -import ( - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" -) - -func TestValidate(t *testing.T) { - tests := []struct { - name string - rawConf *confmap.Conf - errorExpected bool - }{ - { - name: "with id", - rawConf: confmap.NewFromStringMap(map[string]interface{}{ - "id": "test_exporter", - }), - errorExpected: false, - }, - { - name: "empty id", - rawConf: confmap.NewFromStringMap(map[string]interface{}{ - "id": "", - }), - errorExpected: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - err := tt.rawConf.Unmarshal(cfg) - require.NoError(t, err, "could not UnmarshalConfig") - - err = component.ValidateConfig(cfg) - if tt.errorExpected { - require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg) - } else { - require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg) - } - }) - } -} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go b/pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go deleted file mode 100644 index 3cff186016..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go +++ /dev/null @@ -1,86 +0,0 @@ -package inmemoryexporter - -import ( - "context" - "fmt" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/plog" -) - -// An in-memory exporter for testing and generating previews. -type InMemoryExporter struct { - // Unique identifier for the exporter. - id string - // mu protects the data below - mu sync.Mutex - // slice of pdata.Logs that were received by this exporter. - logs []plog.Logs -} - -// ConsumeLogs implements component.LogsExporter. -func (e *InMemoryExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - e.mu.Lock() - defer e.mu.Unlock() - - e.logs = append(e.logs, ld) - return nil -} - -func (e *InMemoryExporter) GetLogs() []plog.Logs { - e.mu.Lock() - defer e.mu.Unlock() - - return e.logs -} - -func (e *InMemoryExporter) ResetLogs() { - e.mu.Lock() - defer e.mu.Unlock() - - e.logs = nil -} - -func (e *InMemoryExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -// Keep track of all exporter instances in the process. -// Useful for getting a hold of the exporter in scenarios where one doesn't -// create the instances. Eg: bringing up a collector service from collector config -var allExporterInstances map[string]*InMemoryExporter -var allExportersLock sync.Mutex - -func init() { - allExporterInstances = make(map[string]*InMemoryExporter) -} - -func GetExporterInstance(id string) *InMemoryExporter { - return allExporterInstances[id] -} - -func CleanupInstance(exporterId string) { - allExportersLock.Lock() - defer allExportersLock.Unlock() - - delete(allExporterInstances, exporterId) -} - -func (e *InMemoryExporter) Start(ctx context.Context, host component.Host) error { - allExportersLock.Lock() - defer allExportersLock.Unlock() - - if allExporterInstances[e.id] != nil { - return fmt.Errorf("exporter with id %s is already running", e.id) - } - - allExporterInstances[e.id] = e - return nil -} - -func (e *InMemoryExporter) Shutdown(ctx context.Context) error { - CleanupInstance(e.id) - return nil -} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go b/pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go deleted file mode 100644 index 2e8466614e..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package inmemoryexporter - -import ( - "context" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/exporter" -) - -func TestExporterLifecycle(t *testing.T) { - require := require.New(t) - testExporterId := uuid.NewString() - - // Should be able to get a hold of the exporter after starting it. - require.Nil(GetExporterInstance(testExporterId)) - - constructed, err := makeTestExporter(testExporterId) - require.Nil(err, "could not make test exporter") - - err = constructed.Start(context.Background(), componenttest.NewNopHost()) - require.Nil(err, "could not start test exporter") - - testExporter := GetExporterInstance(testExporterId) - require.NotNil(testExporter, "could not get exporter instance by Id") - - // Should not be able to start 2 exporters with the same id - constructed2, err := makeTestExporter(testExporterId) - require.Nil(err, "could not create second exporter with same id") - - err = constructed2.Start(context.Background(), componenttest.NewNopHost()) - require.NotNil(err, "should not be able to start another exporter with same id before shutting down the previous one") - - // Should not be able to get a hold of an exporter after shutdown - testExporter.Shutdown(context.Background()) - require.Nil(GetExporterInstance(testExporterId), "should not be able to find exporter instance after shutdown") - - // Should be able to start a new exporter with same id after shutting down - constructed3, err := makeTestExporter(testExporterId) - require.Nil(err, "could not make exporter with same Id after shutting down previous one") - - err = constructed3.Start(context.Background(), componenttest.NewNopHost()) - require.Nil(err, "should be able to start another exporter with same id after shutting down the previous one") - - testExporter3 := GetExporterInstance(testExporterId) - require.NotNil(testExporter3, "could not get exporter instance by Id") - - testExporter3.Shutdown(context.Background()) - require.Nil(GetExporterInstance(testExporterId)) -} - -func makeTestExporter(exporterId string) (exporter.Logs, error) { - factory := NewFactory() - - cfg := factory.CreateDefaultConfig() - confmap.NewFromStringMap(map[string]any{"id": exporterId}).Unmarshal(&cfg) - - return factory.CreateLogsExporter( - context.Background(), exporter.Settings{}, cfg, - ) -} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/factory.go b/pkg/query-service/collectorsimulator/inmemoryexporter/factory.go deleted file mode 100644 index b86d9bc7dc..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryexporter/factory.go +++ /dev/null @@ -1,34 +0,0 @@ -package inmemoryexporter - -import ( - "context" - - "github.com/google/uuid" - "github.com/pkg/errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter" -) - -func createDefaultConfig() component.Config { - return &Config{ - Id: uuid.NewString(), - } -} - -func createLogsExporter( - _ context.Context, _ exporter.Settings, config component.Config, -) (exporter.Logs, error) { - if err := component.ValidateConfig(config); err != nil { - return nil, errors.Wrap(err, "invalid inmemory exporter config") - } - return &InMemoryExporter{ - id: config.(*Config).Id, - }, nil -} - -func NewFactory() exporter.Factory { - return exporter.NewFactory( - component.MustNewType("memory"), - createDefaultConfig, - exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta)) -} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go b/pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go deleted file mode 100644 index 641fee11bd..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package inmemoryexporter - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter" -) - -func TestCreateDefaultConfig(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - assert.NotNil(t, cfg, "failed to create default config") - assert.NoError(t, componenttest.CheckConfigStruct(cfg)) -} - -func TestCreateLogsExporter(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - te, err := factory.CreateLogsExporter( - context.Background(), exporter.Settings{}, cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, te) -} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/config.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/config.go deleted file mode 100644 index 6df842ce3e..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryreceiver/config.go +++ /dev/null @@ -1,16 +0,0 @@ -package inmemoryreceiver - -import "fmt" - -type Config struct { - // Unique id for the receiver. - // Useful for getting a hold of the receiver in code that doesn't control its instantiation. - Id string `mapstructure:"id"` -} - -func (c *Config) Validate() error { - if len(c.Id) < 1 { - return fmt.Errorf("inmemory receiver: id is required") - } - return nil -} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go deleted file mode 100644 index 5a6144a379..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package inmemoryreceiver - -import ( - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" -) - -func TestValidate(t *testing.T) { - tests := []struct { - name string - rawConf *confmap.Conf - errorExpected bool - }{ - { - name: "with id", - rawConf: confmap.NewFromStringMap(map[string]interface{}{ - "id": "test_receiver", - }), - errorExpected: false, - }, - { - name: "empty id", - rawConf: confmap.NewFromStringMap(map[string]interface{}{ - "id": "", - }), - errorExpected: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - err := tt.rawConf.Unmarshal(&cfg) - require.NoError(t, err, "could not UnmarshalConfig") - - err = component.ValidateConfig(cfg) - if tt.errorExpected { - require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg) - } else { - require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg) - } - }) - } -} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go deleted file mode 100644 index ed90e06cd8..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go +++ /dev/null @@ -1,41 +0,0 @@ -package inmemoryreceiver - -import ( - "context" - - "github.com/google/uuid" - "github.com/pkg/errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" -) - -func createDefaultConfig() component.Config { - return &Config{ - Id: uuid.NewString(), - } -} - -func createLogsReceiver( - _ context.Context, - _ receiver.Settings, - config component.Config, - consumer consumer.Logs, -) (receiver.Logs, error) { - if err := component.ValidateConfig(config); err != nil { - return nil, errors.Wrap(err, "invalid inmemory receiver config") - } - return &InMemoryReceiver{ - id: config.(*Config).Id, - nextConsumer: consumer, - }, nil - -} - -// NewFactory creates a new OTLP receiver factory. -func NewFactory() receiver.Factory { - return receiver.NewFactory( - component.MustNewType("memory"), - createDefaultConfig, - receiver.WithLogs(createLogsReceiver, component.StabilityLevelBeta)) -} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go deleted file mode 100644 index 6c79622e92..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package inmemoryreceiver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver" -) - -func TestCreateDefaultConfig(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - assert.NotNil(t, cfg, "failed to create default config") - assert.NoError(t, componenttest.CheckConfigStruct(cfg)) -} - -func TestCreateLogsReceiver(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - te, err := factory.CreateLogsReceiver( - context.Background(), receiver.Settings{}, cfg, consumertest.NewNop(), - ) - assert.NoError(t, err) - assert.NotNil(t, te) -} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go deleted file mode 100644 index d4b0a2abfe..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go +++ /dev/null @@ -1,64 +0,0 @@ -package inmemoryreceiver - -import ( - "context" - "fmt" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/plog" -) - -// In memory receiver for testing and simulation -type InMemoryReceiver struct { - // Unique identifier for the receiver. - id string - - nextConsumer consumer.Logs -} - -func (r *InMemoryReceiver) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - return r.nextConsumer.ConsumeLogs(ctx, ld) -} - -func (r *InMemoryReceiver) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -// Keep track of all receiver instances in the process. -// Useful for getting a hold of the receiver in scenarios where one doesn't -// create the instances. Eg: bringing up a collector service from collector config -var allReceiverInstances map[string]*InMemoryReceiver -var allReceiversLock sync.Mutex - -func init() { - allReceiverInstances = make(map[string]*InMemoryReceiver) -} - -func CleanupInstance(receiverId string) { - allReceiversLock.Lock() - defer allReceiversLock.Unlock() - delete(allReceiverInstances, receiverId) -} - -func (r *InMemoryReceiver) Start(ctx context.Context, host component.Host) error { - allReceiversLock.Lock() - defer allReceiversLock.Unlock() - - if allReceiverInstances[r.id] != nil { - return fmt.Errorf("receiver with id %s is already running", r.id) - } - - allReceiverInstances[r.id] = r - return nil -} - -func (r *InMemoryReceiver) Shutdown(ctx context.Context) error { - CleanupInstance(r.id) - return nil -} - -func GetReceiverInstance(id string) *InMemoryReceiver { - return allReceiverInstances[id] -} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go deleted file mode 100644 index a3fa1b81ca..0000000000 --- a/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package inmemoryreceiver - -import ( - "context" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver" -) - -func TestReceiverLifecycle(t *testing.T) { - require := require.New(t) - testReceiverId := uuid.NewString() - - // Should be able to get a hold of the receiver after starting it. - require.Nil(GetReceiverInstance(testReceiverId), "receiver instance should not exist before Start()") - - constructed, err := makeTestLogReceiver(testReceiverId) - require.Nil(err, "could not make test receiver") - - err = constructed.Start(context.Background(), componenttest.NewNopHost()) - require.Nil(err, "could not start test receiver") - - testReceiver := GetReceiverInstance(testReceiverId) - require.NotNil(testReceiver, "could not get receiver instance by Id") - - // Should not be able to start 2 receivers with the same id - constructed2, err := makeTestLogReceiver(testReceiverId) - require.Nil(err, "could not create second receiver with same id") - - err = constructed2.Start(context.Background(), componenttest.NewNopHost()) - require.NotNil(err, "should not be able to start another receiver with same id before shutting down the previous one") - - // Should not be able to get a hold of an receiver after shutdown - testReceiver.Shutdown(context.Background()) - require.Nil(GetReceiverInstance(testReceiverId), "should not be able to find inmemory receiver after shutdown") - - // Should be able to start a new receiver with same id after shutting down - constructed3, err := makeTestLogReceiver(testReceiverId) - require.Nil(err, "could not make receiver with same Id after shutting down old one") - - err = constructed3.Start(context.Background(), componenttest.NewNopHost()) - require.Nil(err, "should be able to start another receiver with same id after shutting down the previous one") - - testReceiver3 := GetReceiverInstance(testReceiverId) - require.NotNil(testReceiver3, "could not get receiver instance by Id") - - testReceiver3.Shutdown(context.Background()) - require.Nil(GetReceiverInstance(testReceiverId)) -} - -func makeTestLogReceiver(receiverId string) (receiver.Logs, error) { - factory := NewFactory() - - cfg := factory.CreateDefaultConfig() - - confmap.NewFromStringMap(map[string]any{"id": receiverId}).Unmarshal(&cfg) - - return factory.CreateLogsReceiver( - context.Background(), receiver.Settings{}, cfg, consumertest.NewNop(), - ) -} diff --git a/pkg/query-service/collectorsimulator/logs.go b/pkg/query-service/collectorsimulator/logs.go deleted file mode 100644 index d1b4f01abb..0000000000 --- a/pkg/query-service/collectorsimulator/logs.go +++ /dev/null @@ -1,131 +0,0 @@ -package collectorsimulator - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/pkg/errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/processor" - "go.signoz.io/signoz/pkg/query-service/model" -) - -// Simulate processing of logs through the otel collector. -// Useful for testing, validation and generating previews. -func SimulateLogsProcessing( - ctx context.Context, - processorFactories map[component.Type]processor.Factory, - configGenerator ConfigGenerator, - logs []plog.Logs, - timeout time.Duration, -) ( - outputLogs []plog.Logs, collectorErrs []string, apiErr *model.ApiError, -) { - // Construct and start a simulator (wraps a collector service) - simulator, simulatorInitCleanup, apiErr := NewCollectorSimulator( - ctx, processorFactories, configGenerator, - ) - if simulatorInitCleanup != nil { - defer simulatorInitCleanup() - } - if apiErr != nil { - return nil, nil, model.WrapApiError(apiErr, "could not create logs processing simulator") - } - - simulatorCleanup, apiErr := simulator.Start(ctx) - // We can not rely on collector service to shutdown successfully and cleanup refs to inmemory components. - if simulatorCleanup != nil { - defer simulatorCleanup() - } - if apiErr != nil { - return nil, nil, apiErr - } - - // Do the simulation - for _, plog := range logs { - apiErr = SendLogsToSimulator(ctx, simulator, plog) - if apiErr != nil { - return nil, nil, model.WrapApiError(apiErr, "could not consume logs for simulation") - } - } - - result, apiErr := GetProcessedLogsFromSimulator( - simulator, len(logs), timeout, - ) - if apiErr != nil { - return nil, nil, model.InternalError(model.WrapApiError(apiErr, - "could not get processed logs from simulator", - )) - } - - // Shut down the simulator - simulationErrs, apiErr := simulator.Shutdown(ctx) - if apiErr != nil { - return nil, simulationErrs, model.WrapApiError(apiErr, - "could not shutdown logs processing simulator", - ) - } - - for _, log := range simulationErrs { - // if log is empty or log comes from featuregate.go, then remove it - if log == "" || strings.Contains(log, "featuregate.go") { - continue - } - collectorErrs = append(collectorErrs, log) - } - - return result, collectorErrs, nil -} - -func SendLogsToSimulator( - ctx context.Context, - simulator *CollectorSimulator, - plog plog.Logs, -) *model.ApiError { - receiver := simulator.GetReceiver() - if receiver == nil { - return model.InternalError(fmt.Errorf("could not find in memory receiver for simulator")) - } - if err := receiver.ConsumeLogs(ctx, plog); err != nil { - return model.InternalError(errors.Wrap(err, - "inmemory receiver could not consume logs for simulation", - )) - } - return nil -} - -func GetProcessedLogsFromSimulator( - simulator *CollectorSimulator, - minLogCount int, - timeout time.Duration, -) ( - []plog.Logs, *model.ApiError, -) { - exporter := simulator.GetExporter() - if exporter == nil { - return nil, model.InternalError(fmt.Errorf("could not find in memory exporter for simulator")) - } - - // Must do a time based wait to ensure all logs come through. - // For example, logstransformprocessor does internal batching and it - // takes (processorCount * batchTime) for logs to get through. - startTsMillis := time.Now().UnixMilli() - for { - elapsedMillis := time.Now().UnixMilli() - startTsMillis - if elapsedMillis > timeout.Milliseconds() { - break - } - - exportedLogs := exporter.GetLogs() - if len(exportedLogs) >= minLogCount { - return exportedLogs, nil - } - - time.Sleep(50 * time.Millisecond) - } - - return exporter.GetLogs(), nil -} diff --git a/pkg/query-service/collectorsimulator/logs_test.go b/pkg/query-service/collectorsimulator/logs_test.go deleted file mode 100644 index 628c33c537..0000000000 --- a/pkg/query-service/collectorsimulator/logs_test.go +++ /dev/null @@ -1,159 +0,0 @@ -package collectorsimulator - -import ( - "context" - "testing" - "time" - - "github.com/knadh/koanf/parsers/yaml" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/processor" -) - -type ProcessorConfig struct { - Name string - Config map[string]interface{} -} - -func TestLogsProcessingSimulation(t *testing.T) { - require := require.New(t) - - inputLogs := []plog.Logs{ - makeTestPlog("test log 1", map[string]string{ - "method": "GET", - }), - makeTestPlog("test log 2", map[string]string{ - "method": "POST", - }), - } - - testLogstransformConf1, err := yaml.Parser().Unmarshal([]byte(` - operators: - - type: router - id: router_signoz - routes: - - output: add - expr: attributes.method == "GET" - default: noop - - type: add - id: add - field: attributes.test - value: test-value-get - - type: noop - id: noop - `)) - require.Nil(err, "could not unmarshal test logstransform op config") - testProcessor1 := ProcessorConfig{ - Name: "logstransform/test", - Config: testLogstransformConf1, - } - - testLogstransformConf2, err := yaml.Parser().Unmarshal([]byte(` - operators: - - type: router - id: router_signoz - routes: - - output: add - expr: attributes.method == "POST" - default: noop - - type: add - id: add - field: attributes.test - value: test-value-post - - type: noop - id: noop - `)) - require.Nil(err, "could not unmarshal test logstransform op config") - testProcessor2 := ProcessorConfig{ - Name: "logstransform/test2", - Config: testLogstransformConf2, - } - - processorFactories, err := processor.MakeFactoryMap( - logstransformprocessor.NewFactory(), - ) - require.Nil(err, "could not create processors factory map") - - configGenerator := makeTestConfigGenerator( - []ProcessorConfig{testProcessor1, testProcessor2}, - ) - outputLogs, collectorErrs, apiErr := SimulateLogsProcessing( - context.Background(), - processorFactories, - configGenerator, - inputLogs, - 300*time.Millisecond, - ) - require.Nil(apiErr, apiErr.ToError().Error()) - require.Equal(len(collectorErrs), 0) - - for _, l := range outputLogs { - rl := l.ResourceLogs().At(0) - sl := rl.ScopeLogs().At(0) - record := sl.LogRecords().At(0) - method, exists := record.Attributes().Get("method") - require.True(exists) - testVal, exists := record.Attributes().Get("test") - require.True(exists) - if method.Str() == "GET" { - require.Equal(testVal.Str(), "test-value-get") - } else { - require.Equal(testVal.Str(), "test-value-post") - } - } -} - -func makeTestPlog(body string, attrsStr map[string]string) plog.Logs { - pl := plog.NewLogs() - rl := pl.ResourceLogs().AppendEmpty() - - scopeLog := rl.ScopeLogs().AppendEmpty() - slRecord := scopeLog.LogRecords().AppendEmpty() - slRecord.Body().SetStr(body) - slAttribs := slRecord.Attributes() - for k, v := range attrsStr { - slAttribs.PutStr(k, v) - } - - return pl -} - -func makeTestConfigGenerator( - processorConfigs []ProcessorConfig, -) ConfigGenerator { - return func(baseConf []byte) ([]byte, error) { - conf, err := yaml.Parser().Unmarshal([]byte(baseConf)) - if err != nil { - return nil, err - } - - processors := map[string]interface{}{} - if conf["processors"] != nil { - processors = conf["processors"].(map[string]interface{}) - } - logsProcessors := []string{} - svc := conf["service"].(map[string]interface{}) - svcPipelines := svc["pipelines"].(map[string]interface{}) - svcLogsPipeline := svcPipelines["logs"].(map[string]interface{}) - if svcLogsPipeline["processors"] != nil { - logsProcessors = svcLogsPipeline["processors"].([]string) - } - - for _, processorConf := range processorConfigs { - processors[processorConf.Name] = processorConf.Config - logsProcessors = append(logsProcessors, processorConf.Name) - } - - conf["processors"] = processors - svcLogsPipeline["processors"] = logsProcessors - - confYaml, err := yaml.Parser().Marshal(conf) - if err != nil { - return nil, err - } - - return confYaml, nil - } -}