Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c5c46ea
[PECOBLR] Enable telemetry by default with DSN-controlled priority an…
samikshya-db Apr 13, 2026
ddc415f
Refactor: replace ClientExplicit+bool with *bool tristate for EnableT…
samikshya-db Apr 13, 2026
561698e
Remove redundant tests; rename survivors to match their true scope
samikshya-db Apr 13, 2026
3ad77bd
Fix telemetry: correct EXECUTE_STATEMENT latency, add sql_statement_i…
samikshya-db Apr 13, 2026
33fcf74
Add per-chunk download timing to telemetry
samikshya-db Apr 13, 2026
242fad2
Delay EXECUTE_STATEMENT metric emission to rows.Close()
samikshya-db Apr 13, 2026
6aee916
Fix metric loss: drain exportQueue before cancelling workers in close()
samikshya-db Apr 13, 2026
6c99a84
[PECOBLR-1384] Fix telemetry gaps: CloudFetch S3 timing, chunk totals…
samikshya-db Apr 14, 2026
b084c59
Add correctness tests for telemetry gap fixes
samikshya-db Apr 14, 2026
3aaedd1
Fix chunk_total_present for paginated CloudFetch
samikshya-db Apr 14, 2026
cd1b371
Fix lint: remove unused slowExporter, apply gofmt -s
samikshya-db Apr 14, 2026
77f15a4
Merge branch 'main' into telemetry-default-on-and-dsn-config
samikshya-db Apr 14, 2026
42f004b
Address review feedback: timeout-safe close, retry config fix, callba…
samikshya-db Apr 14, 2026
d8371e1
Fix telemetry accuracy: iteration errors, sub-ms CloudFetch counting,…
samikshya-db Apr 14, 2026
a715f59
Fix error reporting: separate iteration/close errors, track ScanRow, …
samikshya-db Apr 14, 2026
55272d2
Address code review findings: extract helpers, fix ctx capture, use t…
samikshya-db Apr 14, 2026
8cbbd32
Fix review findings: Go 1.20 compat, retry bounds in InitializeForCon…
samikshya-db Apr 14, 2026
91f5a5f
Merge branch 'main' into telemetry-default-on-and-dsn-config
samikshya-db Apr 14, 2026
9f0ae68
Address remaining review findings R5-R11
samikshya-db Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 155 additions & 30 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *conn) Close() error {

// Record DELETE_SESSION regardless of error (matches JDBC), then flush and release
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, time.Since(closeStart).Milliseconds(), err)
c.telemetry.RecordOperation(ctx, c.id, "", telemetry.OperationTypeDeleteSession, time.Since(closeStart).Milliseconds(), err)
_ = c.telemetry.Close(ctx)
telemetry.ReleaseForConnection(c.cfg.Host)
}
Expand Down Expand Up @@ -130,15 +130,20 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
executeStart := time.Now()
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
stagingErr := c.execStagingOperation(exStmtResp, ctx)

// Telemetry: track statement execution
// Telemetry: set up metric context BEFORE staging operation so that the
// staging op's telemetryUpdate callback can attach tags to the metric context.
var statementID string
var closeOpErr error // Track CloseOperation errors for telemetry
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
// Use BeforeExecuteWithTime to set the correct start time (before execution)
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
c.telemetry.AddTag(ctx, telemetry.TagOperationType, telemetry.OperationTypeExecuteStatement)
}

stagingErr := c.execStagingOperation(exStmtResp, ctx)

if c.telemetry != nil && statementID != "" {
defer func() {
finalErr := err
if stagingErr != nil {
Expand All @@ -163,7 +168,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
OperationHandle: exStmtResp.OperationHandle,
})
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeCloseStatement, time.Since(closeOpStart).Milliseconds(), err1)
c.telemetry.RecordOperation(ctx, c.id, statementID, telemetry.OperationTypeCloseStatement, time.Since(closeOpStart).Milliseconds(), err1)
}
if err1 != nil {
log.Err(err1).Msg("databricks: failed to close operation after executing statement")
Expand All @@ -179,14 +184,55 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name

if stagingErr != nil {
log.Err(stagingErr).Msgf("databricks: failed to execute query: query %s", query)
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, stagingErr, opStatusResp)
}

res := result{AffectedRows: opStatusResp.GetNumModifiedRows()}

return &res, nil
}

