From 5d6bb18679d54106d7a4eb295dd7d3702a668021 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 23 Aug 2023 10:27:33 +0530 Subject: [PATCH 1/3] fix: disable proxy buffering and cache (#3399) * fix: disable proxy buffering and cache * fix: changed for docker swarm * fix: nginx config updated only for live tail * fix: corrected value of buffering --------- Co-authored-by: Prashant Shahi --- deploy/docker-swarm/common/nginx-config.conf | 12 ++++++++++++ deploy/docker/common/nginx-config.conf | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/deploy/docker-swarm/common/nginx-config.conf b/deploy/docker-swarm/common/nginx-config.conf index a8673496a2..158effc8bf 100644 --- a/deploy/docker-swarm/common/nginx-config.conf +++ b/deploy/docker-swarm/common/nginx-config.conf @@ -28,6 +28,18 @@ server { proxy_pass http://alertmanager:9093/api/v2; } + location ~ ^/api/(v1|v3)/logs/(tail|livetail){ + proxy_pass http://query-service:8080; + proxy_http_version 1.1; + + # connection will be closed if no data is read for 600s between successive read operations + proxy_read_timeout 600s; + + # dont buffer the data send it directly to client. + proxy_buffering off; + proxy_cache off; + } + location /api { proxy_pass http://query-service:8080/api; # connection will be closed if no data is read for 600s between successive read operations diff --git a/deploy/docker/common/nginx-config.conf b/deploy/docker/common/nginx-config.conf index a8673496a2..158effc8bf 100644 --- a/deploy/docker/common/nginx-config.conf +++ b/deploy/docker/common/nginx-config.conf @@ -28,6 +28,18 @@ server { proxy_pass http://alertmanager:9093/api/v2; } + location ~ ^/api/(v1|v3)/logs/(tail|livetail){ + proxy_pass http://query-service:8080; + proxy_http_version 1.1; + + # connection will be closed if no data is read for 600s between successive read operations + proxy_read_timeout 600s; + + # dont buffer the data send it directly to client. 
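+        # with buffering off, nginx forwards each chunk from query-service to
+        # the client as soon as it arrives instead of holding it in a buffer,
+        # and disabling the cache keeps every tail request hitting upstream.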
+ proxy_buffering off; + proxy_cache off; + } + location /api { proxy_pass http://query-service:8080/api; # connection will be closed if no data is read for 600s between successive read operations From ee6b290a0ca2300412427824f44487462794d1d8 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 23 Aug 2023 15:03:24 +0530 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20get=20and=20update=20log=20fields?= =?UTF-8?q?=20updated=20to=20support=20new=20materialized=20c=E2=80=A6=20(?= =?UTF-8?q?#3275)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: get and update log fields updated to support new materialized columns and index * feat: get attribute keys logic updated * feat: logs qb changes to support new materialized columns * feat: don't allow updating static top level fields * feat: old logs parser updated * feat: upgrade clickhouse * fix: columnname format updated for materialized columns * feat: get fields updated * feat: minor changes * fix: default values for distributed columns * feat: common function for for clickhouse formatted column name * chore: remove interesting selected fields * fix: update clickhouse version in other place --------- Co-authored-by: Prashant Shahi --- .../clickhouse-setup/docker-compose.yaml | 8 +- .../clickhouse-setup/docker-compose-core.yaml | 2 +- .../clickhouse-setup/docker-compose.yaml | 9 +- .../app/clickhouseReader/reader.go | 133 ++++++++++++++---- pkg/query-service/app/logs/parser.go | 6 + pkg/query-service/app/logs/parser_test.go | 24 +++- .../app/logs/v3/query_builder.go | 14 ++ .../app/logs/v3/query_builder_test.go | 20 ++- pkg/query-service/constants/constants.go | 31 ++-- .../tests/test-deploy/docker-compose.yaml | 8 +- pkg/query-service/utils/format.go | 14 ++ pkg/query-service/utils/format_test.go | 49 +++++++ 12 files changed, 248 insertions(+), 70 deletions(-) diff --git a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml index 458557c5ad..96890f7f63 100644 --- a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml @@ -1,8 +1,7 @@ version: "3.9" -x-clickhouse-defaults: - &clickhouse-defaults - image: clickhouse/clickhouse-server:22.8.8-alpine +x-clickhouse-defaults: &clickhouse-defaults + image: clickhouse/clickhouse-server:23.7.3-alpine tty: true deploy: restart_policy: @@ -34,8 +33,7 @@ x-clickhouse-defaults: soft: 262144 hard: 262144 -x-clickhouse-depend: - &clickhouse-depend +x-clickhouse-depend: &clickhouse-depend depends_on: - clickhouse # - clickhouse-2 diff --git a/deploy/docker/clickhouse-setup/docker-compose-core.yaml b/deploy/docker/clickhouse-setup/docker-compose-core.yaml index ae40af5209..f0ef45ee10 100644 --- a/deploy/docker/clickhouse-setup/docker-compose-core.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose-core.yaml @@ -2,7 +2,7 @@ version: "2.4" services: clickhouse: - image: clickhouse/clickhouse-server:22.8.8-alpine + image: clickhouse/clickhouse-server:23.7.3-alpine container_name: signoz-clickhouse # ports: # - "9000:9000" diff --git a/deploy/docker/clickhouse-setup/docker-compose.yaml b/deploy/docker/clickhouse-setup/docker-compose.yaml index 7f94a52602..27087ac42d 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.yaml @@ -1,9 +1,9 @@ version: "2.4" -x-clickhouse-defaults: - &clickhouse-defaults +x-clickhouse-defaults: &clickhouse-defaults restart: on-failure - image: 
clickhouse/clickhouse-server:22.8.8-alpine + # addding non LTS version due to this fix https://github.com/ClickHouse/ClickHouse/commit/32caf8716352f45c1b617274c7508c86b7d1afab + image: clickhouse/clickhouse-server:23.7.3-alpine tty: true depends_on: - zookeeper-1 @@ -32,8 +32,7 @@ x-clickhouse-defaults: soft: 262144 hard: 262144 -x-clickhouse-depend: - &clickhouse-depend +x-clickhouse-depend: &clickhouse-depend depends_on: clickhouse: condition: service_healthy diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 1d02035719..4a87e2419b 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3440,7 +3440,6 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response) extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response) - extractSelectedAndInterestingFields(statements[0].Statement, constants.Static, &constants.StaticInterestingLogFields, &response) return &response, nil } @@ -3448,7 +3447,8 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) { for _, field := range *fields { field.Type = fieldType - if isSelectedField(tableStatement, field.Name) { + // all static fields are assumed to be selected as we don't allow changing them + if isSelectedField(tableStatement, field) { response.Selected = append(response.Selected, field) } else { response.Interesting = append(response.Interesting, field) @@ -3456,28 +3456,72 @@ func extractSelectedAndInterestingFields(tableStatement string, fieldType string } } -func isSelectedField(tableStatement, field string) bool { - return strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field)) +func isSelectedField(tableStatement string, field model.LogField) bool { + // in case of attributes and resources, if there is a materialized column present then it is selected + // TODO: handle partial change complete eg:- index is removed but materialized column is still present + name := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name) + return strings.Contains(tableStatement, fmt.Sprintf("`%s`", name)) } func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError { + // don't allow updating static fields + if field.Type == constants.Static { + err := errors.New("cannot update static fields") + return &model.ApiError{Err: err, Typ: model.ErrorBadData} + } + + colname := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name) + // if a field is selected it means that the field needs to be indexed if field.Selected { - // if the type is attribute or resource, create the materialized column first - if field.Type == constants.Attributes || field.Type == constants.Resources { - // create materialized - query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')] CODEC(LZ4)", r.logsDB, r.logsLocalTable, cluster, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name) + keyColName := fmt.Sprintf("%s_%s_key", field.Type, strings.ToLower(field.DataType)) + 
valueColName := fmt.Sprintf("%s_%s_value", field.Type, strings.ToLower(field.DataType)) - err := r.db.Exec(ctx, query) - if err != nil { - return &model.ApiError{Err: err, Typ: model.ErrorInternal} - } + // create materialized column + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s[indexOf(%s, '%s')] CODEC(ZSTD(1))", + r.logsDB, r.logsLocalTable, + cluster, + colname, field.DataType, + valueColName, + keyColName, + field.Name, + ) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } - query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED -1", r.logsDB, r.logsTable, cluster, field.Name, field.DataType) - err = r.db.Exec(ctx, query) - if err != nil { - return &model.ApiError{Err: err, Typ: model.ErrorInternal} - } + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED -1", + r.logsDB, r.logsTable, + cluster, + colname, field.DataType, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + // create exists column + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", + r.logsDB, r.logsLocalTable, + cluster, + colname, + keyColName, + field.Name, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED false", + r.logsDB, r.logsTable, + cluster, + colname, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} } // create the index @@ -3487,20 +3531,52 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda if field.IndexGranularity == 0 { field.IndexGranularity = constants.DefaultLogSkipIndexGranularity } - query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsLocalTable, cluster, field.Name, field.Name, field.IndexType, field.IndexGranularity) - err := r.db.Exec(ctx, query) + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", + r.logsDB, r.logsLocalTable, + cluster, + colname, + colname, + field.IndexType, + field.IndexGranularity, + ) + err = r.db.Exec(ctx, query) if err != nil { return &model.ApiError{Err: err, Typ: model.ErrorInternal} } } else { - // remove index - query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, field.Name) + // Delete the index first + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, colname) err := r.db.Exec(ctx, query) - // we are ignoring errors with code 341 as it is an error with updating old part https://github.com/SigNoz/engineering-pod/issues/919#issuecomment-1366344346 - if err != nil && !strings.HasPrefix(err.Error(), "code: 341") { + if err != nil { return &model.ApiError{Err: err, Typ: model.ErrorInternal} } + + for _, table := range []string{r.logsLocalTable, r.logsTable} { + // drop materialized column from logs table + query := "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s " + err := r.db.Exec(ctx, fmt.Sprintf(query, + r.logsDB, table, + 
cluster, + colname, + ), + ) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + // drop exists column on logs table + query = "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s_exists " + err = r.db.Exec(ctx, fmt.Sprintf(query, + r.logsDB, table, + cluster, + colname, + ), + ) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } } return nil } @@ -3897,8 +3973,11 @@ func (r *ClickHouseReader) GetLatencyMetricMetadata(ctx context.Context, metricN }, nil } -func isColumn(tableStatement, field string) bool { - return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", field)) +func isColumn(tableStatement, attrType, field, datType string) bool { + // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute` + name := utils.GetClickhouseColumnName(attrType, datType, field) + + return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", name)) } func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { @@ -3970,7 +4049,7 @@ func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(attType), - IsColumn: isColumn(statements[0].Statement, tagKey), + IsColumn: isColumn(statements[0].Statement, attType, tagKey, dataType), } response.AttributeKeys = append(response.AttributeKeys, key) } @@ -4025,7 +4104,7 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt Key: attributeKey, DataType: v3.AttributeKeyDataType(attributeDataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn(statements[0].Statement, attributeKey), + IsColumn: isColumn(statements[0].Statement, tagType, attributeKey, attributeDataType), } response.AttributeKeys = append(response.AttributeKeys, key) diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index a661d4e894..06ea6d2271 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -9,6 +9,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/utils" ) var operatorMapping = map[string]string{ @@ -304,6 +305,11 @@ func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]mode } else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 && field.Type != constants.Static { return "", fmt.Errorf("field not found for filtering") } + } else { + field := selectedFieldsLookup[sqlColName] + if field.Type != constants.Static { + sqlColName = utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name) + } } } return strings.Replace(queryToken, *col, sqlColName, 1), nil diff --git a/pkg/query-service/app/logs/parser_test.go b/pkg/query-service/app/logs/parser_test.go index 8bb70991d6..b02284ea05 100644 --- a/pkg/query-service/app/logs/parser_test.go +++ b/pkg/query-service/app/logs/parser_test.go @@ -252,7 +252,7 @@ func TestReplaceInterestingFields(t *testing.T) { }, } - expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`} + expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and attribute_int64_id_key >= 50 ", `AND body ILIKE '%searchstring%'`} Convey("testInterestingFields", t, func() { 
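		// materialized columns follow the <type>_<datatype>_<name> convention
		// from utils.GetClickhouseColumnName, so the selected int64 attribute
		// "id_key" resolves to the column attribute_int64_id_key expected above.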
tokens, err := replaceInterestingFields(&allFields, queryTokens) So(err, ShouldBeNil) @@ -340,6 +340,11 @@ var generateSQLQueryFields = model.GetFieldsResponse{ DataType: "string", Type: "attributes", }, + { + Name: "severity_number", + DataType: "string", + Type: "static", + }, }, Interesting: []model.LogField{ { @@ -369,7 +374,7 @@ var generateSQLQueryTestCases = []struct { IdGt: "2BsKLKv8cZrLCn6rkOcRGkdjBdM", IdLT: "2BsKG6tRpFWjYMcWsAGKfSxoQdU", }, - SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( field1 < 100 and field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", + SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' ) and ( attribute_int64_field1 < 100 and attribute_int64_field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", }, { Name: "second query with only timestamp range", @@ -378,7 +383,7 @@ var generateSQLQueryTestCases = []struct { TimestampStart: uint64(1657689292000), TimestampEnd: uint64(1657689294000), }, - SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", + SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( attribute_int64_field1 < 100 and attribute_int64_field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", }, { Name: "generate case sensitive query", @@ -387,7 +392,7 @@ var generateSQLQueryTestCases = []struct { TimestampStart: uint64(1657689292000), TimestampEnd: uint64(1657689294000), }, - SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and Field2 > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", + SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( attribute_int64_field1 < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and attribute_double64_Field2 > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", }, { Name: "Check exists and not exists", @@ -396,7 +401,16 @@ var generateSQLQueryTestCases = []struct { TimestampStart: uint64(1657689292000), TimestampEnd: uint64(1657689294000), }, - SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( has(attributes_int64_key, 'field1') and NOT has(attributes_double64_key, 'Field2') and Field2 > 10 ) ", + SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( has(attributes_int64_key, 'field1') and NOT has(attributes_double64_key, 'Field2') and attribute_double64_Field2 > 10 ) ", + }, + { + Name: "Check top level key filter", + Filter: model.LogsFilterParams{ + Query: "severity_number in (1)", + TimestampStart: 
uint64(1657689292000), + TimestampEnd: uint64(1657689294000), + }, + SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( severity_number IN (1) ) ", }, { Name: "Check exists and not exists on top level keys", diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go index 4a95fe9d1d..cfe67046ba 100644 --- a/pkg/query-service/app/logs/v3/query_builder.go +++ b/pkg/query-service/app/logs/v3/query_builder.go @@ -83,7 +83,17 @@ func getClickhouseColumnName(key v3.AttributeKey) string { columnType := getClickhouseLogsColumnType(key.Type) columnDataType := getClickhouseLogsColumnDataType(key.DataType) clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key) + return clickhouseColumn } + + // check if it is a static field + if key.Type == v3.AttributeKeyTypeUnspecified { + // name is the column name + return clickhouseColumn + } + + // materialized column created from query + clickhouseColumn = utils.GetClickhouseColumnName(string(key.Type), string(key.DataType), key.Key) return clickhouseColumn } @@ -123,6 +133,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey) if err != nil { return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err) } + if logsOp, ok := logOperators[op]; ok { switch op { case v3.FilterOperatorExists, v3.FilterOperatorNotExists: @@ -153,6 +164,9 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey) columnType := getClickhouseLogsColumnType(attr.Type) columnDataType := getClickhouseLogsColumnDataType(attr.DataType) conditions = append(conditions, fmt.Sprintf("indexOf(%s_%s_key, '%s') > 0", columnType, columnDataType, attr.Key)) + } else if attr.Type != v3.AttributeKeyTypeUnspecified { + // for materialzied columns + conditions = append(conditions, fmt.Sprintf("%s_exists=true", getClickhouseColumnName(attr))) } } diff --git a/pkg/query-service/app/logs/v3/query_builder_test.go b/pkg/query-service/app/logs/v3/query_builder_test.go index d8c0eedd32..bb8c6e670b 100644 --- a/pkg/query-service/app/logs/v3/query_builder_test.go +++ b/pkg/query-service/app/logs/v3/query_builder_test.go @@ -26,7 +26,17 @@ var testGetClickhouseColumnNameData = []struct { { Name: "selected field", AttributeKey: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, - ExpectedColumnName: "servicename", + ExpectedColumnName: "attribute_string_servicename", + }, + { + Name: "selected field resource", + AttributeKey: v3.AttributeKey{Key: "sdk_version", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeResource, IsColumn: true}, + ExpectedColumnName: "resource_int64_sdk_version", + }, + { + Name: "selected field float", + AttributeKey: v3.AttributeKey{Key: "sdk_version", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + ExpectedColumnName: "attribute_float64_sdk_version", }, { Name: "same name as top level column", @@ -35,7 +45,7 @@ var testGetClickhouseColumnNameData = []struct { }, { Name: "top level column", - AttributeKey: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + AttributeKey: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}, ExpectedColumnName: "trace_id", }, } @@ -121,7 
+131,7 @@ var timeSeriesFilterQueryData = []struct { {Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "john", Operator: "="}, {Key: v3.AttributeKey{Key: "k8s_namespace", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "my_service", Operator: "!="}, }}, - ExpectedFilter: " AND user_name = 'john' AND resources_string_value[indexOf(resources_string_key, 'k8s_namespace')] != 'my_service'", + ExpectedFilter: " AND attribute_string_user_name = 'john' AND resources_string_value[indexOf(resources_string_key, 'k8s_namespace')] != 'my_service'", }, { Name: "Test like", @@ -184,7 +194,7 @@ var timeSeriesFilterQueryData = []struct { FilterSet: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ {Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "host: \"(?P\\S+)\"", Operator: "regex"}, }}, - ExpectedFilter: " AND match(host, 'host: \"(?P\\\\S+)\"')", + ExpectedFilter: " AND match(attribute_string_host, 'host: \"(?P\\\\S+)\"')", }, { Name: "Test not regex", @@ -207,7 +217,7 @@ var timeSeriesFilterQueryData = []struct { {Key: v3.AttributeKey{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "102.", Operator: "ncontains"}, }}, GroupBy: []v3.AttributeKey{{Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}}, - ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%'", + ExpectedFilter: " AND attributes_string_value[indexOf(attributes_string_key, 'host')] NOT ILIKE '%102.%' AND attribute_string_host_exists=true", }, { Name: "Wrong data", diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index e0d8fcbaa2..0f181c36fd 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -221,22 +221,17 @@ const ( UINT8 = "Uint8" ) -var StaticInterestingLogFields = []model.LogField{ +var StaticSelectedLogFields = []model.LogField{ { - Name: "trace_id", - DataType: STRING, - Type: Static, - }, - { - Name: "span_id", - DataType: STRING, - Type: Static, - }, - { - Name: "trace_flags", + Name: "timestamp", DataType: UINT32, Type: Static, }, + { + Name: "id", + DataType: STRING, + Type: Static, + }, { Name: "severity_text", DataType: LOWCARDINALITY_STRING, @@ -247,16 +242,18 @@ var StaticInterestingLogFields = []model.LogField{ DataType: UINT8, Type: Static, }, -} - -var StaticSelectedLogFields = []model.LogField{ { - Name: "timestamp", + Name: "trace_flags", DataType: UINT32, Type: Static, }, { - Name: "id", + Name: "trace_id", + DataType: STRING, + Type: Static, + }, + { + Name: "span_id", DataType: STRING, Type: Static, }, diff --git a/pkg/query-service/tests/test-deploy/docker-compose.yaml b/pkg/query-service/tests/test-deploy/docker-compose.yaml index 8cc84f7566..7f331f4c5b 100644 --- a/pkg/query-service/tests/test-deploy/docker-compose.yaml +++ b/pkg/query-service/tests/test-deploy/docker-compose.yaml @@ -1,9 +1,8 @@ version: "2.4" -x-clickhouse-defaults: - &clickhouse-defaults +x-clickhouse-defaults: &clickhouse-defaults restart: on-failure - image: clickhouse/clickhouse-server:22.8.8-alpine + image: clickhouse/clickhouse-server:23.7.3-alpine tty: true depends_on: - zookeeper-1 @@ -32,8 +31,7 @@ x-clickhouse-defaults: soft: 262144 hard: 262144 -x-clickhouse-depends: - &clickhouse-depends 
+x-clickhouse-depends: &clickhouse-depends depends_on: clickhouse: condition: service_healthy diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index 1bfccd6858..0a3f46600a 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.uber.org/zap" ) @@ -230,6 +231,19 @@ func getPointerValue(v interface{}) interface{} { } } +func GetClickhouseColumnName(typeName string, dataType, field string) string { + if typeName == string(v3.AttributeKeyTypeTag) { + typeName = constants.Attributes + } + + if typeName != string(v3.AttributeKeyTypeResource) { + typeName = typeName[:len(typeName)-1] + } + + colName := fmt.Sprintf("%s_%s_%s", strings.ToLower(typeName), strings.ToLower(dataType), field) + return colName +} + // GetEpochNanoSecs takes epoch and returns it in ns func GetEpochNanoSecs(epoch int64) int64 { temp := epoch diff --git a/pkg/query-service/utils/format_test.go b/pkg/query-service/utils/format_test.go index bd3c0eaf35..2c7aab1e6d 100644 --- a/pkg/query-service/utils/format_test.go +++ b/pkg/query-service/utils/format_test.go @@ -4,6 +4,7 @@ import ( "reflect" "testing" + "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -388,6 +389,54 @@ func TestClickHouseFormattedValue(t *testing.T) { } } +var testGetClickhouseColumnName = []struct { + name string + typeName string + dataType string + field string + want string +}{ + { + name: "tag", + typeName: string(v3.AttributeKeyTypeTag), + dataType: string(v3.AttributeKeyDataTypeInt64), + field: "tag1", + want: "attribute_int64_tag1", + }, + { + name: "resource", + typeName: string(v3.AttributeKeyTypeResource), + dataType: string(v3.AttributeKeyDataTypeInt64), + field: "tag1", + want: "resource_int64_tag1", + }, + { + name: "attribute old parser", + typeName: constants.Attributes, + dataType: string(v3.AttributeKeyDataTypeInt64), + field: "tag1", + want: "attribute_int64_tag1", + }, + { + name: "resource old parser", + typeName: constants.Resources, + dataType: string(v3.AttributeKeyDataTypeInt64), + field: "tag1", + want: "resource_int64_tag1", + }, +} + +func TestGetClickhouseColumnName(t *testing.T) { + for _, tt := range testGetClickhouseColumnName { + t.Run(tt.name, func(t *testing.T) { + got := GetClickhouseColumnName(tt.typeName, tt.dataType, tt.field) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ClickHouseFormattedValue() = %v, want %v", got, tt.want) + } + }) + } +} + var testGetEpochNanoSecsData = []struct { Name string Epoch int64 From 591ea96285448313a61f91993c35faf24e391377 Mon Sep 17 00:00:00 2001 From: Ankit Nayan Date: Wed, 23 Aug 2023 16:22:24 +0530 Subject: [PATCH 3/3] feat: Just-in-time provisioning of SSO users (#3394) * feat: auto provisioning of SSO users rather than needing invite link to login each user * updating errors Co-authored-by: Srikanth Chekuri * fix: set IsUser: true when SSO feature is available * fix: signoz login from IDP (#3396) * fix: enable login from IDP with relayState set with domainName * update comments on function Co-authored-by: Srikanth Chekuri * chore: added error checks to fetch domain from SAML relay state --------- Co-authored-by: Srikanth Chekuri --------- Co-authored-by: Srikanth Chekuri --- ee/query-service/app/api/auth.go | 2 +- ee/query-service/dao/sqlite/auth.go | 69 +++++++++++++++++++++++++-- 
ee/query-service/dao/sqlite/domain.go | 65 ++++++++++++++++++++----- ee/query-service/model/errors.go | 3 +- pkg/query-service/auth/auth.go | 12 ++--- 5 files changed, 127 insertions(+), 24 deletions(-) diff --git a/ee/query-service/app/api/auth.go b/ee/query-service/app/api/auth.go index 8d96320778..e013b87b29 100644 --- a/ee/query-service/app/api/auth.go +++ b/ee/query-service/app/api/auth.go @@ -113,7 +113,7 @@ func (ah *APIHandler) registerUser(w http.ResponseWriter, r *http.Request) { } if domain != nil && domain.SsoEnabled { - // so is enabled, create user and respond precheck data + // sso is enabled, create user and respond precheck data user, apierr := baseauth.RegisterInvitedUser(ctx, req, true) if apierr != nil { RespondError(w, apierr, nil) diff --git a/ee/query-service/dao/sqlite/auth.go b/ee/query-service/dao/sqlite/auth.go index 6eb46d2ea6..e06c073997 100644 --- a/ee/query-service/dao/sqlite/auth.go +++ b/ee/query-service/dao/sqlite/auth.go @@ -5,16 +5,61 @@ import ( "fmt" "net/url" "strings" + "time" + "github.com/google/uuid" "go.signoz.io/signoz/ee/query-service/constants" "go.signoz.io/signoz/ee/query-service/model" + baseauth "go.signoz.io/signoz/pkg/query-service/auth" baseconst "go.signoz.io/signoz/pkg/query-service/constants" basemodel "go.signoz.io/signoz/pkg/query-service/model" - baseauth "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/utils" "go.uber.org/zap" ) -// PrepareSsoRedirect prepares redirect page link after SSO response +func (m *modelDao) createUserForSAMLRequest(ctx context.Context, email string) (*basemodel.User, basemodel.BaseApiError) { + // get auth domain from email domain + domain, apierr := m.GetDomainByEmail(ctx, email) + + if apierr != nil { + zap.S().Errorf("failed to get domain from email", apierr) + return nil, model.InternalErrorStr("failed to get domain from email") + } + + hash, err := baseauth.PasswordHash(utils.GeneratePassowrd()) + if err != nil { + zap.S().Errorf("failed to generate password hash when registering a user via SSO redirect", zap.Error(err)) + return nil, model.InternalErrorStr("failed to generate password hash") + } + + group, apiErr := m.GetGroupByName(ctx, baseconst.ViewerGroup) + if apiErr != nil { + zap.S().Debugf("GetGroupByName failed, err: %v\n", apiErr.Err) + return nil, apiErr + } + + user := &basemodel.User{ + Id: uuid.NewString(), + Name: "", + Email: email, + Password: hash, + CreatedAt: time.Now().Unix(), + ProfilePictureURL: "", // Currently unused + GroupId: group.Id, + OrgId: domain.OrgId, + } + + user, apiErr = m.CreateUser(ctx, user, false) + if apiErr != nil { + zap.S().Debugf("CreateUser failed, err: %v\n", apiErr.Err) + return nil, apiErr + } + + return user, nil + +} + +// PrepareSsoRedirect prepares redirect page link after SSO response // is successfully parsed (i.e. 
valid email is available) func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email string) (redirectURL string, apierr basemodel.BaseApiError) { @@ -24,7 +69,20 @@ func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email st return "", model.BadRequestStr("invalid user email received from the auth provider") } - tokenStore, err := baseauth.GenerateJWTForUser(&userPayload.User) + user := &basemodel.User{} + + if userPayload == nil { + newUser, apiErr := m.createUserForSAMLRequest(ctx, email) + user = newUser + if apiErr != nil { + zap.S().Errorf("failed to create user with email received from auth provider: %v", apierr.Error()) + return "", apiErr + } + } else { + user = &userPayload.User + } + + tokenStore, err := baseauth.GenerateJWTForUser(user) if err != nil { zap.S().Errorf("failed to generate token for SSO login user", err) return "", model.InternalErrorStr("failed to generate token for the user") @@ -33,7 +91,7 @@ func (m *modelDao) PrepareSsoRedirect(ctx context.Context, redirectUri, email st return fmt.Sprintf("%s?jwt=%s&usr=%s&refreshjwt=%s", redirectUri, tokenStore.AccessJwt, - userPayload.User.Id, + user.Id, tokenStore.RefreshJwt), nil } @@ -76,6 +134,7 @@ func (m *modelDao) PrecheckLogin(ctx context.Context, email, sourceUrl string) ( if userPayload == nil { resp.IsUser = false } + ssoAvailable := true err := m.checkFeature(model.SSO) if err != nil { @@ -91,6 +150,8 @@ func (m *modelDao) PrecheckLogin(ctx context.Context, email, sourceUrl string) ( if ssoAvailable { + resp.IsUser = true + // find domain from email orgDomain, apierr := m.GetDomainByEmail(ctx, email) if apierr != nil { diff --git a/ee/query-service/dao/sqlite/domain.go b/ee/query-service/dao/sqlite/domain.go index d1ef8aa8d2..9fbee9e9df 100644 --- a/ee/query-service/dao/sqlite/domain.go +++ b/ee/query-service/dao/sqlite/domain.go @@ -4,8 +4,8 @@ import ( "context" "database/sql" "encoding/json" - "net/url" "fmt" + "net/url" "strings" "time" @@ -28,29 +28,70 @@ type StoredDomain struct { // GetDomainFromSsoResponse uses relay state received from IdP to fetch // user domain. The domain is further used to process validity of the response. -// when sending login request to IdP we send relay state as URL (site url) -// with domainId as query parameter. +// when sending login request to IdP we send relay state as URL (site url) +// with domainId or domainName as query parameter. 
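+// A relay state URL therefore looks like (hypothetical host)
+// https://signoz.example.com/?domainId=<uuid> or
+// https://signoz.example.com/?domainName=<email domain>.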
func (m *modelDao) GetDomainFromSsoResponse(ctx context.Context, relayState *url.URL) (*model.OrgDomain, error) { // derive domain id from relay state now - var domainIdStr string + var domainIdStr string + var domainNameStr string + var domain *model.OrgDomain + for k, v := range relayState.Query() { if k == "domainId" && len(v) > 0 { domainIdStr = strings.Replace(v[0], ":", "-", -1) } + if k == "domainName" && len(v) > 0 { + domainNameStr = v[0] + } } - domainId, err := uuid.Parse(domainIdStr) + if domainIdStr != "" { + domainId, err := uuid.Parse(domainIdStr) + if err != nil { + zap.S().Errorf("failed to parse domainId from relay state", err) + return nil, fmt.Errorf("failed to parse domainId from IdP response") + } + + domain, err = m.GetDomain(ctx, domainId) + if (err != nil) || domain == nil { + zap.S().Errorf("failed to find domain from domainId received in IdP response", err.Error()) + return nil, fmt.Errorf("invalid credentials") + } + } + + if domainNameStr != "" { + + domainFromDB, err := m.GetDomainByName(ctx, domainNameStr) + domain = domainFromDB + if (err != nil) || domain == nil { + zap.S().Errorf("failed to find domain from domainName received in IdP response", err.Error()) + return nil, fmt.Errorf("invalid credentials") + } + } + if domain != nil { + return domain, nil + } + + return nil, fmt.Errorf("failed to find domain received in IdP response") +} + +// GetDomainByName returns org domain for a given domain name +func (m *modelDao) GetDomainByName(ctx context.Context, name string) (*model.OrgDomain, basemodel.BaseApiError) { + + stored := StoredDomain{} + err := m.DB().Get(&stored, `SELECT * FROM org_domains WHERE name=$1 LIMIT 1`, name) + if err != nil { - zap.S().Errorf("failed to parse domain id from relay state", err) - return nil, fmt.Errorf("failed to parse response from IdP response") + if err == sql.ErrNoRows { + return nil, model.BadRequest(fmt.Errorf("invalid domain name")) + } + return nil, model.InternalError(err) } - domain, err := m.GetDomain(ctx, domainId) - if (err != nil) || domain == nil { - zap.S().Errorf("failed to find domain received in IdP response", err.Error()) - return nil, fmt.Errorf("invalid credentials") + domain := &model.OrgDomain{Id: stored.Id, Name: stored.Name, OrgId: stored.OrgId} + if err := domain.LoadConfig(stored.Data); err != nil { + return domain, model.InternalError(err) } - return domain, nil } diff --git a/ee/query-service/model/errors.go b/ee/query-service/model/errors.go index 6820cf8d44..7e7b8410e2 100644 --- a/ee/query-service/model/errors.go +++ b/ee/query-service/model/errors.go @@ -2,6 +2,7 @@ package model import ( "fmt" + basemodel "go.signoz.io/signoz/pkg/query-service/model" ) @@ -61,7 +62,6 @@ func InternalError(err error) *ApiError { } } - // InternalErrorStr returns a ApiError object of internal type for string input func InternalErrorStr(s string) *ApiError { return &ApiError{ @@ -69,6 +69,7 @@ func InternalErrorStr(s string) *ApiError { Err: fmt.Errorf(s), } } + var ( ErrorNone basemodel.ErrorType = "" ErrorTimeout basemodel.ErrorType = "timeout" diff --git a/pkg/query-service/auth/auth.go b/pkg/query-service/auth/auth.go index d2488a1399..7f78fa3660 100644 --- a/pkg/query-service/auth/auth.go +++ b/pkg/query-service/auth/auth.go @@ -165,7 +165,7 @@ func ResetPassword(ctx context.Context, req *model.ResetPasswordRequest) error { return errors.New("Invalid reset password request") } - hash, err := passwordHash(req.Password) + hash, err := PasswordHash(req.Password) if err != nil { return errors.Wrap(err, 
"Failed to generate password hash") } @@ -192,7 +192,7 @@ func ChangePassword(ctx context.Context, req *model.ChangePasswordRequest) error return ErrorInvalidCreds } - hash, err := passwordHash(req.NewPassword) + hash, err := PasswordHash(req.NewPassword) if err != nil { return errors.Wrap(err, "Failed to generate password hash") } @@ -243,7 +243,7 @@ func RegisterFirstUser(ctx context.Context, req *RegisterRequest) (*model.User, var hash string var err error - hash, err = passwordHash(req.Password) + hash, err = PasswordHash(req.Password) if err != nil { zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err)) return nil, model.InternalError(model.ErrSignupFailed{}) @@ -314,13 +314,13 @@ func RegisterInvitedUser(ctx context.Context, req *RegisterRequest, nopassword b // check if password is not empty, as for SSO case it can be if req.Password != "" { - hash, err = passwordHash(req.Password) + hash, err = PasswordHash(req.Password) if err != nil { zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err)) return nil, model.InternalError(model.ErrSignupFailed{}) } } else { - hash, err = passwordHash(utils.GeneratePassowrd()) + hash, err = PasswordHash(utils.GeneratePassowrd()) if err != nil { zap.S().Errorf("failed to generate password hash when registering a user", zap.Error(err)) return nil, model.InternalError(model.ErrSignupFailed{}) @@ -419,7 +419,7 @@ func authenticateLogin(ctx context.Context, req *model.LoginRequest) (*model.Use } // Generate hash from the password. -func passwordHash(pass string) (string, error) { +func PasswordHash(pass string) (string, error) { hash, err := bcrypt.GenerateFromPassword([]byte(pass), bcrypt.DefaultCost) if err != nil { return "", err