feat: Implement MemoryStorage and local storage clients #15


Merged — 27 commits merged from feature/implement-memory-storage into master on Jan 4, 2023
Changes from all commits — 27 commits
715b65e
feat: Implement MemoryStorage and local storage clients
jirimoravcik Dec 19, 2022
9db2a30
fix lint 1
jirimoravcik Dec 19, 2022
daa5d10
lint fix 2
jirimoravcik Dec 19, 2022
1294a08
Merge branch 'master' into feature/implement-memory-storage
jirimoravcik Dec 19, 2022
cb955ee
json dumps utility
jirimoravcik Dec 21, 2022
d15490c
fix lint
jirimoravcik Dec 21, 2022
2a3cc71
resolve todos, refactoring
jirimoravcik Dec 22, 2022
98d5c86
refactoring, fixes
jirimoravcik Dec 23, 2022
a22b835
Merge branch 'master' into feature/implement-memory-storage
jirimoravcik Dec 29, 2022
05378e4
Merge branch 'master' into feature/implement-memory-storage
jirimoravcik Dec 30, 2022
550b118
refactoring stuff, merging master
jirimoravcik Dec 30, 2022
cdeeea0
refactoring, docs
jirimoravcik Dec 30, 2022
bf744ba
Add unit tests for new utility methods
jirimoravcik Dec 30, 2022
f798e2b
fix format
jirimoravcik Dec 30, 2022
eb91c7f
revert MemoryStorage export
jirimoravcik Dec 30, 2022
ef91fd0
fixes and refactoring
jirimoravcik Jan 1, 2023
117a2d0
unit tests for memory storage
jirimoravcik Jan 1, 2023
db4fdc0
unit tests for collection clients
jirimoravcik Jan 1, 2023
a370f64
memory storage via an async fixture
jirimoravcik Jan 1, 2023
4cdfbd2
update timestamps private
jirimoravcik Jan 1, 2023
79e1140
Add unit tests for non-collection clients, add new configs with TODO,…
jirimoravcik Jan 2, 2023
d375c78
Merge branch 'master' into feature/implement-memory-storage
jirimoravcik Jan 3, 2023
a975048
address some PR comments
jirimoravcik Jan 3, 2023
b291a07
address PR comments
jirimoravcik Jan 4, 2023
b4f7a30
fix lint, align interface of MemoryStorage with python client one
jirimoravcik Jan 4, 2023
84fa371
fix types
jirimoravcik Jan 4, 2023
ebea14f
fix
jirimoravcik Jan 4, 2023
mypy.ini — 1 change: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
[mypy]
python_version=3.8
files =
    docs,
    scripts,
pytest.ini — 2 changes: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
[pytest]
asyncio_mode=auto
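
With asyncio_mode=auto, pytest-asyncio discovers and runs async def test functions without an explicit @pytest.mark.asyncio marker on each one. A minimal sketch of a test this setting enables (the test itself is hypothetical, not part of the PR; tmp_path is a built-in pytest fixture):

async def test_force_remove_ignores_missing_file(tmp_path):
    from apify._utils import _force_remove

    # _force_remove suppresses FileNotFoundError, so this must not raise
    await _force_remove(str(tmp_path / 'does-not-exist.txt'))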
setup.py — 3 changes: 3 additions & 0 deletions
@@ -57,6 +57,8 @@
        'pyee ~= 9.0.4',
        'typing-extensions ~= 4.4.0',
        'websockets ~= 10.4',
        'aiofiles ~= 22.1.0',
        'aioshutil ~= 1.2',
    ],
    extras_require={
        'dev': [
@@ -79,6 +81,7 @@
            'sphinx ~= 5.3.0',
            'sphinx-autodoc-typehints ~= 1.19.5',
            'sphinx-markdown-builder == 0.5.4',  # pinned to 0.5.4, because 0.5.5 has a formatting bug
            'types-aiofiles ~= 22.1.0.4',
            'types-psutil ~= 5.9.5.5',
            'types-setuptools ~= 65.6.0.1',
        ],
src/apify/_types.py — 6 changes: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
from typing import Any, Dict, List, Union

# Type for representing json-serializable values
# It's close enough to the real thing supported by json.loads, and the best we can do until mypy supports recursive types
# It was suggested in a discussion with (and approved by) Guido van Rossum, so I'd consider it correct enough
JSONSerializable = Union[str, int, float, bool, None, Dict[str, Any], List[Any]]
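
As a quick illustration of the alias in use, the helper below type-checks for any JSON-shaped value; it is a hypothetical sketch, assuming the alias stays importable from apify._types:

import json

from apify._types import JSONSerializable


def to_pretty_json(value: JSONSerializable) -> str:
    # Accepts any str / int / float / bool / None / dict / list value
    return json.dumps(value, ensure_ascii=False, indent=2)


print(to_pretty_json({'items': [1, 2.5, 'three', None], 'ok': True}))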
src/apify/_utils.py — 148 changes: 147 additions & 1 deletion
@@ -1,13 +1,23 @@
import asyncio
import base64
import contextlib
import hashlib
import inspect
import io
import json
import mimetypes
import os
import re
import sys
import time
from datetime import datetime, timezone
from enum import Enum
-from typing import Any, Callable, Generic, Optional, TypeVar, Union, overload
+from typing import Any, Callable, Dict, Generic, List, NoReturn, Optional, TypeVar, Union, cast, overload

import aioshutil
import psutil
from aiofiles import ospath
from aiofiles.os import remove, rename

from apify_client import __version__ as client_version

Expand All @@ -20,7 +30,9 @@
    BOOL_ENV_VARS,
    DATETIME_ENV_VARS,
    INTEGER_ENV_VARS,
    REQUEST_ID_LENGTH,
    ApifyEnvVars,
    StorageTypes,
)


@@ -164,3 +176,137 @@ async def _run_func_at_interval_async(func: Callable, interval_secs: float) -> None:
        res = func()
        if inspect.isawaitable(res):
            await res


class ListPage:  # TODO: Rather use exported version from Apify client
    """A single page of items returned from a list() method."""

    #: list: List of returned objects on this page
    items: List
    #: int: Count of the returned objects on this page
    count: int
    #: int: The offset of the first object specified in the API call
    offset: int
    #: int: The limit on the number of returned objects specified in the API call
    limit: int
    #: int: Total number of objects matching the API call criteria
    total: int
    #: bool: Whether the listing is descending or not
    desc: bool

    def __init__(self, data: Dict) -> None:
        """Initialize a ListPage instance from the API response data."""
        self.items = data['items'] if 'items' in data else []
        self.offset = data['offset'] if 'offset' in data else 0
        self.limit = data['limit'] if 'limit' in data else 0
        self.count = data['count'] if 'count' in data else len(self.items)
        self.total = data['total'] if 'total' in data else self.offset + self.count
        self.desc = data['desc'] if 'desc' in data else False


async def _force_remove(filename: str) -> None:
    """JS-like rm(filename, { force: true })."""
    with contextlib.suppress(FileNotFoundError):
        await remove(filename)


def _json_serializer(obj: Any) -> str:  # TODO: Decide how to parse/dump/handle datetimes!
    if isinstance(obj, datetime):
        return obj.isoformat(timespec='milliseconds') + 'Z'
    else:
        return str(obj)


def _filter_out_none_values_recursively(dictionary: Dict) -> Dict:
    """Return a copy of the dictionary, recursively omitting all keys whose values are None."""
    return cast(dict, _filter_out_none_values_recursively_internal(dictionary))


# Unfortunately, it's necessary to have an internal function for the correct result typing, without having to create complicated overloads
def _filter_out_none_values_recursively_internal(dictionary: Dict, remove_empty_dicts: Optional[bool] = None) -> Optional[Dict]:
    result = {}
    for k, v in dictionary.items():
        if isinstance(v, dict):
            v = _filter_out_none_values_recursively_internal(v, remove_empty_dicts is True or remove_empty_dicts is None)
        if v is not None:
            result[k] = v
    if not result and remove_empty_dicts:
        return None
    return result


def _json_dumps(obj: Any) -> str:
    """Dump JSON to a string with the correct settings and serializer."""
    return json.dumps(obj, ensure_ascii=False, indent=2, default=_json_serializer)


uuid_regex = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', re.I)


def _is_uuid(string: str) -> bool:
    """Test whether the given string matches UUID format."""
    return bool(uuid_regex.match(string))


def _raise_on_non_existing_storage(client_type: StorageTypes, id: str) -> NoReturn:
    raise ValueError(f'{client_type} with id: {id} does not exist.')


def _raise_on_duplicate_storage(client_type: StorageTypes, key_name: str, value: str) -> NoReturn:
    raise ValueError(f'{client_type} with {key_name}: {value} already exists.')


def _guess_file_extension(content_type: str) -> Optional[str]:
    """Guess the file extension based on the content type."""
    # e.g. mimetypes.guess_extension('application/json ') does not work...
    actual_content_type = content_type.split(';')[0].strip()
    ext = mimetypes.guess_extension(actual_content_type)
    # Remove the leading dot if the extension was successfully parsed
    return ext[1:] if ext is not None else ext


def _is_content_type_json(content_type: str) -> bool:
    return bool(re.search(r'^application/json', content_type, flags=re.IGNORECASE))


def _is_content_type_xml(content_type: str) -> bool:
    return bool(re.search(r'^application/.*xml$', content_type, flags=re.IGNORECASE))


def _is_content_type_text(content_type: str) -> bool:
    return bool(re.search(r'^text/', content_type, flags=re.IGNORECASE))


def _is_file_or_bytes(value: Any) -> bool:
    # The check for IOBase is not ideal; it would be better to use duck typing,
    # but then the check would be super complex, judging from how the 'requests' library does it.
    # This way should be good enough for the vast majority of use cases, if it causes issues, we can improve it later.
    return isinstance(value, (bytes, bytearray, io.IOBase))


def _maybe_parse_body(body: bytes, content_type: str) -> Any:  # TODO: Improve return type
    try:
        if _is_content_type_json(content_type):
            return json.loads(body)  # Returns any
        elif _is_content_type_xml(content_type) or _is_content_type_text(content_type):
            return body.decode('utf-8')  # TODO: Check if utf-8 can be assumed
    except ValueError as err:
        print('_maybe_parse_body error', err)
    return body


def _unique_key_to_request_id(unique_key: str) -> str:
    """Generate a request ID based on the unique key in a deterministic way."""
    id = re.sub(r'(\+|\/|=)', '', base64.b64encode(hashlib.sha256(unique_key.encode('utf-8')).digest()).decode('utf-8'))

    return id[:REQUEST_ID_LENGTH] if len(id) > REQUEST_ID_LENGTH else id


async def _force_rename(src_dir: str, dst_dir: str) -> None:
    """Rename a directory. Checks for existence of the source directory and removes the destination directory if it exists."""
    # Make sure the source directory exists
    if await ospath.exists(src_dir):
        # Remove the destination directory if it exists
        if await ospath.exists(dst_dir):
            await aioshutil.rmtree(dst_dir, ignore_errors=True)
        await rename(src_dir, dst_dir)
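
A hedged sketch of the expected behavior of a few of the helpers added above, assuming they are imported from apify._utils as defined in this diff (REQUEST_ID_LENGTH is 15, per the consts.py change below):

from apify._utils import (
    ListPage,
    _filter_out_none_values_recursively,
    _guess_file_extension,
    _is_uuid,
    _unique_key_to_request_id,
)

# None values are dropped recursively
assert _filter_out_none_values_recursively({'a': None, 'b': {'c': None, 'd': 1}}) == {'b': {'d': 1}}

# Content-type parameters are stripped before the extension is guessed
assert _guess_file_extension('application/json; charset=utf-8') == 'json'

assert _is_uuid('6c3e7f0a-1b2d-4e5f-8a9b-0c1d2e3f4a5b')
assert not _is_uuid('not-a-uuid')

# Request IDs are deterministic and at most REQUEST_ID_LENGTH characters long
assert _unique_key_to_request_id('https://example.com') == _unique_key_to_request_id('https://example.com')
assert len(_unique_key_to_request_id('https://example.com')) <= 15

# ListPage fills in sensible defaults for missing fields
page = ListPage({'items': [{'a': 1}], 'total': 10})
assert page.count == 1 and page.offset == 0 and page.desc is False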
src/apify/actor.py — 4 changes: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
import asyncio
-import datetime
import functools
import inspect
+from datetime import datetime
from types import TracebackType
from typing import Any, Awaitable, Callable, Coroutine, Dict, List, Optional, Type, TypeVar, Union, cast

@@ -216,7 +216,7 @@ def _get_system_info(self) -> Dict:
        memory_usage_bytes = _get_memory_usage_bytes()
        # This is in camel case to be compatible with the events from the platform
        result = {
-            'createdAt': datetime.datetime.now().isoformat(timespec='milliseconds') + 'Z',
+            'createdAt': datetime.utcnow().isoformat(timespec='milliseconds') + 'Z',
            'cpuCurrentUsage': cpu_usage_percent,
            'memCurrentBytes': memory_usage_bytes,
        }
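
The createdAt change fixes a subtle timestamp bug: datetime.datetime.now() returns local wall-clock time, so appending a hard-coded 'Z' (the UTC designator) mislabeled it, while datetime.utcnow() actually returns UTC. A standalone stdlib sketch of the difference:

from datetime import datetime, timezone

local = datetime.now().isoformat(timespec='milliseconds') + 'Z'   # local time mislabeled as UTC
utc = datetime.utcnow().isoformat(timespec='milliseconds') + 'Z'  # UTC time, correctly labeled
print(local, utc)

# A timezone-aware alternative (not what the PR uses) would be:
print(datetime.now(timezone.utc).isoformat(timespec='milliseconds'))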
src/apify/consts.py — 13 changes: 13 additions & 0 deletions
@@ -133,3 +133,16 @@ class ApifyEnvVars(str, Enum):
]

STRING_ENV_VARS: List[_STRING_ENV_VARS_TYPE] = list(get_args(_STRING_ENV_VARS_TYPE))


class StorageTypes(str, Enum):
    """Possible Apify storage types."""

    DATASET = 'Dataset'
    KEY_VALUE_STORE = 'Key-value store'
    REQUEST_QUEUE = 'Request queue'


DEFAULT_API_PARAM_LIMIT = 1000

REQUEST_ID_LENGTH = 15
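
A hedged sketch of how StorageTypes feeds the new error helpers in _utils.py above (the storage ID is made up; the exact rendering of the enum member in the message may vary by Python version):

from apify._utils import _raise_on_non_existing_storage
from apify.consts import StorageTypes

try:
    _raise_on_non_existing_storage(StorageTypes.DATASET, 'abc123')
except ValueError as err:
    print(err)  # e.g. 'Dataset with id: abc123 does not exist.'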
src/apify/memory_storage/file_storage_utils.py — 108 changes: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
import os
from typing import Dict, List, Tuple

import aiofiles
from aiofiles.os import makedirs

from .._utils import _force_remove, _json_dumps


async def _update_metadata(*, data: Dict, entity_directory: str, write_metadata: bool) -> None:
    # Skip writing the actual metadata file. This is done after ensuring the directory exists so we have the directory present
    if not write_metadata:
        return

    # Ensure the directory for the entity exists
    await makedirs(entity_directory, exist_ok=True)

    # Write the metadata to the file
    file_path = os.path.join(entity_directory, '__metadata__.json')
    async with aiofiles.open(file_path, mode='wb') as f:
        await f.write(_json_dumps(data).encode('utf-8'))


async def _update_dataset_items(
    *,
    data: List[Tuple[str, Dict]],
    entity_directory: str,
    persist_storage: bool,
) -> None:
    # Skip writing files to the disk if the client has the option set to false
    if not persist_storage:
        return

    # Ensure the directory for the entity exists
    await makedirs(entity_directory, exist_ok=True)

    # Save all the new items to the disk
    for idx, item in data:
        file_path = os.path.join(entity_directory, f'{idx}.json')
        async with aiofiles.open(file_path, mode='wb') as f:
            await f.write(_json_dumps(item).encode('utf-8'))


async def _set_or_delete_key_value_store_record(
    *,
    entity_directory: str,
    persist_storage: bool,
    record: Dict,
    should_set: bool,
    write_metadata: bool,
) -> None:
    # Skip writing files to the disk if the client has the option set to false
    if not persist_storage:
        return

    # Ensure the directory for the entity exists
    await makedirs(entity_directory, exist_ok=True)

    # Create files for the record
    record_path = os.path.join(entity_directory, f"""{record['key']}.{record['extension']}""")
    record_metadata_path = os.path.join(entity_directory, f"""{record['key']}.__metadata__.json""")

    await _force_remove(record_path)
    await _force_remove(record_metadata_path)

    if should_set:
        if write_metadata:
            async with aiofiles.open(record_metadata_path, mode='wb') as f:
                await f.write(_json_dumps({
                    'key': record['key'],
                    'contentType': record.get('content_type') or 'unknown/no content type',
                    'extension': record['extension'],
                }).encode('utf-8'))

        # Convert to bytes if string
        if isinstance(record['value'], str):
            record['value'] = record['value'].encode('utf-8')

        async with aiofiles.open(record_path, mode='wb') as f:
            await f.write(record['value'])


async def _update_request_queue_item(
    *,
    request_id: str,
    request: Dict,
    entity_directory: str,
    persist_storage: bool,
) -> None:
    # Skip writing files to the disk if the client has the option set to false
    if not persist_storage:
        return

    # Ensure the directory for the entity exists
    await makedirs(entity_directory, exist_ok=True)

    # Write the request to the file
    file_path = os.path.join(entity_directory, f'{request_id}.json')
    async with aiofiles.open(file_path, mode='wb') as f:
        await f.write(_json_dumps(request).encode('utf-8'))


async def _delete_request(*, request_id: str, entity_directory: str) -> None:
    # Ensure the directory for the entity exists
    await makedirs(entity_directory, exist_ok=True)

    file_path = os.path.join(entity_directory, f'{request_id}.json')
    await _force_remove(file_path)
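
A hedged usage sketch for the metadata helper above; the directory layout is a guess at the local storage convention, and when write_metadata is False the call returns without touching the disk:

import asyncio

from apify.memory_storage.file_storage_utils import _update_metadata


async def main() -> None:
    # Writes ./storage/datasets/default/__metadata__.json, creating the directory as needed
    await _update_metadata(
        data={'id': 'default', 'name': 'my-dataset'},
        entity_directory='./storage/datasets/default',
        write_metadata=True,
    )

asyncio.run(main())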