BIG 2015

发布时间 2023-12-24 23:49:26作者: 巴啦啦胖魔仙

有感于目前介绍深度学习+恶意软件检测的 blog 较少,本专栏旨在分享笔者在读论文和做实验的一些过程和想法。
本文介绍Microsoft Malware Classification Challenge (BIG 2015) 数据集和数据预处理过程。

背景介绍

Microsoft Malware Classification Challenge (BIG 2015) :2015 年微软发布的竞赛数据集,包含以下九个类别:

  1. Ramnit
  2. Lollipop
  3. Kelihos_ver3
  4. Vundo
  5. Simda
  6. Tracur
  7. Kelihos_ver1
  8. Obfuscator.ACY
  9. Gatak

对于每个恶意软件样本,提供一个.bytes 文件(二进制内容的十六进制表示形式)和.asm 文件(反汇编得到的文件)。训练数据中包含 10868 个样本,提供标签,测试数据不直接提供标签,需要注意完整数据集解压500GB+。
.asm 文件示例:
image.png
.bytes 文件示例:
image.png
训练数据样本分布情况:

类别 样本数量
Ramnit 1541
Lollipop 2478
Kelihos_ver3 2942
Vundo 475
Simda 42
Tracur 751
Kelihos_ver1 398
Obfuscator.ACY 1228
Gatak 1013

以下实验仅使用训练集。

开发环境

ubuntu 20.04

  • python 3.11.5
  • pandas 2.1.4
  • numpy 1.26.2
  • scikit-learn 1.2.2

数据预处理

构建数据集

数据与标签:

只用带有标签的 train.7z 文件,解压后的.bytes 和.asm 文件混杂在一个文件夹下:
目录结构为:

  • benchmarks
    • BIG2015
      • .asm
      • .bytes
  • codes
    • big2015

image.png
将 .asm 文件 划分到 ASMs 文件夹下,.bytes 文件划分到 PEs 文件夹下:


import os
import shutil


if __name__ == "__main__":
    # 指定目录路径
    directory = r'E:\benchmarks\BIG2015'

    # 遍历目录下的所有文件
    for filename in os.listdir(directory):
        if filename.endswith('.bytes'):
            source = os.path.join(directory, filename)
            destination_folder = os.path.join(directory, 'PEs')
            # 如果PEs文件夹不存在则创建
            if not os.path.exists(destination_folder):
                os.makedirs(destination_folder)
            # 移动.bytes文件到PEs文件夹
            shutil.move(source, os.path.join(destination_folder, filename))
        elif filename.endswith('.asm'):
            source = os.path.join(directory, filename)
            destination_folder = os.path.join(directory, 'ASMs')
            # 如果ASMs文件夹不存在则创建
            if not os.path.exists(destination_folder):
                os.makedirs(destination_folder)
            # 移动.asm文件到ASMs文件夹
            shutil.move(source, os.path.join(destination_folder, filename))

在ASMs 文件夹和 PEs 文件夹下的文件按照类别分类放入对应文件夹:

'''
同一类别的.bytes和asm文件放到同一个文件夹中
'''
import pandas as pd
import os
from tqdm import tqdm
import shutil

if __name__ == "__main__":
    # 设定特征和标签列
    data = pd.read_csv("../../../benchmarks/BIG2015/trainLabels.csv")

    categories = [0, "Ramnit", "Lollipop", "Kelihos_ver3", "Vundo", "Simda", "Tracur", "Kelihos_ver1", "Obfuscator.ACY", "Gatak"]
    
    asm_folder = r"E:\benchmarks\BIG2015\ASMs\\"
    for category in categories[1:]:
        category_folder = os.path.join(asm_folder, str(category))
        if not os.path.exists(category_folder):
            os.makedirs(category_folder)
    for index, row in tqdm(data.iterrows()):
        file_id = row['Id']
        file_class = row['Class']
        
        source_file = os.path.join(asm_folder, f"{file_id}.asm")  # 替换为实际的文件扩展名
        destination_folder = os.path.join(asm_folder, str(categories[file_class]))
        
        # 如果文件存在,则移动到对应的类别文件夹
        if os.path.exists(source_file) and os.path.exists(destination_folder):
            # print(os.path.join(destination_folder, f"{file_id}.bytes"))
            shutil.move(source_file, os.path.join(destination_folder, f"{file_id}.asm"))

以上两步操作可以各自封装为函数,处理后的目录结构:
image.png
本文整理的目录结构为:

- benchmarks
	- BIG2015
    	- ASMs
        - opcode
    	- PEs
- codes
	- big2015
    	- features
        	- opcode_features.py
    	- preprocessing
        	- move_file.py
        	- class_split.py
    	- extract_opcode.py
    	- tfidf_classification.ipynb

提取操作码

定义用于特征提取的父类,参考自 Ember 的代码:

class FeatureType(object):
    ''' Base class from which each feature type may inherit '''

    name = ''
    dim = 0

    def __repr__(self):
        return '{}({})'.format(self.name, self.dim)

    def raw_features(self, bytez):
        ''' Generate a JSON-able representation of the file '''
        raise (NotImplementedError)

    def process_raw_features(self, raw_obj):
        ''' Generate a feature vector from the raw features '''
        raise (NotImplementedError)

    def feature_vector(self, bytez):
        ''' Directly calculate the feature vector from the sample itself. This should only be implemented differently
        if there are significant speedups to be gained from combining the two functions. '''
        return self.process_raw_features(self.raw_features(bytez))

以下定义OpcodeInfo类用于从.asm 文件中提取操作码:

class OpcodeInfo(FeatureType):
    name_tfidf = 'ins'
    """从ida pro反汇编的程序中提取操作码

    Returns:
        _type_: _description_
    """

    # dim = 0

    def __init__(self):
        super(FeatureType, self).__init__()

    def raw_features(self, bytez):
        """
        Get opcode sequence from .asm file
        :param filename: The name of .asm file
        :return: The opcode sequence(list)
        reference: https://github.com/dagrons/MalwareClassification/blob/master/asm/create_data/get_opcode.py
        """
        # Save opcode sequences in string arrays
        opcode_seq = []
        # Use regular expression to get all opcodes and combine opcodes into sequences
        p = re.compile(r'\s([a-fA-F0-9]{2}\s)+\s*([a-z]+)')
        for line in bytez:
            # If a line starts with .text, it is a assembly instruction line
            if line.startswith(".text"):
                m = re.findall(p, line)
                if m:
                    opc = m[0][1]
                    if opc != "align":
                        opcode_seq.append(opc)
        return opcode_seq

raw_features函数先判断是否进入了代码块(.text)的范围,再用正则表达式提取操作码部分,最后的返回值是一个包含操作码序列的列表,列表中每一个元素是一个操作码。
接下来使用上述定义的 OpcodeInfo类遍历类别文件夹提取特征, 将从.asm 文件中提取操作码保存为 csv 文件。
extract_opcode.py文件中 定义 extract_opcode_as_df函数:

def extract_opcode_as_df(parent_folder, save_folder, extractor):
    df = pd.DataFrame()
    # 获取父文件夹中的所有子文件夹
    subfolders = [f.path for f in os.scandir(parent_folder) if f.is_dir()]
    print(subfolders)
    file_name_list, opcode_list, label_list = [], [], []
    for folder in tqdm(subfolders, ncols=100):
        os.makedirs(save_folder, exist_ok=True)
        class_name = os.path.basename(folder)

        # 获取当前子文件夹中的所有文件
        files = [f for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f))]
        # print(files)

        # 处理当前子文件夹中的文件
        for file in tqdm(files, ncols=100):
            file_path = os.path.join(folder, file)
            
            with open(file_path, "r", encoding='ascii', errors="ignore") as fp:
                opcode = extractor.raw_features(fp)
                opcode_line = ' '.join(opcode) # 字符串
                # opcode_list = opcode_line.split()
            file_name_list.append(file)
            opcode_list.append(opcode_line)
            label_list.append(class_name)
    print(len(file_name_list), len(opcode_list), len(label_list))
    df = pd.DataFrame({
        "file_name": file_name_list,
        "opcode": opcode_list,
        "label": label_list
    }) 
    
    save_file_path = os.path.join(save_folder, "opcode.csv")
    df.to_csv(save_file_path)
    print(f"df save success: {save_file_path}")
    return save_file_path

if __name__ == "__main__":
    ############################################################ BIG2015
    # 以下代码是将提取的操作码序列保存为dataframe
    target_folder = "../../benchmarks/BIG2015/ASMs/"
    save_folder = "../../benchmarks/BIG2015/opcode/"
    os.makedirs(save_folder, exist_ok=True)
    opcode_ex = OpcodeInfo()
    # 提取操作码序列
    extract_opcode_as_df(parent_folder = os.path.join(target_folder, ""), 
                save_folder = os.path.join(save_folder, ""), extractor=opcode_ex)

其中:

  • opcode = extractor.raw_features(fp) 返回操作码列表,再把列表拼成字符串 opcode_line,方便存入 dataframe。
  • 用三个列表分别保存文件名、操作码序列和标签,再转换为 dataframe,保存为 csv。
  • label 列保存的是对应的类名,而不是整数索引,故而在训练模型和推理时需要将其转换为整数索引。

image.png

构建分类模型

CatBoost,与 XGBoost 类似,属于一类梯度提升树模型。能够处理分类变量(离散特征)而无需事先进行数据预处理,从经验上看CatBoost 的过拟合倾向会小些,同时训练时间更短。
以下介绍三种特征构建方法:

  1. 直接计算每种操作码出现的频率作为特征(Count)。
  2. 计算操作码的 tf-idf 值作为特征(Tfidf)
  3. ngram 和 tf-idf 的结合(N-gram + Tf-idf)

import:

import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer          #For Bag of words
from sklearn.feature_extraction.text import TfidfVectorizer          #For TF-IDF
# from sklearn.model_selection import train_test_split
# from gensim.models import Word2Vec                                   #For Word2Vec
from catboost import CatBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder

Count + Catboost

计算每种操作码在样本中出现的频率作为特征,也被称为操作码直方图。
读入数据:

  • 调用 LabelEncoder将原本的类名标签转换为整数标签。
  • 调用 train_test_split 以 8:2 的比例划分训练集和测试集。
data_folder = "../../benchmarks/BIG2015/opcode/opcode.csv"
df = pd.read_csv(data_folder)
# label
label = LabelEncoder().fit_transform(df['label'])
# data
data = df['opcode']
# 填充缺失值
data.fillna("", inplace=True)

x_train, x_test, y_train, y_test = train_test_split(data, label, test_size=0.2, random_state=42)

image.png
数据预处理和训练分类模型:

  • CountVectorizer(max_features=100, lowercase=True)
    • 限制词表大小为 100,保留出现频率最高的前 100 个词。
    • 此外 binary=True表示二进制词袋,该词出现过标记为 1,未出现过标记为 0.
    • fit_transformtransform处理数据返回到结果是一个稀疏矩阵。
  • CatBoostClassifier(iterations=100, learning_rate=0.001, loss_function='MultiClass', verbose=True, random_seed=42)
    • 定义 catboost 分类器,多分类损失,训练过程可视化等。
################ CountVectorizer 计算词频分类
# 训练集词表大小为682
# CountVectorizer: max_features=100 限制词表大小为100, binary=True 时表示二进制词袋
count_vect = CountVectorizer(max_features=100, lowercase=True) 
# 拟合和转换训练数据
x_train_trans  = count_vect.fit_transform(x_train)
# 根据在训练数据上拟合得到的词典计算和转换测试数据
x_test_trans = count_vect.transform(x_test)
# 分类器和预测
clf = CatBoostClassifier(iterations=100, learning_rate=0.001, loss_function='MultiClass', verbose=True, random_seed=42) # verbose显示训练进度
clf.fit(x_train_trans, y_train)
predictions = clf.predict(x_test_trans)
accuracy = accuracy_score(predictions, y_test)
print(f"Accuracy: {accuracy}") # Accuracy: 0.9158233670653174

训练过程和推理结果如下:
image.png
此外可以查看构建的词表等:

dense_matrix = x_train_trans.toarray() # 转换为稠密矩阵查看
print(dense_matrix)
print(len(count_vect.get_feature_names_out())) # 查看词表

# [[  1  47   4 ...   0   0  25]
#  [  0 183 134 ...   0   0 131]
#  [  0 227 145 ...   0   1 234]
#  ...
#  [  9 119   8 ...   0   1  12]
#  [ 10 335 103 ...   0   6 455]
#  [  0 108  13 ...   0  15  63]]
# 100

查看词表中前 10 个词在所有样本中出现的频率,以下结果显示词表是按照字母顺序排序:

# 计算每个词语的频率总和
word_frequencies = dense_matrix.sum(axis=0)
feature_name = count_vect.get_feature_names_out()[0:10]
# 绘制词频直方图
plt.figure(figsize=(10, 6))
plt.bar(range(len(feature_name)), word_frequencies[0:10], tick_label=feature_name)
plt.xticks(rotation=90)
plt.xlabel('Words')
plt.ylabel('Frequency')
plt.title('Word Frequency Histogram')
plt.show()

image.png

Tfidf + Catboost

上一步统计词表中各种词出现的频率作为特征,但需要考虑到词出现越多次并不意味着就越重要,就像文章中的"the", "a"这样的词出现频率高,却并不能反映文章主题。
TF-IDF 可以有效地衡量文本中具有区分度高的词语的重要性。例如,在文本中出现频率很高,但在所有文章中都出现频率很高的词语,其重要性通常不高。TF-IDF 可以通过考虑文档频率来消除这种影响。

  1. 计算词频(TF):词在文章中出现的频率/词在所有文章中出现的频率
  2. 计算逆文档频率(IDF):log(总文章数/(该词出现的文章数+1))
  3. 计算 TF-IDF: TF-IDF = 词频(TF) * 逆文档频率(IDF)

TF-IDF 可以视为一种对词的选择方法,对比词频,以更合理的方式衡量词在文档中的重要性。

######### TfidfVectorizer
tf_idf = TfidfVectorizer(max_features=100, lowercase=True)
# 拟合和转换训练数据
x_train_trans = tf_idf.fit_transform(x_train)
# 根据在训练数据上拟合得到的词典计算和转换测试数据
x_test_trans = tf_idf.transform(x_test)

clf = CatBoostClassifier(iterations=1000, learning_rate=0.001, loss_function='MultiClass', verbose=True, random_seed=42) # verbose显示训练进度
clf.fit(x_train_trans, y_train)
predictions = clf.predict(x_test_trans)

# accuracy = sum(predictions.flatten() == y_test) / len(y_test)
# 计算精度
accuracy = accuracy_score(predictions, y_test)
print(f"Accuracy: {accuracy}") # Accuracy: 0.9581416743330267
print(tf_idf.vocabulary_) # 查看此表

输出结果如下:

{'pop': 69, 'xor': 99, 'call': 4, 'push': 74, 'rep': 77, 'dw': 13, 'mov': 51, 'sub': 94, 'lea': 48, 'cmp': 8, 'jbe': 35, 'add': 1, 'retn': 80, 'and': 2, 'adc': 0, 'or': 66, 'pusha': 75, 'jb': 34, 'jg': 36, 'popa': 70, 'sbb': 85, 'test': 95, 'jnz': 43, 'dec': 11, 'jmp': 40, 'jz': 47, 'not': 63, 'movzx': 58, 'imul': 31, 'shl': 89, 'neg': 61, 'nop': 62, 'sar': 84, 'mul': 59, 'shr': 90, 'ror': 82, 'div': 12, 'inc': 32, 'rol': 81, 'ja': 33, 'jnb': 41, 'fld': 22, 'fstp': 27, 'pushf': 76, 'popf': 71, 'setnle': 86, 'jl': 38, 'leave': 49, 'db': 9, 'stosd': 93, 'movsd': 55, 'cdq': 5, 'jns': 42, 'jge': 37, 'offset': 65, 'dd': 10, 'jle': 39, 'xchg': 98, 'setz': 88, 'setnz': 87, 'movsx': 57, 'fcom': 18, 'fnstsw': 25, 'jp': 45, 'js': 46, 'dword': 14, 'word': 97, 'std': 92, 'cld': 7, 'idiv': 30, 'fldz': 23, 'fxch': 29, 'byte': 3, 'retf': 79, 'rva': 83, 'off': 64, 'unicode': 96, 'fsub': 28, 'fcomp': 19, 'jo': 44, 'fdiv': 20, 'fild': 21, 'fmul': 24, 'fadd': 15, 'stc': 91, 'lock': 50, 'faddp': 16, 'fst': 26, 'fchs': 17, 'clc': 6, 'repe': 78, 'movq': 54, 'movd': 52, 'paddd': 67, 'movdqa': 53, 'paddw': 68, 'psubw': 72, 'punpcklbw': 73, 'movss': 56, 'mulss': 60}

N-gram + Tf-idf + Catboost

n-gram(n 元语法),可以看作是一种文本分词方法,对于"I love you"这句话“:

  • Unigram: {I,love,you}
  • Bi-gram:{(I,love),(love,you)}
  • Tri-gram:{(I,love,you)}

n-gram 基于马尔可夫链假设,用马尔可夫链假设解释:

  • Unigram:每个词出现的概率独立,与其他词无关。
  • Bi-gram:一个词出现的概率与它其前一个词相关。
  • Tri-gram:一个词出现的概率与它其前两个词相关。

基于马尔可夫链假设,可以加大简化概率计算。假设有一个由 m 个词组成的序列\(w_1,w_2, ..., w_m\),则该序列出现的概率可记为\(P(w_1, w_2, w_3, ..., w_{m})\),转换为条件概率计算:
\(p(w_1,w_2,.·,w_m)=p(w_1)*p(w_2|w_1)*p(w_3|w_1,w_2)...*p(w_m|w_1,..,w_{m-1})\)
引入马尔可夫链假设:

  • Unigram,每个词出现的概率独立,与其他词无关:\(p(w_1,w_2,.·,w_n)=p(w_1)*p(w_2)*p(w_3)*...p(w_m)\)
  • Bi-gram,每个词出现的概率与前一个词有关:\(p(w_1,w_2,.·,w_n)=p(w_1)*p(w_2|w_1)*p(w_3|w_2)...*p(w_m|w_{m-1})\)'
  • Tri-gram,每个词出现的概率与它其前两个词相关:\(p(w_1,w_2,.·,w_n)=p(w_1)*p(w_2|w_1)*p(w_3|w_1,w_2)...*p(w_m|w_{m-2},w_{m-1})\)

之后概率计算转换为频率计算:
image.png
\(p(w_m|w_{m-1})\)为序列 \((w_{m-1}, w_m)\)在句子中出现的频率除以词\(w_{m-1}\)在句子中出现的频率。

######### TfidfVectorizer
# 下面这段代码运行时间较久
tf_idf = TfidfVectorizer(max_features=100, lowercase=True, ngram_range=(1, 3))
x_train_trans = tf_idf.fit_transform(x_train)
x_test_trans = tf_idf.transform(x_test)

