Add FID evaluator for diffusion models
This commit is contained in:
parent
9cfe840872
commit
5b4f86293f
|
@ -58,7 +58,7 @@ if __name__ == "__main__":
|
||||||
torch.backends.cudnn.benchmark = True
|
torch.backends.cudnn.benchmark = True
|
||||||
want_metrics = False
|
want_metrics = False
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('-opt', type=str, help='Path to options YAML file.', default='../options/test_diffusion_unet_sm.yml')
|
parser.add_argument('-opt', type=str, help='Path to options YAML file.', default='../options/test_diffusion_unet.yml')
|
||||||
opt = option.parse(parser.parse_args().opt, is_train=False)
|
opt = option.parse(parser.parse_args().opt, is_train=False)
|
||||||
opt = option.dict_to_nonedict(opt)
|
opt = option.dict_to_nonedict(opt)
|
||||||
utils.util.loaded_options = opt
|
utils.util.loaded_options = opt
|
||||||
|
|
|
@ -143,6 +143,7 @@ class ExtensibleTrainer(BaseModel):
|
||||||
# Replace the env networks with the wrapped networks
|
# Replace the env networks with the wrapped networks
|
||||||
self.env['generators'] = self.netsG
|
self.env['generators'] = self.netsG
|
||||||
self.env['discriminators'] = self.netsD
|
self.env['discriminators'] = self.netsD
|
||||||
|
self.env['emas'] = self.emas
|
||||||
|
|
||||||
self.print_network() # print network
|
self.print_network() # print network
|
||||||
self.load() # load networks from save states as needed
|
self.load() # load networks from save states as needed
|
||||||
|
|
|
@ -17,7 +17,7 @@ class Evaluator:
|
||||||
|
|
||||||
|
|
||||||
def format_evaluator_name(name):
|
def format_evaluator_name(name):
|
||||||
# Formats by converting from CamelCase to snake_case and removing trailing "_injector"
|
# Formats by converting from CamelCase to snake_case and removing trailing "_evaluator"
|
||||||
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
|
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
|
||||||
name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
|
name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
|
||||||
return name.replace("_evaluator", "")
|
return name.replace("_evaluator", "")
|
||||||
|
|
50
codes/trainer/eval/sr_diffusion_fid.py
Normal file
50
codes/trainer/eval/sr_diffusion_fid.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
import os
|
||||||
|
import torch
|
||||||
|
import os.path as osp
|
||||||
|
import torchvision
|
||||||
|
from torch.nn.functional import interpolate
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
import trainer.eval.evaluator as evaluator
|
||||||
|
|
||||||
|
from pytorch_fid import fid_score
|
||||||
|
from data import create_dataset
|
||||||
|
from torch.utils.data import DataLoader
|
||||||
|
|
||||||
|
from trainer.injectors.gaussian_diffusion_injector import GaussianDiffusionInferenceInjector
|
||||||
|
from utils.util import opt_get
|
||||||
|
|
||||||
|
|
||||||
|
class SrDiffusionFidEvaluator(evaluator.Evaluator):
|
||||||
|
def __init__(self, model, opt_eval, env):
|
||||||
|
super().__init__(model, opt_eval, env)
|
||||||
|
self.batch_sz = opt_eval['batch_size']
|
||||||
|
self.fid_batch_size = opt_get(opt_eval, ['fid_batch_size'], 64)
|
||||||
|
assert self.batch_sz is not None
|
||||||
|
self.dataset = create_dataset(opt_eval['dataset'])
|
||||||
|
self.fid_real_samples = opt_eval['dataset']['paths'] # This is assumed to exist for the given dataset.
|
||||||
|
assert isinstance(self.fid_real_samples, str)
|
||||||
|
self.dataloader = DataLoader(self.dataset, self.batch_sz, shuffle=False, num_workers=1)
|
||||||
|
self.gd = GaussianDiffusionInferenceInjector(opt_eval['diffusion_params'], env)
|
||||||
|
self.out_key = opt_eval['diffusion_params']['out']
|
||||||
|
|
||||||
|
def perform_eval(self):
|
||||||
|
fid_fake_path = osp.join(self.env['base_path'], "..", "fid", str(self.env["step"]))
|
||||||
|
os.makedirs(fid_fake_path, exist_ok=True)
|
||||||
|
counter = 0
|
||||||
|
for batch in tqdm(self.dataloader):
|
||||||
|
batch = {k: v.to(self.env['device']) if isinstance(v, torch.Tensor) else v for k, v in batch.items()}
|
||||||
|
gen = self.gd(batch)[self.out_key]
|
||||||
|
|
||||||
|
# All gather if we're in distributed mode.
|
||||||
|
if torch.distributed.is_available() and torch.distributed.is_initialized():
|
||||||
|
gather_list = [torch.zeros_like(gen) for _ in range(torch.distributed.get_world_size())]
|
||||||
|
torch.distributed.all_gather(gather_list, gen)
|
||||||
|
gen = torch.cat(gather_list, dim=0)
|
||||||
|
|
||||||
|
for b in range(self.batch_sz):
|
||||||
|
torchvision.utils.save_image(gen[b], osp.join(fid_fake_path, "%i_.png" % (counter)))
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
return {"fid": fid_score.calculate_fid_given_paths([self.fid_real_samples, fid_fake_path], self.fid_batch_size,
|
||||||
|
True, 2048)}
|
|
@ -48,8 +48,12 @@ class GaussianDiffusionInferenceInjector(Injector):
|
||||||
[opt_get(opt, ['respaced_timestep_spacing'], opt['beta_schedule']['num_diffusion_timesteps'])])
|
[opt_get(opt, ['respaced_timestep_spacing'], opt['beta_schedule']['num_diffusion_timesteps'])])
|
||||||
self.diffusion = SpacedDiffusion(**opt['diffusion_args'])
|
self.diffusion = SpacedDiffusion(**opt['diffusion_args'])
|
||||||
self.model_input_keys = opt_get(opt, ['model_input_keys'], [])
|
self.model_input_keys = opt_get(opt, ['model_input_keys'], [])
|
||||||
|
self.use_ema_model = opt_get(opt, ['use_ema'], False)
|
||||||
|
|
||||||
def forward(self, state):
|
def forward(self, state):
|
||||||
|
if self.use_ema_model:
|
||||||
|
gen = self.env['emas'][self.opt['generator']]
|
||||||
|
else:
|
||||||
gen = self.env['generators'][self.opt['generator']]
|
gen = self.env['generators'][self.opt['generator']]
|
||||||
model_inputs = {k: state[v][:self.output_batch_size] for k, v in self.model_input_keys.items()}
|
model_inputs = {k: state[v][:self.output_batch_size] for k, v in self.model_input_keys.items()}
|
||||||
gen.eval()
|
gen.eval()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user