Add some cool diffusion testing scripts

This commit is contained in:
James Betker 2021-06-16 16:26:36 -06:00
parent 730c0135fd
commit 68cbbed886
4 changed files with 295 additions and 3 deletions

View File

@ -0,0 +1,96 @@
import os
import os.path as osp
import logging
import random
import time
import argparse
from collections import OrderedDict
import numpy
from PIL import Image
from torchvision.transforms import ToTensor
import utils
import utils.options as option
import utils.util as util
from trainer.ExtensibleTrainer import ExtensibleTrainer
from data import create_dataset, create_dataloader
from tqdm import tqdm
import torch
import numpy as np
# A rough copy of test.py that "surfs" along a spectrum of correction factors for a single image.
def forward_pass(model, data, output_dir, jc, bc):
with torch.no_grad():
model.feed_data(data, 0)
model.test()
visuals = model.get_current_visuals()['rlt'].cpu()
img_path = data['GT_path'][0]
img_name = osp.splitext(osp.basename(img_path))[0]
sr_img = util.tensor2img(visuals[0]) # uint8
# save images
suffixes = [f'_blur_{int(bc*1000)}_{int(jc*1000)}',
f'_jpeg_{int(jc*1000)}_{int(bc*1000)}']
for suffix in suffixes:
save_img_path = osp.join(output_dir, img_name + suffix + '.png')
util.save_img(sr_img, save_img_path)
if __name__ == "__main__":
# Set seeds
torch.manual_seed(5555)
random.seed(5555)
np.random.seed(5555)
#### options
torch.backends.cudnn.benchmark = True
want_metrics = False
parser = argparse.ArgumentParser()
parser.add_argument('-opt', type=str, help='Path to options YAML file.', default='../options/test_diffusion_unet.yml')
opt = option.parse(parser.parse_args().opt, is_train=False)
opt = option.dict_to_nonedict(opt)
utils.util.loaded_options = opt
util.mkdirs(
(path for key, path in opt['path'].items()
if not key == 'experiments_root' and 'pretrain_model' not in key and 'resume' not in key))
util.setup_logger('base', opt['path']['log'], 'test_' + opt['name'], level=logging.INFO,
screen=True, tofile=True)
logger = logging.getLogger('base')
logger.info(option.dict2str(opt))
# Load test image
im = ToTensor()(Image.open(opt['image'])) * 2 - 1
_, h, w = im.shape
if h % 2 == 1:
im = im[:,1:,:]
h = h-1
if w % 2 == 1:
im = im[:,:,1:]
w = w-1
dh, dw = (h - 32 * (h // 32)) // 2, (w - 32 * (w // 32)) // 2
if dh > 0:
im = im[:,dh:-dh]
if dw > 0:
im = im[:,:,dw:-dw]
im = im.unsqueeze(0)
# Build the corruption indexes we are going to use.
jpegs = list(numpy.arange(opt['min_jpeg_correction'], opt['max_jpeg_correction'], opt['jpeg_correction_step_size']))
deblurs = list(numpy.arange(opt['min_blur_correction'], opt['max_blur_correction'], opt['blur_correction_step_size']))
model = ExtensibleTrainer(opt)
results_dir = osp.join(opt['path']['results_root'], os.path.basename(opt['image']))
util.mkdir(results_dir)
for jpeg_correction in jpegs:
for blur_correction in deblurs:
data = {
'hq': im.to('cuda'),
'corruption_entropy': torch.tensor([[jpeg_correction, blur_correction]], device='cuda',
dtype=torch.float),
'GT_path': opt['image']
}
forward_pass(model, data, results_dir, jpeg_correction, blur_correction)

View File

@ -0,0 +1,94 @@
import os
import os.path as osp
import logging
import random
import time
import argparse
from collections import OrderedDict
import numpy
from PIL import Image
from torchvision.transforms import ToTensor
import utils
import utils.options as option
import utils.util as util
from trainer.ExtensibleTrainer import ExtensibleTrainer
from data import create_dataset, create_dataloader
from tqdm import tqdm
import torch
import numpy as np
# A rough copy of test.py that "surfs" along a set of random noise priors to show the affect of gaussian noise on the results.
def forward_pass(model, data, output_dir, spacing):
with torch.no_grad():
model.feed_data(data, 0)
model.test()
visuals = model.get_current_visuals()['rlt'].cpu()
img_path = data['GT_path'][0]
img_name = osp.splitext(osp.basename(img_path))[0]
sr_img = util.tensor2img(visuals[0]) # uint8
# save images
suffixes = [f'_{int(spacing)}']
for suffix in suffixes:
save_img_path = osp.join(output_dir, img_name + suffix + '.png')
util.save_img(sr_img, save_img_path)
if __name__ == "__main__":
# Set seeds
torch.manual_seed(5555)
random.seed(5555)
np.random.seed(5555)
#### options
torch.backends.cudnn.benchmark = True
want_metrics = False
parser = argparse.ArgumentParser()
parser.add_argument('-opt', type=str, help='Path to options YAML file.', default='../options/test_diffusion_unet.yml')
opt = option.parse(parser.parse_args().opt, is_train=False)
opt = option.dict_to_nonedict(opt)
utils.util.loaded_options = opt
util.mkdirs(
(path for key, path in opt['path'].items()
if not key == 'experiments_root' and 'pretrain_model' not in key and 'resume' not in key))
util.setup_logger('base', opt['path']['log'], 'test_' + opt['name'], level=logging.INFO,
screen=True, tofile=True)
logger = logging.getLogger('base')
logger.info(option.dict2str(opt))
# Load test image
im = ToTensor()(Image.open(opt['image'])) * 2 - 1
_, h, w = im.shape
if h % 2 == 1:
im = im[:,1:,:]
h = h-1
if w % 2 == 1:
im = im[:,:,1:]
w = w-1
dh, dw = (h - 32 * (h // 32)) // 2, (w - 32 * (w // 32)) // 2
if dh > 0:
im = im[:,dh:-dh]
if dw > 0:
im = im[:,:,dw:-dw]
im = im.unsqueeze(0)
# Build the corruption indexes we are going to use.
correction_factors = opt['correction_factor']
opt['steps']['generator']['injectors']['visual_debug']['zero_noise'] = False
model = ExtensibleTrainer(opt)
results_dir = osp.join(opt['path']['results_root'], os.path.basename(opt['image']))
util.mkdir(results_dir)
for i in range(10):
data = {
'hq': im.to('cuda'),
'corruption_entropy': torch.tensor([correction_factors], device='cuda',
dtype=torch.float),
'GT_path': opt['image']
}
forward_pass(model, data, results_dir, i)

View File

@ -0,0 +1,95 @@
import os
import os.path as osp
import logging
import random
import time
import argparse
from collections import OrderedDict
import numpy
from PIL import Image
from torchvision.transforms import ToTensor
import utils
import utils.options as option
import utils.util as util
from trainer.ExtensibleTrainer import ExtensibleTrainer
from data import create_dataset, create_dataloader
from tqdm import tqdm
import torch
import numpy as np
# A rough copy of test.py that "surfs" along a spectrum of timestep spacings to show what differences can be gleaned.
def forward_pass(model, data, output_dir, spacing):
with torch.no_grad():
model.feed_data(data, 0)
model.test()
visuals = model.get_current_visuals()['rlt'].cpu()
img_path = data['GT_path'][0]
img_name = osp.splitext(osp.basename(img_path))[0]
sr_img = util.tensor2img(visuals[0]) # uint8
# save images
suffixes = [f'_{int(spacing)}']
for suffix in suffixes:
save_img_path = osp.join(output_dir, img_name + suffix + '.png')
util.save_img(sr_img, save_img_path)
if __name__ == "__main__":
# Set seeds
torch.manual_seed(5555)
random.seed(5555)
np.random.seed(5555)
#### options
torch.backends.cudnn.benchmark = True
want_metrics = False
parser = argparse.ArgumentParser()
parser.add_argument('-opt', type=str, help='Path to options YAML file.', default='../options/test_diffusion_unet.yml')
opt = option.parse(parser.parse_args().opt, is_train=False)
opt = option.dict_to_nonedict(opt)
utils.util.loaded_options = opt
util.mkdirs(
(path for key, path in opt['path'].items()
if not key == 'experiments_root' and 'pretrain_model' not in key and 'resume' not in key))
util.setup_logger('base', opt['path']['log'], 'test_' + opt['name'], level=logging.INFO,
screen=True, tofile=True)
logger = logging.getLogger('base')
logger.info(option.dict2str(opt))
# Load test image
im = ToTensor()(Image.open(opt['image'])) * 2 - 1
_, h, w = im.shape
if h % 2 == 1:
im = im[:,1:,:]
h = h-1
if w % 2 == 1:
im = im[:,:,1:]
w = w-1
dh, dw = (h - 32 * (h // 32)) // 2, (w - 32 * (w // 32)) // 2
if dh > 0:
im = im[:,dh:-dh]
if dw > 0:
im = im[:,:,dw:-dw]
im = im.unsqueeze(0)
# Build the corruption indexes we are going to use.
timestep_spacings = opt['timestep_spacings']
correction_factors = opt['correction_factor']
results_dir = osp.join(opt['path']['results_root'], os.path.basename(opt['image']))
util.mkdir(results_dir)
for spacing in timestep_spacings:
opt['steps']['generator']['injectors']['visual_debug']['respaced_timestep_spacing'] = spacing
model = ExtensibleTrainer(opt)
data = {
'hq': im.to('cuda'),
'corruption_entropy': torch.tensor([correction_factors], device='cuda',
dtype=torch.float),
'GT_path': opt['image']
}
forward_pass(model, data, results_dir, spacing)

View File

@ -39,16 +39,22 @@ class GaussianDiffusionInjector(Injector):
class GaussianDiffusionInferenceInjector(Injector):
def __init__(self, opt, env):
super().__init__(opt, env)
use_ddim = opt_get(opt, ['use_ddim'], False)
self.generator = opt['generator']
self.output_batch_size = opt['output_batch_size']
self.output_scale_factor = opt['output_scale_factor']
self.undo_n1_to_1 = opt_get(opt, ['undo_n1_to_1'], False) # Explanation: when specified, will shift the output of this injector from [-1,1] to [0,1]
opt['diffusion_args']['betas'] = get_named_beta_schedule(**opt['beta_schedule'])
opt['diffusion_args']['use_timesteps'] = space_timesteps(opt['beta_schedule']['num_diffusion_timesteps'],
[opt_get(opt, ['respaced_timestep_spacing'], opt['beta_schedule']['num_diffusion_timesteps'])])
if use_ddim:
spacing = "ddim" + str(opt['respaced_timestep_spacing'])
else:
spacing = [opt_get(opt, ['respaced_timestep_spacing'], opt['beta_schedule']['num_diffusion_timesteps'])]
opt['diffusion_args']['use_timesteps'] = space_timesteps(opt['beta_schedule']['num_diffusion_timesteps'], spacing)
self.diffusion = SpacedDiffusion(**opt['diffusion_args'])
self.sampling_fn = self.diffusion.ddim_sample_loop if use_ddim else self.diffusion.p_sample_loop
self.model_input_keys = opt_get(opt, ['model_input_keys'], [])
self.use_ema_model = opt_get(opt, ['use_ema'], False)
self.zero_noise = opt_get(opt, ['zero_noise'], False)
def forward(self, state):
if self.use_ema_model:
@ -60,7 +66,8 @@ class GaussianDiffusionInferenceInjector(Injector):
with torch.no_grad():
output_shape = (self.output_batch_size, 3, model_inputs['low_res'].shape[-2] * self.output_scale_factor,
model_inputs['low_res'].shape[-1] * self.output_scale_factor)
gen = self.diffusion.p_sample_loop(gen, output_shape, model_kwargs=model_inputs)
noise = torch.zeros(output_shape, device=model_inputs['low_res'].device) if self.zero_noise else None
gen = self.sampling_fn(gen, output_shape, noise=noise, model_kwargs=model_inputs, progress=True)
if self.undo_n1_to_1:
gen = (gen + 1) / 2
return {self.output: gen}