feat(zeus): add zeus package (#7745)

* feat(zeus): add zeus package

* feat(signoz): add DI for zeus

* feat(zeus): integrate with the codebase

* ci(make): change makefile

* ci: change workflows to point to the new zeus url

* Update ee/query-service/usage/manager.go

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* Update ee/query-service/license/manager.go

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* fix: fix nil retriable

* fix: fix zeus DI

* fix: fix path of ldflag

* feat(zeus): added zeus integration tests

* feat(zeus): added zeus integration tests

* feat(zeus): format the pytest

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: vikrantgupta25 <vikrant.thomso@gmail.com>
Co-authored-by: Vikrant Gupta <vikrant@signoz.io>
This commit is contained in:
Vibhu Pandey 2025-04-28 19:50:47 +05:30 committed by GitHub
parent 940313d28b
commit e73e1bd078
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 627 additions and 294 deletions

View File

@ -104,8 +104,8 @@ jobs:
-X github.com/SigNoz/signoz/pkg/version.hash=${{ needs.prepare.outputs.hash }}
-X github.com/SigNoz/signoz/pkg/version.time=${{ needs.prepare.outputs.time }}
-X github.com/SigNoz/signoz/pkg/version.branch=${{ needs.prepare.outputs.branch }}
-X github.com/SigNoz/signoz/ee/query-service/constants.ZeusURL=https://api.signoz.cloud
-X github.com/SigNoz/signoz/ee/query-service/constants.LicenseSignozIo=https://license.signoz.io/api/v1'
-X github.com/SigNoz/signoz/ee/zeus.url=https://api.signoz.cloud
-X github.com/SigNoz/signoz/ee/zeus.deprecatedURL=https://license.signoz.io'
GO_CGO_ENABLED: 1
DOCKER_BASE_IMAGES: '{"alpine": "alpine:3.20.3"}'
DOCKER_DOCKERFILE_PATH: ./ee/query-service/Dockerfile.multi-arch

View File

@ -101,8 +101,8 @@ jobs:
-X github.com/SigNoz/signoz/pkg/version.hash=${{ needs.prepare.outputs.hash }}
-X github.com/SigNoz/signoz/pkg/version.time=${{ needs.prepare.outputs.time }}
-X github.com/SigNoz/signoz/pkg/version.branch=${{ needs.prepare.outputs.branch }}
-X github.com/SigNoz/signoz/ee/query-service/constants.ZeusURL=https://api.staging.signoz.cloud
-X github.com/SigNoz/signoz/ee/query-service/constants.LicenseSignozIo=https://license.staging.signoz.cloud/api/v1'
-X github.com/SigNoz/signoz/ee/zeus.url=https://api.staging.signoz.cloud
-X github.com/SigNoz/signoz/ee/zeus.deprecatedURL=https://license.staging.signoz.cloud'
GO_CGO_ENABLED: 1
DOCKER_BASE_IMAGES: '{"alpine": "alpine:3.20.3"}'
DOCKER_DOCKERFILE_PATH: ./ee/query-service/Dockerfile.multi-arch

View File

@ -14,9 +14,9 @@ ARCHS ?= amd64 arm64
TARGET_DIR ?= $(shell pwd)/target
ZEUS_URL ?= https://api.signoz.cloud
GO_BUILD_LDFLAG_ZEUS_URL = -X github.com/SigNoz/signoz/ee/query-service/constants.ZeusURL=$(ZEUS_URL)
LICENSE_URL ?= https://license.signoz.io/api/v1
GO_BUILD_LDFLAG_LICENSE_SIGNOZ_IO = -X github.com/SigNoz/signoz/ee/query-service/constants.LicenseSignozIo=$(LICENSE_URL)
GO_BUILD_LDFLAG_ZEUS_URL = -X github.com/SigNoz/signoz/ee/zeus.url=$(ZEUS_URL)
LICENSE_URL ?= https://license.signoz.io
GO_BUILD_LDFLAG_LICENSE_SIGNOZ_IO = -X github.com/SigNoz/signoz/ee/zeus.deprecatedURL=$(LICENSE_URL)
GO_BUILD_VERSION_LDFLAGS = -X github.com/SigNoz/signoz/pkg/version.version=$(VERSION) -X github.com/SigNoz/signoz/pkg/version.hash=$(COMMIT_SHORT_SHA) -X github.com/SigNoz/signoz/pkg/version.time=$(TIMESTAMP) -X github.com/SigNoz/signoz/pkg/version.branch=$(BRANCH_NAME)
GO_BUILD_ARCHS_COMMUNITY = $(addprefix go-build-community-,$(ARCHS))

View File

@ -35,8 +35,8 @@ builds:
- -X github.com/SigNoz/signoz/pkg/version.hash={{ .ShortCommit }}
- -X github.com/SigNoz/signoz/pkg/version.time={{ .CommitTimestamp }}
- -X github.com/SigNoz/signoz/pkg/version.branch={{ .Branch }}
- -X github.com/SigNoz/signoz/ee/query-service/constants.ZeusURL=https://api.signoz.cloud
- -X github.com/SigNoz/signoz/ee/query-service/constants.LicenseSignozIo=https://license.signoz.io/api/v1
- -X github.com/SigNoz/signoz/ee/zeus.url=https://api.signoz.cloud
- -X github.com/SigNoz/signoz/ee/zeus.deprecatedURL=https://license.signoz.io
- >-
{{- if eq .Os "linux" }}-linkmode external -extldflags '-static'{{- end }}
mod_timestamp: "{{ .CommitTimestamp }}"

View File

@ -9,6 +9,8 @@ import (
"github.com/SigNoz/signoz/ee/query-service/integrations/signozio"
"github.com/SigNoz/signoz/ee/query-service/model"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/types/authtypes"
)
type DayWiseBreakdown struct {
@ -90,8 +92,13 @@ func (ah *APIHandler) getActiveLicenseV3(w http.ResponseWriter, r *http.Request)
// this function is called by zeus when inserting licenses in the query-service
func (ah *APIHandler) applyLicenseV3(w http.ResponseWriter, r *http.Request) {
var licenseKey ApplyLicenseRequest
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
var licenseKey ApplyLicenseRequest
if err := json.NewDecoder(r.Body).Decode(&licenseKey); err != nil {
RespondError(w, model.BadRequest(err), nil)
return
@ -102,9 +109,10 @@ func (ah *APIHandler) applyLicenseV3(w http.ResponseWriter, r *http.Request) {
return
}
_, apiError := ah.LM().ActivateV3(r.Context(), licenseKey.LicenseKey)
if apiError != nil {
RespondError(w, apiError, nil)
_, err = ah.LM().ActivateV3(r.Context(), licenseKey.LicenseKey)
if err != nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_ACT_FAILED, map[string]interface{}{"err": err.Error()}, claims.Email, true, false)
render.Error(w, err)
return
}
@ -112,10 +120,9 @@ func (ah *APIHandler) applyLicenseV3(w http.ResponseWriter, r *http.Request) {
}
func (ah *APIHandler) refreshLicensesV3(w http.ResponseWriter, r *http.Request) {
apiError := ah.LM().RefreshLicense(r.Context())
if apiError != nil {
RespondError(w, apiError, nil)
err := ah.LM().RefreshLicense(r.Context())
if err != nil {
render.Error(w, err)
return
}
@ -127,7 +134,6 @@ func getCheckoutPortalResponse(redirectURL string) *Redirect {
}
func (ah *APIHandler) checkout(w http.ResponseWriter, r *http.Request) {
checkoutRequest := &model.CheckoutRequest{}
if err := json.NewDecoder(r.Body).Decode(checkoutRequest); err != nil {
RespondError(w, model.BadRequest(err), nil)
@ -140,9 +146,9 @@ func (ah *APIHandler) checkout(w http.ResponseWriter, r *http.Request) {
return
}
redirectUrl, err := signozio.CheckoutSession(r.Context(), checkoutRequest, license.Key)
redirectUrl, err := signozio.CheckoutSession(r.Context(), checkoutRequest, license.Key, ah.Signoz.Zeus)
if err != nil {
RespondError(w, err, nil)
render.Error(w, err)
return
}
@ -230,7 +236,6 @@ func (ah *APIHandler) listLicensesV2(w http.ResponseWriter, r *http.Request) {
}
func (ah *APIHandler) portalSession(w http.ResponseWriter, r *http.Request) {
portalRequest := &model.PortalRequest{}
if err := json.NewDecoder(r.Body).Decode(portalRequest); err != nil {
RespondError(w, model.BadRequest(err), nil)
@ -243,9 +248,9 @@ func (ah *APIHandler) portalSession(w http.ResponseWriter, r *http.Request) {
return
}
redirectUrl, err := signozio.PortalSession(r.Context(), portalRequest, license.Key)
redirectUrl, err := signozio.PortalSession(r.Context(), portalRequest, license.Key, ah.Signoz.Zeus)
if err != nil {
RespondError(w, err, nil)
render.Error(w, err)
return
}

View File

@ -114,7 +114,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// initiate license manager
lm, err := licensepkg.StartManager(serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore)
lm, err := licensepkg.StartManager(serverOptions.SigNoz.SQLStore.SQLxDB(), serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.Zeus)
if err != nil {
return nil, err
}
@ -201,7 +201,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// start the usagemanager
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.Config.TelemetryStore.Clickhouse.DSN)
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickhouseDB(), serverOptions.SigNoz.Zeus)
if err != nil {
return nil, err
}

View File

@ -1,16 +0,0 @@
package signozio
type status string
type ValidateLicenseResponse struct {
Status status `json:"status"`
Data map[string]interface{} `json:"data"`
}
type CheckoutSessionRedirect struct {
RedirectURL string `json:"url"`
}
type CheckoutResponse struct {
Status status `json:"status"`
Data CheckoutSessionRedirect `json:"data"`
}

View File

@ -1,222 +1,67 @@
package signozio
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/SigNoz/signoz/ee/query-service/constants"
"github.com/SigNoz/signoz/ee/query-service/model"
"github.com/pkg/errors"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/tidwall/gjson"
)
var C *Client
const (
POST = "POST"
APPLICATION_JSON = "application/json"
)
type Client struct {
Prefix string
GatewayUrl string
}
func New() *Client {
return &Client{
Prefix: constants.LicenseSignozIo,
GatewayUrl: constants.ZeusURL,
}
}
func init() {
C = New()
}
func ValidateLicenseV3(licenseKey string) (*model.LicenseV3, *model.ApiError) {
// Creating an HTTP client with a timeout for better control
client := &http.Client{
Timeout: 10 * time.Second,
}
req, err := http.NewRequest("GET", C.GatewayUrl+"/v2/licenses/me", nil)
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, "failed to create request"))
}
// Setting the custom header
req.Header.Set("X-Signoz-Cloud-Api-Key", licenseKey)
response, err := client.Do(req)
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, "failed to make post request"))
}
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, fmt.Sprintf("failed to read validation response from %v", C.GatewayUrl)))
}
defer response.Body.Close()
switch response.StatusCode {
case 200:
a := ValidateLicenseResponse{}
err = json.Unmarshal(body, &a)
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, "failed to marshal license validation response"))
}
license, err := model.NewLicenseV3(a.Data)
if err != nil {
return nil, model.BadRequest(errors.Wrap(err, "failed to generate new license v3"))
}
return license, nil
case 400:
return nil, model.BadRequest(errors.Wrap(fmt.Errorf(string(body)),
fmt.Sprintf("bad request error received from %v", C.GatewayUrl)))
case 401:
return nil, model.Unauthorized(errors.Wrap(fmt.Errorf(string(body)),
fmt.Sprintf("unauthorized request error received from %v", C.GatewayUrl)))
default:
return nil, model.InternalError(errors.Wrap(fmt.Errorf(string(body)),
fmt.Sprintf("internal request error received from %v", C.GatewayUrl)))
}
}
func NewPostRequestWithCtx(ctx context.Context, url string, contentType string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, POST, url, body)
func ValidateLicenseV3(ctx context.Context, licenseKey string, zeus zeus.Zeus) (*model.LicenseV3, error) {
data, err := zeus.GetLicense(ctx, licenseKey)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", contentType)
return req, err
var m map[string]any
if err = json.Unmarshal(data, &m); err != nil {
return nil, err
}
license, err := model.NewLicenseV3(m)
if err != nil {
return nil, err
}
return license, nil
}
// SendUsage reports the usage of signoz to license server
func SendUsage(ctx context.Context, usage model.UsagePayload) *model.ApiError {
reqString, _ := json.Marshal(usage)
req, err := NewPostRequestWithCtx(ctx, C.Prefix+"/usage", APPLICATION_JSON, bytes.NewBuffer(reqString))
func SendUsage(ctx context.Context, usage model.UsagePayload, zeus zeus.Zeus) error {
body, err := json.Marshal(usage)
if err != nil {
return model.BadRequest(errors.Wrap(err, "unable to create http request"))
return err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return model.BadRequest(errors.Wrap(err, "unable to connect with license.signoz.io, please check your network connection"))
}
body, err := io.ReadAll(res.Body)
if err != nil {
return model.BadRequest(errors.Wrap(err, "failed to read usage response from license.signoz.io"))
}
defer res.Body.Close()
switch res.StatusCode {
case 200, 201:
return nil
case 400, 401:
return model.BadRequest(errors.Wrap(errors.New(string(body)),
"bad request error received from license.signoz.io"))
default:
return model.InternalError(errors.Wrap(errors.New(string(body)),
"internal error received from license.signoz.io"))
}
return zeus.PutMeters(ctx, usage.LicenseKey.String(), body)
}
func CheckoutSession(ctx context.Context, checkoutRequest *model.CheckoutRequest, licenseKey string) (string, *model.ApiError) {
hClient := &http.Client{}
reqString, err := json.Marshal(checkoutRequest)
func CheckoutSession(ctx context.Context, checkoutRequest *model.CheckoutRequest, licenseKey string, zeus zeus.Zeus) (string, error) {
body, err := json.Marshal(checkoutRequest)
if err != nil {
return "", model.BadRequest(err)
return "", err
}
req, err := http.NewRequestWithContext(ctx, "POST", C.GatewayUrl+"/v2/subscriptions/me/sessions/checkout", bytes.NewBuffer(reqString))
response, err := zeus.GetCheckoutURL(ctx, licenseKey, body)
if err != nil {
return "", model.BadRequest(err)
return "", err
}
req.Header.Set("X-Signoz-Cloud-Api-Key", licenseKey)
response, err := hClient.Do(req)
if err != nil {
return "", model.BadRequest(err)
}
body, err := io.ReadAll(response.Body)
if err != nil {
return "", model.BadRequest(errors.Wrap(err, fmt.Sprintf("failed to read checkout response from %v", C.GatewayUrl)))
}
defer response.Body.Close()
switch response.StatusCode {
case 201:
a := CheckoutResponse{}
err = json.Unmarshal(body, &a)
if err != nil {
return "", model.BadRequest(errors.Wrap(err, "failed to unmarshal zeus checkout response"))
}
return a.Data.RedirectURL, nil
case 400:
return "", model.BadRequest(errors.Wrap(errors.New(string(body)),
fmt.Sprintf("bad request error received from %v", C.GatewayUrl)))
case 401:
return "", model.Unauthorized(errors.Wrap(errors.New(string(body)),
fmt.Sprintf("unauthorized request error received from %v", C.GatewayUrl)))
default:
return "", model.InternalError(errors.Wrap(errors.New(string(body)),
fmt.Sprintf("internal request error received from %v", C.GatewayUrl)))
}
return gjson.GetBytes(response, "url").String(), nil
}
func PortalSession(ctx context.Context, checkoutRequest *model.PortalRequest, licenseKey string) (string, *model.ApiError) {
hClient := &http.Client{}
reqString, err := json.Marshal(checkoutRequest)
func PortalSession(ctx context.Context, portalRequest *model.PortalRequest, licenseKey string, zeus zeus.Zeus) (string, error) {
body, err := json.Marshal(portalRequest)
if err != nil {
return "", model.BadRequest(err)
return "", err
}
req, err := http.NewRequestWithContext(ctx, "POST", C.GatewayUrl+"/v2/subscriptions/me/sessions/portal", bytes.NewBuffer(reqString))
response, err := zeus.GetPortalURL(ctx, licenseKey, body)
if err != nil {
return "", model.BadRequest(err)
return "", err
}
req.Header.Set("X-Signoz-Cloud-Api-Key", licenseKey)
response, err := hClient.Do(req)
if err != nil {
return "", model.BadRequest(err)
}
body, err := io.ReadAll(response.Body)
if err != nil {
return "", model.BadRequest(errors.Wrap(err, fmt.Sprintf("failed to read portal response from %v", C.GatewayUrl)))
}
defer response.Body.Close()
switch response.StatusCode {
case 201:
a := CheckoutResponse{}
err = json.Unmarshal(body, &a)
if err != nil {
return "", model.BadRequest(errors.Wrap(err, "failed to unmarshal zeus portal response"))
}
return a.Data.RedirectURL, nil
case 400:
return "", model.BadRequest(errors.Wrap(errors.New(string(body)),
fmt.Sprintf("bad request error received from %v", C.GatewayUrl)))
case 401:
return "", model.Unauthorized(errors.Wrap(errors.New(string(body)),
fmt.Sprintf("unauthorized request error received from %v", C.GatewayUrl)))
default:
return "", model.InternalError(errors.Wrap(errors.New(string(body)),
fmt.Sprintf("internal request error received from %v", C.GatewayUrl)))
}
return gjson.GetBytes(response, "url").String(), nil
}

View File

@ -6,14 +6,13 @@ import (
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"sync"
baseconstants "github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/zeus"
validate "github.com/SigNoz/signoz/ee/query-service/integrations/signozio"
"github.com/SigNoz/signoz/ee/query-service/model"
@ -29,6 +28,7 @@ var validationFrequency = 24 * 60 * time.Minute
type Manager struct {
repo *Repo
zeus zeus.Zeus
mutex sync.Mutex
validatorRunning bool
// end the license validation, this is important to gracefully
@ -45,7 +45,7 @@ type Manager struct {
activeFeatures basemodel.FeatureSet
}
func StartManager(db *sqlx.DB, store sqlstore.SQLStore, features ...basemodel.Feature) (*Manager, error) {
func StartManager(db *sqlx.DB, store sqlstore.SQLStore, zeus zeus.Zeus, features ...basemodel.Feature) (*Manager, error) {
if LM != nil {
return LM, nil
}
@ -53,6 +53,7 @@ func StartManager(db *sqlx.DB, store sqlstore.SQLStore, features ...basemodel.Fe
repo := NewLicenseRepo(db, store)
m := &Manager{
repo: &repo,
zeus: zeus,
}
if err := m.start(features...); err != nil {
return m, err
@ -172,17 +173,15 @@ func (lm *Manager) ValidatorV3(ctx context.Context) {
}
}
func (lm *Manager) RefreshLicense(ctx context.Context) *model.ApiError {
license, apiError := validate.ValidateLicenseV3(lm.activeLicenseV3.Key)
if apiError != nil {
zap.L().Error("failed to validate license", zap.Error(apiError.Err))
return apiError
func (lm *Manager) RefreshLicense(ctx context.Context) error {
license, err := validate.ValidateLicenseV3(ctx, lm.activeLicenseV3.Key, lm.zeus)
if err != nil {
return err
}
err := lm.repo.UpdateLicenseV3(ctx, license)
err = lm.repo.UpdateLicenseV3(ctx, license)
if err != nil {
return model.BadRequest(errors.Wrap(err, "failed to update the new license"))
return err
}
lm.SetActiveV3(license)
@ -190,7 +189,6 @@ func (lm *Manager) RefreshLicense(ctx context.Context) *model.ApiError {
}
func (lm *Manager) ValidateV3(ctx context.Context) (reterr error) {
zap.L().Info("License validation started")
if lm.activeLicenseV3 == nil {
return nil
}
@ -236,28 +234,17 @@ func (lm *Manager) ValidateV3(ctx context.Context) (reterr error) {
return nil
}
func (lm *Manager) ActivateV3(ctx context.Context, licenseKey string) (licenseResponse *model.LicenseV3, errResponse *model.ApiError) {
defer func() {
if errResponse != nil {
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_LICENSE_ACT_FAILED,
map[string]interface{}{"err": errResponse.Err.Error()}, claims.Email, true, false)
}
}
}()
license, apiError := validate.ValidateLicenseV3(licenseKey)
if apiError != nil {
zap.L().Error("failed to get the license", zap.Error(apiError.Err))
return nil, apiError
func (lm *Manager) ActivateV3(ctx context.Context, licenseKey string) (*model.LicenseV3, error) {
license, err := validate.ValidateLicenseV3(ctx, licenseKey, lm.zeus)
if err != nil {
return nil, err
}
// insert the new license to the sqlite db
err := lm.repo.InsertLicenseV3(ctx, license)
if err != nil {
zap.L().Error("failed to activate license", zap.Error(err))
return nil, err
modelErr := lm.repo.InsertLicenseV3(ctx, license)
if modelErr != nil {
zap.L().Error("failed to activate license", zap.Error(modelErr))
return nil, modelErr
}
// license is valid, activate it

View File

@ -8,6 +8,8 @@ import (
"github.com/SigNoz/signoz/ee/query-service/app"
"github.com/SigNoz/signoz/ee/sqlstore/postgressqlstore"
"github.com/SigNoz/signoz/ee/zeus"
"github.com/SigNoz/signoz/ee/zeus/httpzeus"
"github.com/SigNoz/signoz/pkg/config"
"github.com/SigNoz/signoz/pkg/config/envprovider"
"github.com/SigNoz/signoz/pkg/config/fileprovider"
@ -106,6 +108,8 @@ func main() {
signoz, err := signoz.New(
context.Background(),
config,
zeus.Config(),
httpzeus.NewProviderFactory(),
signoz.NewCacheProviderFactories(),
signoz.NewWebProviderFactories(),
sqlStoreFactories,

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"
@ -16,10 +15,10 @@ import (
"go.uber.org/zap"
"github.com/SigNoz/signoz/ee/query-service/dao"
licenseserver "github.com/SigNoz/signoz/ee/query-service/integrations/signozio"
"github.com/SigNoz/signoz/ee/query-service/license"
"github.com/SigNoz/signoz/ee/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/utils/encryption"
"github.com/SigNoz/signoz/pkg/zeus"
)
const (
@ -42,26 +41,16 @@ type Manager struct {
modelDao dao.ModelDao
tenantID string
zeus zeus.Zeus
}
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, chUrl string) (*Manager, error) {
hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(chUrl)
tenantID := ""
if len(hostNameRegexMatches) == 2 {
tenantID = hostNameRegexMatches[1]
tenantID = strings.TrimSuffix(tenantID, "-clickhouse")
}
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, zeus zeus.Zeus) (*Manager, error) {
m := &Manager{
// repository: repo,
clickhouseConn: clickhouseConn,
licenseRepo: licenseRepo,
scheduler: gocron.NewScheduler(time.UTC).Every(1).Day().At("00:00"), // send usage every at 00:00 UTC
modelDao: modelDao,
tenantID: tenantID,
zeus: zeus,
}
return m, nil
}
@ -158,7 +147,7 @@ func (lm *Manager) UploadUsage() {
usageData.Type = usage.Type
usageData.Tenant = "default"
usageData.OrgName = "default"
usageData.TenantId = lm.tenantID
usageData.TenantId = "default"
usagesPayload = append(usagesPayload, usageData)
}
@ -167,24 +156,18 @@ func (lm *Manager) UploadUsage() {
LicenseKey: key,
Usage: usagesPayload,
}
lm.UploadUsageWithExponentalBackOff(ctx, payload)
}
func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) {
for i := 1; i <= MaxRetries; i++ {
apiErr := licenseserver.SendUsage(ctx, payload)
if apiErr != nil && i == MaxRetries {
zap.L().Error("retries stopped : %v", zap.Error(apiErr))
// not returning error here since it is captured in the failed count
return
} else if apiErr != nil {
// sleeping for exponential backoff
sleepDuration := RetryInterval * time.Duration(i)
zap.L().Error("failed to upload snapshot retrying after %v secs : %v", zap.Duration("sleepDuration", sleepDuration), zap.Error(apiErr.Err))
time.Sleep(sleepDuration)
} else {
break
}
body, errv2 := json.Marshal(payload)
if errv2 != nil {
zap.L().Error("error while marshalling usage payload: %v", zap.Error(errv2))
return
}
errv2 = lm.zeus.PutMeters(ctx, payload.LicenseKey.String(), body)
if errv2 != nil {
zap.L().Error("failed to upload usage: %v", zap.Error(errv2))
// not returning error here since it is captured in the failed count
return
}
}

42
ee/zeus/config.go Normal file
View File

@ -0,0 +1,42 @@
package zeus
import (
"fmt"
neturl "net/url"
"sync"
"github.com/SigNoz/signoz/pkg/zeus"
)
// This will be set via ldflags at build time.
var (
url string = "<unset>"
deprecatedURL string = "<unset>"
)
var (
config zeus.Config
once sync.Once
)
// initializes the Zeus configuration
func Config() zeus.Config {
once.Do(func() {
parsedURL, err := neturl.Parse(url)
if err != nil {
panic(fmt.Errorf("invalid zeus URL: %w", err))
}
deprecatedParsedURL, err := neturl.Parse(deprecatedURL)
if err != nil {
panic(fmt.Errorf("invalid zeus deprecated URL: %w", err))
}
config = zeus.Config{URL: parsedURL, DeprecatedURL: deprecatedParsedURL}
if err := config.Validate(); err != nil {
panic(fmt.Errorf("invalid zeus config: %w", err))
}
})
return config
}

View File

@ -0,0 +1,189 @@
package httpzeus
import (
"bytes"
"context"
"io"
"net/http"
"net/url"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/client"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/tidwall/gjson"
)
type Provider struct {
settings factory.ScopedProviderSettings
config zeus.Config
httpClient *client.Client
}
func NewProviderFactory() factory.ProviderFactory[zeus.Zeus, zeus.Config] {
return factory.NewProviderFactory(factory.MustNewName("http"), func(ctx context.Context, providerSettings factory.ProviderSettings, config zeus.Config) (zeus.Zeus, error) {
return New(ctx, providerSettings, config)
})
}
func New(ctx context.Context, providerSettings factory.ProviderSettings, config zeus.Config) (zeus.Zeus, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/ee/zeus/httpzeus")
httpClient, err := client.New(
settings.Logger(),
providerSettings.TracerProvider,
providerSettings.MeterProvider,
client.WithRequestResponseLog(true),
client.WithRetryCount(3),
)
if err != nil {
return nil, err
}
return &Provider{
settings: settings,
config: config,
httpClient: httpClient,
}, nil
}
func (provider *Provider) GetLicense(ctx context.Context, key string) ([]byte, error) {
response, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/licenses/me"),
http.MethodGet,
key,
nil,
)
if err != nil {
return nil, err
}
return []byte(gjson.GetBytes(response, "data").String()), nil
}
func (provider *Provider) GetCheckoutURL(ctx context.Context, key string, body []byte) ([]byte, error) {
response, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/subscriptions/me/sessions/checkout"),
http.MethodPost,
key,
body,
)
if err != nil {
return nil, err
}
return []byte(gjson.GetBytes(response, "data").String()), nil
}
func (provider *Provider) GetPortalURL(ctx context.Context, key string, body []byte) ([]byte, error) {
response, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/subscriptions/me/sessions/portal"),
http.MethodPost,
key,
body,
)
if err != nil {
return nil, err
}
return []byte(gjson.GetBytes(response, "data").String()), nil
}
func (provider *Provider) GetDeployment(ctx context.Context, key string) ([]byte, error) {
response, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/deployments/me"),
http.MethodGet,
key,
nil,
)
if err != nil {
return nil, err
}
return []byte(gjson.GetBytes(response, "data").String()), nil
}
func (provider *Provider) PutMeters(ctx context.Context, key string, data []byte) error {
_, err := provider.do(
ctx,
provider.config.DeprecatedURL.JoinPath("/api/v1/usage"),
http.MethodPost,
key,
data,
)
return err
}
func (provider *Provider) PutProfile(ctx context.Context, key string, body []byte) error {
_, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/profiles/me"),
http.MethodPut,
key,
body,
)
return err
}
func (provider *Provider) PutHost(ctx context.Context, key string, body []byte) error {
_, err := provider.do(
ctx,
provider.config.URL.JoinPath("/v2/deployments/me/hosts"),
http.MethodPut,
key,
body,
)
return err
}
func (provider *Provider) do(ctx context.Context, url *url.URL, method string, key string, requestBody []byte) ([]byte, error) {
request, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Set("X-Signoz-Cloud-Api-Key", key)
request.Header.Set("Content-Type", "application/json")
response, err := provider.httpClient.Do(request)
if err != nil {
return nil, err
}
defer func() {
_ = response.Body.Close()
}()
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
if response.StatusCode/100 == 2 {
return body, nil
}
return nil, provider.errFromStatusCode(response.StatusCode)
}
// This can be taken down to the client package
func (provider *Provider) errFromStatusCode(statusCode int) error {
switch statusCode {
case http.StatusBadRequest:
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "bad request")
case http.StatusUnauthorized:
return errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")
case http.StatusForbidden:
return errors.Newf(errors.TypeForbidden, errors.CodeForbidden, "forbidden")
case http.StatusNotFound:
return errors.Newf(errors.TypeNotFound, errors.CodeNotFound, "not found")
}
return errors.Newf(errors.TypeInternal, errors.CodeInternal, "internal")
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/http/client/plugin"
"github.com/gojek/heimdall/v7"
"github.com/gojek/heimdall/v7/httpclient"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/metric"
@ -33,6 +34,15 @@ func New(logger *slog.Logger, tracerProvider trace.TracerProvider, meterProvider
Transport: otelhttp.NewTransport(http.DefaultTransport, otelhttp.WithTracerProvider(tracerProvider), otelhttp.WithMeterProvider(meterProvider)),
}
if clientOpts.retriable == nil {
clientOpts.retriable = heimdall.NewRetrier(
heimdall.NewConstantBackoff(
2*time.Second,
100*time.Millisecond,
),
)
}
c := httpclient.NewClient(
httpclient.WithHTTPClient(netc),
httpclient.WithRetrier(clientOpts.retriable),

View File

@ -63,7 +63,7 @@ func (plugin *reqResLog) OnRequestEnd(request *http.Request, response *http.Resp
func (plugin *reqResLog) OnError(request *http.Request, err error) {
host, port, _ := net.SplitHostPort(request.Host)
fields := []any{
err,
"error", err,
string(semconv.HTTPRequestMethodKey), request.Method,
string(semconv.URLPathKey), request.URL.Path,
string(semconv.URLSchemeKey), request.URL.Scheme,

View File

@ -14,6 +14,8 @@ import (
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/SigNoz/signoz/pkg/zeus/noopzeus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -97,6 +99,8 @@ func main() {
signoz, err := signoz.New(
context.Background(),
config,
zeus.Config{},
noopzeus.NewProviderFactory(),
signoz.NewCacheProviderFactories(),
signoz.NewWebProviderFactories(),
signoz.NewSQLStoreProviderFactories(),

View File

@ -13,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/version"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/SigNoz/signoz/pkg/web"
)
@ -26,6 +27,7 @@ type SigNoz struct {
TelemetryStore telemetrystore.TelemetryStore
Prometheus prometheus.Prometheus
Alertmanager alertmanager.Alertmanager
Zeus zeus.Zeus
Modules Modules
Handlers Handlers
}
@ -33,6 +35,8 @@ type SigNoz struct {
func New(
ctx context.Context,
config Config,
zeusConfig zeus.Config,
zeusProviderFactory factory.ProviderFactory[zeus.Zeus, zeus.Config],
cacheProviderFactories factory.NamedMap[factory.ProviderFactory[cache.Cache, cache.Config]],
webProviderFactories factory.NamedMap[factory.ProviderFactory[web.Web, web.Config]],
sqlstoreProviderFactories factory.NamedMap[factory.ProviderFactory[sqlstore.SQLStore, sqlstore.Config]],
@ -50,6 +54,17 @@ func New(
// Get the provider settings from instrumentation
providerSettings := instrumentation.ToProviderSettings()
// Initialize zeus from the available zeus provider factory. This is not config controlled
// and depends on the variant of the build.
zeus, err := zeusProviderFactory.New(
ctx,
providerSettings,
zeusConfig,
)
if err != nil {
return nil, err
}
// Initialize cache from the available cache provider factories
cache, err := factory.NewProviderFromNamedMap(
ctx,
@ -162,6 +177,7 @@ func New(
TelemetryStore: telemetrystore,
Prometheus: prometheus,
Alertmanager: alertmanager,
Zeus: zeus,
Modules: modules,
Handlers: handlers,
}, nil

18
pkg/zeus/config.go Normal file
View File

@ -0,0 +1,18 @@
package zeus
import (
"net/url"
"github.com/SigNoz/signoz/pkg/factory"
)
var _ factory.Config = (*Config)(nil)
type Config struct {
URL *url.URL `mapstructure:"url"`
DeprecatedURL *url.URL `mapstructure:"deprecated_url"`
}
func (c Config) Validate() error {
return nil
}

View File

@ -0,0 +1,49 @@
package noopzeus
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/zeus"
)
type provider struct{}
func NewProviderFactory() factory.ProviderFactory[zeus.Zeus, zeus.Config] {
return factory.NewProviderFactory(factory.MustNewName("noop"), func(ctx context.Context, providerSettings factory.ProviderSettings, config zeus.Config) (zeus.Zeus, error) {
return New(ctx, providerSettings, config)
})
}
func New(_ context.Context, _ factory.ProviderSettings, _ zeus.Config) (zeus.Zeus, error) {
return &provider{}, nil
}
func (provider *provider) GetLicense(_ context.Context, _ string) ([]byte, error) {
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "fetching license is not supported")
}
func (provider *provider) GetCheckoutURL(_ context.Context, _ string, _ []byte) ([]byte, error) {
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "getting the checkout url is not supported")
}
func (provider *provider) GetPortalURL(_ context.Context, _ string, _ []byte) ([]byte, error) {
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "getting the portal url is not supported")
}
func (provider *provider) GetDeployment(_ context.Context, _ string) ([]byte, error) {
return nil, errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "getting the deployment is not supported")
}
func (provider *provider) PutMeters(_ context.Context, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting meters is not supported")
}
func (provider *provider) PutProfile(_ context.Context, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting profile is not supported")
}
func (provider *provider) PutHost(_ context.Context, _ string, _ []byte) error {
return errors.New(errors.TypeUnsupported, zeus.ErrCodeUnsupported, "putting host is not supported")
}

35
pkg/zeus/zeus.go Normal file
View File

@ -0,0 +1,35 @@
package zeus
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
)
var (
ErrCodeUnsupported = errors.MustNewCode("zeus_unsupported")
ErrCodeResponseMalformed = errors.MustNewCode("zeus_response_malformed")
)
type Zeus interface {
// Returns the license for the given key.
GetLicense(context.Context, string) ([]byte, error)
// Returns the checkout URL for the given license key.
GetCheckoutURL(context.Context, string, []byte) ([]byte, error)
// Returns the portal URL for the given license key.
GetPortalURL(context.Context, string, []byte) ([]byte, error)
// Returns the deployment for the given license key.
GetDeployment(context.Context, string) ([]byte, error)
// Puts the meters for the given license key.
PutMeters(context.Context, string, []byte) error
// Put profile for the given license key.
PutProfile(context.Context, string, []byte) error
// Put host for the given license key.
PutHost(context.Context, string, []byte) error
}

View File

@ -1,4 +1,5 @@
import http
import json
import requests
from wiremock.client import (
@ -69,3 +70,162 @@ def test_apply_license(signoz: SigNoz, make_http_mocks, get_jwt_token) -> None:
)
assert response.json()["count"] >= 1
def test_refresh_license(signoz: SigNoz, make_http_mocks, get_jwt_token) -> None:
make_http_mocks(
signoz.zeus,
[
Mapping(
request=MappingRequest(
method=HttpMethods.GET,
url="/v2/licenses/me",
headers={
"X-Signoz-Cloud-Api-Key": {
WireMockMatchers.EQUAL_TO: "secret-key"
}
},
),
response=MappingResponse(
status=200,
json_body={
"status": "success",
"data": {
"id": "0196360e-90cd-7a74-8313-1aa815ce2a67",
"key": "secret-key",
"valid_from": 1732146922,
"valid_until": -1,
"status": "VALID",
"state": "EVALUATING",
"plan": {
"name": "ENTERPRISE",
},
"platform": "CLOUD",
"features": [],
"event_queue": {},
},
},
),
persistent=False,
)
],
)
access_token = get_jwt_token("admin@integration.test", "password")
response = requests.put(
url=signoz.self.host_config.get("/api/v3/licenses"),
headers={"Authorization": "Bearer " + access_token},
timeout=5,
)
assert response.status_code == http.HTTPStatus.NO_CONTENT
cursor = signoz.sqlstore.conn.cursor()
cursor.execute(
"SELECT data FROM licenses_v3 WHERE id='0196360e-90cd-7a74-8313-1aa815ce2a67'"
)
record = cursor.fetchone()[0]
assert json.loads(record)["valid_from"] == 1732146922
response = requests.post(
url=signoz.zeus.host_config.get("/__admin/requests/count"),
json={"method": "GET", "url": "/v2/licenses/me"},
timeout=5,
)
assert response.json()["count"] >= 1
def test_license_checkout(signoz: SigNoz, make_http_mocks, get_jwt_token) -> None:
make_http_mocks(
signoz.zeus,
[
Mapping(
request=MappingRequest(
method=HttpMethods.POST,
url="/v2/subscriptions/me/sessions/checkout",
headers={
"X-Signoz-Cloud-Api-Key": {
WireMockMatchers.EQUAL_TO: "secret-key"
}
},
),
response=MappingResponse(
status=200,
json_body={
"status": "success",
"data": {"url": "https://signoz.checkout.com"},
},
),
persistent=False,
)
],
)
access_token = get_jwt_token("admin@integration.test", "password")
response = requests.post(
url=signoz.self.host_config.get("/api/v1/checkout"),
json={"url": "https://integration-signoz.com"},
headers={"Authorization": "Bearer " + access_token},
timeout=5,
)
assert response.status_code == http.HTTPStatus.OK
assert response.json()["data"]["redirectURL"] == "https://signoz.checkout.com"
response = requests.post(
url=signoz.zeus.host_config.get("/__admin/requests/count"),
json={"method": "POST", "url": "/v2/subscriptions/me/sessions/checkout"},
timeout=5,
)
assert response.json()["count"] == 1
def test_license_portal(signoz: SigNoz, make_http_mocks, get_jwt_token) -> None:
make_http_mocks(
signoz.zeus,
[
Mapping(
request=MappingRequest(
method=HttpMethods.POST,
url="/v2/subscriptions/me/sessions/portal",
headers={
"X-Signoz-Cloud-Api-Key": {
WireMockMatchers.EQUAL_TO: "secret-key"
}
},
),
response=MappingResponse(
status=200,
json_body={
"status": "success",
"data": {"url": "https://signoz.portal.com"},
},
),
persistent=False,
)
],
)
access_token = get_jwt_token("admin@integration.test", "password")
response = requests.post(
url=signoz.self.host_config.get("/api/v1/portal"),
json={"url": "https://integration-signoz.com"},
headers={"Authorization": "Bearer " + access_token},
timeout=5,
)
assert response.status_code == http.HTTPStatus.OK
assert response.json()["data"]["redirectURL"] == "https://signoz.portal.com"
response = requests.post(
url=signoz.zeus.host_config.get("/__admin/requests/count"),
json={"method": "POST", "url": "/v2/subscriptions/me/sessions/portal"},
timeout=5,
)
assert response.json()["count"] == 1

View File

@ -1,7 +1,9 @@
from fixtures import types
import requests
from http import HTTPStatus
import requests
from fixtures import types
def test_api_key(signoz: types.SigNoz, get_jwt_token) -> None:
admin_token = get_jwt_token("admin@integration.test", "password")