Skip to content

Instantly share code, notes, and snippets.

@c0nn3r
Created August 25, 2017 23:41
Show Gist options
  • Save c0nn3r/96057c6e3cdcc25b54c1e7652e35bf10 to your computer and use it in GitHub Desktop.
Save c0nn3r/96057c6e3cdcc25b54c1e7652e35bf10 to your computer and use it in GitHub Desktop.
import cv2
import rospy
import numpy as np
from cv_bridge import CvBridge
from sensor_msgs.msg import CompressedImage, Image
class ImageLoader(object):
'''
Constantly loads images from a subscription channel and
sets a flag showing it has a new image to prevent double image
loading.
'''
def __init__(self,
subscribe_channel='/front_camera_flipped/image_color/compressed'):
self.image = np.zeros(1)
rospy.Subscriber(
subscribe_channel,
CompressedImage,
self.image_callback,
)
self.new_image = False
self.bridge = CvBridge()
def image_callback(self, compressed_image):
self.image = self.bridge.compressed_imgmsg_to_cv2(compressed_image,
desired_encoding='passthrough')
self.new_image = True
class CalibrationResult(object):
def __init__(self, truck, camera, camera_matrix, distortion):
self.truck = truck
self.camera = camera
self.camera_matrix = camera_matrix
self.distortion = distortion
def __str__(self):
return '{}: {}'.format(self.truck, self.camera)
class ImageCorrecter(object):
'''
Receives an image and (if enabled) applies the cameras calibration,
it then publishes the image to a channel.
'''
def __init__(self, publish_channel, image_loader, calibration, flip_image=True, enabled=True):
self.publisher = rospy.Publisher(
publish_channel,
Image,
queue_size=1,
)
self.image_loader = image_loader
self.camera_matrix = calibration.camera_matrix
self.distortion = calibration.distortion
self.flip_image = flip_image
self.enabled = enabled
self.bridge = CvBridge()
def publish(self, image):
message = self.bridge.cv2_to_imgmsg(image, encoding='rgb8')
self.publisher.publish(message)
def correct(self, image):
if self.flip_image:
image = cv2.flip(image, 0)
if self.enabled:
image = cv2.undistort(
image,
self.camera_matrix,
self.distortion,
)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return image
def start(self):
rospy.init_node('image_correcter', anonymous=False)
while not rospy.is_shutdown():
if self.image_loader.new_image:
corrected_image = self.correct(image_loader.image)
self.publish(corrected_image)
self.image_loader.new_image = False
if __name__ == '__main__':
image_loader = ImageLoader()
calibration_result = CalibrationResult(
truck='lizzie',
camera='center_camera',
camera_matrix=np.array([[227.285916, 0.000000, 332.218473],
[0.000000, 227.520197, 249.377605],
[0.000000, 0.000000, 1.000000]]),
distortion=np.array([0.008439, -0.001149, -0.000811, 0.000408, 0.000000])
)
print('Using calibration result... | {} |'.format(calibration_result))
image_correcter = ImageCorrecter(
'/corrected_image/',
image_loader,
calibration_result,
)
image_correcter.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment