From e140109d1939377423273bcde4cc91dc91d3bc2a Mon Sep 17 00:00:00 2001 From: Chandra Date: Tue, 10 Feb 2026 17:19:00 +0000 Subject: [PATCH 01/10] feat: add time based benchmarks --- .../microbenchmarks/time_based/conftest.py | 85 +++++++++ .../time_based/reads/config.py | 102 +++++++++++ .../time_based/reads/config.yaml | 26 +++ .../time_based/reads/parameters.py | 22 +++ .../time_based/reads/test_reads.py | 164 ++++++++++++++++++ 5 files changed, 399 insertions(+) create mode 100644 tests/perf/microbenchmarks/time_based/conftest.py create mode 100644 tests/perf/microbenchmarks/time_based/reads/config.py create mode 100644 tests/perf/microbenchmarks/time_based/reads/config.yaml create mode 100644 tests/perf/microbenchmarks/time_based/reads/parameters.py create mode 100644 tests/perf/microbenchmarks/time_based/reads/test_reads.py diff --git a/tests/perf/microbenchmarks/time_based/conftest.py b/tests/perf/microbenchmarks/time_based/conftest.py new file mode 100644 index 000000000..b60b64610 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/conftest.py @@ -0,0 +1,85 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest +import os +import multiprocessing +import logging +from google.cloud import storage + +_OBJECT_NAME_PREFIX = "time_based_tests" + + +# def _upload_worker(args): +# bucket_name, object_name, object_size = args +# storage_client = storage.Client() +# bucket = storage_client.bucket(bucket_name) +# blob = bucket.blob(object_name) + +# try: +# blob.reload() +# if blob.size >= object_size: +# logging.info(f"Object {object_name} already exists and has the required size.") +# return object_name, object_size +# except Exception: +# pass + +# logging.info(f"Creating object {object_name} of size {object_size} bytes.") +# # For large objects, it's better to upload in chunks. +# # Using urandom is slow, so for large objects, we will write the same chunk over and over. +# chunk_size = 100 * 1024 * 1024 # 100 MiB +# data_chunk = os.urandom(chunk_size) +# num_chunks = object_size // chunk_size +# remaining_bytes = object_size % chunk_size + +# from io import BytesIO +# with BytesIO() as f: +# for _ in range(num_chunks): +# f.write(data_chunk) +# if remaining_bytes > 0: +# f.write(data_chunk[:remaining_bytes]) + +# f.seek(0) +# blob.upload_from_file(f, size=object_size) + +# logging.info(f"Finished creating object {object_name}.") +# return object_name, object_size + + +# def _create_files(num_files, bucket_name, object_size): +# """ +# Create/Upload objects for benchmarking and return a list of their names. +# """ +# object_names = [f"{_OBJECT_NAME_PREFIX}_{i}" for i in range(num_files)] + +# args_list = [ +# (bucket_name, object_names[i], object_size) for i in range(num_files) +# ] + +# # Don't use a pool to avoid contention writing the same objects. +# # The check for existence should make this fast on subsequent runs. 
+# results = [_upload_worker(arg) for arg in args_list] + +# return [r[0] for r in results] + + +@pytest.fixture +def workload_params(request): + params = request.param + files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)] + # files_names = _create_files( + # params.num_processes, # One file per process + # params.bucket_name, + # params.file_size_bytes, + # ) + return params, files_names diff --git a/tests/perf/microbenchmarks/time_based/reads/config.py b/tests/perf/microbenchmarks/time_based/reads/config.py new file mode 100644 index 000000000..bb6bf4729 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/config.py @@ -0,0 +1,102 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import itertools +import os +from typing import Dict, List + +import yaml + +try: + from tests.perf.microbenchmarks.time_based.reads.parameters import ( + TimeBasedReadParameters, + ) +except ModuleNotFoundError: + from reads.parameters import TimeBasedReadParameters + + +def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: + """Generates a dictionary of benchmark parameters for time based read operations.""" + params: Dict[str, List[TimeBasedReadParameters]] = {} + config_path = os.path.join(os.path.dirname(__file__), "config.yaml") + with open(config_path, "r") as f: + config = yaml.safe_load(f) + + common_params = config["common"] + bucket_types = common_params["bucket_types"] + file_sizes_mib = common_params["file_sizes_mib"] + chunk_sizes_mib = common_params["chunk_sizes_mib"] + rounds = common_params["rounds"] + duration = common_params["duration"] + warmup_duration = common_params["warmup_duration"] + + bucket_map = { + "zonal": os.environ.get( + "DEFAULT_RAPID_ZONAL_BUCKET", + config["defaults"]["DEFAULT_RAPID_ZONAL_BUCKET"], + ), + "regional": os.environ.get( + "DEFAULT_STANDARD_BUCKET", config["defaults"]["DEFAULT_STANDARD_BUCKET"] + ), + } + + for workload in config["workload"]: + workload_name = workload["name"] + params[workload_name] = [] + pattern = workload["pattern"] + processes = workload["processes"] + coros = workload["coros"] + + # Create a product of all parameter combinations + product = itertools.product( + bucket_types, + file_sizes_mib, + chunk_sizes_mib, + processes, + coros, + ) + + for ( + bucket_type, + file_size_mib, + chunk_size_mib, + num_processes, + num_coros, + ) in product: + file_size_bytes = file_size_mib * 1024 * 1024 + chunk_size_bytes = chunk_size_mib * 1024 * 1024 + bucket_name = bucket_map[bucket_type] + + num_files = num_processes * num_coros + + # Create a descriptive name for the parameter set + name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_mib}MiB" + + params[workload_name].append( + TimeBasedReadParameters( + name=name, + workload_name=workload_name, + pattern=pattern, + bucket_name=bucket_name, + bucket_type=bucket_type, + num_coros=num_coros, + num_processes=num_processes, + num_files=num_files, + rounds=rounds, + chunk_size_bytes=chunk_size_bytes, 
+ file_size_bytes=file_size_bytes, + duration=duration, + warmup_duration=warmup_duration, + ) + ) + return params diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml new file mode 100644 index 000000000..16d16b77f --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml @@ -0,0 +1,26 @@ +common: + bucket_types: + - "zonal" + file_sizes_mib: + - 10240 # 10GiB + chunk_sizes_mib: [1, 16, 100, 200] # 16MiB + rounds: 1 + duration: 60 # seconds + warmup_duration: 5 # seconds + +workload: + ############# multi process multi coroutine ######### + - name: "read_seq_multi_process" + pattern: "seq" + coros: [1] + processes: [1, 48] + + + - name: "read_rand_multi_process" + pattern: "rand" + coros: [1] + processes: [1, 48] + +defaults: + DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb" + DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb" diff --git a/tests/perf/microbenchmarks/time_based/reads/parameters.py b/tests/perf/microbenchmarks/time_based/reads/parameters.py new file mode 100644 index 000000000..d35150a23 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/parameters.py @@ -0,0 +1,22 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from dataclasses import dataclass +from tests.perf.microbenchmarks.parameters import IOBenchmarkParameters + + +@dataclass +class TimeBasedReadParameters(IOBenchmarkParameters): + pattern: str + duration: int + warmup_duration: int diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py new file mode 100644 index 000000000..60510ee79 --- /dev/null +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -0,0 +1,164 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
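For reference, the expansion performed by _get_params reduces to the following self-contained sketch: each workload entry in config.yaml is crossed with the "common" dimensions via itertools.product, and every combination becomes one named TimeBasedReadParameters (one file per process*coroutine). Illustrative values are inlined here rather than loaded from config.yaml, and cfg is a placeholder for the parsed YAML, so this mirrors the PATCH 01 shape only.

import itertools

# Placeholder for yaml.safe_load(...); values mirror config.yaml above.
cfg = {
    "common": {"bucket_types": ["zonal"], "file_sizes_mib": [10240],
               "chunk_sizes_mib": [1, 16, 100, 200]},
    "workload": [{"name": "read_seq_multi_process", "pattern": "seq",
                  "coros": [1], "processes": [1, 48]}],
}

for wl in cfg["workload"]:
    product = itertools.product(
        cfg["common"]["bucket_types"],
        cfg["common"]["file_sizes_mib"],
        cfg["common"]["chunk_sizes_mib"],
        wl["processes"],
        wl["coros"],
    )
    for bucket_type, file_mib, chunk_mib, procs, coros in product:
        # One TimeBasedReadParameters per combination, with
        # num_files = procs * coros and a descriptive id such as
        # "seq_zonal_48p_10240MiB_16MiB".
        print(f"{wl['pattern']}_{bucket_type}_{procs}p_{file_mib}MiB_{chunk_mib}MiB")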
+"""Microbenchmarks for time-based Google Cloud Storage read operations.""" + +import time +import asyncio +import random +import logging +import os +import multiprocessing + +import pytest + +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) +from tests.perf.microbenchmarks._utils import publish_benchmark_extra_info +from tests.perf.microbenchmarks.conftest import ( + publish_resource_metrics, +) +from io import BytesIO +import tests.perf.microbenchmarks.time_based.reads.config as config + +all_params = config._get_params() + + +async def create_client(): + """Initializes async client and gets the current event loop.""" + return AsyncGrpcClient() + + +# --- Global Variables for Worker Process --- +worker_loop = None +worker_client = None +worker_json_client = None +CORE_OFFSET = 20 # Start pinning cores from 20 + + +def _worker_init(bucket_type): + """Initializes a persistent event loop and client for each worker process.""" + os.sched_setaffinity(0, {i for i in range(20, 180)}) # Pin to cores 20-189 + global worker_loop, worker_client, worker_json_client + if bucket_type == "zonal": + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) + worker_client = worker_loop.run_until_complete(create_client()) + else: # regional + from google.cloud import storage + + worker_json_client = storage.Client() + + +async def _download_time_based_async(client, filename, params): + total_bytes_downloaded = 0 + + + mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename) + await mrd.open() + + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + if params.pattern == "rand": + offset = random.randint(0, params.file_size_bytes - params.chunk_size_bytes) + + buffer = BytesIO() + await mrd.download_ranges([(offset, params.chunk_size_bytes, buffer)]) + + if not is_warming_up: + total_bytes_downloaded += params.chunk_size_bytes + + if params.pattern == "seq": + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 # Reset offset if end of file is reached + + await mrd.close() + return total_bytes_downloaded + + +def _download_files_worker(process_idx, filename, params, bucket_type): + + if bucket_type == "zonal": + return worker_loop.run_until_complete( + _download_time_based_async(worker_client, filename, params) + ) + else: # regional - JSON API not implemented for this test + raise NotImplementedError("JSON API not implemented for time-based tests") + + +def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): + args = [ + (i, files_names[i], params, bucket_type) for i in range(len(files_names)) + ] + + results = pool.starmap(_download_files_worker, args) + return sum(results) + + +@pytest.mark.parametrize( + "workload_params", + all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], + indirect=True, + ids=lambda p: p.name, +) +def test_downloads_multi_proc_multi_coro( + benchmark, storage_client, monitor, workload_params +): + params, files_names = workload_params + logging.info(f"num files: {len(files_names)}") + 
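    # Note: the "spawn" context below gives each worker a fresh interpreter,
    # so gRPC channels and asyncio event loops are never inherited across a
    # fork(). _worker_init then runs once per worker process: it pins CPU
    # affinity and caches a persistent client in module globals, so the timed
    # region measures the download loop rather than client construction.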
+ ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.bucket_type,), + ) + + total_bytes_downloaded = 0 + + def target_wrapper(*args, **kwargs): + nonlocal total_bytes_downloaded + total_bytes_downloaded = download_files_mp_mc_wrapper(pool, *args, **kwargs) + # This benchmark doesn't return per-operation times, so we return a dummy value + return [0] + + try: + with monitor() as m: + benchmark.pedantic( + target=target_wrapper, + iterations=1, + rounds=params.rounds, + args=(files_names, params, params.bucket_type), + ) + finally: + pool.close() + pool.join() + throughput_mib_s = (total_bytes_downloaded / params.duration) / (1024 * 1024) + benchmark.extra_info["throughput_mib_s"] = f"{throughput_mib_s:.2f}" + print(f"Throughput: {throughput_mib_s:.2f} MiB/s") + publish_benchmark_extra_info(benchmark, params) + publish_resource_metrics(benchmark, m) + From ed67fcce49be43684e215bd5b8fda5aae0065e7c Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 11 Feb 2026 05:44:37 +0000 Subject: [PATCH 02/10] take sizes in kib --- tests/perf/microbenchmarks/time_based/reads/config.py | 10 +++++----- .../perf/microbenchmarks/time_based/reads/config.yaml | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/perf/microbenchmarks/time_based/reads/config.py b/tests/perf/microbenchmarks/time_based/reads/config.py index bb6bf4729..ac111faf2 100644 --- a/tests/perf/microbenchmarks/time_based/reads/config.py +++ b/tests/perf/microbenchmarks/time_based/reads/config.py @@ -35,7 +35,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: common_params = config["common"] bucket_types = common_params["bucket_types"] file_sizes_mib = common_params["file_sizes_mib"] - chunk_sizes_mib = common_params["chunk_sizes_mib"] + chunk_sizes_kib = common_params["chunk_sizes_kib"] rounds = common_params["rounds"] duration = common_params["duration"] warmup_duration = common_params["warmup_duration"] @@ -61,7 +61,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: product = itertools.product( bucket_types, file_sizes_mib, - chunk_sizes_mib, + chunk_sizes_kib, processes, coros, ) @@ -69,18 +69,18 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: for ( bucket_type, file_size_mib, - chunk_size_mib, + chunk_size_kib, num_processes, num_coros, ) in product: file_size_bytes = file_size_mib * 1024 * 1024 - chunk_size_bytes = chunk_size_mib * 1024 * 1024 + chunk_size_bytes = chunk_size_kib * 1024 bucket_name = bucket_map[bucket_type] num_files = num_processes * num_coros # Create a descriptive name for the parameter set - name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_mib}MiB" + name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB" params[workload_name].append( TimeBasedReadParameters( diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml index 16d16b77f..556d12e73 100644 --- a/tests/perf/microbenchmarks/time_based/reads/config.yaml +++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml @@ -3,9 +3,9 @@ common: - "zonal" file_sizes_mib: - 10240 # 10GiB - chunk_sizes_mib: [1, 16, 100, 200] # 16MiB + chunk_sizes_kib: [64, 102400] # 16KiB rounds: 1 - duration: 60 # seconds + duration: 30 # seconds warmup_duration: 5 # seconds workload: @@ -13,13 +13,13 @@ workload: - name: "read_seq_multi_process" pattern: "seq" coros: [1] - processes: 
[1, 48] + processes: [48] - name: "read_rand_multi_process" pattern: "rand" coros: [1] - processes: [1, 48] + processes: [48] defaults: DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb" From 0471f3b62e50ec92de08f7cbeb47a8b390c6580f Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 11 Feb 2026 09:03:37 +0000 Subject: [PATCH 03/10] add support for queue_depth (num_ranges) --- .../time_based/reads/config.py | 6 +++- .../time_based/reads/config.yaml | 7 +++-- .../time_based/reads/parameters.py | 1 + .../time_based/reads/test_reads.py | 28 ++++++++++++------- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/tests/perf/microbenchmarks/time_based/reads/config.py b/tests/perf/microbenchmarks/time_based/reads/config.py index ac111faf2..737bb3b84 100644 --- a/tests/perf/microbenchmarks/time_based/reads/config.py +++ b/tests/perf/microbenchmarks/time_based/reads/config.py @@ -36,6 +36,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: bucket_types = common_params["bucket_types"] file_sizes_mib = common_params["file_sizes_mib"] chunk_sizes_kib = common_params["chunk_sizes_kib"] + num_ranges = common_params["num_ranges"] rounds = common_params["rounds"] duration = common_params["duration"] warmup_duration = common_params["warmup_duration"] @@ -62,6 +63,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: bucket_types, file_sizes_mib, chunk_sizes_kib, + num_ranges, processes, coros, ) @@ -70,6 +72,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: bucket_type, file_size_mib, chunk_size_kib, + num_ranges_val, num_processes, num_coros, ) in product: @@ -80,7 +83,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: num_files = num_processes * num_coros # Create a descriptive name for the parameter set - name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB" + name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges" params[workload_name].append( TimeBasedReadParameters( @@ -97,6 +100,7 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]: file_size_bytes=file_size_bytes, duration=duration, warmup_duration=warmup_duration, + num_ranges=num_ranges_val, ) ) return params diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml index 556d12e73..df8b756cf 100644 --- a/tests/perf/microbenchmarks/time_based/reads/config.yaml +++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml @@ -3,7 +3,8 @@ common: - "zonal" file_sizes_mib: - 10240 # 10GiB - chunk_sizes_kib: [64, 102400] # 16KiB + chunk_sizes_kib: [64] # 16KiB + num_ranges: [1, 4] rounds: 1 duration: 30 # seconds warmup_duration: 5 # seconds @@ -13,13 +14,13 @@ workload: - name: "read_seq_multi_process" pattern: "seq" coros: [1] - processes: [48] + processes: [1] - name: "read_rand_multi_process" pattern: "rand" coros: [1] - processes: [48] + processes: [1] defaults: DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb" diff --git a/tests/perf/microbenchmarks/time_based/reads/parameters.py b/tests/perf/microbenchmarks/time_based/reads/parameters.py index d35150a23..6ed2da210 100644 --- a/tests/perf/microbenchmarks/time_based/reads/parameters.py +++ b/tests/perf/microbenchmarks/time_based/reads/parameters.py @@ -20,3 +20,4 @@ class TimeBasedReadParameters(IOBenchmarkParameters): pattern: str duration: int warmup_duration: int + num_ranges: int diff --git 
a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index 60510ee79..d699448ad 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -81,19 +81,27 @@ async def _download_time_based_async(client, filename, params): is_warming_up = False total_bytes_downloaded = 0 # Reset counter after warmup + ranges = [] if params.pattern == "rand": - offset = random.randint(0, params.file_size_bytes - params.chunk_size_bytes) - - buffer = BytesIO() - await mrd.download_ranges([(offset, params.chunk_size_bytes, buffer)]) + for _ in range(params.num_ranges): + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + else: # seq + for _ in range(params.num_ranges): + ranges.append((offset, params.chunk_size_bytes, BytesIO())) + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 # Reset offset if end of file is reached + + await mrd.download_ranges(ranges) + + bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges) + assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges if not is_warming_up: - total_bytes_downloaded += params.chunk_size_bytes - - if params.pattern == "seq": - offset += params.chunk_size_bytes - if offset + params.chunk_size_bytes > params.file_size_bytes: - offset = 0 # Reset offset if end of file is reached + total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges await mrd.close() return total_bytes_downloaded From eaefc31df192353dafa6c53bcef2737f01c58f0d Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 11 Feb 2026 12:39:24 +0000 Subject: [PATCH 04/10] fix throughput calculation logic --- tests/perf/microbenchmarks/_utils.py | 30 ++++++++++++++----- .../time_based/reads/config.yaml | 4 +-- .../time_based/reads/test_reads.py | 29 +++++++++--------- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py index ff29b8783..71eef1af1 100644 --- a/tests/perf/microbenchmarks/_utils.py +++ b/tests/perf/microbenchmarks/_utils.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, List +from typing import Any, List, Optional import statistics import io import os @@ -22,7 +22,10 @@ def publish_benchmark_extra_info( params: Any, benchmark_group: str = "read", true_times: List[float] = [], + download_bytes_list: Optional[List[int]] = None, + duration: Optional[int] = None, ) -> None: + """ Helper function to publish benchmark parameters to the extra_info property. 
""" @@ -40,14 +43,25 @@ def publish_benchmark_extra_info( benchmark.extra_info["bucket_type"] = params.bucket_type benchmark.extra_info["processes"] = params.num_processes benchmark.group = benchmark_group + print('this is download bytes list', download_bytes_list) - object_size = params.file_size_bytes - num_files = params.num_files - total_uploaded_mib = object_size / (1024 * 1024) * num_files - min_throughput = total_uploaded_mib / benchmark.stats["max"] - max_throughput = total_uploaded_mib / benchmark.stats["min"] - mean_throughput = total_uploaded_mib / benchmark.stats["mean"] - median_throughput = total_uploaded_mib / benchmark.stats["median"] + if download_bytes_list is not None: + assert duration is not None, "Duration must be provided if total_bytes_transferred is provided." + throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list] + min_throughput = min(throughputs_list) + max_throughput = max(throughputs_list) + mean_throughput = statistics.mean(throughputs_list) + median_throughput = statistics.median(throughputs_list) + + + else: + object_size = params.file_size_bytes + num_files = params.num_files + total_uploaded_mib = object_size / (1024 * 1024) * num_files + min_throughput = total_uploaded_mib / benchmark.stats["max"] + max_throughput = total_uploaded_mib / benchmark.stats["min"] + mean_throughput = total_uploaded_mib / benchmark.stats["mean"] + median_throughput = total_uploaded_mib / benchmark.stats["median"] benchmark.extra_info["throughput_MiB_s_min"] = min_throughput benchmark.extra_info["throughput_MiB_s_max"] = max_throughput diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml index df8b756cf..5d7b250e8 100644 --- a/tests/perf/microbenchmarks/time_based/reads/config.yaml +++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml @@ -3,7 +3,7 @@ common: - "zonal" file_sizes_mib: - 10240 # 10GiB - chunk_sizes_kib: [64] # 16KiB + chunk_sizes_kib: [64, 1024, 16384, 102400, 204800] # 16KiB num_ranges: [1, 4] rounds: 1 duration: 30 # seconds @@ -24,4 +24,4 @@ workload: defaults: DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb" - DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb" + DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb" \ No newline at end of file diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index d699448ad..f9f7cdfbe 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -65,7 +65,6 @@ def _worker_init(bucket_type): async def _download_time_based_async(client, filename, params): total_bytes_downloaded = 0 - mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename) await mrd.open() @@ -118,9 +117,7 @@ def _download_files_worker(process_idx, filename, params, bucket_type): def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): - args = [ - (i, files_names[i], params, bucket_type) for i in range(len(files_names)) - ] + args = [(i, files_names[i], params, bucket_type) for i in range(len(files_names))] results = pool.starmap(_download_files_worker, args) return sum(results) @@ -145,13 +142,12 @@ def test_downloads_multi_proc_multi_coro( initargs=(params.bucket_type,), ) - total_bytes_downloaded = 0 + download_bytes_list = [] def target_wrapper(*args, **kwargs): - nonlocal total_bytes_downloaded - total_bytes_downloaded = download_files_mp_mc_wrapper(pool, 
*args, **kwargs) - # This benchmark doesn't return per-operation times, so we return a dummy value - return [0] + nonlocal download_bytes_list + download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs)) + return try: with monitor() as m: @@ -164,9 +160,14 @@ def target_wrapper(*args, **kwargs): finally: pool.close() pool.join() - throughput_mib_s = (total_bytes_downloaded / params.duration) / (1024 * 1024) - benchmark.extra_info["throughput_mib_s"] = f"{throughput_mib_s:.2f}" - print(f"Throughput: {throughput_mib_s:.2f} MiB/s") - publish_benchmark_extra_info(benchmark, params) + total_bytes_downloaded = sum(download_bytes_list) + throughput_mib_s = (total_bytes_downloaded / params.duration / params.rounds) / (1024 * 1024) + benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" + print(f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s") + publish_benchmark_extra_info( + benchmark, + params, + download_bytes_list=download_bytes_list, + duration=params.duration, + ) publish_resource_metrics(benchmark, m) - From a1fa0445d5815ef5a0dd93af436e9bc6d7696653 Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 11 Feb 2026 12:46:19 +0000 Subject: [PATCH 05/10] remove commented out code --- .../microbenchmarks/time_based/conftest.py | 64 ------------------- 1 file changed, 64 deletions(-) diff --git a/tests/perf/microbenchmarks/time_based/conftest.py b/tests/perf/microbenchmarks/time_based/conftest.py index b60b64610..bcd186d7b 100644 --- a/tests/perf/microbenchmarks/time_based/conftest.py +++ b/tests/perf/microbenchmarks/time_based/conftest.py @@ -12,74 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest -import os -import multiprocessing -import logging -from google.cloud import storage - -_OBJECT_NAME_PREFIX = "time_based_tests" - - -# def _upload_worker(args): -# bucket_name, object_name, object_size = args -# storage_client = storage.Client() -# bucket = storage_client.bucket(bucket_name) -# blob = bucket.blob(object_name) - -# try: -# blob.reload() -# if blob.size >= object_size: -# logging.info(f"Object {object_name} already exists and has the required size.") -# return object_name, object_size -# except Exception: -# pass - -# logging.info(f"Creating object {object_name} of size {object_size} bytes.") -# # For large objects, it's better to upload in chunks. -# # Using urandom is slow, so for large objects, we will write the same chunk over and over. -# chunk_size = 100 * 1024 * 1024 # 100 MiB -# data_chunk = os.urandom(chunk_size) -# num_chunks = object_size // chunk_size -# remaining_bytes = object_size % chunk_size - -# from io import BytesIO -# with BytesIO() as f: -# for _ in range(num_chunks): -# f.write(data_chunk) -# if remaining_bytes > 0: -# f.write(data_chunk[:remaining_bytes]) - -# f.seek(0) -# blob.upload_from_file(f, size=object_size) - -# logging.info(f"Finished creating object {object_name}.") -# return object_name, object_size - - -# def _create_files(num_files, bucket_name, object_size): -# """ -# Create/Upload objects for benchmarking and return a list of their names. -# """ -# object_names = [f"{_OBJECT_NAME_PREFIX}_{i}" for i in range(num_files)] - -# args_list = [ -# (bucket_name, object_names[i], object_size) for i in range(num_files) -# ] - -# # Don't use a pool to avoid contention writing the same objects. -# # The check for existence should make this fast on subsequent runs. 
-# results = [_upload_worker(arg) for arg in args_list] - -# return [r[0] for r in results] @pytest.fixture def workload_params(request): params = request.param files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)] - # files_names = _create_files( - # params.num_processes, # One file per process - # params.bucket_name, - # params.file_size_bytes, - # ) return params, files_names From cc6826c1dd8439e1aabcadbc1bcddc9e5082a615 Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 11 Feb 2026 13:18:49 +0000 Subject: [PATCH 06/10] support regional buckets for time based benchmarks --- .../time_based/reads/config.yaml | 7 +-- .../time_based/reads/test_reads.py | 51 +++++++++++++++++-- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/tests/perf/microbenchmarks/time_based/reads/config.yaml b/tests/perf/microbenchmarks/time_based/reads/config.yaml index 5d7b250e8..e739bfd2f 100644 --- a/tests/perf/microbenchmarks/time_based/reads/config.yaml +++ b/tests/perf/microbenchmarks/time_based/reads/config.yaml @@ -1,10 +1,11 @@ common: bucket_types: + - "regional" - "zonal" file_sizes_mib: - 10240 # 10GiB - chunk_sizes_kib: [64, 1024, 16384, 102400, 204800] # 16KiB - num_ranges: [1, 4] + chunk_sizes_kib: [64] # 16KiB + num_ranges: [1] rounds: 1 duration: 30 # seconds warmup_duration: 5 # seconds @@ -14,7 +15,7 @@ workload: - name: "read_seq_multi_process" pattern: "seq" coros: [1] - processes: [1] + processes: [96] - name: "read_rand_multi_process" diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index f9f7cdfbe..7690e4751 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -62,6 +62,51 @@ def _worker_init(bucket_type): worker_json_client = storage.Client() + +def _download_time_based_json(client, filename, params): + """Performs time-based downloads using the JSON API.""" + total_bytes_downloaded = 0 + bucket = client.bucket(params.bucket_name) + blob = bucket.blob(filename) + + offset = 0 + is_warming_up = True + start_time = time.monotonic() + warmup_end_time = start_time + params.warmup_duration + test_end_time = warmup_end_time + params.duration + + while time.monotonic() < test_end_time: + current_time = time.monotonic() + if is_warming_up and current_time >= warmup_end_time: + is_warming_up = False + total_bytes_downloaded = 0 # Reset counter after warmup + + bytes_in_iteration = 0 + # For JSON, we can't batch ranges like gRPC, so we download one by one + for _ in range(params.num_ranges): + if params.pattern == "rand": + offset = random.randint( + 0, params.file_size_bytes - params.chunk_size_bytes + ) + + data = blob.download_as_bytes( + start=offset, end=offset + params.chunk_size_bytes - 1 + ) + bytes_in_iteration += len(data) + + if params.pattern == "seq": + offset += params.chunk_size_bytes + if offset + params.chunk_size_bytes > params.file_size_bytes: + offset = 0 + + assert bytes_in_iteration == params.chunk_size_bytes * params.num_ranges + + if not is_warming_up: + total_bytes_downloaded += bytes_in_iteration + + return total_bytes_downloaded + + async def _download_time_based_async(client, filename, params): total_bytes_downloaded = 0 @@ -112,8 +157,8 @@ def _download_files_worker(process_idx, filename, params, bucket_type): return worker_loop.run_until_complete( _download_time_based_async(worker_client, filename, params) ) - else: # regional - JSON API not implemented for this test 
- raise NotImplementedError("JSON API not implemented for time-based tests") + else: # regional + return _download_time_based_json(worker_json_client, filename, params) def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): @@ -125,7 +170,7 @@ def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): @pytest.mark.parametrize( "workload_params", - all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], + all_params["read_seq_multi_process"],# + all_params["read_rand_multi_process"], indirect=True, ids=lambda p: p.name, ) From b35819910cde4898e664eb716e4f3364526ad6d3 Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 11 Feb 2026 15:06:47 +0000 Subject: [PATCH 07/10] run both seq and random --- tests/perf/microbenchmarks/time_based/reads/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index 7690e4751..5fa4fc302 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -170,7 +170,7 @@ def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type): @pytest.mark.parametrize( "workload_params", - all_params["read_seq_multi_process"],# + all_params["read_rand_multi_process"], + all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], indirect=True, ids=lambda p: p.name, ) From b733577b7bdbcbc1defb4079a99431d971a2dc46 Mon Sep 17 00:00:00 2001 From: Chandra Date: Fri, 13 Feb 2026 05:35:11 +0000 Subject: [PATCH 08/10] fix lint issues --- .../perf/microbenchmarks/time_based/reads/test_reads.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index 5fa4fc302..aaac1d669 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -62,7 +62,6 @@ def _worker_init(bucket_type): worker_json_client = storage.Client() - def _download_time_based_json(client, filename, params): """Performs time-based downloads using the JSON API.""" total_bytes_downloaded = 0 @@ -206,9 +205,13 @@ def target_wrapper(*args, **kwargs): pool.close() pool.join() total_bytes_downloaded = sum(download_bytes_list) - throughput_mib_s = (total_bytes_downloaded / params.duration / params.rounds) / (1024 * 1024) + throughput_mib_s = ( + total_bytes_downloaded / params.duration / params.rounds + ) / (1024 * 1024) benchmark.extra_info["avg_throughput_mib_s"] = f"{throughput_mib_s:.2f}" - print(f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s") + print( + f"Avg Throughput of {params.rounds} round(s): {throughput_mib_s:.2f} MiB/s" + ) publish_benchmark_extra_info( benchmark, params, From b62550779348646a8c7af48c979c479596e0c95d Mon Sep 17 00:00:00 2001 From: Chandra Date: Fri, 13 Feb 2026 06:11:53 +0000 Subject: [PATCH 09/10] remove nonlocal reference --- tests/perf/microbenchmarks/time_based/reads/test_reads.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index aaac1d669..c56112da9 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -189,7 +189,6 @@ def 
test_downloads_multi_proc_multi_coro( download_bytes_list = [] def target_wrapper(*args, **kwargs): - nonlocal download_bytes_list download_bytes_list.append(download_files_mp_mc_wrapper(pool, *args, **kwargs)) return From 9657d564eb0aad06400f7e9835d743efd08ee138 Mon Sep 17 00:00:00 2001 From: Chandra Date: Fri, 13 Feb 2026 07:35:22 +0000 Subject: [PATCH 10/10] remove print statement --- tests/perf/microbenchmarks/_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py index 71eef1af1..b7b66b542 100644 --- a/tests/perf/microbenchmarks/_utils.py +++ b/tests/perf/microbenchmarks/_utils.py @@ -43,7 +43,6 @@ def publish_benchmark_extra_info( benchmark.extra_info["bucket_type"] = params.bucket_type benchmark.extra_info["processes"] = params.num_processes benchmark.group = benchmark_group - print('this is download bytes list', download_bytes_list) if download_bytes_list is not None: assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
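The measurement model this series converges on can be condensed into a short, self-contained sketch: warm up, count bytes over a fixed window, then convert each round's byte count to MiB/s the way publish_benchmark_extra_info does after PATCH 04. run_round, throughput_stats, and read_chunk below are illustrative stand-ins, not names from the patches.

import statistics
import time

def run_round(duration, warmup_duration, read_chunk):
    """One time-based round: warm up, then count bytes over a fixed window."""
    total_bytes = 0
    warmup_end = time.monotonic() + warmup_duration
    test_end = warmup_end + duration
    warming_up = True
    while time.monotonic() < test_end:
        if warming_up and time.monotonic() >= warmup_end:
            warming_up = False
            total_bytes = 0  # discard bytes transferred during warmup
        n = read_chunk()  # stand-in for bytes moved by one ranged download
        if not warming_up:
            total_bytes += n
    return total_bytes

def throughput_stats(download_bytes_list, duration):
    """Per-round byte counts -> MiB/s, as publish_benchmark_extra_info does."""
    mib_s = [b / duration / (1024 * 1024) for b in download_bytes_list]
    return {"min": min(mib_s), "max": max(mib_s),
            "mean": statistics.mean(mib_s), "median": statistics.median(mib_s)}

# Example: three short rounds against a fake reader moving 64 KiB per call.
rounds = [run_round(duration=1.0, warmup_duration=0.2,
                    read_chunk=lambda: 64 * 1024) for _ in range(3)]
print(throughput_stats(rounds, duration=1.0))

Because each round runs for a fixed duration rather than a fixed byte count, per-round throughput is bytes/duration, which is why PATCH 04 replaces the wall-clock-derived stats with a download_bytes_list computed from the workers' return values.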