Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f7095fa

Browse files
authored
feat: make flush size configurable (#1677)
feat: make flush size configurable
1 parent a0668ec commit f7095fa
Copy full SHA for f7095fa

File tree

Expand file treeCollapse file tree

3 files changed

+139
-9
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+139
-9
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+21-5Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from typing import Optional, Union
2525

2626
from google_crc32c import Checksum
27+
from google.api_core import exceptions
2728

2829
from ._utils import raise_if_no_fast_crc32c
2930
from google.cloud import _storage_v2
@@ -36,7 +37,7 @@
3637

3738

3839
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
39-
_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024 # 16 MiB
40+
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB
4041

4142

4243
class AsyncAppendableObjectWriter:
@@ -49,6 +50,7 @@ def __init__(
4950
object_name: str,
5051
generation=None,
5152
write_handle=None,
53+
writer_options: Optional[dict] = None,
5254
):
5355
"""
5456
Class for appending data to a GCS Appendable Object.
@@ -125,6 +127,21 @@ def __init__(
125127
# Please note: `offset` and `persisted_size` are same when the stream is
126128
# opened.
127129
self.persisted_size: Optional[int] = None
130+
if writer_options is None:
131+
writer_options = {}
132+
self.flush_interval = writer_options.get(
133+
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
134+
)
135+
# TODO: add test case for this.
136+
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
137+
raise exceptions.OutOfRange(
138+
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}"
139+
)
140+
if self.flush_interval % _MAX_CHUNK_SIZE_BYTES != 0:
141+
raise exceptions.OutOfRange(
142+
f"flush_interval must be a multiple of {_MAX_CHUNK_SIZE_BYTES}, but provided {self.flush_interval}"
143+
)
144+
self.bytes_appended_since_last_flush = 0
128145

129146
async def state_lookup(self) -> int:
130147
"""Returns the persisted_size
@@ -193,7 +210,6 @@ async def append(self, data: bytes) -> None:
193210
self.offset = self.persisted_size
194211

195212
start_idx = 0
196-
bytes_to_flush = 0
197213
while start_idx < total_bytes:
198214
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
199215
data_chunk = data[start_idx:end_idx]
@@ -208,10 +224,10 @@ async def append(self, data: bytes) -> None:
208224
)
209225
chunk_size = end_idx - start_idx
210226
self.offset += chunk_size
211-
bytes_to_flush += chunk_size
212-
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
227+
self.bytes_appended_since_last_flush += chunk_size
228+
if self.bytes_appended_since_last_flush >= self.flush_interval:
213229
await self.simple_flush()
214-
bytes_to_flush = 0
230+
self.bytes_appended_since_last_flush = 0
215231
start_idx = end_idx
216232

217233
async def simple_flush(self) -> None:
Collapse file

‎tests/system/test_zonal.py‎

Copy file name to clipboardExpand all lines: tests/system/test_zonal.py
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
1414
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
1515
AsyncAppendableObjectWriter,
16+
_DEFAULT_FLUSH_INTERVAL_BYTES,
1617
)
1718
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
1819
AsyncMultiRangeDownloader,
@@ -162,6 +163,59 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)
162163
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
163164

164165

166+
@pytest.mark.asyncio
167+
@pytest.mark.parametrize(
168+
"flush_interval",
169+
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
170+
)
171+
async def test_wrd_with_non_default_flush_interval(
172+
storage_client,
173+
blobs_to_delete,
174+
flush_interval,
175+
):
176+
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
177+
object_size = 9 * 1024 * 1024
178+
179+
# Client instantiation; it cannot be part of a fixture because
180+
# grpc_client's event loop and event loop of coroutine running it
181+
# (i.e. this test) must be same.
182+
# Note:
183+
# 1. @pytest.mark.asyncio ensures new event loop for each test.
184+
# 2. we can keep the same event loop for entire module but that may
185+
# create issues if tests are run in parallel and one test hogs the event
186+
# loop slowing down other tests.
187+
object_data = os.urandom(object_size)
188+
object_checksum = google_crc32c.value(object_data)
189+
grpc_client = AsyncGrpcClient().grpc_client
190+
191+
writer = AsyncAppendableObjectWriter(
192+
grpc_client,
193+
_ZONAL_BUCKET,
194+
object_name,
195+
writer_options={"FLUSH_INTERVAL_BYTES": flush_interval},
196+
)
197+
await writer.open()
198+
mark1, mark2 = _get_equal_dist(0, object_size)
199+
await writer.append(object_data[0:mark1])
200+
await writer.append(object_data[mark1:mark2])
201+
await writer.append(object_data[mark2:])
202+
object_metadata = await writer.close(finalize_on_close=True)
203+
assert object_metadata.size == object_size
204+
assert int(object_metadata.checksums.crc32c) == object_checksum
205+
206+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
207+
buffer = BytesIO()
208+
await mrd.open()
209+
# (0, 0) means read the whole object
210+
await mrd.download_ranges([(0, 0, buffer)])
211+
await mrd.close()
212+
assert buffer.getvalue() == object_data
213+
assert mrd.persisted_size == object_size
214+
215+
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
216+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
217+
218+
165219
@pytest.mark.asyncio
166220
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
167221
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"
Collapse file

‎tests/unit/asyncio/test_async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_appendable_object_writer.py
+64-4Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
2222
AsyncAppendableObjectWriter,
2323
)
24+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
25+
_MAX_CHUNK_SIZE_BYTES,
26+
)
2427
from google.cloud import _storage_v2
2528

2629

@@ -29,6 +32,7 @@
2932
GENERATION = 123
3033
WRITE_HANDLE = b"test-write-handle"
3134
PERSISTED_SIZE = 456
35+
EIGHT_MIB = 8 * 1024 * 1024
3236

3337

3438
@pytest.fixture
@@ -52,6 +56,7 @@ def test_init(mock_write_object_stream, mock_client):
5256
assert not writer._is_stream_open
5357
assert writer.offset is None
5458
assert writer.persisted_size is None
59+
assert writer.bytes_appended_since_last_flush == 0
5560

5661
mock_write_object_stream.assert_called_once_with(
5762
client=mock_client,
@@ -78,6 +83,7 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):
7883

7984
assert writer.generation == GENERATION
8085
assert writer.write_handle == WRITE_HANDLE
86+
assert writer.bytes_appended_since_last_flush == 0
8187

8288
mock_write_object_stream.assert_called_once_with(
8389
client=mock_client,
@@ -88,6 +94,60 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):
8894
)
8995

9096

97+
@mock.patch(
98+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
99+
)
100+
def test_init_with_writer_options(mock_write_object_stream, mock_client):
101+
"""Test the constructor with custom writer_options (FLUSH_INTERVAL_BYTES)."""
102+
writer = AsyncAppendableObjectWriter(
103+
mock_client,
104+
BUCKET,
105+
OBJECT,
106+
writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB},
107+
)
108+
109+
assert writer.flush_interval == EIGHT_MIB
110+
assert writer.bytes_appended_since_last_flush == 0
111+
112+
mock_write_object_stream.assert_called_once_with(
113+
client=mock_client,
114+
bucket_name=BUCKET,
115+
object_name=OBJECT,
116+
generation_number=None,
117+
write_handle=None,
118+
)
119+
120+
121+
@mock.patch(
122+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
123+
)
124+
def test_init_with_flush_interval_less_than_chunk_size_raises_error(mock_client):
125+
"""Test that an OutOfRange error is raised if flush_interval is less than the chunk size."""
126+
127+
with pytest.raises(exceptions.OutOfRange):
128+
AsyncAppendableObjectWriter(
129+
mock_client,
130+
BUCKET,
131+
OBJECT,
132+
writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES - 1},
133+
)
134+
135+
136+
@mock.patch(
137+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
138+
)
139+
def test_init_with_flush_interval_not_multiple_of_chunk_size_raises_error(mock_client):
140+
"""Test that an OutOfRange error is raised if flush_interval is not a multiple of the chunk size."""
141+
142+
with pytest.raises(exceptions.OutOfRange):
143+
AsyncAppendableObjectWriter(
144+
mock_client,
145+
BUCKET,
146+
OBJECT,
147+
writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES + 1},
148+
)
149+
150+
91151
@mock.patch("google.cloud.storage._experimental.asyncio._utils.google_crc32c")
92152
@mock.patch(
93153
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
@@ -477,7 +537,7 @@ async def test_append_flushes_when_buffer_is_full(
477537
):
478538
"""Test that append flushes the stream when the buffer size is reached."""
479539
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
480-
_MAX_BUFFER_SIZE_BYTES,
540+
_DEFAULT_FLUSH_INTERVAL_BYTES,
481541
)
482542

483543
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -487,7 +547,7 @@ async def test_append_flushes_when_buffer_is_full(
487547
mock_stream.send = mock.AsyncMock()
488548
writer.simple_flush = mock.AsyncMock()
489549

490-
data = b"a" * _MAX_BUFFER_SIZE_BYTES
550+
data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
491551
await writer.append(data)
492552

493553
writer.simple_flush.assert_awaited_once()
@@ -500,7 +560,7 @@ async def test_append_flushes_when_buffer_is_full(
500560
async def test_append_handles_large_data(mock_write_object_stream, mock_client):
501561
"""Test that append handles data larger than the buffer size."""
502562
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
503-
_MAX_BUFFER_SIZE_BYTES,
563+
_DEFAULT_FLUSH_INTERVAL_BYTES,
504564
)
505565

506566
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -510,7 +570,7 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
510570
mock_stream.send = mock.AsyncMock()
511571
writer.simple_flush = mock.AsyncMock()
512572

513-
data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
573+
data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
514574
await writer.append(data)
515575

516576
assert writer.simple_flush.await_count == 2

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.