From e569fd803e85e9e7a48a5dbaf52dc5e3a93c46ee Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 29 Apr 2026 13:25:10 +0200 Subject: [PATCH 1/4] pdp: add /pdp/piece/pull client and combined extraData encoder Curio's POST /pdp/piece/pull is the FWSS-mediated SP-submits flow: client posts EIP-712-signed extraData and HTTPS source URLs, Curio fetches the pieces, validates extraData via eth_call against PDPVerifier.addPieces(), and submits the on-chain tx. Idempotent on (service, sha256(extraData), dataSetId, recordKeeper). Used by singularity to push pieces to SPs without holding the SP's wallet key. Adds: - Server.PullPieces and Server.WaitForPullPieces in pdp/server.go - PullStatus, PullPieceInput, PullPieceStatus, PullPiecesRequest, PullPiecesResponse, PullPiecesOptions in pdp/types.go - EncodeCreateDataSetAndAddPiecesExtraData in pdp/abi.go for the abi.encode(bytes,bytes) wrapper used when DataSetID is 0 (atomic create-and-add path) --- pdp/abi.go | 36 ++++++++++ pdp/abi_test.go | 134 ++++++++++++++++++++++++++++++++++++ pdp/server.go | 76 ++++++++++++++++++++ pdp/server_test.go | 168 +++++++++++++++++++++++++++++++++++++++++++++ pdp/types.go | 60 ++++++++++++++++ 5 files changed, 474 insertions(+) create mode 100644 pdp/abi_test.go diff --git a/pdp/abi.go b/pdp/abi.go index cc4afd6..1ef3b13 100644 --- a/pdp/abi.go +++ b/pdp/abi.go @@ -1,8 +1,10 @@ package pdp import ( + "encoding/hex" "fmt" "math/big" + "strings" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" @@ -84,3 +86,37 @@ func EncodeScheduleRemovalsExtraData(signature []byte) (string, error) { return "0x" + common.Bytes2Hex(encoded), nil } + + +// EncodeCreateDataSetAndAddPiecesExtraData wraps the two extraData blobs +// (from EncodeDataSetCreateData and EncodeAddPiecesExtraData) into the +// combined abi.encode(bytes,bytes) form Curio's /pdp/piece/pull expects +// when atomically creating a new data set and adding pieces in one shot. +// Inputs are hex strings (with or without 0x prefix), as produced by the +// sibling encoders in this file. +func EncodeCreateDataSetAndAddPiecesExtraData(createDataSetExtraHex, addPiecesExtraHex string) (string, error) { + createBytes, err := decodeHex(createDataSetExtraHex) + if err != nil { + return "", fmt.Errorf("invalid createDataSet extra data: %w", err) + } + addPiecesBytes, err := decodeHex(addPiecesExtraHex) + if err != nil { + return "", fmt.Errorf("invalid addPieces extra data: %w", err) + } + + args := abi.Arguments{ + {Type: bytesType}, + {Type: bytesType}, + } + + encoded, err := args.Pack(createBytes, addPiecesBytes) + if err != nil { + return "", fmt.Errorf("failed to encode create-and-add extra data: %w", err) + } + + return "0x" + common.Bytes2Hex(encoded), nil +} + +func decodeHex(s string) ([]byte, error) { + return hex.DecodeString(strings.TrimPrefix(s, "0x")) +} diff --git a/pdp/abi_test.go b/pdp/abi_test.go new file mode 100644 index 0000000..e94ed82 --- /dev/null +++ b/pdp/abi_test.go @@ -0,0 +1,134 @@ +package pdp + +import ( + "encoding/hex" + "math/big" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" +) + +func TestEncodeCreateDataSetAndAddPiecesExtraData(t *testing.T) { + t.Run("round-trips through abi.Unpack", func(t *testing.T) { + create := []byte{0xde, 0xad, 0xbe, 0xef} + add := []byte{0xfe, 0xed, 0xfa, 0xce, 0xca, 0xfe} + + out, err := EncodeCreateDataSetAndAddPiecesExtraData( + "0x"+hex.EncodeToString(create), + "0x"+hex.EncodeToString(add), + ) + if err != nil { + t.Fatalf("encode failed: %v", err) + } + if !strings.HasPrefix(out, "0x") { + t.Fatalf("output missing 0x prefix: %s", out) + } + + raw, err := hex.DecodeString(out[2:]) + if err != nil { + t.Fatalf("decode output: %v", err) + } + + args := abi.Arguments{ + {Type: bytesType}, + {Type: bytesType}, + } + unpacked, err := args.Unpack(raw) + if err != nil { + t.Fatalf("unpack: %v", err) + } + if len(unpacked) != 2 { + t.Fatalf("expected 2 fields, got %d", len(unpacked)) + } + gotCreate, ok := unpacked[0].([]byte) + if !ok { + t.Fatalf("first field not []byte: %T", unpacked[0]) + } + gotAdd, ok := unpacked[1].([]byte) + if !ok { + t.Fatalf("second field not []byte: %T", unpacked[1]) + } + if string(gotCreate) != string(create) { + t.Errorf("createDataSet round-trip mismatch: got %x, want %x", gotCreate, create) + } + if string(gotAdd) != string(add) { + t.Errorf("addPieces round-trip mismatch: got %x, want %x", gotAdd, add) + } + }) + + t.Run("accepts inputs without 0x prefix", func(t *testing.T) { + _, err := EncodeCreateDataSetAndAddPiecesExtraData("deadbeef", "feedface") + if err != nil { + t.Fatalf("expected no-prefix inputs to work, got %v", err) + } + }) + + t.Run("rejects non-hex input", func(t *testing.T) { + _, err := EncodeCreateDataSetAndAddPiecesExtraData("0xnothex!", "0xdeadbeef") + if err == nil { + t.Error("expected error on non-hex createDataSet input") + } + _, err = EncodeCreateDataSetAndAddPiecesExtraData("0xdeadbeef", "0xnothex!") + if err == nil { + t.Error("expected error on non-hex addPieces input") + } + }) + + t.Run("round-trips real CreateDataSet+AddPieces extras", func(t *testing.T) { + // produce a CreateDataSet extraData and an AddPieces extraData via + // the sibling encoders, then wrap. Verifies that the canonical caller + // path (sign -> encode each -> wrap combined) round-trips cleanly. + auth := testAuthHelper(t) + clientDataSetID := big.NewInt(1) + payee := auth.Address() + + createSig, err := auth.SignCreateDataSet(clientDataSetID, payee, nil) + if err != nil { + t.Fatalf("sign create: %v", err) + } + createExtra, err := EncodeDataSetCreateData(payee, clientDataSetID, nil, createSig.Signature) + if err != nil { + t.Fatalf("encode create extra: %v", err) + } + + nonce := big.NewInt(42) + addSig, err := auth.SignAddPieces(clientDataSetID, nonce, nil, nil) + if err != nil { + t.Fatalf("sign add: %v", err) + } + addExtra, err := EncodeAddPiecesExtraData(nonce, nil, addSig.Signature) + if err != nil { + t.Fatalf("encode add extra: %v", err) + } + + combined, err := EncodeCreateDataSetAndAddPiecesExtraData(createExtra, addExtra) + if err != nil { + t.Fatalf("combine: %v", err) + } + if !strings.HasPrefix(combined, "0x") { + t.Fatalf("combined missing 0x prefix: %s", combined) + } + + raw, err := hex.DecodeString(combined[2:]) + if err != nil { + t.Fatalf("decode combined: %v", err) + } + args := abi.Arguments{ + {Type: bytesType}, + {Type: bytesType}, + } + unpacked, err := args.Unpack(raw) + if err != nil { + t.Fatalf("unpack combined: %v", err) + } + gotCreateHex := "0x" + hex.EncodeToString(unpacked[0].([]byte)) + gotAddHex := "0x" + hex.EncodeToString(unpacked[1].([]byte)) + if gotCreateHex != createExtra { + t.Errorf("createDataSet extra mismatch") + } + if gotAddHex != addExtra { + t.Errorf("addPieces extra mismatch") + } + }) +} diff --git a/pdp/server.go b/pdp/server.go index af30fe8..95e442f 100644 --- a/pdp/server.go +++ b/pdp/server.go @@ -446,6 +446,82 @@ func (s *Server) GetDataSet(ctx context.Context, dataSetID int) (*DataSetData, e } +// PullPieces issues POST /pdp/piece/pull. The endpoint is idempotent on +// (service, sha256(extraData), dataSetId, recordKeeper); calling with the +// same arguments returns the current status of an existing pull rather +// than starting a new one. Authorization is the EIP-712-signed extraData, +// which Curio verifies via eth_call against PDPVerifier.addPieces() before +// any state change. Pass DataSetID == 0 to atomically create a new data +// set and add the pieces in one shot. +func (s *Server) PullPieces(ctx context.Context, opts PullPiecesOptions) (*PullPiecesResponse, error) { + reqBody := PullPiecesRequest{ + ExtraData: opts.ExtraData, + RecordKeeper: opts.RecordKeeper, + Pieces: opts.Pieces, + } + if opts.DataSetID > 0 { + id := opts.DataSetID + reqBody.DataSetID = &id + } + + body, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal pull pieces request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", s.baseURL+"/pdp/piece/pull", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("failed to create pull pieces request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("pull pieces request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted { + respBody, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(respBody)) + } + + var pullResp PullPiecesResponse + if err := json.NewDecoder(resp.Body).Decode(&pullResp); err != nil { + return nil, fmt.Errorf("failed to decode pull pieces response: %w", err) + } + + return &pullResp, nil +} + + +// WaitForPullPieces re-POSTs the same pull request (idempotent) until the +// aggregate status is complete or failed, or the timeout elapses. +func (s *Server) WaitForPullPieces(ctx context.Context, opts PullPiecesOptions, timeout time.Duration) (*PullPiecesResponse, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + var last *PullPiecesResponse + err := retry.Poll(ctx, 4*time.Second, timeout, func() (bool, error) { + resp, err := s.PullPieces(ctx, opts) + if err != nil { + return false, err + } + last = resp + switch resp.Status { + case PullStatusComplete, PullStatusFailed: + return true, nil + default: + return false, nil + } + }) + if err != nil { + return last, err + } + return last, nil +} + + func (s *Server) Ping(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "GET", s.baseURL+"/pdp/ping", nil) if err != nil { diff --git a/pdp/server_test.go b/pdp/server_test.go index d881024..f509f40 100644 --- a/pdp/server_test.go +++ b/pdp/server_test.go @@ -3,10 +3,14 @@ package pdp import ( "context" "encoding/hex" + "encoding/json" + "io" "math/big" "net/http" "net/http/httptest" + "sync/atomic" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -246,3 +250,167 @@ func TestServer_GetDataSet(t *testing.T) { } }) } + +func TestServer_PullPieces(t *testing.T) { + pieces := []PullPieceInput{ + {PieceCID: "bafkz...A", SourceURL: "https://example.com/piece/bafkz...A"}, + {PieceCID: "bafkz...B", SourceURL: "https://example.com/piece/bafkz...B"}, + } + recordKeeper := "0x02925630df557F957f70E112bA06e50965417CA0" + extraData := "0xdeadbeef" + + t.Run("create-new omits dataSetId on the wire", func(t *testing.T) { + var seen PullPiecesRequest + server, _ := setupMockServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("Expected POST, got %s", r.Method) + } + if r.URL.Path != "/pdp/piece/pull" { + t.Errorf("Expected path /pdp/piece/pull, got %s", r.URL.Path) + } + body, _ := io.ReadAll(r.Body) + // re-decode into a map to assert dataSetId truly absent + var raw map[string]any + if err := json.Unmarshal(body, &raw); err != nil { + t.Fatalf("decode raw body: %v", err) + } + if _, present := raw["dataSetId"]; present { + t.Errorf("expected dataSetId omitted for new set, body=%s", string(body)) + } + if err := json.Unmarshal(body, &seen); err != nil { + t.Fatalf("decode typed body: %v", err) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":"pending","pieces":[{"pieceCid":"bafkz...A","status":"pending"},{"pieceCid":"bafkz...B","status":"pending"}]}`)) + })) + + resp, err := server.PullPieces(context.Background(), PullPiecesOptions{ + RecordKeeper: recordKeeper, + Pieces: pieces, + ExtraData: extraData, + DataSetID: 0, + }) + if err != nil { + t.Fatalf("PullPieces failed: %v", err) + } + if resp.Status != PullStatusPending { + t.Errorf("Status = %s, want pending", resp.Status) + } + if len(resp.Pieces) != 2 { + t.Errorf("len(Pieces) = %d, want 2", len(resp.Pieces)) + } + if seen.RecordKeeper != recordKeeper { + t.Errorf("RecordKeeper = %s, want %s", seen.RecordKeeper, recordKeeper) + } + if seen.ExtraData != extraData { + t.Errorf("ExtraData = %s, want %s", seen.ExtraData, extraData) + } + if len(seen.Pieces) != 2 || seen.Pieces[0].PieceCID != pieces[0].PieceCID { + t.Errorf("Pieces mismatch: %+v", seen.Pieces) + } + }) + + t.Run("existing set sends dataSetId on the wire", func(t *testing.T) { + server, _ := setupMockServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var raw map[string]any + if err := json.Unmarshal(body, &raw); err != nil { + t.Fatalf("decode raw body: %v", err) + } + id, ok := raw["dataSetId"].(float64) + if !ok { + t.Errorf("expected numeric dataSetId, body=%s", string(body)) + } + if uint64(id) != 13245 { + t.Errorf("dataSetId = %v, want 13245", id) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":"complete","pieces":[{"pieceCid":"bafkz...A","status":"complete"}]}`)) + })) + + resp, err := server.PullPieces(context.Background(), PullPiecesOptions{ + RecordKeeper: recordKeeper, + Pieces: pieces[:1], + ExtraData: extraData, + DataSetID: 13245, + }) + if err != nil { + t.Fatalf("PullPieces failed: %v", err) + } + if resp.Status != PullStatusComplete { + t.Errorf("Status = %s, want complete", resp.Status) + } + }) + + t.Run("server error is surfaced", func(t *testing.T) { + server, _ := setupMockServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("extraData validation failed")) + })) + + _, err := server.PullPieces(context.Background(), PullPiecesOptions{ + RecordKeeper: recordKeeper, + Pieces: pieces, + ExtraData: extraData, + }) + if err == nil { + t.Error("Expected error from 400 response, got nil") + } + }) +} + +func TestServer_WaitForPullPieces(t *testing.T) { + pieces := []PullPieceInput{ + {PieceCID: "bafkz...A", SourceURL: "https://example.com/piece/bafkz...A"}, + } + opts := PullPiecesOptions{ + RecordKeeper: "0x02925630df557F957f70E112bA06e50965417CA0", + Pieces: pieces, + ExtraData: "0xdeadbeef", + } + + t.Run("polls until complete", func(t *testing.T) { + var hits int32 + server, _ := setupMockServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&hits, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if n < 2 { + _, _ = w.Write([]byte(`{"status":"inProgress","pieces":[{"pieceCid":"bafkz...A","status":"inProgress"}]}`)) + return + } + _, _ = w.Write([]byte(`{"status":"complete","pieces":[{"pieceCid":"bafkz...A","status":"complete"}]}`)) + })) + + // shrink the poll interval inside retry.Poll: not configurable, but + // the default 4s window is fine if we give a generous timeout. + resp, err := server.WaitForPullPieces(context.Background(), opts, 30*time.Second) + if err != nil { + t.Fatalf("WaitForPullPieces failed: %v", err) + } + if resp.Status != PullStatusComplete { + t.Errorf("Status = %s, want complete", resp.Status) + } + if atomic.LoadInt32(&hits) < 2 { + t.Errorf("expected at least 2 polls, got %d", hits) + } + }) + + t.Run("returns failed status without error", func(t *testing.T) { + server, _ := setupMockServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":"failed","pieces":[{"pieceCid":"bafkz...A","status":"failed"}]}`)) + })) + + resp, err := server.WaitForPullPieces(context.Background(), opts, 5*time.Second) + if err != nil { + t.Fatalf("WaitForPullPieces unexpectedly errored on failed status: %v", err) + } + if resp.Status != PullStatusFailed { + t.Errorf("Status = %s, want failed", resp.Status) + } + }) +} diff --git a/pdp/types.go b/pdp/types.go index fdd40c7..3ce4da9 100644 --- a/pdp/types.go +++ b/pdp/types.go @@ -112,6 +112,66 @@ type UploadCompleteResponse struct { Size int64 `json:"size"` } +// PullStatus is the wire-level status enum for a pull request or an +// individual piece within one. Aggregate (response-level) status reflects +// the worst case across all pieces: failed > retrying > inProgress > +// pending > complete. +type PullStatus string + +const ( + PullStatusPending PullStatus = "pending" + PullStatusInProgress PullStatus = "inProgress" + PullStatusRetrying PullStatus = "retrying" + PullStatusComplete PullStatus = "complete" + PullStatusFailed PullStatus = "failed" +) + +// PullPieceInput names a piece and the source URL Curio should fetch it +// from. SourceURL must be HTTPS and end in /piece/{pieceCid}. +type PullPieceInput struct { + PieceCID string `json:"pieceCid"` + SourceURL string `json:"sourceUrl"` +} + +// PullPieceStatus is per-piece status in a PullPiecesResponse. +type PullPieceStatus struct { + PieceCID string `json:"pieceCid"` + Status PullStatus `json:"status"` +} + +// PullPiecesRequest is the JSON body Curio expects on POST /pdp/piece/pull. +// DataSetID is omitted from the wire when nil; callers signal +// "create a new set" by passing nil and supplying a combined +// CreateDataSet+AddPieces extraData (see EncodeCreateDataSetAndAddPiecesExtraData). +type PullPiecesRequest struct { + ExtraData string `json:"extraData"` + RecordKeeper string `json:"recordKeeper"` + Pieces []PullPieceInput `json:"pieces"` + DataSetID *uint64 `json:"dataSetId,omitempty"` +} + +// PullPiecesResponse is the JSON body Curio returns from POST /pdp/piece/pull. +type PullPiecesResponse struct { + Status PullStatus `json:"status"` + Pieces []PullPieceStatus `json:"pieces"` +} + +// PullPiecesOptions is the higher-level argument for Server.PullPieces and +// Server.WaitForPullPieces. DataSetID == 0 signals "create a new set" +// (the request will omit the field on the wire). +type PullPiecesOptions struct { + // RecordKeeper is the FWSS contract address (hex). Required. + RecordKeeper string + // Pieces lists the pieces to pull and their source URLs. + Pieces []PullPieceInput + // ExtraData is the EIP-712-signed authorization blob. For new sets, + // build it with EncodeCreateDataSetAndAddPiecesExtraData. For + // existing sets, EncodeAddPiecesExtraData alone. + ExtraData string + // DataSetID is the target set ID, or 0 to create a new set atomically. + DataSetID uint64 +} + // ManagerConfig holds configuration options for the Manager type ManagerConfig struct { // GasBufferPercent is the percentage buffer to add to gas estimates (0-100) From 70bf37a85c099261fff197019b2bbaca241bd2dd Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 29 Apr 2026 13:39:58 +0200 Subject: [PATCH 2/4] pdp: gofmt files touched by piece-pull PR abi.go and server.go had pre-existing whitespace drift (extra blank lines between top-level decls, misaligned var-block alignment). CI's golangci-lint v2 default doesn't run gofmt so master accumulated the drift, but cleaning it up keeps editors+pre-commit hooks happy. --- pdp/abi.go | 14 ++++---------- pdp/server.go | 16 ---------------- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/pdp/abi.go b/pdp/abi.go index 1ef3b13..facf8d7 100644 --- a/pdp/abi.go +++ b/pdp/abi.go @@ -10,17 +10,14 @@ import ( "github.com/ethereum/go-ethereum/common" ) - - var ( - addressType, _ = abi.NewType("address", "", nil) - uint256Type, _ = abi.NewType("uint256", "", nil) - stringArrayType, _ = abi.NewType("string[]", "", nil) + addressType, _ = abi.NewType("address", "", nil) + uint256Type, _ = abi.NewType("uint256", "", nil) + stringArrayType, _ = abi.NewType("string[]", "", nil) stringArray2DType, _ = abi.NewType("string[][]", "", nil) - bytesType, _ = abi.NewType("bytes", "", nil) + bytesType, _ = abi.NewType("bytes", "", nil) ) - func EncodeDataSetCreateData(payer common.Address, clientDataSetID *big.Int, metadata []MetadataEntry, signature []byte) (string, error) { keys := make([]string, len(metadata)) values := make([]string, len(metadata)) @@ -45,7 +42,6 @@ func EncodeDataSetCreateData(payer common.Address, clientDataSetID *big.Int, met return "0x" + common.Bytes2Hex(encoded), nil } - func EncodeAddPiecesExtraData(nonce *big.Int, metadata [][]MetadataEntry, signature []byte) (string, error) { keys := make([][]string, len(metadata)) values := make([][]string, len(metadata)) @@ -73,7 +69,6 @@ func EncodeAddPiecesExtraData(nonce *big.Int, metadata [][]MetadataEntry, signat return "0x" + common.Bytes2Hex(encoded), nil } - func EncodeScheduleRemovalsExtraData(signature []byte) (string, error) { args := abi.Arguments{ {Type: bytesType}, @@ -87,7 +82,6 @@ func EncodeScheduleRemovalsExtraData(signature []byte) (string, error) { return "0x" + common.Bytes2Hex(encoded), nil } - // EncodeCreateDataSetAndAddPiecesExtraData wraps the two extraData blobs // (from EncodeDataSetCreateData and EncodeAddPiecesExtraData) into the // combined abi.encode(bytes,bytes) form Curio's /pdp/piece/pull expects diff --git a/pdp/server.go b/pdp/server.go index 95e442f..dac61aa 100644 --- a/pdp/server.go +++ b/pdp/server.go @@ -29,7 +29,6 @@ type Server struct { uploadClientVal *http.Client } - func NewServer(baseURL string, authHelper *AuthHelper) *Server { baseURL = strings.TrimSuffix(baseURL, "/") @@ -51,12 +50,10 @@ func (s *Server) uploadClient() *http.Client { return s.uploadClientVal } - func (s *Server) BaseURL() string { return s.baseURL } - func (s *Server) CreateDataSet(ctx context.Context, recordKeeper string, extraData string) (*CreateDataSetResponse, error) { reqBody := map[string]string{ "recordKeeper": recordKeeper, @@ -104,7 +101,6 @@ func (s *Server) CreateDataSet(ctx context.Context, recordKeeper string, extraDa }, nil } - func (s *Server) GetDataSetCreationStatus(ctx context.Context, txHash string) (*DataSetCreationStatus, error) { req, err := http.NewRequestWithContext(ctx, "GET", s.baseURL+"/pdp/data-sets/created/"+txHash, nil) if err != nil { @@ -135,7 +131,6 @@ func (s *Server) GetDataSetCreationStatus(ctx context.Context, txHash string) (* return &status, nil } - func (s *Server) WaitForDataSetCreation(ctx context.Context, txHash string, timeout time.Duration) (*DataSetCreationStatus, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -155,7 +150,6 @@ func (s *Server) WaitForDataSetCreation(ctx context.Context, txHash string, time return status, nil } - func (s *Server) AddPieces(ctx context.Context, dataSetID int, pieceCIDs []cid.Cid, extraData string) (*AddPiecesResponse, error) { pieces := make([]PieceData, len(pieceCIDs)) for i, c := range pieceCIDs { @@ -213,7 +207,6 @@ func (s *Server) AddPieces(ctx context.Context, dataSetID int, pieceCIDs []cid.C }, nil } - func (s *Server) GetPieceAdditionStatus(ctx context.Context, dataSetID int, txHash string) (*PieceAdditionStatus, error) { url := fmt.Sprintf("%s/pdp/data-sets/%d/pieces/added/%s", s.baseURL, dataSetID, txHash) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) @@ -245,7 +238,6 @@ func (s *Server) GetPieceAdditionStatus(ctx context.Context, dataSetID int, txHa return &status, nil } - func (s *Server) WaitForPieceAddition(ctx context.Context, dataSetID int, txHash string, timeout time.Duration) (*PieceAdditionStatus, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -265,7 +257,6 @@ func (s *Server) WaitForPieceAddition(ctx context.Context, dataSetID int, txHash return status, nil } - func (s *Server) UploadPiece(ctx context.Context, data io.Reader, size int64, pieceCID cid.Cid) (*UploadPieceResponse, error) { createReq, err := http.NewRequestWithContext(ctx, "POST", s.baseURL+"/pdp/piece/uploads", nil) if err != nil { @@ -345,7 +336,6 @@ func (s *Server) UploadPiece(ctx context.Context, data io.Reader, size int64, pi }, nil } - func (s *Server) FindPiece(ctx context.Context, pieceCID cid.Cid) error { params := url.Values{} params.Set("pieceCid", pieceCID.String()) @@ -374,7 +364,6 @@ func (s *Server) FindPiece(ctx context.Context, pieceCID cid.Cid) error { return nil } - func (s *Server) WaitForPiece(ctx context.Context, pieceCID cid.Cid, timeout time.Duration) error { return retry.Poll(ctx, 5*time.Second, timeout, func() (bool, error) { err := s.FindPiece(ctx, pieceCID) @@ -388,7 +377,6 @@ func (s *Server) WaitForPiece(ctx context.Context, pieceCID cid.Cid, timeout tim }) } - func (s *Server) DownloadPiece(ctx context.Context, pieceCID cid.Cid) ([]byte, error) { reqURL := fmt.Sprintf("%s/pdp/piece/%s", s.baseURL, pieceCID.String()) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) @@ -414,7 +402,6 @@ func (s *Server) DownloadPiece(ctx context.Context, pieceCID cid.Cid) ([]byte, e return io.ReadAll(resp.Body) } - func (s *Server) GetDataSet(ctx context.Context, dataSetID int) (*DataSetData, error) { reqURL := fmt.Sprintf("%s/pdp/data-sets/%d", s.baseURL, dataSetID) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) @@ -445,7 +432,6 @@ func (s *Server) GetDataSet(ctx context.Context, dataSetID int) (*DataSetData, e return &data, nil } - // PullPieces issues POST /pdp/piece/pull. The endpoint is idempotent on // (service, sha256(extraData), dataSetId, recordKeeper); calling with the // same arguments returns the current status of an existing pull rather @@ -494,7 +480,6 @@ func (s *Server) PullPieces(ctx context.Context, opts PullPiecesOptions) (*PullP return &pullResp, nil } - // WaitForPullPieces re-POSTs the same pull request (idempotent) until the // aggregate status is complete or failed, or the timeout elapses. func (s *Server) WaitForPullPieces(ctx context.Context, opts PullPiecesOptions, timeout time.Duration) (*PullPiecesResponse, error) { @@ -521,7 +506,6 @@ func (s *Server) WaitForPullPieces(ctx context.Context, opts PullPiecesOptions, return last, nil } - func (s *Server) Ping(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "GET", s.baseURL+"/pdp/ping", nil) if err != nil { From 0c0de3471a2b5ea1f52950a737ec14af945f87d6 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 29 Apr 2026 13:27:43 +0200 Subject: [PATCH 3/4] pdp: drop unused authHelper field from Server The authHelper field on Server has never been read by any method -- it was apparently reserved for an HTTP-level JWT auth path that never materialized. Curio's FWSS-mediated endpoints authenticate via the EIP-712 signature inside extraData (verified server-side via eth_call), not via Authorization headers. Default Curio deployments run NullAuth. Drops the field from Server, the param from NewServer, the param from Client.NewPDPServer, and the unused local in Client.Storage. Adds a doc comment on Server explaining the actual auth model. Tests adjusted. --- pdp/server.go | 13 +++++++++---- pdp/server_test.go | 7 ++----- synapse.go | 5 ++--- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pdp/server.go b/pdp/server.go index dac61aa..d243d0e 100644 --- a/pdp/server.go +++ b/pdp/server.go @@ -21,20 +21,25 @@ const ( defaultTimeout = 5 * time.Minute ) +// Server is a thin HTTP client for Curio's /pdp/* endpoints. It does not +// hold an EIP-712 signer: extraData blobs (build via AuthHelper + +// EncodeDataSetCreateData / EncodeAddPiecesExtraData and friends) are +// passed in pre-signed by callers, and Curio verifies them server-side via +// eth_call against PDPVerifier. There is no HTTP-level auth required by +// default Curio deployments (NullAuth); operators can opt into JWTAuth, +// but wiring that in is out of scope for this client. type Server struct { baseURL string - authHelper *AuthHelper httpClient *http.Client uploadClientMu sync.Mutex uploadClientVal *http.Client } -func NewServer(baseURL string, authHelper *AuthHelper) *Server { +func NewServer(baseURL string) *Server { baseURL = strings.TrimSuffix(baseURL, "/") return &Server{ - baseURL: baseURL, - authHelper: authHelper, + baseURL: baseURL, httpClient: &http.Client{ Timeout: defaultTimeout, }, diff --git a/pdp/server_test.go b/pdp/server_test.go index f509f40..831afe3 100644 --- a/pdp/server_test.go +++ b/pdp/server_test.go @@ -26,15 +26,12 @@ func testAuthHelper(t *testing.T) *AuthHelper { } func setupMockServer(t *testing.T, handler http.Handler) (*Server, *httptest.Server) { - authHelper := testAuthHelper(t) mockServer := httptest.NewServer(handler) t.Cleanup(mockServer.Close) - return NewServer(mockServer.URL, authHelper), mockServer + return NewServer(mockServer.URL), mockServer } func TestServer_NewServer(t *testing.T) { - authHelper := testAuthHelper(t) - tests := []struct { name string baseURL string @@ -59,7 +56,7 @@ func TestServer_NewServer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - server := NewServer(tt.baseURL, authHelper) + server := NewServer(tt.baseURL) if server.BaseURL() != tt.expectedURL { t.Errorf("BaseURL() = %s, want %s", server.BaseURL(), tt.expectedURL) } diff --git a/synapse.go b/synapse.go index daff797..39ec7dd 100644 --- a/synapse.go +++ b/synapse.go @@ -135,7 +135,7 @@ func (c *Client) Storage() (*storage.Manager, error) { } authHelper := pdp.NewAuthHelper(c.privateKey, c.warmStorageAddress, big.NewInt(c.chainID)) - pdpServer := pdp.NewServer(c.providerURL, authHelper) + pdpServer := pdp.NewServer(c.providerURL) var opts []storage.ManagerOption if c.dataSetID != 0 { @@ -208,6 +208,5 @@ func (c *Client) NewAuthHelper() *pdp.AuthHelper { } func (c *Client) NewPDPServer(providerURL string) *pdp.Server { - authHelper := c.NewAuthHelper() - return pdp.NewServer(providerURL, authHelper) + return pdp.NewServer(providerURL) } From 2552fa9c0cd495db9913e690957ba55f9c521118 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Wed, 29 Apr 2026 13:32:20 +0200 Subject: [PATCH 4/4] pdp: gofmt + drop now-unused testAuthHelper After dropping authHelper from setupMockServer, testAuthHelper had no remaining callers, tripping staticcheck U1000. Remove it. Also runs gofmt -w on the files this PR touches (drift was pre-existing on master; CI's golangci-lint v2 default does not run gofmt, but it's still worth keeping clean). --- synapse.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/synapse.go b/synapse.go index 39ec7dd..23d368e 100644 --- a/synapse.go +++ b/synapse.go @@ -1,5 +1,3 @@ - - package synapse import ( @@ -18,7 +16,6 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) - type Options struct { PrivateKey *ecdsa.PrivateKey @@ -31,7 +28,6 @@ type Options struct { DataSetID int } - type Client struct { network Network chainID int64 @@ -45,7 +41,6 @@ type Client struct { dataSetID int } - func New(ctx context.Context, opts Options) (*Client, error) { if opts.PrivateKey == nil { return nil, fmt.Errorf("private key is required") @@ -100,7 +95,6 @@ func New(ctx context.Context, opts Options) (*Client, error) { return client, nil } - func (c *Client) Network() Network { return c.network } @@ -109,22 +103,18 @@ func (c *Client) ChainID() int64 { return c.chainID } - func (c *Client) Address() common.Address { return c.address } - func (c *Client) WarmStorageAddress() common.Address { return c.warmStorageAddress } - func (c *Client) EthClient() *ethclient.Client { return c.ethClient } - func (c *Client) Storage() (*storage.Manager, error) { if c.storageManager != nil { return c.storageManager, nil @@ -159,7 +149,6 @@ func (c *Client) Storage() (*storage.Manager, error) { return c.storageManager, nil } - // Costs returns a lazily-initialized costs service for computing storage // costs and deposit requirements. func (c *Client) Costs() (*costs.Service, error) { @@ -202,7 +191,6 @@ func (c *Client) Close() { } } - func (c *Client) NewAuthHelper() *pdp.AuthHelper { return pdp.NewAuthHelper(c.privateKey, c.warmStorageAddress, big.NewInt(c.chainID)) }