# Created March 27, 2024 03:58
# Source gist: mrmaheshrajput/4d18047c0e93028f451b058726bd014c
# GitHub notice: this file contains bidirectional Unicode text that may be interpreted
# or compiled differently than what appears below. To review it, open the file in an
# editor that reveals hidden Unicode characters.
# Install/upgrade the SageMaker Python SDK into the interpreter running this script.
# This is the plain-Python equivalent of the notebook magic `%pip install -U sagemaker -q`;
# the original `!pip ...` line is IPython-only syntax and a SyntaxError in a .py file.
# Using sys.executable guarantees the package lands in the environment that imports it.
import subprocess
import sys

subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", "sagemaker", "-q"])

import boto3
import sagemaker
import sagemaker.session

# SageMaker session plus the region / role / default bucket derived from it.
session = sagemaker.session.Session()
region = session.boto_region_name
# NOTE(review): get_execution_role() resolves the role of the SageMaker notebook/Studio
# environment; presumably it fails when run elsewhere — confirm before running locally.
role = sagemaker.get_execution_role()
bucket = session.default_bucket()

# Prefix for S3 artifacts. The spelling is kept as-is because it is part of the S3 paths.
prefix = "paramaterized"
pipeline_name = "DEMO-parameterized-pipeline"  # SageMaker Pipeline name
# Model Registry group names, one per use case.
credit_model_group = "DEMO-credit-registry"
churn_model_group = "DEMO-churn-registry"
# Download data
# Fetch the two public example datasets from the regional SageMaker example bucket
# into local working directories (later uploaded to this account's bucket).
import os

example_bucket = f"sagemaker-example-files-prod-{region}"
downloads = [
    (
        "datasets/tabular/uci_statlog_german_credit_data/german_credit_data.csv",
        "credit_risk/german_credit_data.csv",
    ),
    ("datasets/tabular/synthetic/churn.csv", "customer_churn/churn-dataset.csv"),
]

s3 = boto3.client("s3")
for key, local_path in downloads:
    # download_file does not create missing parent directories; ensure they exist first.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    s3.download_file(example_bucket, key, local_path)
# Upload data
# Push the raw datasets, the per-use-case preprocessing scripts and the shared
# evaluation script to S3 under `prefix`; each call returns the resulting S3 URI.
data_key_prefix = f"{prefix}/data"
customer_churn_data_uri = session.upload_data(
    path="customer_churn/churn-dataset.csv", key_prefix=data_key_prefix
)
credit_data_uri = session.upload_data(
    path="credit_risk/german_credit_data.csv", key_prefix=data_key_prefix
)
churn_preprocess_uri = session.upload_data(
    path="customer_churn/preprocess.py", key_prefix=f"{prefix}/preprocess/churn"
)
credit_preprocess_uri = session.upload_data(
    path="credit_risk/preprocess.py", key_prefix=f"{prefix}/preprocess/credit"
)
evaluate_script_uri = session.upload_data(path="evaluate.py", key_prefix=f"{prefix}/evaluate")

# Report where each artifact landed.
for artifact, uri in (
    ("Customer churn data set", customer_churn_data_uri),
    ("Credit data set", credit_data_uri),
    ("Customer churn preprocessing script", churn_preprocess_uri),
    ("Credit preprocessing script", credit_preprocess_uri),
    ("Evaluation script", evaluate_script_uri),
):
    print(f"{artifact} uploaded to ", uri)
# Pipeline input parameters
# Each Parameter* object is a named placeholder.
# Its value can be overridden per execution via pipeline.start(parameters={...}).
# Otherwise `default_value` is used.
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Model Package Group (Model Registry) in which model versions are registered.
model_registry_package = ParameterString(name="ModelGroup", default_value="default-registry")
# S3 URI of the input data set.
input_data = ParameterString(name="InputData", default_value=f"s3://{bucket}/uri/data.csv")
# S3 URI of the preprocessing script.
preprocess_script = ParameterString(
    name="PreprocessScript", default_value=f"s3://{bucket}/uri/preprocess.py"
)
# S3 URI of the evaluation script.
evaluate_script = ParameterString(
    name="EvaluateScript", default_value=f"s3://{bucket}/uri/evaluate.py"
)
# NOTE: the "Maxium" spelling in the next two parameter names is part of the pipeline's
# interface — it must match the keys passed to pipeline.start() — so it is left unchanged.
# Maximum total number of training jobs the hyperparameter tuning job may launch.
max_training_jobs = ParameterInteger(name="MaxiumTrainingJobs", default_value=1)
# Maximum number of training jobs the hyperparameter tuning job may run in parallel.
max_parallel_training_jobs = ParameterInteger(name="MaxiumParallelTrainingJobs", default_value=1)
# Minimum test accuracy required for the model to be registered in the Model Registry.
accuracy_condition_threshold = ParameterFloat(name="AccuracyConditionThreshold", default_value=0.7)
# Instance type used for the processing (preprocess / evaluate) steps.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)
# Instance type used for the training jobs.
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# Preprocess data step
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables

# Managed scikit-learn processor.
# SageMaker supplies the container, so only the framework version, IAM role and
# instance settings are given here.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=1,
)

# Run the (parameterized) preprocessing script on the input data.
# Each split is written to s3://<bucket>/<prefix>/<pipeline-execution-id>/<split>.
step_preprocess_data = ProcessingStep(
    name="Preprocess-Data",
    processor=sklearn_processor,
    inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(
            output_name=split,
            source=f"/opt/ml/processing/{split}",
            destination=Join(
                on="/",
                values=[
                    f"s3://{bucket}",
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    split,
                ],
            ),
        )
        for split in ("train", "validation", "test")
    ],
    code=preprocess_script,
)
# Train model step
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.workflow.steps import TuningStep

