Machine Learning Algorithm Principles and Implementation: GBDT

Published 2023-09-16 17:05:13 · Author: bonelee

An earlier post covered a decision tree implementation (https://www.cnblogs.com/bonelee/p/17691555.html). Building on that decision tree, let's write a GBDT:

 

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt


# A regression-tree node; leaves predict the mean target value of their samples
class TreeNode:
    def __init__(self, mse, num_samples, predicted_value):
        self.mse = mse
        self.num_samples = num_samples
        self.predicted_value = predicted_value
        self.feature_index = 0
        self.threshold = 0
        self.left = None
        self.right = None
 
# Node impurity: mean squared deviation of y around its mean (the variance)
def mse(y):
    if len(y) == 0:
        return 0
    return np.mean((y - np.mean(y)) ** 2)
 
# Recursively grow a regression tree until max_depth is reached
def grow_tree(X, y, depth=0, max_depth=None):
    num_samples = len(y)
    predicted_value = np.mean(y)
    node = TreeNode(
        mse=mse(y),
        num_samples=num_samples,
        predicted_value=predicted_value,
    )
 
    if max_depth is None or depth < max_depth:
        idx, thr = best_split(X, y)
        if idx is not None:
            indices_left = X[:, idx] < thr
            X_left, y_left = X[indices_left], y[indices_left]
            X_right, y_right = X[~indices_left], y[~indices_left]
            node.feature_index = idx
            node.threshold = thr
            node.left = grow_tree(X_left, y_left, depth + 1, max_depth)
            node.right = grow_tree(X_right, y_right, depth + 1, max_depth)
    return node
 
def best_split(X, y):
    n_samples, n_features = X.shape
    if n_samples <= 1:
        return None, None

    best = {}
    min_mse = float('inf')

    for feature_idx in range(n_features):
        thresholds = np.unique(X[:, feature_idx])
        for threshold in thresholds:
            left_mask = X[:, feature_idx] < threshold
            right_mask = ~left_mask
            # Skip degenerate thresholds that put every sample on one side
            if not left_mask.any() or not right_mask.any():
                continue

            mse_left = mse(y[left_mask])
            mse_right = mse(y[right_mask])

            # Split quality: sample-weighted average of the child MSEs
            weighted_mse = len(y[left_mask]) / n_samples * mse_left + len(y[right_mask]) / n_samples * mse_right
            if weighted_mse < min_mse:
                best = {
                    'feature_index': feature_idx,
                    'threshold': threshold,
                }
                min_mse = weighted_mse

    # No valid split found (e.g. every value of every feature is identical)
    if not best:
        return None, None
    return best['feature_index'], best['threshold']
 
 
# Route a single sample down the tree and return the leaf's predicted value
def predict_tree(node, X):
    if node.left is None and node.right is None:
        return node.predicted_value
    if X[node.feature_index] < node.threshold:
        return predict_tree(node.left, X)
    else:
        return predict_tree(node.right, X)
 
class CARTRegressor:
    def __init__(self, max_depth=None):
        self.max_depth = max_depth
 
    def fit(self, X, y):
        self.tree_ = grow_tree(X, y, max_depth=self.max_depth)
 
    def predict(self, X):
        # Return an ndarray so downstream arithmetic like y_pred -= ... is safe
        return np.array([predict_tree(self.tree_, x) for x in X])
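
# Quick sanity check of the CART regressor on its own (my addition, a minimal
# sketch): fit a shallow tree on a tiny 1-D dataset and inspect the predictions.
_X_demo = np.array([[1.0], [2.0], [3.0], [4.0]])
_y_demo = np.array([1.0, 1.0, 3.0, 3.0])
_cart = CARTRegressor(max_depth=2)
_cart.fit(_X_demo, _y_demo)
print(_cart.predict(_X_demo))  # expected roughly [1. 1. 3. 3.]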
    

### Squared loss
class SquareLoss:
    # Squared-error loss: L(y, y_pred) = 0.5 * (y - y_pred)^2
    def loss(self, y, y_pred):
        return 0.5 * np.power((y - y_pred), 2)
    # First derivative of the squared loss with respect to y_pred
    def gradient(self, y, y_pred):
        return -(y - y_pred)
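
# Numerical sanity check of the analytic gradient (my addition, a minimal
# sketch): a central finite difference of the loss should match gradient().
_sl = SquareLoss()
_eps = 1e-6
_numeric = (_sl.loss(2.0, 0.5 + _eps) - _sl.loss(2.0, 0.5 - _eps)) / (2 * _eps)
assert abs(_numeric - _sl.gradient(2.0, 0.5)) < 1e-6  # gradient is -(2.0 - 0.5) = -1.5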

### GBDT definition
class GBDT(object):
    def __init__(self, n_estimators, learning_rate, min_samples_split,
                 min_gini_impurity, max_depth, regression):
        ### Basic hyperparameters
        # Number of trees
        self.n_estimators = n_estimators
        # Learning rate
        self.learning_rate = learning_rate
        # Minimum number of samples required to split a node
        self.min_samples_split = min_samples_split  # TODO: not yet wired into the CART code above
        # Minimum Gini impurity of a node
        self.min_gini_impurity = min_gini_impurity  # TODO: not yet wired into the CART code above
        # Maximum tree depth
        self.max_depth = max_depth
        # Regression tree by default
        self.regression = regression
        # Squared loss
        self.loss = SquareLoss()
        # A classification tree needs its own loss function; it is omitted
        # here, so a classification loss must be supplied before use
        if not self.regression:
            self.loss = None
        # The trees whose predictions are added together
        self.estimators = []
        for i in range(self.n_estimators):
            self.estimators.append(CARTRegressor(max_depth=self.max_depth))

    # Fitting method
    def fit(self, X, y):
        # Forward stagewise initialization: fit the first tree directly on y
        self.estimators[0].fit(X, y)
        # Predictions of the first tree
        y_pred = self.estimators[0].predict(X)
        # Forward stagewise iterative training
        for i in range(1, self.n_estimators):
            # For squared loss the gradient is -(y - y_pred), i.e. the negative residual
            gradient = self.loss.gradient(y, y_pred)
            # Fit the next tree to the gradient
            self.estimators[i].fit(X, gradient)
            # Subtracting the scaled gradient prediction adds back a fraction
            # of the residual, moving y_pred toward y
            y_pred -= np.multiply(self.learning_rate, self.estimators[i].predict(X))

    # Prediction method
    def predict(self, X):
        # Regression prediction: accumulate the staged tree outputs
        y_pred = self.estimators[0].predict(X)
        for i in range(1, self.n_estimators):
            y_pred -= np.multiply(self.learning_rate, self.estimators[i].predict(X))
        # Classification prediction
        if not self.regression:
            # Convert scores to probabilities with a softmax
            y_pred = np.exp(y_pred) / np.expand_dims(np.sum(np.exp(y_pred), axis=1), axis=1)
            # Convert probabilities to predicted labels
            y_pred = np.argmax(y_pred, axis=1)
        return y_pred
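
# Demonstration (my addition, a minimal sketch): replay the staged training
# predictions on a toy 1-D problem to watch the residual MSE shrink stage by
# stage, which is the essence of gradient boosting.
def demo_residual_shrinkage(n_stages=10, lr=0.5):
    rng = np.random.RandomState(0)
    X_toy = rng.uniform(0, 10, size=(50, 1))
    y_toy = np.sin(X_toy[:, 0]) + rng.normal(scale=0.1, size=50)
    gbdt = GBDT(n_estimators=n_stages, learning_rate=lr, min_samples_split=2,
                min_gini_impurity=1e-6, max_depth=2, regression=True)
    gbdt.fit(X_toy, y_toy)
    y_hat = gbdt.estimators[0].predict(X_toy)
    for i in range(1, n_stages):
        y_hat -= lr * gbdt.estimators[i].predict(X_toy)
        print(f"stage {i}: training residual MSE = {np.mean((y_toy - y_hat) ** 2):.4f}")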



### GBDT classification tree
class GBDTClassifier(GBDT):
    def __init__(self, n_estimators=300, learning_rate=.5,
                 min_samples_split=2, min_info_gain=1e-6, max_depth=2):
        super(GBDTClassifier, self).__init__(
              n_estimators=n_estimators,
              learning_rate=learning_rate,
              min_samples_split=min_samples_split,
              min_gini_impurity=min_info_gain,
              max_depth=max_depth,
              regression=False)
        


### GBDT regression tree
class GBDTRegressor(GBDT):
    def __init__(self, n_estimators=300, learning_rate=0.1, min_samples_split=2,
                 min_var_reduction=1e-6, max_depth=3):
        super(GBDTRegressor, self).__init__(
              n_estimators=n_estimators,
              learning_rate=learning_rate,
              min_samples_split=min_samples_split,
              min_gini_impurity=min_var_reduction,
              max_depth=max_depth,
              regression=True)
        

### GBRT regression example
# Import the datasets module
from sklearn import datasets
# Load the iris dataset (its integer labels are treated as regression targets here)
iris = datasets.load_iris()
# Load the data
# X = iris.data
# Use only the first two features
X = iris.data[:, :2]
y = iris.target

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Create a GBRT instance
model = GBDTRegressor()
# Train the model
model.fit(X_train, y_train)
# Predict
y_pred = model.predict(X_test)
# Compute the mean squared error of the predictions
# (named mse_numpy so it does not shadow the mse() helper above)
mse_numpy = mean_squared_error(y_test, y_pred)
print("Mean Squared Error of NumPy GBRT:", mse_numpy)



# Import GradientBoostingRegressor
from sklearn.ensemble import GradientBoostingRegressor
# Create a model instance
reg = GradientBoostingRegressor(n_estimators=300, learning_rate=0.1,
                                max_depth=3, random_state=0)
# Fit the model
reg.fit(X_train, y_train)
# Predict
y_pred = reg.predict(X_test)
# Compute the mean squared error of the predictions
mse_sklearn = mean_squared_error(y_test, y_pred)
print("Mean Squared Error of sklearn GBDT:", mse_sklearn)

 

Output:

Mean Squared Error of NumPy GBRT: 0.24127739600520248
Mean Squared Error of sklearn GBDT: 0.20883609477073248

As you can see, our implementation comes fairly close to sklearn's.

  

 

Note that sklearn's GradientBoostingRegressor exposes a learning rate as well. In essence, the boosting iterations are a process of continually shrinking the residuals!
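
To make that concrete, here is a tiny illustration (my addition, under the idealized assumption that each stage's tree fits the current residual perfectly): the update y_pred += learning_rate * residual then shrinks the residual geometrically by a factor of (1 - learning_rate) per stage.

# Idealized residual shrinkage: residual_n = residual_0 * (1 - lr) ** n
y_true, y_hat, lr = 3.0, 0.0, 0.1
for step in range(1, 4):
    residual = y_true - y_hat
    y_hat += lr * residual  # a perfect stage adds exactly lr * residual
    print(f"step {step}: residual = {y_true - y_hat:.4f}")
# prints 2.7000, then 2.4300, then 2.1870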