diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java index dac8a010cd..2a19e7e8a2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java @@ -65,6 +65,7 @@ public class HttpStorageOptions extends StorageOptions { private transient RetryDependenciesAdapter retryDepsAdapter; private final BlobWriteSessionConfig blobWriteSessionConfig; + private final boolean enableHttpClientsMetrics; private transient OpenTelemetry openTelemetry; private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { @@ -75,6 +76,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) { builder.storageRetryStrategy, defaults().getStorageRetryStrategy())); retryDepsAdapter = new RetryDependenciesAdapter(); blobWriteSessionConfig = builder.blobWriteSessionConfig; + enableHttpClientsMetrics = builder.enableHttpClientsMetrics; openTelemetry = builder.openTelemetry; } @@ -99,6 +101,15 @@ StorageRpc getStorageRpcV1() { @BetaApi @Override public OpenTelemetry getOpenTelemetry() { + if (openTelemetry == null || openTelemetry == OpenTelemetry.noop()) { + if (enableHttpClientsMetrics) { + openTelemetry = + OpenTelemetryBootstrappingUtils.getHttpOpenTelemetrySdk( + getProjectId(), getUniverseDomain(), getHost(), true); + } else { + return OpenTelemetry.noop(); + } + } return openTelemetry; } @@ -110,7 +121,11 @@ public HttpStorageOptions.Builder toBuilder() { @Override public int hashCode() { return Objects.hash( - retryAlgorithmManager, blobWriteSessionConfig, openTelemetry, baseHashCode()); + retryAlgorithmManager, + blobWriteSessionConfig, + enableHttpClientsMetrics, + openTelemetry, + baseHashCode()); } @Override @@ -124,6 +139,7 @@ public boolean equals(Object o) { HttpStorageOptions that = (HttpStorageOptions) o; return Objects.equals(retryAlgorithmManager, that.retryAlgorithmManager) && Objects.equals(blobWriteSessionConfig, that.blobWriteSessionConfig) + && Objects.equals(enableHttpClientsMetrics, that.enableHttpClientsMetrics) && Objects.equals(openTelemetry, that.openTelemetry) && this.baseEquals(that); } @@ -156,6 +172,7 @@ public static class Builder extends StorageOptions.Builder { private StorageRetryStrategy storageRetryStrategy; private BlobWriteSessionConfig blobWriteSessionConfig = HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig(); + private boolean enableHttpClientsMetrics = false; private OpenTelemetry openTelemetry = HttpStorageDefaults.INSTANCE.getDefaultOpenTelemetry(); Builder() {} @@ -165,6 +182,7 @@ public static class Builder extends StorageOptions.Builder { HttpStorageOptions hso = (HttpStorageOptions) options; this.storageRetryStrategy = hso.retryAlgorithmManager.retryStrategy; this.blobWriteSessionConfig = hso.blobWriteSessionConfig; + this.enableHttpClientsMetrics = hso.enableHttpClientsMetrics; this.openTelemetry = hso.getOpenTelemetry(); } @@ -317,6 +335,18 @@ public HttpStorageOptions.Builder setOpenTelemetry(OpenTelemetry openTelemetry) this.openTelemetry = openTelemetry; return this; } + + /** + * Option for whether this client should emit internal Otel HTTP client metrics to + * Cloud Monitoring. To enable metric reporting, set this to true. False by default. + * + * @since 2.62.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public HttpStorageOptions.Builder setEnableHttpClientsMetrics(boolean enableHttpClientsMetrics) { + this.enableHttpClientsMetrics = enableHttpClientsMetrics; + return this; + } } public static final class HttpStorageDefaults extends StorageDefaults { @@ -360,6 +390,11 @@ public BlobWriteSessionConfig getDefaultStorageWriterConfig() { public OpenTelemetry getDefaultOpenTelemetry() { return OpenTelemetry.noop(); } + + @BetaApi + public boolean getDefaultEnableHttpClientsMetrics() { + return false; + } } /** diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java index 4f80a7083d..2085ea33a5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadClient.java @@ -18,6 +18,7 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadResponse; import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; @@ -122,9 +123,12 @@ public abstract ListMultipartUploadsResponse listMultipartUploads( @BetaApi public static MultipartUploadClient create(MultipartUploadSettings config) { HttpStorageOptions options = config.getOptions(); - return new MultipartUploadClientImpl( - options.createRetrier(), - MultipartUploadHttpRequestManager.createFrom(options), - options.getRetryAlgorithmManager()); + MultipartUploadClient client = + new MultipartUploadClientImpl( + options.createRetrier(), + MultipartUploadHttpRequestManager.createFrom(options), + options.getRetryAlgorithmManager()); + return OtelMultipartUploadClientDecorator.decorate( + client, options.getOpenTelemetry(), Transport.HTTP); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java index 53fea0b8b6..5b6e34054d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OpenTelemetryBootstrappingUtils.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import io.grpc.ManagedChannelBuilder; import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; @@ -124,7 +125,7 @@ static ChannelConfigurator enableGrpcMetrics( String metricServiceEndpoint = getCloudMonitoringEndpoint(endpoint, universeDomain); SdkMeterProvider provider = createMeterProvider( - metricServiceEndpoint, projectIdToUse, detectedAttributes, shouldSuppressExceptions); + metricServiceEndpoint, projectIdToUse, detectedAttributes, shouldSuppressExceptions, true); OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder().setMeterProvider(provider).build(); @@ -142,6 +143,49 @@ static ChannelConfigurator enableGrpcMetrics( return otelConfigurator.andThen(channelConfigurator); } + @NonNull + static OpenTelemetry getHttpOpenTelemetrySdk( + String projectId, + String universeDomain, + String host, + boolean shouldSuppressExceptions) { + GCPResourceProvider resourceProvider = new GCPResourceProvider(); + Attributes detectedAttributes = resourceProvider.getAttributes(); + + @Nullable + String detectedProjectId = detectedAttributes.get(AttributeKey.stringKey("cloud.account.id")); + if (projectId == null && detectedProjectId == null) { + log.warning( + "Unable to determine the Project ID in order to report metrics. No HTTP client metrics" + + " will be reported."); + return OpenTelemetry.noop(); + } + + String projectIdToUse = detectedProjectId == null ? projectId : detectedProjectId; + if (!projectIdToUse.equals(projectId)) { + log.warning( + "The Project ID configured for HTTP client metrics is " + + projectIdToUse + + ", but the Project ID of the storage client is " + + projectId + + ". Make sure that the service account in use has the required metric writing role " + + "(roles/monitoring.metricWriter) in the project " + + projectIdToUse + + ", or metrics will not be written."); + } + + String metricServiceEndpoint = getCloudMonitoringEndpoint(host, universeDomain); + SdkMeterProvider provider = + createMeterProvider( + metricServiceEndpoint, + projectIdToUse, + detectedAttributes, + shouldSuppressExceptions, + false); + + return OpenTelemetrySdk.builder().setMeterProvider(provider).build(); + } + @SuppressWarnings("rawtypes") // ManagedChannelBuilder @FunctionalInterface interface ChannelConfigurator extends ApiFunction { @@ -210,7 +254,8 @@ static SdkMeterProvider createMeterProvider( String metricServiceEndpoint, String projectIdToUse, Attributes detectedAttributes, - boolean shouldSuppressExceptions) { + boolean shouldSuppressExceptions, + boolean enableGrpcViews) { MonitoredResourceDescription monitoredResourceDescription = new MonitoredResourceDescription( @@ -233,11 +278,13 @@ static SdkMeterProvider createMeterProvider( // This replaces the dots with slashes in each metric, which is the format needed for this // monitored resource - for (String metric : - ImmutableList.copyOf(Iterables.concat(METRICS_TO_ENABLE, METRICS_ENABLED_BY_DEFAULT))) { - providerBuilder.registerView( - InstrumentSelector.builder().setName(metric).build(), - View.builder().setName(metric.replace(".", "/")).build()); + if (enableGrpcViews) { + for (String metric : + ImmutableList.copyOf(Iterables.concat(METRICS_TO_ENABLE, METRICS_ENABLED_BY_DEFAULT))) { + providerBuilder.registerView( + InstrumentSelector.builder().setName(metric).build(), + View.builder().setName(metric.replace(".", "/")).build()); + } } MetricExporter exporter = shouldSuppressExceptions @@ -274,18 +321,20 @@ static SdkMeterProvider createMeterProvider( .build()) .setResource(Resource.create(attributesBuilder.build())); - addHistogramView( - providerBuilder, latencyHistogramBoundaries(), "grpc/client/attempt/duration", "s"); - addHistogramView( - providerBuilder, - sizeHistogramBoundaries(), - "grpc/client/attempt/rcvd_total_compressed_message_size", - "By"); - addHistogramView( - providerBuilder, - sizeHistogramBoundaries(), - "grpc/client/attempt/sent_total_compressed_message_size", - "By"); + if (enableGrpcViews) { + addHistogramView( + providerBuilder, latencyHistogramBoundaries(), "grpc/client/attempt/duration", "s"); + addHistogramView( + providerBuilder, + sizeHistogramBoundaries(), + "grpc/client/attempt/rcvd_total_compressed_message_size", + "By"); + addHistogramView( + providerBuilder, + sizeHistogramBoundaries(), + "grpc/client/attempt/sent_total_compressed_message_size", + "By"); + } return providerBuilder.build(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java new file mode 100644 index 0000000000..0c85e3e46f --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelMultipartUploadClientDecorator.java @@ -0,0 +1,359 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.BetaApi; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadResponse; +import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadResponse; +import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; +import com.google.cloud.storage.multipartupload.model.ListMultipartUploadsRequest; +import com.google.cloud.storage.multipartupload.model.ListMultipartUploadsResponse; +import com.google.cloud.storage.multipartupload.model.ListPartsRequest; +import com.google.cloud.storage.multipartupload.model.ListPartsResponse; +import com.google.cloud.storage.multipartupload.model.UploadPartRequest; +import com.google.cloud.storage.multipartupload.model.UploadPartResponse; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; + +/** + * A decorator for {@link MultipartUploadClient} that adds OpenTelemetry tracing and metrics. + * + * @since 2.62.0 This new api is in preview and is subject to breaking changes. + */ +@BetaApi +final class OtelMultipartUploadClientDecorator extends MultipartUploadClient { + + private final MultipartUploadClient delegate; + private final Tracer tracer; + + private final Meter meter; + private final DoubleHistogram createMultipartUploadLatency; + private final DoubleHistogram listPartsLatency; + private final DoubleHistogram abortMultipartUploadLatency; + private final DoubleHistogram completeMultipartUploadLatency; + private final DoubleHistogram uploadPartLatency; + private final DoubleHistogram listMultipartUploadsLatency; + + private final LongCounter uploadedBytes; + private final LongHistogram partSize; + + private OtelMultipartUploadClientDecorator( + MultipartUploadClient delegate, OpenTelemetry otel, Attributes baseAttributes) { + this.delegate = delegate; + this.tracer = + OtelStorageDecorator.TracerDecorator.decorate( + null, otel, baseAttributes, MultipartUploadClient.class.getName() + "/"); + + this.meter = otel.meterBuilder(MultipartUploadClient.class.getName()) + .build(); + + this.createMultipartUploadLatency = meter + .histogramBuilder("storage.multipart_upload.create_multipart_upload.latency") + .setDescription("Latency of Create Multipart Upload API calls") + .setUnit("ms") + .build(); + this.listPartsLatency = meter + .histogramBuilder("storage.multipart_upload.list_parts.latency") + .setDescription("Latency of List Parts API calls") + .setUnit("ms") + .build(); + this.abortMultipartUploadLatency = meter + .histogramBuilder("storage.multipart_upload.abort_multipart_upload.latency") + .setDescription("Latency of Abort Multipart Upload API calls") + .setUnit("ms") + .build(); + this.completeMultipartUploadLatency = meter + .histogramBuilder("storage.multipart_upload.complete_multipart_upload.latency") + .setDescription("Latency of Complete Multipart Upload API calls") + .setUnit("ms") + .build(); + this.uploadPartLatency = meter + .histogramBuilder("storage.multipart_upload.upload_part.latency") + .setDescription("Latency of Upload Part API calls") + .setUnit("ms") + .build(); + this.listMultipartUploadsLatency = meter + .histogramBuilder("storage.multipart_upload.list_multipart_uploads.latency") + .setDescription("Latency of List Multipart Uploads API calls") + .setUnit("ms") + .build(); + this.uploadedBytes = meter + .counterBuilder("storage.multipart_upload.uploaded_bytes") + .setDescription("Total bytes uploaded via Multipart Upload") + .setUnit("By") + .build(); + this.partSize = meter + .histogramBuilder("storage.multipart_upload.part_size") + .ofLongs() + .setDescription("Size of parts uploaded via Multipart Upload") + .setUnit("By") + .build(); + } + + @Override + public CreateMultipartUploadResponse createMultipartUpload(CreateMultipartUploadRequest request) { + long startTime = System.currentTimeMillis(); + Span span = + tracer + .spanBuilder("createMultipartUpload") + .setAttribute( + "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) + .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "createMultipartUpload") + .build(); + try (Scope ignore = span.makeCurrent()) { + CreateMultipartUploadResponse response = delegate.createMultipartUpload(request); + long duration = System.currentTimeMillis() - startTime; + createMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + createMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + throw t; + } finally { + span.end(); + } + } + + @Override + public ListPartsResponse listParts(ListPartsRequest request) { + long startTime = System.currentTimeMillis(); + Span span = + tracer + .spanBuilder("listParts") + .setAttribute( + "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) + .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "listParts") + .build(); + try (Scope ignore = span.makeCurrent()) { + ListPartsResponse response = delegate.listParts(request); + long duration = System.currentTimeMillis() - startTime; + listPartsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + listPartsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + throw t; + } finally { + span.end(); + } + } + + @Override + public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadRequest request) { + long startTime = System.currentTimeMillis(); + Span span = + tracer + .spanBuilder("abortMultipartUpload") + .setAttribute( + "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) + .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "abortMultipartUpload") + .build(); + try (Scope ignore = span.makeCurrent()) { + AbortMultipartUploadResponse response = delegate.abortMultipartUpload(request); + long duration = System.currentTimeMillis() - startTime; + abortMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + abortMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + throw t; + } finally { + span.end(); + } + } + + @Override + public CompleteMultipartUploadResponse completeMultipartUpload( + CompleteMultipartUploadRequest request) { + long startTime = System.currentTimeMillis(); + Span span = + tracer + .spanBuilder("completeMultipartUpload") + .setAttribute( + "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) + .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("method", "completeMultipartUpload") + .build(); + try (Scope ignore = span.makeCurrent()) { + CompleteMultipartUploadResponse response = delegate.completeMultipartUpload(request); + long duration = System.currentTimeMillis() - startTime; + completeMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + completeMultipartUploadLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + throw t; + } finally { + span.end(); + } + } + + @Override + public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requestBody) { + long startTime = System.currentTimeMillis(); + + Span span = + tracer + .spanBuilder("uploadPart") + .setAttribute( + "gsutil.uri", String.format("gs://%s/%s", request.bucket(), request.key())) + .setAttribute("partNumber", request.partNumber()) + .startSpan(); + + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("key", request.key()) + .put("partNumber", request.partNumber()) + .put("method", "uploadPart") + .build(); + + try (Scope ignore = span.makeCurrent()) { + UploadPartResponse response = delegate.uploadPart(request, requestBody); + + long duration = System.currentTimeMillis() - startTime; + uploadPartLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + uploadedBytes.add(requestBody.getContent().getLength(), metricAttributes.toBuilder() + .put("status", "success") + .build()); + partSize.record(requestBody.getContent().getLength(), metricAttributes.toBuilder() + .put("status", "success") + .build()); + + return response; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + + long duration = System.currentTimeMillis() - startTime; + uploadPartLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + + throw t; + } finally { + span.end(); + } + } + + @Override + public ListMultipartUploadsResponse listMultipartUploads(ListMultipartUploadsRequest request) { + long startTime = System.currentTimeMillis(); + Span span = + tracer + .spanBuilder("listMultipartUploads") + .setAttribute("gsutil.uri", String.format("gs://%s/", request.bucket())) + .startSpan(); + Attributes metricAttributes = Attributes.builder() + .put("bucket", request.bucket()) + .put("method", "listMultipartUploads") + .build(); + try (Scope ignore = span.makeCurrent()) { + ListMultipartUploadsResponse response = delegate.listMultipartUploads(request); + long duration = System.currentTimeMillis() - startTime; + listMultipartUploadsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "success") + .build()); + return response; + } catch (Throwable t) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + long duration = System.currentTimeMillis() - startTime; + listMultipartUploadsLatency.record((double) duration, metricAttributes.toBuilder() + .put("status", "error") + .put("exception_type", t.getClass().getSimpleName()) + .build()); + throw t; + } finally { + span.end(); + } + } + + static MultipartUploadClient decorate( + MultipartUploadClient delegate, OpenTelemetry otel, Transport transport) { + if (otel == OpenTelemetry.noop()) { + return delegate; + } + Attributes baseAttributes = + Attributes.builder() + .put("gcp.client.service", "Storage") + .put("gcp.client.version", StorageOptions.getDefaultInstance().getLibraryVersion()) + .put("gcp.client.repo", "googleapis/java-storage") + .put("gcp.client.artifact", "com.google.cloud:google-cloud-storage") + .put("rpc.system", "XML") + .put("service.name", "storage.googleapis.com") + .build(); + return new OtelMultipartUploadClientDecorator(delegate, otel, baseAttributes); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java index e418e5e106..291db00ae5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -1561,13 +1561,13 @@ static UnaryOperator retryContextDecorator(OpenTelemetry otel) { return String.format(Locale.US, "gs://%s/", bucket); } - private static final class TracerDecorator implements Tracer { + static final class TracerDecorator implements Tracer { @Nullable private final Context parentContextOverride; private final Tracer delegate; private final Attributes baseAttributes; private final String spanNamePrefix; - private TracerDecorator( + TracerDecorator( @Nullable Context parentContextOverride, Tracer delegate, Attributes baseAttributes, @@ -1578,7 +1578,7 @@ private TracerDecorator( this.spanNamePrefix = spanNamePrefix; } - private static TracerDecorator decorate( + static TracerDecorator decorate( @Nullable Context parentContextOverride, OpenTelemetry otel, Attributes baseAttributes, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/HttpStorageOptionsOtelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/HttpStorageOptionsOtelTest.java new file mode 100644 index 0000000000..2853144cce --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/HttpStorageOptionsOtelTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.common.truth.Truth.assertThat; + +import io.opentelemetry.api.OpenTelemetry; +import org.junit.Test; + +public class HttpStorageOptionsOtelTest { + + @Test + public void testEnableHttpClientsMetrics() { + HttpStorageOptions options = + HttpStorageOptions.newBuilder() + .setProjectId("test-project") + .setEnableHttpClientsMetrics(true) + .build(); + + OpenTelemetry otel = options.getOpenTelemetry(); + assertThat(otel).isNotNull(); + assertThat(otel).isNotEqualTo(OpenTelemetry.noop()); + } + + @Test + public void testDefaultHttpClientsMetrics() { + HttpStorageOptions options = + HttpStorageOptions.newBuilder() + .setProjectId("test-project") + .build(); + + OpenTelemetry otel = options.getOpenTelemetry(); + assertThat(otel).isEqualTo(OpenTelemetry.noop()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java index e71b9077dc..e18a20667e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcMetricsTest.java @@ -45,7 +45,7 @@ public void testGrpcMetrics() { "monitoring.googleapis.com:443", grpcStorageOptions.getProjectId(), detectedAttributes, - false); + true); /* * SDKMeterProvider doesn't expose the relevant fields we want to test, but they are present in diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java new file mode 100644 index 0000000000..c6df27bdf5 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryMPUTest.java @@ -0,0 +1,291 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.CrossRun; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CompletedMultipartUpload; +import com.google.cloud.storage.multipartupload.model.CompletedPart; +import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; +import com.google.cloud.storage.multipartupload.model.ListMultipartUploadsRequest; +import com.google.cloud.storage.multipartupload.model.ListPartsRequest; +import com.google.cloud.storage.multipartupload.model.UploadPartRequest; +import com.google.cloud.storage.multipartupload.model.UploadPartResponse; +import com.google.cloud.storage.otel.TestExporter; +import com.google.common.collect.ImmutableList; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@CrossRun( + backends = Backend.PROD, + transports = {Transport.HTTP}) +public final class ITOpenTelemetryMPUTest { + + @Inject public Storage storage; + @Inject public BucketInfo bucket; + @Inject public Generator generator; + + @Test + public void checkMPUTracing() throws Exception { + TestExporter exporter = new TestExporter(); + + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build()) + .build(); + + String objectName = generator.randomObjectName(); + runMpuOperations(openTelemetrySdk, objectName); + + List spans = exporter.getExportedSpans(); + assertThat(spans).hasSize(7); + + SpanData createSpan = spans.get(0); + assertThat(createSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/createMultipartUpload"); + assertThat(createSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + + SpanData uploadSpan = spans.get(1); + assertThat(uploadSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/uploadPart"); + assertThat(uploadSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + assertThat(uploadSpan.getAttributes().get(AttributeKey.longKey("partNumber"))).isEqualTo(1); + + SpanData listSpan = spans.get(2); + assertThat(listSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/listParts"); + assertThat(listSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + + SpanData completeSpan = spans.get(3); + assertThat(completeSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/completeMultipartUpload"); + assertThat(completeSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s", bucket.getName(), objectName)); + + SpanData listUploadsSpan = spans.get(4); + assertThat(listUploadsSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/listMultipartUploads"); + assertThat(listUploadsSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/", bucket.getName())); + + SpanData create2Span = spans.get(5); + assertThat(create2Span.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/createMultipartUpload"); + + SpanData abortSpan = spans.get(6); + assertThat(abortSpan.getName()) + .isEqualTo("com.google.cloud.storage.MultipartUploadClient/abortMultipartUpload"); + assertThat(abortSpan.getAttributes().get(AttributeKey.stringKey("gsutil.uri"))) + .isEqualTo(String.format("gs://%s/%s-abort", bucket.getName(), objectName)); + } + + @Test + public void checkMPUMetrics() throws Exception { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + + OpenTelemetrySdk openTelemetrySdk = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + + String objectName = generator.randomObjectName(); + runMpuOperations(openTelemetrySdk, objectName); + + Collection metrics = metricReader.collectAllMetrics(); + System.err.println("Exported metrics count: " + metrics.size()); + metrics.forEach(m -> System.err.println("Metric: " + m.getName())); + + assertThat(metrics).hasSize(8); + + MetricData createMetric = + metrics.stream() + .filter(m -> m.getName().contains("create_multipart_upload")) + .findFirst() + .orElseThrow(() -> new RuntimeException("create_multipart_upload metric not found")); + assertThat(createMetric.getName()) + .isEqualTo("storage.multipart_upload.create_multipart_upload.latency"); + assertThat(createMetric.getData().getPoints()).hasSize(2); // 2 create calls + + MetricData uploadPartMetric = + metrics.stream() + .filter(m -> m.getName().contains("upload_part")) + .findFirst() + .orElseThrow(() -> new RuntimeException("upload_part metric not found")); + assertThat(uploadPartMetric.getName()) + .isEqualTo("storage.multipart_upload.upload_part.latency"); + assertThat(uploadPartMetric.getData().getPoints()).hasSize(1); + + MetricData completeMetric = + metrics.stream() + .filter(m -> m.getName().contains("complete_multipart_upload")) + .findFirst() + .orElseThrow(() -> new RuntimeException("complete_multipart_upload metric not found")); + assertThat(completeMetric.getName()) + .isEqualTo("storage.multipart_upload.complete_multipart_upload.latency"); + assertThat(completeMetric.getData().getPoints()).hasSize(1); + + MetricData listPartsMetric = + metrics.stream() + .filter(m -> m.getName().contains("list_parts")) + .findFirst() + .orElseThrow(() -> new RuntimeException("list_parts metric not found")); + assertThat(listPartsMetric.getName()) + .isEqualTo("storage.multipart_upload.list_parts.latency"); + assertThat(listPartsMetric.getData().getPoints()).hasSize(1); + + MetricData listUploadsMetric = + metrics.stream() + .filter(m -> m.getName().contains("list_multipart_uploads")) + .findFirst() + .orElseThrow(() -> new RuntimeException("list_multipart_uploads metric not found")); + assertThat(listUploadsMetric.getName()) + .isEqualTo("storage.multipart_upload.list_multipart_uploads.latency"); + assertThat(listUploadsMetric.getData().getPoints()).hasSize(1); + + MetricData abortMetric = + metrics.stream() + .filter(m -> m.getName().contains("abort_multipart_upload")) + .findFirst() + .orElseThrow(() -> new RuntimeException("abort_multipart_upload metric not found")); + assertThat(abortMetric.getName()) + .isEqualTo("storage.multipart_upload.abort_multipart_upload.latency"); + assertThat(abortMetric.getData().getPoints()).hasSize(1); + + MetricData uploadedBytesMetric = + metrics.stream() + .filter(m -> m.getName().contains("uploaded_bytes")) + .findFirst() + .orElseThrow(() -> new RuntimeException("uploaded_bytes metric not found")); + assertThat(uploadedBytesMetric.getName()) + .isEqualTo("storage.multipart_upload.uploaded_bytes"); + assertThat(uploadedBytesMetric.getData().getPoints()).hasSize(1); + + // "Hello, World!" is 13 bytes + assertThat(uploadedBytesMetric.getLongSumData().getPoints().iterator().next().getValue()) + .isEqualTo(13); + + MetricData partSizeMetric = + metrics.stream() + .filter(m -> m.getName().contains("part_size")) + .findFirst() + .orElseThrow(() -> new RuntimeException("part_size metric not found")); + assertThat(partSizeMetric.getName()) + .isEqualTo("storage.multipart_upload.part_size"); + assertThat(partSizeMetric.getData().getPoints()).hasSize(1); + assertThat(partSizeMetric.getHistogramData().getPoints().iterator().next().getSum()) + .isEqualTo(13); + } + + private void runMpuOperations(OpenTelemetrySdk openTelemetrySdk, String objectName) { + HttpStorageOptions httpStorageOptions = (HttpStorageOptions) storage.getOptions(); + StorageOptions storageOptions = + httpStorageOptions.toBuilder().setOpenTelemetry(openTelemetrySdk).build(); + + try (Storage storage = storageOptions.getService()) { + MultipartUploadClient mpuClient = + MultipartUploadClient.create( + MultipartUploadSettings.of((HttpStorageOptions) storage.getOptions())); + + CreateMultipartUploadResponse create = + mpuClient.createMultipartUpload( + CreateMultipartUploadRequest.builder() + .bucket(bucket.getName()) + .key(objectName) + .build()); + + byte[] data = "Hello, World!".getBytes(StandardCharsets.UTF_8); + RequestBody body = RequestBody.of(ByteBuffer.wrap(data)); + UploadPartResponse upload = + mpuClient.uploadPart( + UploadPartRequest.builder() + .bucket(bucket.getName()) + .key(objectName) + .uploadId(create.uploadId()) + .partNumber(1) + .build(), + body); + + mpuClient.listParts( + ListPartsRequest.builder() + .bucket(bucket.getName()) + .key(objectName) + .uploadId(create.uploadId()) + .build()); + + mpuClient.completeMultipartUpload( + CompleteMultipartUploadRequest.builder() + .bucket(bucket.getName()) + .key(objectName) + .uploadId(create.uploadId()) + .multipartUpload( + CompletedMultipartUpload.builder() + .parts( + ImmutableList.of( + CompletedPart.builder().partNumber(1).eTag(upload.eTag()).build())) + .build()) + .build()); + + mpuClient.listMultipartUploads( + ListMultipartUploadsRequest.builder().bucket(bucket.getName()).build()); + + CreateMultipartUploadResponse create2 = + mpuClient.createMultipartUpload( + CreateMultipartUploadRequest.builder() + .bucket(bucket.getName()) + .key(objectName + "-abort") + .build()); + mpuClient.abortMultipartUpload( + AbortMultipartUploadRequest.builder() + .bucket(bucket.getName()) + .key(objectName + "-abort") + .uploadId(create2.uploadId()) + .build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +}