Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit dba96de

Browse filesBrowse files
committed
Issue #40: add delegator for dynamically detected cube processes
adds a property `dynamic` to ImageCollectionClient instances, which allows to call processes that are dynamically detected from the backend process listing (and not necessarily predefined in the client)
1 parent cd204f4 commit dba96de
Copy full SHA for dba96de

File tree

Expand file treeCollapse file tree

3 files changed

+115
-0
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+115
-0
lines changed

‎openeo/rest/connection.py

Copy file name to clipboardExpand all lines: openeo/rest/connection.py
+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def __init__(self, url, auth: AuthBase = None, session: requests.Session = None)
174174
"""
175175
super().__init__(root_url=url, auth=auth, session=session)
176176
self._cached_capabilities = None
177+
self._process_registry = None
177178

178179
# Initial API version check.
179180
if self._api_version.below(self._MINIMUM_API_VERSION):
@@ -267,6 +268,15 @@ def capabilities(self) -> 'Capabilities':
267268

268269
return self._cached_capabilities
269270

271+
def process_registry(self) -> 'ProcessRegistry':
272+
"""
273+
Load all processes supported by the backend (lazy/cached)
274+
:return: ProcessRegistry
275+
"""
276+
if self._process_registry is None:
277+
self._process_registry = ProcessRegistry.from_connection(connection=self)
278+
return self._process_registry
279+
270280
@deprecated("Use 'list_output_formats' instead")
271281
def list_file_types(self) -> dict:
272282
return self.list_output_formats()
@@ -554,3 +564,23 @@ def session(userid=None, endpoint: str = "https://openeo.org/openeo") -> Connect
554564
"""
555565
return connect(url=endpoint)
556566

567+
568+
class ProcessRegistry:
569+
"""
570+
Registry of process specs (e.g. the processes supported by a backend)
571+
"""
572+
def __init__(self, processes: dict):
573+
self._reg = processes
574+
575+
@classmethod
576+
def from_connection(cls, connection=Connection):
577+
"""Factory to load process registry from given backend connection."""
578+
# Get as list from API
579+
processes = connection.get('/processes').json()['processes']
580+
# Make it a dictionary for more efficient retrieval
581+
processes = {p['id']: p for p in processes}
582+
return cls(processes=processes)
583+
584+
def get_parameters(self, process_id: str) -> List[dict]:
585+
"""Get parameters for given process_id."""
586+
return self._reg[process_id]["parameters"]

‎openeo/rest/imagecollectionclient.py

Copy file name to clipboardExpand all lines: openeo/rest/imagecollectionclient.py
+57Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(self, node_id: str, builder: GraphBuilder, session: 'Connection', m
3131
self.session = session
3232
self.graph = builder.processes
3333
self.metadata = metadata
34+
self.dynamic = DynamicCubeMethodDelegator(cube=self)
3435

3536
def __str__(self):
3637
return "ImageCollection: %s" % self.node_id
@@ -1070,3 +1071,59 @@ def to_graphviz(self):
10701071
# TODO: add subgraph for "callback" arguments?
10711072

10721073
return graph
1074+
1075+
1076+
class DynamicProcessException(Exception):
1077+
pass
1078+
1079+
1080+
class _DynamicCubeMethod:
1081+
"""
1082+
A dynamically detected process bound to a raster cube.
1083+
The process should have a single "raster-cube" parameter.
1084+
"""
1085+
1086+
def __init__(self, cube: ImageCollectionClient, process_id: str, parameters: List[dict]):
1087+
self.cube = cube
1088+
self.process_id = process_id
1089+
self.parameters = parameters
1090+
1091+
# Find raster-cube parameter.
1092+
expected_schema = {"type": "object", "subtype": "raster-cube"}
1093+
names = [p["name"] for p in self.parameters if p["schema"] == expected_schema]
1094+
if len(names) != 1:
1095+
raise DynamicProcessException("Need one raster-cube parameter but found {c}".format(c=len(names)))
1096+
self.cube_parameter = names[0]
1097+
1098+
def __call__(self, *args, **kwargs):
1099+
"""Call the "cube method": pass cube and other arguments to the process."""
1100+
arguments = {
1101+
self.cube_parameter: {"from_node": self.cube.node_id}
1102+
}
1103+
# TODO: more advanced parameter checking (required vs optional), normalization based on type, ...
1104+
for i, arg in enumerate(args):
1105+
arguments[self.parameters[i]["name"]] = arg
1106+
for key, value in kwargs.items():
1107+
assert any(p["name"] == key for p in self.parameters)
1108+
assert key not in arguments
1109+
arguments[key] = value
1110+
1111+
return self.cube.graph_add_process(
1112+
process_id=self.process_id,
1113+
args=arguments,
1114+
)
1115+
1116+
1117+
class DynamicCubeMethodDelegator:
1118+
"""
1119+
Wrapper for a DataCube to group and delegate to dynamically detected processes
1120+
(depending on a particular backend or API spec)
1121+
"""
1122+
1123+
def __init__(self, cube: ImageCollectionClient):
1124+
self.cube = cube
1125+
1126+
def __getattr__(self, process_id):
1127+
self.process_registry = self.cube.session.process_registry()
1128+
parameters = self.process_registry.get_parameters(process_id)
1129+
return _DynamicCubeMethod(self.cube, process_id=process_id, parameters=parameters)

‎tests/rest/test_imagecollectionclient.py

Copy file name to clipboardExpand all lines: tests/rest/test_imagecollectionclient.py
+28Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
def session040(requests_mock):
1515
requests_mock.get(API_URL + "/", json={"api_version": "0.4.0"})
1616
session = openeo.connect(API_URL)
17+
# Reset graph builder
18+
GraphBuilder.id_counter = {}
1719
return session
1820

1921

@@ -82,3 +84,29 @@ def result_callback(request, context):
8284
path = tmpdir.join("tmp.tiff")
8385
session040.load_collection("SENTINEL2").download(str(path), format="GTIFF")
8486
assert path.read() == "tiffdata"
87+
88+
89+
def test_dynamic_cube_method(session040, requests_mock):
90+
processes = [
91+
{
92+
"id": "make_larger",
93+
"description": "multiply a raster cube with a factor",
94+
"parameters": [
95+
{"name": "data", "schema": {"type": "object", "subtype": "raster-cube"}},
96+
{"name": "factor", "schema": {"type": "float"}},
97+
]}
98+
]
99+
requests_mock.get(API_URL + '/processes', json={"processes": processes})
100+
requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})
101+
102+
cube = session040.load_collection("SENTINEL2")
103+
evi = cube.dynamic.make_larger(factor=42)
104+
assert set(evi.graph.keys()) == {"loadcollection1", "makelarger1"}
105+
assert evi.graph["makelarger1"] == {
106+
"process_id": "make_larger",
107+
"arguments": {
108+
"data": {"from_node": "loadcollection1"},
109+
"factor": 42,
110+
},
111+
"result": False
112+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.