diff --git a/openml/config.py b/openml/config.py index b9a9788ac..8daaa2d5c 100644 --- a/openml/config.py +++ b/openml/config.py @@ -177,7 +177,7 @@ def stop_using_configuration_for_example(cls): cls._start_last_called = False -def _setup(): +def _setup(config=None): """Setup openml package. Called on first import. Reads the config file and sets up apikey, server, cache appropriately. @@ -220,13 +220,27 @@ def _setup(): "not working properly." % config_dir ) - config = _parse_config(config_file) - apikey = config.get("FAKE_SECTION", "apikey") - server = config.get("FAKE_SECTION", "server") + if config is None: + config = _parse_config(config_file) - cache_dir = config.get("FAKE_SECTION", "cachedir") - cache_directory = os.path.expanduser(cache_dir) + def _get(config, key): + return config.get("FAKE_SECTION", key) + avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs") + else: + + def _get(config, key): + return config.get(key) + + avoid_duplicate_runs = config.get("avoid_duplicate_runs") + + apikey = _get(config, "apikey") + server = _get(config, "server") + short_cache_dir = _get(config, "cachedir") + connection_n_retries = int(_get(config, "connection_n_retries")) + max_retries = int(_get(config, "max_retries")) + + cache_directory = os.path.expanduser(short_cache_dir) # create the cache subdirectory if not os.path.exists(cache_directory): try: @@ -237,9 +251,6 @@ def _setup(): "OpenML-Python not working properly." 
% cache_directory ) - avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs") - connection_n_retries = int(config.get("FAKE_SECTION", "connection_n_retries")) - max_retries = int(config.get("FAKE_SECTION", "max_retries")) if connection_n_retries > max_retries: raise ValueError( "A higher number of retries than {} is not allowed to keep the " @@ -268,6 +279,17 @@ def _parse_config(config_file: str): return config +def get_config_as_dict(): + config = dict() + config["apikey"] = apikey + config["server"] = server + config["cachedir"] = cache_directory + config["avoid_duplicate_runs"] = avoid_duplicate_runs + config["connection_n_retries"] = connection_n_retries + config["max_retries"] = max_retries + return config + + def get_cache_directory(): """Get the current cache directory. @@ -307,11 +329,13 @@ def set_cache_directory(cachedir): ) stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example + __all__ = [ "get_cache_directory", "set_cache_directory", "start_using_configuration_for_example", "stop_using_configuration_for_example", + "get_config_as_dict", ] _setup() diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 89b811d10..6558bb4eb 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -12,6 +12,7 @@ import xmltodict import numpy as np import pandas as pd +from joblib.parallel import Parallel, delayed import openml import openml.utils @@ -54,6 +55,7 @@ def run_model_on_task( upload_flow: bool = False, return_flow: bool = False, dataset_format: str = "dataframe", + n_jobs: Optional[int] = None, ) -> Union[OpenMLRun, Tuple[OpenMLRun, OpenMLFlow]]: """Run the model on the dataset defined by the task. @@ -84,6 +86,10 @@ def run_model_on_task( dataset_format : str (default='dataframe') If 'array', the dataset is passed to the model as a numpy array. If 'dataframe', the dataset is passed to the model as a pandas dataframe. 
+ n_jobs : int (default=None) + The number of processes/threads to distribute the evaluation asynchronously. + If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially. + If `-1`, then the job uses as many cores as are available. Returns ------- @@ -131,6 +137,7 @@ def get_task_and_type_conversion(task: Union[int, str, OpenMLTask]) -> OpenMLTas add_local_measures=add_local_measures, upload_flow=upload_flow, dataset_format=dataset_format, + n_jobs=n_jobs, ) if return_flow: return run, flow @@ -146,6 +153,7 @@ def run_flow_on_task( add_local_measures: bool = True, upload_flow: bool = False, dataset_format: str = "dataframe", + n_jobs: Optional[int] = None, ) -> OpenMLRun: """Run the model provided by the flow on the dataset defined by task. @@ -181,6 +189,10 @@ def run_flow_on_task( dataset_format : str (default='dataframe') If 'array', the dataset is passed to the model as a numpy array. If 'dataframe', the dataset is passed to the model as a pandas dataframe. + n_jobs : int (default=None) + The number of processes/threads to distribute the evaluation asynchronously. + If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially. + If `-1`, then the job uses as many cores as are available. 
Returns ------- @@ -265,6 +277,7 @@ def run_flow_on_task( extension=flow.extension, add_local_measures=add_local_measures, dataset_format=dataset_format, + n_jobs=n_jobs, ) data_content, trace, fold_evaluations, sample_evaluations = res @@ -425,6 +438,7 @@ def _run_task_get_arffcontent( extension: "Extension", add_local_measures: bool, dataset_format: str, + n_jobs: int = None, ) -> Tuple[ List[List], Optional[OpenMLRunTrace], @@ -447,55 +461,37 @@ def _run_task_get_arffcontent( # methods, less maintenance, less confusion) num_reps, num_folds, num_samples = task.get_split_dimensions() + jobs = [] for n_fit, (rep_no, fold_no, sample_no) in enumerate( itertools.product(range(num_reps), range(num_folds), range(num_samples),), start=1 ): - - train_indices, test_indices = task.get_train_test_split_indices( - repeat=rep_no, fold=fold_no, sample=sample_no - ) - if isinstance(task, OpenMLSupervisedTask): - x, y = task.get_X_and_y(dataset_format=dataset_format) - if dataset_format == "dataframe": - train_x = x.iloc[train_indices] - train_y = y.iloc[train_indices] - test_x = x.iloc[test_indices] - test_y = y.iloc[test_indices] - else: - train_x = x[train_indices] - train_y = y[train_indices] - test_x = x[test_indices] - test_y = y[test_indices] - elif isinstance(task, OpenMLClusteringTask): - x = task.get_X(dataset_format=dataset_format) - if dataset_format == "dataframe": - train_x = x.iloc[train_indices] - else: - train_x = x[train_indices] - train_y = None - test_x = None - test_y = None - else: - raise NotImplementedError(task.task_type) - - config.logger.info( - "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.", - flow.name, - task.task_id, - rep_no, - fold_no, - sample_no, - ) - - pred_y, proba_y, user_defined_measures_fold, trace = extension._run_model_on_fold( + jobs.append((n_fit, rep_no, fold_no, sample_no)) + + # The forked child process may not copy the configuration state of OpenML from the parent. 
+ # Current configuration setup needs to be copied and passed to the child processes. + _config = config.get_config_as_dict() + # Execute runs in parallel + # assuming the same number of tasks as workers (n_jobs), the total compute time for this + # statement will be similar to the slowest run + job_rvals = Parallel(verbose=0, n_jobs=n_jobs)( + delayed(_run_task_get_arffcontent_parallel_helper)( + extension=extension, + flow=flow, + fold_no=fold_no, model=model, - task=task, - X_train=train_x, - y_train=train_y, rep_no=rep_no, - fold_no=fold_no, - X_test=test_x, + sample_no=sample_no, + task=task, + dataset_format=dataset_format, + configuration=_config, ) + for n_fit, rep_no, fold_no, sample_no in jobs + ) # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs` + + for n_fit, rep_no, fold_no, sample_no in jobs: + pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold = job_rvals[ + n_fit - 1 + ] if trace is not None: traces.append(trace) @@ -615,6 +611,75 @@ def _calculate_local_measure(sklearn_fn, openml_name): ) +def _run_task_get_arffcontent_parallel_helper( + extension: "Extension", + flow: OpenMLFlow, + fold_no: int, + model: Any, + rep_no: int, + sample_no: int, + task: OpenMLTask, + dataset_format: str, + configuration: Dict = None, +) -> Tuple[ + np.ndarray, + Optional[pd.DataFrame], + np.ndarray, + Optional[pd.DataFrame], + Optional[OpenMLRunTrace], + "OrderedDict[str, float]", +]: + # Sets up the OpenML instantiated in the child process to match that of the parent's + # if configuration=None, loads the default + config._setup(configuration) + + train_indices, test_indices = task.get_train_test_split_indices( + repeat=rep_no, fold=fold_no, sample=sample_no + ) + + if isinstance(task, OpenMLSupervisedTask): + x, y = task.get_X_and_y(dataset_format=dataset_format) + if dataset_format == "dataframe": + train_x = x.iloc[train_indices] + train_y = y.iloc[train_indices] + test_x = x.iloc[test_indices] + test_y 
= y.iloc[test_indices] + else: + train_x = x[train_indices] + train_y = y[train_indices] + test_x = x[test_indices] + test_y = y[test_indices] + elif isinstance(task, OpenMLClusteringTask): + x = task.get_X(dataset_format=dataset_format) + if dataset_format == "dataframe": + train_x = x.iloc[train_indices] + else: + train_x = x[train_indices] + train_y = None + test_x = None + test_y = None + else: + raise NotImplementedError(task.task_type) + config.logger.info( + "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.", + flow.name, + task.task_id, + rep_no, + fold_no, + sample_no, + ) + pred_y, proba_y, user_defined_measures_fold, trace, = extension._run_model_on_fold( + model=model, + task=task, + X_train=train_x, + y_train=train_y, + rep_no=rep_no, + fold_no=fold_no, + X_test=test_x, + ) + return pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold + + def get_runs(run_ids): """Gets all runs in run_ids list. diff --git a/tests/test_openml/test_config.py b/tests/test_openml/test_config.py index 73507aabb..35488c579 100644 --- a/tests/test_openml/test_config.py +++ b/tests/test_openml/test_config.py @@ -25,6 +25,35 @@ def test_non_writable_home(self, log_handler_mock, warnings_mock, expanduser_moc self.assertEqual(log_handler_mock.call_count, 1) self.assertFalse(log_handler_mock.call_args_list[0][1]["create_file_handler"]) + def test_get_config_as_dict(self): + """ Checks if the current configuration is returned accurately as a dict. 
""" + config = openml.config.get_config_as_dict() + _config = dict() + _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de" + _config["server"] = "https://test.openml.org/api/v1/xml" + _config["cachedir"] = self.workdir + _config["avoid_duplicate_runs"] = False + _config["connection_n_retries"] = 10 + _config["max_retries"] = 20 + self.assertIsInstance(config, dict) + self.assertEqual(len(config), 6) + self.assertDictEqual(config, _config) + + def test_setup_with_config(self): + """ Checks if the OpenML configuration can be updated using _setup(). """ + _config = dict() + _config["apikey"] = "610344db6388d9ba34f6db45a3cf71de" + _config["server"] = "https://www.openml.org/api/v1/xml" + _config["cachedir"] = self.workdir + _config["avoid_duplicate_runs"] = True + _config["connection_n_retries"] = 100 + _config["max_retries"] = 1000 + orig_config = openml.config.get_config_as_dict() + openml.config._setup(_config) + updated_config = openml.config.get_config_as_dict() + openml.config._setup(orig_config) # important to not affect other unit tests + self.assertDictEqual(_config, updated_config) + class TestConfigurationForExamples(openml.testing.TestBase): def test_switch_to_example_configuration(self): diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index e7c0c06fc..fdbbc1e76 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -10,6 +10,7 @@ import unittest.mock import numpy as np +from joblib import parallel_backend import openml import openml.exceptions @@ -1575,3 +1576,113 @@ def test_format_prediction_task_regression(self): ignored_input = [0] * 5 res = format_prediction(regression, *ignored_input) self.assertListEqual(res, [0] * 5) + + @unittest.skipIf( + LooseVersion(sklearn.__version__) < "0.21", + reason="couldn't perform local tests successfully w/o bloating RAM", + ) + @unittest.mock.patch("openml.extensions.sklearn.SklearnExtension._run_model_on_fold") + def 
test__run_task_get_arffcontent_2(self, parallel_mock): + """ Tests if a run executed in parallel is collated correctly. """ + task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp + x, y = task.get_X_and_y(dataset_format="dataframe") + num_instances = x.shape[0] + line_length = 6 + len(task.class_labels) + flow = unittest.mock.Mock() + flow.name = "dummy" + clf = SGDClassifier(loss="log", random_state=1) + n_jobs = 2 + with parallel_backend("loky", n_jobs=n_jobs): + res = openml.runs.functions._run_task_get_arffcontent( + flow=flow, + extension=self.extension, + model=clf, + task=task, + add_local_measures=True, + dataset_format="array", # "dataframe" would require handling of categoricals + n_jobs=n_jobs, + ) + # This unit test will fail if joblib is unable to distribute successfully since the + # function _run_model_on_fold is being mocked out. However, for a new spawned worker, it + # is not and the mock call_count should remain 0 while the subsequent check of actual + # results should also hold, only on successful distribution of tasks to workers. 
+ self.assertEqual(parallel_mock.call_count, 0) + self.assertIsInstance(res[0], list) + self.assertEqual(len(res[0]), num_instances) + self.assertEqual(len(res[0][0]), line_length) + self.assertEqual(len(res[2]), 7) + self.assertEqual(len(res[3]), 7) + expected_scores = [ + 0.965625, + 0.94375, + 0.946875, + 0.953125, + 0.96875, + 0.965625, + 0.9435736677115988, + 0.9467084639498433, + 0.9749216300940439, + 0.9655172413793104, + ] + scores = [v for k, v in res[2]["predictive_accuracy"][0].items()] + self.assertSequenceEqual(scores, expected_scores, seq_type=list) + + @unittest.skipIf( + LooseVersion(sklearn.__version__) < "0.21", + reason="couldn't perform local tests successfully w/o bloating RAM", + ) + @unittest.mock.patch("openml.extensions.sklearn.SklearnExtension._prevent_optimize_n_jobs") + def test_joblib_backends(self, parallel_mock): + """ Tests evaluation of a run using various joblib backends and n_jobs. """ + task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp + x, y = task.get_X_and_y(dataset_format="dataframe") + num_instances = x.shape[0] + line_length = 6 + len(task.class_labels) + flow = unittest.mock.Mock() + flow.name = "dummy" + + for n_jobs, backend, len_time_stats, call_count in [ + (1, "loky", 7, 10), + (2, "loky", 4, 10), + (-1, "loky", 1, 10), + (1, "threading", 7, 20), + (-1, "threading", 1, 30), + (1, "sequential", 7, 40), + ]: + clf = sklearn.model_selection.RandomizedSearchCV( + estimator=sklearn.ensemble.RandomForestClassifier(n_estimators=5), + param_distributions={ + "max_depth": [3, None], + "max_features": [1, 2, 3, 4], + "min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10], + "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "bootstrap": [True, False], + "criterion": ["gini", "entropy"], + }, + random_state=1, + cv=sklearn.model_selection.StratifiedKFold( + n_splits=2, shuffle=True, random_state=1 + ), + n_iter=5, + n_jobs=n_jobs, + ) + with parallel_backend(backend, n_jobs=n_jobs): + res = 
openml.runs.functions._run_task_get_arffcontent( + flow=flow, + extension=self.extension, + model=clf, + task=task, + add_local_measures=True, + dataset_format="array", # "dataframe" would require handling of categoricals + n_jobs=n_jobs, + ) + self.assertEqual(type(res[0]), list) + self.assertEqual(len(res[0]), num_instances) + self.assertEqual(len(res[0][0]), line_length) + # usercpu_time_millis_* not recorded when n_jobs > 1 + # *_time_millis_* not recorded when n_jobs = -1 + self.assertEqual(len(res[2]), len_time_stats) + self.assertEqual(len(res[3]), len_time_stats) + self.assertEqual(len(res[2]["predictive_accuracy"][0]), 10) + self.assertEqual(len(res[3]["predictive_accuracy"][0]), 10) + self.assertEqual(parallel_mock.call_count, call_count)