mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-06-21 05:58:23 +08:00

* feat: get query progress tracker started * feat: flesh out query progress test some more and get first few assertions passing * chore: flesh out query tracker tests and impl some more * feat: add impl for QueryTracker.Subscribe * feat: send latest update if available on subscription * feat: broadcast query progress to all subscribers on update * feat: finish plumbing query tracker happy path * feat: finish with v0 impl for query progress tracker * chore: some cleanup of query progress tracker * feat: hook up query progress tracking for queryRangeV3 * feat: impl for query progress websocket API handler * feat: implement Hijacker iface for loggingResponseWriter for websocket upgrades * chore: some cleanup to query progress websocket API handler * chore: some cleanup * chore: move query progress impl into its own subpackage * chore: separate in-memory tracker impl from interface * chore: some more cleanup of in memory tracker * chore: some more cleanup of query progress tracker * chore: some final cleanups
103 lines
3.3 KiB
Go
103 lines
3.3 KiB
Go
package queryprogress
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/stretchr/testify/require"
|
|
"go.signoz.io/signoz/pkg/query-service/model"
|
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
|
)
|
|
|
|
func TestQueryProgressTracking(t *testing.T) {
|
|
require := require.New(t)
|
|
|
|
tracker := NewQueryProgressTracker()
|
|
|
|
testQueryId := "test-query"
|
|
|
|
testProgress := &clickhouse.Progress{}
|
|
err := tracker.ReportQueryProgress(testQueryId, testProgress)
|
|
require.NotNil(err, "shouldn't be able to report query progress before query has been started")
|
|
require.Equal(err.Type(), model.ErrorNotFound)
|
|
|
|
ch, unsubscribe, err := tracker.SubscribeToQueryProgress(testQueryId)
|
|
require.NotNil(err, "shouldn't be able to subscribe for progress updates before query has been started")
|
|
require.Equal(err.Type(), model.ErrorNotFound)
|
|
require.Nil(ch)
|
|
require.Nil(unsubscribe)
|
|
|
|
reportQueryFinished, err := tracker.ReportQueryStarted(testQueryId)
|
|
require.Nil(err, "should be able to report start of a query to be tracked")
|
|
|
|
testProgress1 := &clickhouse.Progress{
|
|
Rows: 10,
|
|
Bytes: 20,
|
|
TotalRows: 100,
|
|
Elapsed: 20 * time.Millisecond,
|
|
}
|
|
err = tracker.ReportQueryProgress(testQueryId, testProgress1)
|
|
require.Nil(err, "should be able to report progress after query has started")
|
|
|
|
ch, unsubscribe, err = tracker.SubscribeToQueryProgress(testQueryId)
|
|
require.Nil(err, "should be able to subscribe to query progress updates after query started")
|
|
require.NotNil(ch)
|
|
require.NotNil(unsubscribe)
|
|
|
|
expectedProgress := v3.QueryProgress{}
|
|
updateQueryProgress(&expectedProgress, testProgress1)
|
|
require.Equal(expectedProgress.ReadRows, testProgress1.Rows)
|
|
select {
|
|
case qp := <-ch:
|
|
require.Equal(qp, expectedProgress)
|
|
default:
|
|
require.Fail("should receive latest query progress state immediately after subscription")
|
|
}
|
|
select {
|
|
case _ = <-ch:
|
|
require.Fail("should have had only one pending update at this point")
|
|
default:
|
|
}
|
|
|
|
testProgress2 := &clickhouse.Progress{
|
|
Rows: 20,
|
|
Bytes: 40,
|
|
TotalRows: 100,
|
|
Elapsed: 40 * time.Millisecond,
|
|
}
|
|
err = tracker.ReportQueryProgress(testQueryId, testProgress2)
|
|
require.Nil(err, "should be able to report progress multiple times while query is in progress")
|
|
|
|
updateQueryProgress(&expectedProgress, testProgress2)
|
|
select {
|
|
case qp := <-ch:
|
|
require.Equal(qp, expectedProgress)
|
|
default:
|
|
require.Fail("should receive updates whenever new progress updates get reported to tracker")
|
|
}
|
|
select {
|
|
case _ = <-ch:
|
|
require.Fail("should have had only one pending update at this point")
|
|
default:
|
|
}
|
|
|
|
reportQueryFinished()
|
|
select {
|
|
case _, isSubscriptionChannelOpen := <-ch:
|
|
require.False(isSubscriptionChannelOpen, "subscription channels should get closed after query finishes")
|
|
default:
|
|
require.Fail("subscription channels should get closed after query finishes")
|
|
}
|
|
|
|
err = tracker.ReportQueryProgress(testQueryId, testProgress)
|
|
require.NotNil(err, "shouldn't be able to report query progress after query has finished")
|
|
require.Equal(err.Type(), model.ErrorNotFound)
|
|
|
|
ch, unsubscribe, err = tracker.SubscribeToQueryProgress(testQueryId)
|
|
require.NotNil(err, "shouldn't be able to subscribe for progress updates after query has finished")
|
|
require.Equal(err.Type(), model.ErrorNotFound)
|
|
require.Nil(ch)
|
|
require.Nil(unsubscribe)
|
|
}
|