Implementing machine learning algorithms from scratch — random forest: the core is row and column subsampling, and the trees can be trained in parallel

Published: 2023-09-20 00:06:04  Author: bonelee

See also the earlier article in this series:

Implementing machine learning algorithms from scratch — CART decision tree: classification & regression

 

Random forest training steps:

1. For each tree, draw a bootstrap sample of the training rows (row sampling, with replacement).
2. For each tree, draw a random subset of the feature columns (column sampling).
3. Train a CART decision tree on each resampled subset; the trees are independent of one another, so this step can run in parallel.
4. At prediction time, aggregate the trees' outputs by majority vote.

Code implementation (the decision tree reuses the depth-limited CART implementation from the earlier article):

# Import numpy and sklearn's accuracy metric
import numpy as np
from sklearn.metrics import accuracy_score

class TreeNode:
    def __init__(self, gini, num_samples, num_samples_per_class, predicted_class):
        self.gini = gini  # Gini impurity at this node
        self.num_samples = num_samples  # number of samples reaching this node
        self.num_samples_per_class = num_samples_per_class  # per-class sample counts
        self.predicted_class = predicted_class  # majority class at this node
        self.feature_index = 0  # feature used for the split (internal nodes)
        self.threshold = 0  # split threshold (internal nodes)
        self.left = None  # left child: samples with feature < threshold
        self.right = None  # right child: samples with feature >= threshold
 
def gini(y):
    # Gini impurity: 1 minus the sum of squared class proportions in y
    m = len(y)
    return 1.0 - sum([(np.sum(y == c) / m) ** 2 for c in np.unique(y)])
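# Example: for y = [0, 0, 1, 1] the class proportions are 0.5 and 0.5, so
# gini = 1 - (0.25 + 0.25) = 0.5, the maximum impurity for two classes;
# for y = [0, 0, 0, 0], gini = 1 - 1.0 = 0.0 (a pure node).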
 
def grow_tree(X, y, depth=0, max_depth=None):
    classes = np.unique(y)
    num_samples_per_class = [np.sum(y == c) for c in classes]
    predicted_class = classes[np.argmax(num_samples_per_class)]
    node = TreeNode(
        gini=gini(y),
        num_samples=len(y),
        num_samples_per_class=num_samples_per_class,
        predicted_class=predicted_class,
    )

    # Keep splitting until the depth limit is reached or no valid split exists.
    # Treat max_depth=None as "no limit"; the original `depth < max_depth`
    # comparison would raise a TypeError for the default None.
    if max_depth is None or depth < max_depth:
        idx, thr = best_split(X, y)
        if idx is not None:
            indices_left = X[:, idx] < thr
            X_left, y_left = X[indices_left], y[indices_left]
            X_right, y_right = X[~indices_left], y[~indices_left]
            node.feature_index = idx
            node.threshold = thr
            node.left = grow_tree(X_left, y_left, depth + 1, max_depth)
            node.right = grow_tree(X_right, y_right, depth + 1, max_depth)
    return node
 
 
def best_split(X, y):
    """
    numpy implementation of best_split; see best_split2 below for a
    plain-Python version that may be easier to read first.
    """
    n_samples, n_features = X.shape

    # A pure node cannot be split further
    if len(np.unique(y)) == 1:
        return None, None

    best = {}
    min_gini = float('inf')

    for feature_idx in range(n_features):
        thresholds = np.unique(X[:, feature_idx])
        for threshold in thresholds:
            left_mask = X[:, feature_idx] < threshold
            right_mask = ~left_mask

            # Skip degenerate splits that leave one side empty
            if not left_mask.any() or not right_mask.any():
                continue

            gini_left = gini(y[left_mask])
            gini_right = gini(y[right_mask])

            # Weighted average of the child impurities
            weighted_gini = len(y[left_mask]) / n_samples * gini_left + len(y[right_mask]) / n_samples * gini_right
            if weighted_gini < min_gini:
                best = {
                    'feature_index': feature_idx,
                    'threshold': threshold,
                    'left_labels': y[left_mask],
                    'right_labels': y[right_mask],
                    'gini': weighted_gini
                }
                min_gini = weighted_gini

    if not best:  # no valid split found (e.g. every feature is constant)
        return None, None
    return best['feature_index'], best['threshold']
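# Note: this exhaustive scan costs O(n_features * n_thresholds * n_samples)
# per node. Production CART implementations typically sort each feature once
# per node and sweep the candidate thresholds in a single pass instead.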
 
 
def best_split2(X, y):
    """
    Plain-Python implementation of best_split, without numpy.
    """
    n_samples, n_features = len(X), len(X[0])

    # If the node contains only one class, it cannot be split further
    if len(set(y)) == 1:
        return None, None

    # Track the best split found so far
    best = {}
    min_gini = float('inf')

    # Iterate over every feature
    for feature_idx in range(n_features):
        # Collect the sorted unique values of the current feature
        unique_values = sorted(set(row[feature_idx] for row in X))

        # Try each unique value as a candidate split threshold
        for value in unique_values:
            left_y, right_y = [], []

            # Route each sample to the left or right subset by comparing
            # its feature value with the threshold
            for i, row in enumerate(X):
                if row[feature_idx] < value:
                    left_y.append(y[i])
                else:
                    right_y.append(y[i])

            # Skip degenerate splits that leave one side empty
            if not left_y or not right_y:
                continue

            # Gini impurity of the left and right subsets
            gini_left = 1.0 - sum([(left_y.count(label) / len(left_y)) ** 2 for label in set(left_y)])
            gini_right = 1.0 - sum([(right_y.count(label) / len(right_y)) ** 2 for label in set(right_y)])

            # Weighted average of the child impurities
            weighted_gini = len(left_y) / len(y) * gini_left + len(right_y) / len(y) * gini_right

            # Keep the split with the lowest weighted Gini seen so far
            if weighted_gini < min_gini:
                best = {
                    'feature_index': feature_idx,
                    'threshold': value,
                    'left_labels': left_y,
                    'right_labels': right_y,
                    'gini': weighted_gini
                }
                min_gini = weighted_gini

    if not best:  # no valid split found
        return None, None
    return best['feature_index'], best['threshold']
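# Note: both versions take the raw unique feature values as candidate
# thresholds, the simplest choice; many CART implementations instead split at
# the midpoints between consecutive sorted values.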
 
def predict_tree(node, X):
    # Descend from the root until a leaf, then return the leaf's majority class
    if node.left is None and node.right is None:
        return node.predicted_class
    if X[node.feature_index] < node.threshold:
        return predict_tree(node.left, X)
    else:
        return predict_tree(node.right, X)
 
 
class CARTClassifier:
    """Thin fit/predict wrapper around grow_tree and predict_tree."""
    def __init__(self, max_depth=None):
        self.max_depth = max_depth

    def fit(self, X, y):
        self.tree_ = grow_tree(X, y, max_depth=self.max_depth)

    def predict(self, X):
        return [predict_tree(self.tree_, x) for x in X]

### Define the random forest class
class RandomForest:
    def __init__(self, n_estimators=100, max_depth=float("inf"), max_features=None):
        # Number of trees in the forest
        self.n_estimators = n_estimators
        # Maximum depth of each tree
        self.max_depth = max_depth
        # Maximum number of features each tree may use
        self.max_features = max_features
        self.trees = []
        # Build the forest from individual decision trees
        for _ in range(self.n_estimators):
            tree = CARTClassifier(max_depth=self.max_depth)
            self.trees.append(tree)

    # Bootstrap sampling
    def bootstrap_sampling(self, X, y):
        # Stack X and y column-wise so rows stay aligned while resampling
        X_y = np.concatenate([X, y.reshape(-1, 1)], axis=1)
        np.random.shuffle(X_y)
        n_samples = X.shape[0]
        sampling_subsets = []
        for _ in range(self.n_estimators):
            # First source of randomness: row sampling. Draw n_samples indices
            # from 0..n_samples-1 with replacement.
            idx1 = np.random.choice(n_samples, n_samples, replace=True)
            bootstrap_Xy = X_y[idx1, :]
            bootstrap_X = bootstrap_Xy[:, :-1]  # all columns but the last are features
            bootstrap_y = bootstrap_Xy[:, -1]   # the last column is the label
            sampling_subsets.append([bootstrap_X, bootstrap_y])
        return sampling_subsets
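    # Note: sampling n rows with replacement gives each tree roughly 63.2%
    # (1 - 1/e) of the distinct original rows on average; the remainder are
    # that tree's "out-of-bag" samples.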

    # Train the random forest
    def fit(self, X, y):
        # Each tree is trained on its own doubly-random (row + column) subset
        sub_sets = self.bootstrap_sampling(X, y)
        n_features = X.shape[1]
        # Default max_features to sqrt(n_features), the usual heuristic for classification
        if self.max_features is None:
            self.max_features = int(np.sqrt(n_features))
        for i in range(self.n_estimators):
            # Second source of randomness: column sampling. Sample without
            # replacement so a tree never sees a duplicated feature (the
            # original replace=True could pick the same column twice).
            sub_X, sub_y = sub_sets[i]
            idx2 = np.random.choice(n_features, self.max_features, replace=False)
            sub_X = sub_X[:, idx2]
            self.trees[i].fit(sub_X, sub_y)
            # Remember which columns this tree saw so predict() can reuse them
            self.trees[i].feature_indices = idx2
            print('Tree {} trained...'.format(i + 1))

    # Random forest prediction
    def predict(self, X):
        # Collect each tree's predictions on its own feature subset
        y_preds = []
        for i in range(self.n_estimators):
            idx = self.trees[i].feature_indices
            sub_X = X[:, idx]
            y_pred = self.trees[i].predict(sub_X)
            y_preds.append(y_pred)
        # Transpose to shape (n_samples, n_trees) and take a majority vote
        y_preds = np.array(y_preds).T
        res = []
        for j in y_preds:
            # e.g. votes [1, 0, 1, 1] -> bincount [1, 3] -> argmax = class 1
            res.append(np.bincount(j.astype('int')).argmax())
        return res
    
# Import helpers for data generation and splitting
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
# Generate a synthetic binary classification dataset
X, y = make_classification(n_samples=1000, n_features=20,
                           n_redundant=0, n_informative=2, random_state=1,
                           n_clusters_per_class=1)
rng = np.random.RandomState(2)
X += 2 * rng.uniform(size=X.shape)
# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
# Build the random forest
rf = RandomForest(n_estimators=10, max_features=15)
# Train
rf.fit(X_train, y_train)
# Predict and report accuracy
y_pred = rf.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print("Accuracy of NumPy Random Forest:", acc)


# Compare against sklearn's random forest
from sklearn.ensemble import RandomForestClassifier
clf = RandomForestClassifier(max_depth=3, random_state=0)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print("Accuracy of sklearn Random Forest:", acc)

 

Output:

Tree 1 trained...
Tree 2 trained...
Tree 3 trained...
Tree 4 trained...
Tree 5 trained...
Tree 6 trained...
Tree 7 trained...
Tree 8 trained...
Tree 9 trained...
Tree 10 trained...
Accuracy of NumPy Random Forest: 0.7666666666666667
Accuracy of sklearn Random Forest: 0.7933333333333333

 

The accuracy still differs somewhat from sklearn's implementation (0.767 vs. 0.793); differences in the split search, the feature-sampling strategy, and the default hyperparameters can account for the gap.
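Finally, on the "can be trained in parallel" point from the title: each tree depends only on its own bootstrap rows and sampled columns, never on the other trees, so the fit loop is embarrassingly parallel. Below is a minimal sketch of that idea using the standard library's concurrent.futures; fit_parallel and _fit_one_tree are illustrative helpers added here, not part of the implementation above.

from concurrent.futures import ProcessPoolExecutor

def _fit_one_tree(args):
    # Hypothetical helper: train one tree on its own row/column subset
    sub_X, sub_y, feature_idx, max_depth = args
    tree = CARTClassifier(max_depth=max_depth)
    tree.fit(sub_X[:, feature_idx], sub_y)
    tree.feature_indices = feature_idx
    return tree

def fit_parallel(rf, X, y, n_workers=4):
    # Hypothetical helper: the same logic as RandomForest.fit, distributed
    # across processes; run it under `if __name__ == "__main__":` on
    # platforms that spawn rather than fork worker processes.
    sub_sets = rf.bootstrap_sampling(X, y)
    n_features = X.shape[1]
    if rf.max_features is None:
        rf.max_features = int(np.sqrt(n_features))
    # Draw every tree's feature subset up front so each job is self-contained
    jobs = [(sub_X, sub_y,
             np.random.choice(n_features, rf.max_features, replace=False),
             rf.max_depth)
            for sub_X, sub_y in sub_sets]
    with ProcessPoolExecutor(max_workers=n_workers) as ex:
        rf.trees = list(ex.map(_fit_one_tree, jobs))

Process-based workers sidestep the GIL for the CPU-bound split search; sklearn exposes the same parallelism through the n_jobs parameter of RandomForestClassifier.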