clf = CatBoostClassifier(iterations=1000, learning_rate=0.001, loss_function='MultiClass', verbose=True, random_seed=42) # verbose显示训练进度
clf.fit(x_train_trans, y_train)
predictions = clf.predict(x_test_trans)

# accuracy = sum(predictions.flatten() == y_test) / len(y_test)
accuracy = accuracy_score(predictions, y_test)
print(f"Accuracy: {accuracy}") # Accuracy: 0.9411223551057958
print(tf_idf.vocabulary_)

输出词表如下:

{'pop': 68, 'xor': 97, 'call': 3, 'push': 72, 'mov': 33, 'sub': 92, 'lea': 30, 'cmp': 10, 'add': 0, 'retn': 86, 'and': 2, 'or': 67, 'test': 94, 'jnz': 26, 'call push': 8, 'mov push': 47, 'push mov': 78, 'mov sub': 52, 'push push': 82, 'push call': 73, 'call add': 4, 'call pop': 7, 'pop retn': 71, 'retn mov': 87, 'sub mov': 93, 'pop mov': 69, 'call mov': 5, 'mov mov': 41, 'add mov': 1, 'mov add': 34, 'retn push': 88, 'lea mov': 31, 'mov lea': 40, 'push lea': 76, 'lea push': 32, 'test jnz': 95, 'mov pop': 46, 'mov xor': 54, 'cmp jnz': 11, 'mov push mov': 49, 'push push push': 85, 'push push call': 83, 'push call add': 74, 'push mov push': 81, 'mov push call': 48, 'push call mov': 75, 'call mov mov': 6, 'mov mov push': 43, 'mov push push': 50, 'push lea push': 77, 'jmp': 24, 'jz': 28, 'movzx': 55, 'imul': 18, 'nop': 61, 'mul': 56, 'inc': 23, 'db': 13, 'dd': 16, 'mov jmp': 39, 'mov call': 35, 'test jz': 96, 'imul mov': 20, 'mov imul': 38, 'mov test': 53, 'jz mov': 29, 'mov nop': 45, 'jnz mov': 27, 'pop pop': 70, 'mov mul': 44, 'imul imul': 19, 'mul mov': 58, 'jmp mov': 25, 'mov cmp': 37, 'xor mov': 98, 'nop mov': 63, 'cmp jz': 12, 'db db': 14, 'push mov mov': 80, 'mov mov mov': 42, 'push push mov': 84, 'push mov call': 79, 'mov call mov': 36, 'db db db': 15, 'nop nop': 65, 'dd dd': 17, 'std': 89, 'cld': 9, 'std mov': 90, 'imul mul': 21, 'mul imul': 57, 'mov std': 51, 'mul mul': 59, 'std std': 91, 'nop imul': 62, 'mul nop': 60, 'xor nop': 99, 'nop mul': 64, 'imul nop': 22, 'nop xor': 66}

Discussion

TF-IDF 对长文本的效果可能不如短文本,长文本中包含的词语数量通常比短文本多,导致 TF-IDF 的值被稀释。而词频则没有这些缺点,对文本长度不敏感。
TF-IDF 和词频都是从统计的角度计算词的重要性,没有考虑到词在文章中的具体含义以及词的上下文模式,但是计算简单而且效果也挺好。
n-gram 补充了一部分上下文信息,通常取值为 1 到 3,n 取值更大特征规模会随之爆炸,n-gram 仍然无法捕捉到长距离的依赖关系或上下文信息。同样高阶N-gram模型在处理大规模数据时可能会导致高维稀疏的问题,需要大量的存储空间和计算资源来处理大量的n-gram特征。N-gram模型可能对于训练数据中没有出现的序列缺乏泛化能力,容易出现过拟合的情况。

Reference

sklearn——CountVectorizer详解
机器学习:生动理解TF-IDF算法
自然语言处理(NLP)的N-gram模型是什么?
Microsoft Malware Classification Challenge (BIG 2015)