import glob import math import os import pickle import random import cv2 import numpy import numpy as np import torch import torchvision #################### # Files & IO #################### ###################### get image path list ###################### IMG_EXTENSIONS = ['.jpg', '.JPG', '.jpeg', '.JPEG', '.png', '.PNG', '.ppm', '.PPM', '.bmp', '.BMP', '.webp', '.WEBP'] def torch2cv(tensor): if len(tensor.shape) == 4: squeezed = True tensor = tensor.squeeze(0) assert len(tensor.shape) == 3 pil = torchvision.transforms.ToPILImage()(tensor) np = numpy.array(pil) return cv2.cvtColor(np, cv2.COLOR_RGB2BGR) / 255.0 def cv2torch(cv, batchify=True): cv = cv2.cvtColor(cv, cv2.COLOR_BGR2RGB) tens = torch.from_numpy(np.ascontiguousarray( np.transpose(cv, (2, 0, 1)))).float() if batchify: tens = tens.unsqueeze(0) return tens def is_image_file(filename): return any(filename.endswith(extension) for extension in IMG_EXTENSIONS) def is_wav_file(filename): return filename.endswith('.wav') def is_audio_file(filename): AUDIO_EXTENSIONS = ['.wav', '.mp3', '.wma', '.m4b', '.flac', '.aac'] return any(filename.endswith(extension) for extension in AUDIO_EXTENSIONS) def _get_paths_from_images(path, qualifier=is_image_file): """get image path list from image folder""" assert os.path.isdir(path), '{:s} is not a valid directory'.format(path) images = [] for dirpath, _, fnames in sorted(os.walk(path)): for fname in sorted(fnames): if qualifier(fname) and 'ref.jpg' not in fname: img_path = os.path.join(dirpath, fname) images.append(img_path) if not images: print("Warning: {:s} has no valid image file".format(path)) return images def _get_paths_from_lmdb(dataroot): """get image path list from lmdb meta info""" meta_info = pickle.load( open(os.path.join(dataroot, 'meta_info.pkl'), 'rb')) paths = meta_info['keys'] sizes = meta_info['resolution'] if len(sizes) == 1: sizes = sizes * len(paths) return paths, sizes def find_audio_files(dataroot, include_nonwav=False): if include_nonwav: return find_files_of_type(None, dataroot, qualifier=is_audio_file)[0] else: return find_files_of_type(None, dataroot, qualifier=is_wav_file)[0] def find_files_of_type(data_type, dataroot, weights=[], qualifier=is_image_file): if isinstance(dataroot, list): paths = [] for i in range(len(dataroot)): r = dataroot[i] extends = 1 # Weights have the effect of repeatedly adding the paths from the given root to the final product. if weights: extends = weights[i] for j in range(extends): paths.extend(_get_paths_from_images(r, qualifier)) paths = sorted(paths) sizes = len(paths) else: paths = sorted(_get_paths_from_images(dataroot, qualifier)) sizes = len(paths) return paths, sizes def glob_file_list(root): return sorted(glob.glob(os.path.join(root, '*'))) ###################### read images ###################### def _read_img_lmdb(env, key, size): """read image from lmdb with key (w/ and w/o fixed size) size: (C, H, W) tuple""" with env.begin(write=False) as txn: buf = txn.get(key.encode('ascii')) img_flat = np.frombuffer(buf, dtype=np.uint8) C, H, W = size img = img_flat.reshape(H, W, C) return img def read_img(env, path, size=None, rgb=False): """read image by cv2 or from lmdb or from a buffer (in which case path=buffer) return: Numpy float32, HWC, BGR, [0,1]""" if env is None: # img # Indirect open then process to support unicode files. stream = open(path, "rb") bytes = bytearray(stream.read()) img = cv2.imdecode(np.asarray(bytes, dtype=np.uint8), cv2.IMREAD_UNCHANGED) elif env == 'lmdb': img = _read_img_lmdb(env, path, size) elif env == 'buffer': img = cv2.imdecode(path, cv2.IMREAD_UNCHANGED) else: raise NotImplementedError("Unsupported env: %s" % (env,)) if img is None: print("Image error: %s" % (path,)) img = img.astype(np.float32) / 255. if img.ndim == 2: img = np.expand_dims(img, axis=2) # some images have 4 channels if img.shape[2] > 3: img = img[:, :, :3] if rgb: img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) return img def read_img_seq(path): """Read a sequence of images from a given folder path Args: path (list/str): list of image paths/image folder path Returns: imgs (Tensor): size (T, C, H, W), RGB, [0, 1] """ if type(path) is list: img_path_l = path else: img_path_l = sorted(glob.glob(os.path.join(path, '*'))) img_l = [read_img(None, v) for v in img_path_l] # stack to Torch tensor imgs = np.stack(img_l, axis=0) imgs = imgs[:, :, :, [2, 1, 0]] imgs = torch.from_numpy(np.ascontiguousarray( np.transpose(imgs, (0, 3, 1, 2)))).float() return imgs def index_generation(crt_i, max_n, N, padding='reflection'): """Generate an index list for reading N frames from a sequence of images Args: crt_i (int): current center index max_n (int): max number of the sequence of images (calculated from 1) N (int): reading N frames padding (str): padding mode, one of replicate | reflection | new_info | circle Example: crt_i = 0, N = 5 replicate: [0, 0, 0, 1, 2] reflection: [2, 1, 0, 1, 2] new_info: [4, 3, 0, 1, 2] circle: [3, 4, 0, 1, 2] Returns: return_l (list [int]): a list of indexes """ max_n = max_n - 1 n_pad = N // 2 return_l = [] for i in range(crt_i - n_pad, crt_i + n_pad + 1): if i < 0: if padding == 'replicate': add_idx = 0 elif padding == 'reflection': add_idx = -i elif padding == 'new_info': add_idx = (crt_i + n_pad) + (-i) elif padding == 'circle': add_idx = N + i else: raise ValueError('Wrong padding mode') elif i > max_n: if padding == 'replicate': add_idx = max_n elif padding == 'reflection': add_idx = max_n * 2 - i elif padding == 'new_info': add_idx = (crt_i - n_pad) - (i - max_n) elif padding == 'circle': add_idx = i - N else: raise ValueError('Wrong padding mode') else: add_idx = i return_l.append(add_idx) return return_l #################### # image processing # process on numpy image #################### def augment(img_list, hflip=True, rot=True): """horizontal flip OR rotate (0, 90, 180, 270 degrees)""" hflip = hflip and random.random() < 0.5 vflip = rot and random.random() < 0.5 rot90 = rot and random.random() < 0.5 def _augment(img): if hflip: img = img[:, ::-1, :] if vflip: img = img[::-1, :, :] if rot90: img = img.transpose(1, 0, 2) return img return [_augment(img) for img in img_list] def augment_flow(img_list, flow_list, hflip=True, rot=True): """horizontal flip OR rotate (0, 90, 180, 270 degrees) with flows""" hflip = hflip and random.random() < 0.5 vflip = rot and random.random() < 0.5 rot90 = rot and random.random() < 0.5 def _augment(img): if hflip: img = img[:, ::-1, :] if vflip: img = img[::-1, :, :] if rot90: img = img.transpose(1, 0, 2) return img def _augment_flow(flow): if hflip: flow = flow[:, ::-1, :] flow[:, :, 0] *= -1 if vflip: flow = flow[::-1, :, :] flow[:, :, 1] *= -1 if rot90: flow = flow.transpose(1, 0, 2) flow = flow[:, :, [1, 0]] return flow rlt_img_list = [_augment(img) for img in img_list] rlt_flow_list = [_augment_flow(flow) for flow in flow_list] return rlt_img_list, rlt_flow_list def channel_convert(in_c, tar_type, img_list): """conversion among BGR, gray and y""" if in_c == 3 and tar_type == 'gray': # BGR to gray gray_list = [cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) for img in img_list] return [np.expand_dims(img, axis=2) for img in gray_list] elif in_c == 3 and tar_type == 'y': # BGR to y y_list = [bgr2ycbcr(img, only_y=True) for img in img_list] return [np.expand_dims(img, axis=2) for img in y_list] elif in_c == 1 and tar_type == 'RGB': # gray/y to BGR return [cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) for img in img_list] else: return img_list def rgb2ycbcr(img, only_y=True): """same as matlab rgb2ycbcr only_y: only return Y channel Input: uint8, [0, 255] float, [0, 1] """ in_img_type = img.dtype img.astype(np.float32) if in_img_type != np.uint8: img *= 255. # convert if only_y: rlt = np.dot(img, [65.481, 128.553, 24.966]) / 255.0 + 16.0 else: rlt = np.matmul(img, [[65.481, -37.797, 112.0], [128.553, -74.203, -93.786], [24.966, 112.0, -18.214]]) / 255.0 + [16, 128, 128] if in_img_type == np.uint8: rlt = rlt.round() else: rlt /= 255. return rlt.astype(in_img_type) def bgr2ycbcr(img, only_y=True): """bgr version of rgb2ycbcr only_y: only return Y channel Input: uint8, [0, 255] float, [0, 1] """ in_img_type = img.dtype img.astype(np.float32) if in_img_type != np.uint8: img *= 255. # convert if only_y: rlt = np.dot(img, [24.966, 128.553, 65.481]) / 255.0 + 16.0 else: rlt = np.matmul(img, [[24.966, 112.0, -18.214], [128.553, -74.203, -93.786], [65.481, -37.797, 112.0]]) / 255.0 + [16, 128, 128] if in_img_type == np.uint8: rlt = rlt.round() else: rlt /= 255. return rlt.astype(in_img_type) def ycbcr2rgb(img): """same as matlab ycbcr2rgb Input: uint8, [0, 255] float, [0, 1] """ in_img_type = img.dtype img.astype(np.float32) if in_img_type != np.uint8: img *= 255. # convert rlt = np.matmul(img, [[0.00456621, 0.00456621, 0.00456621], [0, -0.00153632, 0.00791071], [0.00625893, -0.00318811, 0]]) * 255.0 + [-222.921, 135.576, -276.836] if in_img_type == np.uint8: rlt = rlt.round() else: rlt /= 255. return rlt.astype(in_img_type) def modcrop(img_in, scale): """img_in: Numpy, HWC or HW""" img = np.copy(img_in) if img.ndim == 2: H, W = img.shape H_r, W_r = H % scale, W % scale img = img[:H - H_r, :W - W_r] elif img.ndim == 3: H, W, C = img.shape H_r, W_r = H % scale, W % scale img = img[:H - H_r, :W - W_r, :] else: raise ValueError('Wrong img ndim: [{:d}].'.format(img.ndim)) return img #################### # Functions #################### # matlab 'imresize' function, now only support 'bicubic' def cubic(x): absx = torch.abs(x) absx2 = absx**2 absx3 = absx**3 return (1.5 * absx3 - 2.5 * absx2 + 1) * ( (absx <= 1).type_as(absx)) + (-0.5 * absx3 + 2.5 * absx2 - 4 * absx + 2) * (( (absx > 1) * (absx <= 2)).type_as(absx)) def calculate_weights_indices(in_length, out_length, scale, kernel, kernel_width, antialiasing): if (scale < 1) and (antialiasing): # Use a modified kernel to simultaneously interpolate and antialias- larger kernel width kernel_width = kernel_width / scale # Output-space coordinates x = torch.linspace(1, out_length, out_length) # Input-space coordinates. Calculate the inverse mapping such that 0.5 # in output space maps to 0.5 in input space, and 0.5+scale in output # space maps to 1.5 in input space. u = x / scale + 0.5 * (1 - 1 / scale) # What is the left-most pixel that can be involved in the computation? left = torch.floor(u - kernel_width / 2) # What is the maximum number of pixels that can be involved in the # computation? Note: it's OK to use an extra pixel here; if the # corresponding weights are all zero, it will be eliminated at the end # of this function. P = math.ceil(kernel_width) + 2 # The indices of the input pixels involved in computing the k-th output # pixel are in row k of the indices matrix. indices = left.view(out_length, 1).expand(out_length, P) + torch.linspace(0, P - 1, P).view( 1, P).expand(out_length, P) # The weights used to compute the k-th output pixel are in row k of the # weights matrix. distance_to_center = u.view(out_length, 1).expand(out_length, P) - indices # apply cubic kernel if (scale < 1) and (antialiasing): weights = scale * cubic(distance_to_center * scale) else: weights = cubic(distance_to_center) # Normalize the weights matrix so that each row sums to 1. weights_sum = torch.sum(weights, 1).view(out_length, 1) weights = weights / weights_sum.expand(out_length, P) # If a column in weights is all zero, get rid of it. only consider the first and last column. weights_zero_tmp = torch.sum((weights == 0), 0) if not math.isclose(weights_zero_tmp[0], 0, rel_tol=1e-6): indices = indices.narrow(1, 1, P - 2) weights = weights.narrow(1, 1, P - 2) if not math.isclose(weights_zero_tmp[-1], 0, rel_tol=1e-6): indices = indices.narrow(1, 0, P - 2) weights = weights.narrow(1, 0, P - 2) weights = weights.contiguous() indices = indices.contiguous() sym_len_s = -indices.min() + 1 sym_len_e = indices.max() - in_length indices = indices + sym_len_s - 1 return weights, indices, int(sym_len_s), int(sym_len_e) def imresize(img, scale, antialiasing=True): # Now the scale should be the same for H and W # input: img: CHW RGB [0,1] # output: CHW RGB [0,1] w/o round in_C, in_H, in_W = img.size() _, out_H, out_W = in_C, math.ceil(in_H * scale), math.ceil(in_W * scale) kernel_width = 4 kernel = 'cubic' # Return the desired dimension order for performing the resize. The # strategy is to perform the resize first along the dimension with the # smallest scale factor. # Now we do not support this. # get weights and indices weights_H, indices_H, sym_len_Hs, sym_len_He = calculate_weights_indices( in_H, out_H, scale, kernel, kernel_width, antialiasing) weights_W, indices_W, sym_len_Ws, sym_len_We = calculate_weights_indices( in_W, out_W, scale, kernel, kernel_width, antialiasing) # process H dimension # symmetric copying img_aug = torch.FloatTensor(in_C, in_H + sym_len_Hs + sym_len_He, in_W) img_aug.narrow(1, sym_len_Hs, in_H).copy_(img) sym_patch = img[:, :sym_len_Hs, :] inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(1, inv_idx) img_aug.narrow(1, 0, sym_len_Hs).copy_(sym_patch_inv) sym_patch = img[:, -sym_len_He:, :] inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(1, inv_idx) img_aug.narrow(1, sym_len_Hs + in_H, sym_len_He).copy_(sym_patch_inv) out_1 = torch.FloatTensor(in_C, out_H, in_W) kernel_width = weights_H.size(1) for i in range(out_H): idx = int(indices_H[i][0]) out_1[0, i, :] = img_aug[0, idx:idx + kernel_width, :].transpose(0, 1).mv(weights_H[i]) out_1[1, i, :] = img_aug[1, idx:idx + kernel_width, :].transpose(0, 1).mv(weights_H[i]) out_1[2, i, :] = img_aug[2, idx:idx + kernel_width, :].transpose(0, 1).mv(weights_H[i]) # process W dimension # symmetric copying out_1_aug = torch.FloatTensor(in_C, out_H, in_W + sym_len_Ws + sym_len_We) out_1_aug.narrow(2, sym_len_Ws, in_W).copy_(out_1) sym_patch = out_1[:, :, :sym_len_Ws] inv_idx = torch.arange(sym_patch.size(2) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(2, inv_idx) out_1_aug.narrow(2, 0, sym_len_Ws).copy_(sym_patch_inv) sym_patch = out_1[:, :, -sym_len_We:] inv_idx = torch.arange(sym_patch.size(2) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(2, inv_idx) out_1_aug.narrow(2, sym_len_Ws + in_W, sym_len_We).copy_(sym_patch_inv) out_2 = torch.FloatTensor(in_C, out_H, out_W) kernel_width = weights_W.size(1) for i in range(out_W): idx = int(indices_W[i][0]) out_2[0, :, i] = out_1_aug[0, :, idx:idx + kernel_width].mv(weights_W[i]) out_2[1, :, i] = out_1_aug[1, :, idx:idx + kernel_width].mv(weights_W[i]) out_2[2, :, i] = out_1_aug[2, :, idx:idx + kernel_width].mv(weights_W[i]) return out_2 def imresize_np(img, scale, antialiasing=True): # Now the scale should be the same for H and W # input: img: Numpy, HWC BGR [0,1] # output: HWC BGR [0,1] w/o round img = torch.from_numpy(img) in_H, in_W, in_C = img.size() _, out_H, out_W = in_C, math.ceil(in_H * scale), math.ceil(in_W * scale) kernel_width = 4 kernel = 'cubic' # Return the desired dimension order for performing the resize. The # strategy is to perform the resize first along the dimension with the # smallest scale factor. # Now we do not support this. # get weights and indices weights_H, indices_H, sym_len_Hs, sym_len_He = calculate_weights_indices( in_H, out_H, scale, kernel, kernel_width, antialiasing) weights_W, indices_W, sym_len_Ws, sym_len_We = calculate_weights_indices( in_W, out_W, scale, kernel, kernel_width, antialiasing) # process H dimension # symmetric copying img_aug = torch.FloatTensor(in_H + sym_len_Hs + sym_len_He, in_W, in_C) img_aug.narrow(0, sym_len_Hs, in_H).copy_(img) sym_patch = img[:sym_len_Hs, :, :] inv_idx = torch.arange(sym_patch.size(0) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(0, inv_idx) img_aug.narrow(0, 0, sym_len_Hs).copy_(sym_patch_inv) sym_patch = img[-sym_len_He:, :, :] inv_idx = torch.arange(sym_patch.size(0) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(0, inv_idx) img_aug.narrow(0, sym_len_Hs + in_H, sym_len_He).copy_(sym_patch_inv) out_1 = torch.FloatTensor(out_H, in_W, in_C) kernel_width = weights_H.size(1) for i in range(out_H): idx = int(indices_H[i][0]) out_1[i, :, 0] = img_aug[idx:idx + kernel_width, :, 0].transpose(0, 1).mv(weights_H[i]) out_1[i, :, 1] = img_aug[idx:idx + kernel_width, :, 1].transpose(0, 1).mv(weights_H[i]) out_1[i, :, 2] = img_aug[idx:idx + kernel_width, :, 2].transpose(0, 1).mv(weights_H[i]) # process W dimension # symmetric copying out_1_aug = torch.FloatTensor(out_H, in_W + sym_len_Ws + sym_len_We, in_C) out_1_aug.narrow(1, sym_len_Ws, in_W).copy_(out_1) sym_patch = out_1[:, :sym_len_Ws, :] inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(1, inv_idx) out_1_aug.narrow(1, 0, sym_len_Ws).copy_(sym_patch_inv) sym_patch = out_1[:, -sym_len_We:, :] inv_idx = torch.arange(sym_patch.size(1) - 1, -1, -1).long() sym_patch_inv = sym_patch.index_select(1, inv_idx) out_1_aug.narrow(1, sym_len_Ws + in_W, sym_len_We).copy_(sym_patch_inv) out_2 = torch.FloatTensor(out_H, out_W, in_C) kernel_width = weights_W.size(1) for i in range(out_W): idx = int(indices_W[i][0]) out_2[:, i, 0] = out_1_aug[:, idx:idx + kernel_width, 0].mv(weights_W[i]) out_2[:, i, 1] = out_1_aug[:, idx:idx + kernel_width, 1].mv(weights_W[i]) out_2[:, i, 2] = out_1_aug[:, idx:idx + kernel_width, 2].mv(weights_W[i]) return out_2.numpy() def load_paths_from_cache(paths, cache_path, exclusion_list=[], endswith=[], not_endswith=[]): if not isinstance(paths, list): paths = [paths] if os.path.exists(cache_path): output = torch.load(cache_path) else: print(f"Building cache for contents of {paths}..") output = [] for p in paths: output.extend(find_files_of_type( 'img', p, qualifier=is_audio_file)[0]) if exclusion_list is not None and len(exclusion_list) > 0: print(f"Removing exclusion lists..") before = len(output) master_set = set(output) exclusion_set = set(exclusion_list) output = list(master_set - exclusion_set) print(f"Excluded {before-len(output)} files.") if endswith is not None: before = len(output) def filter_fn(p): for e in endswith: if not p.endswith(e): return False for e in not_endswith: if p.endswith(e): return False return True output = list(filter(filter_fn, output)) print( f"!!Excluded {before-len(output)} files with endswith mask. For total of {len(output)} files") print("Done.") torch.save(output, cache_path) return output if __name__ == '__main__': # test imresize function # read images img = cv2.imread('test.png') img = img * 1.0 / 255 img = torch.from_numpy(np.transpose( img[:, :, [2, 1, 0]], (2, 0, 1))).float() # imresize scale = 1 / 4 import time total_time = 0 for i in range(10): start_time = time.time() rlt = imresize(img, scale, antialiasing=True) use_time = time.time() - start_time total_time += use_time print('average time: {}'.format(total_time / 10)) import torchvision.utils torchvision.utils.save_image((rlt * 255).round() / 255, 'rlt.png', nrow=1, padding=0, normalize=False)