Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions 42 openml/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def stop_using_configuration_for_example(cls):
cls._start_last_called = False


def _setup():
def _setup(config=None):
"""Setup openml package. Called on first import.

Reads the config file and sets up apikey, server, cache appropriately.
Expand Down Expand Up @@ -220,13 +220,27 @@ def _setup():
"not working properly." % config_dir
)

config = _parse_config(config_file)
apikey = config.get("FAKE_SECTION", "apikey")
server = config.get("FAKE_SECTION", "server")
if config is None:
config = _parse_config(config_file)

cache_dir = config.get("FAKE_SECTION", "cachedir")
cache_directory = os.path.expanduser(cache_dir)
def _get(config, key):
return config.get("FAKE_SECTION", key)

avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs")
else:

def _get(config, key):
return config.get(key)

avoid_duplicate_runs = config.get("avoid_duplicate_runs")

apikey = _get(config, "apikey")
server = _get(config, "server")
short_cache_dir = _get(config, "cachedir")
connection_n_retries = int(_get(config, "connection_n_retries"))
max_retries = int(_get(config, "max_retries"))

cache_directory = os.path.expanduser(short_cache_dir)
# create the cache subdirectory
if not os.path.exists(cache_directory):
try:
Expand All @@ -237,9 +251,6 @@ def _setup():
"OpenML-Python not working properly." % cache_directory
)

avoid_duplicate_runs = config.getboolean("FAKE_SECTION", "avoid_duplicate_runs")
connection_n_retries = int(config.get("FAKE_SECTION", "connection_n_retries"))
max_retries = int(config.get("FAKE_SECTION", "max_retries"))
if connection_n_retries > max_retries:
raise ValueError(
"A higher number of retries than {} is not allowed to keep the "
Expand Down Expand Up @@ -268,6 +279,17 @@ def _parse_config(config_file: str):
return config


def get_config_as_dict():
    """Return the current OpenML configuration as a dictionary.

    The dictionary holds the six active settings: ``apikey``, ``server``,
    ``cachedir``, ``avoid_duplicate_runs``, ``connection_n_retries`` and
    ``max_retries``. It is suitable for passing to ``_setup()`` (e.g. to
    replicate the parent configuration inside a worker process).
    """
    return {
        "apikey": apikey,
        "server": server,
        "cachedir": cache_directory,
        "avoid_duplicate_runs": avoid_duplicate_runs,
        "connection_n_retries": connection_n_retries,
        "max_retries": max_retries,
    }


def get_cache_directory():
"""Get the current cache directory.

Expand Down Expand Up @@ -307,11 +329,13 @@ def set_cache_directory(cachedir):
)
stop_using_configuration_for_example = ConfigurationForExamples.stop_using_configuration_for_example


__all__ = [
"get_cache_directory",
"set_cache_directory",
"start_using_configuration_for_example",
"stop_using_configuration_for_example",
"get_config_as_dict",
]

_setup()
151 changes: 108 additions & 43 deletions 151 openml/runs/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import xmltodict
import numpy as np
import pandas as pd
from joblib.parallel import Parallel, delayed

