feat(storage): support retries with sink uploads#236
feat(storage): support retries with sink uploads#236brianquinlan wants to merge 38 commits intogoogleapis:mainfrom
Conversation
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Code Review
This pull request significantly improves the resumable upload implementation for Google Cloud Storage by introducing a robust chunking mechanism that correctly handles 308 status codes, server-side range tracking, and automated retries. It also adds support for idempotency through generation matching and refactors internal buffering for better efficiency. Review feedback identifies a logic error in the finalization of uploads when receiving a 308 response, suggests using memory-efficient views instead of copying data during chunking, and points out a bug in the manual calculation of the Content-Length header for metadata.
| if (isClose && parsed != null && parsed == newEnd) { | ||
| throw StateError( | ||
| 'Server returned 308 but all bytes were sent and acknowledged.', | ||
| ); | ||
| } | ||
|
|
||
| if (parsed != null) { | ||
| loopExpectedByte = parsed; | ||
| currentExpectedByte = parsed; | ||
| final expectedEnd = _nextExpectedByte + chunk.length; | ||
| if (loopExpectedByte >= expectedEnd) { | ||
| return res; | ||
| } |
There was a problem hiding this comment.
The current logic throws a StateError or returns a 308 response when a chunk upload is completed but not yet finalized by the server (i.e., when isClose is true). For resumable uploads, the final request must return a 2xx status code containing the object metadata. If the server returns a 308 even after all bytes are acknowledged, the client should proceed to send a finalization request (using bytes */total) instead of throwing an error or returning the 308 response.
if (parsed != null) {
loopExpectedByte = parsed;
currentExpectedByte = parsed;
final expectedEnd = _nextExpectedByte + chunk.length;
if (!isClose && loopExpectedByte >= expectedEnd) {
return res;
}
}| /// > not assume that the server received all bytes sent in any given request. | ||
| int? _parseRange(String? rangeHeader) { | ||
| if (rangeHeader == null) return null; | ||
| final match = RegExp(r'bytes=0-(\d+)').firstMatch(rangeHeader); |
There was a problem hiding this comment.
[optional] Gemini tells me that GCS does always use 0- ranges (I didn't look it up to confirm), but I wonder it it makes sense to be target the more general spec anyway?
| final match = RegExp(r'bytes=0-(\d+)').firstMatch(rangeHeader); | |
| final match = RegExp(r'bytes=\d+-(\d+)').firstMatch(rangeHeader); |
There was a problem hiding this comment.
This header is GCS' acknowledgment of the bytes that it has received. The documentation doesn't describe how to interpret the lower value
Repeat the above steps for each remaining chunk of data that you want to upload, using the upper value contained in the Range header of each response to determine where to start each successive chunk; you should not assume that the server received all bytes sent in any given request.
If the lower value is non-zero then I treat it like the "Range" header was absent, which means that:
- in a check request, assume that all bytes must be resent
- in an upload request, assume that the state of the upload is unknown and do a check request
| if (statusRes.statusCode == 308) { | ||
| loopExpectedByte = _parseRange(statusRes.headers['range']) ?? 0; | ||
| } else if (statusRes.statusCode == 200 || | ||
| statusRes.statusCode == 201) { | ||
| return statusRes; | ||
| } else { | ||
| throw ServiceException.fromHttpResponse(statusRes, statusRes.body); | ||
| } |
There was a problem hiding this comment.
[optional] Since 2 of 3 conditions in the chain leave the function I think it's bit easer to read as guard clauses.
| if (statusRes.statusCode == 308) { | |
| loopExpectedByte = _parseRange(statusRes.headers['range']) ?? 0; | |
| } else if (statusRes.statusCode == 200 || | |
| statusRes.statusCode == 201) { | |
| return statusRes; | |
| } else { | |
| throw ServiceException.fromHttpResponse(statusRes, statusRes.body); | |
| } | |
| if (statusRes.statusCode == 200 || statusRes.statusCode == 201) { | |
| return statusRes; | |
| } | |
| if (statusRes.statusCOde != 308) { | |
| throw ServiceException.fromHttpResponse(statusRes, statusRes.body); | |
| } | |
| loopExpectedByte = _parseRange(statusRes.headers['range']) ?? 0; |
There was a problem hiding this comment.
I didn't follow your exact advice but I restructured this a bit to be more (IMO) readable.
| newSize = requiredCapacity; | ||
| Future<http.Response> _uploadChunk( | ||
| Uint8List chunk, | ||
| bool isClose, |
There was a problem hiding this comment.
[nit] What is the specific meaning of "close" here? isClosed? isClosingChunk? isNearby?
There was a problem hiding this comment.
I changed this to isLast with a note that this is the final chunk to upload.
| FutureOr<http.Client> client, | ||
| Uri url, { | ||
| ObjectMetadata? metadata, | ||
| bool isIdempotent = false, |
There was a problem hiding this comment.
[nit] Can you add a doc comment on the method that describes how to use this argument?
| if (res.statusCode < 200 || res.statusCode >= 300) { | ||
| throw ServiceException.fromHttpResponse(res, res.body); | ||
| } |
There was a problem hiding this comment.
[optional] I like how patterns read here.
| if (res.statusCode < 200 || res.statusCode >= 300) { | |
| throw ServiceException.fromHttpResponse(res, res.body); | |
| } | |
| if (res.statusCode case < 200 || >= 300) { | |
| throw ServiceException.fromHttpResponse(res, res.body); | |
| } |
There was a problem hiding this comment.
Minor disagree on that one ;-)
| final mockClient = MockClient((request) async { | ||
| count++; | ||
| if (count == 1 && request.method == 'POST') { | ||
| // Start resumable upload. | ||
| return http.Response( | ||
| '', | ||
| 200, | ||
| headers: {'location': 'http://example.com/location'}, | ||
| ); | ||
| } else if (count == 2 && request.method == 'PUT') { | ||
| // First chunk upload works. | ||
| return http.Response('', 308, headers: {'range': 'bytes=0-262143'}); | ||
| } else if (count == 3 && request.method == 'PUT') { | ||
| // Second chunk upload fails. | ||
| throw http.ClientException('message'); | ||
| } else if (count == 4 && request.method == 'PUT') { | ||
| // Status check returns a range smaller than what was acknowledged | ||
| // previously. | ||
| return http.Response('', 308, headers: {'range': 'bytes=0-1000'}); | ||
| } else { | ||
| throw StateError( | ||
| 'Unexpected call (count: $count, method: ${request.method}, ' | ||
| 'uri: ${request.url})', | ||
| ); | ||
| } | ||
| }); |
There was a problem hiding this comment.
[optional] The count behavior, especially in the second tests where it's used in an expectation, feels a little more indirect than I like for tests.
What about something like a List<(String, http.Response Function()> like
final responses = <(String, http.Response Function())>[
('POST', () => http.Response('', 200, headers: {'location': 'http://example.com/location'})),
('PUT', () => http.Response('', 308, headers: {'range': 'bytes=0-262143'})),
('PUT', () => throw http.ClientException('message')),
('PUT', () => http.Response('', 308, headers: {'range': 'bytes=0-1000'})),
];Use responses.removeAt(0) to know what method to check for, and use the callback to respond. The expect(count, 6) can become something like expect(responses, isEmpty);, which doesn't carry all the semantics of the test in the expectation, but I think will be a little more clear than a failure like "expected: 6, actual: 5"
There was a problem hiding this comment.
Done. I'm neutral on whether this is better.
Co-authored-by: Nate Bosch <nbosch1@gmail.com>
Co-authored-by: Nate Bosch <nbosch1@gmail.com>
Co-authored-by: Nate Bosch <nbosch1@gmail.com>
Co-authored-by: Nate Bosch <nbosch1@gmail.com>
…quinlan/google-cloud-dart into upload_testbench_plus_test
Fixes #218
Adds
ifGenerationMatchandretryparameters to uploadObjectFromSink.Part of the complexity of the retry strategy is that it is possible for Google Cloud Storage to only acknowledge part of the upload and uploads must be done in 256KB chunks.