From 4dc9fb5869a2a4b7829d03fcd05e4bdf0301dd0f Mon Sep 17 00:00:00 2001 From: Mukil Date: Wed, 18 Feb 2026 15:50:27 -0800 Subject: [PATCH] [GPUHEALTH-1361] Add Jitter to Initial OTLP Push to avoid thundering herd Signed-off-by: Mukil Modify jitter Signed-off-by: Mukil modify jitter handling Signed-off-by: Mukil --- internal/exporter/exporter.go | 2 +- internal/exporter/writer/http.go | 34 +++++++++++++++++++++++++-- internal/exporter/writer/http_test.go | 22 ++++++++--------- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/internal/exporter/exporter.go b/internal/exporter/exporter.go index 90fd4929..bf0c9052 100644 --- a/internal/exporter/exporter.go +++ b/internal/exporter/exporter.go @@ -231,7 +231,7 @@ func (e *healthExporter) exportToHTTP(ctx context.Context, data *collector.Healt return nil } - newToken, err := e.httpWriter.Send(ctx, data, e.options.config.MetricsEndpoint, e.options.config.LogsEndpoint, e.options.config.RetryMaxAttempts, e.options.config.AuthToken) + newToken, err := e.httpWriter.Send(ctx, data, e.options.config.MetricsEndpoint, e.options.config.LogsEndpoint, e.options.config.RetryMaxAttempts, e.options.config.AuthToken, e.options.config.Interval.Duration) if err != nil { return fmt.Errorf("failed to send data: %w", err) } diff --git a/internal/exporter/writer/http.go b/internal/exporter/writer/http.go index 4111a4a5..13854ff2 100644 --- a/internal/exporter/writer/http.go +++ b/internal/exporter/writer/http.go @@ -35,6 +35,10 @@ import ( const ( // defaultRetryDelay is the default delay between retry attempts defaultRetryDelay = 5 * time.Second + + // maxInitialJitter caps the initial jitter so that even with very long + // collection intervals the pre-push delay stays reasonable. 
+ maxInitialJitter = 1 * time.Minute ) // HTTPError represents an HTTP error with status code @@ -56,7 +60,7 @@ type JWTRefreshFunc func(ctx context.Context) (string, error) // HTTPWriter defines the interface for sending health data via HTTP type HTTPWriter interface { - Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (newToken string, err error) + Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (newToken string, err error) SetJWTRefreshFunc(refreshFunc JWTRefreshFunc) } @@ -81,7 +85,33 @@ func (w *httpWriter) SetJWTRefreshFunc(refreshFunc JWTRefreshFunc) { } // Send sends health data to the specified endpoint -func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (string, error) { +func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (string, error) { + // Add jitter before the initial push to spread agent requests and prevent + // thundering herd when many agents share the same collection tick. + // Use 5% of the collection interval, capped at maxInitialJitter + jitterCap := collectionInterval / 20 // 5% + if jitterCap > maxInitialJitter { + jitterCap = maxInitialJitter + } + if jitterCap <= 0 { + jitterCap = defaultRetryDelay / 2 // fallback for zero/negative intervals + } + + // Apply a further limit to the initial jitter of half the remaining context deadline. + // This is to prevent the jitter from consuming the entire timeout budget. 
+ if deadline, ok := ctx.Deadline(); ok { + if remaining := time.Until(deadline) / 2; remaining > 0 && jitterCap > remaining { + jitterCap = remaining + } + } + if jitter := calculateJitter(jitterCap); jitter > 0 { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(jitter): + } + } + // Convert to OTLP format otlpData := w.otlpConverter.Convert(data) diff --git a/internal/exporter/writer/http_test.go b/internal/exporter/writer/http_test.go index c558d03c..1258f355 100644 --- a/internal/exporter/writer/http_test.go +++ b/internal/exporter/writer/http_test.go @@ -86,7 +86,7 @@ func TestHTTPWriter_Send_Success(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - newToken, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token") + newToken, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token", 1*time.Minute) require.NoError(t, err) assert.Empty(t, newToken) @@ -126,7 +126,7 @@ func TestHTTPWriter_Send_EmptyMetrics(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - _, err := writer.Send(ctx, testData, "", logsServer.URL, 1, "test-token") + _, err := writer.Send(ctx, testData, "", logsServer.URL, 1, "test-token", 1*time.Minute) require.NoError(t, err) assert.Equal(t, 1, logsRequests) @@ -164,7 +164,7 @@ func TestHTTPWriter_Send_EmptyLogs(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - _, err := writer.Send(ctx, testData, metricsServer.URL, "", 1, "test-token") + _, err := writer.Send(ctx, testData, metricsServer.URL, "", 1, "test-token", 1*time.Minute) require.NoError(t, err) assert.Equal(t, 1, metricsRequests) @@ -204,7 +204,7 @@ func TestHTTPWriter_Send_RetryOnFailure(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - _, err := writer.Send(ctx, testData, server.URL, "", 3, "test-token") + _, err := 
writer.Send(ctx, testData, server.URL, "", 3, "test-token", 1*time.Minute) require.NoError(t, err) assert.Equal(t, 3, attempts) @@ -241,7 +241,7 @@ func TestHTTPWriter_Send_RetryExhausted(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - _, err := writer.Send(ctx, testData, "", server.URL, maxAttempts, "test-token") + _, err := writer.Send(ctx, testData, "", server.URL, maxAttempts, "test-token", 1*time.Minute) require.Error(t, err) assert.Equal(t, maxAttempts, attempts) @@ -292,7 +292,7 @@ func TestHTTPWriter_Send_UnauthorizedWithJWTRefresh(t *testing.T) { }) ctx := context.Background() - _, err := writer.Send(ctx, testData, server.URL, "", 3, "old-token") + _, err := writer.Send(ctx, testData, server.URL, "", 3, "old-token", 1*time.Minute) require.NoError(t, err) assert.True(t, refreshCalled) @@ -332,7 +332,7 @@ func TestHTTPWriter_Send_UnauthorizedJWTRefreshFails(t *testing.T) { }) ctx := context.Background() - _, err := writer.Send(ctx, testData, "", server.URL, 2, "old-token") // Reduce retries + _, err := writer.Send(ctx, testData, "", server.URL, 2, "old-token", 1*time.Minute) // Reduce retries require.Error(t, err) assert.GreaterOrEqual(t, attempts, 1) @@ -368,7 +368,7 @@ func TestHTTPWriter_Send_JWTTokenRefreshFromHeader(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - returnedToken, err := writer.Send(ctx, testData, server.URL, "", 1, "test-token") + returnedToken, err := writer.Send(ctx, testData, server.URL, "", 1, "test-token", 1*time.Minute) require.NoError(t, err) assert.Equal(t, newToken, returnedToken) @@ -404,7 +404,7 @@ func TestHTTPWriter_Send_ContextCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - _, err := writer.Send(ctx, testData, "", server.URL, 1, "test-token") + _, err := writer.Send(ctx, testData, "", server.URL, 1, "test-token", 1*time.Minute) require.Error(t, err) } 
@@ -443,7 +443,7 @@ func TestHTTPWriter_Send_LogsFailure(t *testing.T) { writer := NewHTTPWriter(httpClient, otlpConverter) ctx := context.Background() - _, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token") + _, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token", 1*time.Minute) require.Error(t, err) assert.Contains(t, err.Error(), "failed to send critical logs data") @@ -571,7 +571,7 @@ func TestHTTPWriter_MarshalError(t *testing.T) { ctx := context.Background() // Pass empty endpoints - should not send anything - newToken, err := writer.Send(ctx, testData, "", "", 1, "test-token") + newToken, err := writer.Send(ctx, testData, "", "", 1, "test-token", 1*time.Minute) require.NoError(t, err) assert.Empty(t, newToken)