import openml
import openml.utils
Expand Down Expand Up @@ -54,6 +55,7 @@ def run_model_on_task(
upload_flow: bool = False,
return_flow: bool = False,
dataset_format: str = "dataframe",
n_jobs: Optional[int] = None,
) -> Union[OpenMLRun, Tuple[OpenMLRun, OpenMLFlow]]:
"""Run the model on the dataset defined by the task.

Expand Down Expand Up @@ -84,6 +86,10 @@ def run_model_on_task(
dataset_format : str (default='dataframe')
If 'array', the dataset is passed to the model as a numpy array.
If 'dataframe', the dataset is passed to the model as a pandas dataframe.
n_jobs : int (default=None)
The number of processes/threads to distribute the evaluation asynchronously.
If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially.
If `-1`, then the job uses as many cores as are available.

Returns
-------
Expand Down Expand Up @@ -131,6 +137,7 @@ def get_task_and_type_conversion(task: Union[int, str, OpenMLTask]) -> OpenMLTas
add_local_measures=add_local_measures,
upload_flow=upload_flow,
dataset_format=dataset_format,
n_jobs=n_jobs,
)
if return_flow:
return run, flow
Expand All @@ -146,6 +153,7 @@ def run_flow_on_task(
add_local_measures: bool = True,
upload_flow: bool = False,
dataset_format: str = "dataframe",
n_jobs: Optional[int] = None,
) -> OpenMLRun:

"""Run the model provided by the flow on the dataset defined by task.
Expand Down Expand Up @@ -181,6 +189,10 @@ def run_flow_on_task(
dataset_format : str (default='dataframe')
If 'array', the dataset is passed to the model as a numpy array.
If 'dataframe', the dataset is passed to the model as a pandas dataframe.
n_jobs : int (default=None)
The number of processes/threads to distribute the evaluation asynchronously.
If `None` or `1`, then the evaluation is treated as synchronous and processed sequentially.
If `-1`, then the job uses as many cores as are available.

Returns
-------
Expand Down Expand Up @@ -265,6 +277,7 @@ def run_flow_on_task(
extension=flow.extension,
add_local_measures=add_local_measures,
dataset_format=dataset_format,
n_jobs=n_jobs,
)

data_content, trace, fold_evaluations, sample_evaluations = res
Expand Down Expand Up @@ -425,6 +438,7 @@ def _run_task_get_arffcontent(
extension: "Extension",
add_local_measures: bool,
dataset_format: str,
n_jobs: int = None,
) -> Tuple[
List[List],
Optional[OpenMLRunTrace],
Expand All @@ -447,55 +461,37 @@ def _run_task_get_arffcontent(
# methods, less maintenance, less confusion)
num_reps, num_folds, num_samples = task.get_split_dimensions()

jobs = []
for n_fit, (rep_no, fold_no, sample_no) in enumerate(
itertools.product(range(num_reps), range(num_folds), range(num_samples),), start=1
):

train_indices, test_indices = task.get_train_test_split_indices(
repeat=rep_no, fold=fold_no, sample=sample_no
)
if isinstance(task, OpenMLSupervisedTask):
x, y = task.get_X_and_y(dataset_format=dataset_format)
if dataset_format == "dataframe":
train_x = x.iloc[train_indices]
train_y = y.iloc[train_indices]
test_x = x.iloc[test_indices]
test_y = y.iloc[test_indices]
else:
train_x = x[train_indices]
train_y = y[train_indices]
test_x = x[test_indices]
test_y = y[test_indices]
elif isinstance(task, OpenMLClusteringTask):
x = task.get_X(dataset_format=dataset_format)
if dataset_format == "dataframe":
train_x = x.iloc[train_indices]
else:
train_x = x[train_indices]
train_y = None
test_x = None
test_y = None
else:
raise NotImplementedError(task.task_type)

config.logger.info(
"Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.",
flow.name,
task.task_id,
rep_no,
fold_no,
sample_no,
)

pred_y, proba_y, user_defined_measures_fold, trace = extension._run_model_on_fold(
jobs.append((n_fit, rep_no, fold_no, sample_no))

# The forked child process may not copy the configuration state of OpenML from the parent.
# Current configuration setup needs to be copied and passed to the child processes.
_config = config.get_config_as_dict()
# Execute runs in parallel
# assuming the same number of tasks as workers (n_jobs), the total compute time for this
# statement will be similar to the slowest run
job_rvals = Parallel(verbose=0, n_jobs=n_jobs)(
delayed(_run_task_get_arffcontent_parallel_helper)(
extension=extension,
flow=flow,
fold_no=fold_no,
model=model,
task=task,
X_train=train_x,
y_train=train_y,
rep_no=rep_no,
fold_no=fold_no,
X_test=test_x,
sample_no=sample_no,
task=task,
dataset_format=dataset_format,
configuration=_config,
)
for n_fit, rep_no, fold_no, sample_no in jobs
) # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs`

for n_fit, rep_no, fold_no, sample_no in jobs:
pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold = job_rvals[
n_fit - 1
]
if trace is not None:
traces.append(trace)

Expand Down Expand Up @@ -615,6 +611,75 @@ def _calculate_local_measure(sklearn_fn, openml_name):
)


def _run_task_get_arffcontent_parallel_helper(
    extension: "Extension",
    flow: OpenMLFlow,
    fold_no: int,
    model: Any,
    rep_no: int,
    sample_no: int,
    task: OpenMLTask,
    dataset_format: str,
    configuration: Optional[Dict] = None,
) -> Tuple[
    np.ndarray,
    Optional[pd.DataFrame],
    np.ndarray,
    Optional[pd.DataFrame],
    Optional[OpenMLRunTrace],
    "OrderedDict[str, float]",
]:
    """Run ``model`` on a single (repeat, fold, sample) split of ``task``.

    Intended to execute inside a joblib worker: the parent process's OpenML
    configuration is re-applied first, because a forked/spawned child does not
    necessarily inherit the parent's module-level configuration state.

    Parameters
    ----------
    extension : Extension
        Extension used to fit and evaluate ``model`` on the fold.
    flow : OpenMLFlow
        Flow being executed (only its name is used here, for logging).
    fold_no : int
        Fold index of the split to run.
    model : Any
        The model to evaluate on this split.
    rep_no : int
        Repeat index of the split to run.
    sample_no : int
        Sample index of the split to run.
    task : OpenMLTask
        Task providing the dataset and the train/test split indices.
    dataset_format : str
        'dataframe' or 'array'; controls how X/y are obtained and indexed.
    configuration : Dict, optional
        Parent-process configuration (as produced by
        ``config.get_config_as_dict()``); if None, the defaults are loaded.

    Returns
    -------
    tuple
        ``(pred_y, proba_y, test_indices, test_y, trace,
        user_defined_measures_fold)`` as produced by
        ``extension._run_model_on_fold`` plus the split's test indices/labels.
    """
    # Sets up the OpenML instantiated in the child process to match that of the
    # parent's; if configuration=None, loads the default.
    config._setup(configuration)

    train_indices, test_indices = task.get_train_test_split_indices(
        repeat=rep_no, fold=fold_no, sample=sample_no
    )

    if isinstance(task, OpenMLSupervisedTask):
        x, y = task.get_X_and_y(dataset_format=dataset_format)
        # Dataframes need positional indexing via .iloc; arrays index directly.
        if dataset_format == "dataframe":
            train_x = x.iloc[train_indices]
            train_y = y.iloc[train_indices]
            test_x = x.iloc[test_indices]
            test_y = y.iloc[test_indices]
        else:
            train_x = x[train_indices]
            train_y = y[train_indices]
            test_x = x[test_indices]
            test_y = y[test_indices]
    elif isinstance(task, OpenMLClusteringTask):
        # Clustering tasks have no targets and no held-out test partition.
        x = task.get_X(dataset_format=dataset_format)
        if dataset_format == "dataframe":
            train_x = x.iloc[train_indices]
        else:
            train_x = x[train_indices]
        train_y = None
        test_x = None
        test_y = None
    else:
        raise NotImplementedError(task.task_type)
    config.logger.info(
        "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.",
        flow.name,
        task.task_id,
        rep_no,
        fold_no,
        sample_no,
    )
    pred_y, proba_y, user_defined_measures_fold, trace, = extension._run_model_on_fold(
        model=model,
        task=task,
        X_train=train_x,
        y_train=train_y,
        rep_no=rep_no,
        fold_no=fold_no,
        X_test=test_x,
    )
    return pred_y, proba_y, test_indices, test_y, trace, user_defined_measures_fold


def get_runs(run_ids):
"""Gets all runs in run_ids list.

Expand Down
29 changes: 29 additions & 0 deletions 29 tests/test_openml/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,35 @@ def test_non_writable_home(self, log_handler_mock, warnings_mock, expanduser_moc
self.assertEqual(log_handler_mock.call_count, 1)
self.assertFalse(log_handler_mock.call_args_list[0][1]["create_file_handler"])

def test_get_config_as_dict(self):
    """ Checks if the current configuration is returned accurately as a dict. """
    expected = {
        "apikey": "610344db6388d9ba34f6db45a3cf71de",
        "server": "https://test.openml.org/api/v1/xml",
        "cachedir": self.workdir,
        "avoid_duplicate_runs": False,
        "connection_n_retries": 10,
        "max_retries": 20,
    }
    retrieved = openml.config.get_config_as_dict()
    self.assertIsInstance(retrieved, dict)
    self.assertEqual(len(retrieved), 6)
    self.assertDictEqual(retrieved, expected)

def test_setup_with_config(self):
    """ Checks if the OpenML configuration can be updated using _setup(). """
    new_config = {
        "apikey": "610344db6388d9ba34f6db45a3cf71de",
        "server": "https://www.openml.org/api/v1/xml",
        "cachedir": self.workdir,
        "avoid_duplicate_runs": True,
        "connection_n_retries": 100,
        "max_retries": 1000,
    }
    # Remember the active configuration so it can be restored afterwards.
    backup = openml.config.get_config_as_dict()
    openml.config._setup(new_config)
    applied = openml.config.get_config_as_dict()
    openml.config._setup(backup)  # important to not affect other unit tests
    self.assertDictEqual(new_config, applied)


class TestConfigurationForExamples(openml.testing.TestBase):
def test_switch_to_example_configuration(self):
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.