[BIG2015] 2. 基于操作码序列和TextCNN分类

发布时间 2023-12-29 20:44:51作者: 巴啦啦胖魔仙

导入包:

import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer          #For Bag of words
from sklearn.feature_extraction.text import TfidfVectorizer          #For TF-IDF
from sklearn.model_selection import train_test_split
from catboost import CatBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from preprocessing.build_vocab import build_vocab
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

import gensim
from gensim.models import Word2Vec     #For Word2Vec  
from gensim.corpora import Dictionary 

import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

构建词表

读取数据:

dataset_folder = "../../benchmarks/BIG2015/"
data_path = "../../benchmarks/BIG2015/opcode/opcode.csv"
data_folder = "../../benchmarks/BIG2015/opcode/"

df = pd.read_csv(data_path)
# Encode the values of the 'label' column as integer class ids.
label = LabelEncoder().fit_transform(df['label'])
# Fill missing values. DataFrame.fillna returns a NEW frame, so the result
# has to be assigned back, and it must happen before the opcode column is
# extracted -- otherwise NaN entries survive and break the later str.split().
df = df.fillna("")
# One opcode string per sample (split on whitespace when the vocabulary is built).
data = df['opcode']
# Plain list of opcode strings consumed by the vocabulary-building step.
# NOTE(review): `opcode_seq_list` is used later but never defined in the
# visible file; deriving it from `data` is an assumption -- confirm.
opcode_seq_list = data.tolist()

image.png
构建词表:

  • 使用 gensim 的 Dictionary类处理语料,输入类似 [['add', 'push'], ...],需要先将单个文档(一段话)分割为列表。
  • 因为在整个训练数据集上构建词表,没有加 unknown标记,加了 pad用于填充单个文档到固定长度。
# Build the vocabulary
vocab_save_path = "../../benchmarks/BIG2015/word2id.json"
# gensim's Dictionary takes tokenised documents, e.g. [['add', 'push'], ...],
# so every opcode string is split on whitespace first.
opcode_seq_list_split = [document.split() for document in opcode_seq_list]

dct = Dictionary(opcode_seq_list_split)
print(len(opcode_seq_list_split))  # author's run: 10868
print(dct)  # author's run: Dictionary<735 unique tokens: ['add', 'and', 'call', 'cmp', 'db']...>
print(len(dct.token2id))  # author's run: 735

# Register the padding token under id 0.
special_tokens = {"pad": 0}
dct.patch_with_special_tokens(special_tokens)
print(len(dct.token2id))  # author's run: 736

语料库中文档的长度分布:

# Number of opcodes in every document
len_list = list(map(len, opcode_seq_list_split))
len_list

# Histogram of the document lengths
plt.hist(len_list)

# Share of documents shorter than 1000 / 10000 opcodes
total_docs = len(len_list)
below_1k = sum(value < 1000 for value in len_list)
below_10k = sum(value < 10000 for value in len_list)
print(f"小于 1000 的元素占比为: {below_1k / total_docs:.2f}")
print(f"小于 10000 的元素占比为: {below_10k / total_docs:.2f}")

image.png

以 json 格式 保存词汇表:

import json

# Persist the token -> id mapping as JSON indented by four spaces.
with open(vocab_save_path, "w") as vocab_file:
    vocab_file.write(json.dumps(dct.token2id, indent=4))

构建整数索引语料

使用doc2idx 将语料转换为整数索引语料,之后对每句话填充或者截断到固定长度。

int_opcode_list = []  # integer-index corpus, one fixed-length list per document
desired_size = 1000  # target sequence length
pad_index = 0  # id registered for the "pad" token
for opcode in opcode_seq_list_split:
    # Truncate to at most `desired_size` ids (slicing also yields a fresh list) ...
    int_opcode = dct.doc2idx(opcode)[:desired_size]
    # ... then right-pad shorter documents; the multiplier is 0 when already full.
    int_opcode += [pad_index] * (desired_size - len(int_opcode))
    int_opcode_list.append(int_opcode)
print(len(int_opcode_list))

保存标签和转换后的数据:

# Save the padded/truncated integer corpus next to the opcode CSV.
np.save(os.path.join(data_folder, "opcode_int_top1000.npy"), np.array(int_opcode_list))
# Save the encoded labels under `dataset_folder` -- the same variable the
# loading step uses -- instead of repeating the path literal.
np.save(os.path.join(dataset_folder, "label.npy"), np.asarray(label))

