# Scikit-learn and related code samples
# Which attributes/features to choose?
# Which model to use?
# Tune/optimize the chosen model for the best performance
# Ensure the trained model will generalize to unseen data
# Estimate performance of the trained model on unseen data

# imports
import sklearn
import IPython.display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from sklearn import metrics
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, cross_val_score
# load toy dataset / sklearn classes/functions are imported on demand
from sklearn.datasets import load_iris
iris = load_iris()
X = iris.data
y = iris.target
print(X.shape)
print(y.shape)

# KNN classification illustrating sklearn's 4-step modelling pattern
# Step-1: import the class
from sklearn.neighbors import KNeighborsClassifier
# Step-2: Instantiate the estimator (model in sklearn) passing desired parameter values
knn = KNeighborsClassifier(n_neighbors=1)
# Step-3: Fit the model, i.e. perform model training
knn.fit(X, y)  # in-place operation
# Step-4: Predict for new data samples
knn.predict([[3, 7, 11, 2]])
X_test = [[3, 5, 4, 2], [5, 4, 3, 2]]
knn.predict(X_test)
# MODEL TUNING: try different model parameters for best performance
n_neighbors = 5
knn = KNeighborsClassifier(n_neighbors=n_neighbors)
knn.fit(X, y)
knn.predict(X_test)
# Try a different estimator
# import
from sklearn.linear_model import LogisticRegression
# instantiate
est = LogisticRegression()
# train
est.fit(X, y)
# predict
y_pred = est.predict(X)
# choose model -> tune model -> estimate performance on out-of-sample data
# evaluate classification accuracy
# training accuracy (computed on the same data used for fitting, so optimistic)
acc = metrics.accuracy_score(y, y_pred)
print(acc)
# TRAIN-TEST SPLIT
# split into train and test sets; train on the train set; evaluate on the test set
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4)
model = LogisticRegression()
model.fit(X_train, y_train)
y_test_pred = model.predict(X_test)
acc_test = metrics.accuracy_score(y_test, y_test_pred)
print(acc_test)

# search for k in KNeighborsClassifier
k_range = range(1, 26)
acc_scores = []
for k in k_range:
    est = KNeighborsClassifier(n_neighbors=k)
    est.fit(X_train, y_train)
    y_test_pred = est.predict(X_test)
    acc_scores.append(round(metrics.accuracy_score(y_test, y_test_pred), 3))
# plot testing accuracy vs model complexity
plt.plot(k_range, acc_scores)
plt.xlabel("K in KNN")
plt.ylabel("Test accuracy")
# after choosing and tuning the model based on train/test/val splits, retrain the final model on the whole dataset
# a single train-test split gives a high-variance estimate of out-of-sample accuracy, since testing
# accuracy can vary a lot depending on which samples end up in the test set
# use k-fold cross-validation for a better estimate
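# a minimal sketch to see that variance: repeat the split with different seeds and compare
# accuracies (reuses the iris X, y from above; the seed values are illustrative)
for seed in range(5):
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.4, random_state=seed)
    m = LogisticRegression(max_iter=200).fit(X_tr, y_tr)
    print(seed, round(metrics.accuracy_score(y_te, m.predict(X_te)), 3))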
#========================================
df = pd.DataFrame(np.concatenate([X, y[:, np.newaxis]], axis=1), columns=iris.feature_names + ['type'])
# visualize the relationship between features and the response variable
# sns.pairplot(df, x_vars=iris.feature_names, y_vars='type')
# use the train-test split for feature selection as well
## Use k-fold cross-validation for selecting the model, tuning parameters, and selecting features
# k-fold gives a better estimate of out-of-sample accuracy and uses every sample for both training and testing
from sklearn.model_selection import KFold
kf = KFold(n_splits=10)  # 10 splits is a common recommendation
splits = kf.split(X)  # generator of (train_index, test_index) pairs
# in classification problems, use stratified sampling so each fold preserves the class proportions
# repeat multiple cross-validations with different random splits of the data, and average the test accuracy/score
# keep a hold-out set outside of CV; report the final accuracy of the tuned model on it
# do feature engineering and selection within CV to avoid leakage (see the sketch below)
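# a minimal sketch of the last two points: stratified folds plus a Pipeline, so the scaling
# step is fit inside each training fold (scaling stands in for any feature engineering here)
from sklearn.model_selection import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=0)
pipe = Pipeline([('scale', StandardScaler()),
                 ('knn', KNeighborsClassifier(n_neighbors=5))])
print(cross_val_score(pipe, X, y, cv=skf, scoring='accuracy').mean())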
from sklearn.model_selection import cross_val_score
knn = KNeighborsClassifier(n_neighbors=5)
scores = cross_val_score(knn, X, y, cv=10, scoring='accuracy')
print(scores.mean())

# search/tune k with CV accuracy
k_scores = []
for k in range(1, 31):
    knn = KNeighborsClassifier(n_neighbors=k)
    scores = cross_val_score(knn, X, y, cv=10, scoring='accuracy')
    k_scores.append(round(scores.mean(), 3))
## Grid search of hyper-parameters
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
# define parameter ranges to be searched
k_range = range(1, 31)
# create parameter grid as dict
param_grid = dict(n_neighbors=k_range)
# instantiate the grid
grid = GridSearchCV(knn, param_grid, cv=10, scoring='accuracy')
# fit the grid object
grid.fit(X, y)
grid.cv_results_['mean_test_score']
# examine the best model
grid.best_score_
grid.best_params_
grid.best_estimator_

## search/tune multiple parameters simultaneously
k_range = range(1, 31)
weight_options = ['uniform', 'distance']
param_grid = dict(n_neighbors=k_range, weights=weight_options)
grid = GridSearchCV(knn, param_grid, cv=10, scoring='accuracy')
grid.fit(X, y)
# examine the best model
grid.best_score_
grid.best_params_
grid.best_estimator_
# predict using the grid object
grid.predict([[3, 6, 8, 11]])
## RandomizedSearchCV -> searches a random subset of the parameter space, whose size we control
k_range = range(1, 31)
weight_options = ['uniform', 'distance']
# specify the parameter distribution
param_dist = dict(n_neighbors=k_range, weights=weight_options)
# instantiate for 20 iterations of search
rand = RandomizedSearchCV(knn, param_distributions=param_dist, scoring='accuracy', n_iter=20, random_state=50)
rand.fit(X, y)
# examine the best model
rand.best_score_
rand.best_params_
rand.best_estimator_
## start with grid search and see how long it takes; if too long, use RandomizedSearchCV with an appropriate n_iter value
## Use a model evaluation procedure and metric to estimate how well the overall pipeline generalizes to
## out-of-sample data -> model selection; parameter tuning/optimal selection; choosing among features / feature selection
import pandas as pd
data_url = 'https://raw.githubusercontent.com/justmarkham/scikit-learn-videos/master/data/pima-indians-diabetes.data'
col_names = ['pregnant', 'glucose', 'bp', 'skin', 'insulin', 'bmi', 'pedigree', 'age', 'label']
pima = pd.read_csv(data_url, header=None, names=col_names)
## define X and y
# select some of the features
feature_names = ['pregnant', 'insulin', 'bmi', 'age']
X = pima[feature_names]
y = pima.label
# split data
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=10)
# train a logistic regression model on the training set
from sklearn.linear_model import LogisticRegression
logreg = LogisticRegression()
logreg.fit(X_train, y_train)
# predict for the test set
y_test_pred = logreg.predict(X_test)
# classification accuracy
from sklearn import metrics
acc = metrics.accuracy_score(y_test, y_test_pred)
print(acc)
# check class distribution of the test set
y_test.value_counts()
# percentage of samples in each class
per0, per1 = y_test.value_counts(normalize=True)
# null accuracy - always predict the dominant class (a baseline)
acc_null = round(y_test.value_counts(normalize=True).max(), 3)
print(acc_null)
## Classification accuracy alone doesn't reveal the underlying distribution of labels and predictions
# confusion matrix -> can be used to derive multiple performance metrics; useful for multi-class problems as well
cmatrix = metrics.confusion_matrix(y_test, y_test_pred)
TN, FP = cmatrix[0]; FN, TP = cmatrix[1]
## compute metrics from the confusion matrix
# accuracy and error-rate
acc_conf = (TP + TN) / (TP + TN + FP + FN)
err_conf = 1 - acc_conf
# sensitivity/recall/TPR
sens = TP / (TP + FN)
metrics.recall_score(y_test, y_test_pred)
# specificity/selectivity/TNR
spec = TN / (TN + FP)
# FPR
fpr = FP / (TN + FP)
# precision
precision = TP / (TP + FP)
metrics.precision_score(y_test, y_test_pred)
# F1 score
f1 = metrics.f1_score(y_test, y_test_pred)
## Adjusting the classification threshold
y_test_pred = logreg.predict(X_test)
y_test_prob = logreg.predict_proba(X_test)[:, 1]
# analyze distribution/histogram
plt.hist(y_test_prob, bins=8)
plt.xlim(0, 1)
plt.title('Histogram of predicted probabilities')
plt.xlabel('Predicted probability of diabetes')
plt.ylabel('Frequency')
# decrease the threshold (default is 0.5) for predicting the +ve class, thus increasing sensitivity
from sklearn.preprocessing import binarize
y_pred_thresh = binarize([y_test_prob], threshold=0.3)[0]  # equivalently: (y_test_prob > 0.3).astype(int)
cmatrix_thresh = metrics.confusion_matrix(y_test, y_pred_thresh)
sens = metrics.recall_score(y_test, y_pred_thresh)  # TPR: increased
spec = cmatrix_thresh[0, 0] / (cmatrix_thresh[0, 0] + cmatrix_thresh[0, 1])  # TNR: decreased
# Sensitivity and specificity have an inverse relationship
# generate and plot the ROC curve (sensitivity vs 1-specificity) across all thresholds
fpr, tpr, thresholds = metrics.roc_curve(y_test, y_test_prob)
plt.plot(fpr, tpr)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.title('ROC curve for diabetes classifier')
plt.xlabel('False Positive Rate (1 - Specificity)')
plt.ylabel('True Positive Rate (Sensitivity)')
plt.grid(True)

# define a function that accepts a threshold and prints sensitivity and specificity
def evaluate_threshold(threshold):
    print('Sensitivity:', tpr[thresholds > threshold][-1])
    print('Specificity:', 1 - fpr[thresholds > threshold][-1])
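# usage: compare the default threshold with the lowered one used above
evaluate_threshold(0.5)
evaluate_threshold(0.3)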
# summarize the ROC curve by calculating AUC
# ROC-AUC is the likelihood that the classifier assigns a higher probability to a randomly
# chosen +ve observation than to a randomly chosen -ve one
# useful under high class imbalance, unlike classification accuracy
# no particular threshold needs to be chosen for model evaluation
# but less interpretable for multi-class problems, where a confusion matrix is a better choice
roc_auc = metrics.roc_auc_score(y_test, y_test_prob)
print(roc_auc)
# Use ROC-AUC with cross_val_score
cross_val_score(logreg, X, y, cv=10, scoring='roc_auc').mean()
#==============================
## XGBoost
# import dataset
import pandas as pd
import xgboost as xgb
import numpy as np
from sklearn.datasets import load_boston  # note: removed in scikit-learn >= 1.2; substitute another housing dataset there
from sklearn.model_selection import train_test_split
from sklearn import metrics
boston = load_boston(return_X_y=False)
data = pd.DataFrame(boston.data)
data.columns = boston.feature_names
data['PRICE'] = boston.target
X, y = data.iloc[:, :-1], data.iloc[:, -1]
data_dmatrix = xgb.DMatrix(data=X, label=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=123)
xg_reg = xgb.XGBRegressor(objective='reg:squarederror',  # 'reg:linear' is the deprecated alias
                          colsample_bytree=0.3, learning_rate=0.1,
                          max_depth=5, alpha=10, n_estimators=10)
xg_reg.fit(X_train, y_train)
preds = xg_reg.predict(X_test)
rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))
print("RMSE: %f" % rmse)
params = {"objective": "reg:squarederror", 'colsample_bytree': 0.3, 'learning_rate': 0.1,
          'max_depth': 5, 'alpha': 10}
cv_results = xgb.cv(dtrain=data_dmatrix, params=params, nfold=3,
                    num_boost_round=50, early_stopping_rounds=10, metrics="rmse", as_pandas=True, seed=123)
cv_results.head()
print(cv_results["test-rmse-mean"].tail(1))
#===========================
# (1) RandomizedSearchCV for a random forest
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV
# Create the random grid
random_grid = {'bootstrap': [True, False],
               'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
               'max_features': ['auto', 'sqrt'],
               'min_samples_leaf': [1, 2, 4],
               'min_samples_split': [2, 5, 10],
               'n_estimators': [10, 20, 40, 60, 100, 150, 200, 400, 600]}
rf = RandomForestRegressor()
rf_random = RandomizedSearchCV(estimator=rf, param_distributions=random_grid, n_iter=100, cv=3, verbose=2, random_state=42, n_jobs=-1)
rf_random.fit(X_train, y_train)
best_random = rf_random.best_estimator_
# (2) GridSearchCV for a random forest
param_grid = {
    'bootstrap': [True],
    'max_depth': [80, 90, 100, 110],
    'max_features': [2, 3],
    'min_samples_leaf': [3, 4, 5],
    'min_samples_split': [8, 10, 12],
    'n_estimators': [10, 20, 50, 100, 200]
}
# Create a base model
rf = RandomForestRegressor()
# Instantiate the grid search model
grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=3, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
#=====================
from collections import Counter
from sklearn.ensemble import RandomForestClassifier
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
RANDOM_STATE = 42
# just uncomment the oversampling strategy you want to experiment with
ros = RandomOverSampler(random_state=RANDOM_STATE)
#ros = SMOTE(random_state=RANDOM_STATE)
#ros = ADASYN(random_state=RANDOM_STATE)
X_resampled, y_resampled = ros.fit_resample(X_train, y_train)
# check that the 2 classes are now equally distributed
print(sorted(Counter(y_resampled).items()))
rf = RandomForestClassifier(n_jobs=-1, random_state=RANDOM_STATE,
                            n_estimators=100, min_samples_leaf=11)
rf.fit(X_resampled, y_resampled)
print_report(rf, X_valid, y_valid, t=0.4, X_train=X_train, y_train=y_train)  # print_report: user-defined helper, sketched below
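# print_report is not a library function; a minimal sketch of what it might do, with the
# signature assumed from the call above (t is the classification threshold):
from sklearn.metrics import classification_report, roc_auc_score

def print_report(model, X_valid, y_valid, t=0.5, X_train=None, y_train=None):
    # thresholded predictions on the validation set
    y_prob = model.predict_proba(X_valid)[:, 1]
    y_pred = (y_prob >= t).astype(int)
    print(classification_report(y_valid, y_pred))
    print('valid ROC-AUC:', roc_auc_score(y_valid, y_prob))
    if X_train is not None:
        print('train ROC-AUC:', roc_auc_score(y_train, model.predict_proba(X_train)[:, 1]))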
#========================
{"learning_rate" : [0.05, 0.10, 0.15, 0.20, 0.30 ] , | |
"max_depth" : [ 3, 5, 7, 10, 12, 15], | |
"min_child_weight" : [ 1, 3, 5, 7 ], | |
"gamma" : [ 0.0, 0.1, 0.2 , 0.3, 0.4 ], | |
"colsample_bytree" : [ 0.3, 0.4, 0.5 , 0.7 ] } | |
# A parameter grid for XGBoost | |
params = { | |
'min_child_weight': [1, 5, 10], | |
'gamma': [0.5, 1, 1.5, 2, 5], | |
'subsample': [0.6, 0.8, 1.0], | |
'colsample_bytree': [0.6, 0.8, 1.0], | |
'max_depth': [3, 4, 5] | |
} | |
xgb = XGBClassifier(learning_rate=0.02, n_estimators=600, objective='binary:logistic', | |
silent=True, nthread=1) | |
folds = 5 | |
param_comb = 50 | |
skf = StratifiedKFold(n_splits=folds, shuffle = True, random_state = 1001) | |
random_search = RandomizedSearchCV(xgb, param_distributions=params, n_iter=param_comb, scoring='roc_auc', n_jobs=-1, cv=skf.split(X,Y), verbose=3, random_state=1001 ) | |
random_search.fit(X, Y) | |
#===============
# SVM
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
tuned_parameters = [{'kernel': ['rbf'], 'C': [0.1, 1, 10, 100, 1000], 'gamma': [1, 0.1, 0.01, 0.001, 0.0001]},
                    {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]
scores = ['precision', 'recall']
for score in scores:
    print("# Tuning hyper-parameters for %s" % score)
    print()
    clf = GridSearchCV(
        SVC(), tuned_parameters, scoring='%s_macro' % score
    )
    clf.fit(X_train, y_train)
    print("Best parameters set found on development set:")
    print()
    print(clf.best_params_)
    print()
    print("Grid scores on development set:")
    print()
    means = clf.cv_results_['mean_test_score']
    stds = clf.cv_results_['std_test_score']
    for mean, std, params in zip(means, stds, clf.cv_results_['params']):
        print("%0.3f (+/-%0.03f) for %r"
              % (mean, std * 2, params))
    print()
    print("Detailed classification report:")
    print()
    print("The model is trained on the full development set.")
    print("The scores are computed on the full evaluation set.")
    print()
    y_true, y_pred = y_test, clf.predict(X_test)
    print(classification_report(y_true, y_pred))
    print()
#===============
# GaussianProcessClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import Matern, DotProduct, RBF, RationalQuadratic, WhiteKernel
model = GaussianProcessClassifier(kernel=DotProduct(1.0))
length_scales = [0.2, 0.5, 1, 2, 3, 5]
param_grid = {'kernel': [DotProduct(i) for i in length_scales]
                        + [Matern(i) for i in length_scales]
                        + [RBF(i) for i in length_scales]
                        + [RationalQuadratic(i) for i in length_scales]
                        + [WhiteKernel(i) for i in length_scales]}
clf = RandomizedSearchCV(model, param_grid, n_jobs=-1, n_iter=100, random_state=0, verbose=3)
clf.fit(X_train, y_train)
#===============
# NN Classifier
# TIPS:
# 1. Multi-layer Perceptron is sensitive to feature scaling, so it is highly recommended to scale your data.
# 2. Use StandardScaler in a Pipeline (sketched after the basic example below).
# 3. Finding a reasonable regularization parameter is best done with GridSearchCV, usually over the range 10.0 ** -np.arange(1, 7).
# 4. For relatively large datasets (thousands of training samples or more), Adam is very robust.
# 5. Currently, for our use case, the default settings are good.
from sklearn.neural_network import MLPClassifier
# create model object
clf = MLPClassifier(random_state=1, max_iter=300)
# train
clf.fit(X_train, y_train)
# predict
clf.predict_proba(X_test[:1])
clf.predict(X_test[:5])
clf.score(X_test, y_test)
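# tips 2 and 3 above in code - a minimal sketch (the step name 'mlp' and the CV settings are illustrative)
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
pipe = Pipeline([('scale', StandardScaler()),
                 ('mlp', MLPClassifier(random_state=1, max_iter=300))])
alpha_grid = {'mlp__alpha': 10.0 ** -np.arange(1, 7)}  # the regularization range from tip 3
mlp_search = GridSearchCV(pipe, alpha_grid, cv=5, scoring='accuracy')
mlp_search.fit(X_train, y_train)
print(mlp_search.best_params_, mlp_search.best_score_)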
## Using Keras
import tensorflow as tf
from tensorflow import keras
from scipy.stats import reciprocal
# set seed for reproducibility
seed = 7
np.random.seed(seed)

def build_model(n_hidden=1, n_neurons=30, learning_rate=3e-3, input_shape=[28, 28], init_mode='uniform',
                activation='relu', dropout_rate=0.5, weight_constraint=0):
    model = keras.models.Sequential()
    model.add(keras.layers.Flatten(input_shape=input_shape))
    for layer in range(n_hidden):
        model.add(keras.layers.Dense(n_neurons, activation=activation, kernel_initializer=init_mode,
                                     kernel_constraint=keras.constraints.MaxNorm(weight_constraint)))
        model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(10, activation="softmax", kernel_initializer=init_mode))
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)  # 'lr' is the deprecated alias
    model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
    return model
# define the grid search parameters
batch_size = [10, 20, 40, 60, 80, 100]
epochs = [10, 50, 100]
init_mode = ['uniform', 'lecun_uniform', 'normal', 'zero', 'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform']
activation = ['relu', 'tanh', 'sigmoid', 'linear']
weight_constraint = [0, 1, 2, 3]
dropout_rate = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
keras_param_space = {"n_hidden": [1, 2, 3, 4],
                     "n_neurons": np.arange(30, 300),
                     "learning_rate": reciprocal(3e-4, 3e-2)
                     }
param_grid = dict(batch_size=batch_size, epochs=epochs, init_mode=init_mode, dropout_rate=dropout_rate, weight_constraint=weight_constraint)
keras_param_space.update(param_grid)
# build model; extra kwargs to KerasClassifier (e.g. epochs, batch_size) are passed through to fit/build_model
keras_clf = tf.keras.wrappers.scikit_learn.KerasClassifier(build_fn=build_model, epochs=100, batch_size=20, verbose=0)
keras_rand_search = RandomizedSearchCV(keras_clf, keras_param_space, n_iter=20,
                                       cv=5, scoring="accuracy", n_jobs=-1, verbose=True)
keras_rand_search.fit(X, y)
# keras_rand_search.fit(X_train, y_train, epochs=100,
#                       validation_data=(X_valid, y_valid),
#                       callbacks=[keras.callbacks.EarlyStopping(patience=10)])
# summarize results
print("Best: %f using %s" % (keras_rand_search.best_score_, keras_rand_search.best_params_))
means = keras_rand_search.cv_results_['mean_test_score']
stds = keras_rand_search.cv_results_['std_test_score']
params = keras_rand_search.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))
#===============
# Callbacks
# example
def on_step(optim_result):
    """Callback to view scores after each iteration
    while performing Bayesian optimization in skopt."""
    score = hpo_search.best_score_
    print("best score: %s" % score)  # prints the score after each iteration
    if score >= 0.98:  # early-stopping kind of functionality
        print('Interrupting!')
        return True

# hpo_search: the Bayesian search object being fit (a sketch of one follows below)
hpo_search.fit(X_train, y_train, callback=on_step)  # callback=on_step will print the score after each iteration
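# hpo_search above is assumed to be a scikit-optimize BayesSearchCV; a minimal, illustrative
# sketch of how it could be defined (the estimator and search space are placeholders):
from skopt import BayesSearchCV
from sklearn.ensemble import RandomForestClassifier
hpo_search = BayesSearchCV(RandomForestClassifier(),
                           {'n_estimators': (10, 500), 'min_samples_leaf': (1, 20)},
                           n_iter=30, scoring='accuracy', random_state=0)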
#===============
# Deploy
# ---------------------------- Create end-point
import sagemaker
from sagemaker.sklearn.estimator import SKLearn
role = sagemaker.get_execution_role()
# Create the SKLearn object by directing it to the aws_sklearn_main.py script
aws_sklearn = SKLearn(entry_point='aws_sklearn_main.py',
                      train_instance_type='ml.m4.xlarge',  # 'instance_type' in SDK v2
                      role=role)
# Deploy model
# The deploy method creates the deployable model, configures the SageMaker hosting services endpoint, and launches the endpoint to host the model
# (in SDK v2, pass serializer=/deserializer= to deploy to control request/response formats such as text/csv)
aws_sklearn_predictor = aws_sklearn.deploy(instance_type='ml.m4.xlarge',
                                           initial_instance_count=1)
# a prediction request to the endpoint
response = aws_sklearn_predictor.predict(data)
model = aws_sklearn.create_model()  # create_model lives on the estimator, not the predictor
# Print the endpoint name to test in the next step
print(aws_sklearn_predictor.endpoint)
# Tear down the endpoint container and delete the corresponding endpoint configuration
# aws_sklearn_predictor.delete_endpoint()
# Delete the model
# aws_sklearn_predictor.delete_model()
# ---------------------------- Create end-point from existing model
# If you have an existing model and want to deploy it locally, don't specify a sagemaker_session argument to the estimator constructor
# The correct session is generated when you call model.deploy()
import numpy
from sagemaker.mxnet import MXNetModel
model_location = 's3://mybucket/my_model.tar.gz'
code_location = 's3://mybucket/sourcedir.tar.gz'
s3_model = MXNetModel(model_data=model_location, role='SageMakerRole',
                      entry_point='mnist.py', source_dir=code_location)
predictor = s3_model.deploy(initial_instance_count=1, instance_type='local')
data = numpy.zeros(shape=(1, 1, 28, 28))
predictor.predict(data)
# ---------------------------- Call end-point
import boto3
# create a SageMaker runtime client using boto3
client = boto3.client('sagemaker-runtime')  # 'runtime.sagemaker' is the legacy service name
# Specify endpoint and content_type
custom_attributes = "c000b4f9-df62-4c85-a0bf-7c525f9104a4"  # Optional
endpoint_name = "endpoint_from_deployed_model_in_step_6"
content_type = "text/csv"
accept = "..."  # The desired MIME type of the inference response
request_body = "..."
# Make call to endpoint
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType=content_type,
    Accept=accept,
    CustomAttributes=custom_attributes,  # Optional
    Body=request_body
)
print(response['CustomAttributes'])  # Optional
# result = response['Body'].read().decode()
# print('Predicted label is {}.'.format(result))
## Useful excerpts
# A customer's model containers must respond to requests within 60 seconds. The model itself can take at most 60 seconds of processing time before responding to invocations. If your model will take 50-60 seconds of processing time, set the SDK socket timeout to 70 seconds.
# We create the model objects with the image and model data. These model objects are used to deploy production variants on an endpoint. The models are developed by training ML models on different data sets, different algorithms or ML frameworks, and different hyperparameters.
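# setting the 70-second socket timeout mentioned above via botocore's Config - a minimal
# sketch (the retry setting is illustrative):
from botocore.config import Config
runtime_client = boto3.client('sagemaker-runtime',
                              config=Config(read_timeout=70, retries={'max_attempts': 0}))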
# ---------------------------- Call end-point2
from datetime import datetime
from sagemaker.amazon.amazon_estimator import get_image_uri  # deprecated in SDK v2; see sagemaker.image_uris.retrieve
sm_session = sagemaker.Session()
model_name = f"DEMO-xgb-churn-pred-{datetime.now():%Y-%m-%d-%H-%M-%S}"
model_name2 = f"DEMO-xgb-churn-pred2-{datetime.now():%Y-%m-%d-%H-%M-%S}"
image_uri = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-1')
image_uri2 = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-2')
sm_session.create_model(name=model_name, role=role, container_defs={
    'Image': image_uri,
    'ModelDataUrl': model_url  # model_url/model_url2: S3 paths of the trained model artifacts
})
sm_session.create_model(name=model_name2, role=role, container_defs={
    'Image': image_uri2,
    'ModelDataUrl': model_url2
})
from sagemaker.session import production_variant
variant1 = production_variant(model_name=model_name,
                              instance_type="ml.m5.xlarge",
                              initial_instance_count=1,
                              variant_name='Variant1',
                              initial_weight=1)
variant2 = production_variant(model_name=model_name2,
                              instance_type="ml.m5.xlarge",
                              initial_instance_count=1,
                              variant_name='Variant2',
                              initial_weight=1)
endpoint_name = f"DEMO-xgb-churn-pred-{datetime.now():%Y-%m-%d-%H-%M-%S}"
print(f"EndpointName={endpoint_name}")
sm_session.endpoint_from_production_variants(
    name=endpoint_name,
    production_variants=[variant1, variant2]
)
# invoke end-point
import time
sm_runtime = boto3.client('sagemaker-runtime')
# get a subset of the test data for a quick test (Jupyter shell command)
!tail -120 test_data/test-dataset-input-cols.csv > test_data/test_sample_tail_input_cols.csv
print(f"Sending test traffic to the endpoint {endpoint_name}. \nPlease wait...")
with open('test_data/test_sample_tail_input_cols.csv', 'r') as f:
    for row in f:
        print(".", end="", flush=True)
        payload = row.rstrip('\n')
        sm_runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType="text/csv",
                                   Body=payload,
                                   TargetVariant="Variant1",  # optional
                                   )
        time.sleep(0.5)
print("Done!")
# ---------------------------- Update an endpoint's production variants
sm = boto3.client('sagemaker')
sm.update_endpoint_weights_and_capacities(
    EndpointName=endpoint_name,
    DesiredWeightsAndCapacities=[
        {
            "DesiredWeight": 25,
            "VariantName": variant1["VariantName"]
        },
        {
            "DesiredWeight": 75,
            "VariantName": variant2["VariantName"]
        }
    ]
)
# ---------------------------- Inference data formats
# Amazon SageMaker algorithms accept and produce several different MIME types for the HTTP payloads used in retrieving online and mini-batch predictions.
# At a minimum, you need to handle: inference request serialization (handled by you), inference request deserialization (handled by the algorithm),
# inference response serialization (handled by the algorithm), and inference response deserialization (handled by you).
# Any transformations performed on the training data should also be performed on the data before obtaining inference. The order of the features matters and must remain unchanged.
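# an end-to-end text/csv request against the endpoint above - a minimal sketch (the feature
# values are illustrative; their order must match the training columns):
features = [6, 148, 33.6, 50]
request_body = ",".join(str(v) for v in features)  # inference request serialization (by you)
response = client.invoke_endpoint(EndpointName=endpoint_name,
                                  ContentType="text/csv",
                                  Body=request_body)
result = response['Body'].read().decode()  # inference response deserialization (by you)
print(result)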