Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (e *healthExporter) exportToHTTP(ctx context.Context, data *collector.Healt
return nil
}

newToken, err := e.httpWriter.Send(ctx, data, e.options.config.MetricsEndpoint, e.options.config.LogsEndpoint, e.options.config.RetryMaxAttempts, e.options.config.AuthToken)
newToken, err := e.httpWriter.Send(ctx, data, e.options.config.MetricsEndpoint, e.options.config.LogsEndpoint, e.options.config.RetryMaxAttempts, e.options.config.AuthToken, e.options.config.Interval.Duration)
if err != nil {
return fmt.Errorf("failed to send data: %w", err)
}
Expand Down
34 changes: 32 additions & 2 deletions internal/exporter/writer/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
const (
// defaultRetryDelay is the default delay between retry attempts
defaultRetryDelay = 5 * time.Second

// maxInitialJitter caps the initial jitter so that even with very long
// collection intervals the pre-push delay stays reasonable.
maxInitialJitter = 1 * time.Minute
)

// HTTPError represents an HTTP error with status code
Expand All @@ -56,7 +60,7 @@ type JWTRefreshFunc func(ctx context.Context) (string, error)

// HTTPWriter defines the interface for sending health data via HTTP
type HTTPWriter interface {
Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (newToken string, err error)
Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (newToken string, err error)
SetJWTRefreshFunc(refreshFunc JWTRefreshFunc)
}

Expand All @@ -81,7 +85,33 @@ func (w *httpWriter) SetJWTRefreshFunc(refreshFunc JWTRefreshFunc) {
}

// Send sends health data to the specified endpoint
func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (string, error) {
func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (string, error) {
// Add jitter before the initial push to spread agent requests and prevent
// thundering herd when many agents share the same collection tick.
// Use 5% of the collection interval, capped at maxInitialJitter
jitterCap := collectionInterval / 20 // 5%
if jitterCap > maxInitialJitter {
jitterCap = maxInitialJitter
}
if jitterCap <= 0 {
jitterCap = defaultRetryDelay / 2 // fallback for zero/negative intervals
}

// Apply a further limit to the initial jitter of half the remaining context deadline.
// This is to prevent the jitter from consuming the entire timeout budget.
if deadline, ok := ctx.Deadline(); ok {
if remaining := time.Until(deadline) / 2; remaining > 0 && jitterCap > remaining {
jitterCap = remaining
}
}
if jitter := calculateJitter(jitterCap); jitter > 0 {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(jitter):
}
}

// Convert to OTLP format
otlpData := w.otlpConverter.Convert(data)

Expand Down
22 changes: 11 additions & 11 deletions internal/exporter/writer/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestHTTPWriter_Send_Success(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
newToken, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token")
newToken, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token", 1*time.Minute)

require.NoError(t, err)
assert.Empty(t, newToken)
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestHTTPWriter_Send_EmptyMetrics(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
_, err := writer.Send(ctx, testData, "", logsServer.URL, 1, "test-token")
_, err := writer.Send(ctx, testData, "", logsServer.URL, 1, "test-token", 1*time.Minute)

require.NoError(t, err)
assert.Equal(t, 1, logsRequests)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestHTTPWriter_Send_EmptyLogs(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
_, err := writer.Send(ctx, testData, metricsServer.URL, "", 1, "test-token")
_, err := writer.Send(ctx, testData, metricsServer.URL, "", 1, "test-token", 1*time.Minute)

require.NoError(t, err)
assert.Equal(t, 1, metricsRequests)
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestHTTPWriter_Send_RetryOnFailure(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
_, err := writer.Send(ctx, testData, server.URL, "", 3, "test-token")
_, err := writer.Send(ctx, testData, server.URL, "", 3, "test-token", 1*time.Minute)

require.NoError(t, err)
assert.Equal(t, 3, attempts)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestHTTPWriter_Send_RetryExhausted(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
_, err := writer.Send(ctx, testData, "", server.URL, maxAttempts, "test-token")
_, err := writer.Send(ctx, testData, "", server.URL, maxAttempts, "test-token", 1*time.Minute)

require.Error(t, err)
assert.Equal(t, maxAttempts, attempts)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestHTTPWriter_Send_UnauthorizedWithJWTRefresh(t *testing.T) {
})

ctx := context.Background()
_, err := writer.Send(ctx, testData, server.URL, "", 3, "old-token")
_, err := writer.Send(ctx, testData, server.URL, "", 3, "old-token", 1*time.Minute)

require.NoError(t, err)
assert.True(t, refreshCalled)
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestHTTPWriter_Send_UnauthorizedJWTRefreshFails(t *testing.T) {
})

ctx := context.Background()
_, err := writer.Send(ctx, testData, "", server.URL, 2, "old-token") // Reduce retries
_, err := writer.Send(ctx, testData, "", server.URL, 2, "old-token", 1*time.Minute) // Reduce retries

require.Error(t, err)
assert.GreaterOrEqual(t, attempts, 1)
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestHTTPWriter_Send_JWTTokenRefreshFromHeader(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
returnedToken, err := writer.Send(ctx, testData, server.URL, "", 1, "test-token")
returnedToken, err := writer.Send(ctx, testData, server.URL, "", 1, "test-token", 1*time.Minute)

require.NoError(t, err)
assert.Equal(t, newToken, returnedToken)
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestHTTPWriter_Send_ContextCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

_, err := writer.Send(ctx, testData, "", server.URL, 1, "test-token")
_, err := writer.Send(ctx, testData, "", server.URL, 1, "test-token", 1*time.Minute)

require.Error(t, err)
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestHTTPWriter_Send_LogsFailure(t *testing.T) {
writer := NewHTTPWriter(httpClient, otlpConverter)

ctx := context.Background()
_, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token")
_, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token", 1*time.Minute)

require.Error(t, err)
assert.Contains(t, err.Error(), "failed to send critical logs data")
Expand Down Expand Up @@ -571,7 +571,7 @@ func TestHTTPWriter_MarshalError(t *testing.T) {

ctx := context.Background()
// Pass empty endpoints - should not send anything
newToken, err := writer.Send(ctx, testData, "", "", 1, "test-token")
newToken, err := writer.Send(ctx, testData, "", "", 1, "test-token", 1*time.Minute)

require.NoError(t, err)
assert.Empty(t, newToken)
Expand Down
Loading