@mrmaheshrajput
Created March 27, 2024 03:58
!pip install -U sagemaker -q
import boto3
import sagemaker
import sagemaker.session
session = sagemaker.session.Session()
region = session.boto_region_name
role = sagemaker.get_execution_role()
bucket = session.default_bucket()
prefix = "paramaterized" # Prefix to S3 artifacts
pipeline_name = "DEMO-parameterized-pipeline" # SageMaker Pipeline name
credit_model_group = "DEMO-credit-registry"
churn_model_group = "DEMO-churn-registry"
# Download data
s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/uci_statlog_german_credit_data/german_credit_data.csv",
    "credit_risk/german_credit_data.csv",
)
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/synthetic/churn.csv",
    "customer_churn/churn-dataset.csv",
)
# Upload data
# Upload the raw datasets and scripts to S3
customer_churn_data_uri = session.upload_data(
    path="customer_churn/churn-dataset.csv", key_prefix=prefix + "/data"
)
credit_data_uri = session.upload_data(
    path="credit_risk/german_credit_data.csv", key_prefix=prefix + "/data"
)
churn_preprocess_uri = session.upload_data(
    path="customer_churn/preprocess.py", key_prefix=prefix + "/preprocess/churn"
)
credit_preprocess_uri = session.upload_data(
    path="credit_risk/preprocess.py", key_prefix=prefix + "/preprocess/credit"
)
evaluate_script_uri = session.upload_data(path="evaluate.py", key_prefix=prefix + "/evaluate")
print("Customer churn data set uploaded to ", customer_churn_data_uri)
print("Credit data set uploaded to ", credit_data_uri)
print("Customer churn preprocessing script uploaded to ", churn_preprocess_uri)
print("Credit preprocessing script uploaded to ", credit_preprocess_uri)
print("Evaluation script uploaded to ", evaluate_script_uri)
# Pipeline input parameters
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Model package group (registry) to register the model and its versions to
model_registry_package = ParameterString(name="ModelGroup", default_value="default-registry")
# S3 URI to input data
input_data = ParameterString(name="InputData", default_value="s3://{}/uri/data.csv".format(bucket))
# S3 URI to preprocessing script
preprocess_script = ParameterString(
    name="PreprocessScript", default_value="s3://{}/uri/preprocess.py".format(bucket)
)
# S3 URI to evaluation script
evaluate_script = ParameterString(
    name="EvaluateScript", default_value="s3://{}/uri/evaluate.py".format(bucket)
)
# Maximum number of training jobs to allow in the HP tuning
max_training_jobs = ParameterInteger(name="MaximumTrainingJobs", default_value=1)
# Maximum number of parallel training jobs to allow in the HP tuning
max_parallel_training_jobs = ParameterInteger(name="MaximumParallelTrainingJobs", default_value=1)
# Accuracy threshold to decide whether or not to register the model with Model Registry
accuracy_condition_threshold = ParameterFloat(name="AccuracyConditionThreshold", default_value=0.7)
# Instance type to use for processing
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)
# Instance type to use for training
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# Preprocess data step
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables

# Create an SKLearnProcessor object.
# The object contains information about what instance type to use, the IAM role to use, etc.
# A managed processor comes with a preconfigured container, so only the framework version needs to be specified.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1", role=role, instance_type=processing_instance_type, instance_count=1
)

# Use the sklearn_processor in a SageMaker Pipelines ProcessingStep
step_preprocess_data = ProcessingStep(
    name="Preprocess-Data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "train",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "validation",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "test",
                ],
            ),
        ),
    ],
    code=preprocess_script,
)
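
# Note: the customer_churn/preprocess.py and credit_risk/preprocess.py scripts uploaded
# earlier are not included in this gist. The function below is only an illustrative
# sketch of the contract such a script has to fulfil for the ProcessingStep above:
# read the raw CSV from /opt/ml/processing/input and write train / validation / test
# splits to the local output paths declared in the step. The file names, the 70/20/10
# split and the "label in the first column, no header" layout are assumptions.
def _preprocess_script_sketch():
    import pathlib

    import pandas as pd

    base = pathlib.Path("/opt/ml/processing")
    # Assumption: exactly one CSV file was staged under the input channel
    input_file = next((base / "input").glob("*.csv"))
    df = pd.read_csv(input_file)

    # Dataset-specific cleaning / encoding would go here; the XGBoost CSV convention
    # expects the target in the first column and no header row.

    # Shuffle, then split roughly 70/20/10 into train / validation / test
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)
    n = len(df)
    splits = {
        "train": df.iloc[: int(0.7 * n)],
        "validation": df.iloc[int(0.7 * n) : int(0.9 * n)],
        "test": df.iloc[int(0.9 * n) :],
    }
    for name, split in splits.items():
        out_dir = base / name
        out_dir.mkdir(parents=True, exist_ok=True)
        split.to_csv(out_dir / f"{name}.csv", header=False, index=False)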
# Train model step
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.workflow.steps import TuningStep

# Fetch container to use for training
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.2-2",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# Create XGBoost estimator object.
# The object contains information about what container to use, what instance type, etc.
xgb_estimator = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    role=role,
    disable_profiler=True,
)

# Create HyperparameterTuner object. Ranges from https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html
xgb_tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name="validation:auc",
    hyperparameter_ranges={
        "eta": ContinuousParameter(0, 0.5),
        "alpha": ContinuousParameter(0, 1000),
        "min_child_weight": ContinuousParameter(1, 120),
        "max_depth": IntegerParameter(1, 10),
        "num_round": IntegerParameter(1, 2000),
        "subsample": ContinuousParameter(0.5, 1),
    },
    max_jobs=max_training_jobs,
    max_parallel_jobs=max_parallel_training_jobs,
)

# Use the tuner in a SageMaker Pipelines TuningStep
step_tuning = TuningStep(
    name="Train-And-Tune-Model",
    tuner=xgb_tuner,
    inputs={
        "train": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
# Evaluate model step
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

# Create ScriptProcessor object.
# The object contains information about what container to use, what instance type, etc.
evaluate_model_processor = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    role=role,
)

# Create a PropertyFile.
# A PropertyFile is used to reference outputs from a processing step, for instance in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Use the evaluate_model_processor in a SageMaker Pipelines ProcessingStep.
# Extract the best model for evaluation.
step_evaluate_model = ProcessingStep(
    name="Evaluate-Model",
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            source=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "evaluation-report",
                ],
            ),
        ),
    ],
    code=evaluate_script,
    property_files=[evaluation_report],
)
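
# Note: evaluate.py is uploaded earlier but not included in this gist. The function
# below is only an illustrative sketch of what the Evaluate-Model step expects: load
# the best model from /opt/ml/processing/model, score the test split, and write an
# evaluation.json containing binary_classification_metrics.accuracy.value (the path
# read by the JsonGet condition further down) to /opt/ml/processing/evaluation.
# The "xgboost-model" file name, the test.csv name and the column layout are
# assumptions based on the common SageMaker XGBoost setup.
def _evaluate_script_sketch():
    import json
    import pathlib
    import pickle
    import tarfile

    import pandas as pd
    import xgboost
    from sklearn.metrics import accuracy_score

    # Unpack the model artifact produced by the best training job
    with tarfile.open("/opt/ml/processing/model/model.tar.gz") as tar:
        tar.extractall(path=".")
    with open("xgboost-model", "rb") as f:
        model = pickle.load(f)

    # Assumption: label in the first column, no header row
    test_df = pd.read_csv("/opt/ml/processing/test/test.csv", header=None)
    y_true = test_df.iloc[:, 0]
    predictions = model.predict(xgboost.DMatrix(test_df.iloc[:, 1:].values))
    accuracy = accuracy_score(y_true, predictions.round())

    # Write the report in the structure the ConditionStep's JsonGet expects
    report = {"binary_classification_metrics": {"accuracy": {"value": float(accuracy)}}}
    out_dir = pathlib.Path("/opt/ml/processing/evaluation")
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "evaluation.json").write_text(json.dumps(report))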
# Register model step
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# Create ModelMetrics object using the evaluation report from the evaluation step.
# A ModelMetrics object contains metrics captured from a model.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on="/",
            values=[
                step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                    "S3Uri"
                ],
                "evaluation.json",
            ],
        ),
        content_type="application/json",
    )
)

# Create a RegisterModel step, which registers the model with SageMaker Model Registry.
step_register_model = RegisterModel(
    name="Register-Model",
    estimator=xgb_estimator,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=bucket),
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_registry_package,
    model_metrics=model_metrics,
)
# Accuracy condition step
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Create accuracy condition to ensure the model meets performance requirements.
# Models with a test accuracy lower than the condition will not be registered with the model registry.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="binary_classification_metrics.accuracy.value",
    ),
    right=accuracy_condition_threshold,
)

# Create a SageMaker Pipelines ConditionStep, using the condition above.
# Enter the steps to perform if the condition returns True / False.
step_cond = ConditionStep(
    name="Accuracy-Condition",
    conditions=[cond_gte],
    if_steps=[step_register_model],
    else_steps=[],
)
# Create SageMaker pipeline
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        input_data,
        preprocess_script,
        evaluate_script,
        accuracy_condition_threshold,
        model_registry_package,
        max_parallel_training_jobs,
        max_training_jobs,
    ],
    steps=[step_preprocess_data, step_tuning, step_evaluate_model, step_cond],
)

# Create or update (upsert) the pipeline definition in SageMaker
pipeline.upsert(role_arn=role)
# Start the pipeline with parameters for the credit use case
pipeline.start(
    execution_display_name="Credit",
    parameters=dict(
        InputData=credit_data_uri,
        PreprocessScript=credit_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.2,
        MaximumParallelTrainingJobs=2,
        MaximumTrainingJobs=5,
        ModelGroup=credit_model_group,
    ),
)
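
# The churn dataset, preprocessing script and registry group defined earlier reuse the
# same pipeline by starting a second execution with different parameters. This block is
# not part of the original gist; the 0.7 threshold is an illustrative assumption.
execution = pipeline.start(
    execution_display_name="Churn",
    parameters=dict(
        InputData=customer_churn_data_uri,
        PreprocessScript=churn_preprocess_uri,
        EvaluateScript=evaluate_script_uri,
        AccuracyConditionThreshold=0.7,
        MaximumParallelTrainingJobs=2,
        MaximumTrainingJobs=5,
        ModelGroup=churn_model_group,
    ),
)
# Optionally block until the execution finishes, then inspect the step statuses
execution.wait()
execution.list_steps()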