Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions sink/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,19 @@ func PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) csink.Op[Client] {
// group and stream. The event timestamp is set to the current time in
// milliseconds since the Unix epoch, as required by the CloudWatch Logs API.
// Callers that need to control the timestamp, sequence token, or multiple
// events in one request should use [PutLogEvents] instead.
// events in one request should use [PutLogEventAt] or [PutLogEvents] instead.
func PutLogEvent(logGroup, logStream, message string) csink.Op[Client] {
	// Read the clock inside the Op so the event is stamped when the Op is
	// applied, not when the Op value is built. An Op that is constructed early
	// and applied later (queued, buffered, or retried) would otherwise carry a
	// stale timestamp frozen at construction time.
	return csink.OpFunc[Client](func(ctx context.Context, c Client) error {
		return PutLogEventAt(logGroup, logStream, message, time.Now()).Apply(ctx, c)
	})
}

// PutLogEventAt returns an Op that sends a single log event stamped with the
// supplied time. It is the deterministic, clock-injectable form of
// [PutLogEvent]: tests pass a fixed time and assert the emitted timestamp
// without depending on the wall clock. The time is converted to milliseconds
// since the Unix epoch, as required by the CloudWatch Logs API.
func PutLogEventAt(logGroup, logStream, message string, at time.Time) csink.Op[Client] {
return csink.OpFunc[Client](func(ctx context.Context, c Client) error {
ts := time.Now().UnixMilli()
ts := at.UnixMilli()
_, err := c.PutLogEvents(ctx, &cloudwatchlogs.PutLogEventsInput{
LogGroupName: &logGroup,
LogStreamName: &logStream,
Expand Down
59 changes: 59 additions & 0 deletions sink/cloudwatch/putlogevent_at_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-License-Identifier: Apache-2.0

package cloudwatch_test

import (
"context"
"testing"
"time"

cw "github.com/stablekernel/crucible/sink/cloudwatch"
)

// TestPutLogEventAt_StampsSuppliedTime verifies that the clock-injectable
// variant stamps the event with the caller-supplied time, converted to Unix
// milliseconds, rather than the wall clock, so the emitted timestamp is
// deterministic. It also checks that the message is passed through unchanged.
func TestPutLogEventAt_StampsSuppliedTime(t *testing.T) {
	t.Parallel()

	at := time.Unix(1700000000, 0) // a fixed, known instant
	c := &fakeClient{}
	op := cw.PutLogEventAt("/app/events", "app-stream", "deterministic", at)
	if err := op.Apply(context.Background(), c); err != nil {
		t.Fatalf("Apply() error = %v", err)
	}
	if len(c.calls) != 1 {
		t.Fatalf("PutLogEvents call count = %d, want 1", len(c.calls))
	}
	events := c.calls[0].LogEvents
	if len(events) != 1 {
		t.Fatalf("events count = %d, want 1", len(events))
	}
	ev := events[0]
	if ev.Timestamp == nil {
		t.Fatal("event timestamp is nil")
	}
	if got, want := *ev.Timestamp, at.UnixMilli(); got != want {
		t.Fatalf("timestamp = %d, want %d (the supplied time in ms)", got, want)
	}
	// Dereference before formatting: %v on a *string prints the pointer
	// address, which makes a failure report unreadable.
	switch {
	case ev.Message == nil:
		t.Errorf("message is nil, want %q", "deterministic")
	case *ev.Message != "deterministic":
		t.Errorf("message = %q, want %q", *ev.Message, "deterministic")
	}
}

// TestPutLogEvent_DelegatesToPutLogEventAt verifies the convenience wrapper
// still emits exactly one event to the named group and stream, and that the
// event carries a wall-clock timestamp taken while the test was running.
func TestPutLogEvent_DelegatesToPutLogEventAt(t *testing.T) {
	t.Parallel()

	// str renders an optional string for comparison and failure messages
	// without risking a nil dereference.
	str := func(p *string) string {
		if p == nil {
			return "<nil>"
		}
		return *p
	}

	c := &fakeClient{}
	// Bracket both construction and application so the bound holds whichever
	// of the two moments the implementation reads the clock at.
	before := time.Now().UnixMilli()
	op := cw.PutLogEvent("/app/events", "app-stream", "now")
	if err := op.Apply(context.Background(), c); err != nil {
		t.Fatalf("Apply() error = %v", err)
	}
	after := time.Now().UnixMilli()

	if len(c.calls) != 1 || len(c.calls[0].LogEvents) != 1 {
		t.Fatalf("calls = %+v, want one call with one event", c.calls)
	}
	in := c.calls[0]
	if got := str(in.LogGroupName); got != "/app/events" {
		t.Errorf("log group = %q, want %q", got, "/app/events")
	}
	if got := str(in.LogStreamName); got != "app-stream" {
		t.Errorf("log stream = %q, want %q", got, "app-stream")
	}
	ts := in.LogEvents[0].Timestamp
	if ts == nil {
		t.Fatal("PutLogEvent should stamp a timestamp")
	}
	if *ts < before || *ts > after {
		t.Errorf("timestamp = %d, want within [%d, %d] (wall clock during the test)", *ts, before, after)
	}
}
107 changes: 107 additions & 0 deletions sink/dynamo/batchwrite_checked_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// SPDX-License-Identifier: Apache-2.0

package dynamo_test

import (
"context"
"errors"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"

"github.com/stablekernel/crucible/sink/dynamo"
)

// batchClient is a hand-rolled Client that returns a configurable
// BatchWriteItem response, so the UnprocessedItems path can be driven without a
// live table. Its other write methods are stubs that always succeed.
type batchClient struct {
// unprocessed is handed back as UnprocessedItems on the BatchWriteItem
// output whenever err is nil; leaving it nil models a fully processed batch.
unprocessed map[string][]types.WriteRequest
// err, when non-nil, is returned by BatchWriteItem in place of any output.
err error
}

// PutItem is an inert stub: it ignores every argument and reports success with
// an empty output.
func (b *batchClient) PutItem(_ context.Context, _ *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) {
	return new(dynamodb.PutItemOutput), nil
}

// UpdateItem is an inert stub: it ignores every argument and reports success
// with an empty output.
func (b *batchClient) UpdateItem(_ context.Context, _ *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
	return new(dynamodb.UpdateItemOutput), nil
}

// DeleteItem is an inert stub: it ignores every argument and reports success
// with an empty output.
func (b *batchClient) DeleteItem(_ context.Context, _ *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) {
	return new(dynamodb.DeleteItemOutput), nil
}

// TransactWriteItems is an inert stub: it ignores every argument and reports
// success with an empty output.
func (b *batchClient) TransactWriteItems(_ context.Context, _ *dynamodb.TransactWriteItemsInput, _ ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) {
	return new(dynamodb.TransactWriteItemsOutput), nil
}

// BatchWriteItem replays the configured outcome and ignores its arguments: the
// stored error when one is set, otherwise an output whose UnprocessedItems is
// the stored map.
func (b *batchClient) BatchWriteItem(_ context.Context, _ *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) {
	if b.err == nil {
		resp := &dynamodb.BatchWriteItemOutput{UnprocessedItems: b.unprocessed}
		return resp, nil
	}
	return nil, b.err
}

// writeReq builds a WriteRequest carrying a single put whose item comes from
// item(id).
func writeReq(id string) types.WriteRequest {
	put := &types.PutRequest{Item: item(id)}
	return types.WriteRequest{PutRequest: put}
}

// TestBatchWriteChecked_NoUnprocessedSucceeds verifies that BatchWriteChecked
// reports success when the response carries no unprocessed items.
func TestBatchWriteChecked_NoUnprocessedSucceeds(t *testing.T) {
	t.Parallel()

	client := new(batchClient)
	err := dynamo.BatchWriteChecked(&dynamodb.BatchWriteItemInput{}).Apply(context.Background(), client)
	if err != nil {
		t.Fatalf("Apply() error = %v, want nil when no items are unprocessed", err)
	}
}

func TestBatchWriteChecked_UnprocessedReturnsSentinel(t *testing.T) {
t.Parallel()

c := &batchClient{
unprocessed: map[string][]types.WriteRequest{
"orders": {writeReq("A-1"), writeReq("A-2")},
"shipments": {writeReq("S-1")},
},
}
op := dynamo.BatchWriteChecked(&dynamodb.BatchWriteItemInput{})
err := op.Apply(context.Background(), c)
if !errors.Is(err, dynamo.ErrUnprocessedItems) {
t.Fatalf("Apply() = %v, want to wrap ErrUnprocessedItems", err)
}
if !strings.Contains(err.Error(), "3 item(s)") || !strings.Contains(err.Error(), "2 table(s)") {
t.Errorf("error = %v, want it to report 3 items across 2 tables", err)
}
}

// TestBatchWriteChecked_RequestErrorPropagates verifies that a request-level
// error from the client is still matchable with errors.Is after passing
// through BatchWriteChecked.
func TestBatchWriteChecked_RequestErrorPropagates(t *testing.T) {
	t.Parallel()

	boom := errors.New("throttled")
	client := &batchClient{err: boom}

	err := dynamo.BatchWriteChecked(&dynamodb.BatchWriteItemInput{}).Apply(context.Background(), client)
	if errors.Is(err, boom) {
		return
	}
	t.Fatalf("Apply() = %v, want wrapped %v", err, boom)
}

// TestBatchWrite_IgnoresUnprocessed documents that the unchecked BatchWrite
// still reports success when items are left unprocessed, so callers who need the
// stronger guarantee reach for BatchWriteChecked.
func TestBatchWrite_IgnoresUnprocessed(t *testing.T) {
t.Parallel()

c := &batchClient{
unprocessed: map[string][]types.WriteRequest{"orders": {writeReq("A-1")}},
}
op := dynamo.BatchWrite(&dynamodb.BatchWriteItemInput{})
if err := op.Apply(context.Background(), c); err != nil {
t.Fatalf("BatchWrite Apply() = %v, want nil (unchecked variant ignores unprocessed)", err)
}
}
38 changes: 36 additions & 2 deletions sink/dynamo/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ package dynamo

import (
"context"
"errors"
"fmt"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"

csink "github.com/stablekernel/crucible/sink"
)

// ErrUnprocessedItems reports that a BatchWriteItem call returned without a
// request-level error but left one or more items unprocessed (DynamoDB returns
// these via UnprocessedItems on the output, typically under throttling).
// [BatchWriteChecked] returns an error wrapping this sentinel, with the item
// and table counts appended to the message, so a caller can match the class
// with errors.Is and retry the unprocessed items. Compare with errors.Is rather
// than ==, because the returned error is a wrapper and not this value itself.
var ErrUnprocessedItems = errors.New("dynamo: BatchWriteItem left items unprocessed")

// Client is the narrow DynamoDB surface this destination needs. It declares
// only the write operations the package issues, so consumers wire the real
// *dynamodb.Client (which satisfies it structurally) while tests use a
Expand Down Expand Up @@ -70,15 +79,40 @@ func TransactWrite(in *dynamodb.TransactWriteItemsInput) csink.Op[Client] {

// BatchWrite returns an Op that issues a batch of put and delete requests. The
// SDK reports per-item failures via UnprocessedItems on the output; this Op
// returns only the request-level error, so a caller needing retry on
// unprocessed items should compose a custom Op instead.
// returns only the request-level error and ignores unprocessed items. Use
// [BatchWriteChecked] when a left-behind item must surface as an error.
func BatchWrite(in *dynamodb.BatchWriteItemInput) csink.Op[Client] {
	// The output is discarded on purpose: only the request-level error is
	// surfaced, so unprocessed items never fail this Op.
	apply := func(ctx context.Context, c Client) error {
		if _, err := c.BatchWriteItem(ctx, in); err != nil {
			return err
		}
		return nil
	}
	return csink.OpFunc[Client](apply)
}

// BatchWriteChecked returns an Op that issues a batch of put and delete requests
// and treats unprocessed items as a failure. DynamoDB returns HTTP 200 with a
// populated UnprocessedItems map when it could not write every item (commonly
// under throttling), so the request-level error is nil even though work was
// dropped. The Op inspects the response and, when any item is unprocessed,
// returns an error wrapping [ErrUnprocessedItems] reporting how many items were
// left behind and how many tables they belong to. Only tables that actually
// have unprocessed requests are counted, so a table mapped to an empty list
// never inflates the reported table count. A request-level error is returned
// unchanged. Pair it with sink.Reservoir or a retry middleware to resubmit the
// unprocessed items.
func BatchWriteChecked(in *dynamodb.BatchWriteItemInput) csink.Op[Client] {
	return csink.OpFunc[Client](func(ctx context.Context, c Client) error {
		out, err := c.BatchWriteItem(ctx, in)
		if err != nil {
			return err
		}
		var items, tables int
		for _, reqs := range out.UnprocessedItems {
			// A key with no requests has nothing left behind; skip it so the
			// table count matches the tables that need a retry.
			if len(reqs) == 0 {
				continue
			}
			items += len(reqs)
			tables++
		}
		if items == 0 {
			return nil
		}
		return fmt.Errorf("%w: %d item(s) across %d table(s)", ErrUnprocessedItems, items, tables)
	})
}

// NewRegistry returns an empty registry of Op[Client] for callers to populate
// with sink.Register.
func NewRegistry() *csink.Registry[csink.Op[Client]] {
Expand Down
4 changes: 3 additions & 1 deletion sink/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ type Phase string

const (
// PhaseTransform marks a failure turning a payload into a destination
// operation (a registry transformer returning an error-bearing op).
// operation. Registry transformers cannot themselves return an error, so the
// built-in Emitter never emits this phase today; it is reserved for outlets
// whose payload-to-operation step can fail and want to classify it.
PhaseTransform Phase = "transform"
// PhaseApply marks a failure applying the operation to the destination
// client (the write itself).
Expand Down
14 changes: 8 additions & 6 deletions sink/eventbridge/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func PutEvents(input *awseb.PutEventsInput) csink.Op[Client] {
return err
}
if out.FailedEntryCount > 0 {
return partialFailureError(out.Entries)
return partialFailureError(out.FailedEntryCount, out.Entries)
}
return nil
})
Expand All @@ -73,8 +73,11 @@ func PutEvent(eventBusName, source, detailType, detail string) csink.Op[Client]

// partialFailureError builds a descriptive error from the failed entries
// returned in a PutEvents response. EventBridge returns HTTP 200 for partial
// failures, so the Op must inspect FailedEntryCount explicitly.
func partialFailureError(entries []types.PutEventsResultEntry) error {
// failures, so the Op inspects FailedEntryCount explicitly and passes it here as
// the authoritative count. The reported number is FailedEntryCount, not the
// number of entries that happened to carry an ErrorCode, so the count and its
// plural stay correct even when an entry omits a code.
func partialFailureError(failed int32, entries []types.PutEventsResultEntry) error {
var parts []string
for _, e := range entries {
if e.ErrorCode == nil {
Expand All @@ -87,14 +90,13 @@ func partialFailureError(entries []types.PutEventsResultEntry) error {
}
parts = append(parts, fmt.Sprintf("%s: %s", code, msg))
}
n := len(parts)
suffix := "y"
if n != 1 {
if failed != 1 {
suffix = "ies"
}
return fmt.Errorf(
"eventbridge: %d entr%s failed: %s",
n,
failed,
suffix,
strings.Join(parts, "; "),
)
Expand Down
84 changes: 84 additions & 0 deletions sink/eventbridge/partial_failure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// SPDX-License-Identifier: Apache-2.0

package eventbridge_test

import (
"context"
"strings"
"testing"

awseb "github.com/aws/aws-sdk-go-v2/service/eventbridge"
"github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
csink "github.com/stablekernel/crucible/sink"
ebsink "github.com/stablekernel/crucible/sink/eventbridge"
)

// TestPutEvents_NilErrorCodeSkipped verifies that entries with a nil ErrorCode
// are skipped when building the detail list, while the reported count still
// reflects the authoritative FailedEntryCount rather than the number of entries
// that happened to carry a code.
func TestPutEvents_NilErrorCodeSkipped(t *testing.T) {
t.Parallel()

c := &fakeClient{
output: &awseb.PutEventsOutput{
FailedEntryCount: 2,
Entries: []types.PutEventsResultEntry{
{EventId: ptr("ok-1")}, // succeeded: no ErrorCode
{ErrorCode: ptr("ThrottlingException"), ErrorMessage: ptr("slow down")}, // failed
{}, // failed but no code reported
},
},
}
reg := ebsink.NewRegistry()
csink.Register(reg, func(_ context.Context, o orderPlaced) csink.Op[ebsink.Client] {
return ebsink.PutEvent("orders", "src", "OrderPlaced", `{}`)
})
outlet := ebsink.New(c, reg)

err := outlet.Sink(context.Background(), orderPlaced{OrderID: "ORD-1"})
if err == nil {
t.Fatal("Sink() = nil, want partial failure error")
}
msg := err.Error()
// The count comes from FailedEntryCount (2), not from the single entry that
// carried an ErrorCode.
if !strings.Contains(msg, "2 entries failed") {
t.Errorf("error = %q, want the authoritative count %q", msg, "2 entries failed")
}
// The nil-code entry is skipped, so only the coded detail appears.
if !strings.Contains(msg, "ThrottlingException") {
t.Errorf("error = %q, want the coded entry detail", msg)
}
if strings.Contains(msg, "ok-1") {
t.Errorf("error = %q, should not mention the succeeded entry", msg)
}
}

// TestPutEvents_SingleFailureSingularPlural locks the singular plural arm so a
// single failed entry reads "1 entry failed".
func TestPutEvents_SingleFailureSingularPlural(t *testing.T) {
t.Parallel()

c := &fakeClient{
output: &awseb.PutEventsOutput{
FailedEntryCount: 1,
Entries: []types.PutEventsResultEntry{
{ErrorCode: ptr("InternalFailure"), ErrorMessage: ptr("retry")},
},
},
}
reg := ebsink.NewRegistry()
csink.Register(reg, func(_ context.Context, o orderPlaced) csink.Op[ebsink.Client] {
return ebsink.PutEvent("orders", "src", "OrderPlaced", `{}`)
})
outlet := ebsink.New(c, reg)

err := outlet.Sink(context.Background(), orderPlaced{OrderID: "ORD-2"})
if err == nil {
t.Fatal("Sink() = nil, want failure error")
}
if !strings.Contains(err.Error(), "1 entry failed") {
t.Errorf("error = %q, want the singular form %q", err.Error(), "1 entry failed")
}
}
Loading
Loading