import torch import torch.nn as nn import torch.nn.functional as F from einops import rearrange # Borrowed from ''Improving image restoration by revisiting global information aggregation'' # -------------------------------------------------------------------------------- train_size = (1, 3, 256, 256) class AvgPool2d(nn.Module): def __init__(self, kernel_size=None, base_size=None, auto_pad=True, fast_imp=False): super().__init__() self.kernel_size = kernel_size self.base_size = base_size self.auto_pad = auto_pad # only used for fast implementation self.fast_imp = fast_imp self.rs = [5, 4, 3, 2, 1] self.max_r1 = self.rs[0] self.max_r2 = self.rs[0] def extra_repr(self) -> str: return 'kernel_size={}, base_size={}, stride={}, fast_imp={}'.format( self.kernel_size, self.base_size, self.kernel_size, self.fast_imp ) def forward(self, x): if self.kernel_size is None and self.base_size: if isinstance(self.base_size, int): self.base_size = (self.base_size, self.base_size) self.kernel_size = list(self.base_size) self.kernel_size[0] = x.shape[2] * self.base_size[0] // train_size[-2] self.kernel_size[1] = x.shape[3] * self.base_size[1] // train_size[-1] # only used for fast implementation self.max_r1 = max(1, self.rs[0] * x.shape[2] // train_size[-2]) self.max_r2 = max(1, self.rs[0] * x.shape[3] // train_size[-1]) if self.fast_imp: # Non-equivalent implementation but faster h, w = x.shape[2:] if self.kernel_size[0] >= h and self.kernel_size[1] >= w: out = F.adaptive_avg_pool2d(x, 1) else: r1 = [r for r in self.rs if h % r == 0][0] r2 = [r for r in self.rs if w % r == 0][0] r1 = min(self.max_r1, r1) r2 = min(self.max_r2, r2) s = x[:, :, ::r1, ::r2].cumsum(dim=-1).cumsum(dim=-2) n, c, h, w = s.shape k1, k2 = min(h - 1, self.kernel_size[0] // r1), min(w - 1, self.kernel_size[1] // r2) out = (s[:, :, :-k1, :-k2] - s[:, :, :-k1, k2:] - s[:, :, k1:, :-k2] + s[:, :, k1:, k2:]) / (k1 * k2) out = torch.nn.functional.interpolate(out, scale_factor=(r1, r2)) else: n, c, h, w = x.shape s = x.cumsum(dim=-1).cumsum(dim=-2) s = torch.nn.functional.pad(s, (1, 0, 1, 0)) # pad 0 for convenience k1, k2 = min(h, self.kernel_size[0]), min(w, self.kernel_size[1]) s1, s2, s3, s4 = s[:, :, :-k1, :-k2], s[:, :, :-k1, k2:], s[:, :, k1:, :-k2], s[:, :, k1:, k2:] out = s4 + s1 - s2 - s3 out = out / (k1 * k2) if self.auto_pad: n, c, h, w = x.shape _h, _w = out.shape[2:] pad2d = ((w - _w) // 2, (w - _w + 1) // 2, (h - _h) // 2, (h - _h + 1) // 2) out = torch.nn.functional.pad(out, pad2d, mode='replicate') return out # -------------------------------------------------------------------------------- class BasicConv(nn.Module): def __init__(self, in_channel, out_channel, kernel_size, stride, bias=True, norm=False, relu=True, transpose=False): super(BasicConv, self).__init__() if bias and norm: bias = False padding = kernel_size // 2 layers = list() if transpose: padding = kernel_size // 2 - 1 layers.append( nn.ConvTranspose2d(in_channel, out_channel, kernel_size, padding=padding, stride=stride, bias=bias)) else: layers.append(nn.Conv2d(in_channel, out_channel, kernel_size, padding=padding, stride=stride, bias=bias)) if norm: layers.append(nn.BatchNorm2d(out_channel)) if relu: layers.append(nn.GELU()) self.main = nn.Sequential(*layers) def forward(self, x): return self.main(x) class Gap(nn.Module): def __init__(self, in_channel, mode) -> None: super().__init__() self.fscale_d = nn.Parameter(torch.zeros(in_channel), requires_grad=True) self.fscale_h = nn.Parameter(torch.zeros(in_channel), requires_grad=True) if mode == 'train': self.gap = nn.AdaptiveAvgPool2d((1, 1)) elif mode == 'test': self.gap = AvgPool2d(base_size=246) def forward(self, x): x_d = self.gap(x) x_h = (x - x_d) * (self.fscale_h[None, :, None, None] + 1.) x_d = x_d * self.fscale_d[None, :, None, None] return x_d + x_h class YYBlock(nn.Module): def __init__(self, in_channel=3, out_channel=20, relu_slope=0.2): super(YYBlock, self).__init__() self.spatialConv = nn.Sequential(*[ nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, bias=True), nn.LeakyReLU(relu_slope, inplace=False), nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, bias=True), nn.LeakyReLU(relu_slope, inplace=False) ]) self.identity = nn.Conv2d(in_channel, out_channel, 1, 1, 0) self.fftConv2 = nn.Sequential(*[ nn.Conv2d(out_channel, out_channel, 1, 1, 0), nn.LeakyReLU(relu_slope, inplace=False), nn.Conv2d(out_channel, out_channel, 1, 1, 0) ]) self.fusion = nn.Conv2d(out_channel * 2, out_channel, 1, 1, 0) def forward(self, x1): spatial_out = self.spatialConv(x1) identity_out = self.identity(x1) out = spatial_out + identity_out x_fft = torch.fft.rfft2(out, norm='backward') x_amp = torch.abs(x_fft) x_phase = torch.angle(x_fft) enhanced_phase = self.fftConv2(x_phase) enhanced_amp = self.fftConv2(x_amp) # x_fft_out1 = torch.fft.irfft2(x_amp * torch.exp(1j * enhanced_phase), norm='backward') x_fft_out2 = torch.fft.irfft2(enhanced_amp * torch.exp(1j * x_phase), norm='backward') # out = self.fusion(torch.cat([out, x_fft_out2], dim=1)) return x_fft_out2 class ResBlock(nn.Module): def __init__(self, in_channel, out_channel, mode, filter=False): super(ResBlock, self).__init__() self.conv1 = BasicConv(in_channel, out_channel, kernel_size=3, stride=1, relu=True) self.conv2 = BasicConv(out_channel, out_channel, kernel_size=3, stride=1, relu=False) self.yyBlock = YYBlock(in_channel, out_channel, relu_slope=0.2) self.filter = filter def forward(self, x): out = self.conv1(x) out = self.yyBlock(out) out = out + x return out # class SFconv(nn.Module): # def __init__(self, features, mode, M=2, r=2, L=32) -> None: # super().__init__() # # d = max(int(features / r), L) # self.features = features # # self.fc = nn.Conv2d(features, d, 1, 1, 0) # self.fcs = nn.ModuleList([]) # for i in range(M): # self.fcs.append( # nn.Conv2d(d, features, 1, 1, 0) # ) # self.softmax = nn.Softmax(dim=1) # self.out = nn.Conv2d(features, features, 1, 1, 0) # # if mode == 'train': # self.gap = nn.AdaptiveAvgPool2d(1) # elif mode == 'test': # self.gap = AvgPool2d(base_size=246) # # def forward(self, low, high): # emerge = low + high # emerge = self.gap(emerge) # # fea_z = self.fc(emerge) # # high_att = self.fcs[0](fea_z) # low_att = self.fcs[1](fea_z) # # attention_vectors = torch.cat([high_att, low_att], dim=1) # # attention_vectors = self.softmax(attention_vectors) # high_att, low_att = torch.chunk(attention_vectors, 2, dim=1) # # fea_high = high * high_att # fea_low = low * low_att # # out = self.out(fea_high + fea_low) # return out # # # class Patch_ap(nn.Module): # def __init__(self, mode, inchannel, patch_size): # super(Patch_ap, self).__init__() # # if mode == 'train': # self.ap = nn.AdaptiveAvgPool2d((1, 1)) # elif mode == 'test': # self.ap = AvgPool2d(base_size=246) # # self.patch_size = patch_size # self.channel = inchannel * patch_size ** 2 # self.h = nn.Parameter(torch.zeros(self.channel)) # self.l = nn.Parameter(torch.zeros(self.channel)) # # def forward(self, x): # # patch_x = rearrange(x, 'b c (p1 w1) (p2 w2) -> b c p1 w1 p2 w2', p1=self.patch_size, p2=self.patch_size) # patch_x = rearrange(patch_x, ' b c p1 w1 p2 w2 -> b (c p1 p2) w1 w2', p1=self.patch_size, p2=self.patch_size) # # low = self.ap(patch_x) # high = (patch_x - low) * self.h[None, :, None, None] # out = high + low * self.l[None, :, None, None] # out = rearrange(out, 'b (c p1 p2) w1 w2 -> b c (p1 w1) (p2 w2)', p1=self.patch_size, p2=self.patch_size) # # return out
import torch import torch.nn as nn import torch.nn.functional as F from einops import rearrange # Borrowed from ''Improving image restoration by revisiting global information aggregation'' # -------------------------------------------------------------------------------- train_size = (1, 3, 256, 256) class AvgPool2d(nn.Module): def __init__(self, kernel_size=None, base_size=None, auto_pad=True, fast_imp=False): super().__init__() self.kernel_size = kernel_size self.base_size = base_size self.auto_pad = auto_pad # only used for fast implementation self.fast_imp = fast_imp self.rs = [5, 4, 3, 2, 1] self.max_r1 = self.rs[0] self.max_r2 = self.rs[0] def extra_repr(self) -> str: return 'kernel_size={}, base_size={}, stride={}, fast_imp={}'.format( self.kernel_size, self.base_size, self.kernel_size, self.fast_imp ) def forward(self, x): if self.kernel_size is None and self.base_size: if isinstance(self.base_size, int): self.base_size = (self.base_size, self.base_size) self.kernel_size = list(self.base_size) self.kernel_size[0] = x.shape[2] * self.base_size[0] // train_size[-2] self.kernel_size[1] = x.shape[3] * self.base_size[1] // train_size[-1] # only used for fast implementation self.max_r1 = max(1, self.rs[0] * x.shape[2] // train_size[-2]) self.max_r2 = max(1, self.rs[0] * x.shape[3] // train_size[-1]) if self.fast_imp: # Non-equivalent implementation but faster h, w = x.shape[2:] if self.kernel_size[0] >= h and self.kernel_size[1] >= w: out = F.adaptive_avg_pool2d(x, 1) else: r1 = [r for r in self.rs if h % r == 0][0] r2 = [r for r in self.rs if w % r == 0][0] r1 = min(self.max_r1, r1) r2 = min(self.max_r2, r2) s = x[:, :, ::r1, ::r2].cumsum(dim=-1).cumsum(dim=-2) n, c, h, w = s.shape k1, k2 = min(h - 1, self.kernel_size[0] // r1), min(w - 1, self.kernel_size[1] // r2) out = (s[:, :, :-k1, :-k2] - s[:, :, :-k1, k2:] - s[:, :, k1:, :-k2] + s[:, :, k1:, k2:]) / (k1 * k2) out = torch.nn.functional.interpolate(out, scale_factor=(r1, r2)) else: n, c, h, w = x.shape s = x.cumsum(dim=-1).cumsum(dim=-2) s = torch.nn.functional.pad(s, (1, 0, 1, 0)) # pad 0 for convenience k1, k2 = min(h, self.kernel_size[0]), min(w, self.kernel_size[1]) s1, s2, s3, s4 = s[:, :, :-k1, :-k2], s[:, :, :-k1, k2:], s[:, :, k1:, :-k2], s[:, :, k1:, k2:] out = s4 + s1 - s2 - s3 out = out / (k1 * k2) if self.auto_pad: n, c, h, w = x.shape _h, _w = out.shape[2:] pad2d = ((w - _w) // 2, (w - _w + 1) // 2, (h - _h) // 2, (h - _h + 1) // 2) out = torch.nn.functional.pad(out, pad2d, mode='replicate') return out # -------------------------------------------------------------------------------- class BasicConv(nn.Module): def __init__(self, in_channel, out_channel, kernel_size, stride, bias=True, norm=False, relu=True, transpose=False): super(BasicConv, self).__init__() if bias and norm: bias = False padding = kernel_size // 2 layers = list() if transpose: padding = kernel_size // 2 - 1 layers.append( nn.ConvTranspose2d(in_channel, out_channel, kernel_size, padding=padding, stride=stride, bias=bias)) else: layers.append(nn.Conv2d(in_channel, out_channel, kernel_size, padding=padding, stride=stride, bias=bias)) if norm: layers.append(nn.BatchNorm2d(out_channel)) if relu: layers.append(nn.GELU()) self.main = nn.Sequential(*layers) def forward(self, x): return self.main(x) class Gap(nn.Module): def __init__(self, in_channel, mode) -> None: super().__init__() self.fscale_d = nn.Parameter(torch.zeros(in_channel), requires_grad=True) self.fscale_h = nn.Parameter(torch.zeros(in_channel), requires_grad=True) if mode == 'train': self.gap = nn.AdaptiveAvgPool2d((1, 1)) elif mode == 'test': self.gap = AvgPool2d(base_size=246) def forward(self, x): x_d = self.gap(x) x_h = (x - x_d) * (self.fscale_h[None, :, None, None] + 1.) x_d = x_d * self.fscale_d[None, :, None, None] return x_d + x_h class YYBlock(nn.Module): def __init__(self, in_channel=3, out_channel=20, relu_slope=0.2): super(YYBlock, self).__init__() self.spatialConv = nn.Sequential(*[ nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, bias=True), nn.LeakyReLU(relu_slope, inplace=False), nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, bias=True), nn.LeakyReLU(relu_slope, inplace=False) ]) self.identity = nn.Conv2d(in_channel, out_channel, 1, 1, 0) self.fftConv2 = nn.Sequential(*[ nn.Conv2d(out_channel, out_channel, 1, 1, 0), nn.LeakyReLU(relu_slope, inplace=False), nn.Conv2d(out_channel, out_channel, 1, 1, 0) ]) self.fusion = nn.Conv2d(out_channel * 2, out_channel, 1, 1, 0) def forward(self, x1): spatial_out = self.spatialConv(x1) identity_out = self.identity(x1) out = spatial_out + identity_out x_fft = torch.fft.rfft2(out, norm='backward') x_amp = torch.abs(x_fft) x_phase = torch.angle(x_fft) enhanced_phase = self.fftConv2(x_phase) enhanced_amp = self.fftConv2(x_amp) # x_fft_out1 = torch.fft.irfft2(x_amp * torch.exp(1j * enhanced_phase), norm='backward') x_fft_out2 = torch.fft.irfft2(enhanced_amp * torch.exp(1j * x_phase), norm='backward') # out = self.fusion(torch.cat([out, x_fft_out2], dim=1)) return x_fft_out2 class ResBlock(nn.Module): def __init__(self, in_channel, out_channel, mode, filter=False): super(ResBlock, self).__init__() self.conv1 = BasicConv(in_channel, out_channel, kernel_size=3, stride=1, relu=True) self.yyBlock = YYBlock(in_channel, out_channel, relu_slope=0.2) self.filter = filter def forward(self, x): out = self.conv1(x) out = self.yyBlock(out) out = out + x return out # class SpaBlock(nn.Module): # def __init__(self, in_channel=3, out_channel=20, relu_slope=0.2): # super(SpaBlock, self).__init__() # self.identity = nn.Conv2d(in_channel, out_channel, 1, 1, 0) # self.conv1 = BasicConv(in_channel, out_channel, kernel_size=3, stride=1, relu=True) # # self.spatialConv = nn.Sequential(*[ # # nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1, bias=True), # # nn.GELU(), # nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, bias=True), # nn.LeakyReLU(relu_slope, inplace=False), # nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, bias=True), # nn.LeakyReLU(relu_slope, inplace=False) # ]) # # def forward(self, x): # conv1_out = self.conv1(x) # spa_out = self.spatialConv(conv1_out) # iden_out = self.identity(conv1_out) # out = iden_out + spa_out # # return out # # # class FreqBlock(nn.Module): # def __init__(self, in_channel=3, out_channel=20, relu_slope=0.2): # super(FreqBlock, self).__init__() # self.freqConv = nn.Sequential(*[ # nn.Conv2d(out_channel, out_channel, 1, 1, 0), # nn.LeakyReLU(relu_slope, inplace=False), # nn.Conv2d(out_channel, out_channel, 1, 1, 0) # ]) # # def forward(self, x): # x_fft = torch.fft.rfft2(x, norm='backward') # x_amp = torch.abs(x_fft) # x_phase = torch.angle(x_fft) # # # enhanced_phase = self.fftConv2(x_phase) # enhanced_amp = self.freqConv(x_amp) # x_fft_out2 = torch.fft.irfft2(enhanced_amp * torch.exp(1j * x_phase), norm='backward') # # return x_fft_out2 # # # class ResBlock(nn.Module): # def __init__(self, in_channel, out_channel, mode, filter=False): # super(ResBlock, self).__init__() # self.spaBlock = SpaBlock(in_channel, out_channel) # self.freqBlock = FreqBlock(in_channel, out_channel) # # def forward(self, x): # out = self.spaBlock(x) # out = self.freqBlock(out) # out = out + x # # return out