Last active
July 3, 2024 07:50
-
-
Save naveenrobo/007a46447c589766a78ad35b0d17942a to your computer and use it in GitHub Desktop.
imu_bosch.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import rclpy | |
from rclpy.node import Node | |
from std_msgs.msg import Float32 | |
from tf_transformations import euler_from_quaternion | |
from sensor_msgs.msg import Imu, MagneticField | |
from geometry_msgs.msg import Vector3Stamped | |
from math import radians | |
from numpy import sign | |
import socket | |
import json | |
import time | |
import traceback | |
class ImuDataExtract(Node): | |
def __init__(self): | |
super().__init__("imu_data_extract") | |
self.declare_parameters( | |
namespace='', | |
parameters=[ | |
('udp_port', rclpy.Parameter.Type.INTEGER), | |
('calibration_mode', False) | |
] | |
) | |
self.udp_port = self.get_parameter('udp_port').value | |
self.calibration_mode = self.get_parameter('calibration_mode').value | |
self.initial_angle = None | |
self.imu_ready = True # Assuming IMU is ready since data is coming from mobile | |
self.default_value = False | |
self.last_default_value = None | |
self.imu_error = True | |
self.start_time = None | |
self.error_start_time = None | |
self.ori_pub = self.create_publisher(Vector3Stamped, '/orientation', 10) | |
self.imu_data_pub = self.create_publisher(Imu, "/sensors/imu", 10) | |
if self.calibration_mode: | |
self.accl_pub = self.create_publisher(Imu, '/imu/accelerometer', 10) | |
self.mag_data_pub = self.create_publisher(MagneticField, '/imu/magnetometer', 10) | |
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
self.socket.bind(("0.0.0.0", self.udp_port)) | |
print(f"Listening for UDP packets on port {self.udp_port}") | |
def read_imu_data(self): | |
while rclpy.ok(): | |
try: | |
data, addr = self.socket.recvfrom(1024) # Buffer size is 1024 bytes | |
data = data.decode("utf-8").strip() | |
data_dict = json.loads(data) | |
euler_angle = euler_from_quaternion( | |
[data_dict['quatX'], data_dict['quatY'], data_dict['quatZ'], data_dict['quatW']])[2] | |
if self.initial_angle is None: | |
self.initial_angle = euler_angle | |
euler_angle = self.initial_angle - euler_angle | |
if sign(euler_angle) == -1: | |
euler_angle += 6.283185307179586 | |
val_array = list(data_dict.values()) | |
default_val = all(c == 0.0 for c in val_array) | |
if default_val and default_val != self.last_default_value: | |
self.error_start_time = time.time() | |
self.imu_error = True | |
elif not default_val and default_val != self.last_default_value: | |
self.error_start_time = None | |
self.imu_error = False | |
if self.imu_error: | |
if self.error_start_time is not None: | |
elapsed_time = time.time() - self.error_start_time | |
if elapsed_time > 5: | |
print("IMU error detected. No valid data for more than 5 seconds.") | |
else: | |
imu = Imu() | |
imu.header.frame_id = "imu_link" | |
imu.header.stamp = self.get_clock().now().to_msg() | |
imu.orientation.x = data_dict['quatX'] | |
imu.orientation.y = data_dict['quatY'] | |
imu.orientation.z = data_dict['quatZ'] | |
imu.orientation.w = data_dict['quatW'] | |
imu.angular_velocity.x = radians(data_dict['gyroX']) | |
imu.angular_velocity.y = radians(data_dict['gyroY']) | |
imu.angular_velocity.z = radians(data_dict['gyroZ']) | |
imu.linear_acceleration.x = data_dict['accelX'] | |
imu.linear_acceleration.y = data_dict['accelY'] | |
imu.linear_acceleration.z = data_dict['accelZ'] | |
self.imu_data_pub.publish(imu) | |
data_msg = Vector3Stamped() | |
data_msg.header.frame_id = 'imu_link' | |
data_msg.header.stamp = self.get_clock().now().to_msg() | |
data_msg.vector.z = euler_angle | |
self.ori_pub.publish(data_msg) | |
if self.calibration_mode: | |
mag_data = MagneticField() | |
mag_data.header.frame_id = 'imu_link' | |
mag_data.header.stamp = self.get_clock().now().to_msg() | |
mag_data.magnetic_field.x = data_dict['magX'] | |
mag_data.magnetic_field.y = data_dict['magY'] | |
mag_data.magnetic_field.z = data_dict['magZ'] | |
self.mag_data_pub.publish(mag_data) | |
self.accl_pub.publish(imu) | |
self.last_default_value = default_val | |
except Exception as e: | |
print(f"Unexpected error: {e}\n{traceback.format_exc()}") | |
def BoschIMUNode(args=None): | |
rclpy.init(args=args) | |
imu_extract = ImuDataExtract() | |
imu_extract.read_imu_data() | |
imu_extract.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
BoschIMUNode() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment