From 5faf1ab237fb635891bf2a1d6e7502e6e995b82d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 18 Jan 2023 10:36:33 +0100 Subject: [PATCH 1/9] feat: Implement RequestQueue class --- src/apify/_utils.py | 63 ++++- src/apify/storages/_utils.py | 57 ----- src/apify/storages/request_queue.py | 364 +++++++++++++++++++++++++++- 3 files changed, 424 insertions(+), 60 deletions(-) delete mode 100644 src/apify/storages/_utils.py diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 2723979a..c987cc2b 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -9,11 +9,16 @@ import mimetypes import os import re +import secrets import sys import time +from collections import OrderedDict +from collections.abc import MutableMapping from datetime import datetime, timezone from enum import Enum -from typing import Any, Callable, Dict, Generic, NoReturn, Optional, TypeVar, Union, cast, overload +from typing import Any, Callable, Dict, Generic, ItemsView, Iterator, NoReturn, Optional +from typing import OrderedDict as OrderedDictType +from typing import TypeVar, Union, ValuesView, cast, overload import aioshutil import psutil @@ -296,3 +301,59 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return implementation(*args, **kwargs) return cast(MetadataType, wrapper) + + +def _crypto_random_object_id(length: int = 17) -> str: + """Python reimplementation of cryptoRandomObjectId from `@apify/utilities`.""" + chars = 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789' + return ''.join(secrets.choice(chars) for _ in range(length)) + + +T = TypeVar('T') + + +class LRUCache(MutableMapping, Generic[T]): + """Attempt to reimplement LRUCache from `@apify/datastructures` using `OrderedDict`.""" + + _cache: OrderedDictType[str, T] + + _max_length: int + + def __init__(self, max_length: int) -> None: + """Create a LRUCache with a specific max_length.""" + self._cache = OrderedDict() + self._max_length = max_length + + def __getitem__(self, key: str) -> T: + """Get an item from the cache. Move it to the end if present.""" + val = self._cache[key] + self._cache.move_to_end(key) + return val + + # Sadly TS impl returns bool indicating whether the key was already present or not + def __setitem__(self, key: str, value: T) -> None: + """Add an item to the cache. Remove least used item if max_length exceeded.""" + self._cache[key] = value + if len(self._cache) > self._max_length: + self._cache.popitem(last=False) + + def __delitem__(self, key: str) -> None: + """Remove an item from the cache.""" + # TODO: maybe do? 
self._cache.__delitem__(key) + del self._cache[key] + + def __iter__(self) -> Iterator: + """Iterate over the keys of the cache in order of insertion.""" + yield from self._cache.__iter__() + + def __len__(self) -> int: + """Get the number of items in the cache.""" + return len(self._cache) + + def values(self) -> ValuesView[T]: # Needed so we don't mutate the cache by __getitem__ + """Iterate over the values in the cache in order of insertion.""" + return self._cache.values() + + def items(self) -> ItemsView[str, T]: # Needed so we don't mutate the cache by __getitem__ + """Iterate over the pairs of (key, value) in the cache in order of insertion.""" + return self._cache.items() diff --git a/src/apify/storages/_utils.py b/src/apify/storages/_utils.py deleted file mode 100644 index 5bdec6d1..00000000 --- a/src/apify/storages/_utils.py +++ /dev/null @@ -1,57 +0,0 @@ -import secrets -from collections import OrderedDict -from typing import Generic, Optional -from typing import OrderedDict as OrderedDictType -from typing import TypeVar - - -def _crypto_random_object_id(length: int = 17) -> str: - """Python reimplementation of cryptoRandomObjectId from `@apify/utilities`.""" - chars = 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789' - return ''.join(secrets.choice(chars) for _ in range(length)) - - -T = TypeVar('T') - - -# TODO: Check out possible implementations of the LRUCache in Python -# e.g. https://github.com/jlhutch/pylru or https://stackoverflow.com/questions/19775685/how-to-correctly-implement-the-mapping-protocol-in-python -class LRUCache(Generic[T]): - """Attempt to reimplement LRUCache from `@apify/datastructures` using `OrderedDict`.""" - - _cache: OrderedDictType[str, T] - - _max_length: int - - def __init__(self, max_length: int) -> None: - """Crete a LRUCache with a specific max_length.""" - self._cache = OrderedDict() - self._max_length = max_length - - def get(self, key: str) -> Optional[T]: - """Get an item from the cache. Move it to the end if present.""" - val = self._cache.get(key) - if val is not None: - self._cache.move_to_end(key) - return val - - def add(self, key: str, value: T) -> bool: - """Add an item to the cache. 
Remove least used item if max_length exceeded.""" - if key in self._cache: - return False - self._cache[key] = value - if len(self._cache) > self._max_length: - self._cache.popitem(last=False) - return True - - def remove(self, key: str) -> Optional[T]: - """Remove an item from the cache.""" - return self._cache.pop(key) - - def clear(self) -> None: - """Clear the cache.""" - self._cache.clear() - - def __len__(self) -> int: - """Get the number of items in the cache.""" - return len(self._cache) diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index 10186f6e..4fe02d71 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -1,12 +1,66 @@ -from typing import Optional, Union +import asyncio +import json +import logging +from collections import OrderedDict +from datetime import datetime +from typing import Coroutine, Dict, Optional +from typing import OrderedDict as OrderedDictType +from typing import Set, TypedDict, Union from apify_client import ApifyClientAsync from apify_client.clients import RequestQueueClientAsync +from .._utils import LRUCache, _crypto_random_object_id, _unique_key_to_request_id from ..config import Configuration +from ..consts import REQUEST_QUEUE_HEAD_MAX_LIMIT from ..memory_storage import MemoryStorage from ..memory_storage.resource_clients import RequestQueueClient -from ._utils import _crypto_random_object_id +from .storage_manager import StorageManager + +MAX_CACHED_REQUESTS = 1_000_000 + +QUERY_HEAD_MIN_LENGTH = 100 +"""When requesting queue head we always fetch requestsInProgressCount * QUERY_HEAD_BUFFER number of requests.""" + +QUERY_HEAD_BUFFER = 3 + +API_PROCESSED_REQUESTS_DELAY_MILLIS = 10_000 +""" + If queue was modified (request added/updated/deleted) before more than API_PROCESSED_REQUESTS_DELAY_MILLIS + then we assume the get head operation to be consistent. +""" + +MAX_QUERIES_FOR_CONSISTENCY = 6 +""" + How many times we try to get queue head with queueModifiedAt older than API_PROCESSED_REQUESTS_DELAY_MILLIS. +""" + +RECENTLY_HANDLED_CACHE_SIZE = 1000 +""" + This number must be large enough so that processing of all these requests cannot be done in + a time lower than expected maximum latency of DynamoDB, but low enough not to waste too much memory. +""" + +STORAGE_CONSISTENCY_DELAY_MILLIS = 3000 +""" + Indicates how long it usually takes for the underlying storage to propagate all writes + to be available to subsequent reads. 
+""" + + +class QueueOperationInfo(TypedDict): + """TODO: docs.""" + + was_already_present: bool + """Indicates if request was already present in the queue.""" + + was_already_handled: bool + """Indicates if request was already marked as handled.""" + + request_id: str + """The id of the added request""" + + unique_key: str class RequestQueue: @@ -17,6 +71,15 @@ class RequestQueue: _client: Union[RequestQueueClientAsync, RequestQueueClient] _config: Configuration _client_key = _crypto_random_object_id() + _queue_head_dict: OrderedDictType[str, str] + _query_queue_head_promise: Optional[Coroutine] + _in_progress: Set[str] + _last_activity: datetime + _internal_timeout_seconds = 5 * 60 + _recently_handled: LRUCache[bool] + _assumed_total_count = 0 + _assumed_handled_count = 0 + _requests_cache: LRUCache[Dict] def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, MemoryStorage]) -> None: """TODO: docs (constructor should be "internal").""" @@ -24,6 +87,12 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, self._name = name self._client = client.request_queue(self._id, client_key=self._client_key) self._config = Configuration.get_global_configuration() # We always use the global config + self._queue_head_dict = OrderedDict() + self._query_queue_head_promise = None + self._in_progress = set() + self._last_activity = datetime.utcnow() + self._recently_handled = LRUCache[bool](max_length=RECENTLY_HANDLED_CACHE_SIZE) + self._requests_cache = LRUCache(max_length=MAX_CACHED_REQUESTS) @classmethod async def _create_instance(cls, request_queue_id_or_name: str, client: Union[ApifyClientAsync, MemoryStorage]) -> 'RequestQueue': @@ -37,3 +106,294 @@ async def _create_instance(cls, request_queue_id_or_name: str, client: Union[Api @classmethod def _get_default_name(cls, config: Configuration) -> str: return config.default_request_queue_id + + async def add_request(self, request_like: Dict, *, forefront: bool = False) -> Dict: # TODO: Validate request with pydantic + """TODO: docs.""" + self._last_activity = datetime.utcnow() + # const request = requestLike instanceof Request + # ? requestLike + # : new Request(requestLike); + request = request_like + + cache_key = _unique_key_to_request_id(request['uniqueKey']) + cached_info = self._requests_cache.get(cache_key) + + if cached_info: + request['id'] = cached_info['id'] + return { + 'wasAlreadyPresent': True, + # We may assume that if request is in local cache then also the information if the + # request was already handled is there because just one client should be using one queue. + 'wasAlreadyHandled': cached_info['isHandled'], + 'requestId': cached_info['id'], + 'uniqueKey': cached_info['uniqueKey'], + } + + queue_operation_info = await self._client.add_request(request, forefront=forefront) + queue_operation_info['uniqueKey'] = request['uniqueKey'] + + self._cache_request(cache_key, queue_operation_info) + + request_id, was_already_present = queue_operation_info['requestId'], queue_operation_info['wasAlreadyPresent'] + if not was_already_present and request_id not in self._in_progress and self._recently_handled.get(request_id) is None: + self._assumed_total_count += 1 + + self._maybe_add_request_to_queue_head(request_id, forefront) + + return queue_operation_info + + async def get_request(self, request_id: str) -> Optional[Dict]: + """TODO: docs.""" + return await self._client.get_request(request_id) # TODO: Maybe create a Request class? 
+ + async def fetch_next_request(self) -> Optional[Dict]: + """TODO: docs.""" + await self._ensure_head_is_non_empty() + + # We are likely done at this point. + if len(self._queue_head_dict) == 0: + return None + + next_request_id, _ = self._queue_head_dict.popitem(last=False) # ~removeFirst() + + # This should never happen, but... + if next_request_id in self._in_progress or self._recently_handled.get(next_request_id): + logging.warning(f"""Queue head returned a request that is already in progress?! {json.dumps({ + 'nextRequestId': next_request_id, + 'inProgress': next_request_id in self._in_progress, + 'recentlyHandled': next_request_id in self._recently_handled, + })}""") + return None + self._in_progress.add(next_request_id) + self._last_activity = datetime.utcnow() + + try: + request = await self.get_request(next_request_id) + except Exception as e: + # On error, remove the request from in progress, otherwise it would be there forever + self._in_progress.remove(next_request_id) + raise e + + # NOTE: It can happen that the queue head index is inconsistent with the main queue table. This can occur in two situations: + + """ 1) Queue head index is ahead of the main table and the request is not present in the main table yet (i.e. getRequest() returned null). + In this case, keep the request marked as in progress for a short while, + so that isFinished() doesn't return true and _ensureHeadIsNonEmpty() doesn't not load the request + into the queueHeadDict straight again. After the interval expires, fetchNextRequest() + will try to fetch this request again, until it eventually appears in the main table. + """ + if request is None: + logging.debug(f'Cannot find a request from the beginning of queue, will be retried later. nextRequestId: {next_request_id}') + asyncio.get_event_loop().call_later(STORAGE_CONSISTENCY_DELAY_MILLIS // 1000, lambda: self._in_progress.remove(next_request_id)) + return None + + """ 2) Queue head index is behind the main table and the underlying request was already handled + (by some other client, since we keep the track of handled requests in recentlyHandled dictionary). + We just add the request to the recentlyHandled dictionary so that next call to _ensureHeadIsNonEmpty() + will not put the request again to queueHeadDict. + """ + if request.get('handledAt') is not None: + logging.debug(f'Request fetched from the beginning of queue was already handled. 
nextRequestId: {next_request_id}') + self._recently_handled[next_request_id] = True + return None + + return request + + async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]: # TODO: Validate request with pydantic + """TODO: docs.""" + self._last_activity = datetime.utcnow() + if request['id'] not in self._in_progress: + logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!') + return None + + request['handledAt'] = request.get('handledAt', datetime.utcnow()) + queue_operation_info = await self._client.update_request({**request}) + queue_operation_info['uniqueKey'] = request['uniqueKey'] + + self._in_progress.remove(request['id']) + self._recently_handled[request['id']] = True + + if not queue_operation_info['wasAlreadyHandled']: + self._assumed_handled_count += 1 + + self._cache_request(_unique_key_to_request_id(request['uniqueKey']), queue_operation_info) + + return queue_operation_info + + async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]: # TODO: Validate request with pydantic + """TODO: docs.""" + self._last_activity = datetime.utcnow() + + if request['id'] not in self._in_progress: + logging.debug(f'Cannot reclaim request {request["id"]}, because it is not in progress!') + return None + + # TODO: If request hasn't been changed since the last getRequest(), + # we don't need to call updateRequest() and thus improve performance. + queue_operation_info = await self._client.update_request(request, forefront=forefront) + queue_operation_info['uniqueKey'] = request['uniqueKey'] + self._cache_request(_unique_key_to_request_id(request['uniqueKey']), queue_operation_info) + + # Wait a little to increase a chance that the next call to fetchNextRequest() will return the request with updated data. + # This is to compensate for the limitation of DynamoDB, where writes might not be immediately visible to subsequent reads. + def callback() -> None: + if request['id'] not in self._in_progress: + logging.debug(f'The request is no longer marked as in progress in the queue?! requestId: {request["id"]}') + return + + self._in_progress.remove(request['id']) + + # Performance optimization: add request straight to head if possible + self._maybe_add_request_to_queue_head(request['id'], forefront) + + asyncio.get_event_loop().call_later(STORAGE_CONSISTENCY_DELAY_MILLIS // 1000, callback) + + return queue_operation_info + + def _in_progress_count(self) -> int: + return len(self._in_progress) + + async def is_empty(self) -> bool: + """TODO: docs.""" + await self._ensure_head_is_non_empty() + return len(self._queue_head_dict) == 0 + + async def is_finished(self) -> bool: + """TODO: docs.""" + if self._in_progress_count() > 0 and (datetime.utcnow() - self._last_activity).seconds > self._internal_timeout_seconds: + message = f'The request queue seems to be stuck for {self._internal_timeout_seconds}s, resetting internal state.' 
+ logging.warning(message) + self._reset() + + if (len(self._queue_head_dict) > 0 or self._in_progress_count() > 0): + return False + + is_head_consistent = await self._ensure_head_is_non_empty(True) + return is_head_consistent and len(self._queue_head_dict) == 0 and self._in_progress_count() == 0 + + def _reset(self) -> None: + self._queue_head_dict.clear() + self._query_queue_head_promise = None + self._in_progress.clear() + self._recently_handled.clear() + self._assumed_total_count = 0 + self._assumed_handled_count = 0 + self._requests_cache.clear() + self._last_activity = datetime.utcnow() + + def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None: + self._requests_cache[cache_key] = { + 'id': queue_operation_info['requestId'], + 'isHandled': queue_operation_info['wasAlreadyHandled'], + 'uniqueKey': queue_operation_info['uniqueKey'], + 'wasAlreadyHandled': queue_operation_info['wasAlreadyHandled'], + } + + async def _queue_query_head(self, limit: int) -> Dict: + query_started_at = datetime.utcnow() + + list_head = await self._client.list_head(limit=limit) + for request in list_head['items']: + # Queue head index might be behind the main table, so ensure we don't recycle requests + if not request['id'] or not request['uniqueKey'] or request['id'] in self._in_progress or self._recently_handled.get(request['id']): + continue + self._queue_head_dict[request['id']] = request['id'] + self._cache_request(_unique_key_to_request_id(request['uniqueKey']), { + 'request_id': request['id'], + 'was_already_handled': False, + 'was_already_present': True, + 'unique_key': request['uniqueKey'], + }) + + # This is needed so that the next call to _ensureHeadIsNonEmpty() will fetch the queue head again. + self._query_queue_head_promise = None + + return { + 'wasLimitReached': len(list_head['items']) >= limit, + 'prevLimit': limit, + 'queueModifiedAt': list_head['queueModifiedAt'], + 'queryStartedAt': query_started_at, + 'hadMultipleClients': list_head['hadMultipleClients'], + } + + async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limit: Optional[int] = None, iteration: int = 0) -> bool: + # If is nonempty resolve immediately. + if len(self._queue_head_dict) > 0: + return True + + if limit is None: + limit = max(self._in_progress_count() * QUERY_HEAD_BUFFER, QUERY_HEAD_MIN_LENGTH) + + if self._query_queue_head_promise is None: + self._query_queue_head_promise = self._queue_query_head(limit) + + queue_head = await self._query_queue_head_promise + + # TODO: I feel this code below can be greatly simplified... (comes from TS implementation *wink*) + + """ If queue is still empty then one of the following holds: + - the other calls waiting for this promise already consumed all the returned requests + - the limit was too low and contained only requests in progress + - the writes from other clients were not propagated yet + - the whole queue was processed and we are done + """ + + # If limit was not reached in the call then there are no more requests to be returned. 
+ if (queue_head['prevLimit'] >= REQUEST_QUEUE_HEAD_MAX_LIMIT): + logging.warning(f'Reached the maximum number of requests in progress: {REQUEST_QUEUE_HEAD_MAX_LIMIT}.') + + should_repeat_with_higher_limit = len( + self._queue_head_dict) == 0 and queue_head['wasLimitReached'] and queue_head['prevLimit'] < REQUEST_QUEUE_HEAD_MAX_LIMIT + + # If ensureConsistency=true then we must ensure that either: + # - queueModifiedAt is older than queryStartedAt by at least API_PROCESSED_REQUESTS_DELAY_MILLIS + # - hadMultipleClients=false and this.assumedTotalCount<=this.assumedHandledCount + is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt']).seconds >= (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) + is_locally_consistent = not queue_head['hadMultipleClients'] and self._assumed_total_count <= self._assumed_handled_count + # Consistent information from one source is enough to consider request queue finished. + should_repeat_for_consistency = ensure_consistency and not is_database_consistent and not is_locally_consistent + + # If both are false then head is consistent and we may exit. + if not should_repeat_with_higher_limit and not should_repeat_for_consistency: + return True + + # If we are querying for consistency then we limit the number of queries to MAX_QUERIES_FOR_CONSISTENCY. + # If this is reached then we return false so that empty() and finished() returns possibly false negative. + if not should_repeat_with_higher_limit and iteration > MAX_QUERIES_FOR_CONSISTENCY: + return False + + next_limit = round(queue_head['prevLimit'] * 1.5) if should_repeat_with_higher_limit else queue_head['prevLimit'] + + # If we are repeating for consistency then wait required time. + if should_repeat_for_consistency: + delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - (datetime.utcnow() - queue_head['queueModifiedAt']).seconds + logging.info(f'Waiting for {delay_seconds}s before considering the queue as finished to ensure that the data is consistent.') + await asyncio.sleep(delay_seconds) + + return await self._ensure_head_is_non_empty(ensure_consistency, next_limit, iteration + 1) + + def _maybe_add_request_to_queue_head(self, request_id: str, forefront: bool) -> None: + if forefront: + self._queue_head_dict[request_id] = request_id + # Move to start, i.e. forefront of the queue + self._queue_head_dict.move_to_end(request_id, last=False) + elif self._assumed_total_count < QUERY_HEAD_MIN_LENGTH: + # OrderedDict puts the item to the end of the queue by default + self._queue_head_dict[request_id] = request_id + + async def drop(self) -> None: + """TODO: docs.""" + await self._client.delete() + await StorageManager.close_storage(self.__class__, self._id, self._name) + + async def get_info(self) -> Optional[Dict]: + """TODO: docs.""" + return await self._client.get() + + # async def handled_count(self) -> int: + # """TODO: docs.""" + # # TODO: Do we even need this? + # # NOTE: We keep this function for compatibility with RequestList.handledCount() + # rq_info = await self.get_info() + # # TODO: int() wrapping to trick mypy is hacky, use typed dict? 
+ # return int(rq_info['handledRequestCount']) if rq_info is not None else 0 From abf2e2ca7d7c2a2898c04188b84d75f85b651f22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 18 Jan 2023 10:47:03 +0100 Subject: [PATCH 2/9] fix lint --- src/apify/storages/request_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index 4fe02d71..64b17b7f 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -348,7 +348,8 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi # If ensureConsistency=true then we must ensure that either: # - queueModifiedAt is older than queryStartedAt by at least API_PROCESSED_REQUESTS_DELAY_MILLIS # - hadMultipleClients=false and this.assumedTotalCount<=this.assumedHandledCount - is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt']).seconds >= (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) + is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt'] + ).seconds >= (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) is_locally_consistent = not queue_head['hadMultipleClients'] and self._assumed_total_count <= self._assumed_handled_count # Consistent information from one source is enough to consider request queue finished. should_repeat_for_consistency = ensure_consistency and not is_database_consistent and not is_locally_consistent From 5a88570b53ddee70ede96d80f5ce3dcd67a20c08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 18 Jan 2023 14:25:40 +0100 Subject: [PATCH 3/9] small fixes --- setup.py | 2 +- src/apify/_utils.py | 1 + .../memory_storage/resource_clients/dataset_collection.py | 2 +- .../resource_clients/key_value_store_collection.py | 2 +- .../resource_clients/request_queue_collection.py | 2 +- src/apify/storages/request_queue.py | 8 -------- 6 files changed, 5 insertions(+), 12 deletions(-) diff --git a/setup.py b/setup.py index d9adda16..f36244ec 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ package_data={'apify': ['py.typed']}, python_requires='>=3.8', install_requires=[ - 'apify-client ~= 0.7.0b30', + 'apify-client ~= 0.7.0b39', 'httpx ~= 0.23.0', 'psutil ~= 5.9.4', 'pydantic ~= 1.10.2', diff --git a/src/apify/_utils.py b/src/apify/_utils.py index c987cc2b..0706e036 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -327,6 +327,7 @@ def __init__(self, max_length: int) -> None: def __getitem__(self, key: str) -> T: """Get an item from the cache. 
Move it to the end if present.""" val = self._cache[key] + # No 'key in cache' condition since the previous line would raise KeyError self._cache.move_to_end(key) return val diff --git a/src/apify/memory_storage/resource_clients/dataset_collection.py b/src/apify/memory_storage/resource_clients/dataset_collection.py index 3d49e5cc..c78b6ac6 100644 --- a/src/apify/memory_storage/resource_clients/dataset_collection.py +++ b/src/apify/memory_storage/resource_clients/dataset_collection.py @@ -21,7 +21,7 @@ def __init__(self, *, base_storage_directory: str, client: 'MemoryStorage') -> N self._datasets_directory = base_storage_directory self._client = client - def list(self) -> ListPage: + def list(self) -> ListPage[Dict]: """TODO: docs.""" def map_store(store: DatasetClient) -> Dict: return store.to_dataset_info() diff --git a/src/apify/memory_storage/resource_clients/key_value_store_collection.py b/src/apify/memory_storage/resource_clients/key_value_store_collection.py index f36c9b1e..f00ae656 100644 --- a/src/apify/memory_storage/resource_clients/key_value_store_collection.py +++ b/src/apify/memory_storage/resource_clients/key_value_store_collection.py @@ -21,7 +21,7 @@ def __init__(self, *, base_storage_directory: str, client: 'MemoryStorage') -> N self._key_value_stores_directory = base_storage_directory self._client = client - def list(self) -> ListPage: + def list(self) -> ListPage[Dict]: """TODO: docs.""" def map_store(store: KeyValueStoreClient) -> Dict: return store.to_key_value_store_info() diff --git a/src/apify/memory_storage/resource_clients/request_queue_collection.py b/src/apify/memory_storage/resource_clients/request_queue_collection.py index 56ccdff9..04e1ce4c 100644 --- a/src/apify/memory_storage/resource_clients/request_queue_collection.py +++ b/src/apify/memory_storage/resource_clients/request_queue_collection.py @@ -21,7 +21,7 @@ def __init__(self, *, base_storage_directory: str, client: 'MemoryStorage') -> N self._request_queues_directory = base_storage_directory self._client = client - def list(self) -> ListPage: + def list(self) -> ListPage[Dict]: """TODO: docs.""" def map_store(store: RequestQueueClient) -> Dict: return store.to_request_queue_info() diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index 64b17b7f..b73ac774 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -390,11 +390,3 @@ async def drop(self) -> None: async def get_info(self) -> Optional[Dict]: """TODO: docs.""" return await self._client.get() - - # async def handled_count(self) -> int: - # """TODO: docs.""" - # # TODO: Do we even need this? - # # NOTE: We keep this function for compatibility with RequestList.handledCount() - # rq_info = await self.get_info() - # # TODO: int() wrapping to trick mypy is hacky, use typed dict? 
- # return int(rq_info['handledRequestCount']) if rq_info is not None else 0 From 2054ec7c114c227f1fdc3fc2732c956abe3813c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Wed, 18 Jan 2023 15:08:54 +0100 Subject: [PATCH 4/9] add some unit tests --- tests/unit/actor/test_actor_request_queue.py | 14 +++++ tests/unit/test_lru_cache.py | 57 ++++++++++++++++++++ tests/unit/test_utils.py | 9 ++++ 3 files changed, 80 insertions(+) create mode 100644 tests/unit/actor/test_actor_request_queue.py create mode 100644 tests/unit/test_lru_cache.py diff --git a/tests/unit/actor/test_actor_request_queue.py b/tests/unit/actor/test_actor_request_queue.py new file mode 100644 index 00000000..8bdcf614 --- /dev/null +++ b/tests/unit/actor/test_actor_request_queue.py @@ -0,0 +1,14 @@ +from apify import Actor + +# NOTE: We only test the references here. Actual tests for the implementations are in storages/ + + +async def test_same_references() -> None: + async with Actor: + rq1 = await Actor.open_request_queue() + rq2 = await Actor.open_request_queue() + assert rq1 is rq2 + rq_name = 'non-default' + rq_named1 = await Actor.open_request_queue(rq_name) + rq_named2 = await Actor.open_request_queue(rq_name) + assert rq_named1 is rq_named2 diff --git a/tests/unit/test_lru_cache.py b/tests/unit/test_lru_cache.py new file mode 100644 index 00000000..b9d9f37c --- /dev/null +++ b/tests/unit/test_lru_cache.py @@ -0,0 +1,57 @@ +import pytest + +from apify._utils import LRUCache + + +@pytest.fixture() +def lru_cache() -> LRUCache[int]: + cache = LRUCache[int](3) + cache['a'] = 1 + cache['c'] = 3 + cache['b'] = 2 + return cache + + +def test_get(lru_cache: LRUCache[int]) -> None: + # Key error with non-existent key + with pytest.raises(KeyError): + _ = lru_cache['non-existent-key'] + # None when using .get instead + assert lru_cache.get('non-existent-key') is None + # Should return correct value for existing key + assert lru_cache['c'] == 3 + # Check if order of keys changed based on LRU rule + for actual, target in zip(lru_cache, ['a', 'b', 'c']): + assert actual == target + + +def test_set(lru_cache: LRUCache[int]) -> None: + assert len(lru_cache) == 3 + lru_cache['d'] = 4 + # Check if max_length is not exceeded + assert len(lru_cache) == 3 + # Check if oldest key is removed + assert 'a' not in lru_cache + # Check if the newest addition is at the end + assert list(lru_cache.items())[-1] == ('d', 4) + + +def test_del(lru_cache: LRUCache[int]) -> None: + # Key error on non-existent key + with pytest.raises(KeyError): + del lru_cache['non-existent-key'] + # No error with existing key + len_before_del = len(lru_cache) + del lru_cache['a'] + assert len(lru_cache) == len_before_del - 1 + assert 'a' not in lru_cache + + +def test_len(lru_cache: LRUCache[int]) -> None: + assert len(lru_cache) == len(lru_cache._cache) + lru_cache.clear() + assert len(lru_cache) == 0 + + +def test_iter(lru_cache: LRUCache[int]) -> None: + assert list(lru_cache) == ['a', 'c', 'b'] diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 07a776a2..116cf525 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -9,6 +9,7 @@ from aiofiles.os import mkdir from apify._utils import ( + _crypto_random_object_id, _fetch_and_parse_env_var, _filter_out_none_values_recursively, _filter_out_none_values_recursively_internal, @@ -314,3 +315,11 @@ async def test__force_rename(tmp_path: str) -> None: assert os.path.exists(dst_file) is False # src_dir.txt should exist in dst_dir assert 
os.path.exists(os.path.join(dst_dir, 'src_dir.txt')) is True + + +def test__crypto_random_object_id() -> None: + assert len(_crypto_random_object_id()) == 17 + assert len(_crypto_random_object_id(5)) == 5 + long_random_object_id = _crypto_random_object_id(1000) + for char in long_random_object_id: + assert char in 'abcdefghijklmnopqrstuvwxyzABCEDFGHIJKLMNOPQRSTUVWXYZ0123456789' From cc430d55148e5b9eaef85bce4f6fcc1cbae94242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Fri, 20 Jan 2023 21:50:01 +0100 Subject: [PATCH 5/9] fix name --- src/apify/storages/storage_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/apify/storages/storage_manager.py b/src/apify/storages/storage_manager.py index ba383665..814648fc 100644 --- a/src/apify/storages/storage_manager.py +++ b/src/apify/storages/storage_manager.py @@ -43,7 +43,7 @@ def _get_default_instance(cls) -> 'StorageManager': async def open_storage( cls, storage_class: Type[T], - store_id_or_name: Optional[str] = None, + storage_id_or_name: Optional[str] = None, client: Optional[Union[ApifyClientAsync, MemoryStorage]] = None, config: Optional[Configuration] = None, ) -> T: @@ -57,11 +57,11 @@ async def open_storage( storage_manager._cache[storage_class] = {} # Fetch default name - if not store_id_or_name: - store_id_or_name = storage_class._get_default_name(used_config) + if not storage_id_or_name: + storage_id_or_name = storage_class._get_default_name(used_config) # Try to get the storage instance from cache - storage = storage_manager._cache[storage_class].get(store_id_or_name, None) + storage = storage_manager._cache[storage_class].get(storage_id_or_name, None) if storage is not None: # This cast is needed since we're storing all storages in one union dictionary return cast(T, storage) @@ -71,7 +71,7 @@ async def open_storage( await _purge_default_storages(used_client) # Create the storage - storage = await storage_class._create_instance(store_id_or_name, used_client) + storage = await storage_class._create_instance(storage_id_or_name, used_client) # Cache by id and name storage_manager._cache[storage_class][storage._id] = storage From 5ba44dfb6276e7b70fa1efb99bb62b31b8f88632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Fri, 20 Jan 2023 22:34:33 +0100 Subject: [PATCH 6/9] unit tests etc. 
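
This commit also adds a public RequestQueue.open() entry point and a Storage protocol used by
StorageManager (see the diff below). A minimal sketch of the intended usage, mirroring the new
unit tests (the queue name is a placeholder, and the snippet assumes a configured environment
or the in-memory storage the tests run against):

    from apify.storages import RequestQueue

    queue = await RequestQueue.open()        # default queue, cached by StorageManager
    same_queue = await RequestQueue.open()   # repeated open() returns the same cached instance
    named_queue = await RequestQueue.open('my-queue')
    await queue.add_request({'url': 'https://example.com', 'uniqueKey': 'https://example.com'})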
--- src/apify/storages/request_queue.py | 5 ++ src/apify/storages/storage_manager.py | 20 +++++-- tests/unit/storages/test_request_queue.py | 66 +++++++++++++++++++++++ 3 files changed, 87 insertions(+), 4 deletions(-) create mode 100644 tests/unit/storages/test_request_queue.py diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index b73ac774..bc5d3389 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -390,3 +390,8 @@ async def drop(self) -> None: async def get_info(self) -> Optional[Dict]: """TODO: docs.""" return await self._client.get() + + @classmethod + async def open(cls, request_queue_id_or_name: Optional[str] = None, config: Optional[Configuration] = None) -> 'RequestQueue': + """TODO: docs.""" + return await StorageManager.open_storage(cls, request_queue_id_or_name, None, config) diff --git a/src/apify/storages/storage_manager.py b/src/apify/storages/storage_manager.py index 814648fc..82e8c8e4 100644 --- a/src/apify/storages/storage_manager.py +++ b/src/apify/storages/storage_manager.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Dict, Optional, Type, TypeVar, Union, cast +from typing import TYPE_CHECKING, Dict, Optional, Protocol, Type, TypeVar, Union, cast from apify_client import ApifyClientAsync @@ -11,7 +11,19 @@ from .key_value_store import KeyValueStore from .request_queue import RequestQueue -T = TypeVar('T', 'Dataset', 'KeyValueStore', 'RequestQueue') +T = TypeVar('T', 'Dataset', 'KeyValueStore', 'RequestQueue', covariant=True) + + +class Storage(Protocol[T]): + """TODO: Docs.""" + + @classmethod + def _create_instance(cls, storage_id_or_name: str, client: Union[ApifyClientAsync, MemoryStorage]) -> T: # noqa: U100 + ... + + @classmethod + def _get_default_name(cls, config: Configuration) -> str: # noqa: U100 + ... 
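# NOTE (illustration only, not part of the diff): the Storage protocol relies on structural
# typing. Dataset, KeyValueStore and RequestQueue do not inherit from it; they merely provide
# these two classmethods, which lets the StorageManager cache below be typed as
# Dict[Type[Storage], Dict[str, Storage]] instead of spelling out the Union of concrete
# storage classes that the previous annotation used.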
async def _purge_default_storages(client: Union[ApifyClientAsync, MemoryStorage]) -> None: @@ -24,7 +36,7 @@ class StorageManager: """TODO: docs.""" _default_instance: Optional['StorageManager'] = None - _cache: Dict[Type[Union['Dataset', 'KeyValueStore', 'RequestQueue']], Dict[str, Union['Dataset', 'KeyValueStore', 'RequestQueue']]] + _cache: Dict[Type[Storage], Dict[str, Storage]] _config: Configuration def __init__(self) -> None: @@ -80,7 +92,7 @@ async def open_storage( return storage @classmethod - async def close_storage(cls, storage_class: Type[Union['Dataset', 'KeyValueStore', 'RequestQueue']], id: str, name: Optional[str]) -> None: + async def close_storage(cls, storage_class: Type[Storage], id: str, name: Optional[str]) -> None: """TODO: docs.""" storage_manager = StorageManager._get_default_instance() del storage_manager._cache[storage_class][id] diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py new file mode 100644 index 00000000..174b1724 --- /dev/null +++ b/tests/unit/storages/test_request_queue.py @@ -0,0 +1,66 @@ +import asyncio + +import pytest + +from apify.storages import RequestQueue + + +@pytest.fixture() +async def request_queue() -> RequestQueue: + return await RequestQueue.open() + + +async def test_same_references() -> None: + rq1 = await RequestQueue.open() + rq2 = await RequestQueue.open() + assert rq1 is rq2 + rq_name = 'non-default' + rq_named1 = await RequestQueue.open(rq_name) + rq_named2 = await RequestQueue.open(rq_name) + assert rq_named1 is rq_named2 + + +async def test_drop() -> None: + rq1 = await RequestQueue.open() + await rq1.drop() + rq2 = await RequestQueue.open() + assert rq1 is not rq2 + + +async def test_add_fetch_handle_request(request_queue: RequestQueue) -> None: + url = 'https://example.com' + assert await request_queue.is_empty() is True + add_request_info = await request_queue.add_request({ + 'uniqueKey': url, + 'url': url, + }) + assert add_request_info['wasAlreadyPresent'] is False + assert add_request_info['wasAlreadyHandled'] is False + assert await request_queue.is_empty() is False + # Fetch the request + next = await request_queue.fetch_next_request() + assert next is not None + # Mark it as handled + queue_operation_info = await request_queue.mark_request_as_handled(next) + assert queue_operation_info is not None + assert queue_operation_info['uniqueKey'] == url + assert await request_queue.is_finished() is True + + +async def test_reclaim_request(request_queue: RequestQueue) -> None: + url = 'https://example.com' + await request_queue.add_request({ + 'uniqueKey': url, + 'url': url, + }) + # Fetch the request + next = await request_queue.fetch_next_request() + assert next is not None + assert next['uniqueKey'] == url + # Reclaim + await request_queue.reclaim_request(next) + # Try to fetch again after a few secs + await asyncio.sleep(4) # 3 seconds is the consistency delay in request queue + next_again = await request_queue.fetch_next_request() + assert next_again is not None + assert next_again['uniqueKey'] == url From b5b98acb637cc770b58d2754cd54569374290286 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Fri, 20 Jan 2023 22:37:40 +0100 Subject: [PATCH 7/9] add change from utils to prevent conflict --- src/apify/_utils.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/apify/_utils.py b/src/apify/_utils.py index 0706e036..45c42ab9 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -1,5 +1,6 @@ import asyncio import base64 
+import builtins import contextlib import functools import hashlib @@ -50,6 +51,8 @@ def _log_system_info() -> None: print(f' Apify Client version: {client_version}') print(f' OS: {sys.platform}') print(f' Python version: {python_version}') + if _is_running_in_ipython(): + print(' Running in IPython: True') DualPropertyType = TypeVar('DualPropertyType') @@ -358,3 +361,7 @@ def values(self) -> ValuesView[T]: # Needed so we don't mutate the cache by __g def items(self) -> ItemsView[str, T]: # Needed so we don't mutate the cache by __getitem__ """Iterate over the pairs of (key, value) in the cache in order of insertion.""" return self._cache.items() + + +def _is_running_in_ipython() -> bool: + return getattr(builtins, '__IPYTHON__', False) From b4d447591e7e935fb0adfe5a69fe22fb38f7e7e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Fri, 20 Jan 2023 22:46:47 +0100 Subject: [PATCH 8/9] remove unused typed dict --- src/apify/storages/request_queue.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index aa1d69c6..6f433ad9 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -5,7 +5,7 @@ from datetime import datetime from typing import Coroutine, Dict, Optional from typing import OrderedDict as OrderedDictType -from typing import Set, TypedDict, Union +from typing import Set, Union from apify_client import ApifyClientAsync from apify_client.clients import RequestQueueClientAsync @@ -48,21 +48,6 @@ """ -class QueueOperationInfo(TypedDict): - """TODO: docs.""" - - was_already_present: bool - """Indicates if request was already present in the queue.""" - - was_already_handled: bool - """Indicates if request was already marked as handled.""" - - request_id: str - """The id of the added request""" - - unique_key: str - - class RequestQueue: """TODO: docs.""" From c601d8cfff1b375fa0847a061256107fd830df8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Sun, 22 Jan 2023 14:59:26 +0100 Subject: [PATCH 9/9] Address PR comments --- src/apify/storages/dataset.py | 2 -- src/apify/storages/key_value_store.py | 2 -- src/apify/storages/request_queue.py | 6 ++---- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/apify/storages/dataset.py b/src/apify/storages/dataset.py index 5dbfce59..c123138a 100644 --- a/src/apify/storages/dataset.py +++ b/src/apify/storages/dataset.py @@ -21,7 +21,6 @@ class Dataset: _id: str _name: Optional[str] _client: Union[DatasetClientAsync, DatasetClient] - _config: Configuration def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, MemoryStorage]) -> None: """TODO: docs (constructor should be "internal").""" @@ -32,7 +31,6 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, self._id = id self._name = name self._client = client.dataset(self._id) - self._config = Configuration.get_global_configuration() # We always use the global config @classmethod async def _create_instance(cls, dataset_id_or_name: str, client: Union[ApifyClientAsync, MemoryStorage]) -> 'Dataset': diff --git a/src/apify/storages/key_value_store.py b/src/apify/storages/key_value_store.py index 76a0455f..6f6f4c58 100644 --- a/src/apify/storages/key_value_store.py +++ b/src/apify/storages/key_value_store.py @@ -18,7 +18,6 @@ class KeyValueStore: _id: str _name: Optional[str] _client: Union[KeyValueStoreClientAsync, 
KeyValueStoreClient] - _config: Configuration def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, MemoryStorage]) -> None: """TODO: docs (constructor should be "internal").""" @@ -27,7 +26,6 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, self._id = id self._name = name self._client = client.key_value_store(self._id) - self._config = Configuration.get_global_configuration() # We always use the global config @classmethod async def _create_instance(cls, store_id_or_name: str, client: Union[ApifyClientAsync, MemoryStorage]) -> 'KeyValueStore': diff --git a/src/apify/storages/request_queue.py b/src/apify/storages/request_queue.py index 6f433ad9..90bc7a3b 100644 --- a/src/apify/storages/request_queue.py +++ b/src/apify/storages/request_queue.py @@ -54,7 +54,6 @@ class RequestQueue: _id: str _name: Optional[str] _client: Union[RequestQueueClientAsync, RequestQueueClient] - _config: Configuration _client_key = _crypto_random_object_id() _queue_head_dict: OrderedDictType[str, str] _query_queue_head_promise: Optional[Coroutine] @@ -71,7 +70,6 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync, self._id = id self._name = name self._client = client.request_queue(self._id, client_key=self._client_key) - self._config = Configuration.get_global_configuration() # We always use the global config self._queue_head_dict = OrderedDict() self._query_queue_head_promise = None self._in_progress = set() @@ -169,7 +167,7 @@ async def fetch_next_request(self) -> Optional[Dict]: """ if request is None: logging.debug(f'Cannot find a request from the beginning of queue, will be retried later. nextRequestId: {next_request_id}') - asyncio.get_event_loop().call_later(STORAGE_CONSISTENCY_DELAY_MILLIS // 1000, lambda: self._in_progress.remove(next_request_id)) + asyncio.get_running_loop().call_later(STORAGE_CONSISTENCY_DELAY_MILLIS // 1000, lambda: self._in_progress.remove(next_request_id)) return None """ 2) Queue head index is behind the main table and the underlying request was already handled @@ -231,7 +229,7 @@ def callback() -> None: # Performance optimization: add request straight to head if possible self._maybe_add_request_to_queue_head(request['id'], forefront) - asyncio.get_event_loop().call_later(STORAGE_CONSISTENCY_DELAY_MILLIS // 1000, callback) + asyncio.get_running_loop().call_later(STORAGE_CONSISTENCY_DELAY_MILLIS // 1000, callback) return queue_operation_info
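
A minimal end-to-end sketch of how the RequestQueue introduced in this PR can be driven,
mirroring the fetch/handle/reclaim cycle that the unit tests exercise. The URL and the
processing step are placeholders, and the snippet assumes either a configured Apify
environment or the in-memory storage the tests run against:

    import asyncio

    from apify.storages import RequestQueue


    async def main() -> None:
        queue = await RequestQueue.open()
        await queue.add_request({'url': 'https://example.com', 'uniqueKey': 'https://example.com'})

        while not await queue.is_finished():
            request = await queue.fetch_next_request()
            if request is None:
                # The head is temporarily empty (e.g. requests still in progress); back off briefly.
                await asyncio.sleep(1)
                continue
            try:
                ...  # placeholder: process request['url'] here
            except Exception:
                # Put the request back so it can be fetched again after the consistency delay.
                await queue.reclaim_request(request)
            else:
                await queue.mark_request_as_handled(request)

        await queue.drop()


    asyncio.run(main())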