Created
January 27, 2024 11:53
-
-
Save wildlarva/8dae3b01d3be50f9c3746ed7245dbdfa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2016 Open Source Robotics Foundation, Inc. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import rclpy | |
from rclpy.node import Node | |
from std_msgs.msg import String | |
from typing import Callable | |
from typing import Optional | |
from typing import TypeVar | |
from typing import Union | |
from rclpy.callback_groups import CallbackGroup | |
from rclpy.exceptions import InvalidTopicNameException | |
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy | |
from rclpy.qos import QoSProfile | |
from rclpy.qos_event import SubscriptionEventCallbacks | |
from rclpy.qos_overriding_options import _declare_qos_parameters | |
from rclpy.qos_overriding_options import QoSOverridingOptions | |
from rclpy.subscription import Subscription | |
from rclpy.type_support import check_is_valid_msg_type | |
# Generic placeholder for the message class; used below only to annotate the
# argument type of the subscription callback.
MsgType = TypeVar('MsgType')
class NodeEx(Node):
    """Node that can create a subscription without registering it with the node."""

    def create_subscription_ex(
        self,
        msg_type,
        topic: str,
        callback: Optional[Callable[[MsgType], None]],
        qos_profile: Union[QoSProfile, int],
        *,
        callback_group: Optional[CallbackGroup] = None,
        event_callbacks: Optional[SubscriptionEventCallbacks] = None,
        qos_overriding_options: Optional[QoSOverridingOptions] = None,
        raw: bool = False
    ) -> Subscription:
        """
        Create a new subscription that is not registered with the node.

        Unlike a regular subscription, the returned object is not added to the callback
        group or to the node's subscription list, the executor is not woken up, and its
        event handlers are not added as waitables. The intent is that ``callback`` is not
        executed automatically; the caller is expected to take messages from the
        subscription itself.

        :param msg_type: The type of ROS messages the subscription will subscribe to.
        :param topic: The name of the topic the subscription will subscribe to.
        :param callback: A user-defined callback function, or ``None``. It is stored in the
            subscription but, because the subscription is not registered, it is not
            scheduled by this method.
        :param qos_profile: A QoSProfile or a history depth to apply to the subscription.
            In the case that a history depth is provided, the QoS history is set to
            KEEP_LAST, the QoS history depth is set to the value
            of the parameter, and all other QoS settings are set to their default values.
        :param callback_group: The callback group for the subscription. If ``None``, then the
            default callback group for the node is used.
        :param event_callbacks: User-defined callbacks for middleware events.
        :param qos_overriding_options: Options used when declaring the QoS parameters for
            the subscription. If ``None``, an empty ``QoSOverridingOptions`` is used.
        :param raw: If ``True``, then received messages will be stored in raw binary
            representation.
        :return: The newly created, unregistered subscription.
        :raises InvalidTopicNameException: If the topic name cannot be resolved because it
            is not a valid name.
        :raises ValueError: If the underlying subscription cannot be created even though
            the topic name validates.
        """
        qos_profile = self._validate_qos_or_depth_parameter(qos_profile)

        callback_group = callback_group or self.default_callback_group

        try:
            final_topic = self.resolve_topic_name(topic)
        except RuntimeError:
            # if it's name validation error, raise a more appropriate exception.
            try:
                self._validate_topic_or_service_name(topic)
            except InvalidTopicNameException as ex:
                raise ex from None
            # else reraise the previous exception
            raise

        if qos_overriding_options is None:
            qos_overriding_options = QoSOverridingOptions([])
        _declare_qos_parameters(
            Subscription, self, final_topic, qos_profile, qos_overriding_options)

        # this line imports the typesupport for the message module if not already done
        check_is_valid_msg_type(msg_type)

        # The error is remembered and handled outside the ``except`` block so that an
        # exception raised by the name validation does not get the ValueError attached
        # as its context.
        creation_error = None
        try:
            with self.handle:
                subscription_object = _rclpy.Subscription(
                    self.handle, msg_type, topic, qos_profile.get_c_qos_profile())
        except ValueError as ex:
            creation_error = ex
        if creation_error is not None:
            # Give the name validation a chance to raise a more specific exception first.
            self._validate_topic_or_service_name(topic)
            # The name is fine, so report the original failure instead of carrying on
            # with ``subscription_object`` unbound.
            raise creation_error

        try:
            subscription = Subscription(
                subscription_object, msg_type,
                topic, callback, callback_group, qos_profile, raw,
                event_callbacks=event_callbacks or SubscriptionEventCallbacks())
        except Exception:
            subscription_object.destroy_when_not_in_use()
            raise

        # Registration is intentionally skipped so that the subscription callback is not
        # executed. The steps left out are: adding the subscription to the callback group,
        # appending it to ``self._subscriptions``, waking the executor, and adding the
        # subscription's event handlers as waitables.
        return subscription
class MinimalSubscriber(NodeEx):
    """Subscriber node that polls its subscription from a timer."""

    count_ = 0

    def __init__(self):
        """Create the unregistered subscription on 'topic' and the polling timer."""
        super().__init__('minimal_subscriber')

        # Created through create_subscription_ex, so messages have to be taken
        # manually (see timer_callback).
        self.subscription = self.create_subscription_ex(
            String, 'topic', self.listener_callback, 10)

        # Poll once per second.
        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        """
        Take one pending message from the subscription, if any, and log it.

        NOTE(review): at most one message is taken per timer tick; confirm this is
        intended if the publisher can be faster than the timer.
        """
        sub = self.subscription
        if sub:
            taken = sub.handle.take_message(sub.msg_type, sub.raw)
            if taken:
                # The first element of the taken value is the message itself.
                self.get_logger().info('I heard: "%s"' % taken[0].data)

    def listener_callback(self, msg):
        """Log the data of a received message."""
        logger = self.get_logger()
        logger.info('I heard: "%s"' % msg.data)
def main(args=None):
    """
    Run the minimal subscriber until spinning stops.

    :param args: Command line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    try:
        minimal_subscriber = MinimalSubscriber()
        try:
            rclpy.spin(minimal_subscriber)
        finally:
            # Destroy the node explicitly even when spin is interrupted by an
            # exception (e.g. KeyboardInterrupt); the exception still propagates.
            minimal_subscriber.destroy_node()
    finally:
        # Only shut down if the context is still valid, so an earlier shutdown does
        # not turn cleanup into a second error.
        if rclpy.ok():
            rclpy.shutdown()
# Run the subscriber only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment