异常检测算法-完全卷积数据描述子FCDD

发布时间 2023-11-05 20:36:07作者: 澳大利亚树袋熊
文献来源:
EXPLAINABLE DEEP ONE-CLASS CLASSIFICATION
 
 
 
最近在做一些异物检测之类的算法任务,原本想使用目标识别算法,但是问题是正样本太多,而负样本没几个。所以有必要使用异常检测算法,日后不妨再结合目标识别任务去做。
在正式开始前,需要先简单介绍一个广义损失函数的东西。(https://zhuanlan.zhihu.com/p/494343423)

x损失、alpha变量、C和梯度的关系

 alpha是控制鲁棒性的变形参数,c则是用来控制损失函数在零点附近二次区域宽度的尺度系数。

当alpha=1时,得到的是pseudo-Huber(也叫Charbonnier)损失,即 $\sqrt{(x/c)^2+1}-1$,它是huber loss的平滑近似。这个函数的梯度很有意思:当残差很大时,反向传播的梯度幅值趋于一个常数(c=1时不大于1);当残差很小时,倾向于反向传播一个很小的梯度,表现接近L2损失。

当alpha=负无穷时,有Welsch loss。当残差很大时,损失趋于饱和、梯度趋于0,也就是说离群点基本不再对训练产生影响。

这两个loss和原文提及的损失函数,非常的像,也就是HSC损失函数。

 按原文来说,该函数会使正常数据靠向中心,异常数据远离中心。

 

 网络的主体结构类似于这样,作者简单提了一下感受野这个基础概念,大概就是输出图与原始图之间的映射关系:网络越深,输出的一个像素就能代表更多的原始图像素。

作者仅使用了池化、卷积,只在最后一步:生成高分辨率热图时,使用了类似转置高斯卷积的方法来提高分辨率。其实我觉得没什么用,可能就是找些创新点的。

方法还是相当有效的,我使用了unet也取得了不错的效果,这里贴出代码:

#Unet.py
import torch
import torch.nn as nn
import torchvision

class Decoder(nn.Module):
    """One U-Net decoder stage: upsample, concatenate a skip tensor, then fuse.

    Args:
        in_channels: channels of the coarse tensor fed to the transposed conv.
        middle_channels: channels seen by the fusing 3x3 convolution, i.e. the
            upsampled channels (``out_channels``) plus the skip channels.
        out_channels: channels produced by both the upsampling step and the
            fusing convolution.
    """

    def __init__(self, in_channels, middle_channels, out_channels):
        super().__init__()
        # kernel_size=2 with stride=2 doubles the spatial resolution.
        self.up = nn.ConvTranspose2d(
            in_channels, out_channels, kernel_size=2, stride=2
        )
        fuse = nn.Conv2d(middle_channels, out_channels, kernel_size=3, padding=1)
        self.conv_relu = nn.Sequential(fuse, nn.ReLU(inplace=True))

    def forward(self, x1, x2):
        """Upsample ``x1``, stack it with ``x2`` on the channel axis and fuse."""
        upsampled = self.up(x1)
        merged = torch.cat((upsampled, x2), dim=1)
        return self.conv_relu(merged)

class Unet(nn.Module):
    """U-Net for single-channel images with a torchvision ResNet-18 encoder.

    The ResNet stem convolution is replaced by a fresh 1-channel 7x7 conv;
    the remaining ResNet stages are reused as the encoder. Four ``Decoder``
    stages plus a final bilinear upsampling bring the features back up before
    a 1x1 convolution maps them to ``n_class`` channels.

    Args:
        n_class: number of output channels of the final 1x1 convolution.
    """

    def __init__(self, n_class):
        super().__init__()

        # NOTE(review): ``True`` is passed positionally - presumably it
        # requests pretrained weights; confirm against the installed
        # torchvision version.
        self.base_model = torchvision.models.resnet18(True)
        self.base_layers = list(self.base_model.children())
        # Child order assumed from torchvision's ResNet definition:
        # conv1, bn1, relu, maxpool, layer1, layer2, layer3, layer4, ...
        # The original conv1 (index 0) is dropped in favour of a 1-channel stem.
        _, bn1, relu, maxpool, res1, res2, res3, res4 = self.base_layers[:8]

        grey_stem = nn.Conv2d(
            1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
        )
        self.layer1 = nn.Sequential(grey_stem, bn1, relu)
        self.layer2 = nn.Sequential(maxpool, res1)
        self.layer3 = res2
        self.layer4 = res3
        self.layer5 = res4

        # Decoder(in, upsampled + skip, out); the channel sums spell out the
        # concatenation performed inside each stage.
        self.decode4 = Decoder(512, 256 + 256, 256)
        self.decode3 = Decoder(256, 256 + 128, 256)
        self.decode2 = Decoder(256, 128 + 64, 128)
        self.decode1 = Decoder(128, 64 + 64, 64)

        # Last 2x upsampling followed by two bias-free 3x3 convolutions
        # (there is no activation between them).
        upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        squeeze = nn.Conv2d(64, 32, kernel_size=3, padding=1, bias=False)
        expand = nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False)
        self.decode0 = nn.Sequential(upsample, squeeze, expand)

        self.conv_last = nn.Conv2d(64, n_class, 1)

    def forward(self, input):
        """Run the encoder/decoder and return the ``n_class``-channel map.

        The shape notes below are relative to an H x W input and assume the
        standard ResNet-18 strides (the original notes assumed 256 x 256).
        """
        skip1 = self.layer1(input)           # 64 ch,  H/2
        skip2 = self.layer2(skip1)           # 64 ch,  H/4
        skip3 = self.layer3(skip2)           # 128 ch, H/8
        skip4 = self.layer4(skip3)           # 256 ch, H/16
        bottleneck = self.layer5(skip4)      # 512 ch, H/32

        up4 = self.decode4(bottleneck, skip4)  # 256 ch, H/16
        up3 = self.decode3(up4, skip3)         # 256 ch, H/8
        up2 = self.decode2(up3, skip2)         # 128 ch, H/4
        up1 = self.decode1(up2, skip1)         # 64 ch,  H/2
        full_res = self.decode0(up1)           # 64 ch,  H
        return self.conv_last(full_res)        # n_class ch, H
