Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: Implement Dataset, KeyValueStore classes, create storage management logic #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
746b92e
feat: Implement Dataset, KeyValueStore and RequestQueue classes
jirimoravcik Jan 4, 2023
a2001a0
add some docs with interfaces of crawlee code
jirimoravcik Jan 5, 2023
ef5b3c6
import list page from apify_client._utils
jirimoravcik Jan 5, 2023
16ecda1
lots of refactoring, basic kvs implementation, boilerplate for datase…
jirimoravcik Jan 6, 2023
689cec5
more improvements to for each key in kvs
jirimoravcik Jan 6, 2023
9dcb3f8
fixes, refactoring, more progress
jirimoravcik Jan 7, 2023
491f65d
simplify code in actor.py
jirimoravcik Jan 7, 2023
5289c49
fix class datetime references
jirimoravcik Jan 9, 2023
6ed3f2a
fix space
jirimoravcik Jan 9, 2023
be4b368
add storage manager, use it for opening of storages
jirimoravcik Jan 10, 2023
0a8e8ac
fix caching of default storages
jirimoravcik Jan 10, 2023
019a102
instance creation should be named differently...
jirimoravcik Jan 10, 2023
0d644b2
fix
jirimoravcik Jan 10, 2023
b2f072c
store config in the classes
jirimoravcik Jan 10, 2023
5eda90a
the battle rages on
jirimoravcik Jan 10, 2023
7f2bbb5
request queue progress
jirimoravcik Jan 10, 2023
cecea8b
LRUCache impl
jirimoravcik Jan 10, 2023
38c1f8b
cleanup
jirimoravcik Jan 11, 2023
a6ee4fe
fix ordered dict type vs class
jirimoravcik Jan 11, 2023
dbb4c93
fix
jirimoravcik Jan 11, 2023
3f47ebb
the battle rages on
jirimoravcik Jan 11, 2023
16ebf1c
format
jirimoravcik Jan 11, 2023
a8d8ee9
address PR comments
jirimoravcik Jan 14, 2023
a848435
fix docs
jirimoravcik Jan 16, 2023
2ea2d4d
remove todo
jirimoravcik Jan 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions 2 .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ __pycache__
*.egg
dist/
build/
# default folder for memory storage data
storage/

.vscode
.idea
12 changes: 6 additions & 6 deletions 12 docs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,33 @@ TODO: docs.

***

#### async classmethod open_dataset(dataset_id_or_name=None)
#### async classmethod open_dataset(dataset_id_or_name=None, \*, force_cloud=False)

TODO: docs.

* **Return type**

`DatasetClientAsync`
`Dataset`

***

#### async classmethod open_key_value_store(key_value_store_id_or_name=None)
#### async classmethod open_key_value_store(key_value_store_id_or_name=None, \*, force_cloud=False)

TODO: docs.

* **Return type**

`KeyValueStoreClientAsync`
`KeyValueStore`

***

#### async classmethod open_request_queue(request_queue_id_or_name=None)
#### async classmethod open_request_queue(request_queue_id_or_name=None, \*, force_cloud=False)

TODO: docs.

* **Return type**

`RequestQueueClientAsync`
`RequestQueue`

***

Expand Down
40 changes: 13 additions & 27 deletions 40 src/apify/_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import base64
import contextlib
import functools
import hashlib
import inspect
import io
Expand All @@ -12,7 +13,7 @@
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, Generic, List, NoReturn, Optional, TypeVar, Union, cast, overload
from typing import Any, Callable, Dict, Generic, NoReturn, Optional, TypeVar, Union, cast, overload

import aioshutil
import psutil
Expand Down Expand Up @@ -178,32 +179,6 @@ async def _run_func_at_interval_async(func: Callable, interval_secs: float) -> N
await res


class ListPage:  # TODO: Rather use exported version from Apify client
    """A single page of items returned from a list() method."""

    #: list: List of returned objects on this page
    items: List
    #: int: Count of the returned objects on this page
    count: int
    #: int: The offset of the first object specified in the API call
    offset: int
    #: int: The limit on the number of returned objects specified in the API call
    limit: int
    #: int: Total number of objects matching the API call criteria
    total: int
    #: bool: Whether the listing is descending or not
    desc: bool

    def __init__(self, data: Dict) -> None:
        """Initialize a ListPage instance from the API response data.

        Args:
            data: One page of results as returned by a list() API endpoint.
        """
        # Missing fields fall back to sensible defaults so partial API
        # responses still produce a usable page object.
        self.items = data.get('items', [])
        self.offset = data.get('offset', 0)
        self.limit = data.get('limit', 0)
        self.count = data.get('count', len(self.items))
        self.total = data.get('total', self.offset + self.count)
        self.desc = data.get('desc', False)


