# Source listing metadata: 90 lines, 3.2 KiB, Python
import os
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed

import cv2
import numpy as np
|
|
|
|
# ---- geometry: build remap grids once per (src_w, src_h, face_size) ----
|
|
def build_cubemap_luts(src_w, src_h, face_size):
    """Build remap lookup tables for the four side faces of a cubemap.

    For every destination pixel of each face, compute the (x, y) position in
    an equirectangular source image of ``src_w`` x ``src_h`` pixels that the
    face pixel should sample from.

    Args:
        src_w: Width of the equirectangular source image, in pixels.
        src_h: Height of the equirectangular source image, in pixels.
        face_size: Edge length of each square output face, in pixels.

    Returns:
        A dict mapping the face names ``"right"``, ``"left"``, ``"front"``
        and ``"back"`` (in that order) to ``(map_x, map_y)`` pairs of float32
        arrays of shape ``(face_size, face_size)``.  ``map_y`` lies in
        ``[0, src_h - 1]``.  ``map_x`` lies in ``[0, src_w]``; values above
        ``src_w - 1`` fall between the last and the first source column, so
        the consumer must sample with a horizontally wrapping border mode.

    Raises:
        ValueError: If ``src_w``, ``src_h`` or ``face_size`` is not positive.
    """
    if src_w <= 0 or src_h <= 0 or face_size <= 0:
        raise ValueError(
            "src_w, src_h and face_size must be positive, got "
            f"{src_w}, {src_h}, {face_size}"
        )

    # Pixel-centre coordinates of a face, first in (0, 1), then in (-1, 1).
    centres = (np.arange(face_size) + 0.5) / face_size
    ys, xs = np.meshgrid(centres, centres, indexing="ij")
    a = 2.0 * xs - 1.0
    b = 2.0 * ys - 1.0
    ones = np.ones_like(a)

    # Un-normalised view direction (x, y, z) for every pixel of every face.
    dirs = {
        "right": np.stack([ones, b, -a], axis=-1),
        "left": np.stack([-ones, b, a], axis=-1),
        "front": np.stack([a, b, ones], axis=-1),
        "back": np.stack([-a, b, -ones], axis=-1),
    }

    luts = {}
    for face, d in dirs.items():
        d = d / np.linalg.norm(d, axis=-1, keepdims=True)

        # Spherical angles: theta in [-pi, pi], phi in [-pi/2, pi/2].
        theta = np.arctan2(d[..., 2], d[..., 0])
        phi = np.arcsin(d[..., 1])

        # Equirectangular UV in [0, 1].
        uf = 0.5 * (theta / np.pi + 1.0)
        vf = 0.5 * (phi / (np.pi / 2) + 1.0)

        # Source pixel i is centred at u = (i + 0.5) / src_w, hence the -0.5.
        # Scaling by (src_w - 1) instead would treat columns 0 and src_w - 1
        # as the same longitude and leave a visible seam at +/-180 degrees.
        # The modulo moves the slightly negative coordinates next to the seam
        # into (src_w - 1, src_w), i.e. between the last and first column.
        x = np.mod(uf * src_w - 0.5, src_w)

        # Rows do not wrap, so clamp to the valid range instead.
        y = np.clip((1.0 - vf) * src_h - 0.5, 0.0, src_h - 1.0)

        luts[face] = (x.astype(np.float32), y.astype(np.float32))

    return luts
