UNet Building Extraction

Published 2023-08-19 17:20:32 · Author: DataAssassin

Today we will build a UNet network on the TensorFlow 2.7 deep learning framework and use it for semantic segmentation of buildings in remote sensing imagery. The post is organized into the following parts.

  • Data preprocessing
  • Network construction
  • Model training
  • Model evaluation

This post uses the Inria Aerial Image Labeling Dataset, a remote sensing image dataset for urban building detection. Its labels distinguish two classes, building and non-building, and it is mainly used for semantic segmentation.

Data Preprocessing

The dataset can be downloaded from its official site, https://project.inria.fr/aerialimagelabeling/. The data used here are split into a Training Set, a Validation Set, and a Test Set, containing 136, 4, and 10 remote sensing images of size 1500*1500 respectively, together with the corresponding label images. A sample of the data is shown below.

We define a random-crop function that crops the training and validation data into 256*256 patches to build our sample library.

import os
import random

import cv2
import numpy as np

size = 256  # side length of the cropped samples

def split_images(tifPath, labelPath, tifout, labelout):
    for tif_file in os.listdir(tifPath):
        img_data = cv2.imread(tifPath + '/' + tif_file)
        # the label shares the image's file name; divide by 255 so labels are stored as 0/1
        label_data = cv2.imread(labelPath + '/' + tif_file) / 255.0
        img_height, img_width = img_data.shape[:2]
        count = 0
        while count < 50:
            random_width = random.randint(0, img_width - size - 1)
            random_height = random.randint(0, img_height - size - 1)
            num = '%0*d' % (4, count)
            # cv2 arrays are (H, W, C): index rows first, then columns
            child_img_data = img_data[random_height:random_height + size, random_width:random_width + size, :]
            # crudely reject anomalous crops that contain saturated (255, 255, 255) pixels
            if child_img_data.max() == 255:
                continue
            # the label image has three identical bands; keep a single band
            child_label_data = label_data[random_height:random_height + size, random_width:random_width + size, 0]
            count += 1
            stem = os.path.splitext(tif_file)[0]
            cv2.imwrite(tifout + '/' + stem + num + '.tif', child_img_data)
            cv2.imwrite(labelout + '/' + stem + num + '.tif', child_label_data.astype(np.uint8))
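A usage sketch for building the sample library; the input and output folder names here are placeholders, not paths from the original post.

# placeholder folders for the raw Inria tiles and the cropped sample library
for d in ('train_tiles', 'train_labels', 'val_tiles', 'val_labels'):
    os.makedirs(d, exist_ok=True)

split_images('train/images', 'train/gt', 'train_tiles', 'train_labels')
split_images('val/images', 'val/gt', 'val_tiles', 'val_labels')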

We also define a get_minibatch function so that, during training, the model can read sample data in batches of BATCH_SIZE.

import numpy as np
import tensorflow as tf

def get_minibatch(image_dir, label_dir):
    # image_dir / label_dir are lists of sample file paths
    img_blobs = []
    ground_truths = []
    # randomly sample BATCH_SIZE examples
    for _ in range(config.BATCH_SIZE):
        instance_masks = []
        image_index = random.randrange(len(image_dir))
        img_name = image_dir[image_index]
        lab_name = label_dir[image_index]
        img_data = cv2.imread(img_name)
        img_grey = cv2.imread(lab_name, 0)  # labels were saved as 0/1
        # build one binary mask per class (0 = non-building, 1 = building)
        for i in range(2):
            m = np.zeros([img_grey.shape[0], img_grey.shape[1]], dtype=np.uint8)
            m[np.where(img_grey == i)] = 1
            instance_masks.append(m)
        label_mask = np.stack(instance_masks, axis=2)
        img_blobs.append(img_data)
        ground_truths.append(label_mask)
    # convert both batches to float32 tensors
    img_tensor = tf.cast(tf.convert_to_tensor(img_blobs), dtype=tf.float32)
    label_tensor = tf.cast(tf.convert_to_tensor(ground_truths), dtype=tf.float32)
    return img_tensor, label_tensor
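The functions above and the training loop below reference a `config` object that the post does not define. A minimal sketch of what it might look like; every value except NUM_CLASSES (two classes) and EPOCHS (the post trains for 40 epochs) is an assumption, as are the sample-library folder names:

import glob

class config:
    BATCH_SIZE = 8                        # assumed batch size
    NUM_CLASSES = 2                       # building / non-building
    EPOCHS = 40                           # the post trains for 40 epochs
    steps = 200                           # assumed steps per epoch
    save_model_dir = 'saved_model/unet'   # assumed checkpoint path

# hypothetical sample-library folders produced by split_images
train_img_list = sorted(glob.glob('train_tiles/*.tif'))
train_lab_list = sorted(glob.glob('train_labels/*.tif'))
val_img_list = sorted(glob.glob('val_tiles/*.tif'))
val_lab_list = sorted(glob.glob('val_labels/*.tif'))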

Network Construction

The UNet architecture is shown in the figure below.

Now let's build the UNet network.

from tensorflow.keras import layers, Model

class ConvBlock(layers.Layer):
    def __init__(self, filters):
        super(ConvBlock, self).__init__()

        self.conv1 = layers.Conv2D(filters, (3, 3), strides=(1, 1), padding='same')
        self.bn1 = layers.BatchNormalization()
        self.ac1 = layers.Activation('relu')

        self.conv2 = layers.Conv2D(filters, (3, 3), strides=(1, 1), padding='same')
        self.bn2 = layers.BatchNormalization()
        self.ac2 = layers.Activation('relu')

    def call(self, inputs, training=None):
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.ac1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        out = self.ac2(x)
        return out


class MyUNet(Model):
    def __init__(self, filters=64):
        super(MyUNet, self).__init__()
        self.num_classes = config.NUM_CLASSES
        self.filters = filters

        self.block1 = ConvBlock(self.filters)
        self.pool1 = layers.MaxPooling2D(pool_size=(2, 2), padding='same')

        self.block2 = ConvBlock(self.filters*2)
        self.pool2 = layers.MaxPooling2D(pool_size=(2, 2), padding='same')

        self.block3 = ConvBlock(self.filters*4)
        self.pool3 = layers.MaxPooling2D(pool_size=(2, 2), padding='same')

        self.block4 = ConvBlock(self.filters*8)
        self.pool4 = layers.MaxPooling2D(pool_size=(2, 2), padding='same')

        self.block5 = ConvBlock(self.filters*16)

        self.up1 = layers.Conv2DTranspose(self.filters * 8, 2, strides=2, padding="same")
        self.bn1 = layers.BatchNormalization()
        self.upblock1 = ConvBlock(self.filters * 8)

        self.up2 = layers.Conv2DTranspose(self.filters * 4, 2, strides=2, padding="same")
        self.bn2 = layers.BatchNormalization()
        self.upblock2 = ConvBlock(self.filters * 4)

        self.up3 = layers.Conv2DTranspose(self.filters * 2, 2, strides=2, padding="same")
        self.bn3 = layers.BatchNormalization()
        self.upblock3 = ConvBlock(self.filters * 2)

        self.up4 = layers.Conv2DTranspose(self.filters, 2, strides=2, padding="same")
        self.bn4 = layers.BatchNormalization()
        self.upblock4 = ConvBlock(self.filters)

        self.o = layers.Conv2D(self.num_classes, (3, 3), padding="same", activation="softmax")

    def call(self, x, training=None):

        b1 = self.block1(x)
        p1 = self.pool1(b1)

        b2 = self.block2(p1)
        p2 = self.pool2(b2)

        b3 = self.block3(p2)
        p3 = self.pool3(b3)

        b4 = self.block4(p3)
        p4 = self.pool4(b4)

        b5 = self.block5(p4)

        # decoder: upsample, then concatenate the matching encoder feature map
        up1 = self.up1(b5)
        bn1 = self.bn1(up1)
        cc1 = layers.concatenate([bn1, b4], axis=3)
        ub1 = self.upblock1(cc1)

        up2 = self.up2(ub1)
        bn2 = self.bn2(up2)
        cc2 = layers.concatenate([bn2, b3], axis=3)
        ub2 = self.upblock2(cc2)

        up3 = self.up3(ub2)
        bn3 = self.bn3(up3)
        cc3 = layers.concatenate([bn3, b2], axis=3)
        ub3 = self.upblock3(cc3)

        up4 = self.up4(ub3)
        bn4 = self.bn4(up4)
        cc4 = layers.concatenate([bn4, b1], axis=3)
        ub4 = self.upblock4(cc4)

        output = self.o(ub4)
        return output
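A quick sanity check, as a sketch, that the network builds and produces per-pixel class scores for a 3-band 256*256 input:

model = MyUNet()
dummy = tf.zeros([1, 256, 256, 3])
print(model(dummy, training=False).shape)  # expected: (1, 256, 256, 2)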

Model Training

The code used for training is as follows.

model = MyUNet()

# define the loss function and the optimizer
# the network output already passes through softmax, so from_logits must be False
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=False)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')

valid_loss = tf.keras.metrics.Mean(name='valid_loss')
valid_accuracy = tf.keras.metrics.BinaryAccuracy(name='valid_accuracy')

@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_object(y_true=labels, y_pred=predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(grads_and_vars=zip(gradients, model.trainable_variables))
    train_loss(loss)
    train_accuracy(labels, predictions)

@tf.function
def valid_step(images, labels):
    predictions = model(images, training=False)
    v_loss = loss_object(labels, predictions)

    valid_loss(v_loss)
    valid_accuracy(labels, predictions)

# four arrays to record per-epoch loss and accuracy
fig_train_loss = np.zeros([config.EPOCHS])
fig_train_accuracy = np.zeros([config.EPOCHS])

fig_val_loss = np.zeros([config.EPOCHS])
fig_val_accuracy = np.zeros([config.EPOCHS])

# start training; here we train for 40 epochs
for epoch in range(config.EPOCHS):
    train_loss.reset_states()
    train_accuracy.reset_states()
    valid_loss.reset_states()
    valid_accuracy.reset_states()

    for step in range(1, config.steps + 1):
        images, labels = get_minibatch(train_img_list, train_lab_list)
        train_step(images, labels)

        val_images, val_labels = get_minibatch(val_img_list, val_lab_list)
        valid_step(val_images, val_labels)
        # validate on one batch every step and print train/val loss and acc
        print("Epoch: {}/{}, step: {}/{}, train_loss: {:.5f}, train_accuracy: {:.5f}, "
              "val_loss: {:.5f}, val_accuracy: {:.5f}".format(
                  epoch + 1, config.EPOCHS, step, config.steps,
                  train_loss.result(), train_accuracy.result(),
                  valid_loss.result(), valid_accuracy.result()))

    fig_train_loss[epoch] = train_loss.result()
    fig_train_accuracy[epoch] = train_accuracy.result()

    fig_val_loss[epoch] = valid_loss.result()
    fig_val_accuracy[epoch] = valid_accuracy.result()

    print("Epoch: {}/{}, train loss: {:.5f}, train accuracy: {:.5f}, "
          "valid loss: {:.5f}, valid accuracy: {:.5f}".format(epoch + 1,
                                                              config.EPOCHS,
                                                              train_loss.result(),
                                                              train_accuracy.result(),
                                                              valid_loss.result(),
                                                              valid_accuracy.result()))

model.save_weights(filepath=config.save_model_dir, save_format='tf')

We trained for only 40 epochs; the loss and accuracy curves during training are shown below. The model keeps converging as the number of epochs increases.
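A minimal sketch for plotting the recorded curves with matplotlib; the figure layout is an assumption:

import matplotlib.pyplot as plt

epochs = range(1, config.EPOCHS + 1)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
ax1.plot(epochs, fig_train_loss, label='train loss')
ax1.plot(epochs, fig_val_loss, label='val loss')
ax1.set_xlabel('epoch')
ax1.legend()
ax2.plot(epochs, fig_train_accuracy, label='train acc')
ax2.plot(epochs, fig_val_accuracy, label='val acc')
ax2.set_xlabel('epoch')
ax2.legend()
plt.savefig('loss_acc_curves.png')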

Model Evaluation

We use the trained model to run on the test data; part of the final segmentation results are shown below. The middle column is our result and the right column is the target; the segmentation picks out essentially all of the buildings.
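The post does not show the inference step that produces the predicted masks evaluated below; a hedged sketch of what it might look like, with `test_tiles/` as an assumed input folder and `result/` matching the evaluation code that follows:

import glob

model = MyUNet()
model.load_weights(config.save_model_dir)

os.makedirs('result', exist_ok=True)
for img_path in glob.glob('test_tiles/*.tif'):
    img = cv2.imread(img_path)
    x = tf.cast(tf.convert_to_tensor(img[np.newaxis, ...]), tf.float32)
    pred = model(x, training=False)             # (1, 256, 256, 2) softmax scores
    mask = np.argmax(pred[0].numpy(), axis=-1)  # 0 = non-building, 1 = building
    cv2.imwrite('result/' + os.path.basename(img_path), mask.astype(np.uint8))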

We can also compute the IoU metric on the test results; here we measure the IoU of the building class.

pre_path = 'result/'   # folder with the predicted masks
gt_path = 'labels/'    # folder with the target masks
img_size = (256, 256)
files = os.listdir(gt_path)

# channel 0 holds the predicted binary masks, channel 1 the ground truth
D = np.zeros([len(files), img_size[0], img_size[1], 2]).astype(bool)
for i, file in enumerate(files):
    img1 = cv2.imread(pre_path + file, 0)  # read as greyscale
    img2 = cv2.imread(gt_path + file, 0)   # read as greyscale

    D[i, :, :, 0] = img1 == 1
    D[i, :, :, 1] = img2 == 1
# IoU = intersection / union, accumulated over all test pixels
iou = np.sum(D[..., 0] & D[..., 1]) / np.sum(D[..., 0] | D[..., 1])

print("Building class IoU is: " + str(iou))