// chunkTimingAccumulator collects per-chunk fetch latency statistics for
// telemetry: the first observed latency, the maximum latency, the running
// total, and how many CloudFetch files were downloaded. Callers rely on the
// serialization provided by database/sql's closemu (see QueryContext), so no
// internal locking is performed.
type chunkTimingAccumulator struct {
	initialMs int64
	slowestMs int64
	sumMs     int64
	initialSet bool
	// cloudFetchFileCount counts individual S3 files downloaded via CloudFetch.
	// Used to set chunk_total_present correctly for both bulk and paginated CloudFetch:
	// - paginated CF (1 link/FetchResults): file count == page count == correct total
	// - bulk CF (all links in DirectResults): file count == actual S3 downloads
	// For inline ArrowBatch results this stays 0 and chunk_total_present falls back to chunkCount.
	cloudFetchFileCount int
}

// record folds one chunk/download latency into the accumulator. Non-positive
// latencies carry no timing information and are ignored; the return value
// reports whether the sample was accepted (i.e. tags should be refreshed).
func (a *chunkTimingAccumulator) record(latencyMs int64) bool {
	if latencyMs <= 0 {
		return false
	}
	// The three updates below are independent of one another, so their
	// relative order does not matter.
	a.sumMs += latencyMs
	if latencyMs > a.slowestMs {
		a.slowestMs = latencyMs
	}
	if !a.initialSet {
		a.initialSet = true
		a.initialMs = latencyMs
	}
	return true
}

// applyTags pushes the accumulator's current initial/slowest/sum latencies
// onto the telemetry context as chunk-timing tags. It is called after every
// accepted sample so the metric context always reflects the latest totals.
func (a *chunkTimingAccumulator) applyTags(ctx context.Context, interceptor *telemetry.Interceptor) {
	initial, slowest, total := a.initialMs, a.slowestMs, a.sumMs
	interceptor.AddTag(ctx, telemetry.TagChunkInitialLatencyMs, initial)
	interceptor.AddTag(ctx, telemetry.TagChunkSlowestLatencyMs, slowest)
	interceptor.AddTag(ctx, telemetry.TagChunkSumLatencyMs, total)
}

// QueryContext executes a query that may return rows, such as a
// SELECT.
//
Expand All @@ -206,32 +252,116 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
defer log.Duration(msg, start)

// Telemetry: track statement execution
// Telemetry: set up metric context for the statement.
// BeforeExecuteWithTime anchors startTime to before runQuery() ran.
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
// Use BeforeExecuteWithTime to set the correct start time (before execution)
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
defer func() {
c.telemetry.AfterExecute(ctx, err)
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
}()
c.telemetry.AddTag(ctx, telemetry.TagOperationType, telemetry.OperationTypeExecuteStatement)
}

if err != nil {
// Error path: finalize and emit the EXECUTE_STATEMENT metric immediately —
// there are no rows to iterate so the metric is complete right now.
if c.telemetry != nil && statementID != "" {
c.telemetry.AfterExecute(ctx, err)
c.telemetry.CompleteStatement(ctx, statementID, true)
}
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

// Telemetry callback for tracking row fetching metrics
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
// Success path: freeze execute latency NOW (before row iteration inflates time.Since).
// AfterExecute/CompleteStatement are called from closeCallback after all chunks
// are fetched, so the final metric carries complete chunk timing data.
if c.telemetry != nil && statementID != "" {
c.telemetry.FinalizeLatency(ctx)
}

// chunkTimingAccumulator aggregates per-chunk fetch latencies across all
// fetchResultPage calls. These fields are safe without a mutex because they
// are only mutated from callbacks serialized by database/sql's closemu lock:
// telemetryUpdate and cloudFetchCallback run inside rows.Next() (which
// holds closemu.RLock), and closeCallback runs inside rows.Close() (which
// holds closemu.Lock). This ensures mutual exclusion even when Close() is
// called from database/sql's awaitDone goroutine on context cancellation.
var timing chunkTimingAccumulator

// Detach from caller's context so that telemetry tag writes and flushes
// survive context cancellation (e.g. query timeout, database/sql awaitDone).
// All three callbacks (telemetryUpdate, cloudFetchCallback, closeCallback)
// use this detached context uniformly.
telemetryCtx := context2.WithoutCancel(ctx)

// Telemetry callback invoked after each result page is fetched.
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, _ int32) {
if c.telemetry == nil {
return
}
c.telemetry.AddTag(telemetryCtx, telemetry.TagChunkCount, chunkCount)
c.telemetry.AddTag(telemetryCtx, telemetry.TagBytesDownloaded, bytesDownloaded)

// Aggregate per-chunk fetch latencies (skip direct results where latency is 0).
if timing.record(chunkLatencyMs) {
timing.applyTags(telemetryCtx, c.telemetry)
}
// chunk_total_present is set definitively in closeCallback once all pages are known.
}

