Commit d1b357c

feat: Implement Dataset, KeyValueStore classes, create storage management logic (#21)
1 parent b7c9886 commit d1b357c

34 files changed, +933 -172 lines

‎.gitignore

+2 -0  (2 additions, 0 deletions)

@@ -11,6 +11,8 @@ __pycache__
 *.egg
 dist/
 build/
+# default folder for memory storage data
+storage/

 .vscode
 .idea

‎docs/docs.md

+6 -6  (6 additions, 6 deletions)

@@ -70,33 +70,33 @@ TODO: docs.

 ***

-#### async classmethod open_dataset(dataset_id_or_name=None)
+#### async classmethod open_dataset(dataset_id_or_name=None, \*, force_cloud=False)

 TODO: docs.

 * **Return type**

-`DatasetClientAsync`
+`Dataset`

 ***

-#### async classmethod open_key_value_store(key_value_store_id_or_name=None)
+#### async classmethod open_key_value_store(key_value_store_id_or_name=None, \*, force_cloud=False)

 TODO: docs.

 * **Return type**

-`KeyValueStoreClientAsync`
+`KeyValueStore`

 ***

-#### async classmethod open_request_queue(request_queue_id_or_name=None)
+#### async classmethod open_request_queue(request_queue_id_or_name=None, \*, force_cloud=False)

 TODO: docs.

 * **Return type**

-`RequestQueueClientAsync`
+`RequestQueue`

 ***

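
The documented methods now return the SDK's own storage classes instead of raw API clients. A minimal usage sketch based only on the signatures above; the surrounding actor boilerplate is illustrative and not part of this commit:

from apify import Actor


async def main() -> None:
    async with Actor:
        # Default storages, resolved for the current run.
        dataset = await Actor.open_dataset()
        store = await Actor.open_key_value_store()
        queue = await Actor.open_request_queue()

        # force_cloud=True (new keyword-only flag) opens the storage through the Apify API
        # even when running locally; assumes a configured Apify token.
        cloud_dataset = await Actor.open_dataset('my-results', force_cloud=True)

# asyncio.run(main()) would drive this locally.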

‎src/apify/_utils.py

+13 -27  (13 additions, 27 deletions)

@@ -1,6 +1,7 @@
 import asyncio
 import base64
 import contextlib
+import functools
 import hashlib
 import inspect
 import io
@@ -12,7 +13,7 @@
 import time
 from datetime import datetime, timezone
 from enum import Enum
-from typing import Any, Callable, Dict, Generic, List, NoReturn, Optional, TypeVar, Union, cast, overload
+from typing import Any, Callable, Dict, Generic, NoReturn, Optional, TypeVar, Union, cast, overload

 import aioshutil
 import psutil
@@ -178,32 +179,6 @@ async def _run_func_at_interval_async(func: Callable, interval_secs: float) -> N
     await res


-class ListPage:  # TODO: Rather use exported version from Apify client
-    """A single page of items returned from a list() method."""
-
-    #: list: List of returned objects on this page
-    items: List
-    #: int: Count of the returned objects on this page
-    count: int
-    #: int: The limit on the number of returned objects offset specified in the API call
-    offset: int
-    #: int: The offset of the first object specified in the API call
-    limit: int
-    #: int: Total number of objects matching the API call criteria
-    total: int
-    #: bool: Whether the listing is descending or not
-    desc: bool
-
-    def __init__(self, data: Dict) -> None:
-        """Initialize a ListPage instance from the API response data."""
-        self.items = data['items'] if 'items' in data else []
-        self.offset = data['offset'] if 'offset' in data else 0
-        self.limit = data['limit'] if 'limit' in data else 0
-        self.count = data['count'] if 'count' in data else len(self.items)
-        self.total = data['total'] if 'total' in data else self.offset + self.count
-        self.desc = data['desc'] if 'desc' in data else False
-
-
 async def _force_remove(filename: str) -> None:
     """JS-like rm(filename, { force: true })."""
     with contextlib.suppress(FileNotFoundError):
@@ -310,3 +285,14 @@ async def _force_rename(src_dir: str, dst_dir: str) -> None:
     if await ospath.exists(dst_dir):
         await aioshutil.rmtree(dst_dir, ignore_errors=True)
     await rename(src_dir, dst_dir)
+
+ImplementationType = TypeVar('ImplementationType', bound=Callable)
+MetadataType = TypeVar('MetadataType', bound=Callable)
+
+
+def _wrap_internal(implementation: ImplementationType, metadata_source: MetadataType) -> MetadataType:
+    @functools.wraps(metadata_source)
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
+        return implementation(*args, **kwargs)
+
+    return cast(MetadataType, wrapper)
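
`_wrap_internal` is moved here from `actor.py` (which now imports it, see below). It runs one callable while presenting the name, docstring, and signature of another, presumably so the public classmethods like `Actor.open_dataset()` can be rebound to the instance-level `_*_internal` implementations without losing their metadata. A self-contained sketch of the pattern; the `Greeter` class is hypothetical and only illustrates the binding trick:

import functools
from typing import Any, Callable, TypeVar, cast

ImplementationType = TypeVar('ImplementationType', bound=Callable)
MetadataType = TypeVar('MetadataType', bound=Callable)


def _wrap_internal(implementation: ImplementationType, metadata_source: MetadataType) -> MetadataType:
    """Call `implementation`, but keep the name/docstring of `metadata_source`."""
    @functools.wraps(metadata_source)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return implementation(*args, **kwargs)

    return cast(MetadataType, wrapper)


class Greeter:
    """Purely illustrative stand-in for a class like Actor."""

    def __init__(self) -> None:
        # Shadow the public classmethod with an instance-bound implementation,
        # while keeping the public method's metadata for help() and docs tooling.
        self.greet = _wrap_internal(self._greet_internal, self.greet)

    @classmethod
    def greet(cls) -> str:
        """Public, documented entry point."""
        return 'hello from the class'

    def _greet_internal(self) -> str:
        return 'hello from the instance'


g = Greeter()
print(g.greet())        # -> hello from the instance
print(g.greet.__doc__)  # -> Public, documented entry point.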

‎src/apify/actor.py

+39 -75  (39 additions, 75 deletions)

@@ -1,12 +1,10 @@
 import asyncio
-import functools
 import inspect
 from datetime import datetime
 from types import TracebackType
 from typing import Any, Awaitable, Callable, Coroutine, Dict, List, Optional, Type, TypeVar, Union, cast

 from apify_client import ApifyClientAsync
-from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync
 from apify_client.consts import WebhookEventType

 from ._utils import (
@@ -15,30 +13,24 @@
     _get_memory_usage_bytes,
     _log_system_info,
     _run_func_at_interval_async,
+    _wrap_internal,
     dualproperty,
 )
 from .config import Configuration
 from .consts import ActorEventType, ApifyEnvVars
 from .event_manager import EventManager
+from .memory_storage import MemoryStorage
 from .proxy_configuration import ProxyConfiguration
+from .storage_client_manager import StorageClientManager
+from .storages import Dataset, KeyValueStore, RequestQueue, StorageManager

 MainReturnType = TypeVar('MainReturnType')

-T = TypeVar('T', bound=Callable)
-U = TypeVar('U', bound=Callable)
-
-
-def _wrap_internal(implementation: T, metadata_source: U) -> U:
-    @functools.wraps(metadata_source)
-    def wrapper(*args: Any, **kwargs: Any) -> Any:
-        return implementation(*args, **kwargs)
-
-    return cast(U, wrapper)
-
-
 # This metaclass is needed so you can do `with Actor: ...` instead of `with Actor() as a: ...`
 # and have automatic `Actor.init()` and `Actor.exit()`
 # TODO: decide if this mumbo jumbo is worth it or not, or if it maybe breaks something
+
+
 class _ActorContextManager(type):
     @staticmethod
     async def __aenter__() -> Type['Actor']:
@@ -67,6 +59,7 @@ class Actor(metaclass=_ActorContextManager):

     _default_instance: Optional['Actor'] = None
     _apify_client: ApifyClientAsync
+    _memory_storage: MemoryStorage
     _config: Configuration
     _event_manager: EventManager
     _send_system_info_interval_task: Optional[asyncio.Task] = None
@@ -195,7 +188,9 @@ async def _init_internal(self) -> None:
             ),
         )

-        if not self.is_at_home():
+        if self.is_at_home():
+            StorageClientManager.set_storage_client(self._apify_client)
+        else:
             self._send_system_info_interval_task = asyncio.create_task(
                 _run_func_at_interval_async(
                     lambda: self._event_manager.emit(ActorEventType.SYSTEM_INFO, self._get_system_info()),
@@ -340,89 +335,60 @@ def _new_client_internal(
             timeout_secs=timeout_secs,
         )

-    # TODO: create proper Dataset, KeyValueStore and RequestQueue class
+    def _get_storage_client(self, force_cloud: bool) -> Optional[ApifyClientAsync]:
+        return self._apify_client if force_cloud else None
+
     @classmethod
-    async def open_dataset(cls, dataset_id_or_name: Optional[str] = None) -> DatasetClientAsync:
+    async def open_dataset(cls, dataset_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> Dataset:
         """TODO: docs."""
-        return await cls._get_default_instance().open_dataset(dataset_id_or_name=dataset_id_or_name)
-
-    async def _open_dataset_internal(self, dataset_id_or_name: Optional[str] = None) -> DatasetClientAsync:
-        # TODO: this should return a Dataset class rather than the raw client
+        return await cls._get_default_instance().open_dataset(dataset_id_or_name=dataset_id_or_name, force_cloud=force_cloud)

+    async def _open_dataset_internal(self, dataset_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> Dataset:
         self._raise_if_not_initialized()

-        if not dataset_id_or_name:
-            dataset_id_or_name = self._config.default_dataset_id
-
-        dataset_client = self._apify_client.dataset(dataset_id_or_name)
-
-        if await dataset_client.get():
-            return dataset_client
-
-        else:
-            dataset = await self._apify_client.datasets().get_or_create(name=dataset_id_or_name)
-            return self._apify_client.dataset(dataset['id'])
+        return await StorageManager.open_storage(Dataset, dataset_id_or_name, self._get_storage_client(force_cloud), self._config)

     @classmethod
-    async def open_key_value_store(cls, key_value_store_id_or_name: Optional[str] = None) -> KeyValueStoreClientAsync:
+    async def open_key_value_store(cls, key_value_store_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> KeyValueStore:
         """TODO: docs."""
-        return await cls._get_default_instance().open_key_value_store(key_value_store_id_or_name=key_value_store_id_or_name)
-
-    async def _open_key_value_store_internal(self, key_value_store_id_or_name: Optional[str] = None) -> KeyValueStoreClientAsync:
-        # TODO: this should return a KeyValueStore class rather than the raw client
+        return await cls._get_default_instance().open_key_value_store(key_value_store_id_or_name=key_value_store_id_or_name, force_cloud=force_cloud)

+    async def _open_key_value_store_internal(self, key_value_store_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> KeyValueStore:
         self._raise_if_not_initialized()

-        if not key_value_store_id_or_name:
-            key_value_store_id_or_name = self._config.default_key_value_store_id
-
-        store_client = self._apify_client.key_value_store(key_value_store_id_or_name)
-
-        if await store_client.get():
-            return store_client
-
-        else:
-            key_value_store = await self._apify_client.key_value_stores().get_or_create(name=key_value_store_id_or_name)
-            return self._apify_client.key_value_store(key_value_store['id'])
+        return await StorageManager.open_storage(KeyValueStore, key_value_store_id_or_name, self._get_storage_client(force_cloud), self._config)

     @classmethod
-    async def open_request_queue(cls, request_queue_id_or_name: Optional[str] = None) -> RequestQueueClientAsync:
+    async def open_request_queue(cls, request_queue_id_or_name: Optional[str] = None, *, force_cloud: bool = False) -> RequestQueue:
         """TODO: docs."""
-        return await cls._get_default_instance().open_request_queue(request_queue_id_or_name=request_queue_id_or_name)
-
-    async def _open_request_queue_internal(self, request_queue_id_or_name: Optional[str] = None) -> RequestQueueClientAsync:
-        # TODO: this should return a RequestQueue class rather than the raw client
+        return await cls._get_default_instance().open_request_queue(request_queue_id_or_name=request_queue_id_or_name, force_cloud=force_cloud)

+    async def _open_request_queue_internal(
+        self,
+        request_queue_id_or_name: Optional[str] = None,
+        *,
+        force_cloud: bool = False,
+    ) -> RequestQueue:
         self._raise_if_not_initialized()

-        if not request_queue_id_or_name:
-            request_queue_id_or_name = self._config.default_request_queue_id
-
-        queue_client = self._apify_client.request_queue(request_queue_id_or_name)
-
-        if await queue_client.get():
-            return queue_client
-
-        else:
-            request_queue = await self._apify_client.request_queues().get_or_create(name=request_queue_id_or_name)
-            return self._apify_client.request_queue(request_queue['id'])
+        return await StorageManager.open_storage(RequestQueue, request_queue_id_or_name, self._get_storage_client(force_cloud), self._config)

     @classmethod
     async def push_data(cls, data: Any) -> None:
         """TODO: docs."""
-        return await cls._get_default_instance().push_data(data=data)
+        await cls._get_default_instance().push_data(data=data)

     async def _push_data_internal(self, data: Any) -> None:
         self._raise_if_not_initialized()

         if not data:
             return

-        if not isinstance(data, list):
+        if not isinstance(data, list):  # TODO: Memory storage does this on its own...
             data = [data]

-        dataset_client = await self.open_dataset()
-        return await dataset_client.push_items(data)
+        dataset = await self.open_dataset()
+        await dataset.push_data(data)

     @classmethod
     async def get_input(cls) -> Any:
@@ -444,11 +410,9 @@ async def get_value(cls, key: str) -> Any:
     async def _get_value_internal(self, key: str) -> Any:
         self._raise_if_not_initialized()

-        key_value_store_client = await self.open_key_value_store()
-        record = await key_value_store_client.get_record(key)
-        if record:
-            return record['value']
-        return None
+        key_value_store = await self.open_key_value_store()
+        value = await key_value_store.get_value(key)
+        return value

     @classmethod
     async def set_value(cls, key: str, value: Any, options: Optional[Dict] = None) -> None:
@@ -464,8 +428,8 @@ async def _set_value_internal(self, key: str, value: Any, options: Optional[Dict

         content_type = options['content_type'] if options else None

-        key_value_store_client = await self.open_key_value_store()
-        return await key_value_store_client.set_record(key, value, content_type=content_type)
+        key_value_store = await self.open_key_value_store()
+        return await key_value_store.set_value(key, value, content_type=content_type)

     @classmethod
     def on(cls, event: ActorEventType, listener: Callable) -> Callable:
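
With this change the `Actor` storage helpers no longer talk to raw API clients: `push_data()`, `get_value()` and `set_value()` delegate to the new `Dataset` and `KeyValueStore` classes, and on the platform (`is_at_home()`) `StorageClientManager` is pointed at the Apify client, while local runs are meant to be backed by the new `MemoryStorage`. A hedged usage sketch of the rewired helpers; the URLs, keys, and values are made up:

import asyncio

from apify import Actor


async def main() -> None:
    async with Actor:
        # push_data() now routes through Dataset.push_data() instead of the
        # raw dataset client's push_items().
        await Actor.push_data([{'url': 'https://example.com', 'status': 200}])

        # get_value()/set_value() now delegate to KeyValueStore.get_value()/set_value().
        await Actor.set_value('STATE', {'done': 1}, options={'content_type': 'application/json'})
        state = await Actor.get_value('STATE')
        print(state)


if __name__ == '__main__':
    asyncio.run(main())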

‎src/apify/config.py

+1 -0  (1 addition, 0 deletions)

@@ -37,6 +37,7 @@ def __init__(self) -> None:
         self.meta_origin = _fetch_and_parse_env_var(ApifyEnvVars.META_ORIGIN)
         self.metamorph_after_sleep_millis = _fetch_and_parse_env_var(ApifyEnvVars.METAMORPH_AFTER_SLEEP_MILLIS, 300000)
         self.persist_state_interval_millis = _fetch_and_parse_env_var(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, 60000)
+        self.persist_storage = _fetch_and_parse_env_var(ApifyEnvVars.PERSIST_STORAGE)
         self.proxy_hostname = _fetch_and_parse_env_var(ApifyEnvVars.PROXY_HOSTNAME, 'proxy.apify.com')
         self.proxy_password = _fetch_and_parse_env_var(ApifyEnvVars.PROXY_PASSWORD)
         self.proxy_port = _fetch_and_parse_env_var(ApifyEnvVars.PROXY_PORT, 8000)

‎src/apify/consts.py

+4 -0  (4 additions, 0 deletions)

@@ -51,6 +51,7 @@ class ApifyEnvVars(str, Enum):
     LOG_LEVEL = 'APIFY_LOG_LEVEL'
     MEMORY_MBYTES = 'APIFY_MEMORY_MBYTES'
     META_ORIGIN = 'APIFY_META_ORIGIN'
+    PERSIST_STORAGE = 'APIFY_PERSIST_STORAGE'
     PROXY_HOSTNAME = 'APIFY_PROXY_HOSTNAME'
     PROXY_PASSWORD = 'APIFY_PROXY_PASSWORD'
     PROXY_PORT = 'APIFY_PROXY_PORT'
@@ -86,6 +87,7 @@
     ApifyEnvVars.DISABLE_OUTDATED_WARNING,
     ApifyEnvVars.HEADLESS,
     ApifyEnvVars.IS_AT_HOME,
+    ApifyEnvVars.PERSIST_STORAGE,
     ApifyEnvVars.PURGE_ON_START,
     ApifyEnvVars.XVFB,
 ]
@@ -146,3 +148,5 @@ class StorageTypes(str, Enum):
 DEFAULT_API_PARAM_LIMIT = 1000

 REQUEST_ID_LENGTH = 15
+
+REQUEST_QUEUE_HEAD_MAX_LIMIT = 1000
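
The new `APIFY_PERSIST_STORAGE` variable joins the boolean environment variable group above and is read into `Configuration.persist_storage` by the `config.py` change earlier in this commit. A hedged sketch of how the two pieces fit together; the exact parsed value is an assumption:

import os

from apify.config import Configuration
from apify.consts import ApifyEnvVars

# Assumed behaviour: since APIFY_PERSIST_STORAGE is registered as a boolean env var,
# the string 'false' should parse to a falsy Configuration.persist_storage.
os.environ[ApifyEnvVars.PERSIST_STORAGE.value] = 'false'

config = Configuration()
print(config.persist_storage)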

‎src/apify/memory_storage/__init__.py

+3 -0  (3 additions, 0 deletions)

@@ -0,0 +1,3 @@
+from .memory_storage import MemoryStorage
+
+__all__ = ['MemoryStorage']

‎src/apify/memory_storage/memory_storage.py

+4 -1  (4 additions, 1 deletion)

@@ -34,6 +34,9 @@ class MemoryStorage:
     _key_value_stores_handled: List[KeyValueStoreClient]
     _request_queues_handled: List[RequestQueueClient]

+    _purged: bool = False
+    """Indicates whether a purge was already performed on this instance"""
+
     def __init__(
         self, *, local_data_directory: str = './storage', write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None,
     ) -> None:
@@ -83,7 +86,7 @@ def request_queues(self) -> RequestQueueCollectionClient:
         """Retrieve the sub-client for manipulating request queues."""
         return RequestQueueCollectionClient(base_storage_directory=self._request_queues_directory, client=self)

-    def request_queue(self, request_queue_id: str, *, _client_key: Optional[str] = None) -> RequestQueueClient:
+    def request_queue(self, request_queue_id: str, *, client_key: Optional[str] = None) -> RequestQueueClient:  # noqa: U100
         """Retrieve the sub-client for manipulating a single request queue.

         Args:
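
For reference, a hedged sketch of the `MemoryStorage` surface visible in this diff: the constructor keywords and the request-queue accessors, including the `client_key` parameter that this commit makes public. Anything beyond those calls is an assumption:

from apify.memory_storage import MemoryStorage

# Constructor keywords are taken from the diff context above; persist_storage=False is assumed
# to keep data out of the ./storage folder (the same folder the .gitignore change covers).
storage = MemoryStorage(local_data_directory='./storage', write_metadata=False, persist_storage=False)

queues = storage.request_queues()                 # collection sub-client for request queues
default_queue = storage.request_queue('default')  # single-queue sub-client; client_key is now a public kwarg
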
New file: +9 -0  (9 additions, 0 deletions)

@@ -0,0 +1,9 @@
+from .dataset import DatasetClient
+from .dataset_collection import DatasetCollectionClient
+from .key_value_store import KeyValueStoreClient
+from .key_value_store_collection import KeyValueStoreCollectionClient
+from .request_queue import RequestQueueClient
+from .request_queue_collection import RequestQueueCollectionClient
+
+__all__ = ['DatasetClient', 'DatasetCollectionClient', 'KeyValueStoreClient',
+           'KeyValueStoreCollectionClient', 'RequestQueueClient', 'RequestQueueCollectionClient']