|
|
|
|
def process_image(path_in, out_dir, face_size, luts):
    """Remap one source frame into cubemap faces and save each as a JPEG.

    Args:
        path_in: Path of the source image to read.
        out_dir: Directory that receives one ``<base>_<face>.jpg`` file per
            entry in ``luts``; it is not created here.
        face_size: Not used by this function (the output size is determined
            by the arrays in ``luts``); kept for interface compatibility.
        luts: Mapping of face name to a ``(map_x, map_y)`` pair of remap
            grids.  NOTE(review): the grids are presumably built for the
            dimensions of this image; that is not verified here.

    Returns:
        A status string: ``"Done: <base>"`` on success,
        ``"Skipped (read error): <path_in>"`` if the image cannot be read, or
        ``"Failed (write error): <out_path>"`` at the first face that cannot
        be written.
    """
    img = cv2.imread(path_in, cv2.IMREAD_COLOR)
    if img is None:
        return f"Skipped (read error): {path_in}"

    base = os.path.splitext(os.path.basename(path_in))[0]

    for face, (mx, my) in luts.items():
        face_img = cv2.remap(
            img,
            mx,
            my,
            interpolation=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_WRAP,  # seamless horizontally
        )
        # Presumably compensates for the orientation the LUTs sample in.
        face_img = cv2.rotate(face_img, cv2.ROTATE_180)

        out_path = os.path.join(out_dir, f"{base}_{face}.jpg")
        # imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(out_path, face_img):
            return f"Failed (write error): {out_path}"

    return f"Done: {base}"
|
|
|
|
# File extensions (lower-case) treated as input frames.
_IMAGE_EXTS = (".jpg", ".jpeg", ".png")

# Per-process LUT storage, filled in by _init_worker inside each pool worker.
_WORKER_LUTS = None


def _init_worker(luts):
    """Store the shared LUTs in the current worker process (once per worker)."""
    global _WORKER_LUTS
    _WORKER_LUTS = luts


def _process_with_worker_luts(path_in, out_dir, face_size):
    """Run process_image with the LUTs installed by _init_worker."""
    return process_image(path_in, out_dir, face_size, _WORKER_LUTS)


def main(input_dir='FRAMES', output_dir='cubemaps', face_size=1024, workers=max(1, (os.cpu_count() or 1) - 1)):
    """Convert every frame in ``input_dir`` into cubemap faces in ``output_dir``.

    The remap LUTs are built once from the dimensions of the first frame (in
    sorted order) and reused for all frames.

    Args:
        input_dir: Directory scanned (non-recursively) for ``.jpg``,
            ``.jpeg`` and ``.png`` files, matched case-insensitively.
        output_dir: Directory for the generated faces; created if missing.
        face_size: Edge length of each output face, in pixels.
        workers: Number of worker processes.  Defaults to one less than the
            CPU count, but at least 1.

    Raises:
        FileNotFoundError: If ``input_dir`` contains no matching frames.
        OSError: If the first frame cannot be decoded.
    """
    os.makedirs(output_dir, exist_ok=True)

    # List the directory once; the first entry doubles as the size sample.
    files = [
        os.path.join(input_dir, name)
        for name in sorted(os.listdir(input_dir))
        if name.lower().endswith(_IMAGE_EXTS)
    ]
    if not files:
        # An explicit raise, because assert statements are removed under -O.
        raise FileNotFoundError("No input frames found.")

    sample_img = cv2.imread(files[0])
    if sample_img is None:
        raise OSError(f"Could not read sample frame: {files[0]}")
    h, w = sample_img.shape[:2]

    # Precompute the LUTs once.
    luts = build_cubemap_luts(w, h, face_size)

    # The LUTs are handed to each worker once via the initializer instead of
    # being pickled along with every submitted task.
    with ProcessPoolExecutor(
        max_workers=workers,
        initializer=_init_worker,
        initargs=(luts,),
    ) as ex:
        futures = {
            ex.submit(_process_with_worker_luts, path, output_dir, face_size): path
            for path in files
        }
        for fut in as_completed(futures):
            # Retrieve every result so worker failures are not lost; report
            # them and keep going rather than aborting the whole batch.
            try:
                status = fut.result()
            except Exception as exc:
                print(f"Failed: {futures[fut]}: {exc}", file=sys.stderr)
                continue
            if not status.startswith("Done"):
                print(status, file=sys.stderr)
|
|
|
|
# Script entry point: run the batch conversion with its default settings.
if __name__ == "__main__":
    main()
|