package clickhouseprometheus import ( "context" "fmt" "github.com/SigNoz/signoz/pkg/query-service/constants" "math" "strconv" "strings" "github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/telemetrystore" promValue "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" ) type client struct { settings factory.ScopedProviderSettings telemetryStore telemetrystore.TelemetryStore } func NewReadClient(settings factory.ScopedProviderSettings, telemetryStore telemetrystore.TelemetryStore) remote.ReadClient { return &client{ settings: settings, telemetryStore: telemetryStore, } } func (client *client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) { if len(query.Matchers) == 2 { var hasJob bool var queryString string for _, m := range query.Matchers { if m.Type == prompb.LabelMatcher_EQ && m.Name == "job" && m.Value == "rawsql" { hasJob = true } if m.Type == prompb.LabelMatcher_EQ && m.Name == "query" { queryString = m.Value } } if hasJob && queryString != "" { res, err := client.queryRaw(ctx, queryString, int64(query.EndTimestampMs)) if err != nil { return nil, err } return remote.FromQueryResult(sortSeries, res), nil } } var metricName string for _, matcher := range query.Matchers { if matcher.Name == "__name__" { metricName = matcher.Value } } clickhouseQuery, args, err := client.queryToClickhouseQuery(ctx, query, metricName, false) if err != nil { return nil, err } fingerprints, err := client.getFingerprintsFromClickhouseQuery(ctx, clickhouseQuery, args) if err != nil { return nil, err } if len(fingerprints) == 0 { return remote.FromQueryResult(sortSeries, new(prompb.QueryResult)), nil } clickhouseSubQuery, args, err := client.queryToClickhouseQuery(ctx, query, metricName, true) if err != nil { return nil, err } res := new(prompb.QueryResult) timeseries, err := client.querySamples(ctx, int64(query.StartTimestampMs), int64(query.EndTimestampMs), fingerprints, metricName, clickhouseSubQuery, args) if err != nil { return nil, err } res.Timeseries = timeseries return remote.FromQueryResult(sortSeries, res), nil } func (client *client) queryToClickhouseQuery(_ context.Context, query *prompb.Query, metricName string, subQuery bool) (string, []any, error) { var clickHouseQuery string var conditions []string var argCount int = 0 var selectString string = "fingerprint, any(labels)" if subQuery { argCount = 1 selectString = "fingerprint" } start, end, tableName := getStartAndEndAndTableName(query.StartTimestampMs, query.EndTimestampMs) var args []any conditions = append(conditions, fmt.Sprintf("metric_name = $%d", argCount+1)) conditions = append(conditions, "temporality IN ['Cumulative', 'Unspecified']") conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) normalized := true if constants.IsDotMetricsEnabled { normalized = false } conditions = append(conditions, fmt.Sprintf("__normalized = %v", normalized)) args = append(args, metricName) for _, m := range query.Matchers { switch m.Type { case prompb.LabelMatcher_EQ: conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, $%d) = $%d", argCount+2, argCount+3)) case prompb.LabelMatcher_NEQ: conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, $%d) != $%d", argCount+2, argCount+3)) case prompb.LabelMatcher_RE: conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, $%d), $%d)", argCount+2, argCount+3)) case prompb.LabelMatcher_NRE: conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, $%d), $%d)", argCount+2, argCount+3)) default: return "", nil, fmt.Errorf("unexpected matcher found in query: %s", m.Type.String()) } args = append(args, m.Name, m.Value) argCount += 2 } whereClause := strings.Join(conditions, " AND ") clickHouseQuery = fmt.Sprintf(`SELECT %s FROM %s.%s WHERE %s GROUP BY fingerprint`, selectString, databaseName, tableName, whereClause) return clickHouseQuery, args, nil } func (client *client) getFingerprintsFromClickhouseQuery(ctx context.Context, query string, args []any) (map[uint64][]prompb.Label, error) { rows, err := client.telemetryStore.ClickhouseDB().Query(ctx, query, args...) if err != nil { return nil, err } defer rows.Close() fingerprints := make(map[uint64][]prompb.Label) var fingerprint uint64 var labelString string for rows.Next() { if err = rows.Scan(&fingerprint, &labelString); err != nil { return nil, err } labels, _, err := unmarshalLabels(labelString) if err != nil { return nil, err } fingerprints[fingerprint] = labels } if err := rows.Err(); err != nil { return nil, err } return fingerprints, nil } func (client *client) querySamples(ctx context.Context, start int64, end int64, fingerprints map[uint64][]prompb.Label, metricName string, subQuery string, args []any) ([]*prompb.TimeSeries, error) { argCount := len(args) query := fmt.Sprintf(` SELECT metric_name, fingerprint, unix_milli, value FROM %s.%s WHERE metric_name = $1 AND fingerprint GLOBAL IN (%s) AND unix_milli >= $%s AND unix_milli <= $%s ORDER BY fingerprint, unix_milli;`, databaseName, distributedSamplesV4, subQuery, strconv.Itoa(argCount+2), strconv.Itoa(argCount+3)) query = strings.TrimSpace(query) allArgs := append([]any{metricName}, args...) allArgs = append(allArgs, start, end) rows, err := client.telemetryStore.ClickhouseDB().Query(ctx, query, allArgs...) if err != nil { return nil, err } defer rows.Close() var res []*prompb.TimeSeries var ts *prompb.TimeSeries var fingerprint, prevFingerprint uint64 var timestampMs int64 var value float64 var flags uint32 for rows.Next() { if err := rows.Scan(&metricName, &fingerprint, ×tampMs, &value, &flags); err != nil { return nil, err } // collect samples in time series if fingerprint != prevFingerprint { // add collected time series to result prevFingerprint = fingerprint if ts != nil { res = append(res, ts) } labels := fingerprints[fingerprint] ts = &prompb.TimeSeries{ Labels: labels, } } if flags&1 == 1 { value = math.Float64frombits(promValue.StaleNaN) } // add samples to current time series ts.Samples = append(ts.Samples, prompb.Sample{ Timestamp: timestampMs, Value: value, }) } // add last time series if ts != nil { res = append(res, ts) } if err := rows.Err(); err != nil { return nil, err } return res, nil } func (client *client) queryRaw(ctx context.Context, query string, ts int64) (*prompb.QueryResult, error) { rows, err := client.telemetryStore.ClickhouseDB().Query(ctx, query) if err != nil { return nil, err } defer rows.Close() columns := rows.Columns() var res prompb.QueryResult targets := make([]any, len(columns)) for i := range targets { targets[i] = new(scanner) } for rows.Next() { if err = rows.Scan(targets...); err != nil { return nil, err } labels := make([]prompb.Label, 0, len(columns)) var value float64 for i, c := range columns { v := targets[i].(*scanner) switch c { case "value": value = v.f default: labels = append(labels, prompb.Label{ Name: c, Value: v.s, }) } } res.Timeseries = append(res.Timeseries, &prompb.TimeSeries{ Labels: labels, Samples: []prompb.Sample{{ Value: value, Timestamp: ts, }}, }) } if err = rows.Err(); err != nil { return nil, err } return &res, nil }