// cloudFetchCallback is invoked per S3 file download for CloudFetch result sets.
// It aggregates individual file download times into the same initial/slowest/sum vars
// used for inline chunk timing, matching JDBC's per-chunk HTTP GET timing model.
// For inline (non-CloudFetch) result sets this is never called.
var cloudFetchCallback func(downloadMs int64)
if c.telemetry != nil {
cloudFetchCallback = func(downloadMs int64) {
timing.cloudFetchFileCount++ // always count files for chunk_total_present, even sub-ms downloads
if timing.record(downloadMs) {
timing.applyTags(telemetryCtx, c.telemetry)
}
}
}

// closeCallback is invoked from rows.Close() after all rows have been consumed.
// At that point chunk timing is fully accumulated in telemetryCtx tags, so we
// finalize EXECUTE_STATEMENT here rather than at QueryContext return time.
var closeCallback func(latencyMs int64, chunkCount int, iterErr error, closeErr error)
if c.telemetry != nil && statementID != "" {
interceptor := c.telemetry
connID := c.id
stmtID := statementID
closeCallback = func(latencyMs int64, chunkCount int, iterErr error, closeErr error) {
// Set chunk_total_present to the definitive total now that all iteration is done.
// For CloudFetch, use cloudFetchFileCount (actual S3 downloads) — this handles
// both paginated CF (1 link/page, so file count == page count) and bulk CF
// (all links in DirectResults, so file count == total S3 files).
// For inline ArrowBatch, cloudFetchFileCount is 0; fall back to chunkCount.
if timing.cloudFetchFileCount > 0 {
interceptor.AddTag(telemetryCtx, telemetry.TagChunkTotalPresent, timing.cloudFetchFileCount)
} else if chunkCount > 0 {
interceptor.AddTag(telemetryCtx, telemetry.TagChunkTotalPresent, chunkCount)
}
// EXECUTE_STATEMENT uses the iteration error (row consumption failure)
// to correctly report whether the statement succeeded or failed.
interceptor.AfterExecute(telemetryCtx, iterErr)
interceptor.CompleteStatement(telemetryCtx, stmtID, iterErr != nil)
// CLOSE_STATEMENT uses the actual CloseOperation RPC error.
interceptor.RecordOperation(telemetryCtx, connID, stmtID, telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
}
} else if c.telemetry != nil {
interceptor := c.telemetry
connID := c.id
closeCallback = func(latencyMs int64, _ int, _ error, closeErr error) {
interceptor.RecordOperation(telemetryCtx, connID, "", telemetry.OperationTypeCloseStatement, latencyMs, closeErr)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, &rows.TelemetryCallbacks{
OnChunkFetched: telemetryUpdate,
OnClose: closeCallback,
OnCloudFetchFile: cloudFetchCallback,
})
return rows, err

}
Expand Down Expand Up @@ -396,14 +526,7 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
}
}

executeStart := time.Now()
resp, err := c.client.ExecuteStatement(ctx, &req)
// Record the Thrift call latency as a separate operation metric.
// This is distinct from the statement-level metric (BeforeExecuteWithTime), which
// measures end-to-end latency including polling and row fetching.
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeExecuteStatement, time.Since(executeStart).Milliseconds(), err)
}
var log *logger.DBSQLLogger
log, ctx = client.LoggerAndContext(ctx, resp)