# Built-in XGBoost 1.2-2 container image for this region.
# The image is resolved once when the script runs, so it uses a literal instance type
# rather than the TrainingInstanceType pipeline parameter.
# Presumably that literal only selects the CPU image variant.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.2-2",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# Estimator template that the tuner clones for every training job it launches.
xgb_estimator = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    role=role,
    disable_profiler=True,
)

# Search space; ranges from https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html
xgb_hyperparameter_ranges = {
    "eta": ContinuousParameter(0, 0.5),
    "alpha": ContinuousParameter(0, 1000),
    "min_child_weight": ContinuousParameter(1, 120),
    "max_depth": IntegerParameter(1, 10),
    "num_round": IntegerParameter(1, 2000),
    "subsample": ContinuousParameter(0.5, 1),
}

# Hyperparameter tuner that maximizes validation AUC.
# Its job budget comes from the pipeline parameters.
xgb_tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name="validation:auc",
    hyperparameter_ranges=xgb_hyperparameter_ranges,
    max_jobs=max_training_jobs,
    max_parallel_jobs=max_parallel_training_jobs,
)

# Tuning step.
# The preprocessing step's CSV outputs feed the tuner as the "train" and "validation" channels.
step_tuning = TuningStep(
    name="Train-And-Tune-Model",
    tuner=xgb_tuner,
    inputs={
        channel: TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                channel
            ].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)
# Evaluate model step
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

# Generic script processor.
# It runs the evaluation script with `python3` inside the XGBoost image used for training.
evaluate_model_processor = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    role=role,
)

# PropertyFile exposing evaluation.json from the "evaluation" output.
# Later steps (e.g. a condition step) can then read values from it.
# See https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Inputs: the best model from the tuning job (top_k=0) and the held-out test split.
best_model_input = ProcessingInput(
    source=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
    destination="/opt/ml/processing/model",
)
test_split_input = ProcessingInput(
    source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)
# The report goes to s3://<bucket>/<prefix>/<pipeline-execution-id>/evaluation-report.
evaluation_report_destination = Join(
    on="/",
    values=[
        f"s3://{bucket}",
        prefix,
        ExecutionVariables.PIPELINE_EXECUTION_ID,
        "evaluation-report",
    ],
)

step_evaluate_model = ProcessingStep(
    name="Evaluate-Model",
    processor=evaluate_model_processor,
    inputs=[best_model_input, test_split_input],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=evaluation_report_destination,
        )
    ],
    code=evaluate_script,
    property_files=[evaluation_report],
)
# Register model step
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# Model statistics attached to the registered model version: the evaluation.json
# report written to the evaluation step's "evaluation" output.
# The S3 prefix comes from the step's property reference, looked up by output name and
# resolved at execution time.
# The previous approach indexed into step_evaluate_model.arguments, a definition-time
# request dict addressed positionally with [0].
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                step_evaluate_model.properties.ProcessingOutputConfig.Outputs[
                    "evaluation"
                ].S3Output.S3Uri,
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

# Create a RegisterModel step collection.
# It registers the best tuned model (top_k=0) in the Model Registry group given by the
# ModelGroup pipeline parameter, with CSV request/response types and the allowed
# inference and transform instance types.
step_register_model = RegisterModel(
    name="Register-Model",
    estimator=xgb_estimator,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_registry_package,
    model_metrics=model_metrics,
)
# Accuracy condition step
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Test accuracy as read from the evaluation step's evaluation.json property file.
# NOTE(review): the JSON path presumably matches the structure evaluate.py writes — verify.
test_accuracy = JsonGet(
    step_name=step_evaluate_model.name,
    property_file=evaluation_report,
    json_path="binary_classification_metrics.accuracy.value",
)

# True when the test accuracy is at least the AccuracyConditionThreshold parameter.
cond_gte = ConditionGreaterThanOrEqualTo(left=test_accuracy, right=accuracy_condition_threshold)

# Run the register step only when the condition holds; otherwise run nothing.
step_cond = ConditionStep(
    name="Accuracy-Condition",
    conditions=[cond_gte],
    if_steps=[step_register_model],
    else_steps=[],
)
# Create sagemaker pipeline
# Pipeline was used here without ever being imported, which raised a NameError.
from sagemaker.workflow.pipeline import Pipeline

# Every parameter an execution may override must be declared on the pipeline.
# The register step is not listed in `steps` because it runs inside the condition step.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        input_data,
        preprocess_script,
        evaluate_script,
        accuracy_condition_threshold,
        model_registry_package,
        max_parallel_training_jobs,
        max_training_jobs,
    ],
    steps=[step_preprocess_data, step_tuning, step_evaluate_model, step_cond],
)
# Create the pipeline definition, or update it if one with this name already exists.
pipeline.upsert(role_arn=role)

# Start the pipeline for the credit-risk use case, overriding the parameter defaults.
# The returned execution handle is kept so the run can be inspected or waited on.
# NOTE(review): the churn inputs (customer_churn_data_uri, churn_preprocess_uri,
# churn_model_group) are prepared earlier but never used. Presumably a second start()
# with those values was intended — confirm.
credit_execution = pipeline.start(
    execution_display_name="Credit",
    parameters=dict(
        InputData=credit_data_uri,
        PreprocessScript=credit_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.2,  # deliberately lower than the 0.7 default
        MaxiumParallelTrainingJobs=2,
        MaxiumTrainingJobs=5,
        ModelGroup=credit_model_group,
    ),
)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.