Using SageMaker Pipelines to build a customer churn prediction model for a telecom company
Here’s an example of a real-world use case for Amazon SageMaker Pipelines in an end-to-end machine learning workflow. In this scenario, we’re building a customer churn prediction model for a telecom company. The pipeline covers every step from data collection to model monitoring. I’ll walk you through each step with example code.
The first step is collecting the raw data. This example assumes the data is stored in Amazon S3.
import sagemaker
from sagemaker.workflow.parameters import ParameterString

# The execution role grants SageMaker permission to access your AWS resources
role = sagemaker.get_execution_role()

s3_input_data = ParameterString(
    name="InputData",
    default_value="s3://my-bucket/churn-data/raw-data.csv"
)
Next, you preprocess the data, including handling missing values and scaling features.
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve(
        framework='sklearn',
        region='us-west-2',
        version='0.23-1'),
    command=['python3'],
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge'
)

processing_step = ProcessingStep(
    name="DataProcessing",
    processor=script_processor,
    inputs=[
        ProcessingInput(source=s3_input_data, destination='/opt/ml/processing/input')
    ],
    outputs=[
        ProcessingOutput(output_name="train_data", source='/opt/ml/processing/train'),
        ProcessingOutput(output_name="test_data", source='/opt/ml/processing/test')
    ],
    code="scripts/preprocessing.py"
)
Here, preprocessing.py handles feature engineering, such as filling missing values, one-hot encoding categorical variables, and scaling.
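The pipeline only references preprocessing.py by path, so here is a minimal sketch of what such a script could look like. The column names (`tenure`, `monthly_charges`, `contract_type`) and the 80/20 split are illustrative assumptions, not part of the original pipeline.

```python
# scripts/preprocessing.py -- a minimal sketch; column names are illustrative
import pandas as pd
from sklearn.preprocessing import StandardScaler

def preprocess(df, categorical_cols, numeric_cols):
    """Fill missing values, one-hot encode categoricals, and scale numerics."""
    df = df.copy()
    # Fill missing numeric values with the column median, categoricals with the mode
    for col in numeric_cols:
        df[col] = df[col].fillna(df[col].median())
    for col in categorical_cols:
        df[col] = df[col].fillna(df[col].mode()[0])
    # One-hot encode categorical variables
    df = pd.get_dummies(df, columns=categorical_cols)
    # Scale numeric features to zero mean and unit variance
    df[numeric_cols] = StandardScaler().fit_transform(df[numeric_cols])
    return df

if __name__ == "__main__":
    raw = pd.read_csv("/opt/ml/processing/input/raw-data.csv")
    processed = preprocess(raw,
                           categorical_cols=["contract_type"],
                           numeric_cols=["tenure", "monthly_charges"])
    # Simple 80/20 split; a fixed seed keeps the split reproducible
    train = processed.sample(frac=0.8, random_state=42)
    test = processed.drop(train.index)
    train.to_csv("/opt/ml/processing/train/train.csv", index=False, header=False)
    test.to_csv("/opt/ml/processing/test/test.csv", index=False, header=False)
```

The XGBoost container expects headerless CSV with the label in the first column, so in a real script you would also arrange the columns accordingly.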
For label engineering, we extract the target variable (e.g., churn or not) from the dataset.
label_engineering_step = ProcessingStep(
    name="LabelEngineering",
    processor=script_processor,
    inputs=[
        ProcessingInput(source=s3_input_data, destination='/opt/ml/processing/input')
    ],
    outputs=[
        ProcessingOutput(output_name="train_labels", source='/opt/ml/processing/train_labels'),
        ProcessingOutput(output_name="test_labels", source='/opt/ml/processing/test_labels')
    ],
    code="scripts/label_engineering.py"
)
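A minimal sketch of what label_engineering.py could contain. The `churn` column name and its Yes/No encoding are hypothetical; if you split the data, the split must use the same seed as the preprocessing script so features and labels stay aligned.

```python
# scripts/label_engineering.py -- a minimal sketch; the "churn" column name
# and its Yes/No encoding are assumptions about the raw data.
import pandas as pd

def extract_labels(df, label_col="churn"):
    """Map the raw churn flag ("Yes"/"No") to a binary 0/1 target."""
    return df[label_col].str.strip().str.lower().map({"yes": 1, "no": 0})

if __name__ == "__main__":
    raw = pd.read_csv("/opt/ml/processing/input/raw-data.csv")
    labels = extract_labels(raw)
    # Use the same seed as preprocessing.py so rows stay aligned with features
    train = labels.sample(frac=0.8, random_state=42)
    test = labels.drop(train.index)
    train.to_csv("/opt/ml/processing/train_labels/train_labels.csv", index=False, header=False)
    test.to_csv("/opt/ml/processing/test_labels/test_labels.csv", index=False, header=False)
```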
This step defines the model we want to train. We use the XGBoost algorithm as an example.
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

xgboost_estimator = Estimator(
    image_uri=sagemaker.image_uris.retrieve('xgboost', region='us-west-2', version='1.2-1'),
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path="s3://my-bucket/churn-data/model/"
)

xgboost_estimator.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    objective='binary:logistic',
    num_round=100
)

training_step = TrainingStep(
    name="ModelTraining",
    estimator=xgboost_estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri,
            content_type='text/csv'),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri,
            content_type='text/csv')
    }
)
Next, we perform hyperparameter optimization (HPO) to find the best set of hyperparameters for our model.
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.workflow.steps import TuningStep

hyperparameter_ranges = {
    'eta': ContinuousParameter(0.1, 0.5),
    'max_depth': IntegerParameter(3, 7)  # max_depth takes integer values
}

tuner = HyperparameterTuner(
    estimator=xgboost_estimator,
    objective_metric_name='validation:auc',
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=10,
    max_parallel_jobs=2
)

tuning_step = TuningStep(
    name="HyperparameterTuning",
    tuner=tuner,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri,
            content_type='text/csv'),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri,
            content_type='text/csv')
    }
)
Once the model is trained, we evaluate its performance.
evaluation_step = ProcessingStep(
    name="ModelEvaluation",
    processor=script_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/model'),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri,
            destination='/opt/ml/processing/test')
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")
    ],
    code="scripts/evaluate.py"
)
In evaluate.py, we calculate metrics such as accuracy, AUC, or F1-score.
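A minimal sketch of the metric computation in evaluate.py. In the real step you would load the model artifact from /opt/ml/processing/model and score the test set; the labels and scores shown in the main block are placeholder values for illustration.

```python
# scripts/evaluate.py -- a minimal sketch of the evaluation logic
import json
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

def evaluate(y_true, y_scores, threshold=0.5):
    """Compute accuracy, AUC, and F1 from true labels and predicted probabilities."""
    y_pred = [1 if s >= threshold else 0 for s in y_scores]
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "auc": roc_auc_score(y_true, y_scores),
        "f1": f1_score(y_true, y_pred),
    }

if __name__ == "__main__":
    # The real script would load the model from /opt/ml/processing/model and
    # score /opt/ml/processing/test; these values are placeholders.
    metrics = evaluate(y_true=[0, 1, 1, 0], y_scores=[0.2, 0.9, 0.7, 0.1])
    # Write the metrics where the ProcessingOutput picks them up
    with open("/opt/ml/processing/evaluation/evaluation.json", "w") as f:
        json.dump({"binary_classification_metrics": metrics}, f)
```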
After a successful evaluation, we register the model in the SageMaker Model Registry.
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

# Build the Model with a PipelineSession so register() returns step
# arguments instead of running immediately
model = Model(
    image_uri=sagemaker.image_uris.retrieve('xgboost', region='us-west-2', version='1.2-1'),
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=PipelineSession()
)

model_registration_step = ModelStep(
    name="ModelRegistration",
    step_args=model.register(
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name="CustomerChurnModels",
        approval_status="PendingManualApproval"
    )
)
Next, we run batch inference on the test data with a transform step. (For real-time inference, you would instead deploy the registered model to an endpoint.)
from sagemaker.workflow.steps import TransformStep
from sagemaker.inputs import TransformInput
from sagemaker.transformer import Transformer

# Create a deployable model from the training artifacts for batch inference
# (remember to include this step in the pipeline's steps list as well)
model_creation_step = ModelStep(
    name="ModelCreation",
    step_args=model.create(instance_type='ml.m5.large')
)

transformer = Transformer(
    model_name=model_creation_step.properties.ModelName,
    instance_count=1,
    instance_type='ml.m5.large',
    output_path='s3://my-bucket/churn-data/output'
)

transform_step = TransformStep(
    name="BatchInference",
    transformer=transformer,
    inputs=TransformInput(
        data=processing_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri)
)
After deployment, we monitor the model for data drift and other performance issues.
from sagemaker.model_monitor import DefaultModelMonitor

monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800
)

monitor.create_monitoring_schedule(
    monitor_schedule_name='churn-model-monitor',
    endpoint_input='my-endpoint',
    output_s3_uri='s3://my-bucket/monitoring',
    schedule_cron_expression='cron(0 * ? * * *)'  # run hourly
)
Finally, we define the complete pipeline by combining all the steps.
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="CustomerChurnPipeline",
    parameters=[s3_input_data],
    steps=[
        processing_step,
        label_engineering_step,
        training_step,
        evaluation_step,
        model_registration_step,
        transform_step
    ]
)
You can now execute the entire pipeline and track its progress.
pipeline.upsert(role_arn=role)  # create or update the pipeline definition
execution = pipeline.start()
execution.wait()
You can list all pipeline executions and track versions.
pipeline.list_executions()