Skip to content

feat(storage): support retries with sink uploads#236

Open
brianquinlan wants to merge 38 commits intogoogleapis:mainfrom
brianquinlan:upload_testbench_plus_test
Open

feat(storage): support retries with sink uploads#236
brianquinlan wants to merge 38 commits intogoogleapis:mainfrom
brianquinlan:upload_testbench_plus_test

Conversation

@brianquinlan
Copy link
Copy Markdown
Contributor

@brianquinlan brianquinlan commented Apr 14, 2026

Fixes #218

Adds ifGenerationMatch and retry parameters to uploadObjectFromSink.

Part of the complexity of the retry strategy is that it is possible for Google Cloud Storage to only acknowledge part of the upload and uploads must be done in 256KB chunks.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request significantly improves the resumable upload implementation for Google Cloud Storage by introducing a robust chunking mechanism that correctly handles 308 status codes, server-side range tracking, and automated retries. It also adds support for idempotency through generation matching and refactors internal buffering for better efficiency. Review feedback identifies a logic error in the finalization of uploads when receiving a 308 response, suggests using memory-efficient views instead of copying data during chunking, and points out a bug in the manual calculation of the Content-Length header for metadata.

Comment on lines +207 to +219
if (isClose && parsed != null && parsed == newEnd) {
throw StateError(
'Server returned 308 but all bytes were sent and acknowledged.',
);
}

if (parsed != null) {
loopExpectedByte = parsed;
currentExpectedByte = parsed;
final expectedEnd = _nextExpectedByte + chunk.length;
if (loopExpectedByte >= expectedEnd) {
return res;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current logic throws a StateError or returns a 308 response when a chunk upload is completed but not yet finalized by the server (i.e., when isClose is true). For resumable uploads, the final request must return a 2xx status code containing the object metadata. If the server returns a 308 even after all bytes are acknowledged, the client should proceed to send a finalization request (using bytes */total) instead of throwing an error or returning the 308 response.

          if (parsed != null) {
            loopExpectedByte = parsed;
            currentExpectedByte = parsed;
            final expectedEnd = _nextExpectedByte + chunk.length;
            if (!isClose && loopExpectedByte >= expectedEnd) {
              return res;
            }
          }

Comment thread pkgs/google_cloud_storage/lib/src/resumeable_upload.dart Outdated
Comment thread pkgs/google_cloud_storage/lib/src/resumeable_upload.dart Outdated
@brianquinlan brianquinlan requested a review from natebosch April 14, 2026 21:11
Comment thread pkgs/google_cloud_storage/lib/src/client.dart Outdated
Comment thread pkgs/google_cloud_storage/lib/src/client.dart Outdated
/// > not assume that the server received all bytes sent in any given request.
int? _parseRange(String? rangeHeader) {
if (rangeHeader == null) return null;
final match = RegExp(r'bytes=0-(\d+)').firstMatch(rangeHeader);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] Gemini tells me that GCS does always use 0- ranges (I didn't look it up to confirm), but I wonder it it makes sense to be target the more general spec anyway?

Suggested change
final match = RegExp(r'bytes=0-(\d+)').firstMatch(rangeHeader);
final match = RegExp(r'bytes=\d+-(\d+)').firstMatch(rangeHeader);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header is GCS' acknowledgment of the bytes that it has received. The documentation doesn't describe how to interpret the lower value

Repeat the above steps for each remaining chunk of data that you want to upload, using the upper value contained in the Range header of each response to determine where to start each successive chunk; you should not assume that the server received all bytes sent in any given request.

If the lower value is non-zero then I treat it like the "Range" header was absent, which means that:

  1. in a check request, assume that all bytes must be resent
  2. in an upload request, assume that the state of the upload is unknown and do a check request

Comment thread pkgs/google_cloud_storage/lib/src/resumeable_upload.dart Outdated
Comment thread pkgs/google_cloud_storage/lib/src/resumeable_upload.dart
Comment on lines +129 to +136
if (statusRes.statusCode == 308) {
loopExpectedByte = _parseRange(statusRes.headers['range']) ?? 0;
} else if (statusRes.statusCode == 200 ||
statusRes.statusCode == 201) {
return statusRes;
} else {
throw ServiceException.fromHttpResponse(statusRes, statusRes.body);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] Since 2 of 3 conditions in the chain leave the function I think it's bit easer to read as guard clauses.

Suggested change
if (statusRes.statusCode == 308) {
loopExpectedByte = _parseRange(statusRes.headers['range']) ?? 0;
} else if (statusRes.statusCode == 200 ||
statusRes.statusCode == 201) {
return statusRes;
} else {
throw ServiceException.fromHttpResponse(statusRes, statusRes.body);
}
if (statusRes.statusCode == 200 || statusRes.statusCode == 201) {
return statusRes;
}
if (statusRes.statusCOde != 308) {
throw ServiceException.fromHttpResponse(statusRes, statusRes.body);
}
loopExpectedByte = _parseRange(statusRes.headers['range']) ?? 0;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't follow your exact advice but I restructured this a bit to be more (IMO) readable.

newSize = requiredCapacity;
Future<http.Response> _uploadChunk(
Uint8List chunk,
bool isClose,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] What is the specific meaning of "close" here? isClosed? isClosingChunk? isNearby?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to isLast with a note that this is the final chunk to upload.

FutureOr<http.Client> client,
Uri url, {
ObjectMetadata? metadata,
bool isIdempotent = false,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Can you add a doc comment on the method that describes how to use this argument?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines +340 to +342
if (res.statusCode < 200 || res.statusCode >= 300) {
throw ServiceException.fromHttpResponse(res, res.body);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] I like how patterns read here.

Suggested change
if (res.statusCode < 200 || res.statusCode >= 300) {
throw ServiceException.fromHttpResponse(res, res.body);
}
if (res.statusCode case < 200 || >= 300) {
throw ServiceException.fromHttpResponse(res, res.body);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor disagree on that one ;-)

Comment on lines +587 to +612
final mockClient = MockClient((request) async {
count++;
if (count == 1 && request.method == 'POST') {
// Start resumable upload.
return http.Response(
'',
200,
headers: {'location': 'http://example.com/location'},
);
} else if (count == 2 && request.method == 'PUT') {
// First chunk upload works.
return http.Response('', 308, headers: {'range': 'bytes=0-262143'});
} else if (count == 3 && request.method == 'PUT') {
// Second chunk upload fails.
throw http.ClientException('message');
} else if (count == 4 && request.method == 'PUT') {
// Status check returns a range smaller than what was acknowledged
// previously.
return http.Response('', 308, headers: {'range': 'bytes=0-1000'});
} else {
throw StateError(
'Unexpected call (count: $count, method: ${request.method}, '
'uri: ${request.url})',
);
}
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] The count behavior, especially in the second tests where it's used in an expectation, feels a little more indirect than I like for tests.

What about something like a List<(String, http.Response Function()> like

final responses = <(String, http.Response Function())>[
  ('POST', () => http.Response('', 200, headers: {'location': 'http://example.com/location'})),
  ('PUT',  () => http.Response('', 308, headers: {'range': 'bytes=0-262143'})),
  ('PUT',  () => throw http.ClientException('message')),
  ('PUT',  () => http.Response('', 308, headers: {'range': 'bytes=0-1000'})),
];

Use responses.removeAt(0) to know what method to check for, and use the callback to respond. The expect(count, 6) can become something like expect(responses, isEmpty);, which doesn't carry all the semantics of the test in the expectation, but I think will be a little more clear than a failure like "expected: 6, actual: 5"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I'm neutral on whether this is better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

uploadObjectFromSink support resending data on non-matching range headers

2 participants