Expand Down Expand Up @@ -668,14 +791,16 @@ func (c *conn) execStagingOperation(
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
// Telemetry callback for staging operation row fetching
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
// Telemetry callback for staging operation row fetching (chunk timing not tracked for staging ops).
telemetryUpdate := func(chunkCount int, bytesDownloaded int64, chunkIndex int, chunkLatencyMs int64, totalChunksPresent int32) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
c.telemetry.AddTag(ctx, telemetry.TagChunkCount, chunkCount)
c.telemetry.AddTag(ctx, telemetry.TagBytesDownloaded, bytesDownloaded)
}
}
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, &rows.TelemetryCallbacks{
OnChunkFetched: telemetryUpdate,
})
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
Expand Down
53 changes: 53 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,6 +1975,59 @@ func TestConn_execStagingOperation(t *testing.T) {
})
}

// TestChunkTimingAccumulator_Record verifies that record skips non-positive
// latencies, keeps the first positive sample as the initial latency, tracks
// the maximum as the slowest latency, and sums every accepted sample.
func TestChunkTimingAccumulator_Record(t *testing.T) {
	cases := []struct {
		name       string
		latencies  []int64
		wantInit   int64
		wantSlow   int64
		wantSum    int64
		wantReturn []bool
	}{
		{"zero latency skipped", []int64{0}, 0, 0, 0, []bool{false}},
		{"negative skipped", []int64{-5}, 0, 0, 0, []bool{false}},
		{"single positive", []int64{10}, 10, 10, 10, []bool{true}},
		{"initial preserved across calls", []int64{10, 20}, 10, 20, 30, []bool{true, true}},
		{"slowest tracks max not last", []int64{30, 10, 50}, 30, 50, 90, []bool{true, true, true}},
		{"zero interleaved skipped", []int64{10, 0, 20}, 10, 20, 30, []bool{true, false, true}},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var acc chunkTimingAccumulator
			for i, lat := range tc.latencies {
				if got, want := acc.record(lat), tc.wantReturn[i]; got != want {
					t.Errorf("record(%d) = %v, want %v", lat, got, want)
				}
			}
			if got := acc.initialMs; got != tc.wantInit {
				t.Errorf("initialMs = %d, want %d", got, tc.wantInit)
			}
			if got := acc.slowestMs; got != tc.wantSlow {
				t.Errorf("slowestMs = %d, want %d", got, tc.wantSlow)
			}
			if got := acc.sumMs; got != tc.wantSum {
				t.Errorf("sumMs = %d, want %d", got, tc.wantSum)
			}
		})
	}
}

// TestChunkTimingAccumulator_CloudFetchFileCount verifies that file counting
// is independent of latency recording: a zero-latency (sub-ms) download still
// increments the file count but must not seed the initial latency.
func TestChunkTimingAccumulator_CloudFetchFileCount(t *testing.T) {
	var acc chunkTimingAccumulator

	// First file: sub-ms download — counted, contributes no timing sample.
	acc.cloudFetchFileCount++
	acc.record(0)
	// Second file: measurable download — counted and timed.
	acc.cloudFetchFileCount++
	acc.record(5)

	if got := acc.cloudFetchFileCount; got != 2 {
		t.Errorf("cloudFetchFileCount = %d, want 2", got)
	}
	if got := acc.initialMs; got != 5 {
		t.Errorf("initialMs = %d, want 5 (zero-latency file should not set initial)", got)
	}
}

func getTestSession() *cli_service.TOpenSessionResp {
return &cli_service.TOpenSessionResp{SessionHandle: &cli_service.TSessionHandle{
SessionId: &cli_service.THandleIdentifier{
Expand Down
21 changes: 11 additions & 10 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,19 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
log := logger.WithContext(conn.id, driverctx.CorrelationIdFromContext(ctx), "")

// Initialize telemetry: client config overlay decides; if unset, feature flags decide
conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.cfg.DriverVersion,
c.client,
c.cfg.EnableTelemetry,
c.cfg.TelemetryBatchSize,
c.cfg.TelemetryFlushInterval,
)
conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{
Host: c.cfg.Host,
DriverVersion: c.cfg.DriverVersion,
HTTPClient: c.client,
EnableTelemetry: c.cfg.EnableTelemetry,
BatchSize: c.cfg.TelemetryBatchSize,
FlushInterval: c.cfg.TelemetryFlushInterval,
RetryCount: c.cfg.TelemetryRetryCount,
RetryDelay: c.cfg.TelemetryRetryDelay,
})
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs, nil)
conn.telemetry.RecordOperation(ctx, conn.id, "", telemetry.OperationTypeCreateSession, sessionLatencyMs, nil)
}

log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)
Expand Down
Loading
Loading