Last active
February 14, 2022 08:35
-
-
Save marc-tonsen/0bff75f776da16c594ced1eef9fdd397 to your computer and use it in GitHub Desktop.
This code undistorts the scene video of a Pupil Invisible recording in raw binary format. The camera intrinsics used for the undistortion are downloaded from Pupil Cloud, so an internet connection is required.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import av | |
import cv2 | |
import json | |
import struct | |
import pathlib | |
import argparse | |
import requests | |
import itertools | |
import numpy as np | |
from tqdm import tqdm | |
def fetch_intrinsics(serial_number):
    """
    Using the serial number of the scene camera, fetch the camera matrix and
    distortion coefficients.

    You can find the serial number of the scene camera on the back of the scene
    camera module, or in the info.json file of a recording. Note, that this is a
    different serial number from the 'glasses serial number'.

    Args:
        serial_number: Serial number of the scene camera as a string. The
            special value "default" is translated to the serial "00000".

    Returns:
        A tuple ``(camera_matrix, distortion_coefficients)`` of float64 numpy
        arrays with shapes (3, 3) and (1, 8).

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer in time.
        struct.error: If the payload does not have the expected binary layout.
        ValueError: If the serial number embedded in the payload differs from
            the requested one.
    """
    if serial_number == "default":
        serial_number = "00000"

    base_url = (
        "https://pupil-invisible-hardware-calibrations"
        ".s3.eu-central-1.amazonaws.com"
    )
    version = "v1"

    # Without a timeout a stalled connection would block the script forever.
    response = requests.get(
        base_url + "/" + version + "/" + serial_number, timeout=30
    )
    response.raise_for_status()

    # Little-endian payload: a 5-byte serial followed by groups of 9, 8 and 9
    # doubles. Only the first two groups are used here; the trailing nine
    # doubles are parsed but ignored.
    data = struct.unpack("<5s 9d 8d 9d", response.content)
    returned_serial = data[0].decode("utf-8")
    camera_matrix = np.array(data[1:10], dtype=np.float64).reshape(3, 3)
    distortion_coefficients = np.array(data[10:18], dtype=np.float64).reshape(
        1, 8
    )

    if returned_serial != serial_number:
        raise ValueError(
            "The serial number returned from the cloud does not "
            "match the queried serial number!"
        )

    return camera_matrix, distortion_coefficients
