Chore/qs use collector simulator from signoz otel collector (#6539)

* chore: qs: logs pipeline preview: use collectorsimulator from signoz-otel-collector

* chore: qs: remove collectorsimulator: located in signoz-otel-collector now
Raj Kamal Singh 2024-11-27 11:53:39 +05:30 committed by GitHub
parent 328d955a74
commit 486632b64e
16 changed files with 9 additions and 1098 deletions

@@ -6,13 +6,13 @@ import (
	"strings"
	"time"
+	"github.com/SigNoz/signoz-otel-collector/pkg/collectorsimulator"
	_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
	"github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor"
	"github.com/pkg/errors"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/processor"
-	"go.signoz.io/signoz/pkg/query-service/collectorsimulator"
	"go.signoz.io/signoz/pkg/query-service/model"
)
@@ -66,14 +66,20 @@ func SimulatePipelinesProcessing(
		return updatedConf, nil
	}
-	outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing(
+	outputPLogs, collectorErrs, simulationErr := collectorsimulator.SimulateLogsProcessing(
		ctx,
		processorFactories,
		configGenerator,
		simulatorInputPLogs,
		timeout,
	)
-	if apiErr != nil {
+	if simulationErr != nil {
+		if errors.Is(simulationErr, collectorsimulator.ErrInvalidConfig) {
+			apiErr = model.BadRequest(simulationErr)
+		} else {
+			apiErr = model.InternalError(simulationErr)
+		}
		return nil, collectorErrs, model.WrapApiError(apiErr,
			"could not simulate log pipelines processing.\nCollector errors",
		)
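The errors.Is check above works because the relocated package exports a sentinel error, collectorsimulator.ErrInvalidConfig, letting the caller map config mistakes to a bad-request response and everything else to an internal error. A minimal self-contained sketch of the sentinel pattern (the sentinel name comes from the diff above; all other names here are illustrative):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for collectorsimulator.ErrInvalidConfig.
var ErrInvalidConfig = errors.New("invalid collector config")

func simulate(confOK bool) error {
	if !confOK {
		// Wrapping with %w keeps the sentinel recoverable via errors.Is.
		return fmt.Errorf("failed to parse config: %w", ErrInvalidConfig)
	}
	return nil
}

func main() {
	if err := simulate(false); errors.Is(err, ErrInvalidConfig) {
		fmt.Println("bad request: caller supplied an invalid pipeline config")
	} else if err != nil {
		fmt.Println("internal error:", err)
	}
}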

@@ -1,265 +0,0 @@
package collectorsimulator
import (
"context"
"fmt"
"os"
"strings"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service"
"go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryexporter"
"go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryreceiver"
"go.signoz.io/signoz/pkg/query-service/model"
)
// Puts together a collector service with inmemory receiver and exporter
// for simulating processing of signal data through an otel collector
type CollectorSimulator struct {
// collector service to be used for the simulation
collectorSvc *service.Service
// tmp file where collectorSvc will log errors.
collectorLogsOutputFilePath string
// error channel where collector components will report fatal errors
// Gets passed in as AsyncErrorChannel in service.Settings when creating a collector service.
collectorErrorChannel chan error
// Unique ids of inmemory receiver and exporter instances that
// will be created by collectorSvc
inMemoryReceiverId string
inMemoryExporterId string
}
type ConfigGenerator func(baseConfYaml []byte) ([]byte, error)
func NewCollectorSimulator(
ctx context.Context,
processorFactories map[component.Type]processor.Factory,
configGenerator ConfigGenerator,
) (simulator *CollectorSimulator, cleanupFn func(), apiErr *model.ApiError) {
// Put together collector component factories for use in the simulation
receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory())
if err != nil {
return nil, nil, model.InternalError(errors.Wrap(err, "could not create receiver factories."))
}
exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory())
if err != nil {
return nil, nil, model.InternalError(errors.Wrap(err, "could not create processor factories."))
}
factories := otelcol.Factories{
Receivers: receiverFactories,
Processors: processorFactories,
Exporters: exporterFactories,
}
// Prepare collector config yaml for simulation
inMemoryReceiverId := uuid.NewString()
inMemoryExporterId := uuid.NewString()
logsOutputFile, err := os.CreateTemp("", "collector-simulator-logs-*")
if err != nil {
return nil, nil, model.InternalError(errors.Wrap(
err, "could not create tmp file for capturing collector logs",
))
}
collectorLogsOutputFilePath := logsOutputFile.Name()
cleanupFn = func() {
os.Remove(collectorLogsOutputFilePath)
}
err = logsOutputFile.Close()
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp collector log file"))
}
collectorConfYaml, err := generateSimulationConfig(
inMemoryReceiverId,
configGenerator,
inMemoryExporterId,
collectorLogsOutputFilePath,
)
if err != nil {
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not generate collector config"))
}
// Read collector config using the same file provider we use in the actual collector.
// This ensures env variable substitution, if any, is taken into account.
simulationConfigFile, err := os.CreateTemp("", "collector-simulator-config-*")
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(
err, "could not create tmp file for simulation config",
))
}
simulationConfigPath := simulationConfigFile.Name()
cleanupFn = func() {
os.Remove(collectorLogsOutputFilePath)
os.Remove(simulationConfigPath)
}
_, err = simulationConfigFile.Write(collectorConfYaml)
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not write simulation config to tmp file"))
}
err = simulationConfigFile.Close()
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not close tmp simulation config file"))
}
fp := fileprovider.NewFactory()
confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{simulationConfigPath},
ProviderFactories: []confmap.ProviderFactory{fp},
ConverterFactories: []confmap.ConverterFactory{expandconverter.NewFactory()},
},
})
if err != nil {
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "could not create config provider."))
}
collectorCfg, err := confProvider.Get(ctx, factories)
if err != nil {
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "failed to parse collector config"))
}
if err = collectorCfg.Validate(); err != nil {
return nil, cleanupFn, model.BadRequest(errors.Wrap(err, "invalid collector config"))
}
// Build and start collector service.
collectorErrChan := make(chan error)
svcSettings := service.Settings{
ReceiversConfigs: collectorCfg.Receivers,
ReceiversFactories: factories.Receivers,
ProcessorsConfigs: collectorCfg.Processors,
ProcessorsFactories: factories.Processors,
ExportersConfigs: collectorCfg.Exporters,
ExportersFactories: factories.Exporters,
ConnectorsConfigs: collectorCfg.Connectors,
ConnectorsFactories: factories.Connectors,
ExtensionsConfigs: collectorCfg.Extensions,
ExtensionsFactories: factories.Extensions,
AsyncErrorChannel: collectorErrChan,
}
collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service)
if err != nil {
return nil, cleanupFn, model.InternalError(errors.Wrap(err, "could not instantiate collector service"))
}
return &CollectorSimulator{
inMemoryReceiverId: inMemoryReceiverId,
inMemoryExporterId: inMemoryExporterId,
collectorSvc: collectorSvc,
collectorErrorChannel: collectorErrChan,
collectorLogsOutputFilePath: collectorLogsOutputFilePath,
}, cleanupFn, nil
}
func (l *CollectorSimulator) Start(ctx context.Context) (
func(), *model.ApiError,
) {
// Calling collectorSvc.Start below will in turn call Start on
// inmemory receiver and exporter instances created by collectorSvc
//
// inmemory components are indexed in a global map after Start is called
// on them and will have to be cleaned up to ensure there is no memory leak
cleanupFn := func() {
inmemoryreceiver.CleanupInstance(l.inMemoryReceiverId)
inmemoryexporter.CleanupInstance(l.inMemoryExporterId)
}
err := l.collectorSvc.Start(ctx)
if err != nil {
return cleanupFn, model.InternalError(errors.Wrap(err, "could not start collector service for simulation"))
}
return cleanupFn, nil
}
func (l *CollectorSimulator) GetReceiver() *inmemoryreceiver.InMemoryReceiver {
return inmemoryreceiver.GetReceiverInstance(l.inMemoryReceiverId)
}
func (l *CollectorSimulator) GetExporter() *inmemoryexporter.InMemoryExporter {
return inmemoryexporter.GetExporterInstance(l.inMemoryExporterId)
}
func (l *CollectorSimulator) Shutdown(ctx context.Context) (
simulationErrs []string, apiErr *model.ApiError,
) {
shutdownErr := l.collectorSvc.Shutdown(ctx)
// Collect all errors logged or reported by collectorSvc
simulationErrs = []string{}
close(l.collectorErrorChannel)
for reportedErr := range l.collectorErrorChannel {
simulationErrs = append(simulationErrs, reportedErr.Error())
}
collectorWarnAndErrorLogs, err := os.ReadFile(l.collectorLogsOutputFilePath)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"could not read collector logs from tmp file: %w", err,
))
}
if len(collectorWarnAndErrorLogs) > 0 {
errorLines := strings.Split(string(collectorWarnAndErrorLogs), "\n")
simulationErrs = append(simulationErrs, errorLines...)
}
if shutdownErr != nil {
return simulationErrs, model.InternalError(errors.Wrap(
shutdownErr, "could not shutdown the collector service",
))
}
return simulationErrs, nil
}
func generateSimulationConfig(
receiverId string,
configGenerator ConfigGenerator,
exporterId string,
collectorLogsOutputPath string,
) ([]byte, error) {
baseConf := fmt.Sprintf(`
receivers:
  memory:
    id: %s
exporters:
  memory:
    id: %s
service:
  pipelines:
    logs:
      receivers:
        - memory
      exporters:
        - memory
  telemetry:
    metrics:
      level: none
    logs:
      level: warn
      output_paths: ["%s"]
`, receiverId, exporterId, collectorLogsOutputPath)
return configGenerator([]byte(baseConf))
}
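generateSimulationConfig pins only the in-memory receiver, exporter, and telemetry settings; the caller-supplied ConfigGenerator is what splices processors into the logs pipeline. A minimal generator sketch, modeled on makeTestConfigGenerator in the tests at the end of this commit (the processor name and operator below are illustrative):

package example

import (
	"github.com/knadh/koanf/parsers/yaml"
)

// exampleConfigGenerator registers one logstransform processor and references
// it from the logs pipeline of the base simulation config.
func exampleConfigGenerator(baseConfYaml []byte) ([]byte, error) {
	conf, err := yaml.Parser().Unmarshal(baseConfYaml)
	if err != nil {
		return nil, err
	}
	conf["processors"] = map[string]interface{}{
		"logstransform/example": map[string]interface{}{
			"operators": []interface{}{
				map[string]interface{}{
					"type":  "add",
					"id":    "add",
					"field": "attributes.preview",
					"value": "yes",
				},
			},
		},
	}
	// The base config always contains service.pipelines.logs, so these
	// type assertions hold.
	svc := conf["service"].(map[string]interface{})
	logsPipeline := svc["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})
	logsPipeline["processors"] = []string{"logstransform/example"}
	return yaml.Parser().Marshal(conf)
}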

@@ -1,16 +0,0 @@
package inmemoryexporter
import "fmt"
type Config struct {
// Unique id for the exporter.
// Useful for getting a hold of the exporter in code that doesn't control its instantiation.
Id string `mapstructure:"id"`
}
func (c *Config) Validate() error {
if len(c.Id) < 1 {
return fmt.Errorf("inmemory exporter: id is required")
}
return nil
}
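Because of the mapstructure tag, the exporter's section of the generated simulation YAML maps directly onto this struct. For example (id value illustrative):

exporters:
  memory:
    id: 6a3f0c1e-example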

@@ -1,48 +0,0 @@
package inmemoryexporter
import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)
func TestValidate(t *testing.T) {
tests := []struct {
name string
rawConf *confmap.Conf
errorExpected bool
}{
{
name: "with id",
rawConf: confmap.NewFromStringMap(map[string]interface{}{
"id": "test_exporter",
}),
errorExpected: false,
},
{
name: "empty id",
rawConf: confmap.NewFromStringMap(map[string]interface{}{
"id": "",
}),
errorExpected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
err := tt.rawConf.Unmarshal(cfg)
require.NoError(t, err, "could not UnmarshalConfig")
err = component.ValidateConfig(cfg)
if tt.errorExpected {
require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg)
} else {
require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg)
}
})
}
}

@@ -1,86 +0,0 @@
package inmemoryexporter
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)
// An in-memory exporter for testing and generating previews.
type InMemoryExporter struct {
// Unique identifier for the exporter.
id string
// mu protects the data below
mu sync.Mutex
// slice of pdata.Logs that were received by this exporter.
logs []plog.Logs
}
// ConsumeLogs implements component.LogsExporter.
func (e *InMemoryExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.mu.Lock()
defer e.mu.Unlock()
e.logs = append(e.logs, ld)
return nil
}
func (e *InMemoryExporter) GetLogs() []plog.Logs {
e.mu.Lock()
defer e.mu.Unlock()
return e.logs
}
func (e *InMemoryExporter) ResetLogs() {
e.mu.Lock()
defer e.mu.Unlock()
e.logs = nil
}
func (e *InMemoryExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// Keep track of all exporter instances in the process.
// Useful for getting a hold of the exporter in scenarios where one doesn't
// create the instances. Eg: bringing up a collector service from collector config
var allExporterInstances map[string]*InMemoryExporter
var allExportersLock sync.Mutex
func init() {
allExporterInstances = make(map[string]*InMemoryExporter)
}
func GetExporterInstance(id string) *InMemoryExporter {
return allExporterInstances[id]
}
func CleanupInstance(exporterId string) {
allExportersLock.Lock()
defer allExportersLock.Unlock()
delete(allExporterInstances, exporterId)
}
func (e *InMemoryExporter) Start(ctx context.Context, host component.Host) error {
allExportersLock.Lock()
defer allExportersLock.Unlock()
if allExporterInstances[e.id] != nil {
return fmt.Errorf("exporter with id %s is already running", e.id)
}
allExporterInstances[e.id] = e
return nil
}
func (e *InMemoryExporter) Shutdown(ctx context.Context) error {
CleanupInstance(e.id)
return nil
}
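This registry exists because the simulator never constructs these components itself: the collector service builds them from config, and the simulator later looks them up by id. A package-internal sketch of the round-trip (the id is illustrative; assumes imports for context, fmt, componenttest, and plog):

func exampleRegistryRoundTrip() {
	exp := &InMemoryExporter{id: "preview-exporter-1"}
	// Start registers the instance in the global map; a second Start with
	// the same id fails until Shutdown removes the entry.
	_ = exp.Start(context.Background(), componenttest.NewNopHost())
	_ = exp.ConsumeLogs(context.Background(), plog.NewLogs())
	same := GetExporterInstance("preview-exporter-1")
	fmt.Println(len(same.GetLogs())) // 1
	_ = exp.Shutdown(context.Background()) // deregisters the id
}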

@@ -1,64 +0,0 @@
package inmemoryexporter
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter"
)
func TestExporterLifecycle(t *testing.T) {
require := require.New(t)
testExporterId := uuid.NewString()
// Should be able to get a hold of the exporter after starting it.
require.Nil(GetExporterInstance(testExporterId))
constructed, err := makeTestExporter(testExporterId)
require.Nil(err, "could not make test exporter")
err = constructed.Start(context.Background(), componenttest.NewNopHost())
require.Nil(err, "could not start test exporter")
testExporter := GetExporterInstance(testExporterId)
require.NotNil(testExporter, "could not get exporter instance by Id")
// Should not be able to start 2 exporters with the same id
constructed2, err := makeTestExporter(testExporterId)
require.Nil(err, "could not create second exporter with same id")
err = constructed2.Start(context.Background(), componenttest.NewNopHost())
require.NotNil(err, "should not be able to start another exporter with same id before shutting down the previous one")
// Should not be able to get a hold of an exporter after shutdown
testExporter.Shutdown(context.Background())
require.Nil(GetExporterInstance(testExporterId), "should not be able to find exporter instance after shutdown")
// Should be able to start a new exporter with same id after shutting down
constructed3, err := makeTestExporter(testExporterId)
require.Nil(err, "could not make exporter with same Id after shutting down previous one")
err = constructed3.Start(context.Background(), componenttest.NewNopHost())
require.Nil(err, "should be able to start another exporter with same id after shutting down the previous one")
testExporter3 := GetExporterInstance(testExporterId)
require.NotNil(testExporter3, "could not get exporter instance by Id")
testExporter3.Shutdown(context.Background())
require.Nil(GetExporterInstance(testExporterId))
}
func makeTestExporter(exporterId string) (exporter.Logs, error) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
confmap.NewFromStringMap(map[string]any{"id": exporterId}).Unmarshal(&cfg)
return factory.CreateLogsExporter(
context.Background(), exporter.Settings{}, cfg,
)
}

@@ -1,34 +0,0 @@
package inmemoryexporter
import (
"context"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
)
func createDefaultConfig() component.Config {
return &Config{
Id: uuid.NewString(),
}
}
func createLogsExporter(
_ context.Context, _ exporter.Settings, config component.Config,
) (exporter.Logs, error) {
if err := component.ValidateConfig(config); err != nil {
return nil, errors.Wrap(err, "invalid inmemory exporter config")
}
return &InMemoryExporter{
id: config.(*Config).Id,
}, nil
}
func NewFactory() exporter.Factory {
return exporter.NewFactory(
component.MustNewType("memory"),
createDefaultConfig,
exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta))
}

@@ -1,28 +0,0 @@
package inmemoryexporter
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter"
)
func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}
func TestCreateLogsExporter(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
te, err := factory.CreateLogsExporter(
context.Background(), exporter.Settings{}, cfg,
)
assert.NoError(t, err)
assert.NotNil(t, te)
}

@@ -1,16 +0,0 @@
package inmemoryreceiver
import "fmt"
type Config struct {
// Unique id for the receiver.
// Useful for getting a hold of the receiver in code that doesn't control its instantiation.
Id string `mapstructure:"id"`
}
func (c *Config) Validate() error {
if len(c.Id) < 1 {
return fmt.Errorf("inmemory receiver: id is required")
}
return nil
}

@@ -1,48 +0,0 @@
package inmemoryreceiver
import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)
func TestValidate(t *testing.T) {
tests := []struct {
name string
rawConf *confmap.Conf
errorExpected bool
}{
{
name: "with id",
rawConf: confmap.NewFromStringMap(map[string]interface{}{
"id": "test_receiver",
}),
errorExpected: false,
},
{
name: "empty id",
rawConf: confmap.NewFromStringMap(map[string]interface{}{
"id": "",
}),
errorExpected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
err := tt.rawConf.Unmarshal(&cfg)
require.NoError(t, err, "could not UnmarshalConfig")
err = component.ValidateConfig(cfg)
if tt.errorExpected {
require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg)
} else {
require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg)
}
})
}
}

@@ -1,41 +0,0 @@
package inmemoryreceiver
import (
"context"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)
func createDefaultConfig() component.Config {
return &Config{
Id: uuid.NewString(),
}
}
func createLogsReceiver(
_ context.Context,
_ receiver.Settings,
config component.Config,
consumer consumer.Logs,
) (receiver.Logs, error) {
if err := component.ValidateConfig(config); err != nil {
return nil, errors.Wrap(err, "invalid inmemory receiver config")
}
return &InMemoryReceiver{
id: config.(*Config).Id,
nextConsumer: consumer,
}, nil
}
// NewFactory creates a new OTLP receiver factory.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
component.MustNewType("memory"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelBeta))
}

@@ -1,29 +0,0 @@
package inmemoryreceiver
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver"
)
func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}
func TestCreateLogsReceiver(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
te, err := factory.CreateLogsReceiver(
context.Background(), receiver.Settings{}, cfg, consumertest.NewNop(),
)
assert.NoError(t, err)
assert.NotNil(t, te)
}

@@ -1,64 +0,0 @@
package inmemoryreceiver
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)
// In memory receiver for testing and simulation
type InMemoryReceiver struct {
// Unique identifier for the receiver.
id string
nextConsumer consumer.Logs
}
func (r *InMemoryReceiver) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return r.nextConsumer.ConsumeLogs(ctx, ld)
}
func (r *InMemoryReceiver) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// Keep track of all receiver instances in the process.
// Useful for getting a hold of the receiver in scenarios where one doesn't
// create the instances. Eg: bringing up a collector service from collector config
var allReceiverInstances map[string]*InMemoryReceiver
var allReceiversLock sync.Mutex
func init() {
allReceiverInstances = make(map[string]*InMemoryReceiver)
}
func CleanupInstance(receiverId string) {
allReceiversLock.Lock()
defer allReceiversLock.Unlock()
delete(allReceiverInstances, receiverId)
}
func (r *InMemoryReceiver) Start(ctx context.Context, host component.Host) error {
allReceiversLock.Lock()
defer allReceiversLock.Unlock()
if allReceiverInstances[r.id] != nil {
return fmt.Errorf("receiver with id %s is already running", r.id)
}
allReceiverInstances[r.id] = r
return nil
}
func (r *InMemoryReceiver) Shutdown(ctx context.Context) error {
CleanupInstance(r.id)
return nil
}
func GetReceiverInstance(id string) *InMemoryReceiver {
return allReceiverInstances[id]
}

@@ -1,66 +0,0 @@
package inmemoryreceiver
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver"
)
func TestReceiverLifecycle(t *testing.T) {
require := require.New(t)
testReceiverId := uuid.NewString()
// Should be able to get a hold of the receiver after starting it.
require.Nil(GetReceiverInstance(testReceiverId), "receiver instance should not exist before Start()")
constructed, err := makeTestLogReceiver(testReceiverId)
require.Nil(err, "could not make test receiver")
err = constructed.Start(context.Background(), componenttest.NewNopHost())
require.Nil(err, "could not start test receiver")
testReceiver := GetReceiverInstance(testReceiverId)
require.NotNil(testReceiver, "could not get receiver instance by Id")
// Should not be able to start 2 receivers with the same id
constructed2, err := makeTestLogReceiver(testReceiverId)
require.Nil(err, "could not create second receiver with same id")
err = constructed2.Start(context.Background(), componenttest.NewNopHost())
require.NotNil(err, "should not be able to start another receiver with same id before shutting down the previous one")
// Should not be able to get a hold of a receiver after shutdown
testReceiver.Shutdown(context.Background())
require.Nil(GetReceiverInstance(testReceiverId), "should not be able to find inmemory receiver after shutdown")
// Should be able to start a new receiver with same id after shutting down
constructed3, err := makeTestLogReceiver(testReceiverId)
require.Nil(err, "could not make receiver with same Id after shutting down old one")
err = constructed3.Start(context.Background(), componenttest.NewNopHost())
require.Nil(err, "should be able to start another receiver with same id after shutting down the previous one")
testReceiver3 := GetReceiverInstance(testReceiverId)
require.NotNil(testReceiver3, "could not get receiver instance by Id")
testReceiver3.Shutdown(context.Background())
require.Nil(GetReceiverInstance(testReceiverId))
}
func makeTestLogReceiver(receiverId string) (receiver.Logs, error) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
confmap.NewFromStringMap(map[string]any{"id": receiverId}).Unmarshal(&cfg)
return factory.CreateLogsReceiver(
context.Background(), receiver.Settings{}, cfg, consumertest.NewNop(),
)
}

@@ -1,131 +0,0 @@
package collectorsimulator
import (
"context"
"fmt"
"strings"
"time"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
"go.signoz.io/signoz/pkg/query-service/model"
)
// Simulate processing of logs through the otel collector.
// Useful for testing, validation and generating previews.
func SimulateLogsProcessing(
ctx context.Context,
processorFactories map[component.Type]processor.Factory,
configGenerator ConfigGenerator,
logs []plog.Logs,
timeout time.Duration,
) (
outputLogs []plog.Logs, collectorErrs []string, apiErr *model.ApiError,
) {
// Construct and start a simulator (wraps a collector service)
simulator, simulatorInitCleanup, apiErr := NewCollectorSimulator(
ctx, processorFactories, configGenerator,
)
if simulatorInitCleanup != nil {
defer simulatorInitCleanup()
}
if apiErr != nil {
return nil, nil, model.WrapApiError(apiErr, "could not create logs processing simulator")
}
simulatorCleanup, apiErr := simulator.Start(ctx)
// We cannot rely on the collector service to shut down successfully and clean up refs to inmemory components.
if simulatorCleanup != nil {
defer simulatorCleanup()
}
if apiErr != nil {
return nil, nil, apiErr
}
// Do the simulation
for _, plog := range logs {
apiErr = SendLogsToSimulator(ctx, simulator, plog)
if apiErr != nil {
return nil, nil, model.WrapApiError(apiErr, "could not consume logs for simulation")
}
}
result, apiErr := GetProcessedLogsFromSimulator(
simulator, len(logs), timeout,
)
if apiErr != nil {
return nil, nil, model.InternalError(model.WrapApiError(apiErr,
"could not get processed logs from simulator",
))
}
// Shut down the simulator
simulationErrs, apiErr := simulator.Shutdown(ctx)
if apiErr != nil {
return nil, simulationErrs, model.WrapApiError(apiErr,
"could not shutdown logs processing simulator",
)
}
for _, log := range simulationErrs {
// Drop empty lines and logs originating from featuregate.go
if log == "" || strings.Contains(log, "featuregate.go") {
continue
}
collectorErrs = append(collectorErrs, log)
}
return result, collectorErrs, nil
}
func SendLogsToSimulator(
ctx context.Context,
simulator *CollectorSimulator,
plog plog.Logs,
) *model.ApiError {
receiver := simulator.GetReceiver()
if receiver == nil {
return model.InternalError(fmt.Errorf("could not find in memory receiver for simulator"))
}
if err := receiver.ConsumeLogs(ctx, plog); err != nil {
return model.InternalError(errors.Wrap(err,
"inmemory receiver could not consume logs for simulation",
))
}
return nil
}
func GetProcessedLogsFromSimulator(
simulator *CollectorSimulator,
minLogCount int,
timeout time.Duration,
) (
[]plog.Logs, *model.ApiError,
) {
exporter := simulator.GetExporter()
if exporter == nil {
return nil, model.InternalError(fmt.Errorf("could not find in memory exporter for simulator"))
}
// Must do a time based wait to ensure all logs come through.
// For example, logstransformprocessor does internal batching and it
// takes (processorCount * batchTime) for logs to get through.
startTsMillis := time.Now().UnixMilli()
for {
elapsedMillis := time.Now().UnixMilli() - startTsMillis
if elapsedMillis > timeout.Milliseconds() {
break
}
exportedLogs := exporter.GetLogs()
if len(exportedLogs) >= minLogCount {
return exportedLogs, nil
}
time.Sleep(50 * time.Millisecond)
}
return exporter.GetLogs(), nil
}

@@ -1,159 +0,0 @@
package collectorsimulator
import (
"context"
"testing"
"time"
"github.com/knadh/koanf/parsers/yaml"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
)
type ProcessorConfig struct {
Name string
Config map[string]interface{}
}
func TestLogsProcessingSimulation(t *testing.T) {
require := require.New(t)
inputLogs := []plog.Logs{
makeTestPlog("test log 1", map[string]string{
"method": "GET",
}),
makeTestPlog("test log 2", map[string]string{
"method": "POST",
}),
}
testLogstransformConf1, err := yaml.Parser().Unmarshal([]byte(`
operators:
  - type: router
    id: router_signoz
    routes:
      - output: add
        expr: attributes.method == "GET"
    default: noop
  - type: add
    id: add
    field: attributes.test
    value: test-value-get
  - type: noop
    id: noop
`))
require.Nil(err, "could not unmarshal test logstransform op config")
testProcessor1 := ProcessorConfig{
Name: "logstransform/test",
Config: testLogstransformConf1,
}
testLogstransformConf2, err := yaml.Parser().Unmarshal([]byte(`
operators:
  - type: router
    id: router_signoz
    routes:
      - output: add
        expr: attributes.method == "POST"
    default: noop
  - type: add
    id: add
    field: attributes.test
    value: test-value-post
  - type: noop
    id: noop
`))
require.Nil(err, "could not unmarshal test logstransform op config")
testProcessor2 := ProcessorConfig{
Name: "logstransform/test2",
Config: testLogstransformConf2,
}
processorFactories, err := processor.MakeFactoryMap(
logstransformprocessor.NewFactory(),
)
require.Nil(err, "could not create processors factory map")
configGenerator := makeTestConfigGenerator(
[]ProcessorConfig{testProcessor1, testProcessor2},
)
outputLogs, collectorErrs, apiErr := SimulateLogsProcessing(
context.Background(),
processorFactories,
configGenerator,
inputLogs,
300*time.Millisecond,
)
require.Nil(apiErr, apiErr.ToError().Error())
require.Equal(len(collectorErrs), 0)
for _, l := range outputLogs {
rl := l.ResourceLogs().At(0)
sl := rl.ScopeLogs().At(0)
record := sl.LogRecords().At(0)
method, exists := record.Attributes().Get("method")
require.True(exists)
testVal, exists := record.Attributes().Get("test")
require.True(exists)
if method.Str() == "GET" {
require.Equal(testVal.Str(), "test-value-get")
} else {
require.Equal(testVal.Str(), "test-value-post")
}
}
}
func makeTestPlog(body string, attrsStr map[string]string) plog.Logs {
pl := plog.NewLogs()
rl := pl.ResourceLogs().AppendEmpty()
scopeLog := rl.ScopeLogs().AppendEmpty()
slRecord := scopeLog.LogRecords().AppendEmpty()
slRecord.Body().SetStr(body)
slAttribs := slRecord.Attributes()
for k, v := range attrsStr {
slAttribs.PutStr(k, v)
}
return pl
}
func makeTestConfigGenerator(
processorConfigs []ProcessorConfig,
) ConfigGenerator {
return func(baseConf []byte) ([]byte, error) {
conf, err := yaml.Parser().Unmarshal([]byte(baseConf))
if err != nil {
return nil, err
}
processors := map[string]interface{}{}
if conf["processors"] != nil {
processors = conf["processors"].(map[string]interface{})
}
logsProcessors := []string{}
svc := conf["service"].(map[string]interface{})
svcPipelines := svc["pipelines"].(map[string]interface{})
svcLogsPipeline := svcPipelines["logs"].(map[string]interface{})
if svcLogsPipeline["processors"] != nil {
logsProcessors = svcLogsPipeline["processors"].([]string)
}
for _, processorConf := range processorConfigs {
processors[processorConf.Name] = processorConf.Config
logsProcessors = append(logsProcessors, processorConf.Name)
}
conf["processors"] = processors
svcLogsPipeline["processors"] = logsProcessors
confYaml, err := yaml.Parser().Marshal(conf)
if err != nil {
return nil, err
}
return confYaml, nil
}
}
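Putting the pieces together: with the two test processors above, the config handed to the collector service (the base template from generateSimulationConfig plus this generator) is shaped like the following, with the uuids and tmp log path elided:

receivers:
  memory:
    id: <receiver-uuid>
exporters:
  memory:
    id: <exporter-uuid>
processors:
  logstransform/test:
    operators: [...]
  logstransform/test2:
    operators: [...]
service:
  pipelines:
    logs:
      receivers:
        - memory
      processors:
        - logstransform/test
        - logstransform/test2
      exporters:
        - memory
  telemetry:
    metrics:
      level: none
    logs:
      level: warn
      output_paths: ["<tmp-log-file>"]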