diff --git a/pkg/cache/memorycache/provider_test.go b/pkg/cache/memorycache/provider_test.go index e17b36eb7b..abc816c050 100644 --- a/pkg/cache/memorycache/provider_test.go +++ b/pkg/cache/memorycache/provider_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.signoz.io/signoz/pkg/cache" - "go.signoz.io/signoz/pkg/factory/providertest" + "go.signoz.io/signoz/pkg/factory/factorytest" ) // TestNew tests the New function @@ -18,7 +18,7 @@ func TestNew(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) assert.NotNil(t, c) assert.NotNil(t, c.(*provider).cc) @@ -60,7 +60,7 @@ func TestStoreWithNilPointer(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) var storeCacheableEntity *CacheableEntity assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) @@ -72,7 +72,7 @@ func TestStoreWithStruct(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) var storeCacheableEntity CacheableEntity assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)) @@ -83,7 +83,7 @@ func TestStoreWithNonNilPointer(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -99,7 +99,7 @@ func TestRetrieveWithNilPointer(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -120,7 +120,7 @@ func TestRetrieveWitNonPointer(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -141,7 +141,7 @@ func TestRetrieveWithDifferentTypes(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -161,7 +161,7 @@ func TestRetrieveWithSameTypes(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -179,7 +179,7 @@ func TestRetrieveWithSameTypes(t *testing.T) { // TestSetTTL tests the SetTTL function func TestSetTTL(t *testing.T) { - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second}}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second}}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -209,7 +209,7 @@ func TestRemove(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -232,7 +232,7 @@ func TestBulkRemove(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", @@ -261,7 +261,7 @@ func TestCache(t *testing.T) { TTL: 10 * time.Second, CleanupInterval: 10 * time.Second, } - c, err := New(context.Background(), providertest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) + c, err := New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: opts}) require.NoError(t, err) storeCacheableEntity := &CacheableEntity{ Key: "some-random-key", diff --git a/pkg/factory/providertest/setting.go b/pkg/factory/factorytest/settings.go similarity index 91% rename from pkg/factory/providertest/setting.go rename to pkg/factory/factorytest/settings.go index a42de90115..ee1a61ff21 100644 --- a/pkg/factory/providertest/setting.go +++ b/pkg/factory/factorytest/settings.go @@ -1,4 +1,4 @@ -package providertest +package factorytest import ( "go.signoz.io/signoz/pkg/factory" diff --git a/pkg/factory/name.go b/pkg/factory/name.go index 9646fe2c5c..6a76f93ad5 100644 --- a/pkg/factory/name.go +++ b/pkg/factory/name.go @@ -2,9 +2,12 @@ package factory import ( "fmt" + "log/slog" "regexp" ) +var _ slog.LogValuer = (Name{}) + var ( // nameRegex is a regex that matches a valid name. // It must start with a alphabet, and can only contain alphabets, numbers, underscores or hyphens. @@ -15,6 +18,10 @@ type Name struct { name string } +func (n Name) LogValue() slog.Value { + return slog.StringValue(n.name) +} + func (n Name) String() string { return n.name } diff --git a/pkg/registry/registry.go b/pkg/factory/registry.go similarity index 54% rename from pkg/registry/registry.go rename to pkg/factory/registry.go index 410044970b..2eda44279b 100644 --- a/pkg/registry/registry.go +++ b/pkg/factory/registry.go @@ -1,26 +1,24 @@ -package registry +package factory import ( "context" "errors" "fmt" + "log/slog" "os" "os/signal" "syscall" - - "go.signoz.io/signoz/pkg/factory" - "go.uber.org/zap" ) type Registry struct { - services []factory.Service - logger *zap.Logger + services NamedMap[NamedService] + logger *slog.Logger startCh chan error stopCh chan error } // New creates a new registry of services. It needs at least one service in the input. -func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) { +func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, error) { if logger == nil { return nil, fmt.Errorf("cannot build registry, logger is required") } @@ -29,17 +27,23 @@ func New(logger *zap.Logger, services ...factory.Service) (*Registry, error) { return nil, fmt.Errorf("cannot build registry, at least one service is required") } + m, err := NewNamedMap(services...) + if err != nil { + return nil, err + } + return &Registry{ - logger: logger.Named("go.signoz.io/pkg/registry"), - services: services, + logger: logger.With("pkg", "go.signoz.io/pkg/factory"), + services: m, startCh: make(chan error, 1), stopCh: make(chan error, len(services)), }, nil } func (r *Registry) Start(ctx context.Context) error { - for _, s := range r.services { - go func(s factory.Service) { + for _, s := range r.services.GetInOrder() { + go func(s NamedService) { + r.logger.InfoContext(ctx, "starting service", "service", s.Name()) err := s.Start(ctx) r.startCh <- err }(s) @@ -54,11 +58,11 @@ func (r *Registry) Wait(ctx context.Context) error { select { case <-ctx.Done(): - r.logger.Info("caught context error, exiting", zap.Any("context", ctx)) + r.logger.InfoContext(ctx, "caught context error, exiting", "error", ctx.Err()) case s := <-interrupt: - r.logger.Info("caught interrupt signal, exiting", zap.Any("context", ctx), zap.Any("signal", s)) + r.logger.InfoContext(ctx, "caught interrupt signal, exiting", "signal", s) case err := <-r.startCh: - r.logger.Info("caught service error, exiting", zap.Any("context", ctx), zap.Error(err)) + r.logger.ErrorContext(ctx, "caught service error, exiting", "error", err) return err } @@ -66,15 +70,16 @@ func (r *Registry) Wait(ctx context.Context) error { } func (r *Registry) Stop(ctx context.Context) error { - for _, s := range r.services { - go func(s factory.Service) { + for _, s := range r.services.GetInOrder() { + go func(s NamedService) { + r.logger.InfoContext(ctx, "stopping service", "service", s.Name()) err := s.Stop(ctx) r.stopCh <- err }(s) } - errs := make([]error, len(r.services)) - for i := 0; i < len(r.services); i++ { + errs := make([]error, len(r.services.GetInOrder())) + for i := 0; i < len(r.services.GetInOrder()); i++ { err := <-r.stopCh if err != nil { errs = append(errs, err) diff --git a/pkg/factory/registry_test.go b/pkg/factory/registry_test.go new file mode 100644 index 0000000000..57a7b0df67 --- /dev/null +++ b/pkg/factory/registry_test.go @@ -0,0 +1,70 @@ +package factory + +import ( + "context" + "io" + "log/slog" + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +type tservice struct { + c chan struct{} +} + +func newTestService(t *testing.T) *tservice { + t.Helper() + return &tservice{c: make(chan struct{})} +} + +func (s *tservice) Start(_ context.Context) error { + <-s.c + return nil +} + +func (s *tservice) Stop(_ context.Context) error { + close(s.c) + return nil +} + +func TestRegistryWith2Services(t *testing.T) { + s1 := newTestService(t) + s2 := newTestService(t) + + registry, err := NewRegistry(slog.New(slog.NewTextHandler(io.Discard, nil)), NewNamedService(MustNewName("s1"), s1), NewNamedService(MustNewName("s2"), s2)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, registry.Start(ctx)) + require.NoError(t, registry.Wait(ctx)) + require.NoError(t, registry.Stop(ctx)) + }() + cancel() + + wg.Wait() +} + +func TestRegistryWith2ServicesWithoutWait(t *testing.T) { + s1 := newTestService(t) + s2 := newTestService(t) + + registry, err := NewRegistry(slog.New(slog.NewTextHandler(io.Discard, nil)), NewNamedService(MustNewName("s1"), s1), NewNamedService(MustNewName("s2"), s2)) + require.NoError(t, err) + + ctx := context.Background() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, registry.Start(ctx)) + require.NoError(t, registry.Stop(ctx)) + }() + + wg.Wait() +} diff --git a/pkg/factory/service.go b/pkg/factory/service.go index 880f6fe2e3..1bde069350 100644 --- a/pkg/factory/service.go +++ b/pkg/factory/service.go @@ -3,8 +3,29 @@ package factory import "context" type Service interface { - // Starts a service. The service should return an error if it cannot be started. + // Starts a service. It should block and should not return until the service is stopped or it fails. Start(context.Context) error // Stops a service. Stop(context.Context) error } + +type NamedService interface { + Named + Service +} + +type namedService struct { + name Name + Service +} + +func (s *namedService) Name() Name { + return s.name +} + +func NewNamedService(name Name, service Service) NamedService { + return &namedService{ + name: name, + Service: service, + } +} diff --git a/pkg/factory/servicetest/http.go b/pkg/factory/servicetest/http.go deleted file mode 100644 index 9960c70c39..0000000000 --- a/pkg/factory/servicetest/http.go +++ /dev/null @@ -1,51 +0,0 @@ -package servicetest - -import ( - "context" - "net" - "net/http" - - "go.signoz.io/signoz/pkg/factory" -) - -var _ factory.Service = (*httpService)(nil) - -type httpService struct { - Listener net.Listener - Server *http.Server - name string -} - -func NewHttpService(name string) (*httpService, error) { - return &httpService{ - name: name, - Server: &http.Server{}, - }, nil -} - -func (service *httpService) Name() factory.Name { - return factory.MustNewName(service.name) -} - -func (service *httpService) Start(ctx context.Context) error { - listener, err := net.Listen("tcp", "localhost:0") - if err != nil { - return err - } - service.Listener = listener - - if err := service.Server.Serve(service.Listener); err != nil { - if err != http.ErrServerClosed { - return err - } - } - return nil -} - -func (service *httpService) Stop(ctx context.Context) error { - if err := service.Server.Shutdown(ctx); err != nil { - return err - } - - return nil -} diff --git a/pkg/factory/setting.go b/pkg/factory/settings.go similarity index 100% rename from pkg/factory/setting.go rename to pkg/factory/settings.go diff --git a/pkg/query-service/utils/testutils.go b/pkg/query-service/utils/testutils.go index 4c008e3b85..cf4a5882d9 100644 --- a/pkg/query-service/utils/testutils.go +++ b/pkg/query-service/utils/testutils.go @@ -7,7 +7,7 @@ import ( _ "github.com/mattn/go-sqlite3" "go.signoz.io/signoz/pkg/factory" - "go.signoz.io/signoz/pkg/factory/providertest" + "go.signoz.io/signoz/pkg/factory/factorytest" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/dao" "go.signoz.io/signoz/pkg/sqlmigration" @@ -25,14 +25,14 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s t.Cleanup(func() { os.Remove(testDBFilePath) }) testDBFile.Close() - sqlStore, err = sqlitesqlstore.New(context.Background(), providertest.NewSettings(), sqlstore.Config{Provider: "sqlite", Sqlite: sqlstore.SqliteConfig{Path: testDBFilePath}}) + sqlStore, err = sqlitesqlstore.New(context.Background(), factorytest.NewSettings(), sqlstore.Config{Provider: "sqlite", Sqlite: sqlstore.SqliteConfig{Path: testDBFilePath}}) if err != nil { t.Fatalf("could not create test db sqlite store: %v", err) } sqlmigrations, err := sqlmigration.New( context.Background(), - providertest.NewSettings(), + factorytest.NewSettings(), sqlmigration.Config{}, factory.MustNewNamedMap( sqlmigration.NewAddDataMigrationsFactory(), @@ -51,7 +51,7 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s t.Fatalf("could not create test db sql migrations: %v", err) } - err = sqlmigrator.New(context.Background(), providertest.NewSettings(), sqlStore, sqlmigrations, sqlmigrator.Config{}).Migrate(context.Background()) + err = sqlmigrator.New(context.Background(), factorytest.NewSettings(), sqlStore, sqlmigrations, sqlmigrator.Config{}).Migrate(context.Background()) if err != nil { t.Fatalf("could not migrate test db sql migrations: %v", err) } diff --git a/pkg/registry/doc.go b/pkg/registry/doc.go deleted file mode 100644 index ff2debbefe..0000000000 --- a/pkg/registry/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// package registry contains a simple implementation of https://github.com/google/guava/wiki/ServiceExplained -// Here the the "ServiceManager" is called the "Registry" -package registry diff --git a/pkg/registry/registry_test.go b/pkg/registry/registry_test.go deleted file mode 100644 index 2fb46e28db..0000000000 --- a/pkg/registry/registry_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package registry - -import ( - "context" - "sync" - "testing" - - "github.com/stretchr/testify/require" - "go.signoz.io/signoz/pkg/factory/servicetest" - "go.uber.org/zap" -) - -func TestRegistryWith2HttpServers(t *testing.T) { - http1, err := servicetest.NewHttpService("http1") - require.NoError(t, err) - - http2, err := servicetest.NewHttpService("http2") - require.NoError(t, err) - - registry, err := New(zap.NewNop(), http1, http2) - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(t, registry.Start(ctx)) - require.NoError(t, registry.Wait(ctx)) - require.NoError(t, registry.Stop(ctx)) - }() - cancel() - - wg.Wait() -} - -func TestRegistryWith2HttpServersWithoutWait(t *testing.T) { - http1, err := servicetest.NewHttpService("http1") - require.NoError(t, err) - - http2, err := servicetest.NewHttpService("http2") - require.NoError(t, err) - - registry, err := New(zap.NewNop(), http1, http2) - require.NoError(t, err) - - ctx := context.Background() - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - require.NoError(t, registry.Start(ctx)) - require.NoError(t, registry.Stop(ctx)) - }() - - wg.Wait() -} diff --git a/pkg/web/routerweb/provider_test.go b/pkg/web/routerweb/provider_test.go index 554cde3248..52be2ec970 100644 --- a/pkg/web/routerweb/provider_test.go +++ b/pkg/web/routerweb/provider_test.go @@ -12,7 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.signoz.io/signoz/pkg/factory/providertest" + "go.signoz.io/signoz/pkg/factory/factorytest" "go.signoz.io/signoz/pkg/web" ) @@ -24,7 +24,7 @@ func TestServeHttpWithoutPrefix(t *testing.T) { expected, err := io.ReadAll(fi) require.NoError(t, err) - web, err := New(context.Background(), providertest.NewSettings(), web.Config{Prefix: "/", Directory: filepath.Join("testdata")}) + web, err := New(context.Background(), factorytest.NewSettings(), web.Config{Prefix: "/", Directory: filepath.Join("testdata")}) require.NoError(t, err) router := mux.NewRouter() @@ -89,7 +89,7 @@ func TestServeHttpWithPrefix(t *testing.T) { expected, err := io.ReadAll(fi) require.NoError(t, err) - web, err := New(context.Background(), providertest.NewSettings(), web.Config{Prefix: "/web", Directory: filepath.Join("testdata")}) + web, err := New(context.Background(), factorytest.NewSettings(), web.Config{Prefix: "/web", Directory: filepath.Join("testdata")}) require.NoError(t, err) router := mux.NewRouter()