import random

import torchvision.datasets
import torch
import torch.nn as nn
#导入dataloader的包
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.tensorboard import SummaryWriter
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
from torchvision import transforms
import Unet
# Dataset locations (machine-specific absolute Windows paths).
# train_path is used below as the "normal" image folder and err_path as the
# "anomalous" image folder; test_path is not referenced by this script.
train_path = r"E:\machine_learning\AE\train_pic"
test_path = r"E:\machine_learning\AE\test_pic"
err_path = r"E:\machine_learning\AE\err_pic"

# Optimisation hyper-parameters.
lr = 1e-4
epochs = 50
batch_size = 5

# Use the GPU when one is available; otherwise fall back to the CPU instead of
# failing at the first ``.to(device)`` call.
device = "cuda" if torch.cuda.is_available() else "cpu"
def load_data(normal_source, anomal_source, image_suffix_name=".jpg"):
    """Collect image paths and binary labels from two directory trees.

    Both trees are walked recursively. Every file whose name ends with
    ``image_suffix_name`` is recorded; files under ``normal_source`` get
    label 0 and files under ``anomal_source`` get label 1.

    Args:
        normal_source: root directory of the normal images.
        anomal_source: root directory of the anomalous images.
        image_suffix_name: file-name suffix (or tuple of suffixes) to accept.
            The match is case-sensitive.

    Returns:
        ``(image_file, label)``: two parallel lists, normal entries first.
    """
    image_file = []
    label = []
    for source, tag in ((normal_source, 0), (anomal_source, 1)):
        for parent_folder, _, file_names in os.walk(source):
            for file_name in file_names:
                if file_name.endswith(image_suffix_name):
                    # Join with the folder actually being visited so that
                    # files in sub-directories resolve to real paths, and let
                    # os.path pick the separator for the current platform.
                    image_file.append(os.path.join(parent_folder, file_name))
                    label.append(tag)
    return image_file, label