def undistort_video(
    original_video_path, undistorted_video_path, camera_matrix, dist_coefs
):
    """
    Re-encode a video with every frame undistorted by ``cv2.undistort``.

    The first video stream of the input is decoded, undistorted and encoded as
    H.264 ("h264_nvenc" if that encoder can be added, otherwise "h264"). If the
    input has an audio stream, the first one is re-encoded as AAC.

    Args:
        original_video_path: Path of the video to read.
        undistorted_video_path: Path of the video to write.
        camera_matrix: Camera matrix handed to ``cv2.undistort``.
        dist_coefs: Distortion coefficients handed to ``cv2.undistort``.
    """
    with av.open(str(original_video_path)) as original_container:
        original_video_stream = original_container.streams.video[0]

        with av.open(str(undistorted_video_path), "w") as undistorted_container:
            try:
                undistorted_video = undistorted_container.add_stream("h264_nvenc")
            except Exception as e:
                print("nvenc not available", e)
                undistorted_video = undistorted_container.add_stream("h264")

            undistorted_video.options["bf"] = "0"
            undistorted_video.options["movflags"] = "faststart"
            undistorted_video.gop_size = original_video_stream.gop_size

            # Mirror the geometry, timing and bit rate of the source stream.
            video_ctx = undistorted_video.codec_context
            video_ctx.height = original_video_stream.height
            video_ctx.width = original_video_stream.width
            video_ctx.time_base = original_video_stream.time_base
            video_ctx.bit_rate = original_video_stream.bit_rate

            output_audio_stream = None
            if original_container.streams.audio:
                audio_stream = original_container.streams.audio[0]
                output_audio_stream = undistorted_container.add_stream("aac")
                audio_ctx = output_audio_stream.codec_context
                audio_ctx.layout = audio_stream.layout.name
                audio_ctx.time_base = audio_stream.time_base
                audio_ctx.extradata = audio_stream.extradata
                audio_ctx.bit_rate = audio_stream.bit_rate
                audio_ctx.sample_rate = audio_stream.sample_rate

            with tqdm(unit=" frames") as progress:
                for packet in original_container.demux():
                    frames = packet.decode()
                    if packet.stream.type == "audio":
                        for frame in frames:
                            # Drop the source timestamp before re-encoding.
                            frame.pts = None
                            undistorted_container.mux(
                                output_audio_stream.encode(frame)
                            )
                    elif packet.stream.type == "video":
                        for frame in frames:
                            img = frame.to_ndarray(format="bgr24")
                            # The undistorted image (not the raw one) must be
                            # what gets encoded.
                            undistorted_img = cv2.undistort(
                                img, camera_matrix, dist_coefs
                            )
                            new_frame = av.VideoFrame.from_ndarray(
                                undistorted_img, format="bgr24"
                            )
                            new_frame.pts = frame.pts
                            undistorted_container.mux(
                                undistorted_video.encode(new_frame)
                            )
                            progress.update()

            # Flush the encoders so frames still buffered inside them are
            # written before the container is finalized.
            undistorted_container.mux(undistorted_video.encode(None))
            if output_audio_stream is not None:
                undistorted_container.mux(output_audio_stream.encode(None))
def undistort_recording(recording_or_video_path: pathlib.Path) -> None:
    """
    Undistort the scene video(s) of a recording, or one specific video file.

    If ``recording_or_video_path`` is a directory containing an ``info.json``,
    all files matching "PI world v1 ps*.mp4" directly inside it and all files
    named "scene.mp4" anywhere below it are processed. Otherwise the path is
    treated as a single video file whose parent directory holds ``info.json``.

    Each result is written next to its source with the file name prefixed by
    "undistorted-".

    Args:
        recording_or_video_path: Recording directory or path to a video file.

    Raises:
        FileNotFoundError: If no ``info.json`` exists in the recording folder.
        KeyError: If ``info.json`` has no "scene_camera_serial_number" entry.
    """
    if (recording_or_video_path / "info.json").exists():
        rec_path = recording_or_video_path
        video_paths = list(
            itertools.chain(
                rec_path.glob("PI world v1 ps*.mp4"), rec_path.rglob("scene.mp4")
            )
        )
    else:
        rec_path = recording_or_video_path.parent
        video_paths = [recording_or_video_path]

    # Use a context manager so the file handle is closed deterministically.
    with (rec_path / "info.json").open("rb") as info_file:
        camera_serial = json.load(info_file)["scene_camera_serial_number"]

    camera_matrix, dist_coefs = fetch_intrinsics(camera_serial)

    for video_path in video_paths:
        undistorted_video_path = video_path.with_stem(f"undistorted-{video_path.stem}")
        print(f"undistorting {video_path} to {undistorted_video_path}")
        undistort_video(video_path, undistorted_video_path, camera_matrix, dist_coefs)
if __name__ == "__main__":
    # Command line entry point: undistort a recording folder or a single video.
    parser = argparse.ArgumentParser(description="undistort video")
    parser.add_argument(
        "recording_or_video_path",
        # A positional argument only honours `default` when it is optional;
        # without nargs="?" the "." default could never take effect.
        nargs="?",
        default=".",
        type=pathlib.Path,
        help=(
            "path to a recording folder containing info.json, or to a single "
            "scene video inside such a folder (default: current directory)"
        ),
    )
    args = parser.parse_args()
    undistort_recording(args.recording_or_video_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment