import os, cv2, numpy as np from concurrent.futures import ProcessPoolExecutor, as_completed # ---- geometry: build remap grids once per (src_w, src_h, face_size) ---- def build_cubemap_luts(src_w, src_h, face_size): faces = ["right", "left", "front", "back"] luts = {} # normalized pixel grid (u,v) in [0,1] ys, xs = np.meshgrid( (np.arange(face_size) + 0.5) / face_size, (np.arange(face_size) + 0.5) / face_size, indexing="ij" ) a = 2.0 * xs - 1.0 b = 2.0 * ys - 1.0 # direction vectors for each face (no Python loops) dirs = { "right": np.stack([ np.ones_like(a), b, -a ], axis=-1), "left": np.stack([-np.ones_like(a), b, a ], axis=-1), "front": np.stack([ a, b, np.ones_like(a)], axis=-1), "back": np.stack([ -a, b, -np.ones_like(a)], axis=-1), } for face, d in dirs.items(): # normalize n = np.linalg.norm(d, axis=-1, keepdims=True) d = d / n # spherical (theta: -pi..pi, phi: -pi/2..pi/2) theta = np.arctan2(d[..., 2], d[..., 0]) phi = np.arcsin(d[..., 1]) # equirect UV in [0,1] uf = 0.5 * (theta / np.pi + 1.0) vf = 0.5 * (phi / (np.pi / 2) + 1.0) # convert to source pixel coords (wrap horizontally!) map_x = (uf * (src_w - 1)).astype(np.float32) map_y = ((1.0 - vf) * (src_h - 1)).astype(np.float32) # IMPORTANT: wrap horizontally to avoid seams at 0/360 map_x = np.mod(map_x, src_w).astype(np.float32) luts[face] = (map_x, map_y) return luts def process_image(path_in, out_dir, face_size, luts): img = cv2.imread(path_in, cv2.IMREAD_COLOR) if img is None: return f"Skipped (read error): {path_in}" h, w = img.shape[:2] base = os.path.splitext(os.path.basename(path_in))[0] for face, (mx, my) in luts.items(): face_img = cv2.remap( img, mx, my, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_WRAP, # seamless horizontally ) face_img = cv2.rotate(face_img, cv2.ROTATE_180) cv2.imwrite(os.path.join(out_dir, f"{base}_{face}.jpg"), face_img) return f"Done: {base}" def main(input_dir='FRAMES', output_dir='cubemaps', face_size=1024, workers= max(1, os.cpu_count()-1)): os.makedirs(output_dir, exist_ok=True) # peek one frame to get source size sample = next((f for f in sorted(os.listdir(input_dir)) if f.lower().endswith((".jpg",".jpeg",".png"))), None) assert sample, "No input frames found." sample_img = cv2.imread(os.path.join(input_dir, sample)) h, w = sample_img.shape[:2] # precompute LUTs once luts = build_cubemap_luts(w, h, face_size) # batch process (parallel) files = [os.path.join(input_dir, f) for f in sorted(os.listdir(input_dir)) if f.lower().endswith((".jpg",".jpeg",".png"))] with ProcessPoolExecutor(max_workers=workers) as ex: futures = [ex.submit(process_image, p, output_dir, face_size, luts) for p in files] for _ in as_completed(futures): pass # keep silent; or print(_) to see progress if __name__ == "__main__": main()