def image_rancut(image, min_size=64, max_size=212):
    """Return a randomly positioned, randomly sized crop of ``image``.

    The crop height and width are drawn independently from
    ``[min_size, max_size]`` and clamped to the image size, so images smaller
    than the requested range no longer make ``random.randint`` fail. Only the
    first two axes are cropped; any trailing axes (e.g. colour channels) are
    kept whole.

    Args:
        image: array-like with ``.shape`` whose first two axes are
            (height, width) and which supports 2-D slicing.
        min_size: smallest crop side length, in pixels.
        max_size: largest crop side length, in pixels.

    Returns:
        A slice of ``image`` (whether it is a view or a copy depends on the
        array type).
    """
    max_height, max_width = image.shape[:2]
    # Draw order (h, w, x, y) is kept so that, for images at least max_size
    # on each side, a seeded RNG yields the same crops as before.
    h = random.randint(min(min_size, max_height), min(max_size, max_height))
    w = random.randint(min(min_size, max_width), min(max_size, max_width))
    x = random.randint(0, max_width - w)
    y = random.randint(0, max_height - h)
    return image[y:y + h, x:x + w]

def letterbox(im, new_shape=(128, 128), color=114):
    """Resize ``im`` to fit inside ``new_shape`` and pad it to exactly that size.

    The aspect ratio is preserved: the image is scaled by the largest factor
    that still fits, then a constant border is split between the two sides of
    each axis.

    Args:
        im: image array whose first two axes are (height, width).
        new_shape: target size, indexed as (height, width).
        color: constant value used for the border.

    Returns:
        The resized and padded image.
    """
    src_h, src_w = im.shape[:2]
    dst_h, dst_w = new_shape[0], new_shape[1]

    # Largest scale at which both sides still fit into the target.
    scale = min(dst_h / src_h, dst_w / src_w)

    # cv2.resize expects (width, height).
    resized_wh = (int(round(src_w * scale)), int(round(src_h * scale)))
    pad_w = (dst_w - resized_wh[0]) / 2
    pad_h = (dst_h - resized_wh[1]) / 2

    # The -0.1 / +0.1 nudges send an odd leftover pixel to the bottom/right.
    pad_top = int(round(pad_h - 0.1))
    pad_bottom = int(round(pad_h + 0.1))
    pad_left = int(round(pad_w - 0.1))
    pad_right = int(round(pad_w + 0.1))

    if (src_w, src_h) != resized_wh:
        im = cv2.resize(im, resized_wh, interpolation=cv2.INTER_LINEAR)
    return cv2.copyMakeBorder(
        im, pad_top, pad_bottom, pad_left, pad_right,
        cv2.BORDER_CONSTANT, value=color,
    )


class resizeAndNormalize():
    """Callable transform: letterbox an image to a fixed size, then tensorise it.

    Despite the name, no mean/std normalisation is applied - only
    ``letterbox`` followed by torchvision's ``ToTensor``.

    Args:
        size: target size forwarded to ``letterbox`` as ``new_shape``.
            NOTE(review): the original comment called this (w, h), but
            ``letterbox`` indexes ``new_shape`` as (height, width); the two
            only agree for square sizes.
        interpolation: stored on the instance but not used by ``__call__``.
    """

    def __init__(self, size, interpolation=cv2.INTER_LINEAR):
        self.size = size
        self.interpolation = interpolation
        # ToTensor accepts a PIL Image or a numpy.ndarray.
        self.toTensor = transforms.ToTensor()

    def __call__(self, image):
        """Pad/resize ``image`` with a random border value and return a tensor."""
        # A fresh random grey level is drawn for the border on every call.
        border_value = random.randint(0, 255)
        boxed = letterbox(image, self.size, border_value)
        return self.toTensor(boxed)

