diff --git a/sink/cloudwatch/cloudwatch.go b/sink/cloudwatch/cloudwatch.go
index 8283e5b..41e98a8 100644
--- a/sink/cloudwatch/cloudwatch.go
+++ b/sink/cloudwatch/cloudwatch.go
@@ -42,10 +42,24 @@ func PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) csink.Op[Client] {
 // group and stream. The event timestamp is set to the current time in
 // milliseconds since the Unix epoch, as required by the CloudWatch Logs API.
 // Callers that need to control the timestamp, sequence token, or multiple
-// events in one request should use [PutLogEvents] instead.
+// events in one request should use [PutLogEventAt] or [PutLogEvents] instead.
 func PutLogEvent(logGroup, logStream, message string) csink.Op[Client] {
+	// Read the clock when the Op is applied, not when it is built, so an Op
+	// that is queued, retried, or applied more than once stamps each write
+	// with the time of that write, as it did before PutLogEventAt existed.
+	return csink.OpFunc[Client](func(ctx context.Context, c Client) error {
+		return PutLogEventAt(logGroup, logStream, message, time.Now()).Apply(ctx, c)
+	})
+}
+
+// PutLogEventAt returns an Op that sends a single log event stamped with the
+// supplied time. It is the deterministic, clock-injectable form of
+// [PutLogEvent]: tests pass a fixed time and assert the emitted timestamp
+// without depending on the wall clock. The time is converted to milliseconds
+// since the Unix epoch, as required by the CloudWatch Logs API.
+func PutLogEventAt(logGroup, logStream, message string, at time.Time) csink.Op[Client] { return csink.OpFunc[Client](func(ctx context.Context, c Client) error { - ts := time.Now().UnixMilli() + ts := at.UnixMilli() _, err := c.PutLogEvents(ctx, &cloudwatchlogs.PutLogEventsInput{ LogGroupName: &logGroup, LogStreamName: &logStream, diff --git a/sink/cloudwatch/putlogevent_at_test.go b/sink/cloudwatch/putlogevent_at_test.go new file mode 100644 index 0000000..cacb02b --- /dev/null +++ b/sink/cloudwatch/putlogevent_at_test.go @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: Apache-2.0 + +package cloudwatch_test + +import ( + "context" + "testing" + "time" + + cw "github.com/stablekernel/crucible/sink/cloudwatch" +) + +// TestPutLogEventAt_StampsSuppliedTime verifies the clock-injectable variant +// stamps the event with the caller-supplied time rather than the wall clock, so +// the emitted timestamp is deterministic. +func TestPutLogEventAt_StampsSuppliedTime(t *testing.T) { + t.Parallel() + + at := time.Unix(1700000000, 0) // a fixed, known instant + c := &fakeClient{} + op := cw.PutLogEventAt("/app/events", "app-stream", "deterministic", at) + if err := op.Apply(context.Background(), c); err != nil { + t.Fatalf("Apply() error = %v", err) + } + if len(c.calls) != 1 { + t.Fatalf("PutLogEvents call count = %d, want 1", len(c.calls)) + } + events := c.calls[0].LogEvents + if len(events) != 1 { + t.Fatalf("events count = %d, want 1", len(events)) + } + if events[0].Timestamp == nil { + t.Fatal("event timestamp is nil") + } + if got, want := *events[0].Timestamp, at.UnixMilli(); got != want { + t.Fatalf("timestamp = %d, want %d (the supplied time in ms)", got, want) + } + if events[0].Message == nil || *events[0].Message != "deterministic" { + t.Errorf("message = %v, want %q", events[0].Message, "deterministic") + } +} + +// TestPutLogEvent_DelegatesToPutLogEventAt verifies the convenience wrapper +// still emits exactly one event to the named group and stream. 
+func TestPutLogEvent_DelegatesToPutLogEventAt(t *testing.T) { + t.Parallel() + + c := &fakeClient{} + op := cw.PutLogEvent("/app/events", "app-stream", "now") + if err := op.Apply(context.Background(), c); err != nil { + t.Fatalf("Apply() error = %v", err) + } + if len(c.calls) != 1 || len(c.calls[0].LogEvents) != 1 { + t.Fatalf("calls = %+v, want one call with one event", c.calls) + } + if c.calls[0].LogEvents[0].Timestamp == nil { + t.Error("PutLogEvent should stamp a timestamp") + } +} diff --git a/sink/dynamo/batchwrite_checked_test.go b/sink/dynamo/batchwrite_checked_test.go new file mode 100644 index 0000000..5915e8e --- /dev/null +++ b/sink/dynamo/batchwrite_checked_test.go @@ -0,0 +1,107 @@ +// SPDX-License-Identifier: Apache-2.0 + +package dynamo_test + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + + "github.com/stablekernel/crucible/sink/dynamo" +) + +// batchClient is a hand-rolled Client that returns a configurable +// BatchWriteItem response, so the UnprocessedItems path can be driven without a +// live table. 
+type batchClient struct { + unprocessed map[string][]types.WriteRequest + err error +} + +func (b *batchClient) PutItem(context.Context, *dynamodb.PutItemInput, ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) { + return &dynamodb.PutItemOutput{}, nil +} + +func (b *batchClient) UpdateItem(context.Context, *dynamodb.UpdateItemInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) { + return &dynamodb.UpdateItemOutput{}, nil +} + +func (b *batchClient) DeleteItem(context.Context, *dynamodb.DeleteItemInput, ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) { + return &dynamodb.DeleteItemOutput{}, nil +} + +func (b *batchClient) TransactWriteItems(context.Context, *dynamodb.TransactWriteItemsInput, ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) { + return &dynamodb.TransactWriteItemsOutput{}, nil +} + +func (b *batchClient) BatchWriteItem(_ context.Context, _ *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) { + if b.err != nil { + return nil, b.err + } + return &dynamodb.BatchWriteItemOutput{UnprocessedItems: b.unprocessed}, nil +} + +func writeReq(id string) types.WriteRequest { + return types.WriteRequest{ + PutRequest: &types.PutRequest{Item: item(id)}, + } +} + +func TestBatchWriteChecked_NoUnprocessedSucceeds(t *testing.T) { + t.Parallel() + + c := &batchClient{} + op := dynamo.BatchWriteChecked(&dynamodb.BatchWriteItemInput{}) + if err := op.Apply(context.Background(), c); err != nil { + t.Fatalf("Apply() error = %v, want nil when no items are unprocessed", err) + } +} + +func TestBatchWriteChecked_UnprocessedReturnsSentinel(t *testing.T) { + t.Parallel() + + c := &batchClient{ + unprocessed: map[string][]types.WriteRequest{ + "orders": {writeReq("A-1"), writeReq("A-2")}, + "shipments": {writeReq("S-1")}, + }, + } + op := dynamo.BatchWriteChecked(&dynamodb.BatchWriteItemInput{}) + err := op.Apply(context.Background(), c) + if !errors.Is(err, 
dynamo.ErrUnprocessedItems) { + t.Fatalf("Apply() = %v, want to wrap ErrUnprocessedItems", err) + } + if !strings.Contains(err.Error(), "3 item(s)") || !strings.Contains(err.Error(), "2 table(s)") { + t.Errorf("error = %v, want it to report 3 items across 2 tables", err) + } +} + +func TestBatchWriteChecked_RequestErrorPropagates(t *testing.T) { + t.Parallel() + + boom := errors.New("throttled") + c := &batchClient{err: boom} + op := dynamo.BatchWriteChecked(&dynamodb.BatchWriteItemInput{}) + if err := op.Apply(context.Background(), c); !errors.Is(err, boom) { + t.Fatalf("Apply() = %v, want wrapped %v", err, boom) + } +} + +// TestBatchWrite_IgnoresUnprocessed documents that the unchecked BatchWrite +// still reports success when items are left unprocessed, so callers who need the +// stronger guarantee reach for BatchWriteChecked. +func TestBatchWrite_IgnoresUnprocessed(t *testing.T) { + t.Parallel() + + c := &batchClient{ + unprocessed: map[string][]types.WriteRequest{"orders": {writeReq("A-1")}}, + } + op := dynamo.BatchWrite(&dynamodb.BatchWriteItemInput{}) + if err := op.Apply(context.Background(), c); err != nil { + t.Fatalf("BatchWrite Apply() = %v, want nil (unchecked variant ignores unprocessed)", err) + } +} diff --git a/sink/dynamo/dynamo.go b/sink/dynamo/dynamo.go index 8434567..8f5e28b 100644 --- a/sink/dynamo/dynamo.go +++ b/sink/dynamo/dynamo.go @@ -14,12 +14,21 @@ package dynamo import ( "context" + "errors" + "fmt" "github.com/aws/aws-sdk-go-v2/service/dynamodb" csink "github.com/stablekernel/crucible/sink" ) +// ErrUnprocessedItems reports that a BatchWriteItem call returned without a +// request-level error but left one or more items unprocessed (DynamoDB returns +// these via UnprocessedItems on the output, typically under throttling). +// [BatchWriteChecked] returns an error wrapping this sentinel so a caller can +// match the class with errors.Is and retry the unprocessed items. 
+var ErrUnprocessedItems = errors.New("dynamo: BatchWriteItem left items unprocessed")
+
 // Client is the narrow DynamoDB surface this destination needs. It declares
 // only the write operations the package issues, so consumers wire the real
 // *dynamodb.Client (which satisfies it structurally) while tests use a
@@ -70,8 +79,8 @@ func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client] {
 
 // BatchWrite returns an Op that issues a batch of put and delete requests. The
 // SDK reports per-item failures via UnprocessedItems on the output; this Op
-// returns only the request-level error, so a caller needing retry on
-// unprocessed items should compose a custom Op instead.
+// returns only the request-level error and ignores unprocessed items. Use
+// [BatchWriteChecked] when a left-behind item must surface as an error.
 func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client] {
 	return csink.OpFunc[Client](func(ctx context.Context, c Client) error {
 		_, err := c.BatchWriteItem(ctx, in)
@@ -79,6 +88,31 @@ func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client] {
 	})
 }
 
+// BatchWriteChecked returns an Op that issues a batch of put and delete requests
+// and treats unprocessed items as a failure. DynamoDB returns HTTP 200 with a
+// populated UnprocessedItems map when it could not write every item (commonly
+// under throttling), so the request-level error is nil even though work was
+// dropped. The Op inspects the response and, when any item is unprocessed,
+// returns an error wrapping [ErrUnprocessedItems]. The error reports only how
+// many items and tables were left behind, not the requests themselves, so a
+// retry that re-applies this Op (via sink.Reservoir or a retry middleware)
+// resends the whole input batch.
+func BatchWriteChecked(in *dynamodb.BatchWriteItemInput) csink.Op[Client] {
+	return csink.OpFunc[Client](func(ctx context.Context, c Client) error {
+		out, err := c.BatchWriteItem(ctx, in)
+		if err != nil {
+			return err
+		}
+		var items int
+		for _, reqs := range out.UnprocessedItems {
+			items += len(reqs)
+		}
+		if items > 0 {
+			return fmt.Errorf("%w: %d item(s) across %d table(s)", ErrUnprocessedItems, items, len(out.UnprocessedItems))
+		}
+		return nil
+	})
+}
+
 // NewRegistry returns an empty registry of Op[Client] for callers to populate
 // with sink.Register.
 func NewRegistry() *csink.Registry[csink.Op[Client]] {
diff --git a/sink/errors.go b/sink/errors.go
index 8962ec9..8dcde03 100644
--- a/sink/errors.go
+++ b/sink/errors.go
@@ -18,7 +18,9 @@ type Phase string
 
 const (
 	// PhaseTransform marks a failure turning a payload into a destination
-	// operation (a registry transformer returning an error-bearing op).
+	// operation. Registry transformers cannot themselves return an error, so the
+	// built-in Emitter never emits this phase today; it is reserved for outlets
+	// whose payload-to-operation step can fail and want to classify it.
 	PhaseTransform Phase = "transform"
 	// PhaseApply marks a failure applying the operation to the destination
 	// client (the write itself).
diff --git a/sink/eventbridge/eventbridge.go b/sink/eventbridge/eventbridge.go
index 8dc3bcf..283d2bd 100644
--- a/sink/eventbridge/eventbridge.go
+++ b/sink/eventbridge/eventbridge.go
@@ -48,7 +48,7 @@ func PutEvents(input *awseb.PutEventsInput) csink.Op[Client] {
 			return err
 		}
 		if out.FailedEntryCount > 0 {
-			return partialFailureError(out.Entries)
+			return partialFailureError(out.FailedEntryCount, out.Entries)
 		}
 		return nil
 	})
@@ -73,8 +73,11 @@ func PutEvent(eventBusName, source, detailType, detail string) csink.Op[Client]
 
 // partialFailureError builds a descriptive error from the failed entries
 // returned in a PutEvents response. EventBridge returns HTTP 200 for partial
-// failures, so the Op must inspect FailedEntryCount explicitly.
-func partialFailureError(entries []types.PutEventsResultEntry) error {
+// failures, so the Op inspects FailedEntryCount explicitly and passes it here as
+// the authoritative count. The reported number is FailedEntryCount, not the
+// number of entries that happened to carry an ErrorCode, so the count and its
+// plural stay correct even when an entry omits a code.
+func partialFailureError(failed int32, entries []types.PutEventsResultEntry) error {
 	var parts []string
 	for _, e := range entries {
 		if e.ErrorCode == nil {
@@ -87,14 +90,19 @@ func partialFailureError(entries []types.PutEventsResultEntry) error {
 		}
 		parts = append(parts, fmt.Sprintf("%s: %s", code, msg))
 	}
-	n := len(parts)
 	suffix := "y"
-	if n != 1 {
+	if failed != 1 {
 		suffix = "ies"
 	}
+	if len(parts) == 0 {
+		// No entry carried an ErrorCode: report the count alone rather than
+		// ending the message with a dangling ": ".
+		return fmt.Errorf("eventbridge: %d entr%s failed", failed, suffix)
+	}
 	return fmt.Errorf(
 		"eventbridge: %d entr%s failed: %s",
-		n,
+		failed,
 		suffix,
 		strings.Join(parts, "; "),
 	)
diff --git a/sink/eventbridge/partial_failure_test.go b/sink/eventbridge/partial_failure_test.go
new file mode 100644
index 0000000..0b4d914
--- /dev/null
+++ b/sink/eventbridge/partial_failure_test.go
@@ -0,0 +1,84 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package eventbridge_test
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	awseb "github.com/aws/aws-sdk-go-v2/service/eventbridge"
+	"github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
+	csink "github.com/stablekernel/crucible/sink"
+	ebsink "github.com/stablekernel/crucible/sink/eventbridge"
+)
+
+// TestPutEvents_NilErrorCodeSkipped verifies that entries with a nil ErrorCode
+// are skipped when building the detail list, while the reported count still
+// reflects the authoritative FailedEntryCount rather than the number of entries
+// that happened to carry a code.
+func TestPutEvents_NilErrorCodeSkipped(t *testing.T) { + t.Parallel() + + c := &fakeClient{ + output: &awseb.PutEventsOutput{ + FailedEntryCount: 2, + Entries: []types.PutEventsResultEntry{ + {EventId: ptr("ok-1")}, // succeeded: no ErrorCode + {ErrorCode: ptr("ThrottlingException"), ErrorMessage: ptr("slow down")}, // failed + {}, // failed but no code reported + }, + }, + } + reg := ebsink.NewRegistry() + csink.Register(reg, func(_ context.Context, o orderPlaced) csink.Op[ebsink.Client] { + return ebsink.PutEvent("orders", "src", "OrderPlaced", `{}`) + }) + outlet := ebsink.New(c, reg) + + err := outlet.Sink(context.Background(), orderPlaced{OrderID: "ORD-1"}) + if err == nil { + t.Fatal("Sink() = nil, want partial failure error") + } + msg := err.Error() + // The count comes from FailedEntryCount (2), not from the single entry that + // carried an ErrorCode. + if !strings.Contains(msg, "2 entries failed") { + t.Errorf("error = %q, want the authoritative count %q", msg, "2 entries failed") + } + // The nil-code entry is skipped, so only the coded detail appears. + if !strings.Contains(msg, "ThrottlingException") { + t.Errorf("error = %q, want the coded entry detail", msg) + } + if strings.Contains(msg, "ok-1") { + t.Errorf("error = %q, should not mention the succeeded entry", msg) + } +} + +// TestPutEvents_SingleFailureSingularPlural locks the singular plural arm so a +// single failed entry reads "1 entry failed". 
+func TestPutEvents_SingleFailureSingularPlural(t *testing.T) { + t.Parallel() + + c := &fakeClient{ + output: &awseb.PutEventsOutput{ + FailedEntryCount: 1, + Entries: []types.PutEventsResultEntry{ + {ErrorCode: ptr("InternalFailure"), ErrorMessage: ptr("retry")}, + }, + }, + } + reg := ebsink.NewRegistry() + csink.Register(reg, func(_ context.Context, o orderPlaced) csink.Op[ebsink.Client] { + return ebsink.PutEvent("orders", "src", "OrderPlaced", `{}`) + }) + outlet := ebsink.New(c, reg) + + err := outlet.Sink(context.Background(), orderPlaced{OrderID: "ORD-2"}) + if err == nil { + t.Fatal("Sink() = nil, want failure error") + } + if !strings.Contains(err.Error(), "1 entry failed") { + t.Errorf("error = %q, want the singular form %q", err.Error(), "1 entry failed") + } +} diff --git a/sink/http/error_paths_test.go b/sink/http/error_paths_test.go new file mode 100644 index 0000000..d652f72 --- /dev/null +++ b/sink/http/error_paths_test.go @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 + +package http_test + +import ( + "context" + "strings" + "testing" + + httpsink "github.com/stablekernel/crucible/sink/http" +) + +// TestPost_BuildRequestError exercises the NewRequestWithContext failure branch: +// a URL containing a control character cannot be parsed into a request, so Post +// returns a build-request error before any Do call. +func TestPost_BuildRequestError(t *testing.T) { + t.Parallel() + + doer := &fakeDoer{} + // A control character in the URL makes http.NewRequestWithContext fail. + err := httpsink.Post("http://example.test/\x7f", "application/json", []byte("{}")). 
+ Apply(context.Background(), doer) + if err == nil { + t.Fatal("Apply() = nil, want a build-request error") + } + if !strings.Contains(err.Error(), "build request") { + t.Errorf("error = %v, want it to mention building the request", err) + } + if len(doer.requests) != 0 { + t.Errorf("Do() called %d times, want 0 (request never built)", len(doer.requests)) + } +} + +// unmarshalable is a type json.Marshal rejects: a channel field cannot be +// encoded, driving the PostJSON marshal-error branch. +type unmarshalable struct { + Ch chan int +} + +// TestPostJSON_MarshalError exercises the json.Marshal failure branch: an +// unmarshalable payload returns a marshal error before any request is built. +func TestPostJSON_MarshalError(t *testing.T) { + t.Parallel() + + doer := &fakeDoer{} + err := httpsink.PostJSON("http://example.test/items", unmarshalable{Ch: make(chan int)}). + Apply(context.Background(), doer) + if err == nil { + t.Fatal("Apply() = nil, want a marshal error") + } + if !strings.Contains(err.Error(), "marshal json") { + t.Errorf("error = %v, want it to mention marshaling json", err) + } + if len(doer.requests) != 0 { + t.Errorf("Do() called %d times, want 0 (marshal failed first)", len(doer.requests)) + } +} diff --git a/sink/http/http.go b/sink/http/http.go index bdc70f1..9fa8800 100644 --- a/sink/http/http.go +++ b/sink/http/http.go @@ -12,12 +12,12 @@ package http import ( + "bytes" "context" "encoding/json" "fmt" "io" gohttp "net/http" - "strings" csink "github.com/stablekernel/crucible/sink" ) @@ -35,7 +35,7 @@ type Doer interface { // A non-2xx status code is returned as an error that includes the status text. 
func Post(url, contentType string, body []byte) csink.Op[Doer] { return csink.OpFunc[Doer](func(ctx context.Context, doer Doer) error { - req, err := gohttp.NewRequestWithContext(ctx, gohttp.MethodPost, url, strings.NewReader(string(body))) + req, err := gohttp.NewRequestWithContext(ctx, gohttp.MethodPost, url, bytes.NewReader(body)) if err != nil { return fmt.Errorf("http sink: build request: %w", err) } diff --git a/sink/kinesis/kinesis.go b/sink/kinesis/kinesis.go index 0a1c15d..2020d8a 100644 --- a/sink/kinesis/kinesis.go +++ b/sink/kinesis/kinesis.go @@ -16,6 +16,9 @@ package kinesis import ( "context" + "errors" + "fmt" + "strings" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" @@ -23,6 +26,13 @@ import ( csink "github.com/stablekernel/crucible/sink" ) +// ErrPartialFailure reports that a PutRecords batch returned HTTP 200 but the +// response carried a non-zero FailedRecordCount, meaning one or more records +// were rejected. Kinesis does not surface these as a request-level error, so +// the Op inspects FailedRecordCount and returns this sentinel (wrapped with the +// per-record detail) instead of silently succeeding. Match it with errors.Is. +var ErrPartialFailure = errors.New("kinesis: PutRecords reported failed records") + // Client is the narrow Kinesis surface this destination needs. It is satisfied // structurally by *kinesis.Client from the AWS SDK v2, keeping the package free // of a hard dependency on the concrete SDK type in production callers. @@ -81,13 +91,46 @@ func PutRecordOf(params PutRecordParams) csink.Op[Client] { // single PutRecords call. The StreamName or StreamARN field (or both) in input // must identify the target stream. The caller owns the input struct and must // not mutate it concurrently after calling PutRecords. +// +// Kinesis returns HTTP 200 for a batch in which some records were rejected, so +// the Op inspects FailedRecordCount on the response. 
When it is non-zero the Op +// returns an error that wraps [ErrPartialFailure] and lists the per-record error +// codes, rather than reporting success. func PutRecords(input *kinesis.PutRecordsInput) csink.Op[Client] { return csink.OpFunc[Client](func(ctx context.Context, c Client) error { - _, err := c.PutRecords(ctx, input) - return err + out, err := c.PutRecords(ctx, input) + if err != nil { + return err + } + if out.FailedRecordCount != nil && *out.FailedRecordCount > 0 { + return partialFailureError(*out.FailedRecordCount, out.Records) + } + return nil }) } +// partialFailureError builds an error describing the rejected records in a +// PutRecords response. It wraps [ErrPartialFailure] so callers can match the +// class with errors.Is while still reading the per-record detail from the +// message. +func partialFailureError(failed int32, records []types.PutRecordsResultEntry) error { + var parts []string + for _, r := range records { + if r.ErrorCode == nil { + continue + } + msg := "" + if r.ErrorMessage != nil { + msg = *r.ErrorMessage + } + parts = append(parts, fmt.Sprintf("%s: %s", *r.ErrorCode, msg)) + } + if len(parts) == 0 { + return fmt.Errorf("%w: %d failed", ErrPartialFailure, failed) + } + return fmt.Errorf("%w: %d failed: %s", ErrPartialFailure, failed, strings.Join(parts, "; ")) +} + // PutRecordsEntry is a single record within a batch write. It mirrors the // fields of types.PutRecordsRequestEntry that callers typically set, allowing // [PutRecordsOf] to be used without importing the SDK types package directly. 
diff --git a/sink/kinesis/partial_failure_test.go b/sink/kinesis/partial_failure_test.go new file mode 100644 index 0000000..8390e8f --- /dev/null +++ b/sink/kinesis/partial_failure_test.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kinesis_test + +import ( + "context" + "errors" + "strings" + "testing" + + awskinesis "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + + kinesissink "github.com/stablekernel/crucible/sink/kinesis" +) + +// failingClient returns a PutRecords response with a non-zero FailedRecordCount +// so the partial-failure path is exercised. Kinesis surfaces such a batch as +// HTTP 200, so only inspecting the response detects it. +type failingClient struct { + failedCount int32 + records []types.PutRecordsResultEntry +} + +func (f *failingClient) PutRecord(context.Context, *awskinesis.PutRecordInput, ...func(*awskinesis.Options)) (*awskinesis.PutRecordOutput, error) { + return &awskinesis.PutRecordOutput{}, nil +} + +func (f *failingClient) PutRecords(_ context.Context, _ *awskinesis.PutRecordsInput, _ ...func(*awskinesis.Options)) (*awskinesis.PutRecordsOutput, error) { + count := f.failedCount + return &awskinesis.PutRecordsOutput{FailedRecordCount: &count, Records: f.records}, nil +} + +func TestPutRecords_PartialFailureReturnsSentinel(t *testing.T) { + t.Parallel() + + c := &failingClient{ + failedCount: 1, + records: []types.PutRecordsResultEntry{ + {SequenceNumber: ptrStr("1"), ShardId: ptrStr("shardId-0")}, + {ErrorCode: ptrStr("ProvisionedThroughputExceededException"), ErrorMessage: ptrStr("rate exceeded")}, + }, + } + stream := streamName + pk := "pk-1" + op := kinesissink.PutRecords(&awskinesis.PutRecordsInput{ + StreamName: &stream, + Records: []types.PutRecordsRequestEntry{{PartitionKey: &pk, Data: []byte("a")}}, + }) + + err := op.Apply(context.Background(), c) + if !errors.Is(err, kinesissink.ErrPartialFailure) { + t.Fatalf("Apply() = %v, want to wrap 
ErrPartialFailure", err) + } + if !strings.Contains(err.Error(), "ProvisionedThroughputExceededException") { + t.Errorf("error = %v, want the per-record error code in the message", err) + } + if !strings.Contains(err.Error(), "1 failed") { + t.Errorf("error = %v, want the failed count in the message", err) + } +} + +func TestPutRecords_PartialFailureWithoutErrorCodes(t *testing.T) { + t.Parallel() + + // FailedRecordCount is non-zero but no entry carries an ErrorCode; the Op + // still reports the sentinel rather than silently succeeding. + c := &failingClient{failedCount: 2, records: []types.PutRecordsResultEntry{{}, {}}} + stream := streamName + pk := "pk-2" + op := kinesissink.PutRecords(&awskinesis.PutRecordsInput{ + StreamName: &stream, + Records: []types.PutRecordsRequestEntry{{PartitionKey: &pk, Data: []byte("b")}}, + }) + + err := op.Apply(context.Background(), c) + if !errors.Is(err, kinesissink.ErrPartialFailure) { + t.Fatalf("Apply() = %v, want ErrPartialFailure", err) + } + if !strings.Contains(err.Error(), "2 failed") { + t.Errorf("error = %v, want count 2", err) + } +} + +func TestPutRecordsOf_StreamARNPath(t *testing.T) { + t.Parallel() + + c := &fakeClient{} + arn := "arn:aws:kinesis:us-east-1:123456789012:stream/test" + op := kinesissink.PutRecordsOf("", arn, []kinesissink.PutRecordsEntry{ + {PartitionKey: "pk-x", Data: []byte("x")}, + }) + + if err := op.Apply(context.Background(), c); err != nil { + t.Fatalf("Apply() error = %v", err) + } + got := c.putRecordsCalls[0] + if got.StreamName != nil { + t.Errorf("StreamName should be nil when only ARN is set, got %q", *got.StreamName) + } + if got.StreamARN == nil || *got.StreamARN != arn { + t.Errorf("StreamARN = %v, want %q", got.StreamARN, arn) + } +} + +func ptrStr(s string) *string { return &s } diff --git a/sink/manifold.go b/sink/manifold.go index a0072b9..a7d9c7d 100644 --- a/sink/manifold.go +++ b/sink/manifold.go @@ -50,20 +50,26 @@ func NewManifold(opts ...Option) *Manifold { } // Attach 
adds outlets to the Manifold and returns it for chaining. It is safe -// for concurrent use. +// for concurrent use. Attach replaces the outlet slice with a freshly allocated +// one (copy-on-write) rather than appending in place, so a concurrent Sink that +// is iterating an earlier snapshot never observes a mutated backing array. func (m *Manifold) Attach(outlets ...Outlet) *Manifold { m.mu.Lock() defer m.mu.Unlock() - m.outlets = append(m.outlets, outlets...) + next := make([]Outlet, 0, len(m.outlets)+len(outlets)) + next = append(next, m.outlets...) + next = append(next, outlets...) + m.outlets = next return m } -// snapshot returns a stable view of the attached outlets for one operation, so a -// concurrent Attach never mutates the slice mid-fan-out. +// snapshot returns the current outlet slice for one operation. Because Attach +// is copy-on-write the returned slice is never mutated in place, so callers may +// read it without copying; the RLock only guards reading the slice header. func (m *Manifold) snapshot() []Outlet { m.mu.RLock() defer m.mu.RUnlock() - return append([]Outlet(nil), m.outlets...) + return m.outlets } // Sink fans payload out to every attached outlet, fire-and-forget. It starts a diff --git a/sink/manifold_concurrent_test.go b/sink/manifold_concurrent_test.go new file mode 100644 index 0000000..f3b6f1b --- /dev/null +++ b/sink/manifold_concurrent_test.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 + +package sink_test + +import ( + "context" + "sync" + "testing" + + "github.com/stablekernel/crucible/sink" +) + +// TestManifoldConcurrentSinkAndAttach exercises the copy-on-write snapshot path: +// Sink reads the outlet slice without copying it, so a concurrent Attach must +// replace the slice rather than mutate the one a fan-out is iterating. Run under +// -race, this fails if Attach ever mutates a slice a Sink is reading. 
+func TestManifoldConcurrentSinkAndAttach(t *testing.T) {
+	t.Parallel()
+
+	m := sink.NewManifold().Attach(sink.NewBucket())
+
+	var wg sync.WaitGroup
+	for i := 0; i < 16; i++ {
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			m.Sink(context.Background(), "payload")
+		}()
+		go func() {
+			defer wg.Done()
+			m.Attach(sink.NewBucket())
+		}()
+	}
+	wg.Wait()
+}
diff --git a/sink/poller.go b/sink/poller.go
index 9431dfe..58fa872 100644
--- a/sink/poller.go
+++ b/sink/poller.go
@@ -18,11 +18,10 @@ type PollerOption func(*pollerConfig)
 
 type pollerConfig struct {
 	interval time.Duration
-	now      func() time.Time
 }
 
 func defaultPollerConfig() pollerConfig {
-	return pollerConfig{interval: 60 * time.Second, now: time.Now}
+	return pollerConfig{interval: 60 * time.Second}
 }
 
 // WithPollInterval sets the sampling period. The default is 60s.
@@ -30,16 +29,14 @@ func WithPollInterval(d time.Duration) PollerOption {
 	return func(c *pollerConfig) { c.interval = d }
 }
 
-// WithPollerClock injects the clock the Poller reads, for deterministic tests.
-// The default is time.Now. A nil clock is ignored.
-func WithPollerClock(now func() time.Time) PollerOption {
-	return func(c *pollerConfig) {
-		if now != nil {
-			c.now = now
-		}
-	}
-}
+// WithPollerClock is kept so existing callers keep compiling. The Poller never
+// read the injected clock, so the option is a no-op.
+//
+// Deprecated: WithPollerClock has no effect and will be removed.
+func WithPollerClock(_ func() time.Time) PollerOption {
+	return func(*pollerConfig) {}
+}
 
 // Poller periodically runs a CollectFunc and sinks each yielded payload to a
 // target Outlet (via SinkBatch when the target supports it). Construct with
 // NewPoller; drive it with Start and stop it with Stop.
@@ -47,7 +44,6 @@ type Poller struct {
 	target   Outlet
 	collect  CollectFunc
 	interval time.Duration
-	now      func() time.Time
 
 	mu      sync.Mutex
 	cancel  context.CancelFunc
@@ -55,14 +51,14 @@ type Poller struct {
 	started bool
 }
 
-// NewPoller binds a target and a CollectFunc into a Poller. Defaults: interval
-// 60s, now = time.Now.
+// NewPoller binds a target and a CollectFunc into a Poller. The default
+// interval is 60s.
func NewPoller(target Outlet, collect CollectFunc, opts ...PollerOption) *Poller { cfg := defaultPollerConfig() for _, o := range opts { o(&cfg) } - return &Poller{target: target, collect: collect, interval: cfg.interval, now: cfg.now} + return &Poller{target: target, collect: collect, interval: cfg.interval} } // Start launches the sampling loop and returns the Poller for chaining. It is diff --git a/sink/prometheus/determinism_test.go b/sink/prometheus/determinism_test.go new file mode 100644 index 0000000..c3ebf32 --- /dev/null +++ b/sink/prometheus/determinism_test.go @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: Apache-2.0 + +package prometheus_test + +import ( + "context" + "net/http" + "testing" + + csink "github.com/stablekernel/crucible/sink" + prom "github.com/stablekernel/crucible/sink/prometheus" +) + +// TestPushMetrics_MultiLabelDeterministicOrder verifies that a metric with +// several labels serializes its labels in a stable, sorted order regardless of +// map iteration order. Serializing the same metric many times must produce +// byte-identical output. 
+func TestPushMetrics_MultiLabelDeterministicOrder(t *testing.T) { + t.Parallel() + + metrics := []prom.Metric{ + { + Name: "http_requests_total", + Type: prom.TypeCounter, + Value: "7", + Labels: map[string]string{ + "method": "GET", + "status": "200", + "region": "us-east-1", + "app": "api", + }, + }, + } + + const want = `http_requests_total{app="api",method="GET",region="us-east-1",status="200"} 7` + + var first string + for i := 0; i < 50; i++ { + fd := &fakeDoer{status: http.StatusOK} + reg := prom.NewRegistry() + csink.Register(reg, func(_ context.Context, _ deployFinished) csink.Op[prom.Doer] { + return prom.PushMetrics("http://gw", "job", metrics) + }) + outlet := prom.New(fd, reg) + if err := outlet.Sink(context.Background(), deployFinished{Env: "prod"}); err != nil { + t.Fatalf("Sink() error = %v", err) + } + if i == 0 { + first = fd.body + if !contains(first, want) { + t.Fatalf("body = %q, want it to contain %q", first, want) + } + continue + } + if fd.body != first { + t.Fatalf("label order is not deterministic:\n run 0: %q\n run %d: %q", first, i, fd.body) + } + } +} + +func contains(haystack, needle string) bool { + for i := 0; i+len(needle) <= len(haystack); i++ { + if haystack[i:i+len(needle)] == needle { + return true + } + } + return false +} diff --git a/sink/prometheus/prometheus.go b/sink/prometheus/prometheus.go index d0fd082..2296a31 100644 --- a/sink/prometheus/prometheus.go +++ b/sink/prometheus/prometheus.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "net/http" + "sort" "strings" csink "github.com/stablekernel/crucible/sink" @@ -106,14 +107,18 @@ func marshalMetrics(metrics []Metric) string { b.WriteString(m.Name) if len(m.Labels) > 0 { b.WriteByte('{') - first := true - // Iterate in a stable order for deterministic output. - for k, v := range m.Labels { - if !first { + // Sort the label keys so the serialized output is deterministic + // regardless of map iteration order. 
+ keys := make([]string, 0, len(m.Labels)) + for k := range m.Labels { + keys = append(keys, k) + } + sort.Strings(keys) + for i, k := range keys { + if i > 0 { b.WriteByte(',') } - fmt.Fprintf(&b, `%s="%s"`, k, v) - first = false + fmt.Fprintf(&b, `%s="%s"`, k, m.Labels[k]) } b.WriteByte('}') } diff --git a/sink/reservoir.go b/sink/reservoir.go index 5afab06..359e922 100644 --- a/sink/reservoir.go +++ b/sink/reservoir.go @@ -4,6 +4,7 @@ package sink import ( "context" + "errors" "sync" "time" @@ -81,8 +82,9 @@ type reservoir struct { flushLatency telemetry.Histogram dropped telemetry.Counter - mu sync.Mutex - buf []any + mu sync.Mutex + buf []any + closed bool // set by Shutdown; Sink passes through afterward cancel context.CancelFunc wg sync.WaitGroup @@ -121,8 +123,15 @@ func Reservoir(inner Outlet, opts ...ReservoirOption) Outlet { // Sink buffers payload, flushing synchronously if the buffer reaches the batch // size. An over-cap payload is dropped and counted, never blocking the caller. +// After Shutdown the background loop is gone, so a payload would otherwise be +// stranded in the buffer; once closed, Sink dispatches each payload synchronously +// to inner instead of buffering it. func (r *reservoir) Sink(ctx context.Context, payload any) error { r.mu.Lock() + if r.closed { + r.mu.Unlock() + return r.dispatch(ctx, []any{payload}) + } if r.maxBuf > 0 && len(r.buf) >= r.maxBuf { r.mu.Unlock() r.dropped.Add(ctx, 1) @@ -158,6 +167,9 @@ func (r *reservoir) Shutdown(ctx context.Context) error { r.cancel() } r.wg.Wait() + r.mu.Lock() + r.closed = true + r.mu.Unlock() }) return r.Flush(ctx) } @@ -183,11 +195,13 @@ func (r *reservoir) dispatch(ctx context.Context, batch []any) error { if b, ok := r.inner.(BatchOutlet); ok { err = b.SinkBatch(ctx, batch) } else { + var errs []error for _, p := range batch { if e := r.inner.Sink(ctx, p); e != nil { - err = e + errs = append(errs, e) } } + err = errors.Join(errs...) 
} r.flushLatency.Record(ctx, float64(r.now().Sub(start).Milliseconds())) return err diff --git a/sink/reservoir_errors_test.go b/sink/reservoir_errors_test.go new file mode 100644 index 0000000..720ee0e --- /dev/null +++ b/sink/reservoir_errors_test.go @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: Apache-2.0 + +package sink + +import ( + "context" + "errors" + "sync" + "testing" +) + +// erroringOutlet fails its first failN Sink calls with a distinct error each, +// then succeeds. It is not a BatchOutlet, so the reservoir takes the per-payload +// dispatch path where errors must be joined. +type erroringOutlet struct { + mu sync.Mutex + errs []error + called int +} + +func (o *erroringOutlet) Sink(_ context.Context, _ any) error { + o.mu.Lock() + defer o.mu.Unlock() + i := o.called + o.called++ + if i < len(o.errs) { + return o.errs[i] + } + return nil +} + +// TestReservoirNonBatchJoinsAllErrors verifies the per-payload dispatch path +// joins every error rather than keeping only the last one. +func TestReservoirNonBatchJoinsAllErrors(t *testing.T) { + t.Parallel() + + err1 := errors.New("first failed") + err2 := errors.New("second failed") + out := &erroringOutlet{errs: []error{err1, err2}} + r := Reservoir(out, WithBatchSize(0), WithBatchInterval(0)).(*reservoir) + + _ = r.Sink(context.Background(), "a") + _ = r.Sink(context.Background(), "b") + err := r.Flush(context.Background()) + if err == nil { + t.Fatal("Flush() = nil, want a joined error") + } + if !errors.Is(err, err1) { + t.Errorf("joined error does not contain the first error: %v", err) + } + if !errors.Is(err, err2) { + t.Errorf("joined error does not contain the second error: %v", err) + } +} + +// TestReservoirSinkAfterShutdownPassesThrough verifies a payload sunk after +// Shutdown is dispatched synchronously to inner rather than stranded in the +// buffer with no loop left to flush it. 
+func TestReservoirSinkAfterShutdownPassesThrough(t *testing.T) { + t.Parallel() + + bucket := NewBucket() + r := Reservoir(bucket, WithBatchSize(100), WithBatchInterval(0)).(*reservoir) + if err := r.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown() error = %v", err) + } + + if err := r.Sink(context.Background(), "late"); err != nil { + t.Fatalf("Sink() after Shutdown error = %v", err) + } + all := bucket.All() + if len(all) != 1 || all[0] != "late" { + t.Fatalf("inner has %v, want the post-shutdown payload delivered immediately", all) + } +} + +// TestReservoirSinkAfterShutdownSurfacesError verifies the pass-through path +// still returns inner's error to the caller. +func TestReservoirSinkAfterShutdownSurfacesError(t *testing.T) { + t.Parallel() + + boom := errors.New("inner down") + out := &erroringOutlet{errs: []error{boom}} + r := Reservoir(out, WithBatchSize(100), WithBatchInterval(0)).(*reservoir) + if err := r.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown() error = %v", err) + } + if err := r.Sink(context.Background(), "late"); !errors.Is(err, boom) { + t.Fatalf("Sink() after Shutdown = %v, want %v", err, boom) + } +} diff --git a/sink/sinktest/error_branches_test.go b/sink/sinktest/error_branches_test.go new file mode 100644 index 0000000..1e47e93 --- /dev/null +++ b/sink/sinktest/error_branches_test.go @@ -0,0 +1,108 @@ +// SPDX-License-Identifier: Apache-2.0 + +package sinktest + +import ( + "context" + "errors" + "testing" + + "github.com/stablekernel/crucible/sink" +) + +// flushOutlet is a conforming Outlet whose Flush behavior is configurable, so +// the harness's Flusher branch can be driven into both its error and panic +// arms. 
+type flushOutlet struct { + flushErr error + flushPanic bool +} + +func (o *flushOutlet) Sink(_ context.Context, _ any) error { return sink.ErrUnregistered } + +func (o *flushOutlet) Flush(context.Context) error { + if o.flushPanic { + panic("flush boom") + } + return o.flushErr +} + +// shutdownOutlet is a conforming Outlet whose Shutdown behavior is configurable. +type shutdownOutlet struct { + shutdownErr error + shutdownPanic bool +} + +func (o *shutdownOutlet) Sink(_ context.Context, _ any) error { return sink.ErrUnregistered } + +func (o *shutdownOutlet) Shutdown(context.Context) error { + if o.shutdownPanic { + panic("shutdown boom") + } + return o.shutdownErr +} + +func TestCheckFlusherReportsError(t *testing.T) { + t.Parallel() + + boom := errors.New("dirty flush") + errs := checkOutlet(func() sink.Outlet { return &flushOutlet{flushErr: boom} }) + if !containsErr(errs, "Flush on a clean outlet") { + t.Fatalf("checkOutlet did not flag a flush error; got %v", errs) + } +} + +func TestCheckFlusherRecoversPanic(t *testing.T) { + t.Parallel() + + errs := checkOutlet(func() sink.Outlet { return &flushOutlet{flushPanic: true} }) + if !containsErr(errs, "Flush panicked") { + t.Fatalf("checkOutlet did not report a flush panic; got %v", errs) + } +} + +func TestCheckShutdownerReportsError(t *testing.T) { + t.Parallel() + + boom := errors.New("dirty shutdown") + errs := checkOutlet(func() sink.Outlet { return &shutdownOutlet{shutdownErr: boom} }) + if !containsErr(errs, "Shutdown returned") { + t.Fatalf("checkOutlet did not flag a shutdown error; got %v", errs) + } +} + +func TestCheckShutdownerRecoversPanic(t *testing.T) { + t.Parallel() + + errs := checkOutlet(func() sink.Outlet { return &shutdownOutlet{shutdownPanic: true} }) + if !containsErr(errs, "Shutdown panicked") { + t.Fatalf("checkOutlet did not report a shutdown panic; got %v", errs) + } +} + +func TestCheckOutletRejectsNilOutlet(t *testing.T) { + t.Parallel() + + errs := checkOutlet(func() sink.Outlet { 
return nil }) + if !containsErr(errs, "nil Outlet") { + t.Fatalf("checkOutlet did not flag a nil Outlet; got %v", errs) + } +} + +func containsErr(errs []error, substr string) bool { + for _, e := range errs { + if e != nil && contains(e.Error(), substr) { + return true + } + } + return false +} + +func contains(haystack, needle string) bool { + for i := 0; i+len(needle) <= len(haystack); i++ { + if haystack[i:i+len(needle)] == needle { + return true + } + } + return false +} diff --git a/sink/sns/publish_batch_failure_test.go b/sink/sns/publish_batch_failure_test.go new file mode 100644 index 0000000..a27eeaa --- /dev/null +++ b/sink/sns/publish_batch_failure_test.go @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: Apache-2.0 + +package sns_test + +import ( + "context" + "strings" + "testing" + + awssns "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" + + snssink "github.com/stablekernel/crucible/sink/sns" +) + +// batchFailureClient returns a configurable Failed list so the PublishBatch +// partial-failure path is exercised. SNS surfaces such a batch as HTTP 200, so +// only inspecting the response detects it. 
+type batchFailureClient struct { + failed []types.BatchResultErrorEntry +} + +func (b *batchFailureClient) Publish(context.Context, *awssns.PublishInput, ...func(*awssns.Options)) (*awssns.PublishOutput, error) { + return &awssns.PublishOutput{}, nil +} + +func (b *batchFailureClient) PublishBatch(context.Context, *awssns.PublishBatchInput, ...func(*awssns.Options)) (*awssns.PublishBatchOutput, error) { + return &awssns.PublishBatchOutput{Failed: b.failed}, nil +} + +func TestPublishBatch_PartialFailureReturnsError(t *testing.T) { + t.Parallel() + + c := &batchFailureClient{ + failed: []types.BatchResultErrorEntry{ + {Id: ptr("1"), Code: ptr("InternalError"), SenderFault: false}, + {Id: ptr("2"), Code: ptr("InvalidParameter"), SenderFault: true}, + }, + } + op := snssink.PublishBatch(&awssns.PublishBatchInput{ + TopicArn: ptr("arn:aws:sns:us-east-1:123456789012:orders"), + PublishBatchRequestEntries: []types.PublishBatchRequestEntry{ + {Id: ptr("1"), Message: ptr("a")}, + {Id: ptr("2"), Message: ptr("b")}, + }, + }) + + err := op.Apply(context.Background(), c) + if err == nil { + t.Fatal("Apply() = nil, want a partial-batch failure error") + } + msg := err.Error() + if !strings.Contains(msg, "2 batch entries failed") { + t.Errorf("error = %q, want the plural count", msg) + } + if !strings.Contains(msg, "1(InternalError)") || !strings.Contains(msg, "2(InvalidParameter)") { + t.Errorf("error = %q, want both failed ids/codes listed", msg) + } +} + +func TestPublishBatch_SingleFailureSingularPlural(t *testing.T) { + t.Parallel() + + c := &batchFailureClient{ + failed: []types.BatchResultErrorEntry{{Id: ptr("only"), Code: ptr("Throttled")}}, + } + op := snssink.PublishBatch(&awssns.PublishBatchInput{ + TopicArn: ptr("arn:aws:sns:us-east-1:123456789012:orders"), + PublishBatchRequestEntries: []types.PublishBatchRequestEntry{{Id: ptr("only"), Message: ptr("x")}}, + }) + err := op.Apply(context.Background(), c) + if err == nil { + t.Fatal("Apply() = nil, want a 
single-entry failure error") + } + if !strings.Contains(err.Error(), "1 batch entry failed") { + t.Errorf("error = %q, want the singular form", err.Error()) + } +} + +func TestPublishBatch_NoFailuresSucceeds(t *testing.T) { + t.Parallel() + + c := &batchFailureClient{} + op := snssink.PublishBatch(&awssns.PublishBatchInput{ + TopicArn: ptr("arn:aws:sns:us-east-1:123456789012:orders"), + PublishBatchRequestEntries: []types.PublishBatchRequestEntry{{Id: ptr("1"), Message: ptr("ok")}}, + }) + if err := op.Apply(context.Background(), c); err != nil { + t.Fatalf("Apply() = %v, want nil when no entries fail", err) + } +} diff --git a/sink/sns/sns.go b/sink/sns/sns.go index 7a8a234..d7e877a 100644 --- a/sink/sns/sns.go +++ b/sink/sns/sns.go @@ -13,8 +13,11 @@ package sns import ( "context" + "fmt" + "strings" awssns "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" csink "github.com/stablekernel/crucible/sink" ) @@ -53,13 +56,51 @@ func PublishInput(input *awssns.PublishInput) csink.Op[Client] { // PublishBatch returns an Op that calls sns.PublishBatch with the supplied // input. The SDK accepts up to ten entries per batch request; callers are // responsible for chunking larger slices before building the input. +// +// SNS returns HTTP 200 for a batch in which some entries were rejected, so the +// Op inspects the response's Failed list. When any entry failed the Op returns +// an error listing the failed IDs and codes, rather than reporting success. func PublishBatch(input *awssns.PublishBatchInput) csink.Op[Client] { return csink.OpFunc[Client](func(ctx context.Context, c Client) error { - _, err := c.PublishBatch(ctx, input) - return err + out, err := c.PublishBatch(ctx, input) + if err != nil { + return err + } + if len(out.Failed) > 0 { + return batchFailureError(out.Failed) + } + return nil }) } +// batchFailureError builds a descriptive error from partial-batch failures +// returned by SNS. 
SNS returns HTTP 200 for partial failures, so callers must +// inspect out.Failed explicitly. +func batchFailureError(failed []types.BatchResultErrorEntry) error { + ids := make([]string, 0, len(failed)) + for _, f := range failed { + id := "" + if f.Id != nil { + id = *f.Id + } + code := "" + if f.Code != nil { + code = *f.Code + } + ids = append(ids, fmt.Sprintf("%s(%s)", id, code)) + } + suffix := "y" + if len(failed) != 1 { + suffix = "ies" + } + return fmt.Errorf( + "sns: %d batch entr%s failed: %s", + len(failed), + suffix, + strings.Join(ids, ", "), + ) +} + // NewRegistry returns an empty registry of Op[Client] for callers to populate // with sink.Register. func NewRegistry() *csink.Registry[csink.Op[Client]] { diff --git a/sink/sqs/plural_test.go b/sink/sqs/plural_test.go new file mode 100644 index 0000000..eb6e238 --- /dev/null +++ b/sink/sqs/plural_test.go @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: Apache-2.0 + +package sqs_test + +import ( + "context" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +// TestSendMessageBatchOp_MultiFailurePluralSuffix exercises the multi-failure +// branch of pluralSuffix: more than one failed entry uses the "ies" plural and +// lists every failed id/code pair. 
+func TestSendMessageBatchOp_MultiFailurePluralSuffix(t *testing.T) { + t.Parallel() + + c := &fakeClient{ + batchFailed: []types.BatchResultErrorEntry{ + {Id: str("item-0"), Code: str("InvalidMessageContents"), SenderFault: true}, + {Id: str("item-1"), Code: str("ServiceUnavailable")}, + }, + } + err := newOutlet(c).Sink(context.Background(), payloadC{Items: []string{"a", "b"}}) + if err == nil { + t.Fatal("Sink() = nil, want a multi-entry batch failure error") + } + msg := err.Error() + if !strings.Contains(msg, "2 batch entries failed") { + t.Errorf("error = %q, want the plural form %q", msg, "2 batch entries failed") + } + if !strings.Contains(msg, "item-0(InvalidMessageContents)") || !strings.Contains(msg, "item-1(ServiceUnavailable)") { + t.Errorf("error = %q, want both failed entries listed", msg) + } +} + +// TestSendMessageBatchOp_SingleFailureSingularSuffix locks the singular branch +// so the two plural arms stay covered together. +func TestSendMessageBatchOp_SingleFailureSingularSuffix(t *testing.T) { + t.Parallel() + + c := &fakeClient{ + batchFailed: []types.BatchResultErrorEntry{ + {Id: str("only"), Code: str("Throttled")}, + }, + } + err := newOutlet(c).Sink(context.Background(), payloadC{Items: []string{"a"}}) + if err == nil { + t.Fatal("Sink() = nil, want a single-entry batch failure error") + } + if !strings.Contains(err.Error(), "1 batch entry failed") { + t.Errorf("error = %q, want the singular form %q", err.Error(), "1 batch entry failed") + } +} diff --git a/sink/statsd/options_test.go b/sink/statsd/options_test.go new file mode 100644 index 0000000..9a1c126 --- /dev/null +++ b/sink/statsd/options_test.go @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: Apache-2.0 + +package statsd_test + +import ( + "context" + "errors" + "testing" + "time" + + csink "github.com/stablekernel/crucible/sink" + statsdsink "github.com/stablekernel/crucible/sink/statsd" +) + +// TestWithName_AppearsInFlushError verifies WithName overrides the outlet name +// 
carried on the *sink.Error returned from a failing flush. +func TestWithName_AppearsInFlushError(t *testing.T) { + t.Parallel() + + boom := errors.New("emit failed") + fc := &fakeClient{err: boom} + agg := statsdsink.NewAggregator(fc, + statsdsink.WithName("custom-statsd"), + statsdsink.WithInterval(0), // no background loop; Flush is the only emit + ) + + if err := agg.Sink(context.Background(), statsdsink.Metric{Type: statsdsink.TypeCount, Name: "n", Int: 1, Rate: 1}); err != nil { + t.Fatalf("Sink() error = %v", err) + } + f, ok := agg.(csink.Flusher) + if !ok { + t.Fatal("Aggregator does not implement sink.Flusher") + } + err := f.Flush(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("Flush() = %v, want to wrap %v", err, boom) + } + var se *csink.Error + if !errors.As(err, &se) || se.Outlet != "custom-statsd" { + t.Fatalf("error = %+v, want Outlet=custom-statsd", se) + } +} + +// TestWithName_EmptyIgnored verifies an empty name leaves the default in place. +func TestWithName_EmptyIgnored(t *testing.T) { + t.Parallel() + + boom := errors.New("emit failed") + fc := &fakeClient{err: boom} + agg := statsdsink.NewAggregator(fc, statsdsink.WithName(""), statsdsink.WithInterval(0)) + _ = agg.Sink(context.Background(), statsdsink.Metric{Type: statsdsink.TypeCount, Name: "n", Int: 1, Rate: 1}) + + err := agg.(csink.Flusher).Flush(context.Background()) + var se *csink.Error + if !errors.As(err, &se) || se.Outlet != "statsd" { + t.Fatalf("error = %+v, want default Outlet=statsd", se) + } +} + +// TestWithClock_NonNilAcceptedNilIgnored verifies WithClock installs a non-nil +// clock and ignores a nil one. The aggregator stays usable in both cases. 
+func TestWithClock_NonNilAcceptedNilIgnored(t *testing.T) { + t.Parallel() + + fixed := time.Unix(42, 0) + clock := func() time.Time { return fixed } + + for _, tc := range []struct { + name string + now func() time.Time + }{ + {"non-nil clock", clock}, + {"nil clock falls back to default", nil}, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + fc := &fakeClient{} + agg := statsdsink.NewAggregator(fc, statsdsink.WithClock(tc.now), statsdsink.WithInterval(0)) + if err := agg.Sink(context.Background(), statsdsink.Metric{Type: statsdsink.TypeGauge, Name: "g", Value: 1, Rate: 1}); err != nil { + t.Fatalf("Sink() error = %v", err) + } + if err := agg.(csink.Flusher).Flush(context.Background()); err != nil { + t.Fatalf("Flush() error = %v", err) + } + if got := len(fc.snapshot()); got != 1 { + t.Fatalf("emitted %d metrics, want 1", got) + } + }) + } +} + +// TestDial_ReturnsUsableClient verifies Dial constructs a Client over a UDP +// address without requiring a live StatsD server. The datadog SDK resolves the +// address lazily, so a valid host:port yields a ready client. +func TestDial_ReturnsUsableClient(t *testing.T) { + t.Parallel() + + c, err := statsdsink.Dial("127.0.0.1:8125") + if err != nil { + t.Fatalf("Dial() error = %v", err) + } + if c == nil { + t.Fatal("Dial() returned a nil Client") + } + if err := c.Count("dial.test", 1, nil, 1); err != nil { + t.Fatalf("Count() on dialed client error = %v", err) + } +} diff --git a/sink/statsd/statsd.go b/sink/statsd/statsd.go index 8cc60e5..304f26e 100644 --- a/sink/statsd/statsd.go +++ b/sink/statsd/statsd.go @@ -250,8 +250,8 @@ func NewAggregator(client Client, opts ...AggregatorOption) csink.Outlet { // directly; any other payload is resolved through the registry installed with // WithRegistry, and an unregistered type returns sink.ErrUnregistered. The // first Sink starts the background flush loop when an interval is configured. 
-func (a *Aggregator) Sink(_ context.Context, payload any) error { - m, ok := a.metricFor(payload) +func (a *Aggregator) Sink(ctx context.Context, payload any) error { + m, ok := a.metricFor(ctx, payload) if !ok { return csink.ErrUnregistered } @@ -263,7 +263,9 @@ func (a *Aggregator) Sink(_ context.Context, payload any) error { } // metricFor resolves payload to a Metric, either directly or via the registry. -func (a *Aggregator) metricFor(payload any) (Metric, bool) { +// The caller's context is threaded into the registry transformer so deadline +// and trace propagation survive the resolution step. +func (a *Aggregator) metricFor(ctx context.Context, payload any) (Metric, bool) { if m, ok := payload.(Metric); ok { return m, true } @@ -274,7 +276,7 @@ func (a *Aggregator) metricFor(payload any) (Metric, bool) { if !ok { return Metric{}, false } - return transform(context.Background(), payload), true + return transform(ctx, payload), true } // Flush swaps the live window for a fresh one and emits the captured window to diff --git a/sink/statsd/ticker_internal_test.go b/sink/statsd/ticker_internal_test.go new file mode 100644 index 0000000..4a2dddf --- /dev/null +++ b/sink/statsd/ticker_internal_test.go @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: Apache-2.0 + +package statsd + +import ( + "context" + "testing" + "time" + + csink "github.com/stablekernel/crucible/sink" +) + +// TestRealTickerFiresStampedTicks verifies the production tick source wraps +// time.Ticker, stamps each tick with the injected clock, and releases its +// goroutine on Stop without leaking. 
+func TestRealTickerFiresStampedTicks(t *testing.T) { + t.Parallel() + + stamp := time.Unix(1000, 0) + tk := newRealTicker(time.Millisecond, func() time.Time { return stamp }) + t.Cleanup(tk.Stop) + + select { + case got := <-tk.C(): + if !got.Equal(stamp) { + t.Fatalf("tick stamp = %v, want %v (the injected clock)", got, stamp) + } + case <-time.After(time.Second): + t.Fatal("realTicker did not fire within 1s") + } +} + +// TestRealTickerStopIsClean verifies Stop returns once the run goroutine has +// exited, so a second Stop via t.Cleanup would be redundant rather than racy. +func TestRealTickerStopIsClean(t *testing.T) { + t.Parallel() + + tk := newRealTicker(time.Hour, time.Now) + done := make(chan struct{}) + go func() { + tk.Stop() + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Stop did not return within 1s") + } +} + +// ctxKey is a private context key used to assert the caller context survives +// the registry transform step. +type ctxKey struct{} + +// TestMetricForThreadsContext verifies metricFor passes the caller's context +// into the registry transformer rather than substituting context.Background. 
+func TestMetricForThreadsContext(t *testing.T) { + t.Parallel() + + type payload struct{ V int } + reg := NewMetricRegistry() + var seen context.Context + csink.Register(reg, func(ctx context.Context, p payload) Metric { + seen = ctx + return Metric{Type: TypeCount, Name: "n", Int: int64(p.V), Rate: 1} + }) + + a := &Aggregator{ + client: &countingClient{flushed: make(chan struct{}, 1)}, + registry: reg, + name: "statsd", + cur: newWindow(), + stop: make(chan struct{}), + done: make(chan struct{}), + } + + ctx := context.WithValue(context.Background(), ctxKey{}, "sentinel") + m, ok := a.metricFor(ctx, payload{V: 3}) + if !ok { + t.Fatal("metricFor returned ok=false for a registered payload") + } + if m.Int != 3 { + t.Fatalf("metric Int = %d, want 3", m.Int) + } + if seen == nil || seen.Value(ctxKey{}) != "sentinel" { + t.Fatalf("transformer context = %v, want the caller context with the sentinel value", seen) + } +}