mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 03:39:02 +08:00
feat: added new cache package for query service (#6733)
* feat: added new cache package for query service * feat: handle type checking for inmemory * feat: some copy corrections * feat: added inmemory test cases * chore: some renaming * feat: added redis handling * chore: add redis tests * feat(cache): refactor the code * feat(cache): refactor the code * feat(cache): added defaults for redis config * feat(cache): update makefile to run all tetss * feat(cache): update tests and docs * feat(cache): update tests and docs * feat(cache): handle signoz web flag * feat(cache): handle signoz web flag * feat(cache): handle signoz web flag
This commit is contained in:
parent
c5938b6c10
commit
4967696da8
2
Makefile
2
Makefile
@ -190,4 +190,4 @@ check-no-ee-references:
|
||||
fi
|
||||
|
||||
test:
|
||||
go test ./pkg/query-service/...
|
||||
go test ./pkg/...
|
||||
|
@ -9,3 +9,24 @@ web:
|
||||
prefix: /
|
||||
# The directory containing the static build files.
|
||||
directory: /etc/signoz/web
|
||||
|
||||
##################### Cache #####################
|
||||
cache:
|
||||
# specifies the caching provider to use.
|
||||
provider: memory
|
||||
# memory: Uses in-memory caching.
|
||||
memory:
|
||||
# Time-to-live for cache entries in memory. Specify the duration in ns
|
||||
ttl: 60000000000
|
||||
# The interval at which the cache will be cleaned up
|
||||
cleanupInterval:
|
||||
# redis: Uses Redis as the caching backend.
|
||||
redis:
|
||||
# The hostname or IP address of the Redis server.
|
||||
host: localhost
|
||||
# The port on which the Redis server is running. Default is usually 6379.
|
||||
port: 6379
|
||||
# The password for authenticating with the Redis server, if required.
|
||||
password:
|
||||
# The Redis database number to use
|
||||
db: 0
|
@ -32,6 +32,7 @@ import (
|
||||
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/migrate"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
|
||||
licensepkg "go.signoz.io/signoz/ee/query-service/license"
|
||||
@ -62,6 +63,7 @@ import (
|
||||
const AppDbEngine = "sqlite"
|
||||
|
||||
type ServerOptions struct {
|
||||
SigNoz *signoz.SigNoz
|
||||
PromConfigPath string
|
||||
SkipTopLvlOpsPath string
|
||||
HTTPHostPort string
|
||||
@ -109,7 +111,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
|
||||
}
|
||||
|
||||
// NewServer creates and initializes Server
|
||||
func NewServer(serverOptions *ServerOptions, web *web.Web) (*Server, error) {
|
||||
func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
modelDao, err := dao.InitDao("sqlite", baseconst.RELATIONAL_DATASOURCE_PATH)
|
||||
if err != nil {
|
||||
@ -291,7 +293,7 @@ func NewServer(serverOptions *ServerOptions, web *web.Web) (*Server, error) {
|
||||
usageManager: usageManager,
|
||||
}
|
||||
|
||||
httpServer, err := s.createPublicServer(apiHandler, web)
|
||||
httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/migrate"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
signozweb "go.signoz.io/signoz/pkg/web"
|
||||
"go.signoz.io/signoz/pkg/signoz"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
@ -148,12 +148,13 @@ func main() {
|
||||
zap.L().Fatal("Failed to create config", zap.Error(err))
|
||||
}
|
||||
|
||||
web, err := signozweb.New(zap.L(), config.Web)
|
||||
if err != nil && !skipWebFrontend {
|
||||
zap.L().Fatal("Failed to create web", zap.Error(err))
|
||||
signoz, err := signoz.New(config, skipWebFrontend)
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
|
||||
}
|
||||
|
||||
serverOptions := &app.ServerOptions{
|
||||
SigNoz: signoz,
|
||||
HTTPHostPort: baseconst.HTTPHostPort,
|
||||
PromConfigPath: promConfigPath,
|
||||
SkipTopLvlOpsPath: skipTopLvlOpsPath,
|
||||
@ -188,7 +189,7 @@ func main() {
|
||||
zap.L().Info("Migration successful")
|
||||
}
|
||||
|
||||
server, err := app.NewServer(serverOptions, web)
|
||||
server, err := app.NewServer(serverOptions)
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create server", zap.Error(err))
|
||||
}
|
||||
|
71
pkg/cache/cache.go
vendored
Normal file
71
pkg/cache/cache.go
vendored
Normal file
@ -0,0 +1,71 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
// cacheable entity
|
||||
type CacheableEntity interface {
|
||||
encoding.BinaryMarshaler
|
||||
encoding.BinaryUnmarshaler
|
||||
}
|
||||
|
||||
func WrapCacheableEntityErrors(rt reflect.Type, caller string) error {
|
||||
if rt == nil {
|
||||
return fmt.Errorf("%s: (nil)", caller)
|
||||
}
|
||||
|
||||
if rt.Kind() != reflect.Pointer {
|
||||
return fmt.Errorf("%s: (non-pointer \"%s\")", caller, rt.String())
|
||||
}
|
||||
|
||||
return fmt.Errorf("%s: (nil \"%s\")", caller, rt.String())
|
||||
|
||||
}
|
||||
|
||||
// cache status
|
||||
type RetrieveStatus int
|
||||
|
||||
const (
|
||||
RetrieveStatusHit = RetrieveStatus(iota)
|
||||
RetrieveStatusPartialHit
|
||||
RetrieveStatusRangeMiss
|
||||
RetrieveStatusKeyMiss
|
||||
RetrieveStatusRevalidated
|
||||
|
||||
RetrieveStatusError
|
||||
)
|
||||
|
||||
func (s RetrieveStatus) String() string {
|
||||
switch s {
|
||||
case RetrieveStatusHit:
|
||||
return "hit"
|
||||
case RetrieveStatusPartialHit:
|
||||
return "partial hit"
|
||||
case RetrieveStatusRangeMiss:
|
||||
return "range miss"
|
||||
case RetrieveStatusKeyMiss:
|
||||
return "key miss"
|
||||
case RetrieveStatusRevalidated:
|
||||
return "revalidated"
|
||||
case RetrieveStatusError:
|
||||
return "error"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// cache interface
|
||||
type Cache interface {
|
||||
Connect(ctx context.Context) error
|
||||
Store(ctx context.Context, cacheKey string, data CacheableEntity, ttl time.Duration) error
|
||||
Retrieve(ctx context.Context, cacheKey string, dest CacheableEntity, allowExpired bool) (RetrieveStatus, error)
|
||||
SetTTL(ctx context.Context, cacheKey string, ttl time.Duration)
|
||||
Remove(ctx context.Context, cacheKey string)
|
||||
BulkRemove(ctx context.Context, cacheKeys []string)
|
||||
Close(ctx context.Context) error
|
||||
}
|
49
pkg/cache/config.go
vendored
Normal file
49
pkg/cache/config.go
vendored
Normal file
@ -0,0 +1,49 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
go_cache "github.com/patrickmn/go-cache"
|
||||
"go.signoz.io/signoz/pkg/confmap"
|
||||
)
|
||||
|
||||
// Config satisfies the confmap.Config interface
|
||||
var _ confmap.Config = (*Config)(nil)
|
||||
|
||||
type Memory struct {
|
||||
TTL time.Duration `mapstructure:"ttl"`
|
||||
CleanupInterval time.Duration `mapstructure:"cleanupInterval"`
|
||||
}
|
||||
|
||||
type Redis struct {
|
||||
Host string `mapstructure:"host"`
|
||||
Port int `mapstructure:"port"`
|
||||
Password string `mapstructure:"password"`
|
||||
DB int `mapstructure:"db"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Provider string `mapstructure:"provider"`
|
||||
Memory Memory `mapstructure:"memory"`
|
||||
Redis Redis `mapstructure:"redis"`
|
||||
}
|
||||
|
||||
func (c *Config) NewWithDefaults() confmap.Config {
|
||||
return &Config{
|
||||
Provider: "memory",
|
||||
Memory: Memory{
|
||||
TTL: go_cache.NoExpiration,
|
||||
CleanupInterval: 1 * time.Minute,
|
||||
},
|
||||
Redis: Redis{
|
||||
Host: "localhost",
|
||||
Port: 6379,
|
||||
Password: "",
|
||||
DB: 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) Validate() error {
|
||||
return nil
|
||||
}
|
96
pkg/cache/strategy/memory/memory.go
vendored
Normal file
96
pkg/cache/strategy/memory/memory.go
vendored
Normal file
@ -0,0 +1,96 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
go_cache "github.com/patrickmn/go-cache"
|
||||
_cache "go.signoz.io/signoz/pkg/cache"
|
||||
)
|
||||
|
||||
type cache struct {
|
||||
cc *go_cache.Cache
|
||||
}
|
||||
|
||||
func New(opts *_cache.Memory) *cache {
|
||||
return &cache{cc: go_cache.New(opts.TTL, opts.CleanupInterval)}
|
||||
}
|
||||
|
||||
// Connect does nothing
|
||||
func (c *cache) Connect(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Store stores the data in the cache
|
||||
func (c *cache) Store(_ context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error {
|
||||
// check if the data being passed is a pointer and is not nil
|
||||
rv := reflect.ValueOf(data)
|
||||
if rv.Kind() != reflect.Pointer || rv.IsNil() {
|
||||
return _cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory")
|
||||
}
|
||||
|
||||
c.cc.Set(cacheKey, data, ttl)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Retrieve retrieves the data from the cache
|
||||
func (c *cache) Retrieve(_ context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) {
|
||||
// check if the destination being passed is a pointer and is not nil
|
||||
dstv := reflect.ValueOf(dest)
|
||||
if dstv.Kind() != reflect.Pointer || dstv.IsNil() {
|
||||
return _cache.RetrieveStatusError, _cache.WrapCacheableEntityErrors(reflect.TypeOf(dest), "inmemory")
|
||||
}
|
||||
|
||||
// check if the destination value is settable
|
||||
if !dstv.Elem().CanSet() {
|
||||
return _cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem())
|
||||
}
|
||||
|
||||
data, found := c.cc.Get(cacheKey)
|
||||
if !found {
|
||||
return _cache.RetrieveStatusKeyMiss, nil
|
||||
}
|
||||
|
||||
// check the type compatbility between the src and dest
|
||||
srcv := reflect.ValueOf(data)
|
||||
if !srcv.Type().AssignableTo(dstv.Type()) {
|
||||
return _cache.RetrieveStatusError, fmt.Errorf("src type is not assignable to dst type")
|
||||
}
|
||||
|
||||
// set the value to from src to dest
|
||||
dstv.Elem().Set(srcv.Elem())
|
||||
return _cache.RetrieveStatusHit, nil
|
||||
}
|
||||
|
||||
// SetTTL sets the TTL for the cache entry
|
||||
func (c *cache) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) {
|
||||
item, found := c.cc.Get(cacheKey)
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
c.cc.Replace(cacheKey, item, ttl)
|
||||
}
|
||||
|
||||
// Remove removes the cache entry
|
||||
func (c *cache) Remove(_ context.Context, cacheKey string) {
|
||||
c.cc.Delete(cacheKey)
|
||||
}
|
||||
|
||||
// BulkRemove removes the cache entries
|
||||
func (c *cache) BulkRemove(_ context.Context, cacheKeys []string) {
|
||||
for _, cacheKey := range cacheKeys {
|
||||
c.cc.Delete(cacheKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Close does nothing
|
||||
func (c *cache) Close(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Configuration returns the cache configuration
|
||||
func (c *cache) Configuration() *_cache.Memory {
|
||||
return nil
|
||||
}
|
264
pkg/cache/strategy/memory/memory_test.go
vendored
Normal file
264
pkg/cache/strategy/memory/memory_test.go
vendored
Normal file
@ -0,0 +1,264 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
_cache "go.signoz.io/signoz/pkg/cache"
|
||||
)
|
||||
|
||||
// TestNew tests the New function
|
||||
func TestNew(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
assert.NotNil(t, c)
|
||||
assert.NotNil(t, c.cc)
|
||||
assert.NoError(t, c.Connect(context.Background()))
|
||||
}
|
||||
|
||||
type CacheableEntity struct {
|
||||
Key string
|
||||
Value int
|
||||
Expiry time.Duration
|
||||
}
|
||||
|
||||
func (ce CacheableEntity) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(ce)
|
||||
}
|
||||
|
||||
func (ce CacheableEntity) UnmarshalBinary(data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type DCacheableEntity struct {
|
||||
Key string
|
||||
Value int
|
||||
Expiry time.Duration
|
||||
}
|
||||
|
||||
func (dce DCacheableEntity) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(dce)
|
||||
}
|
||||
|
||||
func (dce DCacheableEntity) UnmarshalBinary(data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestStore tests the Store function
|
||||
// this should fail because of nil pointer error
|
||||
func TestStoreWithNilPointer(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
var storeCacheableEntity *CacheableEntity
|
||||
assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
}
|
||||
|
||||
// this should fail because of no pointer error
|
||||
func TestStoreWithStruct(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
var storeCacheableEntity CacheableEntity
|
||||
assert.Error(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
}
|
||||
|
||||
func TestStoreWithNonNilPointer(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
}
|
||||
|
||||
// TestRetrieve tests the Retrieve function
|
||||
func TestRetrieveWithNilPointer(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
|
||||
var retrieveCacheableEntity *CacheableEntity
|
||||
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusError)
|
||||
}
|
||||
|
||||
func TestRetrieveWitNonPointer(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
|
||||
var retrieveCacheableEntity CacheableEntity
|
||||
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusError)
|
||||
}
|
||||
|
||||
func TestRetrieveWithDifferentTypes(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
|
||||
retrieveCacheableEntity := new(DCacheableEntity)
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusError)
|
||||
}
|
||||
|
||||
func TestRetrieveWithSameTypes(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
|
||||
retrieveCacheableEntity := new(CacheableEntity)
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusHit)
|
||||
assert.Equal(t, storeCacheableEntity, retrieveCacheableEntity)
|
||||
}
|
||||
|
||||
// TestSetTTL tests the SetTTL function
|
||||
func TestSetTTL(t *testing.T) {
|
||||
c := New(&_cache.Memory{TTL: 10 * time.Second, CleanupInterval: 1 * time.Second})
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
retrieveCacheableEntity := new(CacheableEntity)
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 2*time.Second))
|
||||
time.Sleep(3 * time.Second)
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss)
|
||||
assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity)
|
||||
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 2*time.Second))
|
||||
c.SetTTL(context.Background(), "key", 4*time.Second)
|
||||
time.Sleep(3 * time.Second)
|
||||
retrieveStatus, err = c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusHit)
|
||||
assert.Equal(t, retrieveCacheableEntity, storeCacheableEntity)
|
||||
}
|
||||
|
||||
// TestRemove tests the Remove function
|
||||
func TestRemove(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
retrieveCacheableEntity := new(CacheableEntity)
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
c.Remove(context.Background(), "key")
|
||||
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss)
|
||||
assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity)
|
||||
}
|
||||
|
||||
// TestBulkRemove tests the BulkRemove function
|
||||
func TestBulkRemove(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
retrieveCacheableEntity := new(CacheableEntity)
|
||||
assert.NoError(t, c.Store(context.Background(), "key1", storeCacheableEntity, 10*time.Second))
|
||||
assert.NoError(t, c.Store(context.Background(), "key2", storeCacheableEntity, 10*time.Second))
|
||||
c.BulkRemove(context.Background(), []string{"key1", "key2"})
|
||||
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key1", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss)
|
||||
assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity)
|
||||
|
||||
retrieveStatus, err = c.Retrieve(context.Background(), "key2", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusKeyMiss)
|
||||
assert.Equal(t, new(CacheableEntity), retrieveCacheableEntity)
|
||||
}
|
||||
|
||||
// TestCache tests the cache
|
||||
func TestCache(t *testing.T) {
|
||||
opts := &_cache.Memory{
|
||||
TTL: 10 * time.Second,
|
||||
CleanupInterval: 10 * time.Second,
|
||||
}
|
||||
c := New(opts)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
retrieveCacheableEntity := new(CacheableEntity)
|
||||
assert.NoError(t, c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second))
|
||||
retrieveStatus, err := c.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, retrieveStatus, _cache.RetrieveStatusHit)
|
||||
assert.Equal(t, storeCacheableEntity, retrieveCacheableEntity)
|
||||
c.Remove(context.Background(), "key")
|
||||
}
|
120
pkg/cache/strategy/redis/redis.go
vendored
Normal file
120
pkg/cache/strategy/redis/redis.go
vendored
Normal file
@ -0,0 +1,120 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
_cache "go.signoz.io/signoz/pkg/cache"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cache struct {
|
||||
client *redis.Client
|
||||
opts *_cache.Redis
|
||||
}
|
||||
|
||||
func New(opts *_cache.Redis) *cache {
|
||||
return &cache{opts: opts}
|
||||
}
|
||||
|
||||
// WithClient creates a new cache with the given client
|
||||
func WithClient(client *redis.Client) *cache {
|
||||
return &cache{client: client}
|
||||
}
|
||||
|
||||
// Connect connects to the redis server
|
||||
func (c *cache) Connect(_ context.Context) error {
|
||||
c.client = redis.NewClient(&redis.Options{
|
||||
Addr: fmt.Sprintf("%s:%d", c.opts.Host, c.opts.Port),
|
||||
Password: c.opts.Password,
|
||||
DB: c.opts.DB,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Store stores the data in the cache
|
||||
func (c *cache) Store(ctx context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error {
|
||||
return c.client.Set(ctx, cacheKey, data, ttl).Err()
|
||||
}
|
||||
|
||||
// Retrieve retrieves the data from the cache
|
||||
func (c *cache) Retrieve(ctx context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) {
|
||||
err := c.client.Get(ctx, cacheKey).Scan(dest)
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return _cache.RetrieveStatusKeyMiss, nil
|
||||
}
|
||||
return _cache.RetrieveStatusError, err
|
||||
}
|
||||
return _cache.RetrieveStatusHit, nil
|
||||
}
|
||||
|
||||
// SetTTL sets the TTL for the cache entry
|
||||
func (c *cache) SetTTL(ctx context.Context, cacheKey string, ttl time.Duration) {
|
||||
err := c.client.Expire(ctx, cacheKey, ttl).Err()
|
||||
if err != nil {
|
||||
zap.L().Error("error setting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Duration("ttl", ttl), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Remove removes the cache entry
|
||||
func (c *cache) Remove(ctx context.Context, cacheKey string) {
|
||||
c.BulkRemove(ctx, []string{cacheKey})
|
||||
}
|
||||
|
||||
// BulkRemove removes the cache entries
|
||||
func (c *cache) BulkRemove(ctx context.Context, cacheKeys []string) {
|
||||
if err := c.client.Del(ctx, cacheKeys...).Err(); err != nil {
|
||||
zap.L().Error("error deleting cache keys", zap.Strings("cacheKeys", cacheKeys), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the connection to the redis server
|
||||
func (c *cache) Close(_ context.Context) error {
|
||||
return c.client.Close()
|
||||
}
|
||||
|
||||
// Ping pings the redis server
|
||||
func (c *cache) Ping(ctx context.Context) error {
|
||||
return c.client.Ping(ctx).Err()
|
||||
}
|
||||
|
||||
// GetClient returns the redis client
|
||||
func (c *cache) GetClient() *redis.Client {
|
||||
return c.client
|
||||
}
|
||||
|
||||
// GetOptions returns the options
|
||||
func (c *cache) GetOptions() *_cache.Redis {
|
||||
return c.opts
|
||||
}
|
||||
|
||||
// GetTTL returns the TTL for the cache entry
|
||||
func (c *cache) GetTTL(ctx context.Context, cacheKey string) time.Duration {
|
||||
ttl, err := c.client.TTL(ctx, cacheKey).Result()
|
||||
if err != nil {
|
||||
zap.L().Error("error getting TTL for cache key", zap.String("cacheKey", cacheKey), zap.Error(err))
|
||||
}
|
||||
return ttl
|
||||
}
|
||||
|
||||
// GetKeys returns the keys matching the pattern
|
||||
func (c *cache) GetKeys(ctx context.Context, pattern string) ([]string, error) {
|
||||
return c.client.Keys(ctx, pattern).Result()
|
||||
}
|
||||
|
||||
// GetKeysWithTTL returns the keys matching the pattern with their TTL
|
||||
func (c *cache) GetKeysWithTTL(ctx context.Context, pattern string) (map[string]time.Duration, error) {
|
||||
keys, err := c.GetKeys(ctx, pattern)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make(map[string]time.Duration)
|
||||
for _, key := range keys {
|
||||
result[key] = c.GetTTL(ctx, key)
|
||||
}
|
||||
return result, nil
|
||||
}
|
139
pkg/cache/strategy/redis/redis_test.go
vendored
Normal file
139
pkg/cache/strategy/redis/redis_test.go
vendored
Normal file
@ -0,0 +1,139 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redismock/v8"
|
||||
"github.com/stretchr/testify/assert"
|
||||
_cache "go.signoz.io/signoz/pkg/cache"
|
||||
)
|
||||
|
||||
type CacheableEntity struct {
|
||||
Key string
|
||||
Value int
|
||||
Expiry time.Duration
|
||||
}
|
||||
|
||||
func (ce *CacheableEntity) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(ce)
|
||||
}
|
||||
|
||||
func (ce *CacheableEntity) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, ce)
|
||||
}
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
db, mock := redismock.NewClientMock()
|
||||
cache := WithClient(db)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
|
||||
mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil()
|
||||
cache.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetrieve(t *testing.T) {
|
||||
db, mock := redismock.NewClientMock()
|
||||
cache := WithClient(db)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
retrieveCacheableEntity := new(CacheableEntity)
|
||||
|
||||
mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil()
|
||||
cache.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)
|
||||
|
||||
data, err := storeCacheableEntity.MarshalBinary()
|
||||
assert.NoError(t, err)
|
||||
|
||||
mock.ExpectGet("key").SetVal(string(data))
|
||||
retrieveStatus, err := cache.Retrieve(context.Background(), "key", retrieveCacheableEntity, false)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
if retrieveStatus != _cache.RetrieveStatusHit {
|
||||
t.Errorf("expected status %d, got %d", _cache.RetrieveStatusHit, retrieveStatus)
|
||||
}
|
||||
|
||||
assert.Equal(t, storeCacheableEntity, retrieveCacheableEntity)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetTTL(t *testing.T) {
|
||||
db, mock := redismock.NewClientMock()
|
||||
cache := WithClient(db)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
|
||||
mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil()
|
||||
cache.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)
|
||||
|
||||
mock.ExpectExpire("key", 4*time.Second).RedisNil()
|
||||
cache.SetTTL(context.Background(), "key", 4*time.Second)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
db, mock := redismock.NewClientMock()
|
||||
c := WithClient(db)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
|
||||
mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil()
|
||||
c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)
|
||||
|
||||
mock.ExpectDel("key").RedisNil()
|
||||
c.Remove(context.Background(), "key")
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBulkRemove(t *testing.T) {
|
||||
db, mock := redismock.NewClientMock()
|
||||
c := WithClient(db)
|
||||
storeCacheableEntity := &CacheableEntity{
|
||||
Key: "some-random-key",
|
||||
Value: 1,
|
||||
Expiry: time.Microsecond,
|
||||
}
|
||||
|
||||
mock.ExpectSet("key", storeCacheableEntity, 10*time.Second).RedisNil()
|
||||
c.Store(context.Background(), "key", storeCacheableEntity, 10*time.Second)
|
||||
|
||||
mock.ExpectSet("key2", storeCacheableEntity, 10*time.Second).RedisNil()
|
||||
c.Store(context.Background(), "key2", storeCacheableEntity, 10*time.Second)
|
||||
|
||||
mock.ExpectDel("key", "key2").RedisNil()
|
||||
c.BulkRemove(context.Background(), []string{"key", "key2"})
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
}
|
@ -3,6 +3,7 @@ package config
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
signozconfmap "go.signoz.io/signoz/pkg/confmap"
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
@ -13,6 +14,7 @@ var (
|
||||
defaults = map[string]signozconfmap.Config{
|
||||
"instrumentation": &instrumentation.Config{},
|
||||
"web": &web.Config{},
|
||||
"cache": &cache.Config{},
|
||||
}
|
||||
)
|
||||
|
||||
@ -20,6 +22,7 @@ var (
|
||||
type Config struct {
|
||||
Instrumentation instrumentation.Config `mapstructure:"instrumentation"`
|
||||
Web web.Config `mapstructure:"web"`
|
||||
Cache cache.Config `mapstructure:"cache"`
|
||||
}
|
||||
|
||||
func New(ctx context.Context, settings ProviderSettings) (*Config, error) {
|
||||
|
@ -3,22 +3,22 @@ package config
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
contribsdkconfig "go.opentelemetry.io/contrib/config"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/confmap/provider/signozenvprovider"
|
||||
"go.signoz.io/signoz/pkg/instrumentation"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
)
|
||||
|
||||
func TestNewWithSignozEnvProvider(t *testing.T) {
|
||||
t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__ENABLED", "true")
|
||||
t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__PROCESSORS__BATCH__EXPORTER__OTLP__ENDPOINT", "0.0.0.0:4317")
|
||||
t.Setenv("SIGNOZ__INSTRUMENTATION__LOGS__PROCESSORS__BATCH__EXPORT_TIMEOUT", "10")
|
||||
|
||||
t.Setenv("SIGNOZ__WEB__PREFIX", "/web")
|
||||
t.Setenv("SIGNOZ__WEB__DIRECTORY", "/build")
|
||||
t.Setenv("SIGNOZ__CACHE__PROVIDER", "redis")
|
||||
t.Setenv("SIGNOZ__CACHE__REDIS__HOST", "127.0.0.1")
|
||||
|
||||
config, err := New(context.Background(), ProviderSettings{
|
||||
ResolverSettings: confmap.ResolverSettings{
|
||||
@ -30,31 +30,24 @@ func TestNewWithSignozEnvProvider(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
i := 10
|
||||
expected := &Config{
|
||||
Instrumentation: instrumentation.Config{
|
||||
Logs: instrumentation.LogsConfig{
|
||||
Enabled: true,
|
||||
LoggerProvider: contribsdkconfig.LoggerProvider{
|
||||
Processors: []contribsdkconfig.LogRecordProcessor{
|
||||
{
|
||||
Batch: &contribsdkconfig.BatchLogRecordProcessor{
|
||||
ExportTimeout: &i,
|
||||
Exporter: contribsdkconfig.LogRecordExporter{
|
||||
OTLP: &contribsdkconfig.OTLP{
|
||||
Endpoint: "0.0.0.0:4317",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Web: web.Config{
|
||||
Prefix: "/web",
|
||||
Directory: "/build",
|
||||
},
|
||||
Cache: cache.Config{
|
||||
Provider: "redis",
|
||||
Memory: cache.Memory{
|
||||
TTL: time.Duration(-1),
|
||||
CleanupInterval: 1 * time.Minute,
|
||||
},
|
||||
Redis: cache.Redis{
|
||||
Host: "127.0.0.1",
|
||||
Port: 6379,
|
||||
Password: "",
|
||||
DB: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, expected, config)
|
||||
|
37
pkg/signoz/signoz.go
Normal file
37
pkg/signoz/signoz.go
Normal file
@ -0,0 +1,37 @@
|
||||
package signoz
|
||||
|
||||
import (
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/cache/strategy/memory"
|
||||
"go.signoz.io/signoz/pkg/cache/strategy/redis"
|
||||
"go.signoz.io/signoz/pkg/config"
|
||||
"go.signoz.io/signoz/pkg/web"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type SigNoz struct {
|
||||
Cache cache.Cache
|
||||
Web *web.Web
|
||||
}
|
||||
|
||||
func New(config *config.Config, skipWebFrontend bool) (*SigNoz, error) {
|
||||
var cache cache.Cache
|
||||
|
||||
// init for the cache
|
||||
switch config.Cache.Provider {
|
||||
case "memory":
|
||||
cache = memory.New(&config.Cache.Memory)
|
||||
case "redis":
|
||||
cache = redis.New(&config.Cache.Redis)
|
||||
}
|
||||
|
||||
web, err := web.New(zap.L(), config.Web)
|
||||
if err != nil && !skipWebFrontend {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SigNoz{
|
||||
Cache: cache,
|
||||
Web: web,
|
||||
}, nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user