Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
77 changes: 72 additions & 5 deletions 77 tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from io import BytesIO

# python additional imports
import google_crc32c

import pytest

# current library imports
Expand All @@ -28,6 +30,11 @@
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
step = (b - a) // 3
return a + step, a + 2 * step


async def write_one_appendable_object(
bucket_name: str,
object_name: str,
Expand Down Expand Up @@ -59,11 +66,21 @@ def appendable_object(storage_client, blobs_to_delete):


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
256, # less than _chunk size
10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES
20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE
],
)
@pytest.mark.parametrize(
"attempt_direct_path",
[True, False],
)
async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
async def test_basic_wrd(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
Expand All @@ -74,22 +91,72 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.append(object_data)
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == len(_BYTES_TO_UPLOAD)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
10, # less than _chunk size,
10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES
20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE_BYTES
],
)
async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
# grpc_client's event loop and event loop of coroutine running it
# (i.e. this test) must be same.
# Note:
# 1. @pytest.mark.asyncio ensures new event loop for each test.
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient().grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
mark1, mark2 = _get_equal_dist(0, object_size)
await writer.append(object_data[0:mark1])
await writer.append(object_data[mark1:mark2])
await writer.append(object_data[mark2:])
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.