class FCDD_DataSet(Dataset):
    """Dataset of greyscale images labelled normal (0) or anomalous (1).

    Args:
        normal_source: root directory of normal images (label 0).
        anomal_source: root directory of anomalous images (label 1).
        train: accepted for interface compatibility; currently unused.
    """

    def __init__(self, normal_source, anomal_source, train=True):
        super().__init__()
        self.image_file, self.label = load_data(normal_source, anomal_source)
        # The transform holds no per-sample state (the random border value is
        # drawn inside its __call__), so build it once instead of per item.
        size_width = 640
        size_height = 640
        self.transform = resizeAndNormalize((size_width, size_height))

    def __len__(self):
        return len(self.image_file)

    def __getitem__(self, index):
        """Return ``(imageTensor, labelTensor)`` for the sample at ``index``.

        Raises:
            OSError: if OpenCV cannot read the image file.
        """
        path = self.image_file[index]
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None rather than
            # raising; fail here with the offending path instead of deeper
            # inside the preprocessing.
            raise OSError(f"cv2.imread could not read image: {path}")
        imageTensor = self.transform(img)
        labelTensor = torch.tensor(self.label[index])
        return imageTensor, labelTensor


class FCDDLoss(nn.Module):
    """FCDD objective built on a pseudo-Huber transform of the network output.

    For each sample the output map ``a`` is turned into ``sqrt(a^2 + 1) - 1``
    and averaged to one score ``s``. Samples labelled 0 contribute ``s``;
    samples labelled 1 contribute ``-log(1 - exp(-s))``. The result is the
    mean over the batch.
    """

    def __init__(self):
        # The original spelled this ``__int__``, which never ran as a
        # constructor and instead defined an int-conversion hook.
        super().__init__()

    def forward(self, output, labels):
        """Compute the batch loss.

        Args:
            output: network output whose first axis is the batch axis.
            labels: 1-D tensor of 0 (normal) / 1 (anomalous), one per sample.

        Returns:
            Scalar tensor: mean of the per-sample loss terms.
        """
        per_pixel = (output ** 2 + 1).sqrt() - 1
        per_sample = per_pixel.reshape(labels.size(0), -1).mean(-1)

        normal_terms = per_sample[labels == 0]
        # The small epsilon keeps log() finite when a score is exactly 0.
        anomalous_scores = per_sample[labels == 1]
        anomalous_terms = -((1 - (-anomalous_scores).exp()) + 1e-31).log()

        # Concatenating yields the same multiset of per-sample terms as
        # writing them back by index, without in-place edits on a tensor that
        # is part of the autograd graph.
        return torch.cat((normal_terms, anomalous_terms)).mean()

class Autoencoder(nn.Module):
    """Small convolutional encoder followed by a fully connected decoder.

    ``forward`` flattens the encoder output to ``32*16*16`` features and
    reshapes the decoder output to ``(N, 1, 128, 128)``; after the three 2x
    max-poolings this implies a 1-channel 128x128 input.

    ``efc1`` and ``dfc1`` are constructed (and so appear in the state dict)
    but are not used by ``forward``.
    """
    def __init__(self):
        super(Autoencoder, self).__init__()
        # Fully connected encoder 2048 -> 1024 -> 512 (not used by forward).
        self.efc1=nn.Sequential(
            nn.Linear(2048,1024),
            nn.Linear(1024, 512),
            #nn.Linear(256, 128),
        )
        # Fully connected decoder 512 -> 1024 -> 2048 (not used by forward).
        self.dfc1 = nn.Sequential(
            #nn.Linear(128, 256),
            nn.Linear(512, 1024),
            nn.Linear(1024, 2048),
        )
        # Convolutional encoder: three conv/BN/ReLU/max-pool stages.
        # Spatial sizes in the notes below assume a 128x128 input (see class docstring).
        self.efc = nn.Sequential(
            nn.Conv2d(
                in_channels=1,              # single-channel input
                out_channels=16,            # number of filters
                kernel_size=3,              # filter size
                stride=1,                   # filter step
                padding=1,                  # (kernel_size-1)/2 keeps the spatial size when stride=1
            ),                              # -> 16 channels, spatial size unchanged
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),    # halves the spatial size: 128 -> 64
            nn.Conv2d(16, 16, 3, 1, 1),  # -> 16 channels, spatial size unchanged
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),             # 64 -> 32
            nn.Conv2d(16, 32, 3, 1, 1),  # -> 32 channels, spatial size unchanged
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),# 32 -> 16, i.e. (32, 16, 16) as flattened in forward
        )
        # Fully connected decoder: 32*16*16 -> 2*64*64 -> 128*128 values.
        self.dfc = nn.Sequential(
            nn.Linear(32*16*16,2*64*64),
            nn.Dropout(0.3),
            nn.ReLU(),
            nn.Linear(2*64*64,128*128),
        )
    def forward(self, x):
        """Encode ``x`` with ``efc`` and decode with ``dfc`` to ``(N, 1, 128, 128)``."""
        x = self.efc(x)
        # Flatten each sample to a 32*16*16 feature vector for the linear decoder.
        x=x.view(-1,32*16*16)
        y = self.dfc(x)
        # Reshape the 128*128 outputs back into a single-channel image.
        y=y.view(-1,1,128,128)
        return y


# Model, loss and optimiser. ``AE`` is a single-output-channel U-Net.
AE = Unet.Unet(1).to(device)
AE.train()
criterion = FCDDLoss()
optimizer = torch.optim.Adam(AE.parameters(), lr=lr)

# Normal images come from train_path, anomalous ones from err_path.
train_dataset = FCDD_DataSet(train_path, err_path)
train_dataloder = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
    drop_last=True,
)

# NOTE: this loader wraps the same dataset as the training loader, one
# sample per batch.
test_dataloder = DataLoader(
    train_dataset,
    batch_size=1,
    shuffle=True,
    num_workers=0,
    drop_last=True,
)

# One entry per optimisation step, plotted later.
loss_list = []
for i in range(epochs):
    for data, labels in train_dataloder:
        data, labels = data.to(device), labels.to(device)
        output = AE(data)
        loss = criterion(output, labels)  # FCDD loss
        optimizer.zero_grad()             # clear gradients from the previous step
        loss.backward()                   # back-propagate
        optimizer.step()                  # update the weights
        step_loss = float(loss)
        loss_list.append(step_loss)
        print(f"{i}次loss:{step_loss}")
# Score every sample yielded by test_dataloder with the trained model.
AE.eval()
test_list = []
# Inference only: disable autograd so no graph is built for each sample.
with torch.no_grad():
    for data, labels in test_dataloder:
        data = data.to(device)
        labels = labels.to(device)
        output = AE(data)
        loss = criterion(output, labels)
        test_list.append(float(loss))

# Summary figure: per-step training loss on top, per-sample evaluation loss
# as a bar chart below; saved to total.jpg in the working directory.
fig, (train_ax, test_ax) = plt.subplots(2, 1, figsize=(10, 10))

train_ax.plot(loss_list)
train_ax.set_title('train')

test_ax.bar(range(len(test_list)), test_list)
test_ax.set_title('test_loss')

fig.tight_layout(h_pad=3.0)
fig.savefig("total.jpg")
# Write the input image and its pseudo-Huber heat map for the first 19
# samples yielded by test_dataloder (the loop stops when the counter hits 20).
pp = 0
# Inference only: no autograd graph is needed for the visualisation.
with torch.no_grad():
    for pp, (data, labels) in enumerate(test_dataloder, start=1):
        if pp == 20:
            break
        data = data.to(device)
        output = AE(data)
        # Same per-pixel transform the loss applies to the raw network output.
        output = (output ** 2 + 1).sqrt() - 1
        # The loader uses batch_size=1, so dropping the batch axis leaves a
        # (C, H, W) tensor without hard-coding the 640x640 image size.
        data = data[0]
        output = output[0]
        test_jpg = data.cpu().numpy() * 255
        res_jpg = output.cpu().numpy() * 255
        # OpenCV expects (H, W, C).
        cv2.imwrite(str(pp) + "etest.jpg", np.transpose(test_jpg, (1, 2, 0)))
        cv2.imwrite(str(pp) + "eres.jpg", np.transpose(res_jpg, (1, 2, 0)))