Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History
592 lines (516 loc) · 26.7 KB

File metadata and controls

592 lines (516 loc) · 26.7 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Test docstring"""
from __future__ import absolute_import
from typing import Optional, Union, Dict, List
import sagemaker
import sagemaker.parameter
from sagemaker import vpc_utils
from sagemaker.deserializers import BytesDeserializer
from sagemaker.deprecations import removed_kwargs
from sagemaker.estimator import EstimatorBase
from sagemaker.inputs import TrainingInput, FileSystemInput
from sagemaker.serializers import IdentitySerializer
from sagemaker.transformer import Transformer
from sagemaker.predictor import Predictor
from sagemaker.session import Session
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.utils import format_tags, Tags
from sagemaker.workflow import is_pipeline_variable
class AlgorithmEstimator(EstimatorBase):
"""A generic Estimator to train using any algorithm object (with an ``algorithm_arn``).
The Algorithm can be your own, or any Algorithm from AWS
Marketplace that you have a valid subscription for. This class will perform
client-side validation on all the inputs.
"""
# These Hyperparameter Types have a range definition.
_hyperpameters_with_range = ("Integer", "Continuous", "Categorical")
def __init__(
self,
algorithm_arn: str,
role: str = None,
instance_count: Optional[Union[int, PipelineVariable]] = None,
instance_type: Optional[Union[str, PipelineVariable]] = None,
volume_size: Union[int, PipelineVariable] = 30,
volume_kms_key: Optional[Union[str, PipelineVariable]] = None,
max_run: Union[int, PipelineVariable] = 24 * 60 * 60,
input_mode: Union[str, PipelineVariable] = "File",
output_path: Optional[Union[str, PipelineVariable]] = None,
output_kms_key: Optional[Union[str, PipelineVariable]] = None,
base_job_name: Optional[str] = None,
sagemaker_session: Optional[Session] = None,
hyperparameters: Optional[Dict[str, Union[str, PipelineVariable]]] = None,
tags: Optional[Tags] = None,
subnets: Optional[List[Union[str, PipelineVariable]]] = None,
security_group_ids: Optional[List[Union[str, PipelineVariable]]] = None,
model_uri: Optional[str] = None,
model_channel_name: Union[str, PipelineVariable] = "model",
metric_definitions: Optional[List[Dict[str, Union[str, PipelineVariable]]]] = None,
encrypt_inter_container_traffic: Union[bool, PipelineVariable] = False,
use_spot_instances: Union[bool, PipelineVariable] = False,
max_wait: Optional[Union[int, PipelineVariable]] = None,
**kwargs # pylint: disable=W0613
):
"""Initialize an ``AlgorithmEstimator`` instance.
Args:
algorithm_arn (str): algorithm arn used for training. Can be just the name if your
account owns the algorithm.
role (str): An AWS IAM role (either name or full ARN). The Amazon SageMaker
training jobs and APIsthat create Amazon SageMaker endpoints use this role to
access training data and model artifacts. After the endpoint
is created, the inference code might use the IAM role, if it
needs to access an AWS resource.
instance_count (int or PipelineVariable): Number of Amazon EC2 instances to use
for training.
instance_type (str or PipelineVariable): Type of EC2 instance to use for training,
for example, 'ml.c4.xlarge'.
volume_size (int or PipelineVariable): Size in GB of the EBS volume to use for
storing input data during training (default: 30). Must be large enough to store
training data if File Mode is used (which is the default).
volume_kms_key (str or PipelineVariable): Optional. KMS key ID for encrypting
EBS volume attached to the training instance (default: None).
max_run (int or PipelineVariable): Timeout in seconds for training
(default: 24 * 60 * 60).
After this amount of time Amazon SageMaker terminates the
job regardless of its current status.
input_mode (str or PipelineVariable): The input mode that the algorithm supports
(default: 'File'). Valid modes:
* 'File' - Amazon SageMaker copies the training dataset from
the S3 location to a local directory.
* 'Pipe' - Amazon SageMaker streams data directly from S3 to
the container via a Unix-named pipe.
This argument can be overriden on a per-channel basis using
``sagemaker.inputs.TrainingInput.input_mode``.
output_path (str or PipelineVariable): S3 location for saving the training result
(model artifacts and output files). If not specified,
results are stored to a default bucket. If
the bucket with the specific name does not exist, the
estimator creates the bucket during the
:meth:`~sagemaker.estimator.EstimatorBase.fit` method
execution.
output_kms_key (str or PipelineVariable): Optional. KMS key ID for encrypting the
training output (default: None). base_job_name (str): Prefix for
training job name when the
:meth:`~sagemaker.estimator.EstimatorBase.fit`
method launches. If not specified, the estimator generates a
default job name, based on the training image name and
current timestamp.
sagemaker_session (sagemaker.session.Session): Session object which manages
interactions with Amazon SageMaker APIs and any other AWS services needed. If
not specified, the estimator creates one using the default
AWS configuration chain.
tags (Union[Tags]): Tags for
labeling a training job. For more, see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.
subnets (list[str] or list[PipelineVariable]): List of subnet ids. If not specified
training job will be created without VPC config.
security_group_ids (list[str]): List of security group ids. If
not specified training job will be created without VPC config.
model_uri (str): URI where a pre-trained model is stored, either locally or in S3
(default: None). If specified, the estimator will create a channel pointing to
the model so the training job can download it. This model
can be a 'model.tar.gz' from a previous training job, or
other artifacts coming from a different source.
More information:
https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-training.html#td-deserialization
model_channel_name (str or PipelineVariable): Name of the channel where 'model_uri'
will be downloaded (default: 'model'). metric_definitions
(list[dict]): A list of dictionaries that defines the metric(s)
used to evaluate the training jobs. Each dictionary contains two keys: 'Name' for
the name of the metric, and 'Regex' for the regular
expression used to extract the metric from the logs.
encrypt_inter_container_traffic (bool or PipelineVariable): Specifies whether traffic
between training containers is encrypted for the training job (default: ``False``).
use_spot_instances (bool or PipelineVariable): Specifies whether to use SageMaker
Managed Spot instances for training. If enabled then the
`max_wait` arg should also be set.
More information:
https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html
(default: ``False``).
max_wait (int or PipelineVariable): Timeout in seconds waiting for spot training
instances (default: None). After this amount of time Amazon
SageMaker will stop waiting for Spot instances to become
available (default: ``None``).
**kwargs: Additional kwargs. This is unused. It's only added for AlgorithmEstimator
to ignore the irrelevant arguments.
"""
self.algorithm_arn = algorithm_arn
super(AlgorithmEstimator, self).__init__(
role,
instance_count=instance_count,
instance_type=instance_type,
volume_size=volume_size,
volume_kms_key=volume_kms_key,
max_run=max_run,
input_mode=input_mode,
output_path=output_path,
output_kms_key=output_kms_key,
base_job_name=base_job_name,
sagemaker_session=sagemaker_session,
tags=format_tags(tags),
subnets=subnets,
security_group_ids=security_group_ids,
model_uri=model_uri,
model_channel_name=model_channel_name,
metric_definitions=metric_definitions,
encrypt_inter_container_traffic=encrypt_inter_container_traffic,
use_spot_instances=use_spot_instances,
max_wait=max_wait,
)
self.algorithm_spec = self.sagemaker_session.sagemaker_client.describe_algorithm(
AlgorithmName=algorithm_arn
)
self.validate_train_spec()
self.hyperparameter_definitions = self._parse_hyperparameters()
self._hyperparameters = {}
if hyperparameters:
self.set_hyperparameters(**hyperparameters)
def validate_train_spec(self):
"""Placeholder docstring"""
train_spec = self.algorithm_spec["TrainingSpecification"]
algorithm_name = self.algorithm_spec["AlgorithmName"]
# Check that the input mode provided is compatible with the training input modes for the
# algorithm.
input_modes = self._algorithm_training_input_modes(train_spec["TrainingChannels"])
if not is_pipeline_variable(self.input_mode) and self.input_mode not in input_modes:
raise ValueError(
"Invalid input mode: %s. %s only supports: %s"
% (self.input_mode, algorithm_name, input_modes)
)
# Check that the training instance type is compatible with the algorithm.
supported_instances = train_spec["SupportedTrainingInstanceTypes"]
if (
not is_pipeline_variable(self.instance_type)
and self.instance_type not in supported_instances
):
raise ValueError(
"Invalid instance_type: %s. %s supports the following instance types: %s"
% (self.instance_type, algorithm_name, supported_instances)
)
# Verify if distributed training is supported by the algorithm
if not is_pipeline_variable(self.instance_count) and (
self.instance_count > 1
and "SupportsDistributedTraining" in train_spec
and not train_spec["SupportsDistributedTraining"]
):
raise ValueError(
"Distributed training is not supported by %s. "
"Please set instance_count=1" % algorithm_name
)
def set_hyperparameters(self, **kwargs):
"""Placeholder docstring"""
for k, v in kwargs.items():
value = self._validate_and_cast_hyperparameter(k, v)
self._hyperparameters[k] = value
self._validate_and_set_default_hyperparameters()
def hyperparameters(self):
"""Returns the hyperparameters as a dictionary to use for training.
The fit() method, that does the model training, calls this method to
find the hyperparameters you specified.
"""
return self._hyperparameters
def training_image_uri(self):
"""Returns the docker image to use for training.
The fit() method, that does the model training, calls this method to
find the image to use for model training.
"""
raise RuntimeError("training_image_uri is never meant to be called on Algorithm Estimators")
def enable_network_isolation(self):
"""Return True if this Estimator will need network isolation to run.
On Algorithm Estimators this depends on the algorithm being used. If
this is algorithm owned by your account it will be False. If this is an
an algorithm consumed from Marketplace it will be True.
Returns:
bool: Whether this Estimator needs network isolation or not.
"""
return self._is_marketplace()
def create_model(
self,
role=None,
predictor_cls=None,
serializer=IdentitySerializer(),
deserializer=BytesDeserializer(),
vpc_config_override=vpc_utils.VPC_CONFIG_DEFAULT,
**kwargs
):
"""Create a model to deploy.
The serializer and deserializer are only used to define a default
Predictor. They are ignored if an explicit predictor class is passed in.
Other arguments are passed through to the Model class.
Args:
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the ``Model``,
which is also used during transform jobs. If not specified, the
role from the Estimator will be used.
predictor_cls (Predictor): The predictor class to use when
deploying the model.
serializer (:class:`~sagemaker.serializers.BaseSerializer`): A
serializer object, used to encode data for an inference endpoint
(default: :class:`~sagemaker.serializers.IdentitySerializer`).
deserializer (:class:`~sagemaker.deserializers.BaseDeserializer`): A
deserializer object, used to decode data from an inference
endpoint (default: :class:`~sagemaker.deserializers.BytesDeserializer`).
vpc_config_override (dict[str, list[str]]): Optional override for VpcConfig set on
the model. Default: use subnets and security groups from this Estimator.
* 'Subnets' (list[str]): List of subnet ids.
* 'SecurityGroupIds' (list[str]): List of security group ids.
**kwargs: Additional arguments for creating a :class:`~sagemaker.model.ModelPackage`.
.. tip::
You can find additional parameters for using this method at
:class:`~sagemaker.model.ModelPackage` and
:class:`~sagemaker.model.Model`.
Returns:
a Model ready for deployment.
"""
removed_kwargs("content_type", kwargs)
removed_kwargs("accept", kwargs)
if predictor_cls is None:
def predict_wrapper(endpoint, session):
return Predictor(endpoint, session, serializer, deserializer)
predictor_cls = predict_wrapper
role = role or self.role
return sagemaker.ModelPackage(
role,
algorithm_arn=self.algorithm_arn,
model_data=self.model_data,
vpc_config=self.get_vpc_config(vpc_config_override),
sagemaker_session=self.sagemaker_session,
predictor_cls=predictor_cls,
**kwargs
)
def transformer(
self,
instance_count,
instance_type,
strategy=None,
assemble_with=None,
output_path=None,
output_kms_key=None,
accept=None,
env=None,
max_concurrent_transforms=None,
max_payload=None,
tags=None,
role=None,
volume_kms_key=None,
):
"""Return a ``Transformer`` that uses a SageMaker Model based on the training job.
It reuses the SageMaker Session and base job name used by the Estimator.
Args:
instance_count (int): Number of EC2 instances to use.
instance_type (str): Type of EC2 instance to use, for example,
'ml.c4.xlarge'.
strategy (str): The strategy used to decide how to batch records in
a single request (default: None). Valid values: 'MultiRecord'
and 'SingleRecord'.
assemble_with (str): How the output is assembled (default: None).
Valid values: 'Line' or 'None'.
output_path (str): S3 location for saving the transform result. If
not specified, results are stored to a default bucket.
output_kms_key (str): Optional. KMS key ID for encrypting the
transform output (default: None).
accept (str): The accept header passed by the client to
the inference endpoint. If it is supported by the endpoint,
it will be the format of the batch transform output.
env (dict): Environment variables to be set for use during the
transform job (default: None).
max_concurrent_transforms (int): The maximum number of HTTP requests
to be made to each individual transform container at one time.
max_payload (int): Maximum size of the payload in a single HTTP
request to the container in MB.
tags (list[dict]): List of tags for labeling a transform job. If
none specified, then the tags used for the training job are used
for the transform job.
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the ``Model``,
which is also used during transform jobs. If not specified, the
role from the Estimator will be used.
volume_kms_key (str): Optional. KMS key ID for encrypting the volume
attached to the ML compute instance (default: None).
"""
role = role or self.role
if self.latest_training_job is not None:
model = self.create_model(role=role)
model._create_sagemaker_model()
model_name = model.name
transform_env = {}
if env is not None:
transform_env = model.env.copy()
transform_env.update(env)
if self._is_marketplace():
transform_env = None
tags = format_tags(tags) or self.tags
else:
raise RuntimeError("No finished training job found associated with this estimator")
return Transformer(
model_name,
instance_count,
instance_type,
strategy=strategy,
assemble_with=assemble_with,
output_path=output_path,
output_kms_key=output_kms_key,
accept=accept,
max_concurrent_transforms=max_concurrent_transforms,
max_payload=max_payload,
env=transform_env,
tags=tags,
base_transform_job_name=self.base_job_name,
volume_kms_key=volume_kms_key,
sagemaker_session=self.sagemaker_session,
)
def _is_marketplace(self):
"""Placeholder docstring"""
return "ProductId" in self.algorithm_spec
def _ensure_base_job_name(self):
"""Set ``self.base_job_name`` if it is not set already."""
if self.base_job_name is None:
self.base_job_name = self.algorithm_arn.split("/")[-1]
def _prepare_for_training(self, job_name=None):
# Validate hyperparameters
# an explicit call to set_hyperparameters() will also validate the hyperparameters
# but it is possible that the user never called it.
self._validate_and_set_default_hyperparameters()
super(AlgorithmEstimator, self)._prepare_for_training(job_name)
@runnable_by_pipeline
def fit(
self,
inputs: Optional[Union[str, Dict, TrainingInput, FileSystemInput]] = None,
wait: bool = True,
logs: bool = True,
job_name: Optional[str] = None,
):
"""Placeholder docstring"""
if inputs:
self._validate_input_channels(inputs)
return super(AlgorithmEstimator, self).fit(inputs, wait, logs, job_name)
def _validate_input_channels(self, channels):
"""Placeholder docstring"""
train_spec = self.algorithm_spec["TrainingSpecification"]
algorithm_name = self.algorithm_spec["AlgorithmName"]
training_channels = {c["Name"]: c for c in train_spec["TrainingChannels"]}
# check for unknown channels that the algorithm does not support
for c in channels:
if c not in training_channels:
raise ValueError(
"Unknown input channel: %s is not supported by: %s" % (c, algorithm_name)
)
# check for required channels that were not provided
for name, channel in training_channels.items():
if name not in channels and "IsRequired" in channel and channel["IsRequired"]:
raise ValueError("Required input channel: %s Was not provided." % (name))
def _validate_and_cast_hyperparameter(self, name, v):
"""Placeholder docstring"""
algorithm_name = self.algorithm_spec["AlgorithmName"]
if name not in self.hyperparameter_definitions:
raise ValueError(
"Invalid hyperparameter: %s is not supported by %s" % (name, algorithm_name)
)
definition = self.hyperparameter_definitions[name]
if "class" in definition:
value = definition["class"].cast_to_type(v)
else:
value = v
if "range" in definition and not definition["range"].is_valid(value):
valid_range = definition["range"].as_tuning_range(name)
raise ValueError("Invalid value: %s Supported range: %s" % (value, valid_range))
return value
def _validate_and_set_default_hyperparameters(self):
"""Placeholder docstring"""
# Check if all the required hyperparameters are set. If there is a default value
# for one, set it.
for name, definition in self.hyperparameter_definitions.items():
if name not in self._hyperparameters:
spec = definition["spec"]
if "DefaultValue" in spec:
self._hyperparameters[name] = spec["DefaultValue"]
elif "IsRequired" in spec and spec["IsRequired"]:
raise ValueError("Required hyperparameter: %s is not set" % name)
def _parse_hyperparameters(self):
"""Placeholder docstring"""
definitions = {}
training_spec = self.algorithm_spec["TrainingSpecification"]
if "SupportedHyperParameters" in training_spec:
hyperparameters = training_spec["SupportedHyperParameters"]
for h in hyperparameters:
parameter_type = h["Type"]
name = h["Name"]
parameter_class, parameter_range = self._hyperparameter_range_and_class(
parameter_type, h
)
definitions[name] = {"spec": h}
if parameter_range:
definitions[name]["range"] = parameter_range
if parameter_class:
definitions[name]["class"] = parameter_class
return definitions
def _hyperparameter_range_and_class(self, parameter_type, hyperparameter):
"""Placeholder docstring."""
if parameter_type in self._hyperpameters_with_range:
range_name = parameter_type + "ParameterRangeSpecification"
parameter_class = None
parameter_range = None
if parameter_type in ("Integer", "Continuous"):
# Integer and Continuous are handled the same way. We get the min and max values
# and just create an Instance of Parameter. Note that the range is optional for all
# the Parameter Types.
if parameter_type == "Integer":
parameter_class = sagemaker.parameter.IntegerParameter
else:
parameter_class = sagemaker.parameter.ContinuousParameter
if "Range" in hyperparameter:
min_value = parameter_class.cast_to_type(
hyperparameter["Range"][range_name]["MinValue"]
)
max_value = parameter_class.cast_to_type(
hyperparameter["Range"][range_name]["MaxValue"]
)
parameter_range = parameter_class(min_value, max_value)
elif parameter_type == "Categorical":
parameter_class = sagemaker.parameter.CategoricalParameter
if "Range" in hyperparameter:
values = hyperparameter["Range"][range_name]["Values"]
parameter_range = sagemaker.parameter.CategoricalParameter(values)
elif parameter_type == "FreeText":
pass
else:
raise ValueError(
"Invalid Hyperparameter type: %s. Valid ones are:"
"(Integer, Continuous, Categorical, FreeText)" % parameter_type
)
return parameter_class, parameter_range
def _algorithm_training_input_modes(self, training_channels):
"""Placeholder docstring"""
current_input_modes = {"File", "Pipe"}
for channel in training_channels:
supported_input_modes = set(channel["SupportedInputModes"])
current_input_modes = current_input_modes & supported_input_modes
return current_input_modes
@classmethod
def _prepare_init_params_from_job_description(cls, job_details, model_channel_name=None):
"""Convert the job description to init params that can be handled by the class constructor.
Args:
job_details (dict): the returned job details from a DescribeTrainingJob
API call.
model_channel_name (str): Name of the channel where pre-trained
model data will be downloaded.
Returns:
dict: The transformed init_params
"""
init_params = super(AlgorithmEstimator, cls)._prepare_init_params_from_job_description(
job_details, model_channel_name
)
# This hyperparameter is added by Amazon SageMaker Automatic Model Tuning.
# It cannot be set through instantiating an estimator.
if "_tuning_objective_metric" in init_params["hyperparameters"]:
del init_params["hyperparameters"]["_tuning_objective_metric"]
return init_params
Morty Proxy This is a proxified and sanitized view of the page, visit original site.