mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-28 02:42:02 +08:00
285 lines
7.6 KiB
Go
285 lines
7.6 KiB
Go
package clickhouseprometheus
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/SigNoz/signoz/pkg/query-service/constants"
|
|
"math"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
|
promValue "github.com/prometheus/prometheus/model/value"
|
|
"github.com/prometheus/prometheus/prompb"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/storage/remote"
|
|
)
|
|
|
|
type client struct {
|
|
settings factory.ScopedProviderSettings
|
|
telemetryStore telemetrystore.TelemetryStore
|
|
}
|
|
|
|
func NewReadClient(settings factory.ScopedProviderSettings, telemetryStore telemetrystore.TelemetryStore) remote.ReadClient {
|
|
return &client{
|
|
settings: settings,
|
|
telemetryStore: telemetryStore,
|
|
}
|
|
}
|
|
|
|
func (client *client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
|
if len(query.Matchers) == 2 {
|
|
var hasJob bool
|
|
var queryString string
|
|
for _, m := range query.Matchers {
|
|
if m.Type == prompb.LabelMatcher_EQ && m.Name == "job" && m.Value == "rawsql" {
|
|
hasJob = true
|
|
}
|
|
if m.Type == prompb.LabelMatcher_EQ && m.Name == "query" {
|
|
queryString = m.Value
|
|
}
|
|
}
|
|
|
|
if hasJob && queryString != "" {
|
|
res, err := client.queryRaw(ctx, queryString, int64(query.EndTimestampMs))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return remote.FromQueryResult(sortSeries, res), nil
|
|
}
|
|
}
|
|
|
|
var metricName string
|
|
for _, matcher := range query.Matchers {
|
|
if matcher.Name == "__name__" {
|
|
metricName = matcher.Value
|
|
}
|
|
}
|
|
|
|
clickhouseQuery, args, err := client.queryToClickhouseQuery(ctx, query, metricName, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fingerprints, err := client.getFingerprintsFromClickhouseQuery(ctx, clickhouseQuery, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(fingerprints) == 0 {
|
|
return remote.FromQueryResult(sortSeries, new(prompb.QueryResult)), nil
|
|
}
|
|
|
|
clickhouseSubQuery, args, err := client.queryToClickhouseQuery(ctx, query, metricName, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := new(prompb.QueryResult)
|
|
timeseries, err := client.querySamples(ctx, int64(query.StartTimestampMs), int64(query.EndTimestampMs), fingerprints, metricName, clickhouseSubQuery, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res.Timeseries = timeseries
|
|
|
|
return remote.FromQueryResult(sortSeries, res), nil
|
|
}
|
|
|
|
func (client *client) queryToClickhouseQuery(_ context.Context, query *prompb.Query, metricName string, subQuery bool) (string, []any, error) {
|
|
var clickHouseQuery string
|
|
var conditions []string
|
|
var argCount int = 0
|
|
var selectString string = "fingerprint, any(labels)"
|
|
if subQuery {
|
|
argCount = 1
|
|
selectString = "fingerprint"
|
|
}
|
|
|
|
start, end, tableName := getStartAndEndAndTableName(query.StartTimestampMs, query.EndTimestampMs)
|
|
|
|
var args []any
|
|
conditions = append(conditions, fmt.Sprintf("metric_name = $%d", argCount+1))
|
|
conditions = append(conditions, "temporality IN ['Cumulative', 'Unspecified']")
|
|
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end))
|
|
|
|
normalized := true
|
|
if constants.IsDotMetricsEnabled {
|
|
normalized = false
|
|
}
|
|
|
|
conditions = append(conditions, fmt.Sprintf("__normalized = %v", normalized))
|
|
|
|
args = append(args, metricName)
|
|
for _, m := range query.Matchers {
|
|
switch m.Type {
|
|
case prompb.LabelMatcher_EQ:
|
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, $%d) = $%d", argCount+2, argCount+3))
|
|
case prompb.LabelMatcher_NEQ:
|
|
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, $%d) != $%d", argCount+2, argCount+3))
|
|
case prompb.LabelMatcher_RE:
|
|
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, $%d), $%d)", argCount+2, argCount+3))
|
|
case prompb.LabelMatcher_NRE:
|
|
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, $%d), $%d)", argCount+2, argCount+3))
|
|
default:
|
|
return "", nil, fmt.Errorf("unexpected matcher found in query: %s", m.Type.String())
|
|
}
|
|
args = append(args, m.Name, m.Value)
|
|
argCount += 2
|
|
}
|
|
|
|
whereClause := strings.Join(conditions, " AND ")
|
|
|
|
clickHouseQuery = fmt.Sprintf(`SELECT %s FROM %s.%s WHERE %s GROUP BY fingerprint`, selectString, databaseName, tableName, whereClause)
|
|
|
|
return clickHouseQuery, args, nil
|
|
}
|
|
|
|
func (client *client) getFingerprintsFromClickhouseQuery(ctx context.Context, query string, args []any) (map[uint64][]prompb.Label, error) {
|
|
rows, err := client.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
fingerprints := make(map[uint64][]prompb.Label)
|
|
|
|
var fingerprint uint64
|
|
var labelString string
|
|
for rows.Next() {
|
|
if err = rows.Scan(&fingerprint, &labelString); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
labels, _, err := unmarshalLabels(labelString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fingerprints[fingerprint] = labels
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fingerprints, nil
|
|
}
|
|
|
|
func (client *client) querySamples(ctx context.Context, start int64, end int64, fingerprints map[uint64][]prompb.Label, metricName string, subQuery string, args []any) ([]*prompb.TimeSeries, error) {
|
|
argCount := len(args)
|
|
|
|
query := fmt.Sprintf(`
|
|
SELECT metric_name, fingerprint, unix_milli, value
|
|
FROM %s.%s
|
|
WHERE metric_name = $1 AND fingerprint GLOBAL IN (%s) AND unix_milli >= $%s AND unix_milli <= $%s ORDER BY fingerprint, unix_milli;`,
|
|
databaseName, distributedSamplesV4, subQuery, strconv.Itoa(argCount+2), strconv.Itoa(argCount+3))
|
|
query = strings.TrimSpace(query)
|
|
|
|
allArgs := append([]any{metricName}, args...)
|
|
allArgs = append(allArgs, start, end)
|
|
|
|
rows, err := client.telemetryStore.ClickhouseDB().Query(ctx, query, allArgs...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var res []*prompb.TimeSeries
|
|
var ts *prompb.TimeSeries
|
|
var fingerprint, prevFingerprint uint64
|
|
var timestampMs int64
|
|
var value float64
|
|
var flags uint32
|
|
|
|
for rows.Next() {
|
|
if err := rows.Scan(&metricName, &fingerprint, ×tampMs, &value, &flags); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// collect samples in time series
|
|
if fingerprint != prevFingerprint {
|
|
// add collected time series to result
|
|
prevFingerprint = fingerprint
|
|
if ts != nil {
|
|
res = append(res, ts)
|
|
}
|
|
|
|
labels := fingerprints[fingerprint]
|
|
ts = &prompb.TimeSeries{
|
|
Labels: labels,
|
|
}
|
|
}
|
|
|
|
if flags&1 == 1 {
|
|
value = math.Float64frombits(promValue.StaleNaN)
|
|
}
|
|
|
|
// add samples to current time series
|
|
ts.Samples = append(ts.Samples, prompb.Sample{
|
|
Timestamp: timestampMs,
|
|
Value: value,
|
|
})
|
|
}
|
|
|
|
// add last time series
|
|
if ts != nil {
|
|
res = append(res, ts)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (client *client) queryRaw(ctx context.Context, query string, ts int64) (*prompb.QueryResult, error) {
|
|
rows, err := client.telemetryStore.ClickhouseDB().Query(ctx, query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
columns := rows.Columns()
|
|
var res prompb.QueryResult
|
|
targets := make([]any, len(columns))
|
|
for i := range targets {
|
|
targets[i] = new(scanner)
|
|
}
|
|
|
|
for rows.Next() {
|
|
if err = rows.Scan(targets...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
labels := make([]prompb.Label, 0, len(columns))
|
|
var value float64
|
|
for i, c := range columns {
|
|
v := targets[i].(*scanner)
|
|
switch c {
|
|
case "value":
|
|
value = v.f
|
|
default:
|
|
labels = append(labels, prompb.Label{
|
|
Name: c,
|
|
Value: v.s,
|
|
})
|
|
}
|
|
}
|
|
|
|
res.Timeseries = append(res.Timeseries, &prompb.TimeSeries{
|
|
Labels: labels,
|
|
Samples: []prompb.Sample{{
|
|
Value: value,
|
|
Timestamp: ts,
|
|
}},
|
|
})
|
|
}
|
|
if err = rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &res, nil
|
|
}
|