Merge pull request #6668 from SigNoz/chore/deprecate-develop

chore: develop deprecation and related changes
Prashant Shahi 2024-12-19 13:48:29 +05:30 committed by GitHub
commit cecc57e72d
30 changed files with 452 additions and 164 deletions

View File

@@ -3,7 +3,6 @@ name: build-pipeline
 on:
   pull_request:
     branches:
-      - develop
       - main
       - release/v*

View File

@@ -3,7 +3,7 @@ name: "Update PR labels and Block PR until related docs are shipped for the feat
 on:
   pull_request:
     branches:
-      - develop
+      - main
     types: [opened, edited, labeled, unlabeled]
 permissions:

View File

@@ -42,7 +42,7 @@ jobs:
       kubectl create ns sample-application
       # apply hotrod k8s manifest file
-      kubectl -n sample-application apply -f https://raw.githubusercontent.com/SigNoz/signoz/develop/sample-apps/hotrod/hotrod.yaml
+      kubectl -n sample-application apply -f https://raw.githubusercontent.com/SigNoz/signoz/main/sample-apps/hotrod/hotrod.yaml
       # wait for all deployments in sample-application namespace to be READY
       kubectl -n sample-application get deploy --output name | xargs -r -n1 -t kubectl -n sample-application rollout status --timeout=300s

View File

@@ -2,7 +2,8 @@ name: Jest Coverage - changed files
 on:
   pull_request:
-    branches: develop
+    branches:
+      - main
 jobs:
   build:
@@ -11,7 +12,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4
         with:
-          ref: "refs/heads/develop"
+          ref: "refs/heads/main"
           token: ${{ secrets.GITHUB_TOKEN }} # Provide the GitHub token for authentication
       - name: Fetch branch

View File

@@ -4,7 +4,6 @@ on:
   push:
     branches:
       - main
-      - develop
   tags:
     - v*

View File

@@ -3,7 +3,6 @@ on:
   pull_request:
     branches:
       - main
-      - develop
     paths:
       - 'frontend/**'
 defaults:

View File

@@ -1,12 +1,12 @@
 name: staging-deployment
-# Trigger deployment only on push to develop branch
+# Trigger deployment only on push to main branch
 on:
   push:
     branches:
-      - develop
+      - main
 jobs:
   deploy:
-    name: Deploy latest develop branch to staging
+    name: Deploy latest main branch to staging
     runs-on: ubuntu-latest
     environment: staging
     permissions:

View File

@@ -44,7 +44,7 @@ jobs:
           git add .
           git stash push -m "stashed on $(date --iso-8601=seconds)"
           git fetch origin
-          git checkout develop
+          git checkout main
           git pull
           # This is added to include the scenario when a new commit in the PR is force-pushed
           git branch -D ${GITHUB_BRANCH}

View File

@@ -339,7 +339,7 @@ to make SigNoz UI available at [localhost:3301](http://localhost:3301)
 **5.1.1 To install the HotROD sample app:**
 ```bash
-curl -sL https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod-install.sh \
+curl -sL https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod-install.sh \
 | HELM_RELEASE=my-release SIGNOZ_NAMESPACE=platform bash
 ```
@@ -362,7 +362,7 @@ kubectl -n sample-application run strzal --image=djbingham/curl \
 **5.1.4 To delete the HotROD sample app:**
 ```bash
-curl -sL https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod-delete.sh \
+curl -sL https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod-delete.sh \
 | HOTROD_NAMESPACE=sample-application bash
 ```

View File

@@ -58,7 +58,7 @@ from the HotROD application, you should see the data generated from hotrod in Si
 ```sh
 kubectl create ns sample-application
-kubectl -n sample-application apply -f https://raw.githubusercontent.com/SigNoz/signoz/develop/sample-apps/hotrod/hotrod.yaml
+kubectl -n sample-application apply -f https://raw.githubusercontent.com/SigNoz/signoz/main/sample-apps/hotrod/hotrod.yaml
 ```
 To generate load:

View File

@@ -13,8 +13,3 @@ if [ "$branch" = "main" ]; then
   echo "${color_red}${bold}You can't commit directly to the main branch${reset}"
   exit 1
 fi
-if [ "$branch" = "develop" ]; then
-  echo "${color_red}${bold}You can't commit directly to the develop branch${reset}"
-  exit 1
-fi

View File

@@ -24,13 +24,13 @@ const MQServiceDetailTypePerView = (
   producerLatencyOption: ProducerLatencyOptions,
 ): Record<string, MessagingQueueServiceDetailType[]> => ({
   [MessagingQueuesViewType.consumerLag.value]: [
-    MessagingQueueServiceDetailType.ConsumerDetails,
     MessagingQueueServiceDetailType.ProducerDetails,
+    MessagingQueueServiceDetailType.ConsumerDetails,
     MessagingQueueServiceDetailType.NetworkLatency,
   ],
   [MessagingQueuesViewType.partitionLatency.value]: [
-    MessagingQueueServiceDetailType.ConsumerDetails,
     MessagingQueueServiceDetailType.ProducerDetails,
+    MessagingQueueServiceDetailType.ConsumerDetails,
   ],
   [MessagingQueuesViewType.producerLatency.value]: [
     producerLatencyOption === ProducerLatencyOptions.Consumers
@@ -122,7 +122,7 @@ function MessagingQueuesDetails({
   producerLatencyOption: ProducerLatencyOptions;
 }): JSX.Element {
   const [currentTab, setCurrentTab] = useState<MessagingQueueServiceDetailType>(
-    MessagingQueueServiceDetailType.ConsumerDetails,
+    MessagingQueueServiceDetailType.ProducerDetails,
   );
   useEffect(() => {

View File

@@ -179,10 +179,13 @@ export const convertToNanoseconds = (timestamp: number): bigint =>
 export const getStartAndEndTimesInMilliseconds = (
   timestamp: number,
 ): { start: number; end: number } => {
-  const FIVE_MINUTES_IN_MILLISECONDS = 5 * 60 * 1000; // 5 minutes in milliseconds - check with Shivanshu once
-  const start = Math.floor(timestamp);
-  const end = Math.floor(start + FIVE_MINUTES_IN_MILLISECONDS);
+  const FIVE_MINUTES_IN_MILLISECONDS = 5 * 60 * 1000; // 300,000 milliseconds
+
+  // Convert timestamp to milliseconds and floor it
+  const pointInTime = Math.floor(timestamp * 1000);
+
+  const start = Math.floor(pointInTime - FIVE_MINUTES_IN_MILLISECONDS);
+  const end = Math.floor(pointInTime + FIVE_MINUTES_IN_MILLISECONDS);
   return { start, end };
 };
@@ -311,8 +314,8 @@ export const getMetaDataAndAPIPerView = (
   return {
     [MessagingQueuesViewType.consumerLag.value]: {
       tableApiPayload: {
-        start: (selectedTimelineQuery?.start || 0) * 1e9,
-        end: (selectedTimelineQuery?.end || 0) * 1e9,
+        start: (selectedTimelineQuery?.start || 0) * 1e6,
+        end: (selectedTimelineQuery?.end || 0) * 1e6,
         variables: {
           partition: selectedTimelineQuery?.partition,
           topic: selectedTimelineQuery?.topic,
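Taken together, these two hunks fix the unit chain: the timeline click supplies a Unix timestamp in seconds, the helper now builds a ±5-minute window in milliseconds, and the consumer-lag table payload converts those milliseconds to nanoseconds, hence the 1e9 → 1e6 factor. A minimal Go sketch of the same arithmetic (the helper name is ours, not part of the diff):

```go
package main

import (
	"fmt"
	"time"
)

// windowMillis mirrors getStartAndEndTimesInMilliseconds: a Unix timestamp
// in seconds becomes a ±5 minute window expressed in milliseconds.
func windowMillis(tsSeconds int64) (start, end int64) {
	const fiveMinutesMs = 5 * 60 * 1000
	pointInTime := tsSeconds * 1000 // seconds -> milliseconds
	return pointInTime - fiveMinutesMs, pointInTime + fiveMinutesMs
}

func main() {
	start, end := windowMillis(1734600000)
	fmt.Println(start, end)         // milliseconds: 1734599700000 1734600300000
	fmt.Println(start*1e6, end*1e6) // nanoseconds, as sent in tableApiPayload (ms * 1e6)
	fmt.Println(time.UnixMilli(start).UTC())
}
```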

View File

@@ -2694,8 +2694,8 @@ func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Contex
 }

 // remove this after sometime
-func removeUnderscoreDuplicateFields(fields []model.LogField) []model.LogField {
-    lookup := map[string]model.LogField{}
+func removeUnderscoreDuplicateFields(fields []model.Field) []model.Field {
+    lookup := map[string]model.Field{}
     for _, v := range fields {
         lookup[v.Name+v.DataType] = v
     }
@@ -2706,7 +2706,7 @@ func removeUnderscoreDuplicateFields(fields []model.LogField) []model.LogField {
         }
     }
-    updatedFields := []model.LogField{}
+    updatedFields := []model.Field{}
     for _, v := range lookup {
         updatedFields = append(updatedFields, v)
     }
@@ -2717,11 +2717,11 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
     // response will contain top level fields from the otel log model
     response := model.GetFieldsResponse{
         Selected:    constants.StaticSelectedLogFields,
-        Interesting: []model.LogField{},
+        Interesting: []model.Field{},
     }

     // get attribute keys
-    attributes := []model.LogField{}
+    attributes := []model.Field{}
     query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys)
     err := r.db.Select(ctx, &attributes, query)
     if err != nil {
@@ -2729,7 +2729,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
     }

     // get resource keys
-    resources := []model.LogField{}
+    resources := []model.Field{}
     query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys)
     err = r.db.Select(ctx, &resources, query)
     if err != nil {
@@ -2753,9 +2753,11 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
     return &response, nil
 }

-func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
+func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, overrideFieldType string, fields *[]model.Field, response *model.GetFieldsResponse) {
     for _, field := range *fields {
-        field.Type = fieldType
+        if overrideFieldType != "" {
+            field.Type = overrideFieldType
+        }
         // all static fields are assumed to be selected as we don't allow changing them
         if isColumn(r.useLogsNewSchema, tableStatement, field.Type, field.Name, field.DataType) {
             response.Selected = append(response.Selected, field)
@@ -2945,6 +2947,165 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
     return nil
 }
+
+func (r *ClickHouseReader) GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) {
+    // response will contain top level fields from the otel trace model
+    response := model.GetFieldsResponse{
+        Selected:    []model.Field{},
+        Interesting: []model.Field{},
+    }
+
+    // get the top level selected fields
+    for _, field := range constants.NewStaticFieldsTraces {
+        if (v3.AttributeKey{} == field) {
+            continue
+        }
+        response.Selected = append(response.Selected, model.Field{
+            Name:     field.Key,
+            DataType: field.DataType.String(),
+            Type:     constants.Static,
+        })
+    }
+
+    // get attribute keys
+    attributes := []model.Field{}
+    query := fmt.Sprintf("SELECT tagKey, tagType, dataType from %s.%s group by tagKey, tagType, dataType", r.TraceDB, r.spanAttributesKeysTable)
+    rows, err := r.db.Query(ctx, query)
+    if err != nil {
+        return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+    }
+    defer rows.Close()
+
+    var tagKey string
+    var dataType string
+    var tagType string
+    for rows.Next() {
+        if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil {
+            return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+        }
+        attributes = append(attributes, model.Field{
+            Name:     tagKey,
+            DataType: dataType,
+            Type:     tagType,
+        })
+    }
+
+    statements := []model.ShowCreateTableStatement{}
+    query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName)
+    err = r.db.Select(ctx, &statements, query)
+    if err != nil {
+        return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
+    }
+
+    r.extractSelectedAndInterestingFields(statements[0].Statement, "", &attributes, &response)
+
+    return &response, nil
+}
+
+func (r *ClickHouseReader) UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError {
+    if !field.Selected {
+        return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support."))
+    }
+
+    // name of the materialized column
+    colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name)
+
+    field.DataType = strings.ToLower(field.DataType)
+
+    // dataType and chDataType of the materialized column
+    var dataTypeMap = map[string]string{
+        "string":  "string",
+        "bool":    "bool",
+        "int64":   "number",
+        "float64": "number",
+    }
+    var chDataTypeMap = map[string]string{
+        "string":  "String",
+        "bool":    "Bool",
+        "int64":   "Float64",
+        "float64": "Float64",
+    }
+    chDataType := chDataTypeMap[field.DataType]
+    dataType := dataTypeMap[field.DataType]
+
+    // typeName: tag => attributes, resource => resources
+    typeName := field.Type
+    if field.Type == string(v3.AttributeKeyTypeTag) {
+        typeName = constants.Attributes
+    } else if field.Type == string(v3.AttributeKeyTypeResource) {
+        typeName = constants.Resources
+    }
+
+    attrColName := fmt.Sprintf("%s_%s", typeName, dataType)
+    for _, table := range []string{r.traceLocalTableName, r.traceTableName} {
+        q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
+        query := fmt.Sprintf(q,
+            r.TraceDB, table,
+            r.cluster,
+            colname, chDataType,
+            attrColName,
+            field.Name,
+        )
+        err := r.db.Exec(ctx, query)
+        if err != nil {
+            return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+        }
+
+        query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
+            r.TraceDB, table,
+            r.cluster,
+            colname,
+            attrColName,
+            field.Name,
+        )
+        err = r.db.Exec(ctx, query)
+        if err != nil {
+            return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+        }
+    }
+
+    // create the index
+    if strings.ToLower(field.DataType) == "bool" {
+        // there is no point in creating index for bool attributes as the cardinality is just 2
+        return nil
+    }
+
+    if field.IndexType == "" {
+        field.IndexType = constants.DefaultLogSkipIndexType
+    }
+    if field.IndexGranularity == 0 {
+        field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
+    }
+    query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s GRANULARITY %d",
+        r.TraceDB, r.traceLocalTableName,
+        r.cluster,
+        colname,
+        colname,
+        field.IndexType,
+        field.IndexGranularity,
+    )
+    err := r.db.Exec(ctx, query)
+    if err != nil {
+        return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+    }
+
+    // add a default minmax index for numbers
+    if dataType == "number" {
+        query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_minmax_idx` (`%s`) TYPE minmax GRANULARITY 1",
+            r.TraceDB, r.traceLocalTableName,
+            r.cluster,
+            colname,
+            colname,
+        )
+        err = r.db.Exec(ctx, query)
+        if err != nil {
+            return &model.ApiError{Err: err, Typ: model.ErrorInternal}
+        }
+    }
+
+    return nil
+}
+
 func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError) {
     response := []model.SignozLog{}
     fields, apiErr := r.GetLogFields(ctx)
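For a concrete sense of the DDL `UpdateTraceField` emits, its format strings can be rendered with sample values. A standalone sketch (database, table, and cluster names are placeholders; the column name follows the `attribute_string_...$$...` pattern visible in this PR's tests, not a verified output of `GetClickhouseColumnNameV2`):

```go
package main

import "fmt"

func main() {
	// Placeholder values standing in for reader config and the request.
	db, table, cluster := "signoz_traces", "signoz_index_v3", "cluster"
	colname := "attribute_string_user$$id" // assumed pattern for a string tag named "user.id"
	chDataType := "String"
	attrColName := "attributes_string" // typeName + "_" + dataType

	// Same format string as the first Exec in the loop above.
	q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))"
	fmt.Printf(q+"\n", db, table, cluster, colname, chDataType, attrColName, "user.id")

	// Companion exists-marker column, matching the second Exec.
	fmt.Printf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))\n",
		db, table, cluster, colname, attrColName, "user.id")
}
```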

View File

@@ -527,6 +527,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
     router.HandleFunc("/api/v1/settings/ingestion_key", am.AdminAccess(aH.insertIngestionKey)).Methods(http.MethodPost)
     router.HandleFunc("/api/v1/settings/ingestion_key", am.ViewAccess(aH.getIngestionKeys)).Methods(http.MethodGet)
+
+    router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet)
+    router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost)
     router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet)
     router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet)
     router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
@@ -4892,3 +4895,35 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
     aH.queryRangeV4(r.Context(), queryRangeParams, w, r)
 }
+
+func (aH *APIHandler) traceFields(w http.ResponseWriter, r *http.Request) {
+    fields, apiErr := aH.reader.GetTraceFields(r.Context())
+    if apiErr != nil {
+        RespondError(w, apiErr, "failed to fetch fields from the db")
+        return
+    }
+    aH.WriteJSON(w, r, fields)
+}
+
+func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
+    field := model.UpdateField{}
+    if err := json.NewDecoder(r.Body).Decode(&field); err != nil {
+        apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+        RespondError(w, apiErr, "failed to decode payload")
+        return
+    }
+
+    err := logs.ValidateUpdateFieldPayloadV2(&field)
+    if err != nil {
+        apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
+        RespondError(w, apiErr, "incorrect payload")
+        return
+    }
+
+    apiErr := aH.reader.UpdateTraceField(r.Context(), &field)
+    if apiErr != nil {
+        RespondError(w, apiErr, "failed to update field in the db")
+        return
+    }
+
+    aH.WriteJSON(w, r, field)
+}
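A quick way to exercise the new endpoints once the server is up; a hedged sketch, assuming a local query-service on port 8080 and that `model.UpdateField` uses lowerCamel JSON tags matching the validator's field names — neither is shown in this diff, and the `am.ViewAccess`/`am.EditAccess` wrappers will additionally require auth headers, omitted here:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://localhost:8080" // assumption: local query-service

	// List selected and interesting trace fields.
	resp, err := http.Get(base + "/api/v2/traces/fields")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))

	// Ask for a string tag to be materialized as a column.
	// JSON keys are assumed to mirror the validator's field names.
	payload := []byte(`{"name":"user.id","type":"tag","dataType":"string","selected":true}`)
	resp, err = http.Post(base+"/api/v2/traces/fields", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	body, _ = io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))
}
```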

View File

@@ -6,28 +6,32 @@ import (
 func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH consumer_query AS (
     SELECT
-        serviceName,
+        resource_string_service$$name,
         quantile(0.99)(durationNano) / 1000000 AS p99,
         COUNT(*) AS total_requests,
-        sumIf(1, statusCode = 2) AS error_count,
+        sumIf(1, status_code = 2) AS error_count,
-        avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size
+        avg(CASE WHEN has(attributes_number, 'messaging.message.body.size') THEN attributes_number['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
        AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 5
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND attributes_string['messaging.destination.name'] = '%s'
-        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+        AND attributes_string['messaging.destination.partition.id'] = '%s'
-        AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+        AND attributes_string['messaging.kafka.consumer.group'] = '%s'
-    GROUP BY serviceName
+    GROUP BY resource_string_service$$name
 )
 SELECT
-    serviceName AS service_name,
+    resource_string_service$$name AS service_name,
     p99,
     COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
     COALESCE(total_requests / %d, 0) AS throughput,
@@ -35,27 +39,31 @@ SELECT
 FROM
     consumer_query
 ORDER BY
-    serviceName;
-`, start, end, queueType, topic, partition, consumerGroup, timeRange)
+    resource_string_service$$name;
+`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, consumerGroup, timeRange)
     return query
 }

 // S1 landing
 func generatePartitionLatencySQL(start, end int64, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH partition_query AS (
     SELECT
         quantile(0.99)(durationNano) / 1000000 AS p99,
         count(*) AS total_requests,
-        stringTagMap['messaging.destination.name'] AS topic,
+        attributes_string['messaging.destination.name'] AS topic,
-        stringTagMap['messaging.destination.partition.id'] AS partition
+        attributes_string['messaging.destination.partition.id'] AS partition
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 4
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
     GROUP BY topic, partition
 )
@@ -68,35 +76,39 @@ FROM
     partition_query
 ORDER BY
     topic;
-`, start, end, queueType, timeRange)
+`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
     return query
 }

 // S1 consumer
 func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH consumer_pl AS (
     SELECT
-        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
+        attributes_string['messaging.kafka.consumer.group'] AS consumer_group,
-        serviceName,
+        resource_string_service$$name,
         quantile(0.99)(durationNano) / 1000000 AS p99,
         COUNT(*) AS total_requests,
-        sumIf(1, statusCode = 2) AS error_count
+        sumIf(1, status_code = 2) AS error_count
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 5
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND attributes_string['messaging.destination.name'] = '%s'
-        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+        AND attributes_string['messaging.destination.partition.id'] = '%s'
-    GROUP BY consumer_group, serviceName
+    GROUP BY consumer_group, resource_string_service$$name
 )
 SELECT
     consumer_group,
-    serviceName AS service_name,
+    resource_string_service$$name AS service_name,
     p99,
     COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
     COALESCE(total_requests / %d, 0) AS throughput
@@ -104,61 +116,68 @@ FROM
     consumer_pl
 ORDER BY
     consumer_group;
-`, start, end, queueType, topic, partition, timeRange)
+`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
     return query
 }

 // S3, producer overview
 func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     // t, svc, rps, byte*, p99, err
     query := fmt.Sprintf(`
 WITH producer_latency AS (
     SELECT
-        serviceName,
+        resource_string_service$$name,
         quantile(0.99)(durationNano) / 1000000 AS p99,
-        stringTagMap['messaging.destination.name'] AS topic,
+        attributes_string['messaging.destination.name'] AS topic,
         COUNT(*) AS total_requests,
-        sumIf(1, statusCode = 2) AS error_count
+        sumIf(1, status_code = 2) AS error_count
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 4
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-    GROUP BY topic, serviceName
+    GROUP BY topic, resource_string_service$$name
 )
 SELECT
     topic,
-    serviceName AS service_name,
+    resource_string_service$$name AS service_name,
     p99,
     COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
     COALESCE(total_requests / %d, 0) AS throughput
 FROM
     producer_latency
-`, start, end, queueType, timeRange)
+`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
     return query
 }
 // S3, producer topic/service overview
 func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH consumer_latency AS (
     SELECT
         quantile(0.99)(durationNano) / 1000000 AS p99,
-        stringTagMap['messaging.destination.partition.id'] AS partition,
+        attributes_string['messaging.destination.partition.id'] AS partition,
         COUNT(*) AS total_requests,
-        sumIf(1, statusCode = 2) AS error_count
+        sumIf(1, status_code = 2) AS error_count
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 4
-        AND serviceName = '%s'
+        AND resource_string_service$$name = '%s'
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND attributes_string['messaging.destination.name'] = '%s'
     GROUP BY partition
 )
@@ -169,34 +188,38 @@ SELECT
     COALESCE(total_requests / %d, 0) AS throughput
 FROM
     consumer_latency
-`, start, end, service, queueType, topic, timeRange)
+`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
     return query
 }
 // S3 consumer overview
 func generateConsumerLatencySQL(start, end int64, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH consumer_latency AS (
     SELECT
-        serviceName,
+        resource_string_service$$name,
-        stringTagMap['messaging.destination.name'] AS topic,
+        attributes_string['messaging.destination.name'] AS topic,
         quantile(0.99)(durationNano) / 1000000 AS p99,
         COUNT(*) AS total_requests,
-        sumIf(1, statusCode = 2) AS error_count,
+        sumIf(1, status_code = 2) AS error_count,
-        SUM(numberTagMap['messaging.message.body.size']) AS total_bytes
+        SUM(attributes_number['messaging.message.body.size']) AS total_bytes
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 5
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-    GROUP BY topic, serviceName
+    GROUP BY topic, resource_string_service$$name
 )
 SELECT
     topic,
-    serviceName AS service_name,
+    resource_string_service$$name AS service_name,
     p99,
     COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
     COALESCE(total_requests / %d, 0) AS ingestion_rate,
@@ -205,28 +228,32 @@ FROM
     consumer_latency
 ORDER BY
     topic;
-`, start, end, queueType, timeRange, timeRange)
+`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange, timeRange)
     return query
 }
 // S3 consumer topic/service
 func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH consumer_latency AS (
     SELECT
         quantile(0.99)(durationNano) / 1000000 AS p99,
-        stringTagMap['messaging.destination.partition.id'] AS partition,
+        attributes_string['messaging.destination.partition.id'] AS partition,
         COUNT(*) AS total_requests,
-        sumIf(1, statusCode = 2) AS error_count
+        sumIf(1, status_code = 2) AS error_count
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 5
-        AND serviceName = '%s'
+        AND resource_string_service$$name = '%s'
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND attributes_string['messaging.destination.name'] = '%s'
     GROUP BY partition
 )
@@ -237,7 +264,7 @@ SELECT
     COALESCE(total_requests / %d, 0) AS throughput
 FROM
     consumer_latency
-`, start, end, service, queueType, topic, timeRange)
+`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
     return query
 }
@@ -246,26 +273,26 @@ func generateProducerConsumerEvalSQL(start, end int64, queueType string, evalTim
     query := fmt.Sprintf(`
 WITH trace_data AS (
     SELECT
-        p.serviceName AS producer_service,
+        p.resource_string_service$$name AS producer_service,
-        c.serviceName AS consumer_service,
+        c.resource_string_service$$name AS consumer_service,
-        p.traceID,
+        p.trace_id,
         p.timestamp AS producer_timestamp,
         c.timestamp AS consumer_timestamp,
         p.durationNano AS durationNano,
         (toUnixTimestamp64Nano(c.timestamp) - toUnixTimestamp64Nano(p.timestamp)) + p.durationNano AS time_difference
     FROM
-        signoz_traces.distributed_signoz_index_v2 p
+        signoz_traces.distributed_signoz_index_v3 p
     INNER JOIN
-        signoz_traces.distributed_signoz_index_v2 c
+        signoz_traces.distributed_signoz_index_v3 c
-    ON p.traceID = c.traceID
+    ON p.trace_id = c.trace_id
-        AND c.parentSpanID = p.spanID
+        AND c.parent_span_id = p.span_id
     WHERE
         p.kind = 4
         AND c.kind = 5
         AND toUnixTimestamp64Nano(p.timestamp) BETWEEN '%d' AND '%d'
         AND toUnixTimestamp64Nano(c.timestamp) BETWEEN '%d' AND '%d'
-        AND c.msgSystem = '%s'
+        AND c.attribute_string_messaging$$system = '%s'
-        AND p.msgSystem = '%s'
+        AND p.attribute_string_messaging$$system = '%s'
 )
 SELECT
@@ -278,7 +305,7 @@ SELECT
     arrayMap(x -> x.1,
         arraySort(
             x -> -x.2,
-            groupArrayIf((traceID, time_difference), time_difference > '%d')
+            groupArrayIf((trace_id, time_difference), time_difference > '%d')
         )
     ),
     1, 10
@@ -293,91 +320,107 @@ GROUP BY
 func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 WITH producer_query AS (
     SELECT
-        serviceName,
+        resource_string_service$$name,
         quantile(0.99)(durationNano) / 1000000 AS p99,
         count(*) AS total_count,
-        sumIf(1, statusCode = 2) AS error_count
+        sumIf(1, status_code = 2) AS error_count
-    FROM signoz_traces.distributed_signoz_index_v2
+    FROM signoz_traces.distributed_signoz_index_v3
     WHERE
         timestamp >= '%d'
         AND timestamp <= '%d'
+        AND ts_bucket_start >= '%d'
+        AND ts_bucket_start <= '%d'
         AND kind = 4
-        AND msgSystem = '%s'
+        AND attribute_string_messaging$$system = '%s'
-        AND stringTagMap['messaging.destination.name'] = '%s'
+        AND attributes_string['messaging.destination.name'] = '%s'
-        AND stringTagMap['messaging.destination.partition.id'] = '%s'
+        AND attributes_string['messaging.destination.partition.id'] = '%s'
-    GROUP BY serviceName
+    GROUP BY resource_string_service$$name
 )
 SELECT
-    serviceName AS service_name,
+    resource_string_service$$name AS service_name,
     p99,
     COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
     COALESCE(total_count / %d, 0) AS throughput
 FROM
     producer_query
 ORDER BY
-    serviceName;
-`, start, end, queueType, topic, partition, timeRange)
+    resource_string_service$$name;
+`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
     return query
 }
 func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string {
     timeRange := (end - start) / 1000000000
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 SELECT
-    stringTagMap['messaging.client_id'] AS client_id,
+    attributes_string['messaging.client_id'] AS client_id,
-    stringTagMap['service.instance.id'] AS service_instance_id,
+    resources_string['service.instance.id'] AS service_instance_id,
-    serviceName AS service_name,
+    resource_string_service$$name AS service_name,
     count(*) / %d AS throughput
-FROM signoz_traces.distributed_signoz_index_v2
+FROM signoz_traces.distributed_signoz_index_v3
 WHERE
     timestamp >= '%d'
     AND timestamp <= '%d'
+    AND ts_bucket_start >= '%d'
+    AND ts_bucket_start <= '%d'
     AND kind = 5
-    AND msgSystem = '%s'
+    AND attribute_string_messaging$$system = '%s'
-    AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+    AND attributes_string['messaging.kafka.consumer.group'] = '%s'
-    AND stringTagMap['messaging.destination.partition.id'] = '%s'
+    AND attributes_string['messaging.destination.partition.id'] = '%s'
 GROUP BY service_name, client_id, service_instance_id
 ORDER BY throughput DESC
-`, timeRange, start, end, queueType, consumerGroup, partitionID)
+`, timeRange, start, end, tsBucketStart, tsBucketEnd, queueType, consumerGroup, partitionID)
     return query
 }
 func onboardProducersSQL(start, end int64, queueType string) string {
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 SELECT
     COUNT(*) = 0 AS entries,
-    COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue,
+    COUNT(IF(attribute_string_messaging$$system = '%s', 1, NULL)) = 0 AS queue,
     COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind,
-    COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
+    COUNT(IF(has(attributes_string, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
-    COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition
+    COUNT(IF(has(attributes_string, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition
 FROM
-    signoz_traces.distributed_signoz_index_v2
+    signoz_traces.distributed_signoz_index_v3
 WHERE
     timestamp >= '%d'
-    AND timestamp <= '%d';`, queueType, start, end)
+    AND timestamp <= '%d'
+    AND ts_bucket_start >= '%d'
+    AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd)
     return query
 }
 func onboardConsumerSQL(start, end int64, queueType string) string {
+    tsBucketStart := (start / 1000000000) - 1800
+    tsBucketEnd := end / 1000000000
     query := fmt.Sprintf(`
 SELECT
     COUNT(*) = 0 AS entries,
-    COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue,
+    COUNT(IF(attribute_string_messaging$$system = '%s', 1, NULL)) = 0 AS queue,
     COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind,
-    COUNT(serviceName) = 0 AS svc,
+    COUNT(resource_string_service$$name) = 0 AS svc,
-    COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
+    COUNT(IF(has(attributes_string, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
-    COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition,
+    COUNT(IF(has(attributes_string, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition,
-    COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup,
+    COUNT(IF(has(attributes_string, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup,
-    COUNT(IF(has(numberTagMap, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize,
+    COUNT(IF(has(attributes_number, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize,
-    COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid,
+    COUNT(IF(has(attributes_string, 'messaging.client_id'), 1, NULL)) = 0 AS clientid,
-    COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid
+    COUNT(IF(has(resources_string, 'service.instance.id'), 1, NULL)) = 0 AS instanceid
-FROM signoz_traces.distributed_signoz_index_v2
+FROM signoz_traces.distributed_signoz_index_v3
 WHERE
     timestamp >= '%d'
-    AND timestamp <= '%d';`, queueType, start, end)
+    AND timestamp <= '%d'
+    AND ts_bucket_start >= '%d'
+    AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd)
     return query
 }
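Every builder in this file gains the same pair of bucket bounds. `ts_bucket_start` is part of the v3 trace table's sort key, so constraining it lets ClickHouse prune whole time buckets instead of scanning by `timestamp` alone; the 1800 s subtracted from the lower bound appears to be a safety margin so spans whose bucket opened up to 30 minutes before the query window are still covered. A sketch of the shared computation (hypothetical helper; each function inlines it in the diff):

```go
package main

import "fmt"

// bucketBounds mirrors the computation each query builder now performs:
// start/end arrive as Unix nanoseconds, the bucket columns are in seconds,
// and the lower bound is widened by 30 minutes (1800 s).
func bucketBounds(startNano, endNano int64) (tsBucketStart, tsBucketEnd int64) {
	tsBucketStart = (startNano / 1000000000) - 1800
	tsBucketEnd = endNano / 1000000000
	return
}

func main() {
	start := int64(1734600000000000000) // ns
	end := int64(1734603600000000000)   // ns
	lo, hi := bucketBounds(start, end)
	fmt.Println(lo, hi) // 1734598200 1734603600
}
```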

View File

@@ -228,8 +228,8 @@ func parseColumn(s string) (*string, error) {
     return &colName, nil
 }

-func arrayToMap(fields []model.LogField) map[string]model.LogField {
-    res := map[string]model.LogField{}
+func arrayToMap(fields []model.Field) map[string]model.Field {
+    res := map[string]model.Field{}
     for _, field := range fields {
         res[field.Name] = field
     }
@@ -251,7 +251,7 @@ func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []
     return queryTokens, nil
 }

-func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.LogField, interestingFieldLookup map[string]model.LogField) (string, error) {
+func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.Field, interestingFieldLookup map[string]model.Field) (string, error) {
     op := strings.TrimSpace(operatorRegex.FindString(queryToken))
     opLower := strings.ToLower(op)
@@ -283,7 +283,7 @@ func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]mode
         }
     } else {
         // creating the query token here as we have the metadata
-        field := model.LogField{}
+        field := model.Field{}
         if sfield, ok := selectedFieldsLookup[sqlColName]; ok {
             field = sfield
View File

@@ -238,14 +238,14 @@ func TestParseColumn(t *testing.T) {
 func TestReplaceInterestingFields(t *testing.T) {
     queryTokens := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
     allFields := model.GetFieldsResponse{
-        Selected: []model.LogField{
+        Selected: []model.Field{
             {
                 Name:     "id_key",
                 DataType: "int64",
                 Type:     "attributes",
             },
         },
-        Interesting: []model.LogField{
+        Interesting: []model.Field{
             {
                 Name:     "id.userid",
                 DataType: "int64",
@@ -326,7 +326,7 @@ func TestCheckIfPrevousPaginateAndModifyOrder(t *testing.T) {
 }

 var generateSQLQueryFields = model.GetFieldsResponse{
-    Selected: []model.LogField{
+    Selected: []model.Field{
         {
             Name:     "field1",
             DataType: "int64",
@@ -348,7 +348,7 @@ var generateSQLQueryFields = model.GetFieldsResponse{
             Type: "static",
         },
     },
-    Interesting: []model.LogField{
+    Interesting: []model.Field{
         {
             Name:     "FielD1",
             DataType: "int64",

View File

@@ -6,6 +6,7 @@ import (
     "go.signoz.io/signoz/pkg/query-service/constants"
     "go.signoz.io/signoz/pkg/query-service/model"
+    v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 )
@@ -38,3 +39,36 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error {
     }
     return nil
 }
+
+func ValidateUpdateFieldPayloadV2(field *model.UpdateField) error {
+    if field.Name == "" {
+        return fmt.Errorf("name cannot be empty")
+    }
+    if field.Type == "" {
+        return fmt.Errorf("type cannot be empty")
+    }
+    if field.DataType == "" {
+        return fmt.Errorf("dataType cannot be empty")
+    }
+
+    // the logs api uses the old names i.e attributes and resources while traces use tag and attribute.
+    // update log api to use tag and attribute.
+    matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s)$", v3.AttributeKeyTypeTag, v3.AttributeKeyTypeResource), field.Type)
+    if err != nil {
+        return err
+    }
+    if !matched {
+        return fmt.Errorf("type %s not supported", field.Type)
+    }
+
+    if field.IndexType != "" {
+        matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, field.IndexType)
+        if err != nil {
+            return err
+        }
+        if !matched {
+            return fmt.Errorf("index type %s not supported", field.IndexType)
+        }
+    }
+    return nil
+}
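The index-type regexp accepts the usual ClickHouse skip-index constructors. A small sketch that runs the same pattern against a few candidates (values chosen for illustration):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as ValidateUpdateFieldPayloadV2 above.
	re := regexp.MustCompile(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`)

	for _, candidate := range []string{
		"minmax",                 // accepted
		"set(5)",                 // accepted
		"bloom_filter(0.01)",     // accepted
		"tokenbf_v1(10240,3,0)",  // accepted
		"ngrambf_v1(4,1024,3,0)", // accepted
		"set(100)",               // rejected: the pattern allows a single digit only
	} {
		fmt.Printf("%-24s %v\n", candidate, re.MatchString(candidate))
	}
}
```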

View File

@@ -190,7 +190,7 @@ func (q *querier) runBuilderQuery(
             ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
             return
         }
-        query = fmt.Sprintf(placeholderQuery, limitQuery)
+        query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
     } else {
         query, err = tracesQueryBuilder(
             start,
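The switch from `fmt.Sprintf` to `strings.Replace` (here and in the identical v4 querier hunk below) matters because the outer query can now contain literal `%` characters — for example the `labels like '%hostname%server1%'` predicate produced by the resource-fingerprint filter — which Sprintf would read as stray format verbs. A minimal reproduction:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// The outer query carries literal '%' wildcards plus a placeholder.
	placeholder := "... labels like '%hostname%server1%' AND (`function`) GLOBAL IN (#LIMIT_PLACEHOLDER)"
	limitQuery := "SELECT `function` FROM t LIMIT 10"

	// Sprintf-style substitution needs a '%s' placeholder and then mangles
	// the LIKE pattern: '%h' and '%s' are interpreted as format verbs.
	sprintfStyle := strings.Replace(placeholder, "#LIMIT_PLACEHOLDER", "%s", 1)
	fmt.Println(fmt.Sprintf(sprintfStyle, limitQuery)) // garbled output

	// Plain string replacement leaves every other byte untouched.
	fmt.Println(strings.Replace(placeholder, "#LIMIT_PLACEHOLDER", limitQuery, 1))
}
```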

View File

@@ -190,7 +190,7 @@ func (q *querier) runBuilderQuery(
             ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
             return
         }
-        query = fmt.Sprintf(placeholderQuery, limitQuery)
+        query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
     } else {
         query, err = tracesQueryBuilder(
             start,

View File

@@ -34,8 +34,8 @@ func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey)
         return v
     }

-    for _, key := range utils.GenerateEnrichmentKeys(key) {
-        if val, ok := keys[key]; ok {
+    for _, tkey := range utils.GenerateEnrichmentKeys(key) {
+        if val, ok := keys[tkey]; ok {
             return val
         }
     }

View File

@@ -74,6 +74,19 @@ func getSelectLabels(groupBy []v3.AttributeKey) string {
     return strings.Join(labels, ",")
 }

+// TODO(nitya): use the _exists columns as well in the future similar to logs
+func existsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) {
+    if key.DataType == v3.AttributeKeyDataTypeString {
+        if op == v3.FilterOperatorExists {
+            return fmt.Sprintf("%s %s ''", getColumnName(key), tracesOperatorMappingV3[v3.FilterOperatorNotEqual]), nil
+        } else {
+            return fmt.Sprintf("%s %s ''", getColumnName(key), tracesOperatorMappingV3[v3.FilterOperatorEqual]), nil
+        }
+    } else {
+        return "", fmt.Errorf("unsupported operation, exists and not exists can only be applied on custom attributes or string type columns")
+    }
+}
+
 func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
     var conditions []string
@@ -110,7 +123,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
             conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
         case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
             if item.Key.IsColumn {
-                subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(item.Key, item.Operator)
+                subQuery, err := existsSubQueryForFixedColumn(item.Key, item.Operator)
                 if err != nil {
                     return "", err
                 }
@@ -312,7 +325,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
     }
     if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
-        filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
+        filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
     }
     switch mq.AggregateOperator {
@@ -350,7 +363,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
     case v3.AggregateOperatorCount:
         if mq.AggregateAttribute.Key != "" {
             if mq.AggregateAttribute.IsColumn {
-                subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
+                subQuery, err := existsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
                 if err == nil {
                     filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
                 }
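What the local `existsSubQueryForFixedColumn` renders depends on `getColumnName`; going by the updated test expectations in the next file, a materialized tag like `path` maps to a backticked `attribute_string_path` column while a real top-level column like `http_url` keeps its name. A sketch with a stubbed column mapper (the map below is illustrative, not the real `getColumnName`):

```go
package main

import "fmt"

// Illustrative stand-in for getColumnName, based on names appearing in the
// updated test expectations (not the actual implementation).
func columnName(key string) string {
	stub := map[string]string{
		"path":       "`attribute_string_path`",
		"http_url":   "http_url",
		"http.route": "`attribute_string_http$$route`",
	}
	return stub[key]
}

func existsCondition(key string, exists bool) string {
	op := "=" // NOT EXISTS on a string column checks for the empty default
	if exists {
		op = "!="
	}
	return fmt.Sprintf("%s %s ''", columnName(key), op)
}

func main() {
	fmt.Println(existsCondition("path", false))      // `attribute_string_path` = ''
	fmt.Println(existsCondition("http.route", true)) // `attribute_string_http$$route` != ''
}
```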

View File

@@ -265,9 +265,11 @@ func Test_buildTracesFilterQuery(t *testing.T) {
             {Key: v3.AttributeKey{Key: "isDone", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorNotExists},
             {Key: v3.AttributeKey{Key: "host1", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorNotExists},
             {Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
+            {Key: v3.AttributeKey{Key: "http_url", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
+            {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
         }},
     },
-    want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND path = ''",
+    want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND `attribute_string_path` = '' AND http_url = '' AND `attribute_string_http$$route` = ''",
 },
 }
 for _, tt := range tests {
@@ -683,7 +685,7 @@ func TestPrepareTracesQuery(t *testing.T) {
         },
     },
     want: "SELECT attributes_string['function'] as `function`, toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 where " +
-        "(timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND mapContains(attributes_string, 'function') AND (`function`) GLOBAL IN (%s) group by `function` order by value DESC",
+        "(timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND mapContains(attributes_string, 'function') AND (`function`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `function` order by value DESC",
 },
 {
     name: "test with limit with resources- first",
@@ -766,7 +768,7 @@
     want: "SELECT `attribute_string_function` as `function`, serviceName as `serviceName`, toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 " +
         "where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_number['line'] = 100 " +
         "AND (resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) " +
-        "AND simpleJSONExtractString(labels, 'hostname') = 'server1' AND labels like '%hostname%server1%')) AND (`function`,`serviceName`) GLOBAL IN (%s) group by `function`,`serviceName` order by value DESC",
+        "AND simpleJSONExtractString(labels, 'hostname') = 'server1' AND labels like '%hostname%server1%')) AND (`function`,`serviceName`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `function`,`serviceName` order by value DESC",
 },
 }

View File

@@ -290,7 +290,7 @@ const (
     UINT8 = "Uint8"
 )

-var StaticSelectedLogFields = []model.LogField{
+var StaticSelectedLogFields = []model.Field{
     {
         Name:     "timestamp",
         DataType: UINT32,

View File

@@ -109,6 +109,10 @@ type Reader interface {
     SubscribeToQueryProgress(queryId string) (<-chan model.QueryProgress, func(), *model.ApiError)
     GetCountOfThings(ctx context.Context, query string) (uint64, error)
+
+    //trace
+    GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
+    UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError
 }
type Querier interface { type Querier interface {

View File

@@ -509,15 +509,15 @@ type ShowCreateTableStatement struct {
     Statement string `json:"statement" ch:"statement"`
 }

-type LogField struct {
+type Field struct {
     Name     string `json:"name" ch:"name"`
     DataType string `json:"dataType" ch:"datatype"`
     Type     string `json:"type"`
 }

 type GetFieldsResponse struct {
-    Selected    []LogField `json:"selected"`
-    Interesting []LogField `json:"interesting"`
+    Selected    []Field `json:"selected"`
+    Interesting []Field `json:"interesting"`
 }

 // Represents a log record in query service requests and responses.
// Represents a log record in query service requests and responses. // Represents a log record in query service requests and responses.

View File

@@ -5,7 +5,7 @@ Follow the steps in this section to install a sample application named HotR.O.D
 ```console
 kubectl create ns sample-application
-kubectl -n sample-application apply -f https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod.yaml
+kubectl -n sample-application apply -f https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod.yaml
 ```
 In case you have installed SigNoz in a namespace other than `platform` or selected a Helm release name other than `my-release`, follow the steps below:
@@ -15,7 +15,7 @@ export HELM_RELEASE=my-release-2
 export SIGNOZ_NAMESPACE=platform-2
 export HOTROD_NAMESPACE=sample-application-2
-curl -sL https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod-install.sh | bash
+curl -sL https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod-install.sh | bash
 ```
 To delete the sample application:
@@ -23,7 +23,7 @@ To delete the sample application:
 ```console
 export HOTROD_NAMESPACE=sample-application-2
-curl -sL https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod-delete.sh | bash
+curl -sL https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod-delete.sh | bash
 ```
 For testing with local scripts, you can use the following commands:

View File

@@ -7,7 +7,7 @@ HOTROD_NAMESPACE=${HOTROD_NAMESPACE:-"sample-application"}
 if [[ "${HOTROD_NAMESPACE}" == "default" || "${HOTROD_NAMESPACE}" == "kube-system" || "${HOTROD_NAMESPACE}" == "platform" ]]; then
     echo "Default k8s namespace and SigNoz namespace must not be deleted"
     echo "Deleting components only"
-    kubectl delete --namespace="${HOTROD_NAMESPACE}" -f <(cat hotrod-template.yaml || curl -sL https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod-template.yaml)
+    kubectl delete --namespace="${HOTROD_NAMESPACE}" -f <(cat hotrod-template.yaml || curl -sL https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod-template.yaml)
 else
     echo "Delete HotROD sample app namespace ${HOTROD_NAMESPACE}"
     kubectl delete namespace "${HOTROD_NAMESPACE}"

View File

@@ -37,7 +37,7 @@ kubectl create namespace "$HOTROD_NAMESPACE" --save-config --dry-run -o yaml 2>/
 # Setup sample apps into specified namespace
 kubectl apply --namespace="${HOTROD_NAMESPACE}" -f <( \
-    (cat hotrod-template.yaml 2>/dev/null || curl -sL https://github.com/SigNoz/signoz/raw/develop/sample-apps/hotrod/hotrod-template.yaml) | \
+    (cat hotrod-template.yaml 2>/dev/null || curl -sL https://github.com/SigNoz/signoz/raw/main/sample-apps/hotrod/hotrod-template.yaml) | \
     HOTROD_NAMESPACE="${HOTROD_NAMESPACE}" \
     HOTROD_IMAGE="${HOTROD_IMAGE}" \
     LOCUST_IMAGE="${LOCUST_IMAGE}" \