构建 dataset 和 dataloader

读取保存的数据并划分训练集、验证集和测试集。代码先按 test_size=0.2 切出 20% 的数据,再将这部分对半分为测试集和验证集,因此实际划分比例为 8:1:1。

# Load the saved integer corpus and the encoded labels
data = np.load(os.path.join(data_folder, "opcode_int_top1000.npy"))
label = np.load(os.path.join(dataset_folder, "label.npy"))
print(data.shape, label.shape)  # both first dimensions must match (one label per sample)

# Split into train / test / validation = 8 : 1 : 1.
# Step 1 keeps 80% for training; step 2 halves the remaining 20%.
split_seed = 42
x_train, x_temp, y_train, y_temp = train_test_split(
    data, label, test_size=0.2, random_state=split_seed
)
x_test, x_val, y_test, y_val = train_test_split(
    x_temp, y_temp, test_size=0.5, random_state=split_seed
)
print(x_train.shape, x_val.shape, x_test.shape)

构建 dataloader:

# dataloader
def _make_loader(features, targets, batch_size, shuffle, num_workers):
    """Wrap one (features, labels) array pair in a DataLoader."""
    feature_tensor = torch.from_numpy(features)
    # Labels are flattened to 1-D and forced to int64, the dtype that
    # class-index targets of nn.CrossEntropyLoss must have.
    target_tensor = torch.as_tensor(np.asarray(targets).reshape(-1), dtype=torch.long)
    return DataLoader(
        dataset=TensorDataset(feature_tensor, target_tensor),
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )


def build_dataloader(x_train=None, y_train=None, x_val=None, y_val=None,
                     x_test=None, y_test=None, *, batch_size=32, num_workers=1):
    """Build the train, validation and test DataLoaders.

    Args:
        x_train, x_val, x_test: NumPy feature arrays, one row per sample.
        y_train, y_val, y_test: Label arrays; reshaped to 1-D internally.
        batch_size: Samples per batch for all three loaders (default 32).
        num_workers: Worker processes per loader (default 1).

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles; validation and test keep the input order so that
        evaluation is reproducible.

    Raises:
        ValueError: If any of the six arrays is missing (``None``).
    """
    splits = {
        "train": (x_train, y_train),
        "val": (x_val, y_val),
        "test": (x_test, y_test),
    }
    missing = [name for name, (x, y) in splits.items() if x is None or y is None]
    if missing:
        raise ValueError(f"missing data for split(s): {', '.join(missing)}")

    train_loader = _make_loader(x_train, y_train, batch_size, True, num_workers)
    val_loader = _make_loader(x_val, y_val, batch_size, False, num_workers)
    test_loader = _make_loader(x_test, y_test, batch_size, False, num_workers)
    return train_loader, val_loader, test_loader

构建训练函数和推理函数

在训练函数和推理函数中:

  • 得到所有预测结果后和真实标签计算 accuracy, precision, recall, f1
# Use the first CUDA device when one is available, otherwise the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
def train(epoch, model, train_loader, optimizer, criterion):
    """Run one training epoch and compute epoch-level loss and metrics.

    Args:
        epoch: Epoch number; only used in the progress-bar description.
        model: Network being optimised; invoked as ``model(batch)``.
        train_loader: Iterable yielding ``(batch, labels)`` tensor pairs.
        optimizer: Optimizer; ``zero_grad()`` and ``step()`` run once per batch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.

    Returns:
        Tuple ``(train_loss, accuracy, precision, recall, f1)``.
        ``train_loss`` is a 0-dim tensor: the sum of ``loss * batch_size``
        over all batches divided by the number of samples seen. Precision,
        recall and F1 are macro-averaged over the whole epoch.
    """
    model.train()

    total_samples = 0
    train_loss = 0
    predictions_all = []
    labels_all = []
    for batch, labels in tqdm(train_loader, ncols=100, desc=f"epoch: {epoch},   training"):
        batch, labels = batch.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(batch)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Detach before accumulating: adding the raw loss would chain every
        # batch's autograd graph onto the running total for the whole epoch.
        train_loss += loss.detach() * batch.size(0)
        _, predictions = outputs.max(1)
        # Collect every prediction and label; metrics are computed once per epoch.
        predictions_all.extend(predictions.cpu().tolist())
        labels_all.extend(labels.cpu().tolist())

        total_samples += labels.size(0)

    accuracy = accuracy_score(labels_all, predictions_all)
    train_loss = train_loss / total_samples
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels_all, predictions_all, average='macro', zero_division=0
    )
    return train_loss, accuracy, precision, recall, f1
def val(epoch, model, val_loader, criterion):
    """Evaluate the model on ``val_loader`` with gradient tracking disabled.

    Args:
        epoch: Epoch number; only used in the progress-bar description.
        model: Network to evaluate; switched to eval mode here.
        val_loader: Iterable yielding ``(batch, labels)`` tensor pairs.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.

    Returns:
        Tuple ``(val_loss, accuracy, precision, recall, f1)``. ``val_loss`` is
        the sum of ``loss * batch_size`` divided by the number of samples;
        precision, recall and F1 are macro-averaged.
    """
    model.eval()

    seen = 0
    loss_sum = 0
    predicted, expected = [], []
    with torch.no_grad():
        progress = tqdm(val_loader, desc=f"epoch: {epoch}, validating", ncols=100)
        for inputs, targets in progress:
            inputs = inputs.to(device)
            targets = targets.to(device)

            logits = model(inputs)
            loss_sum += criterion(logits, targets) * inputs.size(0)

            # Keep all predictions and labels for the epoch-level metrics.
            predicted.extend(logits.max(1)[1].cpu().numpy())
            expected.extend(targets.cpu().numpy())
            seen += targets.size(0)

        accuracy = accuracy_score(predicted, expected)
        mean_loss = loss_sum / seen
        precision, recall, f1, _ = precision_recall_fscore_support(
            expected, predicted, average='macro', zero_division=0
        )

    return mean_loss, accuracy, precision, recall, f1

构建 TextCNN 模型:

import torch
import torch.nn as nn
import torch.nn.functional as F
class TextCNN(nn.Module):
    """Text classifier: embedding, parallel 1-D convolutions, max-over-time pooling, linear head.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding (input channels of the convolutions).
        num_filters: Output channels of every convolution.
        filter_sizes: Iterable of kernel widths; one Conv1d is created per width.
        output_dim: Number of output units of the final linear layer.
        dropout: Dropout probability applied to the concatenated pooled features.
    """

    def __init__(self, vocab_size, embedding_dim, num_filters, filter_sizes, output_dim, dropout):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        branches = []
        for width in filter_sizes:
            branches.append(
                nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=width)
            )
        self.convs = nn.ModuleList(branches)

        pooled_features = len(filter_sizes) * num_filters
        self.fc = nn.Linear(pooled_features, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Map token ids of shape [batch_size, seq_len] to scores of shape [batch_size, output_dim]."""
        # [batch_size, seq_len, embedding_dim] -> [batch_size, embedding_dim, seq_len]
        channels_first = self.embedding(text).permute(0, 2, 1)

        pooled = []
        for conv in self.convs:
            # [batch_size, num_filters, reduced_len]
            activated = F.relu(conv(channels_first))
            # Max over the full remaining length -> [batch_size, num_filters]
            pooled.append(F.max_pool1d(activated, activated.shape[2]).squeeze(2))

        # [batch_size, len(filter_sizes) * num_filters]
        features = self.dropout(torch.cat(pooled, dim=1))

        # [batch_size, output_dim]
        return self.fc(features)

画图函数:

def plot_data(save_path, list1, list2, label="Loss"):
    """Plot a training and a validation curve per epoch, save the figure as PNG and show it.

    Args:
        save_path: Directory that receives ``{label}_curve.png``; created if missing.
        list1: Per-epoch values drawn as the "Train" curve.
        list2: Per-epoch values drawn as the "Validate" curve; plotted against
            the same x positions as ``list1``, so both must have equal length.
        label: Metric name used in the legend, title, y-axis and file name.
    """
    # savefig does not create directories, so make sure the target exists.
    os.makedirs(save_path, exist_ok=True)

    # Epoch positions on the x-axis, starting at 1.
    x = range(1, len(list1) + 1)

    # Draw on a dedicated figure so the curves never land on a figure that
    # is still open from earlier plotting code.
    fig, ax = plt.subplots()
    ax.plot(x, list1, label=f'Train {label}')
    ax.plot(x, list2, label=f'Validate {label}')
    ax.legend()
    ax.set_title(f'{label} Curve')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(f'{label}')

    fig.savefig(os.path.join(save_path, f'{label}_curve.png'), format='png')
    plt.show()
    plt.close(fig)

训练、推理和结果分析

训练和验证代码:

  • 使用 GPU 的情况下,训练时间较短,在训练完成后可以加上推理代码。
epochs = 10
output_dir = "../outputs/big2015/"
vocab_size = 736  # embedding rows; matches the 736 tokens reported when the vocabulary was built
embedding_dim = 128
num_filters = 300  # output channels per convolution
filter_sizes = [3, 4, 5, 6]  # four kernel widths
output_dim = 9  # number of output classes
dropout = 0.2  # dropout probability

# data
train_loader, val_loader, test_loader = build_dataloader(x_train, y_train, x_val, y_val, x_test, y_test)

# model
model = TextCNN(vocab_size, embedding_dim, num_filters, filter_sizes, output_dim, dropout)
model.to(device)
print(model)

# loss
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# train
train_loss_list, train_acc_list, val_loss_list, val_acc_list = [], [], [], []
best_epoch = 0
best_val_loss = float('inf')
best_val_acc = 0.0  # start at 0 so the first log line does not print "inf%"
patience = 5
wait = 0  # epochs since the last validation-loss improvement
# Create the output directory up front: both the checkpoint and the plots go there.
save_dir = os.path.join(output_dir, "test")
os.makedirs(save_dir, exist_ok=True)
for epoch in range(1, epochs + 1):  # inclusive upper bound: run exactly `epochs` epochs
    train_loss, train_acc, train_precision, train_recall, train_f1 = train(epoch, model, train_loader, optimizer, criterion)
    # Plain floats work for formatting, comparison and plotting alike,
    # whether the loss came back as a 0-dim tensor or as a number.
    train_loss = float(train_loss)
    print('Epoch: {}, Train Loss: {:.4f}, Train Acc: {:.2f}%, Train Precision: {:.2f}%, Train Recall: {:.2f}%, Train F1: {:.2f}%'.format((epoch), train_loss, 100*train_acc, 100*train_precision, 100*train_recall, 100*train_f1))

    val_loss, val_acc, val_precision, val_recall, val_f1 = val(epoch, model, val_loader, criterion)
    val_loss = float(val_loss)
    print('Epoch: {}, Val Loss: {:.4f}, Val Acc: {:.2f}%, Val Precision: {:.2f}%, Val Recall: {:.2f}%, Val F1: {:.2f}%, (Best Val Acc: {:.2f}%)'.format(epoch, val_loss, 100*val_acc, 100*val_precision, 100*val_recall, 100*val_f1, 100*best_val_acc))

    if val_loss < best_val_loss:
        best_epoch = epoch
        # best_val_acc is the accuracy at the epoch with the lowest validation loss.
        best_val_acc = val_acc
        best_val_loss = val_loss
        wait = 0  # improvement: reset the patience counter

        # Save the checkpoint first, then report success.
        torch.save(model.state_dict(), os.path.join(save_dir, "textcnn.pth"))
        print(f"--> save model success: {save_dir}")
    else:
        wait += 1  # no improvement this epoch

    print(f"--> Best Epoch: {best_epoch}, Best Val Acc: {best_val_acc}, Best Val Loss: {best_val_loss}")
    train_loss_list.append(train_loss)
    train_acc_list.append(train_acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)

    print("-------------------------------------------------")

    # Early stopping: leave the loop only after this epoch's values are
    # recorded, so the curves below still include it.
    if wait >= patience:
        print(f'Early stopping at epoch {epoch}...')
        break
plot_data(save_dir, train_acc_list, val_acc_list, label="Acc")
plot_data(save_dir, train_loss_list, val_loss_list, label='Loss')

image.png

image.png

  • 第 7 个 epoch 开始有过拟合倾向,但总体过拟合不严重。
  • 限制长度为 1000,使用 TextCNN,验证精度可以达到 97+。
  • 提取操作码时,没有去除 dd dw这类数据定义指令,部分操作码序列长度很长,后续可以考虑去掉这类指令,并合并处理语义相近的指令,可以进一步缩小序列长度,实验效果是否能有提升需要再验证。
  • 一个非常有挑战性的问题,指令的参数是否能帮助检测/分类任务。