Created
March 25, 2023 15:07
-
-
Save jiachen247/1aae22ac510a0630cb4e47c5991c49f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import replace | |
import numpy as np | |
class Node(): | |
""" | |
Implements an individual node in the Decision Tree. | |
""" | |
def __init__(self, y): | |
self.y = y # Values of samples assigned to that node | |
self.score = np.inf # RSS score of the node (measure of impurity) | |
self.feature_idx = None # The feature used for the split (column number) | |
self.threshold = None # Threshold used for the splitt (scalar value) | |
self.left_child = None # Left child of the node (of type Node) | |
self.right_child = None # Right child of the node (of type Node) | |
def is_leaf(self): | |
if self.feature_idx is None: | |
return True | |
else: | |
return False | |
class MyDecisionTreeRegressor: | |
def __init__(self, max_depth=None, min_samples_split=2): | |
self.max_depth = max_depth | |
self.min_samples_split = min_samples_split | |
self.node = None | |
def calc_rss_score_node(self, y): | |
return np.sum(np.square(y - np.mean(y))) | |
def calc_rss_score_split(self, y_left, y_right): | |
return self.calc_rss_score_node(y_left) + self.calc_rss_score_node(y_right) | |
def calc_thresholds(self, x): | |
# x is a list of feature values; see example input in 1.1a) | |
# Your method should compute 'thresholds', which is the set of thresholds. | |
# The following initialization line is optional and can be removed if you wish. | |
thresholds = set() | |
######################################################################################### | |
### Your code starts here ############################################################### | |
uniq = np.unique(x) | |
uniq_count = len(uniq) | |
if uniq_count > 1: | |
for i in range(0, len(uniq) - 1): | |
x1 = uniq[i] | |
x2 = uniq[i + 1] | |
thresholds.add((x1 + x2) / 2.0) | |
### Your code ends here ################################################################# | |
######################################################################################### | |
return thresholds | |
def create_split(self, x, threshold): | |
## Get all row indices where the value is <= threshold | |
indices_left = np.where(x <= threshold)[0] | |
## Get all row indices where the value is > threshold | |
indices_right = np.where(x > threshold)[0] | |
## Return split | |
return indices_left, indices_right | |
def calc_best_split(self, X, y): | |
## X is the feature matrix; y is a vector of the response variable | |
## Initialize the return values | |
best_score, best_threshold, best_feature_idx, best_split = np.inf, None, None, None | |
## Loop through all features (columns of X) to find the best split | |
for feature_idx in range(X.shape[1]): | |
# Get all values for current feature | |
x = X[:, feature_idx] | |
################################################################################ | |
### Your code starts here ###################################################### | |
# You should use the calc_thresholds, create_split, and calc_rss_score_split functions. | |
# Note that create_split returns the *indices* of samples in the split, while | |
# calc_rss_score_split requires the *response values* (i.e. y) of samples in the split. | |
for threshold in self.calc_thresholds(x): | |
indices_left, indices_right = self.create_split(x, threshold) | |
yfunc = np.vectorize(lambda i: y[i]) | |
y_left = yfunc(indices_left) | |
y_right = yfunc(indices_right) | |
score = self.calc_rss_score_split(y_left, y_right) | |
if score <= best_score: | |
best_score = score | |
best_threshold = threshold | |
best_feature_idx = feature_idx | |
best_split = (indices_left, indices_right) | |
### Your code ends here ######################################################## | |
################################################################################ | |
return best_score, best_threshold, best_feature_idx, best_split | |
def fit(self, X, y): | |
## Initializa Decision Tree as a single root node | |
self.node = Node(y) | |
## Start recursive building of Decision Tree | |
self._fit(X, y, self.node) | |
## Return Decision Tree object | |
return self | |
def _fit(self, X, y, node, depth=0): | |
## Calculate and set RSS score of the node itself | |
node.score = self.calc_rss_score_node(y) | |
## If only one sample here, we can stop. | |
if len(y) <= 1: | |
return | |
######################################################################################### | |
### Your code starts here ############################################################### | |
# Please implement the stopping conditions as indicated by self.max_depth and self.min_sample_split. | |
# Note that if either variable is None, you should ignore that stopping condition. | |
if (self.max_depth and depth >= self.max_depth) or len(y) < self.min_samples_split: | |
return | |
### Your code ends here ################################################################# | |
######################################################################################### | |
## Calculate the best split | |
score, threshold, feature_idx, split = self.calc_best_split(X, y) | |
## If the information gain is negative, no need for further splitting | |
if score > node.score: | |
return | |
## Split the input and labels using the indices from the split | |
X_left, X_right = X[split[0]], X[split[1]] | |
y_left, y_right = y[split[0]], y[split[1]] | |
## Update the parent node based on the best split | |
node.feature_idx = feature_idx | |
node.threshold = threshold | |
node.left_child = Node(y_left) | |
node.right_child = Node(y_right) | |
## Recursively fit both child nodes (left and right) | |
self._fit(X_left, y_left, node.left_child, depth=depth+1) | |
self._fit(X_right, y_right, node.right_child, depth=depth+1) | |
def predict(self, X): | |
## Return list of individually predicted labels | |
return np.array([ self.predict_sample(self.node, x) for x in X ]) | |
def predict_sample(self, node, x): | |
## If the node is a leaf, return the mean value as the prediction | |
if node.is_leaf(): | |
return np.mean(node.y) | |
## If the node is not a leaf, go down the left or right subtree (depending on the feature value) | |
if x[node.feature_idx] <= node.threshold: | |
return self.predict_sample(node.left_child, x) | |
else: | |
return self.predict_sample(node.right_child, x) | |
def get_node_count(self): | |
return self._get_node_count(self.node) | |
def _get_node_count(self, node): | |
if node.is_leaf(): | |
return 1 | |
else: | |
return 1 + self._get_node_count(node.left_child) + self._get_node_count(node.right_child) | |
class MyRandomForestRegressor: | |
def __init__(self, n_estimators=100, max_depth=None, min_samples_split=2, max_features=1.0): | |
self.n_estimators = n_estimators | |
self.max_depth = max_depth | |
self.min_samples_split = min_samples_split | |
self.max_features = max_features | |
self.estimators = [] | |
def bootstrap_sampling(self, X, y): | |
X_bootstrap, y_bootstrap = None, None | |
N, d = X.shape | |
######################################################################################### | |
### Your code starts here ############################################################### | |
# Hint: consider np.random.choice. Bootstrap sampling should be *with replacement*. | |
choices = np.random.choice(N, N, replace=True) | |
X_bootstrap = np.zeros_like(X) | |
for i in range(N): | |
X_bootstrap[i] = X[choices[i]] | |
yfunc = np.vectorize(lambda i: y[i]) | |
y_bootstrap = yfunc(choices) | |
### Your code ends here ################################################################# | |
######################################################################################### | |
return X_bootstrap, y_bootstrap | |
def feature_sampling(self, X): | |
N, d = X.shape | |
X_features_sampled, indices_sampled = None, None | |
######################################################################################### | |
### Your code starts here ############################################################### | |
# Hint: feature sampling should be *without replacement*. | |
feature_count = np.int(np.ceil(d * self.max_features)) | |
indices_sampled = np.random.choice(d, feature_count, replace=False) | |
X_features_sampled = X[:, indices_sampled] | |
### Your code ends here ################################################################# | |
######################################################################################### | |
return X_features_sampled, indices_sampled | |
def fit(self, X, y): | |
self.estimators = [] | |
for _ in range(self.n_estimators): | |
regressor, indices_sampled = None, None | |
######################################################################################### | |
### Your code starts here ############################################################### | |
# Use your implementation of MyDecisionTreeRegressor in here, making sure to correctly pass it | |
# the values of self.max_depth and self.min_samples_split. | |
X_sampled, y_bootstrap = self.bootstrap_sampling(X, y) | |
X_features_sampled, indices_sampled = self.feature_sampling(X_sampled) | |
regressor = MyDecisionTreeRegressor(max_depth=self.max_depth, min_samples_split=self.min_samples_split).fit(X_features_sampled, y_bootstrap) | |
### Your code ends here ################################################################# | |
######################################################################################### | |
self.estimators.append((regressor, indices_sampled)) | |
return self | |
def predict(self, X): | |
predictions = [] | |
######################################################################################### | |
### Your code starts here ############################################################### | |
for (regressor, indices_sampled) in self.estimators: | |
X_new = X[:, indices_sampled] | |
predictions.append(regressor.predict(X_new)) | |
predictions = np.average(predictions, axis=0) | |
### Your code ends here ################################################################# | |
######################################################################################### | |
return predictions | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment