Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2096991

Browse filesBrowse files
authored
perf: add multiplexing performance tests for AsyncMultiRangeDownloader (#16501)
## Overview This PR introduces new microbenchmarks to measure and expose the performance bottleneck caused by lock contention in the `AsyncMultiRangeDownloader`. It provides a concrete way to compare the previous serialized implementation against the new multiplexed architecture. ## Before vs. After: The Performance Gap ### Before (Serialized via Lock) In the previous implementation, `download_ranges` used a shared lock to prevent concurrent access to the bidi-gRPC stream. This meant that even with multiple coroutines, only one could "own" the stream at a time. The entire download cycle (Send -> Receive All) had to complete before another task could start. **Execution Flow:** ```mermaid sequenceDiagram participant C1 as Coroutine 1 participant C2 as Coroutine 2 participant S as gRPC Stream C1->>C1: Acquire Lock C1->>S: Send Requests S-->>C1: Receive Data (Streaming...) S-->>C1: End of Range C1->>C1: Release Lock Note over C2: Waiting for Lock... C2->>C2: Acquire Lock C2->>S: Send Requests S-->>C2: Receive Data (Streaming...) S-->>C2: End of Range C2->>C2: Release Lock ``` ### After (Multiplexed Concurrent) With the introduction of the `_StreamMultiplexer`, multiple coroutines can now share the same stream concurrently. Requests are interleaved, and a background receiver loop routes incoming data to the correct task using `read_id`. **Execution Flow:** ```mermaid sequenceDiagram participant C1 as Coroutine 1 participant C2 as Coroutine 2 participant M as Multiplexer participant S as gRPC Stream C1->>M: Send Requests M->>S: Forward Req 1 C2->>M: Send Requests M->>S: Forward Req 2 Note over C1,C2: Tasks wait on their own queues S-->>M: Data for C1 M-->>C1: Route to Q1 S-->>M: Data for C2 M-->>C2: Route to Q2 S-->>M: Data for C1 M-->>C1: Route to Q1 ``` ## How the Benchmark Works This PR adds a `read_rand_multi_coro` workload that: 1. Spawns multiple asynchronous tasks (coroutines). 2. Shares a single `AsyncMultiRangeDownloader` instance across all tasks. 3. Simulates the old serialized behavior by explicitly passing a `shared_lock` to `download_ranges`. 4. Measures total throughput (MiB/s) and resource utilization. ## Key Changes - **`test_reads.py`**: Refactored to support launching concurrent coroutines within a single worker process. - **`config.yaml`**: Added `read_rand_multi_coro` with 1, 16 coroutines to stress the downloader. - **`config.py`**: Updated naming convention to include coroutine count (e.g., `16c`) in reports for easier differentiation.
1 parent d3d6840 commit 2096991
Copy full SHA for 2096991

5 files changed

+45-40Lines changed: 45 additions & 40 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/google-cloud-storage/output.json‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/output.json
Whitespace-only changes.
Collapse file

‎packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/conftest.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/conftest.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@
1717
@pytest.fixture
1818
def workload_params(request):
1919
params = request.param
20-
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
20+
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_files)]
2121
return params, files_names
Collapse file

‎packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.py
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ def _get_params() -> Dict[str, List[TimeBasedReadParameters]]:
8080
chunk_size_bytes = chunk_size_kib * 1024
8181
bucket_name = bucket_map[bucket_type]
8282

83-
num_files = num_processes * num_coros
83+
num_files = num_processes
8484

8585
# Create a descriptive name for the parameter set
86-
name = f"{pattern}_{bucket_type}_{num_processes}p_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
86+
name = f"{pattern}_{bucket_type}_{num_processes}p_{num_coros}c_{file_size_mib}MiB_{chunk_size_kib}KiB_{num_ranges_val}ranges"
8787

8888
params[workload_name].append(
8989
TimeBasedReadParameters(
Collapse file

‎packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.yaml‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/config.yaml
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ workload:
2020

2121
- name: "read_rand_multi_process"
2222
pattern: "rand"
23-
coros: [1]
23+
coros: [1, 16]
2424
processes: [1]
2525

26+
2627
defaults:
2728
DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-benchmarks-zb"
2829
DEFAULT_STANDARD_BUCKET: "chandrasiri-benchmarks-rb"
Collapse file

‎packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/test_reads.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/perf/microbenchmarks/time_based/reads/test_reads.py
+40-36Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -115,47 +115,51 @@ def _download_time_based_json(client, filename, params):
115115

116116

117117
async def _download_time_based_async(client, filename, params):
118-
total_bytes_downloaded = 0
119-
120118
mrd = AsyncMultiRangeDownloader(client, params.bucket_name, filename)
121119
await mrd.open()
122120

123-
offset = 0
124-
is_warming_up = True
125-
start_time = time.monotonic()
126-
warmup_end_time = start_time + params.warmup_duration
127-
test_end_time = warmup_end_time + params.duration
128-
129-
while time.monotonic() < test_end_time:
130-
current_time = time.monotonic()
131-
if is_warming_up and current_time >= warmup_end_time:
132-
is_warming_up = False
133-
total_bytes_downloaded = 0 # Reset counter after warmup
134-
135-
ranges = []
136-
if params.pattern == "rand":
137-
for _ in range(params.num_ranges):
138-
offset = random.randint(
139-
0, params.file_size_bytes - params.chunk_size_bytes
140-
)
141-
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
142-
else: # seq
143-
for _ in range(params.num_ranges):
144-
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
145-
offset += params.chunk_size_bytes
146-
if offset + params.chunk_size_bytes > params.file_size_bytes:
147-
offset = 0 # Reset offset if end of file is reached
148-
149-
await mrd.download_ranges(ranges)
150-
151-
bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
152-
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges
153-
154-
if not is_warming_up:
155-
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
121+
async def _worker_coro():
122+
total_bytes_downloaded = 0
123+
offset = 0
124+
is_warming_up = True
125+
start_time = time.monotonic()
126+
warmup_end_time = start_time + params.warmup_duration
127+
test_end_time = warmup_end_time + params.duration
128+
129+
while time.monotonic() < test_end_time:
130+
current_time = time.monotonic()
131+
if is_warming_up and current_time >= warmup_end_time:
132+
is_warming_up = False
133+
total_bytes_downloaded = 0 # Reset counter after warmup
134+
135+
ranges = []
136+
if params.pattern == "rand":
137+
for _ in range(params.num_ranges):
138+
offset = random.randint(
139+
0, params.file_size_bytes - params.chunk_size_bytes
140+
)
141+
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
142+
else: # seq
143+
for _ in range(params.num_ranges):
144+
ranges.append((offset, params.chunk_size_bytes, BytesIO()))
145+
offset += params.chunk_size_bytes
146+
if offset + params.chunk_size_bytes > params.file_size_bytes:
147+
offset = 0 # Reset offset if end of file is reached
148+
149+
await mrd.download_ranges(ranges)
150+
151+
bytes_in_buffers = sum(r[2].getbuffer().nbytes for r in ranges)
152+
assert bytes_in_buffers == params.chunk_size_bytes * params.num_ranges
153+
154+
if not is_warming_up:
155+
total_bytes_downloaded += params.chunk_size_bytes * params.num_ranges
156+
return total_bytes_downloaded
157+
158+
tasks = [asyncio.create_task(_worker_coro()) for _ in range(params.num_coros)]
159+
results = await asyncio.gather(*tasks)
156160

157161
await mrd.close()
158-
return total_bytes_downloaded
162+
return sum(results)
159163

160164

161165
def _download_files_worker(process_idx, filename, params, bucket_type):

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.