async def _force_remove(filename: str) -> None:
"""JS-like rm(filename, { force: true })."""
with contextlib.suppress(FileNotFoundError):
Expand Down Expand Up @@ -310,3 +285,14 @@ async def _force_rename(src_dir: str, dst_dir: str) -> None:
if await ospath.exists(dst_dir):
await aioshutil.rmtree(dst_dir, ignore_errors=True)
await rename(src_dir, dst_dir)

ImplementationType = TypeVar('ImplementationType', bound=Callable)
MetadataType = TypeVar('MetadataType', bound=Callable)


def _wrap_internal(implementation: ImplementationType, metadata_source: MetadataType) -> MetadataType:
@functools.wraps(metadata_source)
def wrapper(*args: Any, **kwargs: Any) -> Any:
return implementation(*args, **kwargs)

return cast(MetadataType, wrapper)
114 changes: 39 additions & 75 deletions 114 src/apify/actor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import asyncio
import functools
import inspect
from datetime import datetime
from types import TracebackType
from typing import Any, Awaitable, Callable, Coroutine, Dict, List, Optional, Type, TypeVar, Union, cast

from apify_client import ApifyClientAsync
from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync
from apify_client.consts import WebhookEventType

from ._utils import (
Expand All @@ -15,30 +13,24 @@
_get_memory_usage_bytes,
_log_system_info,
_run_func_at_interval_async,
_wrap_internal,
dualproperty,
)
from .config import Configuration
from .consts import ActorEventType, ApifyEnvVars
from .event_manager import EventManager
from .memory_storage import MemoryStorage
from .proxy_configuration import ProxyConfiguration
from .storage_client_manager import StorageClientManager
from .storages import Dataset, KeyValueStore, RequestQueue, StorageManager

MainReturnType = TypeVar('MainReturnType')

# Type variables for the implementation callable and the metadata-source callable.
T = TypeVar('T', bound=Callable)
U = TypeVar('U', bound=Callable)


def _wrap_internal(implementation: T, metadata_source: U) -> U:
    # Forward all calls to `implementation` while presenting the name, docstring
    # and signature of `metadata_source` (via functools.wraps).
    @functools.wraps(metadata_source)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return implementation(*args, **kwargs)

    return cast(U, wrapper)


# This metaclass is needed so you can do `with Actor: ...` instead of `with Actor() as a: ...`
# and have automatic `Actor.init()` and `Actor.exit()`
# TODO: decide if this mumbo jumbo is worth it or not, or if it maybe breaks something


class _ActorContextManager(type):
@staticmethod
async def __aenter__() -> Type['Actor']:
Expand Down Expand Up @@ -67,6 +59,7 @@ class Actor(metaclass=_ActorContextManager):

_default_instance: Optional['Actor'] = None
_apify_client: ApifyClientAsync
_memory_storage: MemoryStorage
_config: Configuration
_event_manager: EventManager
_send_system_info_interval_task: Optional[asyncio.Task] = None
Expand Down Expand Up @@ -195,7 +188,9 @@ async def _init_internal(self) -> None:
),
)

