From c72c9828d40850ffbfc5cdf706253f820947a22a Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Mon, 1 Feb 2021 20:58:50 +0100 Subject: [PATCH 1/8] Black fix + removal of untested unit test --- openml/config.py | 12 +++ openml/runs/functions.py | 129 +++++++++++++++++--------- tests/test_runs/test_run_functions.py | 50 ++++++++++ 3 files changed, 148 insertions(+), 43 deletions(-) diff --git a/openml/config.py b/openml/config.py index 237e71170..408921879 100644 --- a/openml/config.py +++ b/openml/config.py @@ -89,6 +89,7 @@ def set_file_log_level(file_output_level: int): "avoid_duplicate_runs": "True", "connection_n_retries": 10, "max_retries": 20, + "n_jobs": 4, } config_file = os.path.expanduser(os.path.join("~", ".openml", "config")) @@ -118,6 +119,7 @@ def get_server_base_url() -> str: # Number of retries if the connection breaks connection_n_retries = _defaults["connection_n_retries"] max_retries = _defaults["max_retries"] +n_jobs = _defaults["n_jobs"] class ConfigurationForExamples: @@ -170,6 +172,12 @@ def stop_using_configuration_for_example(cls): apikey = cls._last_used_key cls._start_last_called = False + @classmethod + def set_n_jobs_for_parallel_runs(cls, n=4): + """ Set the number of workers to be used while running a flow/model on a task. """ + global n_jobs + n_jobs = n + def _setup(): """Setup openml package. Called on first import. 
@@ -186,6 +194,7 @@ def _setup(): global avoid_duplicate_runs global connection_n_retries global max_retries + global n_jobs # read config file, create cache directory try: @@ -288,12 +297,15 @@ def set_cache_directory(cachedir): ConfigurationForExamples.start_using_configuration_for_example ) stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example +set_n_jobs_for_parallel_runs = ConfigurationForExamples.set_n_jobs_for_parallel_runs + __all__ = [ "get_cache_directory", "set_cache_directory", "start_using_configuration_for_example", "stop_using_configuration_for_example", + "set_n_jobs_for_parallel_runs", ] _setup() diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 89b811d10..591ed3374 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -12,6 +12,7 @@ import xmltodict import numpy as np import pandas as pd +from joblib.parallel import Parallel, delayed import openml import openml.utils @@ -447,55 +448,33 @@ def _run_task_get_arffcontent( # methods, less maintenance, less confusion) num_reps, num_folds, num_samples = task.get_split_dimensions() + jobs = [] for n_fit, (rep_no, fold_no, sample_no) in enumerate( itertools.product(range(num_reps), range(num_folds), range(num_samples),), start=1 ): - - train_indices, test_indices = task.get_train_test_split_indices( - repeat=rep_no, fold=fold_no, sample=sample_no - ) - if isinstance(task, OpenMLSupervisedTask): - x, y = task.get_X_and_y(dataset_format=dataset_format) - if dataset_format == "dataframe": - train_x = x.iloc[train_indices] - train_y = y.iloc[train_indices] - test_x = x.iloc[test_indices] - test_y = y.iloc[test_indices] - else: - train_x = x[train_indices] - train_y = y[train_indices] - test_x = x[test_indices] - test_y = y[test_indices] - elif isinstance(task, OpenMLClusteringTask): - x = task.get_X(dataset_format=dataset_format) - if dataset_format == "dataframe": - train_x = x.iloc[train_indices] - else: - train_x = 
x[train_indices] - train_y = None - test_x = None - test_y = None - else: - raise NotImplementedError(task.task_type) - - config.logger.info( - "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.", - flow.name, - task.task_id, - rep_no, - fold_no, - sample_no, - ) - - pred_y, proba_y, user_defined_measures_fold, trace = extension._run_model_on_fold( + jobs.append((n_fit, rep_no, fold_no, sample_no)) + + # Execute runs in parallel + # assuming the same number of tasks as workers (n_jobs), the total compute time for this + # statement will be similar to the slowest run + job_rvals = Parallel(verbose=0, n_jobs=openml.config.n_jobs)( + delayed(_run_task_get_arffcontent_parallel_helper)( + extension=extension, + flow=flow, + fold_no=fold_no, model=model, - task=task, - X_train=train_x, - y_train=train_y, rep_no=rep_no, - fold_no=fold_no, - X_test=test_x, + sample_no=sample_no, + task=task, + dataset_format=dataset_format, ) + for n_fit, rep_no, fold_no, sample_no in jobs + ) # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs` + + for n_fit, rep_no, fold_no, sample_no in jobs: + pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold = job_rvals[ + n_fit - 1 + ] if trace is not None: traces.append(trace) @@ -615,6 +594,70 @@ def _calculate_local_measure(sklearn_fn, openml_name): ) +def _run_task_get_arffcontent_parallel_helper( + extension: "Extension", + flow: OpenMLFlow, + fold_no: int, + model: Any, + rep_no: int, + sample_no: int, + task: OpenMLTask, + dataset_format: str, +) -> Tuple[ + np.ndarray, + Optional[pd.DataFrame], + np.ndarray, + Optional[pd.DataFrame], + Optional[OpenMLRunTrace], + "OrderedDict[str, float]", +]: + train_indices, test_indices = task.get_train_test_split_indices( + repeat=rep_no, fold=fold_no, sample=sample_no + ) + + if isinstance(task, OpenMLSupervisedTask): + x, y = task.get_X_and_y(dataset_format=dataset_format) + if dataset_format == "dataframe": + train_x = 
x.iloc[train_indices] + train_y = y.iloc[train_indices] + test_x = x.iloc[test_indices] + test_y = y.iloc[test_indices] + else: + train_x = x[train_indices] + train_y = y[train_indices] + test_x = x[test_indices] + test_y = y[test_indices] + elif isinstance(task, OpenMLClusteringTask): + x = task.get_X(dataset_format=dataset_format) + if dataset_format == "dataframe": + train_x = x.iloc[train_indices] + else: + train_x = x[train_indices] + train_y = None + test_x = None + test_y = None + else: + raise NotImplementedError(task.task_type) + config.logger.info( + "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.", + flow.name, + task.task_id, + rep_no, + fold_no, + sample_no, + ) + (pred_y, proba_y, user_defined_measures_fold, trace,) = extension._run_model_on_fold( + model=model, + task=task, + X_train=train_x, + y_train=train_y, + rep_no=rep_no, + fold_no=fold_no, + X_test=test_x, + ) + return pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold + + def get_runs(run_ids): """Gets all runs in run_ids list. 
diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index e7c0c06fc..be96d4383 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -10,6 +10,7 @@ import unittest.mock import numpy as np +from joblib import parallel_backend import openml import openml.exceptions @@ -1575,3 +1576,52 @@ def test_format_prediction_task_regression(self): ignored_input = [0] * 5 res = format_prediction(regression, *ignored_input) self.assertListEqual(res, [0] * 5) + + @unittest.mock.patch("openml.runs.functions._run_task_get_arffcontent_parallel_helper") + def test__run_task_get_arffcontent_2(self, mock): + # Unit test style test + def side_effect(*args, **kwargs): + return ( + np.array([0, 1]), + np.array([[0.8, 0.2], [0.2, 0.8]]), + np.array([1, 2]), + np.array([1, 1]), + None, + {}, + ) + + mock.side_effect = side_effect + + task = openml.tasks.get_task(7) + + flow = unittest.mock.Mock() + flow.name = "dummy" + clf = SGDClassifier(loss="log", random_state=1) + + # Unit test doesn't work with loky and multiprocessing backend + for n_jobs, backend, call_count in ( + (1, "sequential", 10), + (1, "threading", 20), + (-1, "threading", 30), + (2, "threading", 40), + (None, "threading", 50), + ): + with parallel_backend(backend, n_jobs=n_jobs): + res = openml.runs.functions._run_task_get_arffcontent( + flow=flow, + extension=self.extension, + model=clf, + task=task, + add_local_measures=True, + dataset_format="dataframe", + ) + assert len(res) == 4, len(res) # General function interface + assert len(res[0]) == 20 # 10 folds x 2 predictions returned + assert res[1] is None # No trace + assert len(res[2]) == 1 + assert len(res[2]["predictive_accuracy"]) == 1 + assert len(res[2]["predictive_accuracy"][0]) == 10 + assert len(res[3]) == 1 + assert len(res[3]["predictive_accuracy"]) == 1 + assert len(res[3]["predictive_accuracy"][0]) == 10 + assert mock.call_count == call_count, (mock.call_count, call_count) From 
06b1fa4dae47170e7b0d18f47168f7d0bd7a7a14 Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Wed, 3 Feb 2021 21:08:36 +0100 Subject: [PATCH 2/8] Black fix --- openml/config.py | 43 ++++++++++++++---- openml/runs/functions.py | 11 ++++- tests/test_openml/test_config.py | 28 ++++++++++++ tests/test_runs/test_run_functions.py | 65 +++++++++------------------ 4 files changed, 92 insertions(+), 55 deletions(-) diff --git a/openml/config.py b/openml/config.py index 408921879..339658ac0 100644 --- a/openml/config.py +++ b/openml/config.py @@ -179,7 +179,7 @@ def set_n_jobs_for_parallel_runs(cls, n=4): n_jobs = n -def _setup(): +def _setup(config=None): """Setup openml package. Called on first import. Reads the config file and sets up apikey, server, cache appropriately. @@ -203,13 +203,28 @@ def _setup(): # For other errors, we want to propagate the error as openml does not work without cache pass - config = _parse_config() - apikey = config.get("FAKE_SECTION", "apikey") - server = config.get("FAKE_SECTION", "server") + if config is None: + config = _parse_config() - short_cache_dir = config.get("FAKE_SECTION", "cachedir") - cache_directory = os.path.expanduser(short_cache_dir) + def _get(config, key): + return config.get("FAKE_SECTION", key) + + avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs") + else: + + def _get(config, key): + return config.get(key) + + avoid_duplicate_runs = config.get("avoid_duplicate_runs") + apikey = _get(config, "apikey") + server = _get(config, "server") + short_cache_dir = _get(config, "cachedir") + connection_n_retries = _get(config, "connection_n_retries") + max_retries = _get(config, "max_retries") + n_jobs = _get(config, "n_jobs") + + cache_directory = os.path.expanduser(short_cache_dir) # create the cache subdirectory try: os.mkdir(cache_directory) @@ -217,9 +232,6 @@ def _setup(): # For other errors, we want to propagate the error as openml does not work without cache pass - avoid_duplicate_runs = 
config.getboolean("FAKE_SECTION", "avoid_duplicate_runs") - connection_n_retries = config.get("FAKE_SECTION", "connection_n_retries") - max_retries = config.get("FAKE_SECTION", "max_retries") if connection_n_retries > max_retries: raise ValueError( "A higher number of retries than {} is not allowed to keep the " @@ -255,6 +267,18 @@ def _parse_config(): return config +def get_config_as_dict(): + config = dict() + config["apikey"] = apikey + config["server"] = server + config["cachedir"] = cache_directory + config["avoid_duplicate_runs"] = avoid_duplicate_runs + config["connection_n_retries"] = connection_n_retries + config["max_retries"] = max_retries + config["n_jobs"] = n_jobs + return config + + def get_cache_directory(): """Get the current cache directory. @@ -306,6 +330,7 @@ def set_cache_directory(cachedir): "start_using_configuration_for_example", "stop_using_configuration_for_example", "set_n_jobs_for_parallel_runs", + "get_config_as_dict", ] _setup() diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 591ed3374..ae2c86c3a 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -454,10 +454,13 @@ def _run_task_get_arffcontent( ): jobs.append((n_fit, rep_no, fold_no, sample_no)) + # The forked child process may not copy the configuration state of OpenML from the parent. + # Current configuration setup needs to be copied and passed to the child processes. 
+ _config = config.get_config_as_dict() # Execute runs in parallel # assuming the same number of tasks as workers (n_jobs), the total compute time for this # statement will be similar to the slowest run - job_rvals = Parallel(verbose=0, n_jobs=openml.config.n_jobs)( + job_rvals = Parallel(verbose=0, n_jobs=config.n_jobs)( delayed(_run_task_get_arffcontent_parallel_helper)( extension=extension, flow=flow, @@ -467,6 +470,7 @@ def _run_task_get_arffcontent( sample_no=sample_no, task=task, dataset_format=dataset_format, + configuration=_config, ) for n_fit, rep_no, fold_no, sample_no in jobs ) # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs` @@ -603,6 +607,7 @@ def _run_task_get_arffcontent_parallel_helper( sample_no: int, task: OpenMLTask, dataset_format: str, + configuration: Dict = None, ) -> Tuple[ np.ndarray, Optional[pd.DataFrame], @@ -611,6 +616,10 @@ def _run_task_get_arffcontent_parallel_helper( Optional[OpenMLRunTrace], "OrderedDict[str, float]", ]: + # Sets up the OpenML instantiated in the child process to match that of the parent's + # if configuration=None, loads the default + config._setup(configuration) + train_indices, test_indices = task.get_train_test_split_indices( repeat=rep_no, fold=fold_no, sample=sample_no ) diff --git a/tests/test_openml/test_config.py b/tests/test_openml/test_config.py index 88136dbd9..16214b7c3 100644 --- a/tests/test_openml/test_config.py +++ b/tests/test_openml/test_config.py @@ -11,6 +11,34 @@ def test_config_loading(self): self.assertTrue(os.path.exists(openml.config.config_file)) self.assertTrue(os.path.isdir(os.path.expanduser("~/.openml"))) + def test_get_config_as_dict(self): + """ Checks if the current configuration is returned accurately as a dict. 
""" + config = openml.config.get_config_as_dict() + self.assertIsInstance(config, dict) + self.assertEqual(len(config), 7) + self.assertEqual(config.get("server"), "https://test.openml.org/api/v1/xml") + self.assertEqual(config.get("apikey"), "610344db6388d9ba34f6db45a3cf71de") + self.assertEqual(config.get("cachedir"), self.workdir) + self.assertEqual(config.get("avoid_duplicate_runs"), False) + self.assertEqual(config.get("max_retries"), 20) + self.assertEqual(config.get("n_jobs"), 4) + + def test_setup_with_config(self): + """ Checks if the OpenML configuration can be updated using _setup(). """ + _config = dict() + _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de" + _config["server"] = "https://www.openml.org/api/v1/xml" + _config["cachedir"] = self.workdir + _config["avoid_duplicate_runs"] = True + _config["connection_n_retries"] = 100 + _config["max_retries"] = 1000 + _config["n_jobs"] = 64 + orig_config = openml.config.get_config_as_dict() + openml.config._setup(_config) + updated_config = openml.config.get_config_as_dict() + openml.config._setup(orig_config) # important to not affect other unit tests + self.assertDictEqual(_config, updated_config) + class TestConfigurationForExamples(openml.testing.TestBase): def test_switch_to_example_configuration(self): diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index be96d4383..58b7da36e 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -1577,51 +1577,26 @@ def test_format_prediction_task_regression(self): res = format_prediction(regression, *ignored_input) self.assertListEqual(res, [0] * 5) - @unittest.mock.patch("openml.runs.functions._run_task_get_arffcontent_parallel_helper") - def test__run_task_get_arffcontent_2(self, mock): - # Unit test style test - def side_effect(*args, **kwargs): - return ( - np.array([0, 1]), - np.array([[0.8, 0.2], [0.2, 0.8]]), - np.array([1, 2]), - np.array([1, 1]), - None, - {}, - ) - - 
mock.side_effect = side_effect - - task = openml.tasks.get_task(7) - + def test__run_task_get_arffcontent_2(self): + """ Tests if a run executed in parallel is collated correctly. """ + task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp + x, y = task.get_X_and_y(dataset_format="dataframe") + num_instances = x.shape[0] + line_length = 6 + len(task.class_labels) flow = unittest.mock.Mock() flow.name = "dummy" clf = SGDClassifier(loss="log", random_state=1) - - # Unit test doesn't work with loky and multiprocessing backend - for n_jobs, backend, call_count in ( - (1, "sequential", 10), - (1, "threading", 20), - (-1, "threading", 30), - (2, "threading", 40), - (None, "threading", 50), - ): - with parallel_backend(backend, n_jobs=n_jobs): - res = openml.runs.functions._run_task_get_arffcontent( - flow=flow, - extension=self.extension, - model=clf, - task=task, - add_local_measures=True, - dataset_format="dataframe", - ) - assert len(res) == 4, len(res) # General function interface - assert len(res[0]) == 20 # 10 folds x 2 predictions returned - assert res[1] is None # No trace - assert len(res[2]) == 1 - assert len(res[2]["predictive_accuracy"]) == 1 - assert len(res[2]["predictive_accuracy"][0]) == 10 - assert len(res[3]) == 1 - assert len(res[3]["predictive_accuracy"]) == 1 - assert len(res[3]["predictive_accuracy"][0]) == 10 - assert mock.call_count == call_count, (mock.call_count, call_count) + with parallel_backend("loky", n_jobs=2): + res = openml.runs.functions._run_task_get_arffcontent( + flow=flow, + extension=self.extension, + model=clf, + task=task, + add_local_measures=True, + dataset_format="array", + ) + self.assertEqual(type(res[0]), list) + self.assertEqual(len(res[0]), num_instances) + self.assertEqual(len(res[0][0]), line_length) + self.assertEqual(len(res[2]), 7) + self.assertEqual(len(res[3]), 7) From 43ae42da964f518c2e066f4f46b843307042cf7c Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Fri, 5 Feb 2021 14:47:47 +0100 Subject: 
[PATCH 3/8] More unit tests --- openml/config.py | 13 ------ openml/runs/functions.py | 7 ++- openml/testing.py | 2 + tests/test_openml/test_config.py | 4 +- tests/test_runs/test_run_functions.py | 63 ++++++++++++++++++++++++++- 5 files changed, 70 insertions(+), 19 deletions(-) diff --git a/openml/config.py b/openml/config.py index 339658ac0..2e0380951 100644 --- a/openml/config.py +++ b/openml/config.py @@ -89,7 +89,6 @@ def set_file_log_level(file_output_level: int): "avoid_duplicate_runs": "True", "connection_n_retries": 10, "max_retries": 20, - "n_jobs": 4, } config_file = os.path.expanduser(os.path.join("~", ".openml", "config")) @@ -119,7 +118,6 @@ def get_server_base_url() -> str: # Number of retries if the connection breaks connection_n_retries = _defaults["connection_n_retries"] max_retries = _defaults["max_retries"] -n_jobs = _defaults["n_jobs"] class ConfigurationForExamples: @@ -172,12 +170,6 @@ def stop_using_configuration_for_example(cls): apikey = cls._last_used_key cls._start_last_called = False - @classmethod - def set_n_jobs_for_parallel_runs(cls, n=4): - """ Set the number of workers to be used while running a flow/model on a task. """ - global n_jobs - n_jobs = n - def _setup(config=None): """Setup openml package. Called on first import. 
@@ -194,7 +186,6 @@ def _setup(config=None): global avoid_duplicate_runs global connection_n_retries global max_retries - global n_jobs # read config file, create cache directory try: @@ -222,7 +213,6 @@ def _get(config, key): short_cache_dir = _get(config, "cachedir") connection_n_retries = _get(config, "connection_n_retries") max_retries = _get(config, "max_retries") - n_jobs = _get(config, "n_jobs") cache_directory = os.path.expanduser(short_cache_dir) # create the cache subdirectory @@ -275,7 +265,6 @@ def get_config_as_dict(): config["avoid_duplicate_runs"] = avoid_duplicate_runs config["connection_n_retries"] = connection_n_retries config["max_retries"] = max_retries - config["n_jobs"] = n_jobs return config @@ -321,7 +310,6 @@ def set_cache_directory(cachedir): ConfigurationForExamples.start_using_configuration_for_example ) stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example -set_n_jobs_for_parallel_runs = ConfigurationForExamples.set_n_jobs_for_parallel_runs __all__ = [ @@ -329,7 +317,6 @@ def set_cache_directory(cachedir): "set_cache_directory", "start_using_configuration_for_example", "stop_using_configuration_for_example", - "set_n_jobs_for_parallel_runs", "get_config_as_dict", ] diff --git a/openml/runs/functions.py b/openml/runs/functions.py index ae2c86c3a..aaf7034ca 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -55,6 +55,7 @@ def run_model_on_task( upload_flow: bool = False, return_flow: bool = False, dataset_format: str = "dataframe", + n_jobs: int = None, ) -> Union[OpenMLRun, Tuple[OpenMLRun, OpenMLFlow]]: """Run the model on the dataset defined by the task. 
@@ -132,6 +133,7 @@ def get_task_and_type_conversion(task: Union[int, str, OpenMLTask]) -> OpenMLTas add_local_measures=add_local_measures, upload_flow=upload_flow, dataset_format=dataset_format, + n_jobs=n_jobs, ) if return_flow: return run, flow @@ -147,6 +149,7 @@ def run_flow_on_task( add_local_measures: bool = True, upload_flow: bool = False, dataset_format: str = "dataframe", + n_jobs: int = None, ) -> OpenMLRun: """Run the model provided by the flow on the dataset defined by task. @@ -266,6 +269,7 @@ def run_flow_on_task( extension=flow.extension, add_local_measures=add_local_measures, dataset_format=dataset_format, + n_jobs=n_jobs, ) data_content, trace, fold_evaluations, sample_evaluations = res @@ -426,6 +430,7 @@ def _run_task_get_arffcontent( extension: "Extension", add_local_measures: bool, dataset_format: str, + n_jobs: int = None, ) -> Tuple[ List[List], Optional[OpenMLRunTrace], @@ -460,7 +465,7 @@ def _run_task_get_arffcontent( # Execute runs in parallel # assuming the same number of tasks as workers (n_jobs), the total compute time for this # statement will be similar to the slowest run - job_rvals = Parallel(verbose=0, n_jobs=config.n_jobs)( + job_rvals = Parallel(verbose=0, n_jobs=n_jobs)( delayed(_run_task_get_arffcontent_parallel_helper)( extension=extension, flow=flow, diff --git a/openml/testing.py b/openml/testing.py index 31bd87b9a..ca04a0b15 100644 --- a/openml/testing.py +++ b/openml/testing.py @@ -110,6 +110,8 @@ def setUp(self, n_levels: int = 1): # Increase the number of retries to avoid spurious server failures self.connection_n_retries = openml.config.connection_n_retries openml.config.connection_n_retries = 10 + # Number of processes to parallelize any evaluation made by a unit test + self.n_jobs = 2 def tearDown(self): os.chdir(self.cwd) diff --git a/tests/test_openml/test_config.py b/tests/test_openml/test_config.py index 16214b7c3..85849236a 100644 --- a/tests/test_openml/test_config.py +++ b/tests/test_openml/test_config.py @@ 
-15,13 +15,12 @@ def test_get_config_as_dict(self): """ Checks if the current configuration is returned accurately as a dict. """ config = openml.config.get_config_as_dict() self.assertIsInstance(config, dict) - self.assertEqual(len(config), 7) + self.assertEqual(len(config), 6) self.assertEqual(config.get("server"), "https://test.openml.org/api/v1/xml") self.assertEqual(config.get("apikey"), "610344db6388d9ba34f6db45a3cf71de") self.assertEqual(config.get("cachedir"), self.workdir) self.assertEqual(config.get("avoid_duplicate_runs"), False) self.assertEqual(config.get("max_retries"), 20) - self.assertEqual(config.get("n_jobs"), 4) def test_setup_with_config(self): """ Checks if the OpenML configuration can be updated using _setup(). """ @@ -32,7 +31,6 @@ def test_setup_with_config(self): _config["avoid_duplicate_runs"] = True _config["connection_n_retries"] = 100 _config["max_retries"] = 1000 - _config["n_jobs"] = 64 orig_config = openml.config.get_config_as_dict() openml.config._setup(_config) updated_config = openml.config.get_config_as_dict() diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index 58b7da36e..0bd7143a3 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -1586,17 +1586,76 @@ def test__run_task_get_arffcontent_2(self): flow = unittest.mock.Mock() flow.name = "dummy" clf = SGDClassifier(loss="log", random_state=1) - with parallel_backend("loky", n_jobs=2): + with parallel_backend("loky", n_jobs=self.n_jobs): res = openml.runs.functions._run_task_get_arffcontent( flow=flow, extension=self.extension, model=clf, task=task, add_local_measures=True, - dataset_format="array", + dataset_format="array", # "dataframe" would require handling of categoricals + n_jobs=self.n_jobs, ) self.assertEqual(type(res[0]), list) self.assertEqual(len(res[0]), num_instances) self.assertEqual(len(res[0][0]), line_length) self.assertEqual(len(res[2]), 7) self.assertEqual(len(res[3]), 7) + 
+ @unittest.skipIf( + LooseVersion(sklearn.__version__) < "0.21", + reason="couldn't perform local tests successfully w/o bloating RAM", + ) + def test_joblib_backends(self): + """ Tests evaluation of a run using various joblib backends and n_jobs. """ + task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp + x, y = task.get_X_and_y(dataset_format="dataframe") + num_instances = x.shape[0] + line_length = 6 + len(task.class_labels) + flow = unittest.mock.Mock() + flow.name = "dummy" + + for n_jobs, backend, len_time_stats in [ + (1, "loky", 7), + (2, "loky", 4), + (-1, "loky", 1), + (1, "threading", 7), + (-1, "threading", 1), + (1, "sequential", 7), + ]: + clf = sklearn.model_selection.RandomizedSearchCV( + estimator=sklearn.ensemble.RandomForestClassifier(n_estimators=5), + param_distributions={ + "max_depth": [3, None], + "max_features": [1, 2, 3, 4], + "min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10], + "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "bootstrap": [True, False], + "criterion": ["gini", "entropy"], + }, + random_state=1, + cv=sklearn.model_selection.StratifiedKFold( + n_splits=2, shuffle=True, random_state=1 + ), + n_iter=5, + n_jobs=n_jobs, + ) + with parallel_backend(backend, n_jobs=n_jobs): + res = openml.runs.functions._run_task_get_arffcontent( + flow=flow, + extension=self.extension, + model=clf, + task=task, + add_local_measures=True, + dataset_format="array", # "dataframe" would require handling of categoricals + n_jobs=n_jobs, + ) + self.assertEqual(type(res[0]), list) + self.assertEqual(len(res[0]), num_instances) + self.assertEqual(len(res[0][0]), line_length) + # usercpu_time_millis_* not recorded when n_jobs > 1 + # *_time_millis_* not recorded when n_jobs = -1 + self.assertEqual(len(res[2]), len_time_stats) + self.assertEqual(len(res[3]), len_time_stats) + self.assertEqual(len(res[2]["predictive_accuracy"][0]), 10) + self.assertEqual(len(res[3]["predictive_accuracy"][0]), 10) From 
e7311787e8c09c10890d64cfa4958e2a5fa65639 Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Mon, 8 Feb 2021 10:55:20 +0100 Subject: [PATCH 4/8] Docstrings + unit test robustness --- openml/runs/functions.py | 14 +++++++++++--- tests/test_openml/test_config.py | 13 ++++++++----- tests/test_runs/test_run_functions.py | 20 ++++++++++++++++++-- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index aaf7034ca..6558bb4eb 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -55,7 +55,7 @@ def run_model_on_task( upload_flow: bool = False, return_flow: bool = False, dataset_format: str = "dataframe", - n_jobs: int = None, + n_jobs: Optional[int] = None, ) -> Union[OpenMLRun, Tuple[OpenMLRun, OpenMLFlow]]: """Run the model on the dataset defined by the task. @@ -86,6 +86,10 @@ def run_model_on_task( dataset_format : str (default='dataframe') If 'array', the dataset is passed to the model as a numpy array. If 'dataframe', the dataset is passed to the model as a pandas dataframe. + n_jobs : int (default=None) + The number of processes/threads to distribute the evaluation asynchronously. + If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially. + If `-1`, then the job uses as many cores as are available. Returns ------- @@ -149,7 +153,7 @@ def run_flow_on_task( add_local_measures: bool = True, upload_flow: bool = False, dataset_format: str = "dataframe", - n_jobs: int = None, + n_jobs: Optional[int] = None, ) -> OpenMLRun: """Run the model provided by the flow on the dataset defined by task. @@ -185,6 +189,10 @@ def run_flow_on_task( dataset_format : str (default='dataframe') If 'array', the dataset is passed to the model as a numpy array. If 'dataframe', the dataset is passed to the model as a pandas dataframe. + n_jobs : int (default=None) + The number of processes/threads to distribute the evaluation asynchronously. 
+ If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially. + If `-1`, then the job uses as many cores as are available. Returns ------- @@ -660,7 +668,7 @@ def _run_task_get_arffcontent_parallel_helper( fold_no, sample_no, ) - (pred_y, proba_y, user_defined_measures_fold, trace,) = extension._run_model_on_fold( + pred_y, proba_y, user_defined_measures_fold, trace, = extension._run_model_on_fold( model=model, task=task, X_train=train_x, diff --git a/tests/test_openml/test_config.py b/tests/test_openml/test_config.py index 85849236a..e4610ffd0 100644 --- a/tests/test_openml/test_config.py +++ b/tests/test_openml/test_config.py @@ -14,13 +14,16 @@ def test_config_loading(self): def test_get_config_as_dict(self): """ Checks if the current configuration is returned accurately as a dict. """ config = openml.config.get_config_as_dict() + _config = dict() + _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de" + _config["server"] = "https://test.openml.org/api/v1/xml" + _config["cachedir"] = self.workdir + _config["avoid_duplicate_runs"] = False + _config["connection_n_retries"] = 10 + _config["max_retries"] = 20 self.assertIsInstance(config, dict) self.assertEqual(len(config), 6) - self.assertEqual(config.get("server"), "https://test.openml.org/api/v1/xml") - self.assertEqual(config.get("apikey"), "610344db6388d9ba34f6db45a3cf71de") - self.assertEqual(config.get("cachedir"), self.workdir) - self.assertEqual(config.get("avoid_duplicate_runs"), False) - self.assertEqual(config.get("max_retries"), 20) + self.assertDictEqual(config, _config) def test_setup_with_config(self): """ Checks if the OpenML configuration can be updated using _setup(). 
""" diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index 0bd7143a3..42e1d9b39 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -1577,7 +1577,8 @@ def test_format_prediction_task_regression(self): res = format_prediction(regression, *ignored_input) self.assertListEqual(res, [0] * 5) - def test__run_task_get_arffcontent_2(self): + @unittest.mock.patch("joblib.parallel_backend") + def test__run_task_get_arffcontent_2(self, parallel_mock): """ Tests if a run executed in parallel is collated correctly. """ task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp x, y = task.get_X_and_y(dataset_format="dataframe") @@ -1596,11 +1597,26 @@ def test__run_task_get_arffcontent_2(self): dataset_format="array", # "dataframe" would require handling of categoricals n_jobs=self.n_jobs, ) - self.assertEqual(type(res[0]), list) + self.assertIsInstance(res[0], list) self.assertEqual(len(res[0]), num_instances) self.assertEqual(len(res[0][0]), line_length) self.assertEqual(len(res[2]), 7) self.assertEqual(len(res[3]), 7) + expected_scores = [ + 0.965625, + 0.94375, + 0.946875, + 0.953125, + 0.96875, + 0.965625, + 0.9435736677115988, + 0.9467084639498433, + 0.9749216300940439, + 0.9655172413793104, + ] + scores = [v for k, v in res[2]["predictive_accuracy"][0].items()] + self.assertSequenceEqual(scores, expected_scores, seq_type=list) + self.assertEqual(parallel_mock.call_count, 0) @unittest.skipIf( LooseVersion(sklearn.__version__) < "0.21", From 6a880f5e2317c1e6b47b93ce949a602cde449e4d Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Mon, 8 Feb 2021 17:03:31 +0100 Subject: [PATCH 5/8] Skipping unit test for lower sklearn versions --- tests/test_runs/test_run_functions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index 42e1d9b39..a25257b94 100644 --- a/tests/test_runs/test_run_functions.py 
+++ b/tests/test_runs/test_run_functions.py @@ -1577,6 +1577,10 @@ def test_format_prediction_task_regression(self): res = format_prediction(regression, *ignored_input) self.assertListEqual(res, [0] * 5) + @unittest.skipIf( + LooseVersion(sklearn.__version__) < "0.21", + reason="couldn't perform local tests successfully w/o bloating RAM", + ) @unittest.mock.patch("joblib.parallel_backend") def test__run_task_get_arffcontent_2(self, parallel_mock): """ Tests if a run executed in parallel is collated correctly. """ From 52e1d34cc973b392920cbb82b41699b8ea0ec906 Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Mon, 8 Feb 2021 17:14:41 +0100 Subject: [PATCH 6/8] Skipping unit test for lower sklearn versions --- tests/test_runs/test_run_functions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index 42e1d9b39..a25257b94 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -1577,6 +1577,10 @@ def test_format_prediction_task_regression(self): res = format_prediction(regression, *ignored_input) self.assertListEqual(res, [0] * 5) + @unittest.skipIf( + LooseVersion(sklearn.__version__) < "0.21", + reason="couldn't perform local tests successfully w/o bloating RAM", + ) @unittest.mock.patch("joblib.parallel_backend") def test__run_task_get_arffcontent_2(self, parallel_mock): """ Tests if a run executed in parallel is collated correctly. 
""" From 17e3916b8fbfe914facb4a3dc56869e65073660b Mon Sep 17 00:00:00 2001 From: neeratyoy Date: Tue, 16 Feb 2021 18:39:50 +0100 Subject: [PATCH 7/8] Refining unit tests --- openml/testing.py | 2 -- tests/test_runs/test_run_functions.py | 31 ++++++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/openml/testing.py b/openml/testing.py index ca04a0b15..31bd87b9a 100644 --- a/openml/testing.py +++ b/openml/testing.py @@ -110,8 +110,6 @@ def setUp(self, n_levels: int = 1): # Increase the number of retries to avoid spurious server failures self.connection_n_retries = openml.config.connection_n_retries openml.config.connection_n_retries = 10 - # Number of processes to parallelize any evaluation made by a unit test - self.n_jobs = 2 def tearDown(self): os.chdir(self.cwd) diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index a25257b94..fdbbc1e76 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -1581,7 +1581,7 @@ def test_format_prediction_task_regression(self): LooseVersion(sklearn.__version__) < "0.21", reason="couldn't perform local tests successfully w/o bloating RAM", ) - @unittest.mock.patch("joblib.parallel_backend") + @unittest.mock.patch("openml.extensions.sklearn.SklearnExtension._run_model_on_fold") def test__run_task_get_arffcontent_2(self, parallel_mock): """ Tests if a run executed in parallel is collated correctly. 
""" task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp @@ -1591,7 +1591,8 @@ def test__run_task_get_arffcontent_2(self, parallel_mock): flow = unittest.mock.Mock() flow.name = "dummy" clf = SGDClassifier(loss="log", random_state=1) - with parallel_backend("loky", n_jobs=self.n_jobs): + n_jobs = 2 + with parallel_backend("loky", n_jobs=n_jobs): res = openml.runs.functions._run_task_get_arffcontent( flow=flow, extension=self.extension, @@ -1599,8 +1600,13 @@ def test__run_task_get_arffcontent_2(self, parallel_mock): task=task, add_local_measures=True, dataset_format="array", # "dataframe" would require handling of categoricals - n_jobs=self.n_jobs, + n_jobs=n_jobs, ) + # _run_model_on_fold is mocked out in this process only; a newly spawned joblib worker + # still sees the real function. If the folds are distributed to workers successfully, + # the mock's call_count stays 0 and the checks on the actual results below hold. + # If joblib is unable to distribute the work, the mock is called and this test fails. + self.assertEqual(parallel_mock.call_count, 0) self.assertIsInstance(res[0], list) self.assertEqual(len(res[0]), num_instances) self.assertEqual(len(res[0][0]), line_length) @@ -1620,13 +1626,13 @@ def test__run_task_get_arffcontent_2(self, parallel_mock): ] scores = [v for k, v in res[2]["predictive_accuracy"][0].items()] self.assertSequenceEqual(scores, expected_scores, seq_type=list) - self.assertEqual(parallel_mock.call_count, 0) @unittest.skipIf( LooseVersion(sklearn.__version__) < "0.21", reason="couldn't perform local tests successfully w/o bloating RAM", ) - def test_joblib_backends(self): + @unittest.mock.patch("openml.extensions.sklearn.SklearnExtension._prevent_optimize_n_jobs") + def test_joblib_backends(self, parallel_mock): """ Tests evaluation of a run using various joblib backends and n_jobs.
""" task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp x, y = task.get_X_and_y(dataset_format="dataframe") @@ -1635,13 +1641,13 @@ def test_joblib_backends(self): flow = unittest.mock.Mock() flow.name = "dummy" - for n_jobs, backend, len_time_stats in [ - (1, "loky", 7), - (2, "loky", 4), - (-1, "loky", 1), - (1, "threading", 7), - (-1, "threading", 1), - (1, "sequential", 7), + for n_jobs, backend, len_time_stats, call_count in [ + (1, "loky", 7, 10), + (2, "loky", 4, 10), + (-1, "loky", 1, 10), + (1, "threading", 7, 20), + (-1, "threading", 1, 30), + (1, "sequential", 7, 40), ]: clf = sklearn.model_selection.RandomizedSearchCV( estimator=sklearn.ensemble.RandomForestClassifier(n_estimators=5), @@ -1679,3 +1685,4 @@ def test_joblib_backends(self): self.assertEqual(len(res[3]), len_time_stats) self.assertEqual(len(res[2]["predictive_accuracy"][0]), 10) self.assertEqual(len(res[3]["predictive_accuracy"][0]), 10) + self.assertEqual(parallel_mock.call_count, call_count) From 2b0622ef04f9e3d789ee1d936ba6dee700220373 Mon Sep 17 00:00:00 2001 From: Matthias Feurer Date: Thu, 18 Feb 2021 06:45:36 +0100 Subject: [PATCH 8/8] fix merge conflict --- openml/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/openml/config.py b/openml/config.py index be402f833..8daaa2d5c 100644 --- a/openml/config.py +++ b/openml/config.py @@ -221,7 +221,7 @@ def _setup(config=None): ) if config is None: - config = _parse_config() + config = _parse_config(config_file) def _get(config, key): return config.get("FAKE_SECTION", key) @@ -237,8 +237,8 @@ def _get(config, key): apikey = _get(config, "apikey") server = _get(config, "server") short_cache_dir = _get(config, "cachedir") - connection_n_retries = _get(config, "connection_n_retries") - max_retries = _get(config, "max_retries") + connection_n_retries = int(_get(config, "connection_n_retries")) + max_retries = int(_get(config, "max_retries")) cache_directory = 
os.path.expanduser(short_cache_dir) # create the cache subdirectory