diff --git a/.env.example b/.env.example
index af11a5a3..68bab62c 100644
--- a/.env.example
+++ b/.env.example
@@ -32,8 +32,6 @@ EVALUATE_SCRIPT_PATH = 'evaluate/evaluate_model.py'
 REGISTER_SCRIPT_PATH = 'register/register_model.py'
 SOURCES_DIR_TRAIN = 'diabetes_regression'
 DATASET_NAME = 'diabetes_ds'
-DATASTORE_NAME = 'datablobstore'
-DATAFILE_NAME = 'diabetes.csv'
 
 # Optional. Used by a training pipeline with R on Databricks
 DB_CLUSTER_ID = ''
diff --git a/diabetes_regression/evaluate/evaluate_model.py b/diabetes_regression/evaluate/evaluate_model.py
index ed234b7a..57685b3d 100644
--- a/diabetes_regression/evaluate/evaluate_model.py
+++ b/diabetes_regression/evaluate/evaluate_model.py
@@ -90,6 +90,7 @@
     help="Name of the Model",
     default="sklearn_regression_model.pkl",
 )
+
 parser.add_argument(
     "--allow_run_cancel",
     type=str,
@@ -122,18 +123,10 @@
         model_name, tag_name, exp.name, ws)
 
     if (model is not None):
-
-        production_model_run_id = model.run_id
-
-        # Get the run history for both production model and
-        # newly trained model and compare mse
-        production_model_run = Run(exp, run_id=production_model_run_id)
-        new_model_run = run.parent
-        print("Production model run is", production_model_run)
-
-        production_model_mse = \
-            production_model_run.get_metrics().get(metric_eval)
-        new_model_mse = new_model_run.get_metrics().get(metric_eval)
+        production_model_mse = 10000
+        if (metric_eval in model.tags):
+            production_model_mse = float(model.tags[metric_eval])
+        new_model_mse = float(run.parent.get_metrics().get(metric_eval))
         if (production_model_mse is None or new_model_mse is None):
             print("Unable to find", metric_eval, "metrics, "
                   "exiting evaluation")
@@ -151,7 +144,7 @@
             print("New trained model performs better, "
                   "thus it should be registered")
         else:
-            print("New trained model metric is worse than or equal to "
                   "production model so skipping model registration.")
            if((allow_run_cancel).lower() == 'true'):
                run.parent.cancel()
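The evaluation step now reads the production model's MSE from a tag on the registered model instead of replaying the production run's history. A minimal sketch of that comparison, assuming a `model` object obtained via the script's `get_model_by_tag` helper (or `None` when nothing is registered yet) and a parent run that logged `mse`; the function name here is hypothetical:

```python
from azureml.core import Run
from azureml.core.model import Model


def new_model_is_better(model: Model, run: Run, metric_eval: str = "mse") -> bool:
    """Compare the new run's metric against the production model's tag."""
    # 10000 mirrors the sentinel the script falls back to when the tag is
    # missing, so a first-ever model always wins the comparison.
    production_model_mse = 10000
    if model is not None and metric_eval in model.tags:
        production_model_mse = float(model.tags[metric_eval])
    new_metric = run.parent.get_metrics().get(metric_eval)
    if new_metric is None:
        # Guard before the float() cast; the script warns and exits here.
        return False
    return float(new_metric) < production_model_mse
```

Tag-based comparison avoids a `Run` lookup entirely, which is what lets the old run-history code be deleted.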
diff --git a/diabetes_regression/register/register_model.py b/diabetes_regression/register/register_model.py
index 95b9e897..3fc89495 100644
--- a/diabetes_regression/register/register_model.py
+++ b/diabetes_regression/register/register_model.py
@@ -27,6 +27,7 @@
 import sys
 import argparse
 import traceback
+import joblib
 from azureml.core import Run, Experiment, Workspace
 from azureml.core.model import Model as AMLModel
 
@@ -63,17 +64,24 @@ def main():
         type=str,
         help="The Build ID of the build triggering this pipeline run",
     )
+
     parser.add_argument(
         "--run_id",
         type=str,
         help="Training run ID",
     )
+
     parser.add_argument(
         "--model_name",
         type=str,
         help="Name of the Model",
         default="sklearn_regression_model.pkl",
     )
+    parser.add_argument(
+        "--step_input",
+        type=str,
+        help=("input from previous steps")
+    )
 
     args = parser.parse_args()
     if (args.build_id is not None):
@@ -83,18 +91,42 @@ def main():
     if (run_id == 'amlcompute'):
         run_id = run.parent.id
     model_name = args.model_name
+    model_path = args.step_input
 
-    if (build_id is None):
-        register_aml_model(model_name, exp, run_id)
-    else:
-        run.tag("BuildId", value=build_id)
-        builduri_base = os.environ.get("BUILDURI_BASE")
-        if (builduri_base is not None):
-            build_uri = builduri_base + build_id
-            run.tag("BuildUri", value=build_uri)
-            register_aml_model(model_name, exp, run_id, build_id, build_uri)
+    # load the model
+    print("Loading model from " + model_path)
+    model_file = os.path.join(model_path, model_name)
+    model = joblib.load(model_file)
+    model_mse = run.parent.get_metrics()["mse"]
+
+    if (model is not None):
+        if (build_id is None):
+            register_aml_model(model_file, model_name, model_mse, exp, run_id)
         else:
-            register_aml_model(model_name, exp, run_id, build_id)
+            run.tag("BuildId", value=build_id)
+            builduri_base = os.environ.get("BUILDURI_BASE")
+            if (builduri_base is not None):
+                build_uri = builduri_base + build_id
+                run.tag("BuildUri", value=build_uri)
+                register_aml_model(
+                    model_file,
+                    model_name,
+                    model_mse,
+                    exp,
+                    run_id,
+                    build_id,
+                    build_uri)
+            else:
+                register_aml_model(
+                    model_file,
+                    model_name,
+                    model_mse,
+                    exp,
+                    run_id,
+                    build_id)
+    else:
+        print("Model not found. Skipping model registration.")
+        sys.exit(0)
 
 
 def model_already_registered(model_name, exp, run_id):
@@ -109,35 +141,30 @@ def model_already_registered(model_name, exp, run_id):
 
 
 def register_aml_model(
+    model_path,
     model_name,
+    model_mse,
     exp,
     run_id,
     build_id: str = 'none',
     build_uri=None
 ):
     try:
+        tagsValue = {"area": "diabetes_regression",
+                     "run_id": run_id,
+                     "experiment_name": exp.name,
+                     "mse": model_mse}
         if (build_id != 'none'):
             model_already_registered(model_name, exp, run_id)
-            run = Run(experiment=exp, run_id=run_id)
-            tagsValue = {"area": "diabetes_regression",
-                         "BuildId": build_id, "run_id": run_id,
-                         "experiment_name": exp.name}
+            tagsValue["BuildId"] = build_id
             if (build_uri is not None):
                 tagsValue["BuildUri"] = build_uri
-        else:
-            run = Run(experiment=exp, run_id=run_id)
-            if (run is not None):
-                tagsValue = {"area": "diabetes_regression",
-                             "run_id": run_id, "experiment_name": exp.name}
-            else:
-                print("A model run for experiment", exp.name,
-                      "matching properties run_id =", run_id,
-                      "was not found. Skipping model registration.")
-                sys.exit(0)
-
-        model = run.register_model(model_name=model_name,
-                                   model_path="./outputs/" + model_name,
-                                   tags=tagsValue)
+
+        model = AMLModel.register(
+            workspace=exp.workspace,
+            model_name=model_name,
+            model_path=model_path,
+            tags=tagsValue)
         os.chdir("..")
         print(
            "Model registered: {} \nModel Description: {} "
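Because the model now arrives as a plain file handed over from the previous step, registration can use workspace-level `Model.register` (imported above as `AMLModel`) rather than `run.register_model`. A standalone sketch of that call, assuming a local `config.json` for the workspace; the file path and tag values are illustrative only:

```python
import joblib
from azureml.core import Workspace
from azureml.core.model import Model

ws = Workspace.from_config()  # assumes a config.json in the working directory

# Hypothetical path; in the pipeline this comes from the --step_input mount.
model_file = "model_output/sklearn_regression_model.pkl"
joblib.load(model_file)  # fail fast if the artifact is not a loadable model

model = Model.register(
    workspace=ws,
    model_name="sklearn_regression_model.pkl",
    model_path=model_file,
    tags={"area": "diabetes_regression", "mse": "3298.9"},  # illustrative values
)
print("Registered {} version {}".format(model.name, model.version))
```

Storing the metric in the tags at registration time is what makes the tag-based evaluation above possible on the next pipeline run.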
""" from azureml.core.run import Run -from azureml.core import Dataset import os import argparse -from sklearn.datasets import load_diabetes from sklearn.linear_model import Ridge from sklearn.metrics import mean_squared_error from sklearn.model_selection import train_test_split @@ -65,19 +63,20 @@ def main(): ) parser.add_argument( - "--dataset_name", + "--step_output", type=str, - help=("Dataset with the training data") + help=("output for passing data to next step") ) + args = parser.parse_args() print("Argument [build_id]: %s" % args.build_id) print("Argument [model_name]: %s" % args.model_name) - print("Argument [dataset_name]: %s" % args.dataset_name) + print("Argument [step_output]: %s" % args.step_output) model_name = args.model_name build_id = args.build_id - dataset_name = args.dataset_name + step_output_path = args.step_output print("Getting training parameters") @@ -91,15 +90,17 @@ def main(): print("Parameter alpha: %s" % alpha) run = Run.get_context() - ws = run.experiment.workspace - if (dataset_name): - dataset = Dataset.get_by_name(workspace=ws, name=dataset_name) + # Get the dataset + dataset = run.input_datasets['training_data'] + if (dataset): df = dataset.to_pandas_dataframe() X = df.values y = df.Y else: - X, y = load_diabetes(return_X_y=True) + e = ("No dataset provided") + print(e) + raise Exception(e) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=0) @@ -108,21 +109,18 @@ def main(): reg = train_model(run, data, alpha) - joblib.dump(value=reg, filename=model_name) - - # upload model file explicitly into artifacts for parent run - run.parent.upload_file(name="./outputs/" + model_name, - path_or_stream=model_name) - print("Uploaded the model {} to experiment {}".format( - model_name, run.experiment.name)) - dirpath = os.getcwd() - print(dirpath) - print("Following files are uploaded ") - print(run.parent.get_file_names()) + # Pass model file to next step + os.makedirs(step_output_path, exist_ok=True) + model_output_path = os.path.join(step_output_path, model_name) + joblib.dump(value=reg, filename=model_output_path) - run.parent.tag("BuildId", value=build_id) + # Also upload model file to run outputs for history + os.makedirs('outputs', exist_ok=True) + output_path = os.path.join('outputs', model_name) + joblib.dump(value=reg, filename=output_path) # Add properties to identify this specific training run + run.parent.tag("BuildId", value=build_id) run.tag("BuildId", value=build_id) run.tag("run_type", value="train") builduri_base = os.environ.get("BUILDURI_BASE") diff --git a/docs/getting_started.md b/docs/getting_started.md index 0b4b8379..01352484 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -75,6 +75,10 @@ the BASE_NAME value should not exceed 10 characters and it should contain number The **RESOURCE_GROUP** parameter is used as the name for the resource group that will hold the Azure resources for the solution. If providing an existing AML Workspace, set this value to the corresponding resource group name. +The **WORKSPACE_SVC_CONNECTION** parameter is used to reference a service connection for the Azure ML workspace. You will create this after provisioning the workspace (we recommend using the IaC pipeline as described below), and installing the Azure ML extension in your Azure DevOps project. + +Optionally, a **DATASET_NAME** parameter can be used to reference a training dataset that you have registered in your Azure ML workspace (more details below). 
diff --git a/docs/getting_started.md b/docs/getting_started.md
index 0b4b8379..01352484 100644
--- a/docs/getting_started.md
+++ b/docs/getting_started.md
@@ -75,6 +75,10 @@ the BASE_NAME value should not exceed 10 characters and it should contain number
 
 The **RESOURCE_GROUP** parameter is used as the name for the resource group that will hold the Azure resources for the solution. If providing an existing AML Workspace, set this value to the corresponding resource group name.
 
+The **WORKSPACE_SVC_CONNECTION** parameter is used to reference a service connection for the Azure ML workspace. You will create this after provisioning the workspace (we recommend using the IaC pipeline as described below), and installing the Azure ML extension in your Azure DevOps project.
+
+Optionally, a **DATASET_NAME** parameter can be used to reference a training dataset that you have registered in your Azure ML workspace (more details below).
+
 Make sure to select the **Allow access to all pipelines** checkbox in the variable group configuration.
 
@@ -125,8 +129,7 @@
 Check out the newly created resources in the [Azure Portal](https://portal.azure.com).
 
 (Optional) To remove the resources created for this project you can use the [/environment_setup/iac-remove-environment.yml](../environment_setup/iac-remove-environment.yml) definition or you can just delete the resource group in the [Azure Portal](https://portal.azure.com).
 
-**Note:** The training ML pipeline uses a [sample diabetes dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_diabetes.html) as training data. If you want to use your own dataset, you need to [create and register a datastore](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-access-data#azure-machine-learning-studio) in your ML workspace and upload the datafile (e.g. [diabetes.csv](./data/diabetes.csv)) to the corresponding blob container. You can also define a datastore in the ML Workspace with [az cli](https://docs.microsoft.com/en-us/cli/azure/ext/azure-cli-ml/ml/datastore?view=azure-cli-latest#ext-azure-cli-ml-az-ml-datastore-attach-blob).
-You'll also need to configure DATASTORE_NAME and DATAFILE_NAME variables in ***devopsforai-aml-vg*** variable group.
+**Note:** The training ML pipeline uses a [sample diabetes dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_diabetes.html) as training data. To use your own data, you need to [create a Dataset](https://docs.microsoft.com/azure/machine-learning/how-to-create-register-datasets) in your workspace and specify its name in a DATASET_NAME variable in the ***devopsforai-aml-vg*** variable group. You will also need to modify the test cases in the **ml_service/util/smoke_test_scoring_service.py** script to match the schema of the training features in your dataset.
 
 ## Create an Azure DevOps Azure ML Workspace Service Connection
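For reference, one hedged way to register your own CSV under the name configured in DATASET_NAME; the local file name and datastore path below are only illustrative:

```python
from azureml.core import Dataset, Workspace

ws = Workspace.from_config()
datastore = ws.get_default_datastore()

# Upload a local file (hypothetical name) to the workspace's default datastore
datastore.upload_files(
    files=["./my_training_data.csv"],
    target_path="training-data/",
    overwrite=True)

dataset = Dataset.Tabular.from_delimited_files(
    path=(datastore, "training-data/my_training_data.csv"))
dataset.register(
    workspace=ws,
    name="diabetes_ds",  # must match the DATASET_NAME variable
    create_new_version=True)
```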
diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py
index b7d32f99..f382a476 100644
--- a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py
+++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py
@@ -1,11 +1,14 @@
 from azureml.pipeline.core.graph import PipelineParameter
 from azureml.pipeline.steps import PythonScriptStep
-from azureml.pipeline.core import Pipeline
+from azureml.pipeline.core import Pipeline, PipelineData
 from azureml.core import Workspace, Environment
 from azureml.core.runconfig import RunConfiguration
-from azureml.core import Dataset, Datastore
+from azureml.core import Dataset
 from ml_service.util.attach_compute import get_compute
 from ml_service.util.env_variables import Env
+from sklearn.datasets import load_diabetes
+import pandas as pd
+import os
 
 
 def main():
@@ -45,26 +48,59 @@ def main():
     build_id_param = PipelineParameter(
         name="build_id", default_value=e.build_id)
 
-    dataset_name = ""
-    if (e.datastore_name is not None and e.datafile_name is not None):
-        dataset_name = e.dataset_name
-        datastore = Datastore.get(aml_workspace, e.datastore_name)
-        data_path = [(datastore, e.datafile_name)]
-        dataset = Dataset.Tabular.from_delimited_files(path=data_path)
-        dataset.register(workspace=aml_workspace,
-                         name=e.dataset_name,
-                         description="dataset with training data",
-                         create_new_version=True)
+    # Get dataset name
+    dataset_name = e.dataset_name
+
+    # Check to see if dataset exists
+    if (dataset_name not in aml_workspace.datasets):
+        # Create dataset from diabetes sample data
+        sample_data = load_diabetes()
+        df = pd.DataFrame(
+            data=sample_data.data,
+            columns=sample_data.feature_names)
+        df['Y'] = sample_data.target
+        file_name = 'diabetes.csv'
+        df.to_csv(file_name, index=False)
+
+        # Upload file to default datastore in workspace
+        default_ds = aml_workspace.get_default_datastore()
+        target_path = 'training-data/'
+        default_ds.upload_files(
+            files=[file_name],
+            target_path=target_path,
+            overwrite=True,
+            show_progress=False)
+
+        # Register dataset
+        path_on_datastore = os.path.join(target_path, file_name)
+        dataset = Dataset.Tabular.from_delimited_files(
+            path=(default_ds, path_on_datastore))
+        dataset = dataset.register(
+            workspace=aml_workspace,
+            name=dataset_name,
+            description='diabetes training data',
+            tags={'format': 'CSV'},
+            create_new_version=True)
+
+    # Get the dataset
+    dataset = Dataset.get_by_name(aml_workspace, dataset_name)
+
+    # Create a PipelineData to pass data between steps
+    pipeline_data = PipelineData(
+        'pipeline_data',
+        datastore=aml_workspace.get_default_datastore())
 
     train_step = PythonScriptStep(
         name="Train Model",
         script_name=e.train_script_path,
         compute_target=aml_compute,
         source_directory=e.sources_directory_train,
+        inputs=[dataset.as_named_input('training_data')],
+        outputs=[pipeline_data],
         arguments=[
             "--build_id", build_id_param,
             "--model_name", model_name_param,
-            "--dataset_name", dataset_name,
+            "--step_output", pipeline_data
         ],
         runconfig=run_config,
         allow_reuse=False,
@@ -91,9 +127,11 @@ def main():
         script_name=e.register_script_path,
         compute_target=aml_compute,
         source_directory=e.sources_directory_train,
+        inputs=[pipeline_data],
         arguments=[
             "--build_id", build_id_param,
             "--model_name", model_name_param,
+            "--step_input", pipeline_data,
         ],
         runconfig=run_config,
         allow_reuse=False,
diff --git a/ml_service/util/env_variables.py b/ml_service/util/env_variables.py
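The pipeline change is easiest to see in isolation: a single `PipelineData` object is declared as an output of the train step and an input of the register step, and Azure ML substitutes a concrete mount path wherever that object appears in `arguments`. A trimmed sketch under the assumptions of an existing workspace `config.json`, the registered `diabetes_ds` dataset, and a hypothetical compute target named `train-cluster`; the intermediate evaluate step from the repo is omitted:

```python
from azureml.core import Dataset, Workspace
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

ws = Workspace.from_config()
run_config = RunConfiguration()
dataset = Dataset.get_by_name(ws, "diabetes_ds")

# Declared once, used first as an output, then as an input.
pipeline_data = PipelineData(
    "pipeline_data", datastore=ws.get_default_datastore())

train_step = PythonScriptStep(
    name="Train Model",
    script_name="training/train.py",
    source_directory="diabetes_regression",
    compute_target="train-cluster",  # hypothetical compute name
    inputs=[dataset.as_named_input("training_data")],
    outputs=[pipeline_data],
    arguments=["--build_id", "local",
               "--model_name", "sklearn_regression_model.pkl",
               "--step_output", pipeline_data],
    runconfig=run_config,
    allow_reuse=False)

register_step = PythonScriptStep(
    name="Register Model",
    script_name="register/register_model.py",
    source_directory="diabetes_regression",
    compute_target="train-cluster",
    inputs=[pipeline_data],
    arguments=["--build_id", "local",
               "--model_name", "sklearn_regression_model.pkl",
               "--step_input", pipeline_data],
    runconfig=run_config,
    allow_reuse=False)

pipeline = Pipeline(workspace=ws, steps=[train_step, register_step])
pipeline.validate()
```

The shared `pipeline_data` object also gives Azure ML the data dependency it needs to order the two steps, so no explicit `run_after` is required between them.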
index c3f30e72..2386a5b3 100644
--- a/ml_service/util/env_variables.py
+++ b/ml_service/util/env_variables.py
@@ -41,8 +41,6 @@ def __init__(self):
         self._score_script = os.environ.get("SCORE_SCRIPT")
         self._collection_uri = os.environ.get("SYSTEM_COLLECTIONURI")
         self._teamproject_name = os.environ.get("SYSTEM_TEAMPROJECT")
-        self._datastore_name = os.environ.get("DATASTORE_NAME")
-        self._datafile_name = os.environ.get("DATAFILE_NAME")
         self._dataset_name = os.environ.get("DATASET_NAME")
         self._run_evaluation = os.environ.get("RUN_EVALUATION", "true")
         self._allow_run_cancel = os.environ.get(
@@ -152,14 +150,6 @@ def collection_uri(self):
     def teamproject_name(self):
         return self._teamproject_name
 
-    @property
-    def datastore_name(self):
-        return self._datastore_name
-
-    @property
-    def datafile_name(self):
-        return self._datafile_name
-
     @property
     def dataset_name(self):
         return self._dataset_name
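After this cleanup, callers address the training data purely by dataset name. A short usage sketch of the remaining accessor, assuming the repo root is on PYTHONPATH and DATASET_NAME is set in the environment or a .env file:

```python
from ml_service.util.env_variables import Env

e = Env()
# datastore_name / datafile_name are gone; the dataset is now resolved by
# name alone, e.g. Dataset.get_by_name(workspace, e.dataset_name).
print("Training dataset: %s" % e.dataset_name)
```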