if not self.is_at_home():
if self.is_at_home():
StorageClientManager.set_storage_client(self._apify_client)
else:
self._send_system_info_interval_task = asyncio.create_task(
_run_func_at_interval_async(
lambda: self._event_manager.emit(ActorEventType.SYSTEM_INFO, self._get_system_info()),
Expand Down Expand Up @@ -340,89 +335,60 @@ def _new_client_internal(
timeout_secs=timeout_secs,
)

# TODO: create proper Dataset, KeyValueStore and RequestQueue class
def _get_storage_client(self, force_cloud: bool) -> Optional[ApifyClientAsync]:
    """Return the Apify API client when `force_cloud` is set, otherwise None (local storage is used)."""
    return self._apify_client if force_cloud else None

@classmethod
async def open_dataset(cls, dataset_id_or_name: Optional[str] = None) -> DatasetClientAsync:
async def open_dataset(cls, dataset_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> Dataset:
"""TODO: docs."""
return await cls._get_default_instance().open_dataset(dataset_id_or_name=dataset_id_or_name)

async def _open_dataset_internal(self, dataset_id_or_name: Optional[str] = None) -> DatasetClientAsync:
# TODO: this should return a Dataset class rather than the raw client
return await cls._get_default_instance().open_dataset(dataset_id_or_name=dataset_id_or_name, force_cloud=force_cloud)

async def _open_dataset_internal(self, dataset_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> Dataset:
self._raise_if_not_initialized()

if not dataset_id_or_name:
dataset_id_or_name = self._config.default_dataset_id

dataset_client = self._apify_client.dataset(dataset_id_or_name)

if await dataset_client.get():
return dataset_client

else:
dataset = await self._apify_client.datasets().get_or_create(name=dataset_id_or_name)
return self._apify_client.dataset(dataset['id'])
return await StorageManager.open_storage(Dataset, dataset_id_or_name, self._get_storage_client(force_cloud), self._config)

@classmethod
async def open_key_value_store(cls, key_value_store_id_or_name: Optional[str] = None) -> KeyValueStoreClientAsync:
async def open_key_value_store(cls, key_value_store_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> KeyValueStore:
"""TODO: docs."""
return await cls._get_default_instance().open_key_value_store(key_value_store_id_or_name=key_value_store_id_or_name)

async def _open_key_value_store_internal(self, key_value_store_id_or_name: Optional[str] = None) -> KeyValueStoreClientAsync:
# TODO: this should return a KeyValueStore class rather than the raw client
return await cls._get_default_instance().open_key_value_store(key_value_store_id_or_name=key_value_store_id_or_name, force_cloud=force_cloud)

async def _open_key_value_store_internal(self, key_value_store_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> KeyValueStore:
self._raise_if_not_initialized()

if not key_value_store_id_or_name:
key_value_store_id_or_name = self._config.default_key_value_store_id

store_client = self._apify_client.key_value_store(key_value_store_id_or_name)

if await store_client.get():
return store_client

else:
key_value_store = await self._apify_client.key_value_stores().get_or_create(name=key_value_store_id_or_name)
return self._apify_client.key_value_store(key_value_store['id'])
return await StorageManager.open_storage(KeyValueStore, key_value_store_id_or_name, self._get_storage_client(force_cloud), self._config)

@classmethod
async def open_request_queue(cls, request_queue_id_or_name: Optional[str] = None) -> RequestQueueClientAsync:
async def open_request_queue(cls, request_queue_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> RequestQueue:
"""TODO: docs."""
return await cls._get_default_instance().open_request_queue(request_queue_id_or_name=request_queue_id_or_name)

async def _open_request_queue_internal(self, request_queue_id_or_name: Optional[str] = None) -> RequestQueueClientAsync:
# TODO: this should return a RequestQueue class rather than the raw client
return await cls._get_default_instance().open_request_queue(request_queue_id_or_name=request_queue_id_or_name, force_cloud=force_cloud)

async def _open_request_queue_internal(
self,
request_queue_id_or_name: Optional[str] = None,
*,
force_cloud: bool = False,
) -> RequestQueue:
self._raise_if_not_initialized()

if not request_queue_id_or_name:
request_queue_id_or_name = self._config.default_request_queue_id

queue_client = self._apify_client.request_queue(request_queue_id_or_name)

if await queue_client.get():
return queue_client

else:
request_queue = await self._apify_client.request_queues().get_or_create(name=request_queue_id_or_name)
return self._apify_client.request_queue(request_queue['id'])
return await StorageManager.open_storage(RequestQueue, request_queue_id_or_name, self._get_storage_client(force_cloud), self._config)

@classmethod
async def push_data(cls, data: Any) -> None:
"""TODO: docs."""
return await cls._get_default_instance().push_data(data=data)
await cls._get_default_instance().push_data(data=data)

async def _push_data_internal(self, data: Any) -> None:
self._raise_if_not_initialized()

if not data:
return

if not isinstance(data, list):
if not isinstance(data, list): # TODO: Memory storage does this on its own...
data = [data]

dataset_client = await self.open_dataset()
return await dataset_client.push_items(data)
dataset = await self.open_dataset()
await dataset.push_data(data)

@classmethod
async def get_input(cls) -> Any:
Expand All @@ -444,11 +410,9 @@ async def get_value(cls, key: str) -> Any:
async def _get_value_internal(self, key: str) -> Any:
self._raise_if_not_initialized()

key_value_store_client = await self.open_key_value_store()
record = await key_value_store_client.get_record(key)
if record:
return record['value']
return None
key_value_store = await self.open_key_value_store()
value = await key_value_store.get_value(key)
return value

@classmethod
async def set_value(cls, key: str, value: Any, options: Optional[Dict] = None) -> None:
Expand All @@ -464,8 +428,8 @@ async def _set_value_internal(self, key: str, value: Any, options: Optional[Dict

content_type = options['content_type'] if options else None

key_value_store_client = await self.open_key_value_store()
return await key_value_store_client.set_record(key, value, content_type=content_type)
key_value_store = await self.open_key_value_store()
return await key_value_store.set_value(key, value, content_type=content_type)

@classmethod
def on(cls, event: ActorEventType, listener: Callable) -> Callable:
Expand Down
1 change: 1 addition & 0 deletions 1 src/apify/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self) -> None:
self.meta_origin = _fetch_and_parse_env_var(ApifyEnvVars.META_ORIGIN)
self.metamorph_after_sleep_millis = _fetch_and_parse_env_var(ApifyEnvVars.METAMORPH_AFTER_SLEEP_MILLIS, 300000)
self.persist_state_interval_millis = _fetch_and_parse_env_var(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, 60000)
self.persist_storage = _fetch_and_parse_env_var(ApifyEnvVars.PERSIST_STORAGE)
self.proxy_hostname = _fetch_and_parse_env_var(ApifyEnvVars.PROXY_HOSTNAME, 'proxy.apify.com')
self.proxy_password = _fetch_and_parse_env_var(ApifyEnvVars.PROXY_PASSWORD)
self.proxy_port = _fetch_and_parse_env_var(ApifyEnvVars.PROXY_PORT, 8000)
Expand Down
4 changes: 4 additions & 0 deletions 4 src/apify/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ApifyEnvVars(str, Enum):
LOG_LEVEL = 'APIFY_LOG_LEVEL'
MEMORY_MBYTES = 'APIFY_MEMORY_MBYTES'
META_ORIGIN = 'APIFY_META_ORIGIN'
PERSIST_STORAGE = 'APIFY_PERSIST_STORAGE'
PROXY_HOSTNAME = 'APIFY_PROXY_HOSTNAME'
PROXY_PASSWORD = 'APIFY_PROXY_PASSWORD'
PROXY_PORT = 'APIFY_PROXY_PORT'
Expand Down Expand Up @@ -86,6 +87,7 @@ class ApifyEnvVars(str, Enum):
ApifyEnvVars.DISABLE_OUTDATED_WARNING,
ApifyEnvVars.HEADLESS,
ApifyEnvVars.IS_AT_HOME,
ApifyEnvVars.PERSIST_STORAGE,
ApifyEnvVars.PURGE_ON_START,
ApifyEnvVars.XVFB,
]
Expand Down Expand Up @@ -146,3 +148,5 @@ class StorageTypes(str, Enum):
DEFAULT_API_PARAM_LIMIT = 1000

REQUEST_ID_LENGTH = 15

REQUEST_QUEUE_HEAD_MAX_LIMIT = 1000
3 changes: 3 additions & 0 deletions 3 src/apify/memory_storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .memory_storage import MemoryStorage

__all__ = ['MemoryStorage']
5 changes: 4 additions & 1 deletion 5 src/apify/memory_storage/memory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class MemoryStorage:
_key_value_stores_handled: List[KeyValueStoreClient]
_request_queues_handled: List[RequestQueueClient]

_purged: bool = False
"""Indicates whether a purge was already performed on this instance"""

def __init__(
self, *, local_data_directory: str = './storage', write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None,
) -> None:
Expand Down Expand Up @@ -83,7 +86,7 @@ def request_queues(self) -> RequestQueueCollectionClient:
"""Retrieve the sub-client for manipulating request queues."""
return RequestQueueCollectionClient(base_storage_directory=self._request_queues_directory, client=self)

def request_queue(self, request_queue_id: str, *, _client_key: Optional[str] = None) -> RequestQueueClient:
def request_queue(self, request_queue_id: str, *, client_key: Optional[str] = None) -> RequestQueueClient: # noqa: U100
"""Retrieve the sub-client for manipulating a single request queue.

Args:
Expand Down
9 changes: 9 additions & 0 deletions 9 src/apify/memory_storage/resource_clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .dataset import DatasetClient
from .dataset_collection import DatasetCollectionClient
from .key_value_store import KeyValueStoreClient
from .key_value_store_collection import KeyValueStoreCollectionClient
from .request_queue import RequestQueueClient
from .request_queue_collection import RequestQueueCollectionClient

__all__ = ['DatasetClient', 'DatasetCollectionClient', 'KeyValueStoreClient',
'KeyValueStoreCollectionClient', 'RequestQueueClient', 'RequestQueueCollectionClient']
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.