Merge branch 'develop' into issue-3400-ui-horizontal-scroll-logs-context

This commit is contained in:
Palash Gupta 2023-08-23 17:01:15 +05:30 committed by GitHub
commit 1dd7bdb100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 399 additions and 94 deletions

View File

@ -1,8 +1,7 @@
version: "3.9" version: "3.9"
x-clickhouse-defaults: x-clickhouse-defaults: &clickhouse-defaults
&clickhouse-defaults image: clickhouse/clickhouse-server:23.7.3-alpine
image: clickhouse/clickhouse-server:22.8.8-alpine
tty: true tty: true
deploy: deploy:
restart_policy: restart_policy:
@ -34,8 +33,7 @@ x-clickhouse-defaults:
soft: 262144 soft: 262144
hard: 262144 hard: 262144
x-clickhouse-depend: x-clickhouse-depend: &clickhouse-depend
&clickhouse-depend
depends_on: depends_on:
- clickhouse - clickhouse
# - clickhouse-2 # - clickhouse-2

View File

@ -28,6 +28,18 @@ server {
proxy_pass http://alertmanager:9093/api/v2; proxy_pass http://alertmanager:9093/api/v2;
} }
location ~ ^/api/(v1|v3)/logs/(tail|livetail){
proxy_pass http://query-service:8080;
proxy_http_version 1.1;
# connection will be closed if no data is read for 600s between successive read operations
proxy_read_timeout 600s;
# dont buffer the data send it directly to client.
proxy_buffering off;
proxy_cache off;
}
location /api { location /api {
proxy_pass http://query-service:8080/api; proxy_pass http://query-service:8080/api;
# connection will be closed if no data is read for 600s between successive read operations # connection will be closed if no data is read for 600s between successive read operations

View File

@ -2,7 +2,7 @@ version: "2.4"
services: services:
clickhouse: clickhouse:
image: clickhouse/clickhouse-server:22.8.8-alpine image: clickhouse/clickhouse-server:23.7.3-alpine
container_name: signoz-clickhouse container_name: signoz-clickhouse
# ports: # ports:
# - "9000:9000" # - "9000:9000"

View File

@ -1,9 +1,9 @@
version: "2.4" version: "2.4"
x-clickhouse-defaults: x-clickhouse-defaults: &clickhouse-defaults
&clickhouse-defaults
restart: on-failure restart: on-failure
image: clickhouse/clickhouse-server:22.8.8-alpine # addding non LTS version due to this fix https://github.com/ClickHouse/ClickHouse/commit/32caf8716352f45c1b617274c7508c86b7d1afab
image: clickhouse/clickhouse-server:23.7.3-alpine
tty: true tty: true
depends_on: depends_on:
- zookeeper-1 - zookeeper-1
@ -32,8 +32,7 @@ x-clickhouse-defaults:
soft: 262144 soft: 262144
hard: 262144 hard: 262144
x-clickhouse-depend: x-clickhouse-depend: &clickhouse-depend
&clickhouse-depend
depends_on: depends_on:
clickhouse: clickhouse:
condition: service_healthy condition: service_healthy

View File

@ -28,6 +28,18 @@ server {
proxy_pass http://alertmanager:9093/api/v2; proxy_pass http://alertmanager:9093/api/v2;
} }
location ~ ^/api/(v1|v3)/logs/(tail|livetail){
proxy_pass http://query-service:8080;
proxy_http_version 1.1;
# connection will be closed if no data is read for 600s between successive read operations
proxy_read_timeout 600s;
# dont buffer the data send it directly to client.
proxy_buffering off;
proxy_cache off;
}
location /api { location /api {
proxy_pass http://query-service:8080/api; proxy_pass http://query-service:8080/api;
# connection will be closed if no data is read for 600s between successive read operations # connection will be closed if no data is read for 600s between successive read operations

View File

@ -113,7 +113,7 @@ func (ah *APIHandler) registerUser(w http.ResponseWriter, r *http.Request) {
} }
if domain != nil && domain.SsoEnabled { if domain != nil && domain.SsoEnabled {
// so is enabled, create user and respond precheck data // sso is enabled, create user and respond precheck data
user, apierr := baseauth.RegisterInvitedUser(ctx, req, true) user, apierr := baseauth.RegisterInvitedUser(ctx, req, true)
if apierr != nil { if apierr != nil {
RespondError(w, apierr, nil) RespondError(w, apierr, nil)

View File

@ -5,15 +5,60 @@ import (
"fmt" "fmt"
"net/url" "net/url"
"strings" "strings"
"time"
"github.com/google/uuid"
"go.signoz.io/signoz/ee/query-service/constants" "go.signoz.io/signoz/ee/query-service/constants"
"go.signoz.io/signoz/ee/query-service/model" "go.signoz.io/signoz/ee/query-service/model"
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
baseconst "go.signoz.io/signoz/pkg/query-service/constants" baseconst "go.signoz.io/signoz/pkg/query-service/constants"
basemodel "go.signoz.io/signoz/pkg/query-service/model" basemodel "go.signoz.io/signoz/pkg/query-service/model"
baseauth "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/utils"
"go.uber.org/zap" "go.uber.org/zap"
) )
func (m *modelDao) createUserForSAMLRequest(ctx context.Context, email string) (*basemodel.User, basemodel.BaseApiError) {
// get auth domain from email domain
domain, apierr := m.GetDomainByEmail(ctx, email)
if apierr != nil {
zap.S().Errorf("failed to get domain from email", apierr)
return nil, model.InternalErrorStr("failed to get domain from email")
}
hash, err := baseauth.PasswordHash(utils.GeneratePassowrd())
if err != nil {
zap.S().Errorf("failed to generate password hash when registering a user via SSO redirect", zap.Error(err))
return nil, model.InternalErrorStr("failed to generate password hash")
}
group, apiErr := m.GetGroupByName(ctx, baseconst.ViewerGroup)
if apiErr != nil {
zap.S().Debugf("GetGroupByName failed, err: %v\n", apiErr.Err)
return nil, apiErr
}
user := &basemodel.User{
Id: uuid.NewString(),
Name: "",
Email: email,
Password: hash,
CreatedAt: time.Now().Unix(),
ProfilePictureURL: "", // Currently unused
GroupId: group.Id,
OrgId: domain.OrgId,
}
user, apiErr = m.CreateUser(ctx, user, false)
if apiErr != nil {
zap.S().Debugf("CreateUser failed, err: %v\n", apiErr.Err)
return nil, apiErr
}
return user, nil
}
// PrepareSsoRedirect prepares redirect page link after SSO response // PrepareSsoRedirect prepares redirect page link after SSO response
// is successfully parsed (i.e. valid email is available) // is successfully parsed (i.e. valid email is available)
func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email string) (redirectURL string, apierr basemodel.BaseApiError) { func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email string) (redirectURL string, apierr basemodel.BaseApiError) {
@ -24,7 +69,20 @@ func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email st
return "", model.BadRequestStr("invalid user email received from the auth provider") return "", model.BadRequestStr("invalid user email received from the auth provider")
} }
tokenStore, err := baseauth.GenerateJWTForUser(&userPayload.User) user := &basemodel.User{}
if userPayload == nil {
newUser, apiErr := m.createUserForSAMLRequest(ctx, email)
user = newUser
if apiErr != nil {
zap.S().Errorf("failed to create user with email received from auth provider: %v", apierr.Error())
return "", apiErr
}
} else {
user = &userPayload.User
}
tokenStore, err := baseauth.GenerateJWTForUser(user)
if err != nil { if err != nil {
zap.S().Errorf("failed to generate token for SSO login user", err) zap.S().Errorf("failed to generate token for SSO login user", err)
return "", model.InternalErrorStr("failed to generate token for the user") return "", model.InternalErrorStr("failed to generate token for the user")
@ -33,7 +91,7 @@ func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email st
return fmt.Sprintf("%s?jwt=%s&usr=%s&refreshjwt=%s", return fmt.Sprintf("%s?jwt=%s&usr=%s&refreshjwt=%s",
redirectUri, redirectUri,
tokenStore.AccessJwt, tokenStore.AccessJwt,
userPayload.User.Id, user.Id,
tokenStore.RefreshJwt), nil tokenStore.RefreshJwt), nil
} }
@ -76,6 +134,7 @@ func (m *modelDao) PrecheckLogin(ctx context.Context, email, sourceUrl string) (
if userPayload == nil { if userPayload == nil {
resp.IsUser = false resp.IsUser = false
} }
ssoAvailable := true ssoAvailable := true
err := m.checkFeature(model.SSO) err := m.checkFeature(model.SSO)
if err != nil { if err != nil {
@ -91,6 +150,8 @@ func (m *modelDao) PrecheckLogin(ctx context.Context, email, sourceUrl string) (
if ssoAvailable { if ssoAvailable {
resp.IsUser = true
// find domain from email // find domain from email
orgDomain, apierr := m.GetDomainByEmail(ctx, email) orgDomain, apierr := m.GetDomainByEmail(ctx, email)
if apierr != nil { if apierr != nil {

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"net/url"
"fmt" "fmt"
"net/url"
"strings" "strings"
"time" "time"
@ -29,28 +29,69 @@ type StoredDomain struct {
// GetDomainFromSsoResponse uses relay state received from IdP to fetch // GetDomainFromSsoResponse uses relay state received from IdP to fetch
// user domain. The domain is further used to process validity of the response. // user domain. The domain is further used to process validity of the response.
// when sending login request to IdP we send relay state as URL (site url) // when sending login request to IdP we send relay state as URL (site url)
// with domainId as query parameter. // with domainId or domainName as query parameter.
func (m *modelDao) GetDomainFromSsoResponse(ctx context.Context, relayState *url.URL) (*model.OrgDomain, error) { func (m *modelDao) GetDomainFromSsoResponse(ctx context.Context, relayState *url.URL) (*model.OrgDomain, error) {
// derive domain id from relay state now // derive domain id from relay state now
var domainIdStr string var domainIdStr string
var domainNameStr string
var domain *model.OrgDomain
for k, v := range relayState.Query() { for k, v := range relayState.Query() {
if k == "domainId" && len(v) > 0 { if k == "domainId" && len(v) > 0 {
domainIdStr = strings.Replace(v[0], ":", "-", -1) domainIdStr = strings.Replace(v[0], ":", "-", -1)
} }
if k == "domainName" && len(v) > 0 {
domainNameStr = v[0]
}
} }
domainId, err := uuid.Parse(domainIdStr) if domainIdStr != "" {
domainId, err := uuid.Parse(domainIdStr)
if err != nil {
zap.S().Errorf("failed to parse domainId from relay state", err)
return nil, fmt.Errorf("failed to parse domainId from IdP response")
}
domain, err = m.GetDomain(ctx, domainId)
if (err != nil) || domain == nil {
zap.S().Errorf("failed to find domain from domainId received in IdP response", err.Error())
return nil, fmt.Errorf("invalid credentials")
}
}
if domainNameStr != "" {
domainFromDB, err := m.GetDomainByName(ctx, domainNameStr)
domain = domainFromDB
if (err != nil) || domain == nil {
zap.S().Errorf("failed to find domain from domainName received in IdP response", err.Error())
return nil, fmt.Errorf("invalid credentials")
}
}
if domain != nil {
return domain, nil
}
return nil, fmt.Errorf("failed to find domain received in IdP response")
}
// GetDomainByName returns org domain for a given domain name
func (m *modelDao) GetDomainByName(ctx context.Context, name string) (*model.OrgDomain, basemodel.BaseApiError) {
stored := StoredDomain{}
err := m.DB().Get(&stored, `SELECT * FROM org_domains WHERE name=$1 LIMIT 1`, name)
if err != nil { if err != nil {
zap.S().Errorf("failed to parse domain id from relay state", err) if err == sql.ErrNoRows {
return nil, fmt.Errorf("failed to parse response from IdP response") return nil, model.BadRequest(fmt.Errorf("invalid domain name"))
}
return nil, model.InternalError(err)
} }
domain, err := m.GetDomain(ctx, domainId) domain := &model.OrgDomain{Id: stored.Id, Name: stored.Name, OrgId: stored.OrgId}
if (err != nil) || domain == nil { if err := domain.LoadConfig(stored.Data); err != nil {
zap.S().Errorf("failed to find domain received in IdP response", err.Error()) return domain, model.InternalError(err)
return nil, fmt.Errorf("invalid credentials")
} }
return domain, nil return domain, nil
} }

View File

@ -2,6 +2,7 @@ package model
import ( import (
"fmt" "fmt"
basemodel "go.signoz.io/signoz/pkg/query-service/model" basemodel "go.signoz.io/signoz/pkg/query-service/model"
) )
@ -61,7 +62,6 @@ func InternalError(err error) *ApiError {
} }
} }
// InternalErrorStr returns a ApiError object of internal type for string input // InternalErrorStr returns a ApiError object of internal type for string input
func InternalErrorStr(s string) *ApiError { func InternalErrorStr(s string) *ApiError {
return &ApiError{ return &ApiError{
@ -69,6 +69,7 @@ func InternalErrorStr(s string) *ApiError {
Err: fmt.Errorf(s), Err: fmt.Errorf(s),
} }
} }
var ( var (
ErrorNone basemodel.ErrorType = "" ErrorNone basemodel.ErrorType = ""
ErrorTimeout basemodel.ErrorType = "timeout" ErrorTimeout basemodel.ErrorType = "timeout"

View File

@ -3440,7 +3440,6 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response) extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response) extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Static, &constants.StaticInterestingLogFields, &response)
return &response, nil return &response, nil
} }
@ -3448,7 +3447,8 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) { func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
for _, field := range *fields { for _, field := range *fields {
field.Type = fieldType field.Type = fieldType
if isSelectedField(tableStatement, field.Name) { // all static fields are assumed to be selected as we don't allow changing them
if isSelectedField(tableStatement, field) {
response.Selected = append(response.Selected, field) response.Selected = append(response.Selected, field)
} else { } else {
response.Interesting = append(response.Interesting, field) response.Interesting = append(response.Interesting, field)
@ -3456,28 +3456,72 @@ func extractSelectedAndInterestingFields(tableStatement string, fieldType string
} }
} }
func isSelectedField(tableStatement, field string) bool { func isSelectedField(tableStatement string, field model.LogField) bool {
return strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field)) // in case of attributes and resources, if there is a materialized column present then it is selected
// TODO: handle partial change complete eg:- index is removed but materialized column is still present
name := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
return strings.Contains(tableStatement, fmt.Sprintf("`%s`", name))
} }
func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
// don't allow updating static fields
if field.Type == constants.Static {
err := errors.New("cannot update static fields")
return &model.ApiError{Err: err, Typ: model.ErrorBadData}
}
colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
// if a field is selected it means that the field needs to be indexed // if a field is selected it means that the field needs to be indexed
if field.Selected { if field.Selected {
// if the type is attribute or resource, create the materialized column first keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType))
if field.Type == constants.Attributes || field.Type == constants.Resources { valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType))
// create materialized
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')] CODEC(LZ4)", r.logsDB, r.logsLocalTable, cluster, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
err := r.db.Exec(ctx, query) // create materialized column
if err != nil { query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s[indexOf(%s, '%s')] CODEC(ZSTD(1))",
return &model.ApiError{Err: err, Typ: model.ErrorInternal} r.logsDB, r.logsLocalTable,
} cluster,
colname, field.DataType,
valueColName,
keyColName,
field.Name,
)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED -1", r.logsDB, r.logsTable, cluster, field.Name, field.DataType) query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED -1",
err = r.db.Exec(ctx, query) r.logsDB, r.logsTable,
if err != nil { cluster,
return &model.ApiError{Err: err, Typ: model.ErrorInternal} colname, field.DataType,
} )
err = r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
// create exists column
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
r.logsDB, r.logsLocalTable,
cluster,
colname,
keyColName,
field.Name,
)
err = r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED false",
r.logsDB, r.logsTable,
cluster,
colname,
)
err = r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
} }
// create the index // create the index
@ -3487,20 +3531,52 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
if field.IndexGranularity == 0 { if field.IndexGranularity == 0 {
field.IndexGranularity = constants.DefaultLogSkipIndexGranularity field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
} }
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsLocalTable, cluster, field.Name, field.Name, field.IndexType, field.IndexGranularity) query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d",
err := r.db.Exec(ctx, query) r.logsDB, r.logsLocalTable,
cluster,
colname,
colname,
field.IndexType,
field.IndexGranularity,
)
err = r.db.Exec(ctx, query)
if err != nil { if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal} return &model.ApiError{Err: err, Typ: model.ErrorInternal}
} }
} else { } else {
// remove index // Delete the index first
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, field.Name) query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, colname)
err := r.db.Exec(ctx, query) err := r.db.Exec(ctx, query)
// we are ignoring errors with code 341 as it is an error with updating old part https://github.com/SigNoz/engineering-pod/issues/919#issuecomment-1366344346 if err != nil {
if err != nil && !strings.HasPrefix(err.Error(), "code: 341") {
return &model.ApiError{Err: err, Typ: model.ErrorInternal} return &model.ApiError{Err: err, Typ: model.ErrorInternal}
} }
for _, table := range []string{r.logsLocalTable, r.logsTable} {
// drop materialized column from logs table
query := "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s "
err := r.db.Exec(ctx, fmt.Sprintf(query,
r.logsDB, table,
cluster,
colname,
),
)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
// drop exists column on logs table
query = "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s_exists "
err = r.db.Exec(ctx, fmt.Sprintf(query,
r.logsDB, table,
cluster,
colname,
),
)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}
} }
return nil return nil
} }
@ -3897,8 +3973,11 @@ func (r *ClickHouseReader) GetLatencyMetricMetadata(ctx context.Context, metricN
}, nil }, nil
} }
func isColumn(tableStatement, field string) bool { func isColumn(tableStatement, attrType, field, datType string) bool {
return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", field)) // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute`
name := utils.GetClickhouseColumnName(attrType, datType, field)
return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", name))
} }
func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
@ -3970,7 +4049,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v
Key: tagKey, Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType), DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(attType), Type: v3.AttributeKeyType(attType),
IsColumn: isColumn(statements[0].Statement, tagKey), IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType),
} }
response.AttributeKeys = append(response.AttributeKeys, key) response.AttributeKeys = append(response.AttributeKeys, key)
} }
@ -4025,7 +4104,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
Key: attributeKey, Key: attributeKey,
DataType: v3.AttributeKeyDataType(attributeDataType), DataType: v3.AttributeKeyDataType(attributeDataType),
Type: v3.AttributeKeyType(tagType), Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(statements[0].Statement, attributeKey), IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType),
} }
response.AttributeKeys = append(response.AttributeKeys, key) response.AttributeKeys = append(response.AttributeKeys, key)

View File

@ -9,6 +9,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/utils"
) )
var operatorMapping = map[string]string{ var operatorMapping = map[string]string{
@ -304,6 +305,11 @@ func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]mode
} else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 && field.Type != constants.Static { } else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 && field.Type != constants.Static {
return "", fmt.Errorf("field not found for filtering") return "", fmt.Errorf("field not found for filtering")
} }
} else {
field := selectedFieldsLookup[sqlColName]
if field.Type != constants.Static {
sqlColName = utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
}
} }
} }
return strings.Replace(queryToken, *col, sqlColName, 1), nil return strings.Replace(queryToken, *col, sqlColName, 1), nil

View File

@ -252,7 +252,7 @@ func TestReplaceInterestingFields(t *testing.T) {
}, },
} }
expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`} expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and attribute_int64_id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
Convey("testInterestingFields", t, func() { Convey("testInterestingFields", t, func() {
tokens, err := replaceInterestingFields(&allFields, queryTokens) tokens, err := replaceInterestingFields(&allFields, queryTokens)
So(err, ShouldBeNil) So(err, ShouldBeNil)
@ -340,6 +340,11 @@ var generateSQLQueryFields = model.GetFieldsResponse{
DataType: "string", DataType: "string",
Type: "attributes", Type: "attributes",
}, },
{
Name: "severity_number",
DataType: "string",
Type: "static",
},
}, },
Interesting: []model.LogField{ Interesting: []model.LogField{
{ {
@ -369,7 +374,7 @@ var generateSQLQueryTestCases = []struct {
IdGt: "2BsKLKv8cZrLCn6rkOcRGkdjBdM", IdGt: "2BsKLKv8cZrLCn6rkOcRGkdjBdM",
IdLT: "2BsKG6tRpFWjYMcWsAGKfSxoQdU", IdLT: "2BsKG6tRpFWjYMcWsAGKfSxoQdU",
}, },
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( field1 < 100 and field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( attribute_int64_field1 < 100 and attribute_int64_field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
}, },
{ {
Name: "second query with only timestamp range", Name: "second query with only timestamp range",
@ -378,7 +383,7 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000), TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000), TimestampEnd: uint64(1657689294000),
}, },
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( attribute_int64_field1 < 100 and attribute_int64_field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
}, },
{ {
Name: "generate case sensitive query", Name: "generate case sensitive query",
@ -387,7 +392,7 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000), TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000), TimestampEnd: uint64(1657689294000),
}, },
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and Field2 > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( attribute_int64_field1 < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and attribute_double64_Field2 > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
}, },
{ {
Name: "Check exists and not exists", Name: "Check exists and not exists",
@ -396,7 +401,16 @@ var generateSQLQueryTestCases = []struct {
TimestampStart: uint64(1657689292000), TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000), TimestampEnd: uint64(1657689294000),
}, },
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( has(attributes_int64_key, 'field1') and NOT has(attributes_double64_key, 'Field2') and Field2 > 10 ) ", SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( has(attributes_int64_key, 'field1') and NOT has(attributes_double64_key, 'Field2') and attribute_double64_Field2 > 10 ) ",
},
{
Name: "Check top level key filter",
Filter: model.LogsFilterParams{
Query: "severity_number in (1)",
TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000),
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( severity_number IN (1) ) ",
}, },
{ {
Name: "Check exists and not exists on top level keys", Name: "Check exists and not exists on top level keys",

View File

@ -83,7 +83,17 @@ func getClickhouseColumnName(key v3.AttributeKey) string {
columnType := getClickhouseLogsColumnType(key.Type) columnType := getClickhouseLogsColumnType(key.Type)
columnDataType := getClickhouseLogsColumnDataType(key.DataType) columnDataType := getClickhouseLogsColumnDataType(key.DataType)
clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key) clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key)
return clickhouseColumn
} }
// check if it is a static field
if key.Type == v3.AttributeKeyTypeUnspecified {
// name is the column name
return clickhouseColumn
}
// materialized column created from query
clickhouseColumn = utils.GetClickhouseColumnName(string(key.Type), string(key.DataType), key.Key)
return clickhouseColumn return clickhouseColumn
} }
@ -123,6 +133,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err) return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
} }
if logsOp, ok := logOperators[op]; ok { if logsOp, ok := logOperators[op]; ok {
switch op { switch op {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists: case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
@ -153,6 +164,9 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey)
columnType := getClickhouseLogsColumnType(attr.Type) columnType := getClickhouseLogsColumnType(attr.Type)
columnDataType := getClickhouseLogsColumnDataType(attr.DataType) columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
conditions = append(conditions, fmt.Sprintf("indexOf(%s_%s_key, '%s') > 0", columnType, columnDataType, attr.Key)) conditions = append(conditions, fmt.Sprintf("indexOf(%s_%s_key, '%s') > 0", columnType, columnDataType, attr.Key))
} else if attr.Type != v3.AttributeKeyTypeUnspecified {
// for materialzied columns
conditions = append(conditions, fmt.Sprintf("%s_exists=true", getClickhouseColumnName(attr)))
} }
} }

View File

@ -26,7 +26,17 @@ var testGetClickhouseColumnNameData = []struct {
{ {
Name: "selected field", Name: "selected field",
AttributeKey: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, AttributeKey: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
ExpectedColumnName: "servicename", ExpectedColumnName: "attribute_string_servicename",
},
{
Name: "selected field resource",
AttributeKey: v3.AttributeKey{Key: "sdk_version", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeResource, IsColumn: true},
ExpectedColumnName: "resource_int64_sdk_version",
},
{
Name: "selected field float",
AttributeKey: v3.AttributeKey{Key: "sdk_version", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: true},
ExpectedColumnName: "attribute_float64_sdk_version",
}, },
{ {
Name: "same name as top level column", Name: "same name as top level column",
@ -35,7 +45,7 @@ var testGetClickhouseColumnNameData = []struct {
}, },
{ {
Name: "top level column", Name: "top level column",
AttributeKey: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, AttributeKey: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true},
ExpectedColumnName: "trace_id", ExpectedColumnName: "trace_id",
}, },
} }
@ -121,7 +131,7 @@ var timeSeriesFilterQueryData = []struct {
{Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "john", Operator: "="}, {Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "john", Operator: "="},
{Key: v3.AttributeKey{Key: "k8s_namespace", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "my_service", Operator: "!="}, {Key: v3.AttributeKey{Key: "k8s_namespace", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "my_service", Operator: "!="},
}}, }},
ExpectedFilter: " AND user_name = 'john' AND resources_string_value[indexOf(resources_string_key, 'k8s_namespace')] != 'my_service'", ExpectedFilter: " AND attribute_string_user_name = 'john' AND resources_string_value[indexOf(resources_string_key, 'k8s_namespace')] != 'my_service'",
}, },
{ {
Name: "Test like", Name: "Test like",
@ -184,7 +194,7 @@ var timeSeriesFilterQueryData = []struct {
FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "host: \"(?P<host>\\S+)\"", Operator: "regex"}, {Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "host: \"(?P<host>\\S+)\"", Operator: "regex"},
}}, }},
ExpectedFilter: " AND match(host, 'host: \"(?P<host>\\\\S+)\"')", ExpectedFilter: " AND match(attribute_string_host, 'host: \"(?P<host>\\\\S+)\"')",
}, },
{ {
Name: "Test not regex", Name: "Test not regex",
@ -207,7 +217,7 @@ var timeSeriesFilterQueryData = []struct {
{Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: "ncontains"}, {Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: "ncontains"},
}}, }},
GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}}, GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}},
ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'", ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%' AND attribute_string_host_exists=true",
}, },
{ {
Name: "Wrong data", Name: "Wrong data",

View File

@ -165,7 +165,7 @@ func ResetPassword(ctx context.Context, req *model.ResetPasswordRequest) error {
return errors.New("Invalid reset password request") return errors.New("Invalid reset password request")
} }
hash, err := passwordHash(req.Password) hash, err := PasswordHash(req.Password)
if err != nil { if err != nil {
return errors.Wrap(err, "Failed to generate password hash") return errors.Wrap(err, "Failed to generate password hash")
} }
@ -192,7 +192,7 @@ func ChangePassword(ctx context.Context, req *model.ChangePasswordRequest) error
return ErrorInvalidCreds return ErrorInvalidCreds
} }
hash, err := passwordHash(req.NewPassword) hash, err := PasswordHash(req.NewPassword)
if err != nil { if err != nil {
return errors.Wrap(err, "Failed to generate password hash") return errors.Wrap(err, "Failed to generate password hash")
} }
@ -243,7 +243,7 @@ func RegisterFirstUser(ctx context.Context, req *RegisterRequest) (*model.User,
var hash string var hash string
var err error var err error
hash, err = passwordHash(req.Password) hash, err = PasswordHash(req.Password)
if err != nil { if err != nil {
zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err)) zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err))
return nil, model.InternalError(model.ErrSignupFailed{}) return nil, model.InternalError(model.ErrSignupFailed{})
@ -314,13 +314,13 @@ func RegisterInvitedUser(ctx context.Context, req *RegisterRequest, nopassword b
// check if password is not empty, as for SSO case it can be // check if password is not empty, as for SSO case it can be
if req.Password != "" { if req.Password != "" {
hash, err = passwordHash(req.Password) hash, err = PasswordHash(req.Password)
if err != nil { if err != nil {
zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err)) zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err))
return nil, model.InternalError(model.ErrSignupFailed{}) return nil, model.InternalError(model.ErrSignupFailed{})
} }
} else { } else {
hash, err = passwordHash(utils.GeneratePassowrd()) hash, err = PasswordHash(utils.GeneratePassowrd())
if err != nil { if err != nil {
zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err)) zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err))
return nil, model.InternalError(model.ErrSignupFailed{}) return nil, model.InternalError(model.ErrSignupFailed{})
@ -419,7 +419,7 @@ func authenticateLogin(ctx context.Context, req *model.LoginRequest) (*model.Use
} }
// Generate hash from the password. // Generate hash from the password.
func passwordHash(pass string) (string, error) { func PasswordHash(pass string) (string, error) {
hash, err := bcrypt.GenerateFromPassword([]byte(pass), bcrypt.DefaultCost) hash, err := bcrypt.GenerateFromPassword([]byte(pass), bcrypt.DefaultCost)
if err != nil { if err != nil {
return "", err return "", err

View File

@ -221,22 +221,17 @@ const (
UINT8 = "Uint8" UINT8 = "Uint8"
) )
var StaticInterestingLogFields = []model.LogField{ var StaticSelectedLogFields = []model.LogField{
{ {
Name: "trace_id", Name: "timestamp",
DataType: STRING,
Type: Static,
},
{
Name: "span_id",
DataType: STRING,
Type: Static,
},
{
Name: "trace_flags",
DataType: UINT32, DataType: UINT32,
Type: Static, Type: Static,
}, },
{
Name: "id",
DataType: STRING,
Type: Static,
},
{ {
Name: "severity_text", Name: "severity_text",
DataType: LOWCARDINALITY_STRING, DataType: LOWCARDINALITY_STRING,
@ -247,16 +242,18 @@ var StaticInterestingLogFields = []model.LogField{
DataType: UINT8, DataType: UINT8,
Type: Static, Type: Static,
}, },
}
var StaticSelectedLogFields = []model.LogField{
{ {
Name: "timestamp", Name: "trace_flags",
DataType: UINT32, DataType: UINT32,
Type: Static, Type: Static,
}, },
{ {
Name: "id", Name: "trace_id",
DataType: STRING,
Type: Static,
},
{
Name: "span_id",
DataType: STRING, DataType: STRING,
Type: Static, Type: Static,
}, },

View File

@ -1,9 +1,8 @@
version: "2.4" version: "2.4"
x-clickhouse-defaults: x-clickhouse-defaults: &clickhouse-defaults
&clickhouse-defaults
restart: on-failure restart: on-failure
image: clickhouse/clickhouse-server:22.8.8-alpine image: clickhouse/clickhouse-server:23.7.3-alpine
tty: true tty: true
depends_on: depends_on:
- zookeeper-1 - zookeeper-1
@ -32,8 +31,7 @@ x-clickhouse-defaults:
soft: 262144 soft: 262144
hard: 262144 hard: 262144
x-clickhouse-depends: x-clickhouse-depends: &clickhouse-depends
&clickhouse-depends
depends_on: depends_on:
clickhouse: clickhouse:
condition: service_healthy condition: service_healthy

View File

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -230,6 +231,19 @@ func getPointerValue(v interface{}) interface{} {
} }
} }
// GetClickhouseColumnName builds the materialized column name for a log
// attribute as "<type>_<dataType>_<field>", e.g. "attribute_int64_tag1".
//
// typeName may be either the v3 short form ("tag", "resource") or the
// legacy plural form used by the old parser ("attributes", "resources");
// both normalize to the singular prefix used in column names.
func GetClickhouseColumnName(typeName string, dataType, field string) string {
	// The v3 "tag" type maps onto the legacy "attributes" prefix.
	if typeName == string(v3.AttributeKeyTypeTag) {
		typeName = constants.Attributes
	}
	// Legacy plural names ("attributes"/"resources") drop the trailing "s";
	// the v3 "resource" form is already singular. The length guard prevents a
	// slice-bounds panic if an unexpected empty typeName ever reaches here.
	if typeName != string(v3.AttributeKeyTypeResource) && len(typeName) > 0 {
		typeName = typeName[:len(typeName)-1]
	}
	return fmt.Sprintf("%s_%s_%s", strings.ToLower(typeName), strings.ToLower(dataType), field)
}
// GetEpochNanoSecs takes epoch and returns it in ns // GetEpochNanoSecs takes epoch and returns it in ns
func GetEpochNanoSecs(epoch int64) int64 { func GetEpochNanoSecs(epoch int64) int64 {
temp := epoch temp := epoch

View File

@ -4,6 +4,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
) )
@ -388,6 +389,54 @@ func TestClickHouseFormattedValue(t *testing.T) {
} }
} }
// testGetClickhouseColumnName lists (typeName, dataType, field) inputs and the
// expected materialized column name for GetClickhouseColumnName, covering both
// the v3 type names ("tag", "resource") and the legacy plural forms emitted by
// the old parser ("attributes", "resources").
var testGetClickhouseColumnName = []struct {
	name     string
	typeName string
	dataType string
	field    string
	want     string
}{
	{
		// v3 "tag" type normalizes to the "attribute" prefix.
		name:     "tag",
		typeName: string(v3.AttributeKeyTypeTag),
		dataType: string(v3.AttributeKeyDataTypeInt64),
		field:    "tag1",
		want:     "attribute_int64_tag1",
	},
	{
		// v3 "resource" type is already singular and is used as-is.
		name:     "resource",
		typeName: string(v3.AttributeKeyTypeResource),
		dataType: string(v3.AttributeKeyDataTypeInt64),
		field:    "tag1",
		want:     "resource_int64_tag1",
	},
	{
		// Legacy plural "attributes" drops the trailing "s".
		name:     "attribute old parser",
		typeName: constants.Attributes,
		dataType: string(v3.AttributeKeyDataTypeInt64),
		field:    "tag1",
		want:     "attribute_int64_tag1",
	},
	{
		// Legacy plural "resources" drops the trailing "s".
		name:     "resource old parser",
		typeName: constants.Resources,
		dataType: string(v3.AttributeKeyDataTypeInt64),
		field:    "tag1",
		want:     "resource_int64_tag1",
	},
}
// TestGetClickhouseColumnName verifies column-name construction for both the
// v3 ("tag"/"resource") and legacy ("attributes"/"resources") type names.
func TestGetClickhouseColumnName(t *testing.T) {
	for _, tt := range testGetClickhouseColumnName {
		t.Run(tt.name, func(t *testing.T) {
			got := GetClickhouseColumnName(tt.typeName, tt.dataType, tt.field)
			// Plain string comparison; reflect.DeepEqual is unnecessary here.
			if got != tt.want {
				// Fix copy-pasted failure message: it previously named
				// ClickHouseFormattedValue(), the neighboring test's subject.
				t.Errorf("GetClickhouseColumnName() = %v, want %v", got, tt.want)
			}
		})
	}
}
var testGetEpochNanoSecsData = []struct { var testGetEpochNanoSecsData = []struct {
Name string Name string
Epoch int64 Epoch int64